This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a1017c66aa Clean the marker files for flink compaction (#5611)
a1017c66aa is described below

commit a1017c66aaa377dad7e5e62f773bb714d53fc353
Author: luokey <[email protected]>
AuthorDate: Wed May 18 11:21:14 2022 +0800

    Clean the marker files for flink compaction (#5611)
    
    Co-authored-by: [email protected] <loukey_7821>
---
 .../java/org/apache/hudi/sink/compact/CompactionPlanOperator.java     | 4 ++++
 1 file changed, 4 insertions(+)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index 48d4f48989..338352d4b0 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.FlinkTables;
 
@@ -134,6 +135,9 @@ public class CompactionPlanOperator extends 
AbstractStreamOperator<CompactionPla
       List<CompactionOperation> operations = 
compactionPlan.getOperations().stream()
           
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
       LOG.info("Execute compaction plan for instant {} as {} file groups", 
compactionInstantTime, operations.size());
+      WriteMarkersFactory
+          .get(table.getConfig().getMarkersType(), table, 
compactionInstantTime)
+          .deleteMarkerDir(table.getContext(), 
table.getConfig().getMarkersDeleteParallelism());
       for (CompactionOperation operation : operations) {
         output.collect(new StreamRecord<>(new 
CompactionPlanEvent(compactionInstantTime, operation)));
       }

Reply via email to