This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 15c422b483e [FLINK-28208][hive] Set parallelism for map operator in class HiveTableSink's method createBatchSink (#20471)
15c422b483e is described below

commit 15c422b483ee0c5084eb243edad6de3aad65e6a1
Author: Liu Jiangang <[email protected]>
AuthorDate: Mon Aug 15 11:10:24 2022 +0800

    [FLINK-28208][hive] Set parallelism for map operator in class HiveTableSink's method createBatchSink (#20471)
---
 .../main/java/org/apache/flink/connectors/hive/HiveTableSink.java | 1 +
 .../resources/explain/testHiveTableSinkWithParallelismInBatch.out | 8 ++++----
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 08bacf06851..2c7b7857d71 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -343,6 +343,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
         builder.setOutputFileConfig(fileNaming);
         return dataStream
                 .map((MapFunction<RowData, Row>) value -> (Row) converter.toExternal(value))
+                .setParallelism(parallelism)
                 .writeUsingOutputFormat(builder.build())
                 .setParallelism(parallelism);
     }
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/explain/testHiveTableSinkWithParallelismInBatch.out b/flink-connectors/flink-connector-hive/src/test/resources/explain/testHiveTableSinkWithParallelismInBatch.out
index 5858300eea6..81bec916671 100644
--- a/flink-connectors/flink-connector-hive/src/test/resources/explain/testHiveTableSinkWithParallelismInBatch.out
+++ b/flink-connectors/flink-connector-hive/src/test/resources/explain/testHiveTableSinkWithParallelismInBatch.out
@@ -37,10 +37,10 @@ Sink(table=[test-catalog.db1.test_table], fields=[EXPR$0, EXPR$1])
     "type" : "Map",
     "pact" : "Operator",
     "contents" : "Map",
-    "parallelism" : 1,
+    "parallelism" : 8,
     "predecessors" : [ {
       "id" : ,
-      "ship_strategy" : "FORWARD",
+      "ship_strategy" : "REBALANCE",
       "side" : "second"
     } ]
   }, {
@@ -51,8 +51,8 @@ Sink(table=[test-catalog.db1.test_table], fields=[EXPR$0, EXPR$1])
     "parallelism" : 8,
     "predecessors" : [ {
       "id" : ,
-      "ship_strategy" : "REBALANCE",
+      "ship_strategy" : "FORWARD",
       "side" : "second"
     } ]
   } ]
-}
\ No newline at end of file
+}

Reply via email to