This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d6eb59677474 feat: Integrate the mdt compaction with existing flink compaction pipeline (#17991)
d6eb59677474 is described below
commit d6eb596774747e96a0ea970de2cac4bd82d34023
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri Jan 30 15:16:43 2026 +0800
feat: Integrate the mdt compaction with existing flink compaction pipeline (#17991)
* feat: Integrate the mdt compaction with existing flink compaction pipeline
---------
Co-authored-by: danny0405 <[email protected]>
---
.../hudi/table/action/compact/CompactHelpers.java | 15 +-
.../apache/hudi/client/HoodieFlinkWriteClient.java | 8 +
.../FlinkHoodieBackedTableMetadataWriter.java | 15 +-
.../org/apache/hudi/table/HoodieFlinkTable.java | 2 +-
.../src/main/java/org/apache/hudi/util/Lazy.java | 3 +
.../apache/hudi/configuration/FlinkOptions.java | 21 ++
.../apache/hudi/configuration/OptionsResolver.java | 21 +-
.../java/org/apache/hudi/sink/CleanFunction.java | 61 ++---
.../hudi/sink/StreamWriteOperatorCoordinator.java | 26 ++-
.../hudi/sink/clustering/ClusteringCommitSink.java | 2 +-
.../apache/hudi/sink/compact/CompactOperator.java | 133 +++--------
.../hudi/sink/compact/CompactionCommitEvent.java | 14 +-
.../hudi/sink/compact/CompactionCommitSink.java | 152 +++---------
.../hudi/sink/compact/CompactionPlanEvent.java | 17 +-
.../hudi/sink/compact/CompactionPlanOperator.java | 124 +++-------
.../hudi/sink/compact/FlinkCompactionConfig.java | 2 +-
.../hudi/sink/compact/handler/CleanHandler.java | 118 ++++++++++
.../sink/compact/handler/CompactCommitHandler.java | 257 +++++++++++++++++++++
.../hudi/sink/compact/handler/CompactHandler.java | 201 ++++++++++++++++
.../compact/handler/CompactionPlanHandler.java | 220 ++++++++++++++++++
.../handler/MetadataCompactCommitHandler.java | 138 +++++++++++
.../compact/handler/MetadataCompactHandler.java | 114 +++++++++
.../handler/MetadataCompactionPlanHandler.java | 179 ++++++++++++++
.../org/apache/hudi/sink/v2/utils/PipelinesV2.java | 2 +-
.../org/apache/hudi/table/HoodieTableSink.java | 4 +-
.../java/org/apache/hudi/util/CompactionUtil.java | 76 ++++++
.../java/org/apache/hudi/util/StreamerUtil.java | 15 ++
.../apache/hudi/sink/ITTestDataStreamWrite.java | 4 +-
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 47 +++-
.../hudi/sink/utils/CompactFunctionWrapper.java | 8 +-
.../sink/utils/StreamWriteFunctionWrapper.java | 2 +-
.../apache/hudi/table/ITTestHoodieDataSource.java | 9 +-
.../apache/hudi/table/ITTestSchemaEvolution.java | 2 +-
.../org/apache/hudi/utils/TestCompactionUtil.java | 128 +++++++++-
.../test/java/org/apache/hudi/utils/TestUtils.java | 19 ++
35 files changed, 1760 insertions(+), 399 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
index 9fd77a93e712..bc6e90fd460e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieCompactionException;
@@ -62,16 +63,24 @@ public class CompactHelpers<T, I, K, O> {
  public HoodieCommitMetadata createCompactionMetadata(
      HoodieTable table, String compactionInstantTime, HoodieData<WriteStatus> writeStatuses,
      String schema) throws IOException {
+    return createCompactionMetadata(table, compactionInstantTime, writeStatuses, schema, WriteOperationType.COMPACT);
+  }
+
+  public HoodieCommitMetadata createCompactionMetadata(
+      HoodieTable table, String compactionInstantTime, HoodieData<WriteStatus> writeStatuses,
+      String schema, WriteOperationType operationType) throws IOException {
    InstantGenerator instantGenerator = table.getInstantGenerator();
-    HoodieCompactionPlan compactionPlan = table.getActiveTimeline().readCompactionPlan(
-        instantGenerator.getCompactionRequestedInstant(compactionInstantTime));
+    HoodieInstant requestedInstant = operationType == WriteOperationType.COMPACT
+        ? instantGenerator.getCompactionRequestedInstant(compactionInstantTime)
+        : instantGenerator.getLogCompactionRequestedInstant(compactionInstantTime);
+    HoodieCompactionPlan compactionPlan = table.getActiveTimeline().readCompactionPlan(requestedInstant);
    List<HoodieWriteStat> updateStatusMap = writeStatuses.map(WriteStatus::getStat).collectAsList();
    HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
    for (HoodieWriteStat stat : updateStatusMap) {
      metadata.addWriteStat(stat.getPartitionPath(), stat);
    }
    metadata.addMetadata(org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY, schema);
- metadata.setOperationType(WriteOperationType.COMPACT);
+ metadata.setOperationType(operationType);
if (compactionPlan.getExtraMetadata() != null) {
compactionPlan.getExtraMetadata().forEach(metadata::addMetadata);
}
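For context, a minimal sketch of how the new overload might be used to commit a log compaction; the table, instant time, write statuses, and the LOG_COMPACT operation type are assumptions drawn from the surrounding API rather than code shown in this patch:

    // Illustrative only: build commit metadata for a log-compaction instant via the new overload,
    // then complete it through the Flink write client's new completeLogCompaction method.
    HoodieCommitMetadata logCompactMetadata = CompactHelpers.getInstance().createCompactionMetadata(
        table, logCompactionInstant, HoodieListData.eager(writeStatuses),
        writeClient.getConfig().getSchema(), WriteOperationType.LOG_COMPACT);
    writeClient.completeLogCompaction(logCompactMetadata, table, logCompactionInstant);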
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 1acb5dfde3db..c0a92dafed7b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -57,6 +57,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.Path;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
@@ -439,6 +440,13 @@ public class HoodieFlinkWriteClient<T>
throw new HoodieNotSupportedException("Clustering is not supported yet");
}
+ /**
+ * Commit log compaction and track metrics.
+ */
+  public void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) {
+    tableServiceClient.completeLogCompaction(metadata, table, compactionCommitTime, Collections.emptyList());
+  }
+
  private void completeClustering(
      HoodieReplaceCommitMetadata metadata,
      HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 58acec5705c1..268ae942b92e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -104,6 +104,14 @@ public class FlinkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
}
}
+  /**
+   * Returns the write client for the metadata table, which is used for compaction scheduling in the Flink write coordinator.
+   */
+  @Override
+  public BaseHoodieWriteClient<?, List<HoodieRecord>, ?, List<WriteStatus>> getWriteClient() {
+    return super.getWriteClient();
+  }
+
@Override
protected void updateColumnsToIndexWithColStats(List<String> columnsToIndex)
{
// no op. HUDI-8801 to fix.
@@ -133,7 +141,12 @@ public class FlinkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
@Override
  protected void commitInternal(String instantTime, Map<String, HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing,
                                Option<BulkInsertPartitioner> bulkInsertPartitioner) {
-    performTableServices(Option.ofNullable(instantTime), false);
+    // This method is invoked during compaction of the data table. Table services for the metadata table
+    // are skipped here when streaming writes to the metadata table are enabled, since the compaction/clean
+    // is performed asynchronously in the dedicated compaction pipeline.
+    if (!dataWriteConfig.getMetadataConfig().isStreamingWriteEnabled()) {
+      performTableServices(Option.ofNullable(instantTime), false);
+    }
    metadataMetaClient.reloadActiveTimeline();
    super.commitInternal(instantTime, partitionRecordsMap, isInitializing, bulkInsertPartitioner);
  }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index ca7b7a2fa585..fee038151f6e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -118,7 +118,7 @@ public abstract class HoodieFlinkTable<T>
    if (config.isMetadataTableEnabled() || getMetaClient().getTableConfig().isMetadataTableAvailable()) {
      return Option.of(FlinkHoodieBackedTableMetadataWriter.create(
          getContext().getStorageConf(), config, failedWritesCleaningPolicy, getContext(),
-          Option.of(triggeringInstantTimestamp), streamingWrites));
+          Option.ofNullable(triggeringInstantTimestamp), streamingWrites));
} else {
return Option.empty();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/util/Lazy.java b/hudi-common/src/main/java/org/apache/hudi/util/Lazy.java
index 2b135a0f383c..d0c9e80ae822 100644
--- a/hudi-common/src/main/java/org/apache/hudi/util/Lazy.java
+++ b/hudi-common/src/main/java/org/apache/hudi/util/Lazy.java
@@ -18,6 +18,8 @@
package org.apache.hudi.util;
+import lombok.Getter;
+
import javax.annotation.concurrent.ThreadSafe;
import java.util.function.Supplier;
@@ -30,6 +32,7 @@ import java.util.function.Supplier;
@ThreadSafe
public class Lazy<T> {
+ @Getter
private volatile boolean initialized;
private Supplier<T> initializer;
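The @Getter on the initialized flag exposes isInitialized(), which the refactored operators use to close lazily created handlers only when they were actually constructed. A minimal sketch of that pattern, with the write client and task id assumed from the surrounding operator:

    // Illustrative only: create the handler on first use, and close it only if it was ever created.
    Lazy<CompactHandler> compactHandler = Lazy.lazily(() -> new CompactHandler(writeClient, taskID));
    // ... compactHandler.get() is invoked only when a data-table plan event arrives ...
    if (compactHandler.isInitialized()) {
      compactHandler.get().close();
    }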
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index e4795950e51e..7ce83e737324 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -228,6 +228,20 @@ public class FlinkOptions extends HoodieConfig {
      .withFallbackKeys(HoodieMetadataConfig.ENABLE.key())
      .withDescription("Enable the internal metadata table which serves table metadata like level file listings, default enabled");
+  @AdvancedConfig
+  public static final ConfigOption<Boolean> METADATA_COMPACTION_SCHEDULE_ENABLED = ConfigOptions
+      .key("metadata.compaction.schedule.enabled")
+      .booleanType()
+      .defaultValue(true)
+      .withDescription("Schedule the compaction plan for the metadata table, enabled by default.");
+
+  public static final ConfigOption<Boolean> METADATA_COMPACTION_ASYNC_ENABLED = ConfigOptions
+      .key("metadata.compaction.async.enabled")
+      .booleanType()
+      .defaultValue(true)
+      .withDescription("Whether to enable async compaction for the metadata table; "
+          + "if true, the compaction for the metadata table will be performed in the compaction pipeline, default enabled.");
+
  public static final ConfigOption<Integer> METADATA_COMPACTION_DELTA_COMMITS = ConfigOptions
.key("metadata.compaction.delta_commits")
.intType()
@@ -894,6 +908,13 @@ public class FlinkOptions extends HoodieConfig {
      .noDefaultValue()
      .withDescription("Parallelism of tasks that do actual compaction, default same as the write task parallelism");
+  @AdvancedConfig
+  public static final ConfigOption<Boolean> COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED = ConfigOptions
+      .key("compaction.operation.execute.async.enabled")
+      .booleanType()
+      .defaultValue(true)
+      .withDescription("Whether the compaction operation should be executed asynchronously on compact operator, default enabled.");
+
public static final String NUM_COMMITS = "num_commits";
public static final String TIME_ELAPSED = "time_elapsed";
public static final String NUM_AND_TIME = "num_and_time";
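Taken together, the new options gate where and how metadata table compaction runs. A hedged sketch of setting them programmatically on a Flink job configuration (the surrounding pipeline setup is assumed):

    // Illustrative only: let the coordinator schedule MDT compaction plans and let the shared
    // compaction pipeline execute them; offline jobs flip the execute-async flag off so that
    // compaction operations run synchronously inside the compact operator.
    Configuration conf = new Configuration();
    conf.set(FlinkOptions.METADATA_COMPACTION_SCHEDULE_ENABLED, true);
    conf.set(FlinkOptions.METADATA_COMPACTION_ASYNC_ENABLED, true);
    conf.set(FlinkOptions.COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED, false);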
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index b0734d9525e2..edd626886628 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -264,8 +264,25 @@ public class OptionsResolver {
* @param conf The flink configuration.
*/
public static boolean needsAsyncCompaction(Configuration conf) {
- return OptionsResolver.isMorTable(conf)
- && conf.get(FlinkOptions.COMPACTION_ASYNC_ENABLED);
+    return OptionsResolver.isMorTable(conf) && conf.get(FlinkOptions.COMPACTION_ASYNC_ENABLED);
+  }
+
+  /**
+   * Returns whether there is need to schedule the async metadata compaction.
+   *
+   * @param conf The flink configuration.
+   */
+  public static boolean needsAsyncMetadataCompaction(Configuration conf) {
+    return isStreamingIndexWriteEnabled(conf) && conf.get(FlinkOptions.METADATA_COMPACTION_ASYNC_ENABLED);
+  }
+
+  /**
+   * Returns whether there is need to schedule the compaction plan for the metadata table.
+   *
+   * @param conf The flink configuration.
+   */
+  public static boolean needsScheduleMdtCompaction(Configuration conf) {
+    return isStreamingIndexWriteEnabled(conf) && conf.get(FlinkOptions.METADATA_COMPACTION_SCHEDULE_ENABLED);
}
/**
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
index 9a2636383676..53452018baa5 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
@@ -21,9 +21,12 @@ package org.apache.hudi.sink;
import org.apache.hudi.adapter.AbstractRichFunctionAdapter;
import org.apache.hudi.adapter.SinkFunctionAdapter;
import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.utils.NonThrownExecutor;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.sink.compact.handler.CleanHandler;
import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.CheckpointListener;
@@ -47,9 +50,11 @@ public class CleanFunction<T> extends
AbstractRichFunctionAdapter
protected HoodieFlinkWriteClient writeClient;
- private NonThrownExecutor executor;
+ // clean handler for data table
+ private transient Option<CleanHandler> cleanHandlerOpt;
- protected volatile boolean isCleaning;
+ // clean handler for metadata table
+ private transient Option<CleanHandler> mdtCleanHandlerOpt;
public CleanFunction(Configuration conf) {
this.conf = conf;
@@ -59,44 +64,29 @@ public class CleanFunction<T> extends
AbstractRichFunctionAdapter
public void open(Configuration parameters) throws Exception {
super.open(parameters);
    this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
-    this.executor = NonThrownExecutor.builder(log).waitForTasksFinish(true).build();
if (conf.get(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
- executor.execute(() -> {
- this.isCleaning = true;
- try {
- this.writeClient.clean();
- } finally {
- this.isCleaning = false;
- }
- }, "wait for cleaning finish");
+ this.cleanHandlerOpt = Option.of(new CleanHandler(writeClient));
+      this.mdtCleanHandlerOpt = OptionsResolver.isStreamingIndexWriteEnabled(conf)
+          ? Option.of(new CleanHandler(StreamerUtil.createMetadataWriteClient(writeClient))) : Option.empty();
+ } else {
+ this.cleanHandlerOpt = Option.empty();
+ this.mdtCleanHandlerOpt = Option.empty();
}
+
+ cleanHandlerOpt.ifPresent(CleanHandler::clean);
+ mdtCleanHandlerOpt.ifPresent(CleanHandler::clean);
}
@Override
public void notifyCheckpointComplete(long l) throws Exception {
- if (conf.get(FlinkOptions.CLEAN_ASYNC_ENABLED) && isCleaning) {
- executor.execute(() -> {
- try {
- this.writeClient.waitForCleaningFinish();
- } finally {
- // ensure to switch the isCleaning flag
- this.isCleaning = false;
- }
- }, "wait for cleaning finish");
- }
+ cleanHandlerOpt.ifPresent(CleanHandler::waitForCleaningFinish);
+ mdtCleanHandlerOpt.ifPresent(CleanHandler::waitForCleaningFinish);
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
- if (conf.get(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
- try {
- this.writeClient.startAsyncCleaning();
- this.isCleaning = true;
- } catch (Throwable throwable) {
- // catch the exception to not affect the normal checkpointing
- log.warn("Failed to start async cleaning", throwable);
- }
- }
+ cleanHandlerOpt.ifPresent(CleanHandler::startAsyncCleaning);
+ mdtCleanHandlerOpt.ifPresent(CleanHandler::startAsyncCleaning);
}
@Override
@@ -106,12 +96,7 @@ public class CleanFunction<T> extends
AbstractRichFunctionAdapter
@Override
public void close() throws Exception {
- if (executor != null) {
- executor.close();
- }
-
- if (this.writeClient != null) {
- this.writeClient.close();
- }
+ cleanHandlerOpt.ifPresent(CleanHandler::close);
+ mdtCleanHandlerOpt.ifPresent(CleanHandler::close);
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index cea0fda848ad..107496834d06 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -129,8 +129,6 @@ public class StreamWriteOperatorCoordinator
*/
protected final Context context;
- private final boolean isStreamingIndexWriteEnabled;
-
/**
* Gateways for sending events to sub-tasks.
*/
@@ -141,6 +139,11 @@ public class StreamWriteOperatorCoordinator
*/
private transient HoodieFlinkWriteClient writeClient;
+ /**
+ * Write client for the metadata table.
+ */
+ private transient HoodieFlinkWriteClient metadataWriteClient;
+
/**
* Meta client.
*/
@@ -209,7 +212,6 @@ public class StreamWriteOperatorCoordinator
this.context = context;
this.parallelism = context.currentParallelism();
this.storageConf =
HadoopFSUtils.getStorageConfWithCopy(HadoopConfigurations.getHiveConf(conf));
- this.isStreamingIndexWriteEnabled =
OptionsResolver.isStreamingIndexWriteEnabled(conf);
}
@Override
@@ -227,6 +229,11 @@ public class StreamWriteOperatorCoordinator
this.writeClient = FlinkWriteClients.createWriteClient(conf);
this.writeClient.tryUpgrade(instant, this.metaClient);
initMetadataTable(this.writeClient);
+
+ if (tableState.scheduleMdtCompaction) {
+ this.metadataWriteClient =
StreamerUtil.createMetadataWriteClient(writeClient);
+ }
+
// start the executor
this.executor = NonThrownExecutor.builder(log)
.threadFactory(getThreadFactory("meta-event-handle"))
@@ -268,6 +275,9 @@ public class StreamWriteOperatorCoordinator
if (writeClient != null) {
writeClient.close();
}
+ if (metadataWriteClient != null) {
+ metadataWriteClient.close();
+ }
this.eventBuffers = null;
if (this.clientIds != null) {
this.clientIds.close();
@@ -458,6 +468,10 @@ public class StreamWriteOperatorCoordinator
if (tableState.scheduleCompaction) {
CompactionUtil.scheduleCompaction(writeClient,
tableState.isDeltaTimeCompaction, committed);
}
+ if (tableState.scheduleMdtCompaction) {
+ // schedule compaction for the metadata table
+ CompactionUtil.scheduleMetadataCompaction(metadataWriteClient,
committed);
+ }
// if clustering is on, schedule the clustering
if (tableState.scheduleClustering) {
ClusteringUtil.scheduleClustering(conf, writeClient, committed);
@@ -491,7 +505,7 @@ public class StreamWriteOperatorCoordinator
this.instant = this.writeClient.startCommit(tableState.commitAction,
this.metaClient);
this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction,
this.instant);
// start commit for MDT if streaming writes to MDT is enabled
- if (isStreamingIndexWriteEnabled) {
+ if (tableState.isStreamingIndexWriteEnabled) {
this.writeClient.startCommitForMetadataTable(this.instant,
this.writeClient.getHoodieTable());
}
this.writeClient.setWriteTimer(tableState.commitAction);
@@ -725,10 +739,12 @@ public class StreamWriteOperatorCoordinator
final String commitAction;
final boolean isOverwrite;
final boolean scheduleCompaction;
+ final boolean scheduleMdtCompaction;
final boolean scheduleClustering;
final boolean syncHive;
final boolean syncMetadata;
final boolean isDeltaTimeCompaction;
+ final boolean isStreamingIndexWriteEnabled;
private TableState(Configuration conf) {
this.operationType =
WriteOperationType.fromValue(conf.get(FlinkOptions.OPERATION));
@@ -737,9 +753,11 @@ public class StreamWriteOperatorCoordinator
this.isOverwrite = WriteOperationType.isOverwrite(this.operationType);
this.scheduleCompaction = OptionsResolver.needsScheduleCompaction(conf);
this.scheduleClustering = OptionsResolver.needsScheduleClustering(conf);
+ this.scheduleMdtCompaction =
OptionsResolver.needsScheduleMdtCompaction(conf);
this.syncHive = conf.get(FlinkOptions.HIVE_SYNC_ENABLED);
this.syncMetadata = conf.get(FlinkOptions.METADATA_ENABLED);
this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);
+ this.isStreamingIndexWriteEnabled =
OptionsResolver.isStreamingIndexWriteEnabled(conf);
}
public static TableState create(Configuration conf) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
index 97ef564b8776..c4c1efc21c0a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -220,7 +220,7 @@ public class ClusteringCommitSink extends
CleanFunction<ClusteringCommitEvent> {
clusteringMetrics.updateCommitMetrics(instant,
writeMetadata.getCommitMetadata().get());
// whether to clean up the input base parquet files used for clustering
- if (!conf.get(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
+ if (!conf.get(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
log.info("Running inline clean");
this.writeClient.clean();
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
index 21cd4f2fbfd5..60d117a373af 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
@@ -20,22 +20,14 @@ package org.apache.hudi.sink.compact;
import org.apache.hudi.adapter.MaskingOutputAdapter;
import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.model.CompactionOperation;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.metrics.FlinkCompactionMetrics;
+import org.apache.hudi.sink.compact.handler.CompactHandler;
+import org.apache.hudi.sink.compact.handler.MetadataCompactHandler;
import org.apache.hudi.sink.utils.NonThrownExecutor;
-import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.table.HoodieFlinkTable;
-import
org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
-import org.apache.hudi.table.format.FlinkRowDataReaderContext;
-import org.apache.hudi.table.format.InternalSchemaManager;
-import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.Lazy;
+import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.RuntimeContextUtils;
import lombok.extern.slf4j.Slf4j;
@@ -50,11 +42,6 @@ import
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
-import org.apache.flink.util.Collector;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.function.Supplier;
/**
* Operator to execute the actual compaction task assigned by the compaction
plan task.
@@ -69,26 +56,6 @@ public class CompactOperator extends
TableStreamOperator<CompactionCommitEvent>
*/
private final Configuration conf;
- /**
- * Write Client.
- */
- private transient HoodieFlinkWriteClient<?> writeClient;
-
- /**
- * Hoodie Flink table.
- */
- private transient HoodieFlinkTable<?> flinkTable;
-
- /**
- * Whether to execute compaction asynchronously.
- */
- private final boolean asyncCompaction;
-
- /**
- * Id of current subtask.
- */
- private int taskID;
-
/**
* Executor service to execute the compaction task.
*/
@@ -109,19 +76,12 @@ public class CompactOperator extends
TableStreamOperator<CompactionCommitEvent>
*/
private transient String prevCompactInstant = "";
- /**
- * Whether FileGroup reader based compaction should be used;
- */
- private transient boolean useFileGroupReaderBasedCompaction;
+ private transient Lazy<CompactHandler> compactHandler;
- /**
- * InternalSchema manager used for handling schema evolution.
- */
- private transient InternalSchemaManager internalSchemaManager;
+ private transient Lazy<CompactHandler> mdtCompactHandler;
public CompactOperator(Configuration conf) {
this.conf = conf;
- this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
}
/**
@@ -144,13 +104,16 @@ public class CompactOperator extends
TableStreamOperator<CompactionCommitEvent>
@Override
public void open() throws Exception {
- this.taskID =
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
- this.writeClient = FlinkWriteClients.createWriteClient(conf,
getRuntimeContext());
- this.flinkTable = this.writeClient.getHoodieTable();
- if (this.asyncCompaction) {
+ // ID of current subtask
+ int taskID =
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
+ HoodieFlinkWriteClient writeClient =
FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
+ if (conf.get(FlinkOptions.COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED)) {
+ // executes compaction asynchronously.
this.executor = NonThrownExecutor.builder(log).build();
}
this.collector = new StreamRecordCollector<>(output);
+ this.compactHandler = Lazy.lazily(() -> new CompactHandler(writeClient,
taskID));
+ this.mdtCompactHandler = Lazy.lazily(() -> new
MetadataCompactHandler(StreamerUtil.createMetadataWriteClient(writeClient),
taskID));
registerMetrics();
}
@@ -158,61 +121,14 @@ public class CompactOperator extends
TableStreamOperator<CompactionCommitEvent>
public void processElement(StreamRecord<CompactionPlanEvent> record) throws
Exception {
final CompactionPlanEvent event = record.getValue();
final String instantTime = event.getCompactionInstantTime();
- final CompactionOperation compactionOperation = event.getOperation();
boolean needReloadMetaClient = !instantTime.equals(prevCompactInstant);
prevCompactInstant = instantTime;
- if (asyncCompaction) {
- // executes the compaction task asynchronously to not block the
checkpoint barrier propagate.
- executor.execute(
- () -> doCompaction(instantTime, compactionOperation, collector,
writeClient.getConfig(), needReloadMetaClient),
- (errMsg, t) -> collector.collect(new
CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),
- "Execute compaction for instant %s from task %d", instantTime,
taskID);
- } else {
- // executes the compaction task synchronously for batch mode.
- log.info("Execute compaction for instant {} from task {}", instantTime,
taskID);
- doCompaction(instantTime, compactionOperation, collector,
writeClient.getConfig(), needReloadMetaClient);
- }
- }
- private void doCompaction(String instantTime,
- CompactionOperation compactionOperation,
- Collector<CompactionCommitEvent> collector,
- HoodieWriteConfig writeConfig,
- boolean needReloadMetaClient) throws Exception {
- compactionMetrics.startCompaction();
- HoodieFlinkMergeOnReadTableCompactor<?> compactor = new
HoodieFlinkMergeOnReadTableCompactor<>();
- HoodieTableMetaClient metaClient = flinkTable.getMetaClient();
- if (needReloadMetaClient) {
- // reload the timeline
- metaClient.reload();
+ if (event.isMetadataTable()) {
+ mdtCompactHandler.get().compact(executor, event, collector,
needReloadMetaClient, compactionMetrics);
+ } else {
+ compactHandler.get().compact(executor, event, collector,
needReloadMetaClient, compactionMetrics);
}
- // schema evolution
- CompactionUtil.setAvroSchema(writeConfig, metaClient);
- List<WriteStatus> writeStatuses = compactor.compact(
- writeConfig,
- compactionOperation,
- instantTime,
- flinkTable.getTaskContextSupplier(),
- createReaderContext(writeClient, needReloadMetaClient),
- flinkTable);
- compactionMetrics.endCompaction();
- collector.collect(new CompactionCommitEvent(instantTime,
compactionOperation.getFileId(), writeStatuses, taskID));
- }
-
- private HoodieReaderContext<?> createReaderContext(HoodieFlinkWriteClient<?>
writeClient, boolean needReloadMetaClient) {
- HoodieTableMetaClient metaClient = flinkTable.getMetaClient();
- // CAUTION: InternalSchemaManager will scan timeline, reusing the meta
client so that the timeline is updated.
- // Instantiate internalSchemaManager lazily here since it may not be
needed for FG reader, e.g., schema evolution
- // for log files in FG reader do not use internalSchemaManager.
- Supplier<InternalSchemaManager> internalSchemaManagerSupplier = () -> {
- if (internalSchemaManager == null || needReloadMetaClient) {
- internalSchemaManager =
InternalSchemaManager.get(metaClient.getStorageConf(), metaClient);
- }
- return internalSchemaManager;
- };
- // initialize storage conf lazily.
- StorageConfiguration<?> readerConf =
writeClient.getEngineContext().getStorageConf();
- return new FlinkRowDataReaderContext(readerConf,
internalSchemaManagerSupplier, Collections.emptyList(),
metaClient.getTableConfig(), Option.empty());
}
@VisibleForTesting
@@ -220,14 +136,21 @@ public class CompactOperator extends
TableStreamOperator<CompactionCommitEvent>
this.executor = executor;
}
+ @VisibleForTesting
+ public void setOutput(Output<StreamRecord<CompactionCommitEvent>> output) {
+ this.collector = new StreamRecordCollector<>(output);
+ }
+
@Override
public void close() throws Exception {
if (null != this.executor) {
this.executor.close();
}
- if (null != this.writeClient) {
- this.writeClient.close();
- this.writeClient = null;
+ if (compactHandler.isInitialized()) {
+ compactHandler.get().close();
+ }
+ if (mdtCompactHandler.isInitialized()) {
+ mdtCompactHandler.get().close();
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java
index b03448294bb8..0eae1fe37397 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java
@@ -57,11 +57,21 @@ public class CompactionCommitEvent implements Serializable {
*/
private int taskID;
+ /**
+ * Whether the event is for metadata table.
+ */
+ private boolean isMetadataTable;
+
+ /**
+ * Whether the event is for log compaction.
+ */
+ private boolean isLogCompaction;
+
/**
* An event with NULL write statuses that represents a failed compaction.
*/
- public CompactionCommitEvent(String instant, String fileId, int taskID) {
- this(instant, fileId, null, taskID);
+ public CompactionCommitEvent(String instant, String fileId, int taskID,
boolean isMetadataTable, boolean isLogCompaction) {
+ this(instant, fileId, null, taskID, isMetadataTable, isLogCompaction);
}
public boolean isFailed() {
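For reference, a hedged sketch of how a failure event might now be constructed (for example in the compact operator's error callback); the instant, file id, and task id are assumed to come from the failed task:

    // Illustrative only: null write statuses mark the task as failed; the two new flags let the
    // commit sink route the event to the metadata-table handler and distinguish log compaction.
    CompactionCommitEvent failed = new CompactionCommitEvent(instantTime, fileId, taskID, true, false);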
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index b6dbe8bfa11b..d704b21b9601 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -19,30 +19,20 @@
package org.apache.hudi.sink.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieListData;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.util.CompactionUtils;
-import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metrics.FlinkCompactionMetrics;
import org.apache.hudi.sink.CleanFunction;
-import org.apache.hudi.table.HoodieFlinkTable;
-import org.apache.hudi.table.action.compact.CompactHelpers;
-import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.sink.compact.handler.CompactCommitHandler;
+import org.apache.hudi.sink.compact.handler.MetadataCompactCommitHandler;
import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.Lazy;
+import org.apache.hudi.util.StreamerUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
/**
* Function to check and commit the compaction action.
*
@@ -62,25 +52,9 @@ public class CompactionCommitSink extends
CleanFunction<CompactionCommitEvent> {
*/
private final Configuration conf;
- /**
- * Buffer to collect the event from each compact task {@code
CompactFunction}.
- *
- * <p>Stores the mapping of instant_time -> file_id -> event. Use a map to
collect the
- * events because the rolling back of intermediate compaction tasks
generates corrupt
- * events.
- */
- private transient Map<String, Map<String, CompactionCommitEvent>>
commitBuffer;
+ private transient Lazy<CompactCommitHandler> compactCommitHandler;
- /**
- * Cache to store compaction plan for each instant.
- * Stores the mapping of instant_time -> compactionPlan.
- */
- private transient Map<String, HoodieCompactionPlan> compactionPlanCache;
-
- /**
- * The hoodie table.
- */
- private transient HoodieFlinkTable<?> table;
+ private transient Lazy<CompactCommitHandler> mdtCompactCommitHandler;
/**
* Compaction metrics.
@@ -95,12 +69,9 @@ public class CompactionCommitSink extends
CleanFunction<CompactionCommitEvent> {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
- if (writeClient == null) {
- this.writeClient = FlinkWriteClients.createWriteClient(conf,
getRuntimeContext());
- }
- this.commitBuffer = new HashMap<>();
- this.compactionPlanCache = new HashMap<>();
- this.table = this.writeClient.getHoodieTable();
+ HoodieFlinkWriteClient writeClient =
FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
+ this.compactCommitHandler = Lazy.lazily(() -> new
CompactCommitHandler(conf, writeClient));
+ this.mdtCompactCommitHandler = Lazy.lazily(() -> new
MetadataCompactCommitHandler(conf,
StreamerUtil.createMetadataWriteClient(writeClient)));
registerMetrics();
}
@@ -114,102 +85,31 @@ public class CompactionCommitSink extends
CleanFunction<CompactionCommitEvent> {
+ " is failed: {}, error record count: {}",
instant, event.getTaskID(), event.isFailed(),
getNumErrorRecords(event));
}
- commitBuffer.computeIfAbsent(instant, k -> new HashMap<>())
- .put(event.getFileId(), event);
- commitIfNecessary(instant, commitBuffer.get(instant).values());
- }
-
- private long getNumErrorRecords(CompactionCommitEvent event) {
- if (event.getWriteStatuses() == null) {
- return -1L;
+ if (event.isMetadataTable()) {
+ mdtCompactCommitHandler.get().commitIfNecessary(event,
compactionMetrics);
+ } else {
+ compactCommitHandler.get().commitIfNecessary(event, compactionMetrics);
}
- return event.getWriteStatuses().stream()
- .map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
}
- /**
- * Condition to commit: the commit buffer has equal size with the compaction
plan operations
- * and all the compact commit event {@link CompactionCommitEvent} has the
same compaction instant time.
- *
- * @param instant Compaction commit instant time
- * @param events Commit events ever received for the instant
- */
- private void commitIfNecessary(String instant,
Collection<CompactionCommitEvent> events) throws IOException {
- HoodieCompactionPlan compactionPlan =
compactionPlanCache.computeIfAbsent(instant, k -> {
- try {
- return CompactionUtils.getCompactionPlan(
- this.writeClient.getHoodieTable().getMetaClient(), instant);
- } catch (Exception e) {
- throw new HoodieException(e);
- }
- });
-
- boolean isReady = compactionPlan.getOperations().size() == events.size();
- if (!isReady) {
- return;
- }
-
- if (events.stream().anyMatch(CompactionCommitEvent::isFailed)) {
- try {
- // handle failure case
- CompactionUtil.rollbackCompaction(table, instant,
writeClient.getTransactionManager());
- } finally {
- // remove commitBuffer to avoid obsolete metadata commit
- reset(instant);
- this.compactionMetrics.markCompactionRolledBack();
- }
- return;
+ @Override
+ public void close() throws Exception {
+ if (compactCommitHandler.isInitialized()) {
+ compactCommitHandler.get().close();
}
+ if (mdtCompactCommitHandler.isInitialized()) {
+ mdtCompactCommitHandler.get().close();
- try {
- doCommit(instant, events);
- } catch (Throwable throwable) {
- // make it fail-safe
- log.error("Error while committing compaction instant: " + instant,
throwable);
- this.compactionMetrics.markCompactionRolledBack();
- } finally {
- // reset the status
- reset(instant);
}
+ super.close();
}
- @SuppressWarnings("unchecked")
- private void doCommit(String instant, Collection<CompactionCommitEvent>
events) throws IOException {
- List<WriteStatus> statuses = events.stream()
- .map(CompactionCommitEvent::getWriteStatuses)
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
-
- long numErrorRecords =
statuses.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
-
- if (numErrorRecords > 0 && !this.conf.get(FlinkOptions.IGNORE_FAILED)) {
- // handle failure case
- log.error("Got {} error records during compaction of instant {},\n"
- + "option '{}' is configured as false,"
- + "rolls back the compaction", numErrorRecords, instant,
FlinkOptions.IGNORE_FAILED.key());
- CompactionUtil.rollbackCompaction(table, instant,
writeClient.getTransactionManager());
- this.compactionMetrics.markCompactionRolledBack();
- return;
- }
-
- HoodieCommitMetadata metadata =
CompactHelpers.getInstance().createCompactionMetadata(
- table, instant, HoodieListData.eager(statuses),
writeClient.getConfig().getSchema());
-
- // commit the compaction
- this.writeClient.completeCompaction(metadata, table, instant);
-
- this.compactionMetrics.updateCommitMetrics(instant, metadata);
- this.compactionMetrics.markCompactionCompleted();
-
- // Whether to clean up the old log file when compaction
- if (!conf.get(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
- this.writeClient.clean();
+ private long getNumErrorRecords(CompactionCommitEvent event) {
+ if (event.getWriteStatuses() == null) {
+ return -1L;
}
- }
-
- private void reset(String instant) {
- this.commitBuffer.remove(instant);
- this.compactionPlanCache.remove(instant);
+ return event.getWriteStatuses().stream()
+ .map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
}
private void registerMetrics() {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanEvent.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanEvent.java
index 8e8228541c77..19ec2a3d0729 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanEvent.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanEvent.java
@@ -20,7 +20,6 @@ package org.apache.hudi.sink.compact;
import org.apache.hudi.common.model.CompactionOperation;
-import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@@ -30,7 +29,6 @@ import java.io.Serializable;
/**
* Represents a compact command from the compaction plan task {@link
CompactionPlanOperator}.
*/
-@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@@ -43,8 +41,23 @@ public class CompactionPlanEvent implements Serializable {
private int index;
+ private boolean isMetadataTable;
+
+ private boolean isLogCompaction;
+
public CompactionPlanEvent(String instantTime, CompactionOperation
operation) {
+ this(instantTime, operation, 0);
+ }
+
+ public CompactionPlanEvent(String instantTime, CompactionOperation
operation, int index) {
+ this(instantTime, operation, index, false, false);
+ }
+
+ public CompactionPlanEvent(String instantTime, CompactionOperation
operation, int index, boolean isMetadataTable, boolean isLogCompaction) {
this.compactionInstantTime = instantTime;
this.operation = operation;
+ this.index = index;
+ this.isMetadataTable = isMetadataTable;
+ this.isLogCompaction = isLogCompaction;
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index 1ef6278fab6d..3fcc8dcfa61e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -18,19 +18,14 @@
package org.apache.hudi.sink.compact;
-import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.common.model.CompactionOperation;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.metrics.FlinkCompactionMetrics;
-import org.apache.hudi.table.HoodieFlinkTable;
-import org.apache.hudi.table.marker.WriteMarkersFactory;
-import org.apache.hudi.util.CompactionUtil;
-import org.apache.hudi.util.FlinkTables;
+import org.apache.hudi.sink.compact.handler.CompactionPlanHandler;
+import org.apache.hudi.sink.compact.handler.MetadataCompactionPlanHandler;
import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.VisibleForTesting;
@@ -46,13 +41,6 @@ import
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.data.RowData;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static java.util.stream.Collectors.toList;
-
/**
* Operator that generates the compaction plan with pluggable strategies on
finished checkpoints.
*
@@ -67,15 +55,11 @@ public class CompactionPlanOperator extends
AbstractStreamOperator<CompactionPla
*/
private final Configuration conf;
- /**
- * Meta Client.
- */
- @SuppressWarnings("rawtypes")
- private transient HoodieFlinkTable table;
-
private transient FlinkCompactionMetrics compactionMetrics;
- private transient HoodieFlinkWriteClient writeClient;
+ private transient Option<CompactionPlanHandler> compactionPlanHandler;
+
+ private transient Option<CompactionPlanHandler> mdtCompactionPlanHandler;
public CompactionPlanOperator(Configuration conf) {
this.conf = conf;
@@ -85,12 +69,17 @@ public class CompactionPlanOperator extends
AbstractStreamOperator<CompactionPla
public void open() throws Exception {
super.open();
registerMetrics();
- this.table = FlinkTables.createTable(conf, getRuntimeContext());
- this.writeClient = FlinkWriteClients.createWriteClient(conf,
getRuntimeContext());
+ HoodieFlinkWriteClient writeClient =
FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
+ this.compactionPlanHandler = OptionsResolver.needsAsyncCompaction(conf)
+ ? Option.of(new CompactionPlanHandler(writeClient)) : Option.empty();
+ this.mdtCompactionPlanHandler =
OptionsResolver.needsAsyncMetadataCompaction(conf)
+ ? Option.of(new
MetadataCompactionPlanHandler(StreamerUtil.createMetadataWriteClient(writeClient)))
: Option.empty();
+
// when starting up, rolls back all the inflight compaction instants if
there exists,
// these instants are in priority for scheduling task because the
compaction instants are
// scheduled from earliest(FIFO sequence).
- CompactionUtil.rollbackCompaction(table, this.writeClient);
+
this.compactionPlanHandler.ifPresent(CompactionPlanHandler::rollbackCompaction);
+
this.mdtCompactionPlanHandler.ifPresent(CompactionPlanHandler::rollbackCompaction);
}
/**
@@ -118,76 +107,12 @@ public class CompactionPlanOperator extends
AbstractStreamOperator<CompactionPla
@Override
public void notifyCheckpointComplete(long checkpointId) {
- try {
- table.getMetaClient().reloadActiveTimeline();
- // There is no good way to infer when the compaction task for an instant
crushed
- // or is still undergoing. So we use a configured timeout threshold to
control the rollback:
- // {@code FlinkOptions.COMPACTION_TIMEOUT_SECONDS},
- // when the earliest inflight instant has timed out, assumes it has
failed
- // already and just rolls it back.
-
- // comment out: do we really need the timeout rollback ?
- // CompactionUtil.rollbackEarliestCompaction(table, conf);
- scheduleCompaction(table, checkpointId);
- } catch (Throwable throwable) {
- // make it fail-safe
- log.error("Error while scheduling compaction plan for checkpoint: " +
checkpointId, throwable);
- }
- }
-
- private void scheduleCompaction(HoodieFlinkTable<?> table, long
checkpointId) throws IOException {
- HoodieTimeline pendingCompactionTimeline =
table.getActiveTimeline().filterPendingCompactionTimeline();
-
- // the first instant takes the highest priority.
- Option<HoodieInstant> firstRequested = pendingCompactionTimeline
- .filter(instant -> instant.getState() ==
HoodieInstant.State.REQUESTED).firstInstant();
- // record metrics
- compactionMetrics.setFirstPendingCompactionInstant(firstRequested);
-
compactionMetrics.setPendingCompactionCount(pendingCompactionTimeline.countInstants());
-
- if (!firstRequested.isPresent()) {
- // do nothing.
- log.info("No compaction plan for checkpoint " + checkpointId);
- return;
- }
-
- String compactionInstantTime = firstRequested.get().requestedTime();
-
- // generate compaction plan
- // should support configurable commit metadata
- HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
- table.getMetaClient(), compactionInstantTime);
-
- if (compactionPlan == null || (compactionPlan.getOperations() == null)
- || (compactionPlan.getOperations().isEmpty())) {
- // do nothing.
- log.info("Empty compaction plan for instant " + compactionInstantTime);
- } else {
- HoodieInstant instant =
table.getInstantGenerator().getCompactionRequestedInstant(compactionInstantTime);
- // Mark instant as compaction inflight
-
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
- table.getMetaClient().reloadActiveTimeline();
-
- List<CompactionOperation> operations =
compactionPlan.getOperations().stream()
-
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
- log.info("Execute compaction plan for instant {} as {} file groups",
compactionInstantTime, operations.size());
- WriteMarkersFactory
- .get(table.getConfig().getMarkersType(), table,
compactionInstantTime)
- .deleteMarkerDir(table.getContext(),
table.getConfig().getMarkersDeleteParallelism());
- Map<String, Integer> fileIdIndexMap = new HashMap<>();
- int index = 0;
- for (CompactionOperation operation : operations) {
- int operationIndex;
- if (fileIdIndexMap.containsKey(operation.getFileId())) {
- operationIndex = fileIdIndexMap.get(operation.getFileId());
- } else {
- operationIndex = index;
- fileIdIndexMap.put(operation.getFileId(), operationIndex);
- index++;
- }
- output.collect(new StreamRecord<>(new
CompactionPlanEvent(compactionInstantTime, operation, operationIndex)));
- }
- }
+ // comment out: do we really need the timeout rollback ?
+ // CompactionUtil.rollbackEarliestCompaction(table, conf);
+ // schedule data table compaction if enabled
+ this.compactionPlanHandler.ifPresent(handler ->
handler.collectCompactionOperations(checkpointId, compactionMetrics, output));
+ // Also schedule metadata table compaction if enabled
+ this.mdtCompactionPlanHandler.ifPresent(handler ->
handler.collectCompactionOperations(checkpointId, compactionMetrics, output));
}
@VisibleForTesting
@@ -195,6 +120,13 @@ public class CompactionPlanOperator extends
AbstractStreamOperator<CompactionPla
this.output = output;
}
+ @Override
+ public void close() throws Exception {
+ this.compactionPlanHandler.ifPresent(CompactionPlanHandler::close);
+ this.mdtCompactionPlanHandler.ifPresent(CompactionPlanHandler::close);
+ super.close();
+ }
+
@Override
public void endInput() throws Exception {
// Called when the input data ends, only used in batch mode.
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
index 908b48e31d14..ca547c50e195 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
@@ -213,7 +213,7 @@ public class FlinkCompactionConfig extends Configuration {
conf.set(FlinkOptions.COMPACTION_TASKS, config.compactionTasks);
conf.set(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable);
// use synchronous compaction always
- conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+ conf.set(FlinkOptions.COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED, false);
conf.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, config.schedule);
// Map memory
conf.setString(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(),
config.spillableMapPath);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CleanHandler.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CleanHandler.java
new file mode 100644
index 000000000000..c46b18bc5dac
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CleanHandler.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.handler;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.utils.NonThrownExecutor;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Closeable;
+
+/**
+ * Handler for managing table cleaning operations in the clean operator.
+ *
+ * <p>This class provides mechanisms to execute cleaning operations asynchronously,
+ * ensuring proper coordination with Flink's checkpointing mechanism.
+ *
+ * <p>The handler uses a {@link NonThrownExecutor} to manage cleaning tasks and tracks
+ * the cleaning state to prevent concurrent cleaning operations.
+ */
+@Slf4j
+public class CleanHandler implements Closeable {
+ private final HoodieFlinkWriteClient writeClient;
+ private volatile boolean isCleaning;
+ private final NonThrownExecutor executor;
+
+ public CleanHandler(HoodieFlinkWriteClient writeClient) {
+ this.writeClient = writeClient;
+    this.executor = NonThrownExecutor.builder(log).waitForTasksFinish(true).build();
+ }
+
+ /**
+   * Submits a cleaning operation to the handler's executor.
+   *
+   * <p>The actual cleaning is performed by the underlying write client, which removes
+   * old file versions based on the configured retention policy.
+ */
+ public void clean() {
+ executor.execute(() -> {
+ this.isCleaning = true;
+ try {
+ this.writeClient.clean();
+ } finally {
+ this.isCleaning = false;
+ }
+ }, "wait for cleaning finish");
+ }
+
+ /**
+ * Waits for an ongoing cleaning operation to finish.
+ *
+   * <p>If no cleaning operation is in progress, this method returns immediately
+   * without performing any action.
+ */
+ public void waitForCleaningFinish() {
+ if (isCleaning) {
+ executor.execute(() -> {
+ try {
+ this.writeClient.waitForCleaningFinish();
+ } finally {
+ // ensure to switch the isCleaning flag
+ this.isCleaning = false;
+ }
+ }, "wait for cleaning finish");
+ }
+ }
+
+ /**
+ * Starts an asynchronous cleaning operation.
+ *
+   * <p>Any exceptions thrown during the start of async cleaning are caught and logged
+   * as warnings to prevent interference with normal Flink checkpointing operations.
+   * This ensures that cleaning failures do not disrupt the streaming job's progress.
+ */
+ public void startAsyncCleaning() {
+ if (!isCleaning) {
+ try {
+ this.writeClient.startAsyncCleaning();
+ this.isCleaning = true;
+ } catch (Throwable throwable) {
+ // catch the exception to not affect the normal checkpointing
+ log.warn("Failed to start async cleaning", throwable);
+ }
+ }
+ }
+
+ /**
+ * Closes the CleanHandler and releases associated resources.
+ */
+ @Override
+ public void close() {
+ if (executor != null) {
+ try {
+ executor.close();
+ } catch (Exception e) {
+        throw new HoodieException("Failed to close executor of clean handler.", e);
+ }
+ }
+ this.writeClient.clean();
+ }
+}
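As wired into CleanFunction above, a handler is created per table (the data table, plus the metadata table when streaming index writes are enabled) and driven by the Flink callbacks. A condensed, illustrative lifecycle sketch:

    // Illustrative only: the calls mirror CleanFunction's open/snapshotState/notifyCheckpointComplete/close.
    CleanHandler handler = new CleanHandler(writeClient);
    handler.clean();                  // open(): submit an initial clean
    handler.startAsyncCleaning();     // snapshotState(): start async cleaning; failures are only logged
    handler.waitForCleaningFinish();  // notifyCheckpointComplete(): wait for the pending clean to finish
    handler.close();                  // close(): shut down the handler's executor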
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactCommitHandler.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactCommitHandler.java
new file mode 100644
index 000000000000..175e0c3329b0
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactCommitHandler.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.handler;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metrics.FlinkCompactionMetrics;
+import org.apache.hudi.sink.compact.CompactionCommitEvent;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.action.compact.CompactHelpers;
+import org.apache.hudi.util.CompactionUtil;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Handler for committing compaction operations in the compaction sub-pipeline.
+ *
+ * <p>The responsibilities:
+ * <ul>
+ * <li>Buffers compaction commit events from multiple parallel tasks;</li>
+ * <li>Determines whether all compaction operations for an instant have completed;</li>
+ * <li>Commits the compaction to the timeline;</li>
+ * <li>Rolls back failed compactions;</li>
+ * <li>Triggers cleaning operations after successful compaction.</li>
+ * </ul>
+ *
+ * <p>The handler uses a commit buffer to collect events from all parallel
compaction tasks.
+ * Once all operations for a compaction instant are complete, it validates the
results and
+ * either commits the compaction or rolls it back based on the success/failure
status.
+ *
+ * <p>The commit condition is met when the commit buffer has the same number
of events as
+ * the compaction plan operations, and all events share the same compaction
instant time.
+ *
+ * @see CompactionCommitEvent
+ * @see HoodieCompactionPlan
+ */
+@Slf4j
+public class CompactCommitHandler implements Closeable {
+ protected final HoodieFlinkTable table;
+ protected final HoodieFlinkWriteClient writeClient;
+ protected final Configuration conf;
+ /**
+ * Buffer to collect the events from each compact task {@code CompactFunction}.
+ *
+ * <p>Stores the mapping of instant_time -> file_id -> event. Use a map to
collect the
+ * events because the rolling back of intermediate compaction tasks
generates corrupt
+ * events.
+ */
+ private transient Map<String, Map<String, CompactionCommitEvent>>
commitBuffer;
+
+ /**
+ * Cache to store compaction plan for each instant.
+ * Stores the mapping of instant_time -> compactionPlan.
+ */
+ protected transient Map<String, HoodieCompactionPlan> compactionPlanCache;
+
+ public CompactCommitHandler(Configuration conf, HoodieFlinkWriteClient
writeClient) {
+ this.conf = conf;
+ this.table = writeClient.getHoodieTable();
+ this.writeClient = writeClient;
+ this.commitBuffer = new HashMap<>();
+ this.compactionPlanCache = new HashMap<>();
+ }
+
+ /**
+ * Commits the compaction if all operations for the instant are complete.
+ *
+ * <p>Condition to commit: the commit buffer has the same size as the compaction plan operations,
+ * and all the compaction commit events {@link CompactionCommitEvent} share the same compaction instant time.
+ *
+ * @param event The compaction commit event
+ * @param compactionMetrics Metrics collector for tracking compaction
progress
+ */
+ public void commitIfNecessary(CompactionCommitEvent event,
FlinkCompactionMetrics compactionMetrics) {
+ String instant = event.getInstant();
+ commitBuffer.computeIfAbsent(instant, k -> new HashMap<>())
+ .put(event.getFileId(), event);
+
+ boolean isLogCompaction = event.isLogCompaction();
+ HoodieCompactionPlan compactionPlan = getCompactionPlan(instant,
isLogCompaction);
+ Collection<CompactionCommitEvent> events =
commitBuffer.get(instant).values();
+
+ boolean isReady = compactionPlan.getOperations().size() == events.size();
+ if (!isReady) {
+ return;
+ }
+
+ if (events.stream().anyMatch(CompactionCommitEvent::isFailed)) {
+ try {
+ // handle the failure case
+ rollbackCompaction(instant, isLogCompaction);
+ } finally {
+ // remove commitBuffer to avoid obsolete metadata commit
+ reset(instant);
+ compactionMetrics.markCompactionRolledBack();
+ }
+ return;
+ }
+
+ try {
+ doCommit(instant, isLogCompaction, events, compactionMetrics);
+ } catch (Throwable throwable) {
+ // make it fail-safe
+ log.error("Error while committing compaction instant: {}", instant,
throwable);
+ compactionMetrics.markCompactionRolledBack();
+ } finally {
+ // reset the status
+ reset(instant);
+ }
+ }
+
+ /**
+ * Performs the actual commit operation for a compaction instant.
+ *
+ * <p>This method aggregates write statuses from all compaction events,
checks for errors,
+ * and either completes the compaction or rolls it back based on the error
count and
+ * configuration. On success, it triggers an inline cleaning operation when async cleaning is disabled.
+ *
+ * @param instant The compaction instant time
+ * @param isLogCompaction Whether the compaction is log compaction
+ * @param events All compaction commit events for this instant
+ * @param compactionMetrics Metrics collector for tracking compaction
progress
+ * @throws IOException If an I/O error occurs during commit
+ */
+ @SuppressWarnings("unchecked")
+ private void doCommit(
+ String instant,
+ boolean isLogCompaction,
+ Collection<CompactionCommitEvent> events,
+ FlinkCompactionMetrics compactionMetrics) throws IOException {
+ List<WriteStatus> statuses = events.stream()
+ .map(CompactionCommitEvent::getWriteStatuses)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+
+ long numErrorRecords =
statuses.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
+
+ if (numErrorRecords > 0 && !this.conf.get(FlinkOptions.IGNORE_FAILED)) {
+ // handle failure case
+ log.error("Got {} error records during compaction of instant {},\n"
+ + "option '{}' is configured as false,"
+ + "rolls back the compaction", numErrorRecords, instant,
FlinkOptions.IGNORE_FAILED.key());
+ rollbackCompaction(instant, isLogCompaction);
+ compactionMetrics.markCompactionRolledBack();
+ return;
+ }
+
+ // complete the compaction
+ completeCompaction(instant, isLogCompaction, statuses, compactionMetrics);
+
+ // clean up the old file versions synchronously if async cleaning is disabled
+ if (!conf.get(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
+ writeClient.clean();
+ }
+ }
+
+ /**
+ * Rolls back a failed compaction operation.
+ *
+ * @param instant The compaction instant time
+ * @param isLogCompaction Whether the compaction is log compaction
+ */
+ protected void rollbackCompaction(String instant, boolean isLogCompaction) {
+ CompactionUtil.rollbackCompaction(table, instant,
writeClient.getTransactionManager());
+ }
+
+ /**
+ * Completes a successful compaction operation by creating metadata and
committing to the timeline.
+ *
+ * <p>This method creates compaction metadata from the write statuses,
commits the compaction
+ * to the timeline, and updates compaction metrics.
+ *
+ * @param instant The compaction instant time
+ * @param isLogCompaction Whether the compaction is log compaction
+ * @param statuses List of write statuses from all compaction
operations
+ * @param compactionMetrics Metrics collector for tracking compaction
progress
+ * @throws IOException If an I/O error occurs during completion
+ */
+ protected void completeCompaction(String instant,
+ boolean isLogCompaction,
+ List<WriteStatus> statuses,
+ FlinkCompactionMetrics compactionMetrics)
throws IOException {
+ HoodieCommitMetadata metadata =
CompactHelpers.getInstance().createCompactionMetadata(
+ table, instant, HoodieListData.eager(statuses),
writeClient.getConfig().getSchema());
+ writeClient.completeCompaction(metadata, table, instant);
+ compactionMetrics.updateCommitMetrics(instant, metadata);
+ compactionMetrics.markCompactionCompleted();
+ }
+
+ /**
+ * Retrieves the compaction plan for a given instant.
+ *
+ * <p>This method uses a cache to avoid repeatedly reading the compaction
plan from storage.
+ * If the plan is not in the cache, it reads it from the timeline and caches
it.
+ *
+ * @param instant The compaction instant time
+ * @param isLogCompaction Whether the compaction is log compaction
+ *
+ * @return The compaction plan for the instant
+ */
+ protected HoodieCompactionPlan getCompactionPlan(String instant, boolean
isLogCompaction) {
+ return compactionPlanCache.computeIfAbsent(instant, k -> {
+ try {
+ return CompactionUtils.getCompactionPlan(
+ this.writeClient.getHoodieTable().getMetaClient(), instant);
+ } catch (Exception e) {
+ throw new HoodieException(e);
+ }
+ });
+ }
+
+ /**
+ * Resets the internal state for a completed or failed compaction instant.
+ *
+ * @param instant The compaction instant time to reset
+ */
+ private void reset(String instant) {
+ this.commitBuffer.remove(instant);
+ this.compactionPlanCache.remove(instant);
+ }
+
+ @Override
+ public void close() {
+ writeClient.close();
+ }
+}
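
A hypothetical sketch of how a commit sink could delegate to CompactCommitHandler; conf, writeClient,
compactionMetrics and the event source are assumed to be wired by the sink:

    CompactCommitHandler commitHandler = new CompactCommitHandler(conf, writeClient);
    for (CompactionCommitEvent event : incomingEvents) {
      // commits (or rolls back) once every operation of the instant has reported back
      commitHandler.commitIfNecessary(event, compactionMetrics);
    }
    commitHandler.close();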
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactHandler.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactHandler.java
new file mode 100644
index 000000000000..e6bda07a47f8
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactHandler.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.handler;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metrics.FlinkCompactionMetrics;
+import org.apache.hudi.sink.compact.CompactionCommitEvent;
+import org.apache.hudi.sink.compact.CompactionPlanEvent;
+import org.apache.hudi.sink.utils.NonThrownExecutor;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.table.HoodieFlinkTable;
+import
org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
+import org.apache.hudi.table.format.FlinkRowDataReaderContext;
+import org.apache.hudi.table.format.InternalSchemaManager;
+import org.apache.hudi.util.CompactionUtil;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.util.Collector;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Handler for executing compaction operations in the compaction sub-pipeline.
+ *
+ * <p>The responsibilities:
+ * <ul>
+ * <li>Executes compaction tasks for individual file groups;</li>
+ * <li>Handles schema evolution during compaction;</li>
+ * <li>Collects compaction metadata and generates commit events {@code
CompactionCommitEvent}.</li>
+ * </ul>
+ *
+ * <p>The handler supports both synchronous and asynchronous compaction
execution.
+ * It uses {@link HoodieFlinkMergeOnReadTableCompactor} to perform the actual
compaction
+ * operations.
+ *
+ * @see CompactionPlanEvent
+ * @see CompactionCommitEvent
+ * @see HoodieFlinkMergeOnReadTableCompactor
+ */
+@Slf4j
+public class CompactHandler implements Closeable {
+ protected final HoodieFlinkTable table;
+ protected final HoodieFlinkWriteClient writeClient;
+ protected final int taskID;
+ /**
+ * InternalSchema manager used for handling schema evolution.
+ */
+ private transient InternalSchemaManager internalSchemaManager;
+
+ public CompactHandler(HoodieFlinkWriteClient writeClient, int taskId) {
+ this.table = writeClient.getHoodieTable();
+ this.writeClient = writeClient;
+ this.taskID = taskId;
+ }
+
+ /**
+ * Executes a compaction operation for a single file group.
+ *
+ * <p>This method supports both asynchronous and synchronous execution. When
an executor
+ * is provided, the compaction runs asynchronously with error handling.
Otherwise, it
+ * runs synchronously.
+ *
+ * @param executor The executor for asynchronous execution, or
null for synchronous execution
+ * @param event The compaction plan event containing
operation details
+ * @param collector The collector for emitting compaction commit
events
+ * @param needReloadMetaClient Whether to reload the meta client before
compaction
+ * @param compactionMetrics Metrics collector for compaction operations
+ * @throws Exception If compaction fails
+ */
+ public void compact(@Nullable NonThrownExecutor executor,
+ CompactionPlanEvent event,
+ Collector<CompactionCommitEvent> collector,
+ boolean needReloadMetaClient,
+ FlinkCompactionMetrics compactionMetrics) throws
Exception {
+ String instantTime = event.getCompactionInstantTime();
+ if (executor != null) {
+ executor.execute(
+ () -> doCompaction(event, collector, needReloadMetaClient,
compactionMetrics),
+ (errMsg, t) -> collector.collect(createFailedCommitEvent(event)),
+ "Execute compaction for instant %s from task %d", instantTime,
taskID);
+ } else {
+ // executes the compaction task synchronously for batch mode.
+ log.info("Execute compaction for instant {} from task {}", instantTime,
taskID);
+ doCompaction(event, collector, needReloadMetaClient, compactionMetrics);
+ }
+ }
+
+ /**
+ * Performs the actual compaction operation.
+ *
+ * @param event The compaction plan event containing
operation details
+ * @param collector The collector for emitting compaction commit
events
+ * @param needReloadMetaClient Whether to reload the meta client before
compaction
+ * @param compactionMetrics Metrics collector for compaction operations
+ * @throws Exception If compaction fails
+ */
+ protected void doCompaction(CompactionPlanEvent event,
+ Collector<CompactionCommitEvent> collector,
+ boolean needReloadMetaClient,
+ FlinkCompactionMetrics compactionMetrics) throws
Exception {
+ compactionMetrics.startCompaction();
+ HoodieFlinkMergeOnReadTableCompactor<?> compactor = new
HoodieFlinkMergeOnReadTableCompactor<>();
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ if (needReloadMetaClient) {
+ // reload the timeline
+ metaClient.reload();
+ }
+ // schema evolution
+ CompactionUtil.setAvroSchema(writeClient.getConfig(), metaClient);
+ List<WriteStatus> writeStatuses = compactor.compact(
+ writeClient.getConfig(),
+ event.getOperation(),
+ event.getCompactionInstantTime(),
+ table.getTaskContextSupplier(),
+ createReaderContext(needReloadMetaClient),
+ table);
+ compactionMetrics.endCompaction();
+ collector.collect(createCommitEvent(event, writeStatuses));
+ }
+
+ /**
+ * Creates a compaction commit event for successful compaction.
+ *
+ * @param event The compaction plan event
+ * @param writeStatuses The write statuses from the compaction operation
+ * @return a new compaction commit event with the write statuses
+ */
+ protected CompactionCommitEvent createCommitEvent(CompactionPlanEvent event,
List<WriteStatus> writeStatuses) {
+ return new CompactionCommitEvent(event.getCompactionInstantTime(),
event.getOperation().getFileId(), writeStatuses, taskID,
event.isMetadataTable(), event.isLogCompaction());
+ }
+
+ /**
+ * Creates a compaction commit event for failed compaction.
+ *
+ * @param event the compaction plan event
+ * @return a new compaction commit event marked as failed
+ */
+ private CompactionCommitEvent createFailedCommitEvent(CompactionPlanEvent
event) {
+ return new CompactionCommitEvent(event.getCompactionInstantTime(),
event.getOperation().getFileId(), taskID, event.isMetadataTable(),
event.isLogCompaction());
+ }
+
+ /**
+ * Creates a reader context for reading data during compaction.
+ *
+ * <p>The internal schema manager is instantiated lazily and reused across
operations
+ * unless the meta client needs to be reloaded.
+ *
+ * @param needReloadMetaClient whether to reload the meta client and schema
manager
+ * @return a new Flink row data reader context
+ */
+ protected HoodieReaderContext<?> createReaderContext(boolean
needReloadMetaClient) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ // CAUTION: InternalSchemaManager will scan timeline, reusing the meta
client so that the timeline is updated.
+ // Instantiate internalSchemaManager lazily here since it may not be needed for the FG reader,
+ // e.g., schema evolution for log files in the FG reader does not use internalSchemaManager.
+ Supplier<InternalSchemaManager> internalSchemaManagerSupplier = () -> {
+ if (internalSchemaManager == null || needReloadMetaClient) {
+ internalSchemaManager =
InternalSchemaManager.get(metaClient.getStorageConf(), metaClient);
+ }
+ return internalSchemaManager;
+ };
+ // initialize storage conf lazily.
+ StorageConfiguration<?> readerConf =
writeClient.getEngineContext().getStorageConf();
+ return new FlinkRowDataReaderContext(readerConf,
internalSchemaManagerSupplier, Collections.emptyList(),
metaClient.getTableConfig(), Option.empty());
+ }
+
+ /**
+ * Closes the handler and releases resources.
+ *
+ * <p>This method closes the write client and any associated resources.
+ */
+ @Override
+ public void close() {
+ this.writeClient.close();
+ }
+}
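
A hypothetical sketch of the two execution modes of CompactHandler; event, collector, executor and
compactionMetrics are assumed to come from the surrounding operator:

    CompactHandler compactHandler = new CompactHandler(writeClient, taskId);
    // async mode: run on a NonThrownExecutor, a failed commit event is emitted on error
    compactHandler.compact(executor, event, collector, true, compactionMetrics);
    // sync (batch) mode: pass a null executor to run the compaction inline
    compactHandler.compact(null, event, collector, false, compactionMetrics);
    compactHandler.close();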
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactionPlanHandler.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactionPlanHandler.java
new file mode 100644
index 000000000000..04264a723f1a
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/CompactionPlanHandler.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.handler;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.metrics.FlinkCompactionMetrics;
+import org.apache.hudi.sink.compact.CompactionPlanEvent;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
+import org.apache.hudi.util.CompactionUtil;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Handler for scheduling and managing compaction plans in the compaction sub-pipeline.
+ *
+ * <p>The responsibilities:
+ * <ul>
+ * <li>Retrieves pending compaction plans from the timeline;</li>
+ * <li>Transitions compaction instants from REQUESTED to INFLIGHT state;</li>
+ * <li>Distributes compaction operations to downstream tasks;</li>
+ * <li>Rolls back failed compactions.</li>
+ * </ul>
+ *
+ * <p>The handler reads the Hudi timeline to detect the target pending
compaction
+ * instant and generates compaction plan events for each compaction plan
operation.
+ *
+ * @see CompactionPlanEvent
+ * @see HoodieFlinkWriteClient
+ */
+@Slf4j
+public class CompactionPlanHandler implements Closeable {
+ protected final HoodieFlinkTable table;
+ protected final HoodieFlinkWriteClient writeClient;
+
+ /**
+ * Constructs a new CompactionPlanHandler.
+ *
+ * @param writeClient the Hudi Flink write client for table operations
+ */
+ public CompactionPlanHandler(HoodieFlinkWriteClient writeClient) {
+ this.table = writeClient.getHoodieTable();
+ this.writeClient = writeClient;
+ }
+
+ /**
+ * Retrieves the first pending compaction plan and distributes compaction operations to downstream tasks.
+ *
+ * @param checkpointId The Flink checkpoint ID
+ * @param compactionMetrics Metrics collector for compaction operations
+ * @param output The output collector for emitting compaction
plan events
+ */
+ public void collectCompactionOperations(
+ long checkpointId,
+ FlinkCompactionMetrics compactionMetrics,
+ Output<StreamRecord<CompactionPlanEvent>> output) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ metaClient.reloadActiveTimeline();
+
+ HoodieTimeline pendingCompactionTimeline =
metaClient.getActiveTimeline().filterPendingCompactionTimeline();
+ Option<Pair<String, HoodieCompactionPlan>> instantAndPlanOpt =
+ getCompactionPlan(metaClient, pendingCompactionTimeline, checkpointId,
compactionMetrics, CompactionUtils::getCompactionPlan);
+ if (instantAndPlanOpt.isEmpty()) {
+ return;
+ }
+ doCollectCompactionOperations(instantAndPlanOpt.get().getLeft(),
instantAndPlanOpt.get().getRight(), output);
+ }
+
+ /**
+ * Rolls back any pending compaction operations.
+ *
+ * <p>This method is typically called during failure recovery to clean up
+ * incomplete compaction attempts.
+ */
+ public void rollbackCompaction() {
+ CompactionUtil.rollbackCompaction(table, this.writeClient);
+ }
+
+ /**
+ * Retrieves the first pending compaction plan from the timeline.
+ *
+ * @param metaClient The table meta client
+ * @param pendingCompactionTimeline The timeline containing pending
compaction instants
+ * @param checkpointId The Flink checkpoint ID
+ * @param compactionMetrics Metrics collector for compaction
operations
+ * @param planGenerator Function to generate the compaction plan
from meta client and instant
+ * @return an optional pair of instant time and compaction plan, empty if no
valid plan exists
+ */
+ protected Option<Pair<String, HoodieCompactionPlan>> getCompactionPlan(
+ HoodieTableMetaClient metaClient,
+ HoodieTimeline pendingCompactionTimeline,
+ long checkpointId,
+ FlinkCompactionMetrics compactionMetrics,
+ BiFunction<HoodieTableMetaClient, String, HoodieCompactionPlan>
planGenerator) {
+ // the first instant takes the highest priority.
+ Option<HoodieInstant> firstRequested = pendingCompactionTimeline
+ .filter(instant -> instant.getState() ==
HoodieInstant.State.REQUESTED).firstInstant();
+ // record metrics
+ compactionMetrics.setFirstPendingCompactionInstant(firstRequested);
+
compactionMetrics.setPendingCompactionCount(pendingCompactionTimeline.countInstants());
+
+ if (firstRequested.isEmpty()) {
+ log.info("No compaction plan for checkpoint {}", checkpointId);
+ return Option.empty();
+ }
+
+ String compactionInstantTime = firstRequested.get().requestedTime();
+ // generate compaction plan
+ // should support configurable commit metadata
+ HoodieCompactionPlan compactionPlan = planGenerator.apply(metaClient,
compactionInstantTime);
+ if (compactionPlan == null || (compactionPlan.getOperations() == null)
+ || (compactionPlan.getOperations().isEmpty())) {
+ log.info("Empty compaction plan for instant {}", compactionInstantTime);
+ return Option.empty();
+ }
+ return Option.of(Pair.of(compactionInstantTime, compactionPlan));
+ }
+
+ /**
+ * Collects and distributes compaction operations for a Hudi table.
+ *
+ * <p>This method:
+ * <ul>
+ * <li>Transitions the compaction instant from REQUESTED to INFLIGHT;</li>
+ * <li>Deletes any existing marker directories;</li>
+ * <li>Creates a compaction plan event for each operation;</li>
+ * <li>Assigns operation indices to ensure proper task distribution;</li>
+ * </ul>
+ *
+ * @param compactionInstantTime The instant time for this compaction
+ * @param compactionPlan The compaction plan containing operations to
execute
+ * @param output The output collector for emitting compaction
plan events
+ */
+ protected void doCollectCompactionOperations(
+ String compactionInstantTime,
+ HoodieCompactionPlan compactionPlan,
+ Output<StreamRecord<CompactionPlanEvent>> output) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ // Mark instant as compaction inflight
+ HoodieInstant instant =
metaClient.getInstantGenerator().getCompactionRequestedInstant(compactionInstantTime);
+
metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
+ metaClient.reloadActiveTimeline();
+
+ List<CompactionOperation> operations =
compactionPlan.getOperations().stream()
+
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
+ log.info("Execute compaction plan for instant {} as {} file groups",
compactionInstantTime, operations.size());
+
+ WriteMarkersFactory
+ .get(table.getConfig().getMarkersType(), table, compactionInstantTime)
+ .deleteMarkerDir(table.getContext(),
table.getConfig().getMarkersDeleteParallelism());
+
+ Map<String, Integer> fileIdIndexMap = new HashMap<>();
+ int index = 0;
+ for (CompactionOperation operation : operations) {
+ int operationIndex;
+ if (fileIdIndexMap.containsKey(operation.getFileId())) {
+ operationIndex = fileIdIndexMap.get(operation.getFileId());
+ } else {
+ operationIndex = index;
+ fileIdIndexMap.put(operation.getFileId(), operationIndex);
+ index++;
+ }
+ output.collect(new StreamRecord<>(createPlanEvent(compactionInstantTime,
operation, operationIndex)));
+ }
+ }
+
+ /**
+ * Creates a compaction plan event for a single compaction operation.
+ *
+ * @param compactionInstantTime The instant time for this compaction
+ * @param operation The compaction operation to execute
+ * @param operationIndex The index of this operation for task
distribution
+ * @return a new compaction plan event
+ */
+ protected CompactionPlanEvent createPlanEvent(String compactionInstantTime,
CompactionOperation operation, int operationIndex) {
+ return new CompactionPlanEvent(compactionInstantTime, operation,
operationIndex);
+ }
+
+ /**
+ * Closes the handler and releases resources.
+ */
+ @Override
+ public void close() {
+ this.writeClient.close();
+ }
+}
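
A hypothetical sketch of how a plan operator could use CompactionPlanHandler; checkpointId,
compactionMetrics and output are assumed:

    CompactionPlanHandler planHandler = new CompactionPlanHandler(writeClient);
    // on job start/failover: roll back any inflight compactions left behind
    planHandler.rollbackCompaction();
    // on each checkpoint: pick the first REQUESTED plan and fan out its operations
    planHandler.collectCompactionOperations(checkpointId, compactionMetrics, output);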
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/MetadataCompactCommitHandler.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/MetadataCompactCommitHandler.java
new file mode 100644
index 000000000000..87ff1d583f32
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/MetadataCompactCommitHandler.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.handler;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metrics.FlinkCompactionMetrics;
+import org.apache.hudi.sink.compact.CompactionCommitEvent;
+import org.apache.hudi.table.action.compact.CompactHelpers;
+import org.apache.hudi.util.CompactionUtil;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Handler for committing compaction metadata to the metadata table timeline.
+ *
+ * <p>This handler extends {@link CompactCommitHandler} to support metadata
table specific
+ * compaction commit actions, including:
+ * <ul>
+ * <li>Retrieves compaction plans for both compaction and log
compaction;</li>
+ * <li>Commits compaction metadata;</li>
+ * <li>Commits log compaction metadata;</li>
+ * <li>Rolls back failed compactions;</li>
+ * </ul>
+ *
+ * <p>The handler distinguishes between compaction (full compaction) and log compaction (minor compaction) based on
+ * the {@link CompactionCommitEvent#isLogCompaction()} flag, and triggers
compaction completion or rollback
+ * appropriately with the write client.
+ *
+ * @see CompactCommitHandler
+ * @see CompactionCommitEvent
+ */
+public class MetadataCompactCommitHandler extends CompactCommitHandler {
+
+ public MetadataCompactCommitHandler(Configuration conf,
HoodieFlinkWriteClient writeClient) {
+ super(conf, writeClient);
+ }
+
+ /**
+ * Completes a compaction for metadata tables.
+ *
+ * <p>This method is overridden to support both compaction (full compaction)
+ * and log compaction (minor compaction) for metadata tables. It creates
appropriate metadata based on the
+ * operation type and completes the compaction.
+ *
+ * @param instant The compaction instant time
+ * @param isLogCompaction Whether the compaction is log compaction
+ * @param statuses List of write statuses from all compaction
operations
+ * @param compactionMetrics Metrics collector for tracking compaction
progress
+ */
+ @Override
+ protected void completeCompaction(String instant,
+ boolean isLogCompaction,
+ List<WriteStatus> statuses,
+ FlinkCompactionMetrics compactionMetrics)
throws IOException {
+ WriteOperationType operationType = isLogCompaction ?
WriteOperationType.LOG_COMPACT : WriteOperationType.COMPACT;
+ HoodieCommitMetadata metadata =
CompactHelpers.getInstance().createCompactionMetadata(
+ table, instant, HoodieListData.eager(statuses),
writeClient.getConfig().getSchema(), operationType);
+
+ // commit the compaction
+ if (isLogCompaction) {
+ writeClient.completeLogCompaction(metadata, table, instant);
+ } else {
+ writeClient.completeCompaction(metadata, table, instant);
+ }
+
+ compactionMetrics.updateCommitMetrics(instant, metadata);
+ compactionMetrics.markCompactionCompleted();
+ }
+
+ /**
+ * Rolls back a failed compaction.
+ *
+ * <p>This method is overridden to support rolling back of both compaction
and log compaction.
+ *
+ * @param instant The compaction instant time
+ * @param isLogCompaction Whether the compaction is log compaction
+ */
+ @Override
+ protected void rollbackCompaction(String instant, boolean isLogCompaction) {
+ if (isLogCompaction) {
+ CompactionUtil.rollbackLogCompaction(table, instant,
writeClient.getTransactionManager());
+ } else {
+ CompactionUtil.rollbackCompaction(table, instant,
writeClient.getTransactionManager());
+ }
+ }
+
+ /**
+ * Retrieves the compaction plan for a metadata table compaction instant.
+ *
+ * <p>This method is overridden to support retrieving both
+ * compaction plans and log compaction plans based on the compaction type.
+ * It uses a cache to avoid repeatedly reading plans from storage.
+ *
+ * @param instant The compaction instant time
+ * @param isLogCompaction Whether the compaction is log compaction
+ *
+ * @return The compaction plan for the instant
+ * @throws HoodieException If the compaction plan cannot be retrieved
+ */
+ @Override
+ protected HoodieCompactionPlan getCompactionPlan(String instant, boolean
isLogCompaction) {
+ return compactionPlanCache.computeIfAbsent(instant, k -> {
+ try {
+ return isLogCompaction ?
CompactionUtils.getLogCompactionPlan(this.writeClient.getHoodieTable().getMetaClient(),
instant)
+ :
CompactionUtils.getCompactionPlan(this.writeClient.getHoodieTable().getMetaClient(),
instant);
+ } catch (Exception e) {
+ throw new HoodieException(e);
+ }
+ });
+ }
+}
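
A hypothetical wiring sketch for the metadata-table commit path: it reuses the same buffering logic
but commits via the metadata write client (created with the StreamerUtil helper added further below
in this patch); dataWriteClient, event and compactionMetrics are assumed:

    HoodieFlinkWriteClient metadataWriteClient = StreamerUtil.createMetadataWriteClient(dataWriteClient);
    CompactCommitHandler mdtCommitHandler = new MetadataCompactCommitHandler(conf, metadataWriteClient);
    mdtCommitHandler.commitIfNecessary(event, compactionMetrics);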
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/MetadataCompactHandler.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/MetadataCompactHandler.java
new file mode 100644
index 000000000000..4ae3ee0c9477
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/MetadataCompactHandler.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.handler;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.AvroReaderContextFactory;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metrics.FlinkCompactionMetrics;
+import org.apache.hudi.sink.compact.CompactionCommitEvent;
+import org.apache.hudi.sink.compact.CompactionPlanEvent;
+import org.apache.hudi.table.action.compact.CompactHelpers;
+import
org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
+
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+
+/**
+ * Handler for compaction operation execution on Hudi metadata tables.
+ *
+ * <p>This handler extends {@link CompactHandler} to support metadata table
specific
+ * compaction operations, covering compaction and log compaction.
+ *
+ * <p>The handler uses {@link AvroReaderContextFactory} to create reader
contexts
+ * for metadata table payloads, which differs from data table records (engine native).
+ *
+ * @see CompactHandler
+ * @see CompactionPlanEvent
+ * @see AvroReaderContextFactory
+ */
+public class MetadataCompactHandler extends CompactHandler {
+
+ public MetadataCompactHandler(HoodieFlinkWriteClient writeClient, int
taskId) {
+ super(writeClient, taskId);
+ }
+
+ /**
+ * Creates a reader context for reading metadata table records.
+ *
+ * <p>This method is overridden to create an Avro-based reader context
+ * adapted for metadata table file reading, which has a custom payload class.
+ *
+ * @param needReloadMetaClient Whether the meta client needs to be reloaded
+ *
+ * @return A reader context configured for metadata table file reading
+ */
+ @Override
+ protected HoodieReaderContext<?> createReaderContext(boolean
needReloadMetaClient) {
+ String payloadClass =
ConfigUtils.getPayloadClass(writeClient.getConfig().getProps());
+ AvroReaderContextFactory readerContextFactory = new
AvroReaderContextFactory(table.getMetaClient(), payloadClass,
writeClient.getConfig().getProps());
+ return readerContextFactory.getContext();
+ }
+
+ /**
+ * Executes a compaction operation.
+ *
+ * <p>This method is overridden to support both compaction (full compaction)
+ * and log compaction (minor compaction) for metadata tables.
+ *
+ * @param event The compaction plan event containing the
operation details
+ * @param collector Collector for emitting compaction commit
events
+ * @param needReloadMetaClient Whether the meta client needs to be reloaded
+ * @param compactionMetrics Metrics collector for tracking compaction
progress
+ */
+ @Override
+ protected void doCompaction(CompactionPlanEvent event,
+ Collector<CompactionCommitEvent> collector,
+ boolean needReloadMetaClient,
+ FlinkCompactionMetrics compactionMetrics) throws
Exception {
+ if (!event.isLogCompaction()) {
+ super.doCompaction(event, collector, needReloadMetaClient,
compactionMetrics);
+ } else {
+ compactionMetrics.startCompaction();
+ // run log compaction with the merge-on-read table compactor
+ HoodieFlinkMergeOnReadTableCompactor<?> compactor = new
HoodieFlinkMergeOnReadTableCompactor<>();
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ if (needReloadMetaClient) {
+ // reload the timeline
+ metaClient.reload();
+ }
+ Option<InstantRange> instantRange =
CompactHelpers.getInstance().getInstantRange(metaClient);
+ List<WriteStatus> writeStatuses = compactor.logCompact(
+ writeClient.getConfig(),
+ event.getOperation(),
+ event.getCompactionInstantTime(),
+ instantRange,
+ table,
+ table.getTaskContextSupplier());
+ compactionMetrics.endCompaction();
+ collector.collect(createCommitEvent(event, writeStatuses));
+ }
+ }
+}
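
A hypothetical selection sketch: the compact operator could pick the executor-side handler per plan
event; CompactionPlanEvent#isMetadataTable() is part of this patch, the other variables are assumed:

    CompactHandler handler = event.isMetadataTable()
        ? new MetadataCompactHandler(metadataWriteClient, taskId)
        : new CompactHandler(writeClient, taskId);
    handler.compact(executor, event, collector, needReloadMetaClient, compactionMetrics);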
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/MetadataCompactionPlanHandler.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/MetadataCompactionPlanHandler.java
new file mode 100644
index 000000000000..3a8ef982eea7
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/handler/MetadataCompactionPlanHandler.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.handler;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.metrics.FlinkCompactionMetrics;
+import org.apache.hudi.sink.compact.CompactionPlanEvent;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
+import org.apache.hudi.util.CompactionUtil;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Handler for scheduling compaction plans on Hudi metadata tables.
+ *
+ * <p>This handler extends {@link CompactionPlanHandler} to support metadata
table specific
+ * compaction operations, including:
+ * <ul>
+ * <li>Compaction;</li>
+ * <li>Log compaction (if enabled);</li>
+ * <li>Rollback of pending compactions.</li>
+ * </ul>
+ *
+ * <p>The handler first attempts to collect compaction operations. If no
compaction
+ * plan is scheduled and log compaction is enabled, it collects the log
compaction operations instead.
+ * This keeps the file layout of the metadata table storage efficient.
+ *
+ * @see CompactionPlanHandler
+ * @see CompactionPlanEvent
+ */
+@Slf4j
+public class MetadataCompactionPlanHandler extends CompactionPlanHandler {
+
+ public MetadataCompactionPlanHandler(HoodieFlinkWriteClient writeClient) {
+ super(writeClient);
+ }
+
+ /**
+ * Collects compaction operations for metadata tables.
+ *
+ * <p>This method is overridden to support both compaction (full compaction)
+ * and log compaction (minor compaction) for metadata tables. It first
attempts to collect compaction operations.
+ * If no compaction plan is scheduled and log compaction is enabled, it
collects the log compaction operations instead.
+ *
+ * @param checkpointId The Flink checkpoint ID triggering this
scheduling
+ * @param compactionMetrics Metrics collector for tracking compaction
progress
+ * @param output Output stream for emitting compaction plan events
+ */
+ @Override
+ public void collectCompactionOperations(long checkpointId,
FlinkCompactionMetrics compactionMetrics,
Output<StreamRecord<CompactionPlanEvent>> output) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ metaClient.reloadActiveTimeline();
+ // retrieve compaction plan
+ HoodieTimeline pendingCompactionTimeline =
metaClient.getActiveTimeline().filterPendingCompactionTimeline();
+ Option<Pair<String, HoodieCompactionPlan>> instantAndPlanOpt =
getCompactionPlan(
+ metaClient, pendingCompactionTimeline, checkpointId,
compactionMetrics, CompactionUtils::getCompactionPlan);
+ if (instantAndPlanOpt.isPresent()) {
+ doCollectCompactionOperations(instantAndPlanOpt.get().getLeft(),
instantAndPlanOpt.get().getRight(), output);
+ return;
+ }
+ if (!writeClient.getConfig().isLogCompactionEnabled()) {
+ return;
+ }
+ // for log compaction
+ HoodieTimeline pendingLogCompactionTimeline =
metaClient.getActiveTimeline().filterPendingLogCompactionTimeline();
+ instantAndPlanOpt = getCompactionPlan(
+ metaClient, pendingLogCompactionTimeline, checkpointId,
compactionMetrics, CompactionUtils::getLogCompactionPlan);
+ if (instantAndPlanOpt.isPresent()) {
+ doCollectLogCompactions(instantAndPlanOpt.get().getLeft(),
instantAndPlanOpt.get().getRight(), output);
+ }
+ }
+
+ /**
+ * Rolls back pending compaction operations for metadata tables.
+ *
+ * <p>This method is overridden to support rolling back both compaction and
log compaction.
+ */
+ @Override
+ public void rollbackCompaction() {
+ super.rollbackCompaction();
+ if (writeClient.getConfig().isLogCompactionEnabled()) {
+ CompactionUtil.rollbackLogCompaction(table, writeClient);
+ }
+ }
+
+ /**
+ * Creates a compaction plan event for metadata table operations.
+ *
+ * <p>This method is overridden to create the metadata table compaction plan
events.
+ *
+ * @param compactionInstantTime The instant time for the compaction
+ * @param operation The compaction operation details
+ * @param operationIndex The index of this operation in the
compaction plan
+ *
+ * @return A compaction plan event configured for metadata table compaction
+ */
+ @Override
+ protected CompactionPlanEvent createPlanEvent(String compactionInstantTime,
CompactionOperation operation, int operationIndex) {
+ return new CompactionPlanEvent(compactionInstantTime, operation,
operationIndex, true, false);
+ }
+
+ /**
+ * Collects and emits log compaction plan events for metadata tables.
+ *
+ * <p>This method transitions the log compaction instant from requested to
inflight,
+ * extracts compaction operations from the plan, deletes marker directories,
and
+ * emits compaction plan events for each operation. Operations with the same
file ID
+ * are assigned the same operation index to ensure they are processed together.
+ *
+ * @param compactionInstantTime The instant time for the log compaction
+ * @param compactionPlan The log compaction plan containing
operations to execute
+ * @param output Output stream for emitting compaction plan
events
+ */
+ public void doCollectLogCompactions(
+ String compactionInstantTime,
+ HoodieCompactionPlan compactionPlan,
+ Output<StreamRecord<CompactionPlanEvent>> output) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ // Mark log compaction as inflight
+ HoodieInstant instant =
metaClient.getInstantGenerator().getLogCompactionRequestedInstant(compactionInstantTime);
+
metaClient.getActiveTimeline().transitionLogCompactionRequestedToInflight(instant);
+ metaClient.reloadActiveTimeline();
+
+ List<CompactionOperation> operations =
compactionPlan.getOperations().stream()
+
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
+ log.info("Execute log compaction plan for instant {} as {} file groups",
compactionInstantTime, operations.size());
+
+ WriteMarkersFactory
+ .get(table.getConfig().getMarkersType(), table, compactionInstantTime)
+ .deleteMarkerDir(table.getContext(),
table.getConfig().getMarkersDeleteParallelism());
+
+ Map<String, Integer> fileIdIndexMap = new HashMap<>();
+ int index = 0;
+ for (CompactionOperation operation : operations) {
+ int operationIndex;
+ if (fileIdIndexMap.containsKey(operation.getFileId())) {
+ operationIndex = fileIdIndexMap.get(operation.getFileId());
+ } else {
+ operationIndex = index;
+ fileIdIndexMap.put(operation.getFileId(), operationIndex);
+ index++;
+ }
+ output.collect(new StreamRecord<>(
+ new CompactionPlanEvent(compactionInstantTime, operation,
operationIndex, true, true)));
+ }
+ }
+}
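
A hypothetical sketch of the metadata-table planning path; checkpointId, compactionMetrics and
output are assumed:

    CompactionPlanHandler mdtPlanHandler = new MetadataCompactionPlanHandler(metadataWriteClient);
    // prefers a pending compaction plan; falls back to a log compaction plan when log
    // compaction is enabled and no compaction plan is pending
    mdtPlanHandler.collectCompactionOperations(checkpointId, compactionMetrics, output);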
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/v2/utils/PipelinesV2.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/v2/utils/PipelinesV2.java
index fb9852ef4a22..6e93c02bc137 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/v2/utils/PipelinesV2.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/v2/utils/PipelinesV2.java
@@ -135,7 +135,7 @@ public class PipelinesV2 {
if (OptionsResolver.needsAsyncCompaction(conf)) {
// use synchronous compaction for bounded source.
if (isBounded) {
- conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+ conf.set(FlinkOptions.COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED,
false);
}
return compactV2(conf, pipeline);
} else {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 46727a1540fa..f46e2e672222 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -125,10 +125,10 @@ public class HoodieTableSink implements
final DataStream<HoodieFlinkInternalRow> hoodieRecordDataStream =
Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
pipeline = Pipelines.hoodieStreamWrite(conf, rowType,
hoodieRecordDataStream);
// compaction
- if (OptionsResolver.needsAsyncCompaction(conf)) {
+ if (OptionsResolver.needsAsyncCompaction(conf) ||
OptionsResolver.needsAsyncMetadataCompaction(conf)) {
// use synchronous compaction for bounded source.
if (context.isBounded()) {
- conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+ conf.set(FlinkOptions.COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED,
false);
}
return Pipelines.compact(conf, pipeline);
} else {
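
An illustrative configuration sketch for enabling metadata-table compaction scheduling; the option
names are taken from the tests in this patch, and the values shown here are examples rather than defaults:

    Configuration conf = new Configuration();
    conf.set(FlinkOptions.METADATA_COMPACTION_SCHEDULE_ENABLED, true);
    conf.set(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 1);
    // bounded sources fall back to synchronous execution of compaction operations
    conf.set(FlinkOptions.COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED, false);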
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index b2e5072cdbbb..8e5456b8f1a6 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -65,6 +65,55 @@ public class CompactionUtil {
}
}
+ /**
+ * Schedules a new compaction/log compaction for the metadata table.
+ *
+ * @param writeClient The metadata write client
+ * @param committed Whether the triggering instant was committed
successfully
+ */
+ public static void scheduleMetadataCompaction(
+ HoodieFlinkWriteClient<?> writeClient,
+ boolean committed) {
+ if (!committed) {
+ return;
+ }
+ if (canScheduleMetadataCompaction(writeClient.getConfig(),
writeClient.getHoodieTable().getMetaClient())) {
+ Option<String> compactInstant =
writeClient.scheduleCompaction(Option.empty());
+ if (compactInstant.isPresent()) {
+ log.info("Scheduled compaction {} for metadata table.",
compactInstant.get());
+ return;
+ }
+ if (!writeClient.getConfig().isLogCompactionEnabled()) {
+ return;
+ }
+ Option<String> logCompactionInstant =
writeClient.scheduleLogCompaction(Option.empty());
+ if (logCompactionInstant.isPresent()) {
+ log.info("Scheduled log compaction {} for metadata table.",
logCompactionInstant.get());
+ }
+ }
+ }
+
+ /**
+ * Validates the metadata table timeline to ensure that a compaction/log compaction can be scheduled on the MDT.
+ *
+ * @param metadataWriteConfig The write config for metadata write client
+ * @param metadataMetaClient The table metadata client for metadata table
+ *
+ * @return True if the compaction/log compaction can be scheduled for
metadata table.
+ */
+ private static boolean canScheduleMetadataCompaction(HoodieWriteConfig
metadataWriteConfig, HoodieTableMetaClient metadataMetaClient) {
+ // When log compaction is enabled, the ordering between log compaction and compaction must be preserved,
+ // because metadata payloads such as RLI only have processing-time ordering semantics: the completion order
+ // of log compactions/compactions must match the order in which they were started.
+ if (metadataWriteConfig.isLogCompactionEnabled()) {
+ Option<HoodieInstant> pendingLogCompactionInstant =
+
metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
+ Option<HoodieInstant> pendingCompactionInstant =
+
metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+ return !pendingLogCompactionInstant.isPresent() &&
!pendingCompactionInstant.isPresent();
+ }
+ return true;
+ }
+
/**
* Sets up the avro schema string into the give configuration {@code conf}
* through reading from the hoodie table metadata.
@@ -158,6 +207,33 @@ public class CompactionUtil {
}
}
+ public static void rollbackLogCompaction(HoodieFlinkTable<?> table, String
instantTime, TransactionManager transactionManager) {
+ HoodieInstant inflightInstant =
table.getInstantGenerator().getLogCompactionInflightInstant(instantTime);
+ if
(table.getMetaClient().reloadActiveTimeline().filterPendingLogCompactionTimeline().containsInstant(inflightInstant))
{
+ log.warn("Failed to rollback log compaction instant: [{}]", instantTime);
+ table.rollbackInflightLogCompaction(inflightInstant, transactionManager);
+ }
+ }
+
+ /**
+ * Force rolls back all the inflight log compaction instants, especially for
job failover restart.
+ *
+ * @param table The hoodie table
+ */
+ public static void rollbackLogCompaction(HoodieFlinkTable<?> table,
HoodieFlinkWriteClient writeClient) {
+ HoodieTimeline inflightCompactionTimeline = table.getActiveTimeline()
+ .filterPendingLogCompactionTimeline()
+ .filter(instant ->
+ instant.getState() == HoodieInstant.State.INFLIGHT);
+ inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
+ log.info("Rollback the inflight log compaction instant: {} for
failover", inflightInstant);
+ table.rollbackInflightLogCompaction(inflightInstant,
writeClient.getTransactionManager());
+ });
+ if (!inflightCompactionTimeline.getInstants().isEmpty()) {
+ table.getMetaClient().reloadActiveTimeline();
+ }
+ }
+
/**
* Force rolls back all the inflight compaction instants, especially for job
failover restart.
*
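
A hypothetical sketch of where scheduleMetadataCompaction could be invoked: after an instant of the
data table is committed, the coordinator passes the commit outcome together with the metadata write
client (the trigger point is an assumption; only the helper's signature is defined in this patch):

    boolean committed = true; // outcome of the instant that was just committed
    CompactionUtil.scheduleMetadataCompaction(metadataWriteClient, committed);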
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 56e3c7646066..ecf7b73e312d 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -18,6 +18,7 @@
package org.apache.hudi.util;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.model.CommitTimeFlinkRecordMerger;
import org.apache.hudi.client.model.EventTimeFlinkRecordMerger;
@@ -62,6 +63,8 @@ import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
+import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.schema.FilebasedSchemaProvider;
import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.sink.transform.Transformer;
@@ -135,6 +138,18 @@ public class StreamerUtil {
return properties;
}
+ /**
+ * Creates the metadata write client from the given write client of the data table.
+ */
+ public static HoodieFlinkWriteClient
createMetadataWriteClient(HoodieFlinkWriteClient dataWriteClient) {
+ // Get the metadata writer from the table and use its write client
+ Option<HoodieTableMetadataWriter> metadataWriterOpt =
+ dataWriteClient.getHoodieTable().getMetadataWriter(null, true, true);
+ ValidationUtils.checkArgument(metadataWriterOpt.isPresent(), "Failed to
create the metadata writer");
+ FlinkHoodieBackedTableMetadataWriter metadataWriter =
(FlinkHoodieBackedTableMetadataWriter) metadataWriterOpt.get();
+ return (HoodieFlinkWriteClient) metadataWriter.getWriteClient();
+ }
+
public static HoodieSchema
getSourceSchema(org.apache.flink.configuration.Configuration conf) {
if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()) {
return new FilebasedSchemaProvider(conf).getSourceHoodieSchema();
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index d5bd4f895bc1..595cea5f7b94 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -169,7 +169,7 @@ public class ITTestDataStreamWrite extends TestLogger {
conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
// use synchronized compaction to ensure flink job finishing with
compaction completed.
- conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+ conf.set(FlinkOptions.COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED, false);
conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
defaultWriteAndCheckExpected(conf, "mor_write_with_compact", 1);
@@ -212,7 +212,7 @@ public class ITTestDataStreamWrite extends TestLogger {
public void testStreamWriteWithIndexBootstrap(String tableType) throws Exception {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
// use synchronized compaction to avoid sleeping for async compact.
- conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+ conf.set(FlinkOptions.COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED, false);
conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
conf.set(FlinkOptions.TABLE_TYPE, tableType);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index eb25855dc1b2..60560b7cb3a6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -55,7 +55,9 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
@@ -63,6 +65,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -857,12 +860,17 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
testHarness.checkpointComplete(1).checkWrittenData(EXPECTED1).end();
}
- @Test
- public void testBucketAssignWithRLI() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testBucketAssignWithRLI(boolean mdtCompactionEnabled) throws Exception {
// use record level index
conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(), "true");
conf.setString(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(), "true");
+ conf.set(FlinkOptions.METADATA_COMPACTION_SCHEDULE_ENABLED, mdtCompactionEnabled);
+ if (mdtCompactionEnabled) {
+ conf.set(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 1);
+ }
TestHarness testHarness =
preparePipeline(conf)
.consume(TestData.DATA_SET_INSERT)
@@ -895,6 +903,10 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
.checkpointComplete(2)
.checkWrittenData(EXPECTED1)
.end();
+
+ if (mdtCompactionEnabled) {
+ TestUtils.validateMdtCompactionInstant(conf.get(FlinkOptions.PATH), false);
+ }
}
@Test
@@ -938,12 +950,22 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
.end();
}
- @Test
- public void testIndexWriteFunctionWithMultipleCheckpoints() throws Exception {
+ @ParameterizedTest
+ @MethodSource("mdtCompactionParams")
+ public void testIndexWriteFunctionWithMultipleCheckpoints(boolean mdtCompactionEnabled, boolean isLogCompaction) throws Exception {
// Test IndexWriteFunction with multiple checkpoint operations
conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(), "true");
conf.setString(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(), "true");
+ conf.set(FlinkOptions.METADATA_COMPACTION_SCHEDULE_ENABLED, mdtCompactionEnabled);
+ if (mdtCompactionEnabled) {
+ if (isLogCompaction) {
+ conf.setString(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE.key(), "true");
+ conf.setString(HoodieMetadataConfig.LOG_COMPACT_BLOCKS_THRESHOLD.key(), "2");
+ } else {
+ conf.set(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 1);
+ }
+ }
TestHarness testHarness =
preparePipeline()
@@ -985,6 +1007,23 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
getRecordKeyIndex(metaClient, Arrays.asList("id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8", "id9", "id10", "id11"));
assertEquals(11, result.size());
result.forEach((key, value) -> assertEquals(expectedKeyPartitionMap.get(key), value.getPartitionPath()));
+
+ if (mdtCompactionEnabled) {
+ TestUtils.validateMdtCompactionInstant(conf.get(FlinkOptions.PATH), isLogCompaction);
+ }
+ }
+
+ /**
+ * Return test params => (mdt compaction enabled, log compaction enabled).
+ */
+ private static Stream<Arguments> mdtCompactionParams() {
+ Object[][] data =
+ new Object[][] {
+ {true, false},
+ {true, true},
+ {false, false},
+ {false, true}};
+ return Stream.of(data).map(Arguments::of);
}
private Map<String, HoodieRecordGlobalLocation> getRecordKeyIndex(HoodieTableMetaClient metaClient, List<String> keys) {
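For reference, the tests above toggle metadata-table compaction through a handful of options. A small configuration sketch follows, illustrative only: the helper class and method names are made up, while the option constants are the ones used in this diff.

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.configuration.FlinkOptions;

public class MdtCompactionConfigExample {
  // Hypothetical helper: builds a writer configuration with MDT compaction scheduling enabled.
  public static Configuration mdtCompactionConf(String tablePath) {
    Configuration conf = new Configuration();
    conf.set(FlinkOptions.PATH, tablePath);
    conf.set(FlinkOptions.METADATA_ENABLED, true);
    // Let the write pipeline schedule compaction for the metadata table.
    conf.set(FlinkOptions.METADATA_COMPACTION_SCHEDULE_ENABLED, true);
    // Compact the MDT after every delta commit (aggressive test value; production jobs
    // would normally keep a larger threshold).
    conf.set(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 1);
    // Optionally prefer log compaction on the MDT, as the log-compaction test branch does.
    conf.setString(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE.key(), "true");
    conf.setString(HoodieMetadataConfig.LOG_COMPACT_BLOCKS_THRESHOLD.key(), "2");
    return conf;
  }
}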
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
index 6e585f3386e9..7438773db6ee 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
@@ -88,7 +88,7 @@ public class CompactFunctionWrapper {
public void openFunction() throws Exception {
compactionPlanOperator = new CompactionPlanOperator(conf);
- planEventOutput = new CollectOutputAdapter<>();
+ planEventOutput = new CollectOutputAdapter<>();
compactionPlanOperator.setup(streamTask, streamConfig, planEventOutput);
compactionPlanOperator.open();
@@ -109,7 +109,6 @@ public class CompactFunctionWrapper {
public void compact(long checkpointID) throws Exception {
// collect the CompactEvents.
- compactionPlanOperator.setOutput(planEventOutput);
compactionPlanOperator.notifyCheckpointComplete(checkpointID);
// collect the CompactCommitEvents
for (CompactionPlanEvent event : planEventOutput.getRecords()) {
@@ -119,6 +118,11 @@ public class CompactFunctionWrapper {
for (CompactionCommitEvent event : commitEventOutput.getRecords()) {
commitSink.invoke(event, null);
}
+ // reset collector
+ planEventOutput = new CollectOutputAdapter<>();
+ compactionPlanOperator.setOutput(planEventOutput);
+ commitEventOutput = new CollectOutputAdapter<>();
+ compactOperator.setOutput(commitEventOutput);
}
public void close() throws Exception {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 4a83dad542ff..5d0bd8cd6157 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -151,7 +151,7 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
this.stateInitializationContext = new MockStateInitializationContext();
this.indexStateInitializationContext = new MockStateInitializationContext();
this.coordinatorStateStore = new TreeMap<>();
- this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
+ this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf) || OptionsResolver.needsAsyncMetadataCompaction(conf);
this.isStreamingWriteIndexEnabled = OptionsResolver.isStreamingIndexWriteEnabled(conf);
this.streamConfig = new StreamConfig(conf);
streamConfig.setOperatorID(new OperatorID());
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 2fc626487057..6c1cc68705cc 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -547,8 +547,8 @@ public class ITTestHoodieDataSource {
}
@ParameterizedTest
- @EnumSource(value = HoodieTableType.class)
- void testDataSkippingWithRecordLevelIndex(HoodieTableType tableType) throws Exception {
+ @MethodSource("tableTypeAndBooleanTrueFalseParams")
+ void testDataSkippingWithRecordLevelIndex(HoodieTableType tableType, boolean mdtCompactionEnabled) throws Exception {
TableEnvironment tableEnv = batchTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
@@ -557,10 +557,15 @@ public class ITTestHoodieDataSource {
.option(FlinkOptions.METADATA_ENABLED, true)
.option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
.option(FlinkOptions.TABLE_TYPE, tableType.name())
+ .option(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, mdtCompactionEnabled ? 1 : 10)
.end();
tableEnv.executeSql(hoodieTableDDL);
execInsertSql(tableEnv, TestSQL.INSERT_T1);
+ if (mdtCompactionEnabled) {
+ TestUtils.validateMdtCompactionInstant(tempFile.getAbsolutePath(), false);
+ }
+
List<Row> result1 = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1 where uuid = 'id1'").execute().collect());
assertRowsEquals(result1, "[+I[id1, Danny, 23, 1970-01-01T00:00:01, par1]]");
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
index 6e1b5d372add..bcfe719e2015 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
@@ -467,7 +467,7 @@ public class ITTestSchemaEvolution {
private void doCompact(Configuration conf) throws Exception {
// use sync compaction to ensure compaction finished.
- conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+ conf.set(FlinkOptions.COMPACTION_OPERATION_EXECUTE_ASYNC_ENABLED, false);
try (HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf)) {
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
index 115eec7c88d0..c000fd20fb03 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -53,6 +54,7 @@ import java.util.stream.IntStream;
import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -63,6 +65,7 @@ public class TestCompactionUtil {
private HoodieFlinkTable<?> table;
private HoodieTableMetaClient metaClient;
private Configuration conf;
+ private FlinkHoodieBackedTableMetadataWriter flinkMetadataWriter;
@TempDir
File tempFile;
@@ -82,8 +85,8 @@ public class TestCompactionUtil {
this.metaClient = table.getMetaClient();
// initialize the metadata table path
if (conf.get(FlinkOptions.METADATA_ENABLED)) {
- FlinkHoodieBackedTableMetadataWriter.create(table.getStorageConf(), table.getConfig(),
- table.getContext(), Option.empty());
+ this.flinkMetadataWriter = (FlinkHoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(
+ table.getStorageConf(), table.getConfig(), table.getContext(), Option.empty());
}
}
@@ -182,5 +185,126 @@ public class TestCompactionUtil {
metaClient.reloadActiveTimeline();
return instantTime;
}
+
+ @Test
+ void testScheduleMetadataCompactionSuccess() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.METADATA_ENABLED.key(), "true");
+ options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "true");
+ options.put(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS.key(), "1");
+ beforeEach(options);
+
+ try (HoodieFlinkWriteClient metadataWriteClient = (HoodieFlinkWriteClient) flinkMetadataWriter.getWriteClient()) {
+ HoodieFlinkTable metadataTable = metadataWriteClient.getHoodieTable();
+ HoodieTableMetaClient metadataMetaClient = metadataTable.getMetaClient();
+
+ // Initially, there should be no pending compactions
+ int initialCount = metadataMetaClient.reloadActiveTimeline()
+ .filterPendingCompactionTimeline()
+ .getInstants().size();
+
+ // Call scheduleMetadataCompaction with committed = true when there are no pending compactions
+ CompactionUtil.scheduleMetadataCompaction(metadataWriteClient, true);
+
+ int finalCount = metadataMetaClient.reloadActiveTimeline()
+ .filterPendingCompactionTimeline()
+ .getInstants().size();
+
+ // A new compaction should have been scheduled, so count should increase
+ assertThat(finalCount, is(initialCount + 1));
+ }
+ }
+
+ @Test
+ void testScheduleMetadataLogCompactionFallback() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.METADATA_ENABLED.key(), "true");
+ options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "true");
+ options.put(HoodieMetadataConfig.LOG_COMPACT_BLOCKS_THRESHOLD.key(), "1");
+ options.put(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE.key(), "true");
+ beforeEach(options);
+
+ try (HoodieFlinkWriteClient metadataWriteClient = (HoodieFlinkWriteClient) flinkMetadataWriter.getWriteClient()) {
+ HoodieFlinkTable metadataTable = metadataWriteClient.getHoodieTable();
+ HoodieTableMetaClient metadataMetaClient = metadataTable.getMetaClient();
+ // Initially, check the timeline for log compaction instants
+ int initialLogCompactionCount = metadataMetaClient.reloadActiveTimeline()
+ .filterPendingLogCompactionTimeline()
+ .getInstants().size();
+
+ assertEquals(0, initialLogCompactionCount);
+
+ // Call scheduleMetadataCompaction with committed = true
+ CompactionUtil.scheduleMetadataCompaction(metadataWriteClient, true);
+
+ int finalLogCompactionCount = metadataMetaClient.reloadActiveTimeline()
+ .filterPendingLogCompactionTimeline()
+ .getInstants().size();
+
+ // Regular compaction is not scheduled here, so with log compaction enabled the call
+ // should fall back and schedule exactly one pending log compaction plan
+ assertEquals(1, finalLogCompactionCount);
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void rollbackLogCompactionWithWriteClient(boolean rollbackSingleInstant) throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.METADATA_ENABLED.key(), "true");
+ options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "true");
+ options.put(HoodieMetadataConfig.LOG_COMPACT_BLOCKS_THRESHOLD.key(), "1");
+ options.put(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE.key(), "true");
+ beforeEach(options);
+
+ try (HoodieFlinkWriteClient metadataWriteClient = (HoodieFlinkWriteClient) flinkMetadataWriter.getWriteClient()) {
+ HoodieFlinkTable metadataTable = metadataWriteClient.getHoodieTable();
+ HoodieTableMetaClient metadataMetaClient = metadataTable.getMetaClient();
+
+ // Generate a log compaction plan and transition it to inflight
+ String instantTime = generateLogCompactionPlan(metadataMetaClient, metadataTable);
+
+ // Verify that the log compaction is in inflight state
+ List<HoodieInstant> inflightInstants = metadataMetaClient.getActiveTimeline()
+ .filterPendingLogCompactionTimeline()
+ .filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT)
+ .getInstants();
+ assertThat("There should be one inflight log compaction instant", inflightInstants.size(), is(1));
+ assertEquals(instantTime, inflightInstants.get(0).requestedTime());
+
+ // Call the rollbackLogCompaction method with writeClient
+ if (rollbackSingleInstant) {
+ CompactionUtil.rollbackLogCompaction(metadataTable, instantTime, metadataWriteClient.getTransactionManager());
+ } else {
+ CompactionUtil.rollbackLogCompaction(metadataTable, metadataWriteClient);
+ }
+
+ // Reload the timeline to check the state after rollback
+ metadataMetaClient.reloadActiveTimeline();
+
+ // Check that the log compaction plan has been removed from the pending timeline after rollback
+ List<HoodieInstant> requestedInstants = metadataMetaClient.getActiveTimeline()
+ .filterPendingLogCompactionTimeline()
+ .getInstants();
+
+ assertThat("There should be no requested instant after rollback",
requestedInstants.size(), is(0));
+ }
+ }
+
+ /**
+ * Generates a log compaction plan on the timeline and returns its instant time.
+ */
+ private String generateLogCompactionPlan(HoodieTableMetaClient metaClient, HoodieFlinkTable table) {
+ HoodieCompactionOperation operation = new HoodieCompactionOperation();
+ HoodieCompactionPlan plan = new HoodieCompactionPlan(Collections.singletonList(operation), Collections.emptyMap(), 1, null, null, null);
+ String instantTime = WriteClientTestUtils.createNewInstantTime();
+ HoodieInstant logCompactionInstant =
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.LOG_COMPACTION_ACTION, instantTime);
+ metaClient.getActiveTimeline().saveToLogCompactionRequested(logCompactionInstant, plan);
+ table.getActiveTimeline().transitionLogCompactionRequestedToInflight(logCompactionInstant);
+ metaClient.reloadActiveTimeline();
+ return instantTime;
+ }
}
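The fallback that testScheduleMetadataLogCompactionFallback verifies can be pictured roughly as below. This is only a sketch of the behaviour the test asserts, not the actual CompactionUtil.scheduleMetadataCompaction implementation; the helper name, the logCompactionEnabled flag and the use of the generic scheduleCompaction/scheduleLogCompaction write-client calls are assumptions made for the example.

import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.util.Option;

public class MdtCompactionFallbackSketch {
  // Rough sketch: try to schedule a regular compaction on the metadata table and,
  // when no plan is produced while log compaction is enabled, fall back to scheduling
  // a log compaction plan instead. Returns the scheduled instant, if any.
  public static Option<String> scheduleWithFallback(HoodieFlinkWriteClient metadataWriteClient,
                                                    boolean logCompactionEnabled) {
    Option<String> instant = metadataWriteClient.scheduleCompaction(Option.empty());
    if (!instant.isPresent() && logCompactionEnabled) {
      instant = metadataWriteClient.scheduleLogCompaction(Option.empty());
    }
    return instant;
  }
}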
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
index adbee35dd7c6..f7e697b44f85 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utils;
import org.apache.hudi.client.WriteClientTestUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -30,6 +31,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.sink.utils.MockStreamingRuntimeContext;
import org.apache.hudi.source.StreamReadMonitoringFunction;
import org.apache.hudi.storage.StoragePath;
@@ -48,6 +50,8 @@ import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -76,6 +80,21 @@ public class TestUtils {
.orElse(null);
}
+ public static void validateMdtCompactionInstant(String basePath, boolean isLogCompaction) throws IOException {
+ String baseMdtPath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
+ String compactInstant = TestUtils.getLastCompleteInstant(baseMdtPath);
+ assertNotNull(compactInstant);
+ WriteOperationType writeOperationType = isLogCompaction ? WriteOperationType.LOG_COMPACT : WriteOperationType.COMPACT;
+ assertEquals(writeOperationType, TestUtils.getOperationType(baseMdtPath, compactInstant));
+ }
+
+ private static WriteOperationType getOperationType(String basePath, String instantTs) throws IOException {
+ final HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(
+ new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(new Configuration())), basePath);
+ HoodieInstant hoodieInstant = metaClient.getCommitsTimeline().filter(instant -> instant.requestedTime().equals(instantTs)).firstInstant().get();
+ return metaClient.getCommitTimeline().readCommitMetadata(hoodieInstant).getOperationType();
+ }
+
public static String getLastDeltaCompleteInstant(String basePath) {
final HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(
new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(new
Configuration())), basePath);