This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2437a817d [flink] optimize code style and remove useless code in SortCompactAction (#2042)
2437a817d is described below

commit 2437a817d202de523bcafa70718c30a8c7b7c733
Author: YeJunHao <[email protected]>
AuthorDate: Wed Sep 20 21:02:24 2023 +0800

    [flink] optimize code style and remove useless code in SortCompactAction (#2042)
---
 .../org/apache/paimon/flink/action/SortCompactAction.java | 15 ++++++---------
 .../org/apache/paimon/flink/sink/FlinkSinkBuilder.java    | 11 ++++++++++-
 2 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index 522aace00..f83d86d36 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -116,16 +116,13 @@ public class SortCompactAction extends CompactAction {
         DataStream<RowData> source = sourceBuilder.withEnv(env).withContinuousMode(false).build();
         TableSorter sorter =
                 TableSorter.getSorter(env, source, fileStoreTable, sortStrategy, orderColumns);
-        DataStream<RowData> sorted = sorter.sort();
 
-        FlinkSinkBuilder flinkSinkBuilder = new FlinkSinkBuilder(fileStoreTable);
-        flinkSinkBuilder.withInput(sorted).withOverwritePartition(new HashMap<>());
-        String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
-        if (sinkParallelism != null) {
-            flinkSinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
-        }
-
-        flinkSinkBuilder.build();
+        new FlinkSinkBuilder(fileStoreTable)
+                .withInput(sorter.sort())
+                // This should use empty map to tag it on overwrite action, otherwise there is no
+                // overwrite action.
+                .withOverwritePartition(new HashMap<>())
+                .build();
     }
 
     public SortCompactAction withOrderStrategy(String sortStrategy) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index cd9edb252..2e3a783f0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -54,7 +54,16 @@ public class FlinkSinkBuilder {
         return this;
     }
 
-    public FlinkSinkBuilder withOverwritePartition(Map<String, String> overwritePartition) {
+    /**
+     * Whether we need to overwrite partitions.
+     *
+     * @param overwritePartition If we pass null, it means not overwrite. If we pass an empty map,
+     *     it means to overwrite every partition it received. If we pass a non-empty map, it means
+     *     we only overwrite the partitions match the map.
+     * @return returns this.
+     */
+    public FlinkSinkBuilder withOverwritePartition(
+            @Nullable Map<String, String> overwritePartition) {
         this.overwritePartition = overwritePartition;
         return this;
     }

Reply via email to