This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f39125441e0 [HUDI-6396] Flink supports schedule the clustering in 
batch execution mode and code refactor (#9004)
f39125441e0 is described below

commit f39125441e011e2043924b8e47aa55e36c918dbc
Author: ksmou <[email protected]>
AuthorDate: Mon Jun 19 09:51:44 2023 +0800

    [HUDI-6396] Flink supports schedule the clustering in batch execution mode 
and code refactor (#9004)
    
    * Flink supports schedule the clustering in batch execution mode and code 
refactor
    * change method name
---
 .../hudi/sink/StreamWriteOperatorCoordinator.java  | 30 +++++++++++-----------
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index e6076171863..df473584841 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -256,16 +256,8 @@ public class StreamWriteOperatorCoordinator
           // the stream write task snapshot and flush the data buffer 
synchronously in sequence,
           // so a successful checkpoint subsumes the old one(follows the 
checkpoint subsuming contract)
           final boolean committed = commitInstant(this.instant, checkpointId);
-
-          if (tableState.scheduleCompaction) {
-            // if async compaction is on, schedule the compaction
-            CompactionUtil.scheduleCompaction(metaClient, writeClient, 
tableState.isDeltaTimeCompaction, committed);
-          }
-
-          if (tableState.scheduleClustering) {
-            // if async clustering is on, schedule the clustering
-            ClusteringUtil.scheduleClustering(conf, writeClient, committed);
-          }
+          // schedules the compaction or clustering if it is enabled in stream 
execution mode
+          scheduleTableServices(committed);
 
           if (committed) {
             // start new instant.
@@ -452,15 +444,23 @@ public class StreamWriteOperatorCoordinator
         
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
         // sync Hive synchronously if it is enabled in batch mode.
         syncHive();
-        // schedules the compaction plan in batch execution mode
-        if (tableState.scheduleCompaction) {
-          // if async compaction is on, schedule the compaction
-          CompactionUtil.scheduleCompaction(metaClient, writeClient, 
tableState.isDeltaTimeCompaction, true);
-        }
+        // schedules the compaction or clustering if it is enabled in batch 
execution mode
+        scheduleTableServices(true);
       }
     }
   }
 
+  private void scheduleTableServices(Boolean committed) {
+    // if compaction is on, schedule the compaction
+    if (tableState.scheduleCompaction) {
+      CompactionUtil.scheduleCompaction(metaClient, writeClient, 
tableState.isDeltaTimeCompaction, committed);
+    }
+    // if clustering is on, schedule the clustering
+    if (tableState.scheduleClustering) {
+      ClusteringUtil.scheduleClustering(conf, writeClient, committed);
+    }
+  }
+
   private void handleWriteMetaEvent(WriteMetadataEvent event) {
     // the write task does not block after checkpointing(and before it 
receives a checkpoint success event),
     // if it checkpoints succeed then flushes the data buffer again before 
this coordinator receives a checkpoint

Reply via email to