This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5254126 [FLINK-13544][connectors] Set parallelism of table sink operator to input transformation parallelism
5254126 is described below
commit 5254126244b07719d695c26f10bd25c1dddccb96
Author: Jark Wu <[email protected]>
AuthorDate: Fri Aug 2 14:21:34 2019 +0800
[FLINK-13544][connectors] Set parallelism of table sink operator to input transformation parallelism
This closes #9332
---
.../streaming/connectors/cassandra/CassandraAppendTableSink.java | 1 +
.../connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java | 1 +
.../apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java | 5 ++++-
.../java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java | 1 +
.../java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java | 1 +
.../src/main/java/org/apache/flink/table/sinks/CsvTableSink.java | 4 ++++
.../java/org/apache/flink/table/sinks/OutputFormatTableSink.java | 4 +++-
.../apache/flink/walkthrough/common/table/SpendReportTableSink.java | 3 ++-
8 files changed, 17 insertions(+), 3 deletions(-)
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
index edf8c88..72afc3b 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
@@ -92,6 +92,7 @@ public class CassandraAppendTableSink implements AppendStreamTableSink<Row> {
return dataStream
.addSink(sink)
+ .setParallelism(dataStream.getParallelism())
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));
}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java
index 427bb6e..c2c5181 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java
@@ -186,6 +186,7 @@ public abstract class ElasticsearchUpsertTableSinkBase implements UpsertStreamTa
sinkOptions,
upsertFunction);
return dataStream.addSink(sinkFunction)
+ .setParallelism(dataStream.getParallelism())
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
index ecadace..b7d2ce6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
@@ -96,7 +96,10 @@ public abstract class KafkaTableSinkBase implements AppendStreamTableSink<Row> {
properties,
serializationSchema,
partitioner);
- return dataStream.addSink(kafkaProducer).name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
+ return dataStream
+ .addSink(kafkaProducer)
+ .setParallelism(dataStream.getParallelism())
+ .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
}
@Override
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
index c53bbc1..7c7e7ba 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
@@ -64,6 +64,7 @@ public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTab
public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
return dataStream
.addSink(new JDBCSinkFunction(outputFormat))
+ .setParallelism(dataStream.getParallelism())
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));
}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java
index f6d0b9b..afb118f 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java
@@ -91,6 +91,7 @@ public class JDBCUpsertTableSink implements UpsertStreamTableSink<Row> {
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
return dataStream
.addSink(new JDBCUpsertSinkFunction(newFormat()))
+ .setParallelism(dataStream.getParallelism())
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), schema.getFieldNames()));
}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
index f950f4d..56ae63b 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
@@ -117,6 +117,10 @@ public class CsvTableSink implements BatchTableSink<Row>, AppendStreamTableSink<
if (numFiles > 0) {
csvRows.setParallelism(numFiles);
sink.setParallelism(numFiles);
+ } else {
+ // if file number is not set, use input parallelism to make it chained.
+ csvRows.setParallelism(dataStream.getParallelism());
+ sink.setParallelism(dataStream.getParallelism());
}
sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, fieldNames));
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/OutputFormatTableSink.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/OutputFormatTableSink.java
index 5ab2606..692082f 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/OutputFormatTableSink.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/OutputFormatTableSink.java
@@ -44,6 +44,8 @@ public abstract class OutputFormatTableSink<T> implements StreamTableSink<T> {
@Override
public final DataStreamSink<T> consumeDataStream(DataStream<T> dataStream) {
- return dataStream.writeUsingOutputFormat(getOutputFormat());
+ return dataStream
+ .writeUsingOutputFormat(getOutputFormat())
+ .setParallelism(dataStream.getParallelism());
}
}
diff --git a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/SpendReportTableSink.java b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/SpendReportTableSink.java
index 6c85717..ae167fd 100644
--- a/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/SpendReportTableSink.java
+++ b/flink-walkthroughs/flink-walkthrough-common/src/main/java/org/apache/flink/walkthrough/common/table/SpendReportTableSink.java
@@ -60,7 +60,8 @@ public class SpendReportTableSink implements AppendStreamTableSink<Row>, BatchTa
public void emitDataStream(DataStream<Row> dataStream) {
dataStream
.map(SpendReportTableSink::format)
- .writeUsingOutputFormat(new LoggerOutputFormat());
+ .writeUsingOutputFormat(new LoggerOutputFormat())
+ .setParallelism(dataStream.getParallelism());
}
@Override