This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5254126  [FLINK-13544][connectors] Set parallelism of table sink operator to input transformation parallelism
5254126 is described below

commit 5254126244b07719d695c26f10bd25c1dddccb96
Author: Jark Wu <[email protected]>
AuthorDate: Fri Aug 2 14:21:34 2019 +0800

    [FLINK-13544][connectors] Set parallelism of table sink operator to input transformation parallelism
    
    This closes #9332
---
 .../streaming/connectors/cassandra/CassandraAppendTableSink.java     | 1 +
 .../connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java   | 1 +
 .../apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java  | 5 ++++-
 .../java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java  | 1 +
 .../java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java  | 1 +
 .../src/main/java/org/apache/flink/table/sinks/CsvTableSink.java     | 4 ++++
 .../java/org/apache/flink/table/sinks/OutputFormatTableSink.java     | 4 +++-
 .../apache/flink/walkthrough/common/table/SpendReportTableSink.java  | 3 ++-
 8 files changed, 17 insertions(+), 3 deletions(-)

diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
index edf8c88..72afc3b 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
@@ -92,6 +92,7 @@ public class CassandraAppendTableSink implements 
AppendStreamTableSink<Row> {
 
                return dataStream
                                .addSink(sink)
+                               .setParallelism(dataStream.getParallelism())
                                
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));
 
        }
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java
index 427bb6e..c2c5181 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java
@@ -186,6 +186,7 @@ public abstract class ElasticsearchUpsertTableSinkBase 
implements UpsertStreamTa
                        sinkOptions,
                        upsertFunction);
                return dataStream.addSink(sinkFunction)
+                       .setParallelism(dataStream.getParallelism())
                        
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), 
getFieldNames()));
        }
 
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
index ecadace..b7d2ce6 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
@@ -96,7 +96,10 @@ public abstract class KafkaTableSinkBase implements 
AppendStreamTableSink<Row> {
                        properties,
                        serializationSchema,
                        partitioner);
-               return 
dataStream.addSink(kafkaProducer).name(TableConnectorUtils.generateRuntimeName(this.getClass(),
 getFieldNames()));
+               return dataStream
+                       .addSink(kafkaProducer)
+                       .setParallelism(dataStream.getParallelism())
+                       
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), 
getFieldNames()));
        }
 
        @Override
diff --git 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
index c53bbc1..7c7e7ba 100644
--- 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
+++ 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
@@ -64,6 +64,7 @@ public class JDBCAppendTableSink implements 
AppendStreamTableSink<Row>, BatchTab
        public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
                return dataStream
                        .addSink(new JDBCSinkFunction(outputFormat))
+                       .setParallelism(dataStream.getParallelism())
                        
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));
        }
 
diff --git 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java
 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java
index f6d0b9b..afb118f 100644
--- 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java
+++ 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java
@@ -91,6 +91,7 @@ public class JDBCUpsertTableSink implements 
UpsertStreamTableSink<Row> {
        public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, 
Row>> dataStream) {
                return dataStream
                                .addSink(new 
JDBCUpsertSinkFunction(newFormat()))
+                               .setParallelism(dataStream.getParallelism())
                                
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), 
schema.getFieldNames()));
        }
 
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
index f950f4d..56ae63b 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
@@ -117,6 +117,10 @@ public class CsvTableSink implements BatchTableSink<Row>, 
AppendStreamTableSink<
                if (numFiles > 0) {
                        csvRows.setParallelism(numFiles);
                        sink.setParallelism(numFiles);
+               } else {
+                       // if file number is not set, use input parallelism to make it chained.
+                       csvRows.setParallelism(dataStream.getParallelism());
+                       sink.setParallelism(dataStream.getParallelism());
                }
 
                
sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, 
fieldNames));
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/OutputFormatTableSink.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/OutputFormatTableSink.java
index 5ab2606..692082f 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/OutputFormatTableSink.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/OutputFormatTableSink.java
@@ -44,6 +44,8 @@ public abstract class OutputFormatTableSink<T> implements 
StreamTableSink<T> {
 
        @Override
        public final DataStreamSink<T> consumeDataStream(DataStream<T> 
dataStream) {
-               return dataStream.writeUsingOutputFormat(getOutputFormat());
+               return dataStream
+                       .writeUsingOutputFormat(getOutputFormat())
+                       .setParallelism(dataStream.getParallelism());
        }
 }
diff --git 
a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/SpendReportTableSink.java
 
b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/SpendReportTableSink.java
index 6c85717..ae167fd 100644
--- 
a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/SpendReportTableSink.java
+++ 
b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/SpendReportTableSink.java
@@ -60,7 +60,8 @@ public class SpendReportTableSink implements 
AppendStreamTableSink<Row>, BatchTa
        public void emitDataStream(DataStream<Row> dataStream) {
                dataStream
                        .map(SpendReportTableSink::format)
-                       .writeUsingOutputFormat(new LoggerOutputFormat());
+                       .writeUsingOutputFormat(new LoggerOutputFormat())
+                       .setParallelism(dataStream.getParallelism());
        }
 
        @Override

Reply via email to