fapaul commented on a change in pull request #17598:
URL: https://github.com/apache/flink/pull/17598#discussion_r778924754



##########
File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.formats.common.Converter;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+
+// TODO: ? PublicEvolving?
+/** A simple {@link BulkWriter} implementation based on Jackson CSV transformations. */
+class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
+
+    private final FSDataOutputStream stream;
+    private final Converter<T, R, C> converter;
+    private final C converterContext;
+    private final ObjectWriter csvWriter;
+
+    CsvBulkWriter(
+            CsvMapper mapper,
+            CsvSchema schema,
+            Converter<T, R, C> converter,
+            C converterContext,
+            FSDataOutputStream stream) {
+        this.converter = converter;
+        this.converterContext = converterContext;
+        this.stream = stream;
+
+        this.csvWriter = mapper.writer(schema);
+
+        // Prevent Jackson's writeValue() method calls from closing the stream.
+        mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);

Review comment:
       Nit: Can you try to use `Preconditions.checkNotNull()` for non-nullable ctor arguments?
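
For instance, a minimal sketch of the guarded constructor (assuming everything except `converterContext` is non-nullable here; the context may legitimately be null for some converters):

```java
import static org.apache.flink.util.Preconditions.checkNotNull;

CsvBulkWriter(
        CsvMapper mapper,
        CsvSchema schema,
        Converter<T, R, C> converter,
        C converterContext,
        FSDataOutputStream stream) {
    this.converter = checkNotNull(converter);
    this.converterContext = converterContext; // may be null, depending on the converter
    this.stream = checkNotNull(stream);
    this.csvWriter = checkNotNull(mapper).writer(checkNotNull(schema));

    // Prevent Jackson's writeValue() method calls from closing the stream.
    mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
}
```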

##########
File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.formats.common.Converter;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@code StreamFormat} for reading CSV files.
+ *
+ * @param <T> The type of the returned elements.
+ */
+@PublicEvolving
+public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final CsvMapper mapper;
+    private final CsvSchema schema;
+    private final Class<Object> rootType;
+    private final Converter<Object, T, Void> converter;
+    private final TypeInformation<T> typeInformation;
+
+    @SuppressWarnings("unchecked")
+    <R> CsvReaderFormat(
+            CsvMapper mapper,
+            CsvSchema schema,
+            Class<R> rootType,
+            Converter<R, T, Void> converter,
+            TypeInformation<T> typeInformation) {
+        this.mapper = checkNotNull(mapper);
+        this.schema = schema;

Review comment:
       Nit: checkNotNull
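
For example (`checkNotNull` is already statically imported in this file):

```suggestion
        this.schema = checkNotNull(schema);
```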

##########
File path: flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.common.Converter;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+import static org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy.build;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** MiniCluster-based integration tests for the CSV data format. */
+public class DataStreamCsvITCase extends TestLogger {
+
+    private static final int PARALLELISM = 4;
+
+    @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+    @ClassRule
+    public static final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                            .withHaLeadershipControl()
+                            .build());

Review comment:
       I am a bit confused by this class. Is this mini-cluster even used? After looking more thoroughly, this class seems more like a normal unit test than an IT case.

##########
File path: flink-formats/flink-format-common/src/main/java/org/apache/flink/formats/common/Converter.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.common;
+
+import java.io.Serializable;
+
+/**
+ * A generic interface for converting data types.
+ *
+ * @param <From> The type of the element to be converted.
+ * @param <To> The output type.
+ * @param <C> The context for passing optional conversion instructions.
+ */
+public interface Converter<From, To, C> extends Serializable {

Review comment:
       I guess this interface is supposed to be `@PublicEvolving` or `@Experimental`, since it might be used by a lot of formats in the future.
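
For example, a sketch with the annotation applied (the single `convert(From, C)` method is an assumption; this hunk cuts off before the method declaration):

```java
import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;

@PublicEvolving
public interface Converter<From, To, C> extends Serializable {

    /** Converts an element, optionally using the conversion context. */
    To convert(From source, C context);
}
```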

##########
File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvCommons.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_COMMENTS;
+import static org.apache.flink.formats.csv.CsvFormatOptions.ARRAY_ELEMENT_DELIMITER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.DISABLE_QUOTE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.ESCAPE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.FIELD_DELIMITER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.flink.formats.csv.CsvFormatOptions.NULL_LITERAL;
+import static org.apache.flink.formats.csv.CsvFormatOptions.QUOTE_CHARACTER;
+
+/** A class with common CSV format constants and utility methods. */
+@Internal

Review comment:
       You don't need annotations for package-private classes.

##########
File path: flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.common.Converter;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+import static org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy.build;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** MiniCluster-based integration tests for the CSV data format. */
+public class DataStreamCsvITCase extends TestLogger {
+
+    private static final int PARALLELISM = 4;
+
+    @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+    @ClassRule
+    public static final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                            .withHaLeadershipControl()
+                            .build());
+
+    // ------------------------------------------------------------------------
+    //  test cases
+    // ------------------------------------------------------------------------
+    @Test
+    public void testBoundedTextFileSourceWithJackson() throws Exception {
+        final File testDir = TMP_FOLDER.newFolder();
+        writeFile(testDir, "data.csv", CSV_LINES);
+
+        final CsvReaderFormat<CityPojo> csvFormat = CsvReaderFormat.forPojo(CityPojo.class);
+        final List<CityPojo> result = initializeSourceAndReadData(testDir, csvFormat);
+
+        assertEquals(Arrays.asList(pojos), result);
+    }
+
+    @Test
+    public void testCustomBulkWriter() throws Exception {
+        final File outDir = TMP_FOLDER.newFolder();
+
+        StreamingFileSink<CityPojo> sink =
+                StreamingFileSink.forBulkFormat(
+                                new Path(outDir.toURI()), factoryForPojo(CityPojo.class))
+                        .withBucketAssigner(new BasePathBucketAssigner<>())
+                        .withBucketCheckInterval(10L)
+                        .withRollingPolicy(build())
+                        .build();
+
+        OneInputStreamOperatorTestHarness<CityPojo, Object> testHarness =
+                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 1, 1, 0);
+
+        testHarness.setup();
+        testHarness.open();
+
+        testHarness.processElement(new StreamRecord<>(pojos[0], 1L));
+        testHarness.processElement(new StreamRecord<>(pojos[1], 1L));
+        testHarness.processElement(new StreamRecord<>(pojos[2], 1L));
+        testHarness.snapshot(1L, 1L);
+        testHarness.notifyOfCompletedCheckpoint(2L);
+
+        Map<File, String> contents = getFileContentByPath(outDir);
+        Map.Entry<File, String> onlyEntry = contents.entrySet().iterator().next();
+        String[] result = onlyEntry.getValue().split("\n");
+
+        assertThat(result, is(CSV_LINES));
+    }
+
+    static <T> BulkWriter.Factory<T> factoryForPojo(Class<T> pojoClass) {

Review comment:
       private?
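
e.g.:

```suggestion
    private static <T> BulkWriter.Factory<T> factoryForPojo(Class<T> pojoClass) {
```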

##########
File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/TableCsvFormatFactory.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.BulkWriter.Factory;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.impl.StreamFormatAdapter;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
+import org.apache.flink.connector.file.table.factories.BulkWriterFormatFactory;
+import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
+import org.apache.flink.formats.common.Converter;
+import org.apache.flink.formats.csv.RowDataToCsvConverters.RowDataToCsvConverter;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource.Context;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+
+import java.util.Collections;
+import java.util.Set;
+
+import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_COMMENTS;
+import static org.apache.flink.formats.csv.CsvFormatOptions.ARRAY_ELEMENT_DELIMITER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.DISABLE_QUOTE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.ESCAPE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.FIELD_DELIMITER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.NULL_LITERAL;
+import static org.apache.flink.formats.csv.CsvFormatOptions.QUOTE_CHARACTER;
+
+//TODO: clarify scope for naming Table/File:
+//https://github.com/apache/flink/pull/17598#discussion_r768753623
+/** CSV format factory for file system. */
+@Internal
+public class TableCsvFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return CsvCommons.IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return CsvCommons.optionalOptions();
+    }
+
+    @Override
+    public BulkDecodingFormat<RowData> createDecodingFormat(
+            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+
+        return new CsvBulkDecodingFormat(formatOptions);
+    }
+
+    private static class CsvBulkDecodingFormat
+            implements BulkDecodingFormat<RowData>,
+                    ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>> {
+
+        private final ReadableConfig formatOptions;
+
+        public CsvBulkDecodingFormat(ReadableConfig formatOptions) {
+            this.formatOptions = formatOptions;

Review comment:
       nit: checkNotNull
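
e.g. (assuming `Preconditions.checkNotNull` is, or gets, statically imported in this file):

```suggestion
            this.formatOptions = checkNotNull(formatOptions);
```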

##########
File path: flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.common.Converter;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+import static org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy.build;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** MiniCluster-based integration tests for the CSV data format. */
+public class DataStreamCsvITCase extends TestLogger {
+
+    private static final int PARALLELISM = 4;
+
+    @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+    @ClassRule
+    public static final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                            .withHaLeadershipControl()
+                            .build());
+
+    // ------------------------------------------------------------------------
+    //  test cases
+    // ------------------------------------------------------------------------
+    @Test

Review comment:
       Do you also want to add an ITCase for `CsvReaderFormat.fromSchema`?
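
A sketch of what such a test could look like, mirroring `testBoundedTextFileSourceWithJackson` (the factory-method name and its `(CsvMapper, CsvSchema, TypeInformation)` shape are assumptions based on this PR and may need adjusting to the final API):

```java
@Test
public void testBoundedTextFileSourceWithSchema() throws Exception {
    final File testDir = TMP_FOLDER.newFolder();
    writeFile(testDir, "data.csv", CSV_LINES);

    final CsvMapper mapper = new CsvMapper();
    final CsvSchema schema = mapper.schemaFor(CityPojo.class).withoutQuoteChar();
    final CsvReaderFormat<CityPojo> csvFormat =
            CsvReaderFormat.forSchema(mapper, schema, TypeInformation.of(CityPojo.class));

    final List<CityPojo> result = initializeSourceAndReadData(testDir, csvFormat);

    assertEquals(Arrays.asList(pojos), result);
}
```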

##########
File path: flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/TableCsvFormatITCase.java
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.formats.common.TimeFormats;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.JsonPlanTestBase;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.utils.DateTimeUtils.toLocalDateTime;
+
+/** Tests for the CSV file format. */
+public class TableCsvFormatITCase extends JsonPlanTestBase {
+
+    @Test

Review comment:
       Same as for the other test class: can you use JUnit 5 and AssertJ, please?
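
For example, the target pattern in miniature (class name and data are illustrative; note that `JsonPlanTestBase` is itself still JUnit 4-based, so the base class may need migrating first):

```java
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

class TableCsvFormatSketchTest {

    @Test
    void parsedRowsMatchExpected() {
        List<String> actual = Arrays.asList("1,abc", "2,def");
        List<String> expected = Arrays.asList("1,abc", "2,def");
        assertThat(actual).containsExactlyElementsOf(expected);
    }
}
```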

##########
File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.formats.common.Converter;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+
+// TODO: ? PublicEvolving?
+/** A simple {@link BulkWriter} implementation based on Jackson CSV transformations. */

Review comment:
       Is this TODO still up to date? The class is package-private, so `PublicEvolving` feels unlikely.

##########
File path: flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.common.Converter;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+import static org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy.build;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** MiniCluster-based integration tests for the CSV data format. */
+public class DataStreamCsvITCase extends TestLogger {
+
+    private static final int PARALLELISM = 4;
+
+    @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+    @ClassRule
+    public static final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                            .withHaLeadershipControl()
+                            .build());
+
+    // ------------------------------------------------------------------------
+    //  test cases
+    // ------------------------------------------------------------------------
+    @Test
+    public void testBoundedTextFileSourceWithJackson() throws Exception {
+        final File testDir = TMP_FOLDER.newFolder();
+        writeFile(testDir, "data.csv", CSV_LINES);
+
+        final CsvReaderFormat<CityPojo> csvFormat = CsvReaderFormat.forPojo(CityPojo.class);
+        final List<CityPojo> result = initializeSourceAndReadData(testDir, csvFormat);
+
+        assertEquals(Arrays.asList(pojos), result);
+    }
+
+    @Test
+    public void testCustomBulkWriter() throws Exception {
+        final File outDir = TMP_FOLDER.newFolder();
+
+        StreamingFileSink<CityPojo> sink =
+                StreamingFileSink.forBulkFormat(
+                                new Path(outDir.toURI()), factoryForPojo(CityPojo.class))
+                        .withBucketAssigner(new BasePathBucketAssigner<>())
+                        .withBucketCheckInterval(10L)
+                        .withRollingPolicy(build())
+                        .build();
+
+        OneInputStreamOperatorTestHarness<CityPojo, Object> testHarness =
+                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 1, 1, 0);
+
+        testHarness.setup();
+        testHarness.open();
+
+        testHarness.processElement(new StreamRecord<>(pojos[0], 1L));
+        testHarness.processElement(new StreamRecord<>(pojos[1], 1L));
+        testHarness.processElement(new StreamRecord<>(pojos[2], 1L));
+        testHarness.snapshot(1L, 1L);
+        testHarness.notifyOfCompletedCheckpoint(2L);
+
+        Map<File, String> contents = getFileContentByPath(outDir);
+        Map.Entry<File, String> onlyEntry = contents.entrySet().iterator().next();
+        String[] result = onlyEntry.getValue().split("\n");
+
+        assertThat(result, is(CSV_LINES));
+    }
+
+    static <T> BulkWriter.Factory<T> factoryForPojo(Class<T> pojoClass) {
+        final Converter<T, T, Void> converter = (value, context) -> value;
+        final CsvMapper csvMapper = new CsvMapper();
+        final CsvSchema schema = csvMapper.schemaFor(pojoClass).withoutQuoteChar();
+        return (out) -> new CsvBulkWriter<>(csvMapper, schema, converter, null, out);
+    }
+
+    static Map<File, String> getFileContentByPath(File directory) throws IOException {

Review comment:
       private?
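
e.g.:

```suggestion
    private static Map<File, String> getFileContentByPath(File directory) throws IOException {
```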

##########
File path: flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.common.Converter;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+import static org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy.build;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** MiniCluster-based integration tests for the CSV data format. */
+public class DataStreamCsvITCase extends TestLogger {
+
+    private static final int PARALLELISM = 4;
+
+    @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+    @ClassRule

Review comment:
       Can you change the newly added tests to use JUnit 5 and AssertJ?
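
For the `@ClassRule`-based mini-cluster, a sketch of the JUnit 5 equivalent (assuming the `MiniClusterExtension` from flink-test-utils is available to this module):

```java
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;

import org.junit.jupiter.api.extension.RegisterExtension;

@RegisterExtension
static final MiniClusterExtension MINI_CLUSTER =
        new MiniClusterExtension(
                new MiniClusterResourceConfiguration.Builder()
                        .setNumberTaskManagers(1)
                        .setNumberSlotsPerTaskManager(PARALLELISM)
                        .build());
```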

##########
File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/TableCsvFormatFactory.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.BulkWriter.Factory;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.impl.StreamFormatAdapter;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
+import org.apache.flink.connector.file.table.factories.BulkWriterFormatFactory;
+import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
+import org.apache.flink.formats.common.Converter;
+import org.apache.flink.formats.csv.RowDataToCsvConverters.RowDataToCsvConverter;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource.Context;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+
+import java.util.Collections;
+import java.util.Set;
+
+import static org.apache.flink.formats.csv.CsvFormatOptions.ALLOW_COMMENTS;
+import static org.apache.flink.formats.csv.CsvFormatOptions.ARRAY_ELEMENT_DELIMITER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.DISABLE_QUOTE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.ESCAPE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.FIELD_DELIMITER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.NULL_LITERAL;
+import static org.apache.flink.formats.csv.CsvFormatOptions.QUOTE_CHARACTER;
+
+//TODO: clarify scope for naming Table/File:
+//https://github.com/apache/flink/pull/17598#discussion_r768753623

Review comment:
       TODO?

##########
File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.formats.common.Converter;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+
+// TODO: ? PublicEvolving?
+/** A simple {@link BulkWriter} implementation based on Jackson CSV transformations. */
+class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
+
+    private final FSDataOutputStream stream;
+    private final Converter<T, R, C> converter;
+    private final C converterContext;
+    private final ObjectWriter csvWriter;
+
+    CsvBulkWriter(
+            CsvMapper mapper,
+            CsvSchema schema,
+            Converter<T, R, C> converter,
+            C converterContext,
+            FSDataOutputStream stream) {
+        this.converter = converter;
+        this.converterContext = converterContext;
+        this.stream = stream;

Review comment:
       Where is the stream closed?
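
For reference, under the `BulkWriter` contract the owner of the stream (the enclosing file sink) closes it after `finish()` returns, so the writer itself should only flush; a sketch of what that could look like here:

```java
@Override
public void flush() throws IOException {
    stream.flush();
}

@Override
public void finish() throws IOException {
    // Flush buffered data but do not close the stream: the surrounding
    // sink owns the stream's lifecycle and closes it after finish().
    stream.flush();
}
```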




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

