This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a1017c66aa Clean the marker files for flink compaction (#5611)
a1017c66aa is described below
commit a1017c66aaa377dad7e5e62f773bb714d53fc353
Author: luokey <[email protected]>
AuthorDate: Wed May 18 11:21:14 2022 +0800
Clean the marker files for flink compaction (#5611)
Co-authored-by: [email protected] <loukey_7821>
---
.../java/org/apache/hudi/sink/compact/CompactionPlanOperator.java | 4 ++++
1 file changed, 4 insertions(+)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index 48d4f48989..338352d4b0 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkTables;
@@ -134,6 +135,9 @@ public class CompactionPlanOperator extends
AbstractStreamOperator<CompactionPla
List<CompactionOperation> operations =
compactionPlan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
LOG.info("Execute compaction plan for instant {} as {} file groups",
compactionInstantTime, operations.size());
+ WriteMarkersFactory
+ .get(table.getConfig().getMarkersType(), table,
compactionInstantTime)
+ .deleteMarkerDir(table.getContext(),
table.getConfig().getMarkersDeleteParallelism());
for (CompactionOperation operation : operations) {
output.collect(new StreamRecord<>(new
CompactionPlanEvent(compactionInstantTime, operation)));
}