chenshzh commented on code in PR #5991:
URL: https://github.com/apache/hudi/pull/5991#discussion_r1001686558


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java:
##########
@@ -74,14 +74,14 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, Compac
 
   public CompactFunction(Configuration conf) {
     this.conf = conf;
-    this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
+    this.asyncCompactionOperation = OptionsResolver.needsAsyncCompactionOperation(conf);
   }

Review Comment:
   Normally we use `compaction.async.enabled` to turn compaction on. Once it is on, though, we cannot make the compaction run synchronously, because the same flag is already `true` and also selects the asynchronous branch:
   ```java
       if (asyncCompaction) {
         // executes the compaction task asynchronously to not block the checkpoint barrier propagate.
         executor.execute(
             () -> doCompaction(instantTime, compactionOperation, collector, reloadWriteConfig()),
             (errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),
             "Execute compaction for instant %s from task %d", instantTime, taskID);
       } else {
         // executes the compaction task synchronously for batch mode.
         LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);
         doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig());
       }
   ```
   
   
   > support sync compaction for bounded source
   
   We will use sync compaction mode for unbounded sources in some scenarios. And the current sync compaction for bounded sources seems odd: it relies on `compaction.async.enabled` being `true` to turn compaction on, and then switches it to `false` to get sync mode.
   
   ```java  
        // compaction
        if (OptionsResolver.needsAsyncCompaction(conf)) {   // here FlinkOptions.COMPACTION_ASYNC_ENABLED decides that we need compaction
          // use synchronous compaction for bounded source.
          if (context.isBounded()) {
            // we only get here because the flag is true, so setting it to false is odd;
            // what we actually want is for the operation to be executed synchronously.
            conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
          }
          return Pipelines.compact(conf, pipeline);
        } else {
          return Pipelines.clean(conf, pipeline);
        }
   ```
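
To make the distinction concrete, here is a minimal, self-contained sketch of keeping "is compaction needed" separate from "how is the operation executed". It does not use Hudi or Flink classes, and it is not what `needsAsyncCompactionOperation` in this PR actually does. The class `CompactionModeSketch`, the `Mode` enum, the key `example.compaction.operation.async`, and the defaults are all assumptions made up for illustration; only `compaction.async.enabled` comes from the discussion above.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: hypothetical names, not Hudi's real API. */
final class CompactionModeSketch {

  enum Mode { DISABLED, SYNC, ASYNC }

  // Flag discussed above: it decides whether compaction is part of the pipeline.
  static final String COMPACTION_ENABLED_KEY = "compaction.async.enabled";

  // Hypothetical second key: it only decides how the operation is executed.
  static final String OPERATION_ASYNC_KEY = "example.compaction.operation.async";

  private CompactionModeSketch() {
  }

  /** Resolves the execution mode without mutating the configuration. */
  static Mode resolve(Map<String, String> conf, boolean boundedSource) {
    // Defaults used here are assumptions of this sketch.
    boolean enabled = Boolean.parseBoolean(conf.getOrDefault(COMPACTION_ENABLED_KEY, "false"));
    if (!enabled) {
      return Mode.DISABLED;
    }
    boolean operationAsync = Boolean.parseBoolean(conf.getOrDefault(OPERATION_ASYNC_KEY, "true"));
    // Bounded sources always execute synchronously; unbounded ones may opt in.
    return (boundedSource || !operationAsync) ? Mode.SYNC : Mode.ASYNC;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put(COMPACTION_ENABLED_KEY, "true");
    System.out.println(resolve(conf, false)); // unbounded, default execution
    System.out.println(resolve(conf, true));  // bounded source

    conf.put(OPERATION_ASYNC_KEY, "false");
    System.out.println(resolve(conf, false)); // unbounded, sync requested
  }
}
```

With this shape, the enabling flag is never flipped to `false` just to get synchronous execution, and an unbounded source can request sync mode without affecting whether compaction is scheduled.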


