chenshzh commented on code in PR #5991:
URL: https://github.com/apache/hudi/pull/5991#discussion_r1001686558
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java:
##########
@@ -74,14 +74,14 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, Compac
   public CompactFunction(Configuration conf) {
     this.conf = conf;
-    this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
+    this.asyncCompactionOperation = OptionsResolver.needsAsyncCompactionOperation(conf);
   }
Review Comment:
Normally we use `compaction.async.enabled` to turn on compaction, but then we cannot make the compaction run synchronously, because that option is already `true`.
```java
if (asyncCompaction) {
  // executes the compaction task asynchronously to not block the checkpoint barrier propagate.
  executor.execute(
      () -> doCompaction(instantTime, compactionOperation, collector, reloadWriteConfig()),
      (errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),
      "Execute compaction for instant %s from task %d", instantTime, taskID);
} else {
  // executes the compaction task synchronously for batch mode.
  LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);
  doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig());
}
```
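The branch above comes down to one decision: hand the compaction to a separate executor so the task thread stays free to forward checkpoint barriers, or run it inline on the task thread. Below is a minimal, self-contained sketch of that dispatch, keyed on a single boolean like the PR's renamed `asyncCompactionOperation` field. It is an illustration under assumptions, not Hudi code: a plain `ExecutorService` with `CompletableFuture` stands in for the executor used by the real `CompactFunction`, the error callback that emits a `CompactionCommitEvent` is omitted, and `doCompaction` here is a hypothetical placeholder that only reports which thread ran it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompactDispatchSketch {
    private final boolean asyncCompactionOperation;
    // Daemon thread so an unclosed sketch never keeps the JVM alive.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "compact-executor");
        t.setDaemon(true);
        return t;
    });

    CompactDispatchSketch(boolean asyncCompactionOperation) {
        this.asyncCompactionOperation = asyncCompactionOperation;
    }

    // Hypothetical placeholder for the real doCompaction(...): it only reports
    // the name of the thread that executed the work for this instant.
    private String doCompaction(String instantTime) {
        return Thread.currentThread().getName();
    }

    CompletableFuture<String> process(String instantTime) {
        if (asyncCompactionOperation) {
            // Async: hand the work off so the calling task thread is not blocked.
            return CompletableFuture.supplyAsync(() -> doCompaction(instantTime), executor);
        }
        // Sync: run inline; the caller waits until the compaction finishes.
        return CompletableFuture.completedFuture(doCompaction(instantTime));
    }

    void close() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        CompactDispatchSketch sync = new CompactDispatchSketch(false);
        CompactDispatchSketch async = new CompactDispatchSketch(true);
        System.out.println(sync.process("001").join());  // prints "main"
        System.out.println(async.process("001").join()); // prints "compact-executor"
        sync.close();
        async.close();
    }
}
```

Keeping the decision in one flag that is read once in the constructor means the execution mode can be chosen independently of whether compaction is enabled at all.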
> support sync compaction for bounded source
We will also use sync compaction mode for unbounded sources in some scenarios. Besides, the current sync compaction for bounded sources is awkward: it relies on `compaction.async.enabled=true` to turn compaction on, and then flips the option to `false` to get sync mode.
```java
// compaction
if (OptionsResolver.needsAsyncCompaction(conf)) { // here, FlinkOptions.COMPACTION_ASYNC_ENABLED decides that we need compaction
  // use synchronous compaction for bounded source.
  if (context.isBounded()) {
    // We only get here because the option is true, so turning it to false is odd;
    // all we actually want is for the operation to be executed synchronously.
    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
  }
  return Pipelines.compact(conf, pipeline);
} else {
  return Pipelines.clean(conf, pipeline);
}
```
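One way to remove the flip-to-`false` step is to resolve "is compaction on" and "does the operation run synchronously" as two separate questions, without mutating the configuration. The sketch below assumes that split. The key `compaction.operation.async.enabled` is hypothetical, chosen only for illustration (the PR's actual option name is not shown here); a plain `Map` stands in for Flink's `Configuration`; and the real `OptionsResolver` checks may consider settings, such as the table type, that this sketch ignores.

```java
import java.util.Map;

class CompactionModeSketch {
    // Existing Hudi option; in this sketch it is treated only as the on/off switch for compaction.
    static final String COMPACTION_ASYNC_ENABLED = "compaction.async.enabled";
    // HYPOTHETICAL option, for illustration only: whether the compaction
    // operation itself runs asynchronously inside the compact task.
    static final String COMPACTION_OPERATION_ASYNC = "compaction.operation.async.enabled";

    enum Mode { NONE, SYNC, ASYNC }

    // Reads the options without mutating them, unlike flipping the flag to false.
    static Mode resolve(Map<String, String> conf, boolean boundedSource) {
        // A missing value is treated as "true" (an assumption of this sketch).
        boolean compactionOn = Boolean.parseBoolean(conf.getOrDefault(COMPACTION_ASYNC_ENABLED, "true"));
        if (!compactionOn) {
            return Mode.NONE; // corresponds to the Pipelines.clean(...) branch
        }
        String operationAsync = conf.get(COMPACTION_OPERATION_ASYNC);
        // An explicit setting wins; otherwise keep today's behavior:
        // sync for bounded sources, async for unbounded ones.
        boolean async = operationAsync != null ? Boolean.parseBoolean(operationAsync) : !boundedSource;
        return async ? Mode.ASYNC : Mode.SYNC;
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of(COMPACTION_OPERATION_ASYNC, "false");
        System.out.println(resolve(conf, false)); // prints "SYNC": sync compaction for an unbounded source
    }
}
```

Because `Map.of` returns an immutable map, any attempt by `resolve` to rewrite an option would throw, which makes the "no mutation" property easy to check.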
--
This is an automated message from the Apache Git Service.