This is an automated email from the ASF dual-hosted git repository.
gongzhongqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4154b8d08c1 [FLINK-35240][Connectors][format] Disable
FLUSH_AFTER_WRITE_VALUE to avoid flush per record for csv format (#24730)
4154b8d08c1 is described below
commit 4154b8d08c1a2b901b058b020d546de59ba6989d
Author: gongzhongqiang <[email protected]>
AuthorDate: Fri Jul 12 10:03:16 2024 +0800
[FLINK-35240][Connectors][format] Disable FLUSH_AFTER_WRITE_VALUE to avoid
flush per record for csv format (#24730)
* [FLINK-35240][Connectors][format] Disable FLUSH_AFTER_WRITE_VALUE to
avoid flush per record for csv format
* Address comment
* Add unit test
* address comment
* address comment
* rebase to master
---
.../apache/flink/formats/csv/CsvBulkWriter.java | 24 ++-
.../apache/flink/formats/csv/CsvBulkWriterIT.java | 208 +++++++++++++++++++++
.../src/test/resources/log4j2-test.properties | 28 +++
3 files changed, 252 insertions(+), 8 deletions(-)
diff --git
a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
index f9a8e01bb07..1d7ea6a6b0d 100644
---
a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
+++
b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
@@ -21,10 +21,12 @@ package org.apache.flink.formats.csv;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.formats.common.Converter;
+import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.jackson.JacksonMapperFactory;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonEncoding;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
@@ -40,7 +42,7 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
private final FSDataOutputStream stream;
private final Converter<T, R, C> converter;
@Nullable private final C converterContext;
- private final ObjectWriter csvWriter;
+ private final JsonGenerator generator;
CsvBulkWriter(
CsvMapper mapper,
@@ -51,13 +53,18 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
checkNotNull(mapper);
checkNotNull(schema);
+ // Prevent Jackson's writeValue() method calls from closing the stream.
+ mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
+ mapper.disable(SerializationFeature.FLUSH_AFTER_WRITE_VALUE);
+
this.converter = checkNotNull(converter);
this.stream = checkNotNull(stream);
this.converterContext = converterContext;
- this.csvWriter = mapper.writer(schema);
-
- // Prevent Jackson's writeValue() method calls from closing the stream.
- mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
+ try {
+ this.generator = mapper.writer(schema).createGenerator(stream,
JsonEncoding.UTF8);
+ } catch (IOException e) {
+ throw new FlinkRuntimeException("Could not create CSV generator.",
e);
+ }
}
/**
@@ -98,16 +105,17 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
@Override
public void addElement(T element) throws IOException {
final R r = converter.convert(element, converterContext);
- csvWriter.writeValue(stream, r);
+ generator.writeObject(r);
}
@Override
public void flush() throws IOException {
- stream.flush();
+ generator.flush();
}
@Override
public void finish() throws IOException {
+ generator.close();
stream.sync();
}
}
diff --git
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
new file mode 100644
index 00000000000..aa070b22aba
--- /dev/null
+++
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.TestDataGenerators;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CsvBulkWriterIT {
+
+ @TempDir File outDir;
+
+ /**
+ * FLINK-35240 : Verifies that Jackson CSV writer does not flush per record but waits for a
+ * flush signal from Flink.
+ */
+ @Test
+ public void testNoDataIsWrittenBeforeFlinkFlush() throws Exception {
+
+ Configuration config = new Configuration();
+ config.set(
+ RestartStrategyOptions.RESTART_STRATEGY,
+
RestartStrategyOptions.RestartStrategyType.NO_RESTART_STRATEGY.getMainValue());
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.setParallelism(1);
+ env.enableCheckpointing(100);
+
+ // Workaround serialization limitations
+ File outDirRef = new File(outDir.getAbsolutePath());
+
+ FileSink<Pojo> sink =
+ FileSink.forBulkFormat(
+ new
org.apache.flink.core.fs.Path(outDir.getAbsolutePath()),
+ out -> {
+ FSDataOutputStreamWrapper
outputStreamWrapper =
+ new FSDataOutputStreamWrapper(out);
+ return new CsvBulkWriterWrapper<>(
+ CsvBulkWriter.forPojo(Pojo.class,
outputStreamWrapper),
+ outputStreamWrapper,
+ outDirRef);
+ })
+ .build();
+
+ List<Pojo> integers = Arrays.asList(new Pojo(1), new Pojo(2));
+ DataGeneratorSource<Pojo> generatorSource =
+ TestDataGenerators.fromDataWithSnapshotsLatch(
+ integers, TypeInformation.of(Pojo.class));
+ env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(),
"").sinkTo(sink);
+ env.execute();
+ assertThat(getResultsFromSinkFiles(outDir)).containsSequence("1", "2",
"1", "2");
+ }
+
+ private static class CsvBulkWriterWrapper<T> implements BulkWriter<T> {
+
+ private static int addedElements = 0;
+
+ private static int expectedFlushedElements = 0;
+
+ private final CsvBulkWriter<T, ?, ?> csvBulkWriter;
+
+ private final File outDir;
+
+ private final FSDataOutputStreamWrapper stream;
+
+ CsvBulkWriterWrapper(
+ CsvBulkWriter<T, ?, ?> csvBulkWriter,
+ FSDataOutputStreamWrapper stream,
+ File outDir) {
+ this.csvBulkWriter = csvBulkWriter;
+ this.stream = stream;
+ this.outDir = outDir;
+ }
+
+ @Override
+ public void addElement(T element) throws IOException {
+ addedElements++;
+ csvBulkWriter.addElement(element);
+
assertThat(getResultsFromSinkFiles(outDir)).hasSize(expectedFlushedElements);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ csvBulkWriter.flush();
+ expectedFlushedElements = addedElements;
+
assertThat(getResultsFromSinkFiles(outDir)).hasSize(expectedFlushedElements);
+ }
+
+ @Override
+ public void finish() throws IOException {
+ csvBulkWriter.finish();
+ // The stream should not be closed by the CsvBulkWriter.finish() method
+ assertThat(stream.closed).isFalse();
+ }
+ }
+
+ private static class FSDataOutputStreamWrapper extends FSDataOutputStream {
+
+ private boolean closed = false;
+
+ private final FSDataOutputStream stream;
+
+ FSDataOutputStreamWrapper(FSDataOutputStream stream) {
+ this.stream = stream;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return stream.getPos();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ stream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ stream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ stream.write(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ stream.flush();
+ }
+
+ @Override
+ public void sync() throws IOException {
+ stream.sync();
+ }
+
+ @Override
+ public void close() throws IOException {
+ stream.close();
+ closed = true;
+ }
+ }
+
+ public static class Pojo {
+ public long x;
+
+ public Pojo(long x) {
+ this.x = x;
+ }
+
+ public Pojo() {}
+ }
+
+ private static List<String> getResultsFromSinkFiles(File outDir) throws
IOException {
+ final Map<File, String> contents = getFileContentByPath(outDir);
+ return contents.entrySet().stream()
+ .flatMap(e -> Arrays.stream(e.getValue().split("\n")))
+ .filter(s -> !s.isEmpty())
+ .collect(Collectors.toList());
+ }
+
+ private static Map<File, String> getFileContentByPath(File directory)
throws IOException {
+ Map<File, String> contents = new HashMap<>();
+
+ final Collection<File> filesInBucket = FileUtils.listFiles(directory,
null, true);
+ for (File file : filesInBucket) {
+ contents.put(file, FileUtils.readFileToString(file));
+ }
+ return contents;
+ }
+}
diff --git a/flink-formats/flink-csv/src/test/resources/log4j2-test.properties
b/flink-formats/flink-csv/src/test/resources/log4j2-test.properties
new file mode 100644
index 00000000000..835c2ec9a3d
--- /dev/null
+++ b/flink-formats/flink-csv/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n