This is an automated email from the ASF dual-hosted git repository.

gongzhongqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4154b8d08c1 [FLINK-35240][Connectors][format] Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record for csv format (#24730)
4154b8d08c1 is described below

commit 4154b8d08c1a2b901b058b020d546de59ba6989d
Author: gongzhongqiang <[email protected]>
AuthorDate: Fri Jul 12 10:03:16 2024 +0800

    [FLINK-35240][Connectors][format] Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record for csv format (#24730)
    
    * [FLINK-35240][Connectors][format] Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record for csv format
    
    * Address comment
    
    * Add unit test
    
    * address comment
    
    * address comment
    
    * rebase to master
---
 .../apache/flink/formats/csv/CsvBulkWriter.java    |  24 ++-
 .../apache/flink/formats/csv/CsvBulkWriterIT.java  | 208 +++++++++++++++++++++
 .../src/test/resources/log4j2-test.properties      |  28 +++
 3 files changed, 252 insertions(+), 8 deletions(-)

diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
index f9a8e01bb07..1d7ea6a6b0d 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
@@ -21,10 +21,12 @@ package org.apache.flink.formats.csv;
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.formats.common.Converter;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.jackson.JacksonMapperFactory;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonEncoding;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
 
@@ -40,7 +42,7 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
     private final FSDataOutputStream stream;
     private final Converter<T, R, C> converter;
     @Nullable private final C converterContext;
-    private final ObjectWriter csvWriter;
+    private final JsonGenerator generator;
 
     CsvBulkWriter(
             CsvMapper mapper,
@@ -51,13 +53,18 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
         checkNotNull(mapper);
         checkNotNull(schema);
 
+        // Prevent Jackson's writeValue() method calls from closing the stream.
+        mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
+        mapper.disable(SerializationFeature.FLUSH_AFTER_WRITE_VALUE);
+
         this.converter = checkNotNull(converter);
         this.stream = checkNotNull(stream);
         this.converterContext = converterContext;
-        this.csvWriter = mapper.writer(schema);
-
-        // Prevent Jackson's writeValue() method calls from closing the stream.
-        mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
+        try {
+            this.generator = mapper.writer(schema).createGenerator(stream, JsonEncoding.UTF8);
+        } catch (IOException e) {
+            throw new FlinkRuntimeException("Could not create CSV generator.", e);
+        }
     }
 
     /**
@@ -98,16 +105,17 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
     @Override
     public void addElement(T element) throws IOException {
         final R r = converter.convert(element, converterContext);
-        csvWriter.writeValue(stream, r);
+        generator.writeObject(r);
     }
 
     @Override
     public void flush() throws IOException {
-        stream.flush();
+        generator.flush();
     }
 
     @Override
     public void finish() throws IOException {
+        generator.close();
         stream.sync();
     }
 }
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
new file mode 100644
index 00000000000..aa070b22aba
--- /dev/null
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvBulkWriterIT.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.TestDataGenerators;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CsvBulkWriterIT {
+
+    @TempDir File outDir;
+
+    /**
+     * FLINK-35240 : Verifies that Jackson CSV writer does not flush per record but waits for a
+     * flush signal from Flink.
+     */
+    @Test
+    public void testNoDataIsWrittenBeforeFlinkFlush() throws Exception {
+
+        Configuration config = new Configuration();
+        config.set(
+                RestartStrategyOptions.RESTART_STRATEGY,
+                RestartStrategyOptions.RestartStrategyType.NO_RESTART_STRATEGY.getMainValue());
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(1);
+        env.enableCheckpointing(100);
+
+        // Workaround serialization limitations
+        File outDirRef = new File(outDir.getAbsolutePath());
+
+        FileSink<Pojo> sink =
+                FileSink.forBulkFormat(
+                                new org.apache.flink.core.fs.Path(outDir.getAbsolutePath()),
+                                out -> {
+                                    FSDataOutputStreamWrapper outputStreamWrapper =
+                                            new FSDataOutputStreamWrapper(out);
+                                    return new CsvBulkWriterWrapper<>(
+                                            CsvBulkWriter.forPojo(Pojo.class, outputStreamWrapper),
+                                            outputStreamWrapper,
+                                            outDirRef);
+                                })
+                        .build();
+
+        List<Pojo> integers = Arrays.asList(new Pojo(1), new Pojo(2));
+        DataGeneratorSource<Pojo> generatorSource =
+                TestDataGenerators.fromDataWithSnapshotsLatch(
+                        integers, TypeInformation.of(Pojo.class));
+        env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "").sinkTo(sink);
+        env.execute();
+        assertThat(getResultsFromSinkFiles(outDir)).containsSequence("1", "2", "1", "2");
+    }
+
+    private static class CsvBulkWriterWrapper<T> implements BulkWriter<T> {
+
+        private static int addedElements = 0;
+
+        private static int expectedFlushedElements = 0;
+
+        private final CsvBulkWriter<T, ?, ?> csvBulkWriter;
+
+        private final File outDir;
+
+        private final FSDataOutputStreamWrapper stream;
+
+        CsvBulkWriterWrapper(
+                CsvBulkWriter<T, ?, ?> csvBulkWriter,
+                FSDataOutputStreamWrapper stream,
+                File outDir) {
+            this.csvBulkWriter = csvBulkWriter;
+            this.stream = stream;
+            this.outDir = outDir;
+        }
+
+        @Override
+        public void addElement(T element) throws IOException {
+            addedElements++;
+            csvBulkWriter.addElement(element);
+            assertThat(getResultsFromSinkFiles(outDir)).hasSize(expectedFlushedElements);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            csvBulkWriter.flush();
+            expectedFlushedElements = addedElements;
+            assertThat(getResultsFromSinkFiles(outDir)).hasSize(expectedFlushedElements);
+        }
+
+        @Override
+        public void finish() throws IOException {
+            csvBulkWriter.finish();
+            // The stream should not be closed by the CsvBulkWriter.finish() method
+            assertThat(stream.closed).isFalse();
+        }
+    }
+
+    private static class FSDataOutputStreamWrapper extends FSDataOutputStream {
+
+        private boolean closed = false;
+
+        private final FSDataOutputStream stream;
+
+        FSDataOutputStreamWrapper(FSDataOutputStream stream) {
+            this.stream = stream;
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            return stream.getPos();
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            stream.write(b);
+        }
+
+        @Override
+        public void write(byte[] b) throws IOException {
+            stream.write(b);
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            stream.write(b, off, len);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            stream.flush();
+        }
+
+        @Override
+        public void sync() throws IOException {
+            stream.sync();
+        }
+
+        @Override
+        public void close() throws IOException {
+            stream.close();
+            closed = true;
+        }
+    }
+
+    public static class Pojo {
+        public long x;
+
+        public Pojo(long x) {
+            this.x = x;
+        }
+
+        public Pojo() {}
+    }
+
+    private static List<String> getResultsFromSinkFiles(File outDir) throws IOException {
+        final Map<File, String> contents = getFileContentByPath(outDir);
+        return contents.entrySet().stream()
+                .flatMap(e -> Arrays.stream(e.getValue().split("\n")))
+                .filter(s -> !s.isEmpty())
+                .collect(Collectors.toList());
+    }
+
+    private static Map<File, String> getFileContentByPath(File directory) throws IOException {
+        Map<File, String> contents = new HashMap<>();
+
+        final Collection<File> filesInBucket = FileUtils.listFiles(directory, null, true);
+        for (File file : filesInBucket) {
+            contents.put(file, FileUtils.readFileToString(file));
+        }
+        return contents;
+    }
+}
diff --git a/flink-formats/flink-csv/src/test/resources/log4j2-test.properties b/flink-formats/flink-csv/src/test/resources/log4j2-test.properties
new file mode 100644
index 00000000000..835c2ec9a3d
--- /dev/null
+++ b/flink-formats/flink-csv/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n

Reply via email to