wuchong commented on a change in pull request #14727:
URL: https://github.com/apache/flink/pull/14727#discussion_r569925499



##########
File path: docs/dev/table/connectors/filesystem.md
##########
@@ -375,6 +375,35 @@ public class AnalysisCommitPolicy implements 
PartitionCommitPolicy {
 </div>
 </div>
 
+### Sink Parallelism
+
+The parallelism of writing files into external file system can be configured 
by the corresponding table option. By default, the parallelism is configured to 
being the same as the parallelism of its last upstream chained operator.

Review comment:
       Would be better to mention which operators will be influenced by this 
parallelism configuration, e.g. the operator writing files to external system, 
the operator compacting small files. 

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
##########
@@ -250,7 +250,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context 
context) {
                         
conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
 
         boolean autoCompaction = 
conf.getBoolean(FileSystemOptions.AUTO_COMPACTION);
-
+        // todo hive set parallelism, will be implemented in case of 
FLINK-19945 being finished.

Review comment:
       Do we need this TODO? You are referencing the JIRA id of this PR. 

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -379,6 +390,33 @@ private Object createWriter(Context sinkContext) {
         }
     }
 
+    private void checkAllowanceOfSettingParallelism(
+            int inputParallelism, int configuredParallelism) {
+
+        if (inputParallelism != configuredParallelism) {
+            ChangelogMode mode;
+            if (bulkWriterFormat != null) {
+                mode = bulkWriterFormat.getChangelogMode();
+            } else if (serializationFormat != null) {
+                mode = serializationFormat.getChangelogMode();
+            } else {
+                throw new TableException("Can not find format factory.");
+            }

Review comment:
       We should reuse the changelog mode inference logic with 
`getChangelogMode(...)` method. 

##########
File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/FileSystemTableSinkTest.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Test for {@link FileSystemTableSink}. */
+public class FileSystemTableSinkTest {
+
+    private static final TableSchema TEST_SCHEMA =
+            TableSchema.builder()
+                    .field("f0", DataTypes.STRING())
+                    .field("f1", DataTypes.BIGINT())
+                    .field("f2", DataTypes.BIGINT())
+                    .build();
+
+    @Test
+    public void testFileSytemTableSinkWithParaallelismInChangeLogMode() {
+
+        int parallelism = 2;
+
+        DescriptorProperties descriptor = new DescriptorProperties();
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "filesystem");
+        descriptor.putString("path", "/tmp");
+        descriptor.putString("format", "testcsv");
+        descriptor.putString(
+                TestCsvFileSystemFormatFactory.IDENTIFIER
+                        + "."
+                        + 
TestCsvFileSystemFormatFactory.USE_UPSERT_CHANGELOG_MODE.key(),
+                "true");
+        descriptor.putString(FactoryUtil.SINK_PARALLELISM.key(), 
String.valueOf(parallelism));
+
+        final DynamicTableSink tableSink = createSink(descriptor);
+        Assert.assertTrue(tableSink instanceof FileSystemTableSink);
+
+        final DynamicTableSink.SinkRuntimeProvider provider =
+                tableSink.getSinkRuntimeProvider(new MockSinkContext(false));
+        Assert.assertTrue(provider instanceof DataStreamSinkProvider);
+
+        final DataStreamSinkProvider dataStreamSinkProvider = 
(DataStreamSinkProvider) provider;
+        try {
+            dataStreamSinkProvider.consumeDataStream(createInputDataStream());

Review comment:
       We should add `fail(...)` to make sure it will fail if no exception is 
thrown. 
   
   This can be simplified into 
   
   ```java
   import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
   assertThrows(
                   "xxxx",
                   ValidationException.class,
                   () -> 
dataStreamSinkProvider.consumeDataStream(createInputDataStream()));
   ```

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -93,6 +95,7 @@
         implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
 
     // For compaction reading
+

Review comment:
       remove empty line.

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -379,6 +390,33 @@ private Object createWriter(Context sinkContext) {
         }
     }
 
+    private void checkAllowanceOfSettingParallelism(

Review comment:
       `checkConfiguredParallelismAllowed()`

##########
File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/FileSystemTableSinkTest.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Test for {@link FileSystemTableSink}. */
+public class FileSystemTableSinkTest {
+
+    private static final TableSchema TEST_SCHEMA =
+            TableSchema.builder()
+                    .field("f0", DataTypes.STRING())
+                    .field("f1", DataTypes.BIGINT())
+                    .field("f2", DataTypes.BIGINT())
+                    .build();
+
+    @Test
+    public void testFileSytemTableSinkWithParaallelismInChangeLogMode() {
+
+        int parallelism = 2;
+
+        DescriptorProperties descriptor = new DescriptorProperties();
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "filesystem");
+        descriptor.putString("path", "/tmp");
+        descriptor.putString("format", "testcsv");
+        descriptor.putString(
+                TestCsvFileSystemFormatFactory.IDENTIFIER
+                        + "."
+                        + 
TestCsvFileSystemFormatFactory.USE_UPSERT_CHANGELOG_MODE.key(),
+                "true");
+        descriptor.putString(FactoryUtil.SINK_PARALLELISM.key(), 
String.valueOf(parallelism));
+
+        final DynamicTableSink tableSink = createSink(descriptor);
+        Assert.assertTrue(tableSink instanceof FileSystemTableSink);
+
+        final DynamicTableSink.SinkRuntimeProvider provider =
+                tableSink.getSinkRuntimeProvider(new MockSinkContext(false));
+        Assert.assertTrue(provider instanceof DataStreamSinkProvider);
+
+        final DataStreamSinkProvider dataStreamSinkProvider = 
(DataStreamSinkProvider) provider;
+        try {
+            dataStreamSinkProvider.consumeDataStream(createInputDataStream());
+        } catch (Exception e) {
+            Assert.assertTrue(
+                    ExceptionUtils.findThrowableWithMessage(
+                                    e, "is configured, which is different from 
input parallelism")
+                            .isPresent());
+        }
+    }
+
+    @Test
+    public void testFileSystemTableSinkWithParallelismInStreaming() {
+
+        int parallelism = 2;
+
+        DescriptorProperties descriptor = new DescriptorProperties();
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "filesystem");
+        descriptor.putString("path", "/tmp");
+        descriptor.putString("format", "testcsv");
+        descriptor.putString(FactoryUtil.SINK_PARALLELISM.key(), 
String.valueOf(parallelism));
+
+        final DynamicTableSink tableSink = createSink(descriptor);
+        Assert.assertTrue(tableSink instanceof FileSystemTableSink);
+
+        final DynamicTableSink.SinkRuntimeProvider provider =
+                tableSink.getSinkRuntimeProvider(new MockSinkContext(false));
+        Assert.assertTrue(provider instanceof DataStreamSinkProvider);
+
+        final DataStreamSinkProvider dataStreamSinkProvider = 
(DataStreamSinkProvider) provider;
+        final DataStreamSink<?> dataStreamSink =
+                
dataStreamSinkProvider.consumeDataStream(createInputDataStream());
+        final List<Transformation<?>> inputs = 
dataStreamSink.getTransformation().getInputs();
+        Assert.assertTrue(inputs.get(0).getParallelism() == parallelism);
+    }
+
+    @Test
+    public void testFileSystemTableSinkWithParallelismInBatch() {
+
+        int parallelism = 2;
+
+        DescriptorProperties descriptor = new DescriptorProperties();
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "filesystem");
+        descriptor.putString("path", "/tmp");
+        descriptor.putString("format", "testcsv");
+        descriptor.putString(FactoryUtil.SINK_PARALLELISM.key(), 
String.valueOf(parallelism));
+
+        final DynamicTableSink tableSink = createSink(descriptor);
+        Assert.assertTrue(tableSink instanceof FileSystemTableSink);
+
+        final DynamicTableSink.SinkRuntimeProvider provider =
+                tableSink.getSinkRuntimeProvider(new MockSinkContext(true));
+        Assert.assertTrue(provider instanceof DataStreamSinkProvider);
+
+        final DataStreamSinkProvider dataStreamSinkProvider = 
(DataStreamSinkProvider) provider;
+        final DataStreamSink<?> dataStreamSink =
+                
dataStreamSinkProvider.consumeDataStream(createInputDataStream());
+        Assert.assertTrue(dataStreamSink.getTransformation().getParallelism() 
== parallelism);
+    }
+
+    private static DataStream<RowData> createInputDataStream() {
+        final MockTransformation<RowData> mockTransformation =
+                MockTransformation.createMockTransformation();
+        final DummyStreamExecutionEnvironment mockEnv = new 
DummyStreamExecutionEnvironment();
+
+        return new DataStream<>(mockEnv, mockTransformation);

Review comment:
       We should [Avoid 
Mock](https://flink.apache.org/contributing/code-style-and-quality-common.html) 
as much as possible, because it tends to be costly to maintain in the long run. 
   
   Creating an input DataStream is very easy by 
`env.fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo)`.

##########
File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/FileSystemTableSinkTest.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Test for {@link FileSystemTableSink}. */
+public class FileSystemTableSinkTest {
+
+    private static final TableSchema TEST_SCHEMA =
+            TableSchema.builder()
+                    .field("f0", DataTypes.STRING())
+                    .field("f1", DataTypes.BIGINT())
+                    .field("f2", DataTypes.BIGINT())
+                    .build();
+
+    @Test
+    public void testFileSytemTableSinkWithParaallelismInChangeLogMode() {
+
+        int parallelism = 2;
+
+        DescriptorProperties descriptor = new DescriptorProperties();
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "filesystem");
+        descriptor.putString("path", "/tmp");
+        descriptor.putString("format", "testcsv");
+        descriptor.putString(
+                TestCsvFileSystemFormatFactory.IDENTIFIER
+                        + "."
+                        + 
TestCsvFileSystemFormatFactory.USE_UPSERT_CHANGELOG_MODE.key(),
+                "true");
+        descriptor.putString(FactoryUtil.SINK_PARALLELISM.key(), 
String.valueOf(parallelism));
+
+        final DynamicTableSink tableSink = createSink(descriptor);
+        Assert.assertTrue(tableSink instanceof FileSystemTableSink);
+
+        final DynamicTableSink.SinkRuntimeProvider provider =
+                tableSink.getSinkRuntimeProvider(new MockSinkContext(false));
+        Assert.assertTrue(provider instanceof DataStreamSinkProvider);
+
+        final DataStreamSinkProvider dataStreamSinkProvider = 
(DataStreamSinkProvider) provider;
+        try {
+            dataStreamSinkProvider.consumeDataStream(createInputDataStream());
+        } catch (Exception e) {
+            Assert.assertTrue(
+                    ExceptionUtils.findThrowableWithMessage(
+                                    e, "is configured, which is different from 
input parallelism")
+                            .isPresent());
+        }
+    }
+
+    @Test
+    public void testFileSystemTableSinkWithParallelismInStreaming() {
+
+        int parallelism = 2;
+
+        DescriptorProperties descriptor = new DescriptorProperties();
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "filesystem");
+        descriptor.putString("path", "/tmp");
+        descriptor.putString("format", "testcsv");
+        descriptor.putString(FactoryUtil.SINK_PARALLELISM.key(), 
String.valueOf(parallelism));
+
+        final DynamicTableSink tableSink = createSink(descriptor);
+        Assert.assertTrue(tableSink instanceof FileSystemTableSink);
+
+        final DynamicTableSink.SinkRuntimeProvider provider =
+                tableSink.getSinkRuntimeProvider(new MockSinkContext(false));
+        Assert.assertTrue(provider instanceof DataStreamSinkProvider);
+
+        final DataStreamSinkProvider dataStreamSinkProvider = 
(DataStreamSinkProvider) provider;
+        final DataStreamSink<?> dataStreamSink =
+                
dataStreamSinkProvider.consumeDataStream(createInputDataStream());
+        final List<Transformation<?>> inputs = 
dataStreamSink.getTransformation().getInputs();
+        Assert.assertTrue(inputs.get(0).getParallelism() == parallelism);

Review comment:
       Simplify assertion: 
   
   ```
   assertEquals(parallelism, inputs.get(0).getParallelism());
   ```

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -379,6 +390,33 @@ private Object createWriter(Context sinkContext) {
         }
     }
 
+    private void checkAllowanceOfSettingParallelism(
+            int inputParallelism, int configuredParallelism) {
+
+        if (inputParallelism != configuredParallelism) {
+            ChangelogMode mode;
+            if (bulkWriterFormat != null) {
+                mode = bulkWriterFormat.getChangelogMode();
+            } else if (serializationFormat != null) {
+                mode = serializationFormat.getChangelogMode();
+            } else {
+                throw new TableException("Can not find format factory.");
+            }
+
+            if (!mode.containsOnly(RowKind.INSERT)) {
+                throw new ValidationException(
+                        String.format(
+                                "sink parallelism: [%d] is configured, which 
is different from input parallelism: [%d]. "

Review comment:
       We can improve the exception message a bit:
   
   "Currently, filesystem sink doesn't support setting a different paralleslim 
(%d) from input stream parallelism (%d) when the input stream is not INSERT 
only. "

##########
File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/FileSystemTableSinkTest.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Test for {@link FileSystemTableSink}. */
+public class FileSystemTableSinkTest {
+
+    private static final TableSchema TEST_SCHEMA =
+            TableSchema.builder()
+                    .field("f0", DataTypes.STRING())
+                    .field("f1", DataTypes.BIGINT())
+                    .field("f2", DataTypes.BIGINT())
+                    .build();
+
+    @Test
+    public void testFileSytemTableSinkWithParaallelismInChangeLogMode() {
+
+        int parallelism = 2;
+
+        DescriptorProperties descriptor = new DescriptorProperties();
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "filesystem");
+        descriptor.putString("path", "/tmp");
+        descriptor.putString("format", "testcsv");
+        descriptor.putString(
+                TestCsvFileSystemFormatFactory.IDENTIFIER
+                        + "."
+                        + 
TestCsvFileSystemFormatFactory.USE_UPSERT_CHANGELOG_MODE.key(),
+                "true");
+        descriptor.putString(FactoryUtil.SINK_PARALLELISM.key(), 
String.valueOf(parallelism));
+
+        final DynamicTableSink tableSink = createSink(descriptor);
+        Assert.assertTrue(tableSink instanceof FileSystemTableSink);
+
+        final DynamicTableSink.SinkRuntimeProvider provider =
+                tableSink.getSinkRuntimeProvider(new MockSinkContext(false));
+        Assert.assertTrue(provider instanceof DataStreamSinkProvider);
+
+        final DataStreamSinkProvider dataStreamSinkProvider = 
(DataStreamSinkProvider) provider;
+        try {
+            dataStreamSinkProvider.consumeDataStream(createInputDataStream());
+        } catch (Exception e) {
+            Assert.assertTrue(
+                    ExceptionUtils.findThrowableWithMessage(
+                                    e, "is configured, which is different from 
input parallelism")
+                            .isPresent());
+        }
+    }
+
+    @Test
+    public void testFileSystemTableSinkWithParallelismInStreaming() {
+
+        int parallelism = 2;
+
+        DescriptorProperties descriptor = new DescriptorProperties();
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "filesystem");
+        descriptor.putString("path", "/tmp");
+        descriptor.putString("format", "testcsv");
+        descriptor.putString(FactoryUtil.SINK_PARALLELISM.key(), 
String.valueOf(parallelism));
+
+        final DynamicTableSink tableSink = createSink(descriptor);
+        Assert.assertTrue(tableSink instanceof FileSystemTableSink);
+
+        final DynamicTableSink.SinkRuntimeProvider provider =
+                tableSink.getSinkRuntimeProvider(new MockSinkContext(false));
+        Assert.assertTrue(provider instanceof DataStreamSinkProvider);
+
+        final DataStreamSinkProvider dataStreamSinkProvider = 
(DataStreamSinkProvider) provider;
+        final DataStreamSink<?> dataStreamSink =
+                
dataStreamSinkProvider.consumeDataStream(createInputDataStream());
+        final List<Transformation<?>> inputs = 
dataStreamSink.getTransformation().getInputs();
+        Assert.assertTrue(inputs.get(0).getParallelism() == parallelism);
+    }
+
+    @Test
+    public void testFileSystemTableSinkWithParallelismInBatch() {
+
+        int parallelism = 2;
+
+        DescriptorProperties descriptor = new DescriptorProperties();
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "filesystem");
+        descriptor.putString("path", "/tmp");
+        descriptor.putString("format", "testcsv");
+        descriptor.putString(FactoryUtil.SINK_PARALLELISM.key(), 
String.valueOf(parallelism));
+
+        final DynamicTableSink tableSink = createSink(descriptor);
+        Assert.assertTrue(tableSink instanceof FileSystemTableSink);
+
+        final DynamicTableSink.SinkRuntimeProvider provider =
+                tableSink.getSinkRuntimeProvider(new MockSinkContext(true));
+        Assert.assertTrue(provider instanceof DataStreamSinkProvider);
+
+        final DataStreamSinkProvider dataStreamSinkProvider = 
(DataStreamSinkProvider) provider;
+        final DataStreamSink<?> dataStreamSink =
+                
dataStreamSinkProvider.consumeDataStream(createInputDataStream());
+        Assert.assertTrue(dataStreamSink.getTransformation().getParallelism() 
== parallelism);

Review comment:
       ditto.

##########
File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
##########
@@ -114,7 +127,15 @@ private static void writeCsvToStream(DataType[] types, 
RowData rowData, OutputSt
 
             @Override
             public ChangelogMode getChangelogMode() {
-                return ChangelogMode.insertOnly();
+

Review comment:
       Remove empty line. 

##########
File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/FileSystemTableSinkTest.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Test for {@link FileSystemTableSink}. */
+public class FileSystemTableSinkTest {
+
+    private static final TableSchema TEST_SCHEMA =
+            TableSchema.builder()
+                    .field("f0", DataTypes.STRING())
+                    .field("f1", DataTypes.BIGINT())
+                    .field("f2", DataTypes.BIGINT())
+                    .build();
+
+    @Test
+    public void testFileSytemTableSinkWithParaallelismInChangeLogMode() {
+
+        int parallelism = 2;
+
+        DescriptorProperties descriptor = new DescriptorProperties();
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "filesystem");
+        descriptor.putString("path", "/tmp");
+        descriptor.putString("format", "testcsv");
+        descriptor.putString(
+                TestCsvFileSystemFormatFactory.IDENTIFIER
+                        + "."
+                        + 
TestCsvFileSystemFormatFactory.USE_UPSERT_CHANGELOG_MODE.key(),
+                "true");
+        descriptor.putString(FactoryUtil.SINK_PARALLELISM.key(), 
String.valueOf(parallelism));
+
+        final DynamicTableSink tableSink = createSink(descriptor);
+        Assert.assertTrue(tableSink instanceof FileSystemTableSink);
+
+        final DynamicTableSink.SinkRuntimeProvider provider =
+                tableSink.getSinkRuntimeProvider(new MockSinkContext(false));
+        Assert.assertTrue(provider instanceof DataStreamSinkProvider);
+
+        final DataStreamSinkProvider dataStreamSinkProvider = 
(DataStreamSinkProvider) provider;
+        try {
+            dataStreamSinkProvider.consumeDataStream(createInputDataStream());
+        } catch (Exception e) {
+            Assert.assertTrue(
+                    ExceptionUtils.findThrowableWithMessage(
+                                    e, "is configured, which is different from 
input parallelism")
+                            .isPresent());
+        }
+    }
+
+    @Test
+    public void testFileSystemTableSinkWithParallelismInStreaming() {
+
+        int parallelism = 2;
+
+        DescriptorProperties descriptor = new DescriptorProperties();
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "filesystem");
+        descriptor.putString("path", "/tmp");
+        descriptor.putString("format", "testcsv");
+        descriptor.putString(FactoryUtil.SINK_PARALLELISM.key(), 
String.valueOf(parallelism));
+
+        final DynamicTableSink tableSink = createSink(descriptor);
+        Assert.assertTrue(tableSink instanceof FileSystemTableSink);
+
+        final DynamicTableSink.SinkRuntimeProvider provider =
+                tableSink.getSinkRuntimeProvider(new MockSinkContext(false));

Review comment:
       Use `new SinkRuntimeProviderContext(false)`. 

##########
File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
##########
@@ -114,7 +127,15 @@ private static void writeCsvToStream(DataType[] types, 
RowData rowData, OutputSt
 
             @Override
             public ChangelogMode getChangelogMode() {
-                return ChangelogMode.insertOnly();
+
+                return formatOptions.get(USE_UPSERT_CHANGELOG_MODE)
+                        ? ChangelogMode.newBuilder()
+                                .addContainedKind(RowKind.INSERT)
+                                .addContainedKind(RowKind.DELETE)
+                                .addContainedKind(RowKind.UPDATE_AFTER)
+                                .addContainedKind(RowKind.UPDATE_BEFORE)

Review comment:
       upsert mode doesn't contain `UPDATE_BEFORE`. 

##########
File path: docs/dev/table/connectors/filesystem.zh.md
##########
@@ -375,6 +375,35 @@ public class AnalysisCommitPolicy implements 
PartitionCommitPolicy {
 </div>
 </div>
 
+### Sink Parallelism
+
+The parallelism of writing files into external file system can be configured 
by the corresponding table option. By default, the parallelism is configured to 
being the same as the parallelism of its last upstream chained operator.

Review comment:
       Ditto. 

##########
File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/FileSystemTableSinkTest.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Test for {@link FileSystemTableSink}. */
+public class FileSystemTableSinkTest {

Review comment:
       I think an easier way to verify the paralleslim of sink operators is 
using `tEnv.explain`. It can print all the operators parallelism including 
compactor operator. You can see the example in 
`org.apache.flink.table.api.TableEnvironmentTest#testStreamTableEnvironmentExecutionExplain`.
   
   Please also add tests for Hive sink.

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -135,14 +138,21 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context 
sinkContext) {
     }
 
     private DataStreamSink<?> consume(DataStream<RowData> dataStream, Context 
sinkContext) {
+

Review comment:
       remove empty line. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to