This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 15c422b483e [FLINK-28208][hive] Set parallelism for map operator in
class HiveTableSink's method createBatchSink (#20471)
15c422b483e is described below
commit 15c422b483ee0c5084eb243edad6de3aad65e6a1
Author: Liu Jiangang <[email protected]>
AuthorDate: Mon Aug 15 11:10:24 2022 +0800
[FLINK-28208][hive] Set parallelism for map operator in class
HiveTableSink's method createBatchSink (#20471)
---
.../main/java/org/apache/flink/connectors/hive/HiveTableSink.java | 1 +
.../resources/explain/testHiveTableSinkWithParallelismInBatch.out | 8 ++++----
2 files changed, 5 insertions(+), 4 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 08bacf06851..2c7b7857d71 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -343,6 +343,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
builder.setOutputFileConfig(fileNaming);
return dataStream
.map((MapFunction<RowData, Row>) value -> (Row) converter.toExternal(value))
+ .setParallelism(parallelism)
.writeUsingOutputFormat(builder.build())
.setParallelism(parallelism);
}
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/explain/testHiveTableSinkWithParallelismInBatch.out b/flink-connectors/flink-connector-hive/src/test/resources/explain/testHiveTableSinkWithParallelismInBatch.out
index 5858300eea6..81bec916671 100644
--- a/flink-connectors/flink-connector-hive/src/test/resources/explain/testHiveTableSinkWithParallelismInBatch.out
+++ b/flink-connectors/flink-connector-hive/src/test/resources/explain/testHiveTableSinkWithParallelismInBatch.out
@@ -37,10 +37,10 @@ Sink(table=[test-catalog.db1.test_table], fields=[EXPR$0, EXPR$1])
"type" : "Map",
"pact" : "Operator",
"contents" : "Map",
- "parallelism" : 1,
+ "parallelism" : 8,
"predecessors" : [ {
"id" : ,
- "ship_strategy" : "FORWARD",
+ "ship_strategy" : "REBALANCE",
"side" : "second"
} ]
}, {
@@ -51,8 +51,8 @@ Sink(table=[test-catalog.db1.test_table], fields=[EXPR$0, EXPR$1])
"parallelism" : 8,
"predecessors" : [ {
"id" : ,
- "ship_strategy" : "REBALANCE",
+ "ship_strategy" : "FORWARD",
"side" : "second"
} ]
} ]
-}
\ No newline at end of file
+}