This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0178b12b5b5 [HUDI-6304] Handle cases when inflight compaction files
are removed and the job fails, leaving pending rollback files (#12644)
0178b12b5b5 is described below
commit 0178b12b5b55b835af5119369298de08794f3b5c
Author: fhan <[email protected]>
AuthorDate: Sat Jan 18 10:42:10 2025 +0800
[HUDI-6304] Handle cases when inflight compaction files are removed and the job
fails, leaving pending rollback files (#12644)
---
.../java/org/apache/hudi/sink/compact/CompactionPlanOperator.java | 7 ++++++-
.../src/main/java/org/apache/hudi/util/CompactionUtil.java | 4 ++--
.../src/test/java/org/apache/hudi/utils/TestCompactionUtil.java | 2 +-
3 files changed, 9 insertions(+), 4 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index 75f5779cb9e..17732f3aca7 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -19,6 +19,7 @@
package org.apache.hudi.sink.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -29,6 +30,7 @@ import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkTables;
+import org.apache.hudi.util.FlinkWriteClients;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
@@ -70,6 +72,8 @@ public class CompactionPlanOperator extends
AbstractStreamOperator<CompactionPla
private transient FlinkCompactionMetrics compactionMetrics;
+ private transient HoodieFlinkWriteClient writeClient;
+
public CompactionPlanOperator(Configuration conf) {
this.conf = conf;
}
@@ -79,10 +83,11 @@ public class CompactionPlanOperator extends
AbstractStreamOperator<CompactionPla
super.open();
registerMetrics();
this.table = FlinkTables.createTable(conf, getRuntimeContext());
+ this.writeClient = FlinkWriteClients.createWriteClient(conf,
getRuntimeContext());
// when starting up, rolls back all the inflight compaction instants if
there exists,
// these instants are in priority for scheduling task because the
compaction instants are
// scheduled from earliest(FIFO sequence).
- CompactionUtil.rollbackCompaction(table);
+ CompactionUtil.rollbackCompaction(table, this.writeClient);
}
@Override
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index 657bbdbea60..f0059ff34af 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -149,14 +149,14 @@ public class CompactionUtil {
*
* @param table The hoodie table
*/
- public static void rollbackCompaction(HoodieFlinkTable<?> table) {
+ public static void rollbackCompaction(HoodieFlinkTable<?> table,
HoodieFlinkWriteClient writeClient) {
HoodieTimeline inflightCompactionTimeline = table.getActiveTimeline()
.filterPendingCompactionTimeline()
.filter(instant ->
instant.getState() == HoodieInstant.State.INFLIGHT);
inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
LOG.info("Rollback the inflight compaction instant: " + inflightInstant
+ " for failover");
- table.rollbackInflightCompaction(inflightInstant);
+ table.rollbackInflightCompaction(inflightInstant, commitToRollback ->
writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(),
commitToRollback, false));
table.getMetaClient().reloadActiveTimeline();
});
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
index 111680bdcdb..93e9f7e9ab6 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
@@ -98,7 +98,7 @@ public class TestCompactionUtil {
.filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT)
.getInstants();
assertThat("all the instants should be in pending state", instants.size(),
is(3));
- CompactionUtil.rollbackCompaction(table);
+ CompactionUtil.rollbackCompaction(table,
FlinkWriteClients.createWriteClient(conf));
boolean allRolledBack =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstantsAsStream()
.allMatch(instant -> instant.getState() ==
HoodieInstant.State.REQUESTED);
assertTrue(allRolledBack, "all the instants should be rolled back");