This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch flink-incremental-compaction
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit e18fc63d42e843fbcc4f304821dadff1d4400a3e Author: zhangyue143 <[email protected]> AuthorDate: Mon Dec 9 11:21:32 2024 +0800 flink schedule compaction with incremental partitions --- .../hudi/client/BaseHoodieTableServiceClient.java | 9 +- .../java/org/apache/hudi/table/HoodieTable.java | 4 +- .../compact/ScheduleCompactionActionExecutor.java | 13 +- .../BaseHoodieCompactionPlanGenerator.java | 16 ++- .../generators/HoodieCompactionPlanGenerator.java | 7 +- .../HoodieLogCompactionPlanGenerator.java | 4 +- .../hudi/table/HoodieFlinkCopyOnWriteTable.java | 4 +- .../hudi/table/HoodieFlinkMergeOnReadTable.java | 13 +- .../hudi/table/HoodieJavaCopyOnWriteTable.java | 4 +- .../hudi/table/HoodieJavaMergeOnReadTable.java | 8 +- .../hudi/table/HoodieSparkCopyOnWriteTable.java | 4 +- .../hudi/table/HoodieSparkMergeOnReadTable.java | 8 +- .../apache/hudi/configuration/FlinkOptions.java | 6 + .../hudi/sink/StreamWriteOperatorCoordinator.java | 140 ++++++++++++++++++--- .../java/org/apache/hudi/util/CompactionUtil.java | 14 ++- .../apache/hudi/sink/ITTestDataStreamWrite.java | 14 +++ ...CoordinatorWithIncrementalCompactionEnable.java | 63 ++++++++++ ...eadWithIncrementalScheduleCompactionEnable.java | 29 +++++ .../org/apache/hudi/utils/TestCompactionUtil.java | 4 +- .../table/action/compact/TestHoodieCompactor.java | 4 +- 20 files changed, 313 insertions(+), 55 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index e96a6707247..6328dfe6d65 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -116,6 +116,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl protected transient AsyncArchiveService asyncArchiveService; protected Set<String> pendingInflightAndRequestedInstants; + protected Option<Set<String>> specificPartitions = Option.empty(); protected BaseHoodieTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig, @@ -162,6 +163,10 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl } } + public void setSpecificPartitions(Option<Set<String>> specificPartitions) { + this.specificPartitions = specificPartitions; + } + protected void setPendingInflightAndRequestedInstants(Set<String> pendingInflightAndRequestedInstants) { this.pendingInflightAndRequestedInstants = pendingInflightAndRequestedInstants; } @@ -663,8 +668,8 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl break; case COMPACT: LOG.info("Scheduling compaction at instant time: {}", instantTime); - Option<HoodieCompactionPlan> compactionPlan = table - .scheduleCompaction(context, instantTime, extraMetadata); + Option<HoodieCompactionPlan> compactionPlan = + table.scheduleCompaction(context, instantTime, extraMetadata, specificPartitions); option = compactionPlan.isPresent() ? 
Option.of(instantTime) : Option.empty(); break; case LOG_COMPACT: diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 19b6624922a..e36b36ef78c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -452,11 +452,13 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable { * @param context HoodieEngineContext * @param instantTime Instant Time for scheduling compaction * @param extraMetadata additional metadata to write into plan + * @param specificPartitions specific partitions to do compaction * @return */ public abstract Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, - Option<Map<String, String>> extraMetadata); + Option<Map<String, String>> extraMetadata, + Option<Set<String>> specificPartitions); /** * Run Compaction on the table. Compaction arranges the data so that it is optimized for data access. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java index e83800e45da..7124e17ec0d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java @@ -46,6 +46,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.Map; +import java.util.Set; import static org.apache.hudi.common.util.CollectionUtils.nonEmpty; import static org.apache.hudi.common.util.ValidationUtils.checkArgument; @@ -56,26 +57,26 @@ public class ScheduleCompactionActionExecutor<T, I, K, O> extends BaseActionExec private WriteOperationType operationType; private final Option<Map<String, String>> extraMetadata; private BaseHoodieCompactionPlanGenerator planGenerator; - public ScheduleCompactionActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime, Option<Map<String, String>> extraMetadata, - WriteOperationType operationType) { + WriteOperationType operationType, + Option<Set<String>> specificPartitions) { super(context, config, table, instantTime); this.extraMetadata = extraMetadata; this.operationType = operationType; checkArgument(operationType == WriteOperationType.COMPACT || operationType == WriteOperationType.LOG_COMPACT, "Only COMPACT and LOG_COMPACT is supported"); - initPlanGenerator(context, config, table); + initPlanGenerator(context, config, table, specificPartitions); } - private void initPlanGenerator(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table) { + private void initPlanGenerator(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, Option<Set<String>> specificPartitions) { if (WriteOperationType.COMPACT.equals(operationType)) { - planGenerator = new HoodieCompactionPlanGenerator(table, context, config); + planGenerator = new HoodieCompactionPlanGenerator(table, context, config, specificPartitions); } else { - planGenerator = new HoodieLogCompactionPlanGenerator(table, context, config); + planGenerator = new 
HoodieLogCompactionPlanGenerator(table, context, config, specificPartitions); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java index 79059c2ca34..115f05b2381 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java @@ -50,6 +50,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -64,11 +65,14 @@ public abstract class BaseHoodieCompactionPlanGenerator<T extends HoodieRecordPa protected final HoodieTable<T, I, K, O> hoodieTable; protected final HoodieWriteConfig writeConfig; protected final transient HoodieEngineContext engineContext; + private final Option<Set<String>> specificPartitions; - public BaseHoodieCompactionPlanGenerator(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + public BaseHoodieCompactionPlanGenerator(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig, + Option<Set<String>> specificPartitions) { this.hoodieTable = table; this.writeConfig = writeConfig; this.engineContext = engineContext; + this.specificPartitions = specificPartitions; } @Nullable @@ -82,8 +86,14 @@ public abstract class BaseHoodieCompactionPlanGenerator<T extends HoodieRecordPa // TODO - rollback any compactions in flight HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); CompletionTimeQueryView completionTimeQueryView = metaClient.getTimelineLayout().getTimelineFactory().createCompletionTimeQueryView(metaClient); - List<String> partitionPaths = FSUtils.getAllPartitionPaths( - engineContext, metaClient.getStorage(), writeConfig.getMetadataConfig(), metaClient.getBasePath()); + List<String> partitionPaths; + if (specificPartitions != null && specificPartitions.isPresent() && !specificPartitions.get().isEmpty()) { + LOG.info("Schedule compaction based on specific partitions " + specificPartitions.get()); + partitionPaths = new ArrayList<>(specificPartitions.get()); + } else { + LOG.info("Get all table partitions."); + partitionPaths = FSUtils.getAllPartitionPaths(engineContext, metaClient.getStorage(), writeConfig.getMetadataConfig(), metaClient.getBasePath()); + } int allPartitionSize = partitionPaths.size(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java index a93ece710b0..8fd983f939b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import 
org.apache.hudi.common.util.CompactionUtils; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; @@ -33,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; +import java.util.Set; import static java.util.stream.Collectors.toList; @@ -43,8 +45,9 @@ public class HoodieCompactionPlanGenerator<T extends HoodieRecordPayload, I, K, private final CompactionStrategy compactionStrategy; - public HoodieCompactionPlanGenerator(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { - super(table, engineContext, writeConfig); + public HoodieCompactionPlanGenerator(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig, + Option<Set<String>> specificPartitions) { + super(table, engineContext, writeConfig, specificPartitions); this.compactionStrategy = writeConfig.getCompactionStrategy(); LOG.info("Compaction Strategy used is: " + compactionStrategy.toString()); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java index a81ee663fa9..27a347f670b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java @@ -46,8 +46,8 @@ public class HoodieLogCompactionPlanGenerator<T extends HoodieRecordPayload, I, private static final Logger LOG = LoggerFactory.getLogger(HoodieLogCompactionPlanGenerator.class); - public HoodieLogCompactionPlanGenerator(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { - super(table, engineContext, writeConfig); + public HoodieLogCompactionPlanGenerator(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig, Option<Set<String>> specificPartitions) { + super(table, engineContext, writeConfig, specificPartitions); } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index 4fd217ce4bd..9106eab85b7 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -75,6 +75,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; /** * Implementation of a very heavily read-optimized Hoodie Table where, all data is stored in base files, with @@ -318,7 +319,8 @@ public class HoodieFlinkCopyOnWriteTable<T> } @Override - public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) { + public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata, + Option<Set<String>> specificPartitions) { throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table"); } 
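[Editor's illustrative sketch, not part of this patch: a minimal caller-side example of the extended scheduleCompaction signature introduced above. The table, engine context, and instant time are assumed to come from existing client code, and the partition values are placeholders; passing Option.empty() for the partition set keeps the old behavior of listing all table partitions, as handled in BaseHoodieCompactionPlanGenerator. Note the call only succeeds on MERGE_ON_READ tables, since the COPY_ON_WRITE implementations above throw HoodieNotSupportedException.]

    import org.apache.hudi.avro.model.HoodieCompactionPlan;
    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.table.HoodieTable;

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class IncrementalCompactionScheduleSketch {
      // Schedules a compaction plan restricted to the given partitions (placeholder values here).
      public static Option<HoodieCompactionPlan> schedule(HoodieTable<?, ?, ?, ?> table,
                                                          HoodieEngineContext context,
                                                          String instantTime) {
        Set<String> changedPartitions = new HashSet<>(Arrays.asList("par1", "par2"));
        // An empty partition option falls back to listing all partitions of the table.
        return table.scheduleCompaction(context, instantTime, Option.empty(), Option.of(changedPartitions));
      }
    }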
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java index 361ea7a6e6f..7ee793eb6c8 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java @@ -47,6 +47,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; /** * Flink MERGE_ON_READ table. @@ -100,12 +101,12 @@ public class HoodieFlinkMergeOnReadTable<T> } @Override - public Option<HoodieCompactionPlan> scheduleCompaction( - HoodieEngineContext context, - String instantTime, - Option<Map<String, String>> extraMetadata) { + public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, + String instantTime, + Option<Map<String, String>> extraMetadata, + Option<Set<String>> specificPartitions) { ScheduleCompactionActionExecutor scheduleCompactionExecutor = new ScheduleCompactionActionExecutor( - context, config, this, instantTime, extraMetadata, WriteOperationType.COMPACT); + context, config, this, instantTime, extraMetadata, WriteOperationType.COMPACT, specificPartitions); return scheduleCompactionExecutor.execute(); } @@ -121,7 +122,7 @@ public class HoodieFlinkMergeOnReadTable<T> @Override public Option<HoodieCompactionPlan> scheduleLogCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) { ScheduleCompactionActionExecutor scheduleLogCompactionExecutor = new ScheduleCompactionActionExecutor( - context, config, this, instantTime, extraMetadata, WriteOperationType.LOG_COMPACT); + context, config, this, instantTime, extraMetadata, WriteOperationType.LOG_COMPACT, Option.empty()); return scheduleLogCompactionExecutor.execute(); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java index 16e194e2b66..bf02b3d9e7f 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java @@ -80,6 +80,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; public class HoodieJavaCopyOnWriteTable<T> extends HoodieJavaTable<T> implements HoodieCompactionHandler<T> { @@ -185,7 +186,8 @@ public class HoodieJavaCopyOnWriteTable<T> @Override public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, - Option<Map<String, String>> extraMetadata) { + Option<Map<String, String>> extraMetadata, + Option<Set<String>> specificPartitions) { throw new HoodieNotSupportedException("ScheduleCompaction is not supported on a CopyOnWrite table"); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java index 8c2edc4f3d3..065ee747a7b 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java @@ -42,6 
+42,7 @@ import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor; import java.util.List; import java.util.Map; +import java.util.Set; public class HoodieJavaMergeOnReadTable<T> extends HoodieJavaCopyOnWriteTable<T> { protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) { @@ -72,9 +73,10 @@ public class HoodieJavaMergeOnReadTable<T> extends HoodieJavaCopyOnWriteTable<T> } @Override - public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) { + public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata, + Option<Set<String>> specificPartitions) { ScheduleCompactionActionExecutor scheduleCompactionExecutor = new ScheduleCompactionActionExecutor( - context, config, this, instantTime, extraMetadata, WriteOperationType.COMPACT); + context, config, this, instantTime, extraMetadata, WriteOperationType.COMPACT, specificPartitions); return scheduleCompactionExecutor.execute(); } @@ -90,7 +92,7 @@ public class HoodieJavaMergeOnReadTable<T> extends HoodieJavaCopyOnWriteTable<T> @Override public Option<HoodieCompactionPlan> scheduleLogCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) { ScheduleCompactionActionExecutor scheduleLogCompactionExecutor = new ScheduleCompactionActionExecutor( - context, config, this, instantTime, extraMetadata, WriteOperationType.LOG_COMPACT); + context, config, this, instantTime, extraMetadata, WriteOperationType.LOG_COMPACT, Option.empty()); return scheduleLogCompactionExecutor.execute(); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index 67a7658a309..de75583fcaa 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -89,6 +89,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable; @@ -171,7 +172,8 @@ public class HoodieSparkCopyOnWriteTable<T> } @Override - public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) { + public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata, + Option<Set<String>> specificPartitions) { throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table"); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java index f5929fdc667..761e4a78f48 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java @@ -66,6 +66,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import 
java.util.Set; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable; @@ -140,9 +141,10 @@ public class HoodieSparkMergeOnReadTable<T> extends HoodieSparkCopyOnWriteTable< } @Override - public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) { + public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata, + Option<Set<String>> specificPartitions) { ScheduleCompactionActionExecutor scheduleCompactionExecutor = new ScheduleCompactionActionExecutor( - context, config, this, instantTime, extraMetadata, WriteOperationType.COMPACT); + context, config, this, instantTime, extraMetadata, WriteOperationType.COMPACT, specificPartitions); return scheduleCompactionExecutor.execute(); } @@ -163,7 +165,7 @@ public class HoodieSparkMergeOnReadTable<T> extends HoodieSparkCopyOnWriteTable< @Override public Option<HoodieCompactionPlan> scheduleLogCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) { ScheduleCompactionActionExecutor scheduleLogCompactionExecutor = new ScheduleCompactionActionExecutor( - context, config, this, instantTime, extraMetadata, WriteOperationType.LOG_COMPACT); + context, config, this, instantTime, extraMetadata, WriteOperationType.LOG_COMPACT, Option.empty()); return scheduleLogCompactionExecutor.execute(); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 36af35435fe..7ef794e6395 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -699,6 +699,12 @@ public class FlinkOptions extends HoodieConfig { .noDefaultValue() .withDescription("Parallelism of tasks that do actual compaction, default same as the write task parallelism"); + public static final ConfigOption<Boolean> COMPACTION_SCHEDULE_INCREMENTAL_PARTITIONS = ConfigOptions + .key("compaction.schedule.incremental.partitions") + .booleanType() + .defaultValue(false) + .withDescription("Schedule the compaction plan based on all the partitions related to commit meta since last compete compaction."); + public static final String NUM_COMMITS = "num_commits"; public static final String TIME_ELAPSED = "time_elapsed"; public static final String NUM_AND_TIME = "num_and_time"; diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index fbb2a5ff391..e1fd9fad19f 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -21,13 +21,20 @@ package org.apache.hudi.sink; import org.apache.hudi.adapter.OperatorCoordinatorAdapter; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; 
import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; @@ -39,6 +46,7 @@ import org.apache.hudi.sink.event.CommitAckEvent; import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.sink.meta.CkpMetadata; import org.apache.hudi.sink.meta.CkpMetadataFactory; +import org.apache.hudi.sink.partitioner.profile.WriteProfiles; import org.apache.hudi.sink.utils.HiveSyncContext; import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.storage.StorageConfiguration; @@ -49,6 +57,7 @@ import org.apache.hudi.util.FlinkWriteClients; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; @@ -58,8 +67,13 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -70,8 +84,12 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.INIT_INSTANT_TS; import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN_OR_EQUALS; import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps; import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists; @@ -165,6 +183,8 @@ public class StreamWriteOperatorCoordinator * The client id heartbeats. */ private ClientIds clientIds; + private Map<String, String> cacheWrittenPartitions; + private String lastInstant = ""; /** * Constructs a StreamingSinkOperatorCoordinator. @@ -231,6 +251,9 @@ public class StreamWriteOperatorCoordinator if (this.clientIds != null) { this.clientIds.close(); } + if (cacheWrittenPartitions != null) { + cacheWrittenPartitions.clear(); + } } @Override @@ -238,7 +261,7 @@ public class StreamWriteOperatorCoordinator executor.execute( () -> { try { - result.complete(new byte[0]); + result.complete(doCheckpointCoordinatorStatus(this.lastInstant)); } catch (Throwable throwable) { // when a checkpoint fails, throws directly. 
result.completeExceptionally( @@ -260,9 +283,10 @@ public class StreamWriteOperatorCoordinator // for streaming mode, commits the ever received events anyway, // the stream write task snapshot and flush the data buffer synchronously in sequence, // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract) - final boolean committed = commitInstant(this.instant, checkpointId); + final Pair<Boolean, List<WriteStatus>> committedPair = commitInstant(this.instant, checkpointId); // schedules the compaction or clustering if it is enabled in stream execution mode - scheduleTableServices(committed); + scheduleTableServices(committedPair); + boolean committed = committedPair.getLeft(); if (committed) { // start new instant. @@ -275,8 +299,11 @@ public class StreamWriteOperatorCoordinator } @Override - public void resetToCheckpoint(long checkpointID, byte[] checkpointData) { - // no operation + public void resetToCheckpoint(long checkpointID, byte[] checkpointData) throws Exception { + if (checkpointData != null && checkpointData.length > 0) { + this.lastInstant = resetCheckpointCoordinatorStatus(checkpointData); + LOG.info("Reset to checkpoint last instant is " + lastInstant); + } } @Override @@ -321,6 +348,81 @@ public class StreamWriteOperatorCoordinator // Utilities // ------------------------------------------------------------------------- + private void scheduleCompactionInternal(Pair<Boolean, List<WriteStatus>> committedPair) { + boolean committed = committedPair.getLeft(); + if (committed && tableState.scheduleIncrementalPartitions) { + updateCachePartitions(committedPair.getRight()); + } + boolean res = CompactionUtil.scheduleCompaction(writeClient, tableState.isDeltaTimeCompaction, + committed, Option.of(cacheWrittenPartitions.keySet())); + if (res && tableState.scheduleIncrementalPartitions) { + this.cacheWrittenPartitions.clear(); + this.lastInstant = this.instant; + LOG.info("Compaction scheduled , clear cacheWrittenPartitions. 
And reset last instant to " + this.lastInstant); + } + } + + private void updateCachePartitions(List<WriteStatus> writeResults) { + initCachePartitions(); + writeResults.forEach(status -> { + this.cacheWrittenPartitions.putIfAbsent(status.getStat().getPartitionPath(), ""); + }); + } + + // take lastInstant as state only + public byte[] doCheckpointCoordinatorStatus(String lastInstant) throws IOException { + if (tableState.scheduleIncrementalPartitions) { + ValidationUtils.checkArgument(lastInstant != null); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(baos); + out.writeInt(lastInstant.length()); + out.write(lastInstant.getBytes()); + out.flush(); + LOG.info("Finish to do checkpoint last instant : " + lastInstant); + return baos.toByteArray(); + } else { + return new byte[0]; + } + } + + public String resetCheckpointCoordinatorStatus(byte[] checkpointData) throws Exception { + ByteArrayInputStream bais = new ByteArrayInputStream(checkpointData); + ObjectInputStream ois = new ObjectInputStream(bais); + int instantSite = ois.readInt(); + byte[] instantBytes = new byte[instantSite]; + ois.readFully(instantBytes); + return new String(instantBytes); + } + + private void initCachePartitions() { + if (cacheWrittenPartitions != null) { + return; + } + long start = System.currentTimeMillis(); + this.cacheWrittenPartitions = new ConcurrentHashMap<>(); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline().reload(); + // get active timeline last commit instant or pending compaction instant as millstone instant + // only works for mor table + HoodieTimeline resTimeline; + if (StringUtils.nonEmpty(this.lastInstant)) { + String millstoneInstant = this.lastInstant; + resTimeline = activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(millstoneInstant); + } else { + Option<HoodieInstant> activeLastInstant = activeTimeline.getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, COMPACTION_ACTION)).lastInstant(); + String millstoneInstant = activeLastInstant.isPresent() ? activeLastInstant.get().requestedTime() : INIT_INSTANT_TS; + resTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants().findInstantsAfter(millstoneInstant); + } + resTimeline.getInstantsAsStream().forEach(instant -> { + LOG.info("Reading " + instant); + HoodieCommitMetadata metadata = WriteProfiles.getCommitMetadata(conf.getString(FlinkOptions.TABLE_NAME), new Path(conf.getString(FlinkOptions.PATH)), instant, activeTimeline); + for (HoodieWriteStat writeStat : metadata.getWriteStats()) { + cacheWrittenPartitions.putIfAbsent(writeStat.getPartitionPath(), ""); + } + }); + long end = System.currentTimeMillis(); + LOG.info("Finish to init cache partitions, using " + (end - start) + " mills. 
" + cacheWrittenPartitions); + } + private void initHiveSync() { this.hiveSyncExecutor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build(); this.hiveSyncContext = HiveSyncContext.create(conf, this.storageConf); @@ -420,7 +522,10 @@ public class StreamWriteOperatorCoordinator if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) { writeClient.getHeartbeatClient().start(instant); } - commitInstant(instant); + Pair<Boolean, List<WriteStatus>> pair = commitInstant(instant); + if (pair.getLeft() && tableState.scheduleIncrementalPartitions) { + updateCachePartitions(pair.getRight()); + } } // stop the heartbeat for old instant if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy() && !WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) { @@ -460,7 +565,8 @@ public class StreamWriteOperatorCoordinator addEventToBuffer(event); if (allEventsReceived()) { // start to commit the instant. - boolean committed = commitInstant(this.instant); + Pair<Boolean, List<WriteStatus>> commitPair = commitInstant(this.instant); + boolean committed = commitPair.getLeft(); if (committed) { // The executor thread inherits the classloader of the #handleEventFromOperator // caller, which is a AppClassLoader. @@ -468,19 +574,19 @@ public class StreamWriteOperatorCoordinator // sync Hive synchronously if it is enabled in batch mode. syncHive(); // schedules the compaction or clustering if it is enabled in batch execution mode - scheduleTableServices(true); + scheduleTableServices(commitPair); } } } - private void scheduleTableServices(Boolean committed) { + private void scheduleTableServices(Pair<Boolean, List<WriteStatus>> committedPair) { // if compaction is on, schedule the compaction if (tableState.scheduleCompaction) { - CompactionUtil.scheduleCompaction(writeClient, tableState.isDeltaTimeCompaction, committed); + scheduleCompactionInternal(committedPair); } // if clustering is on, schedule the clustering if (tableState.scheduleClustering) { - ClusteringUtil.scheduleClustering(conf, writeClient, committed); + ClusteringUtil.scheduleClustering(conf, writeClient, committedPair.getLeft()); } } @@ -524,7 +630,7 @@ public class StreamWriteOperatorCoordinator /** * Commits the instant. */ - private boolean commitInstant(String instant) { + private Pair<Boolean, List<WriteStatus>> commitInstant(String instant) { return commitInstant(instant, -1); } @@ -533,10 +639,10 @@ public class StreamWriteOperatorCoordinator * * @return true if the write statuses are committed successfully. */ - private boolean commitInstant(String instant, long checkpointId) { + private Pair<Boolean, List<WriteStatus>> commitInstant(String instant, long checkpointId) { if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) { // The last checkpoint finished successfully. 
- return false; + return Pair.of(false, new ArrayList<>()); } List<WriteStatus> writeResults = Arrays.stream(eventBuffer) @@ -554,10 +660,10 @@ public class StreamWriteOperatorCoordinator if (checkpointId != -1) { sendCommitAckEvents(checkpointId); } - return false; + return Pair.of(false, writeResults); } doCommit(instant, writeResults); - return true; + return Pair.of(true, writeResults); } /** @@ -679,6 +785,7 @@ public class StreamWriteOperatorCoordinator final boolean syncHive; final boolean syncMetadata; final boolean isDeltaTimeCompaction; + final boolean scheduleIncrementalPartitions; private TableState(Configuration conf) { this.operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)); @@ -690,6 +797,7 @@ public class StreamWriteOperatorCoordinator this.syncHive = conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED); this.syncMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED); this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf); + this.scheduleIncrementalPartitions = conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_INCREMENTAL_PARTITIONS); } public static TableState create(Configuration conf) { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java index 657bbdbea60..89c1a790c86 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java @@ -37,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Locale; +import java.util.Set; /** * Utilities for flink hudi compaction. @@ -52,17 +53,22 @@ public class CompactionUtil { * @param deltaTimeCompaction Whether the compaction is trigger by elapsed delta time * @param committed Whether the last instant was committed successfully */ - public static void scheduleCompaction( + public static boolean scheduleCompaction( HoodieFlinkWriteClient<?> writeClient, boolean deltaTimeCompaction, - boolean committed) { + boolean committed, + Option<Set<String>> specificPartitions) { + if (specificPartitions.isPresent() && !specificPartitions.get().isEmpty()) { + writeClient.getTableServiceClient().setSpecificPartitions(specificPartitions); + } if (committed) { - writeClient.scheduleCompaction(Option.empty()); + return writeClient.scheduleCompaction(Option.empty()).isPresent(); } else if (deltaTimeCompaction) { // if there are no new commits and the compaction trigger strategy is based on elapsed delta time, // schedules the compaction anyway. 
- writeClient.scheduleCompaction(Option.empty()); + return writeClient.scheduleCompaction(Option.empty()).isPresent(); } + return false; } /** diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java index a5d3b3ece4d..96ba7f7ad1f 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java @@ -171,6 +171,20 @@ public class ITTestDataStreamWrite extends TestLogger { testWriteToHoodie(conf, "mor_write_with_compact", 1, EXPECTED); } + @ParameterizedTest + @ValueSource(strings = {"BUCKET", "FLINK_STATE"}) + public void testWriteMergeOnReadWithCompactionIncremental(String indexType) throws Exception { + Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString()); + conf.setString(FlinkOptions.INDEX_TYPE, indexType); + conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_INCREMENTAL_PARTITIONS, true); + conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4); + conf.setString(FlinkOptions.RECORD_KEY_FIELD, "uuid"); + conf.setString(FlinkOptions.INDEX_KEY_FIELD, "uuid"); + conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1); + conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name()); + testWriteToHoodie(conf, "mor_write_with_compact", 1, EXPECTED); + } + @Test public void testWriteCopyOnWriteWithClustering() throws Exception { testWriteCopyOnWriteWithClustering(false); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinatorWithIncrementalCompactionEnable.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinatorWithIncrementalCompactionEnable.java new file mode 100644 index 00000000000..241929042aa --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinatorWithIncrementalCompactionEnable.java @@ -0,0 +1,63 @@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.hudi.sink; + import org.apache.hudi.configuration.FlinkOptions; + import org.apache.hudi.sink.event.WriteMetadataEvent; + import org.apache.hudi.sink.utils.MockCoordinatorExecutor; + import org.apache.hudi.utils.TestConfigurations; + import org.apache.flink.configuration.Configuration; + import org.apache.flink.runtime.jobgraph.OperatorID; + import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; + import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; + import org.junit.jupiter.api.AfterEach; + import org.junit.jupiter.api.BeforeEach; + import org.junit.jupiter.api.io.TempDir; + import org.junit.jupiter.params.ParameterizedTest; + import org.junit.jupiter.params.provider.ValueSource; + import java.io.File; + import static org.junit.jupiter.api.Assertions.assertEquals; + /** + * Test cases for StreamingSinkOperatorCoordinator. + */ + public class TestStreamWriteOperatorCoordinatorWithIncrementalCompactionEnable { + private StreamWriteOperatorCoordinator coordinator; + @TempDir + File tempFile; + @BeforeEach + public void before() throws Exception { + OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 2); + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_INCREMENTAL_PARTITIONS, true); + coordinator = new StreamWriteOperatorCoordinator(conf, context); + coordinator.start(); + coordinator.setExecutor(new MockCoordinatorExecutor(context)); + coordinator.handleEventFromOperator(0, WriteMetadataEvent.emptyBootstrap(0)); + coordinator.handleEventFromOperator(1, WriteMetadataEvent.emptyBootstrap(1)); + } + @AfterEach + public void after() throws Exception { + coordinator.close(); + } + @ParameterizedTest + @ValueSource(strings = {"", "20241028145729909"}) + void testStreamOperatorCheckpoint(String lastInstant) throws Exception { + byte[] cpStatus = coordinator.doCheckpointCoordinatorStatus(lastInstant); + String res = coordinator.resetCheckpointCoordinatorStatus(cpStatus); + assertEquals(res, lastInstant); + } + } \ No newline at end of file diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithIncrementalScheduleCompactionEnable.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithIncrementalScheduleCompactionEnable.java new file mode 100644 index 00000000000..a1e42fb8a04 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithIncrementalScheduleCompactionEnable.java @@ -0,0 +1,29 @@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.hudi.sink; + import org.apache.hudi.configuration.FlinkOptions; + import org.apache.flink.configuration.Configuration; + /** + * Test cases for delta stream write. + */ + public class TestWriteMergeOnReadWithIncrementalScheduleCompactionEnable extends TestWriteMergeOnRead { + @Override + protected void setUp(Configuration conf) { + conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_INCREMENTAL_PARTITIONS, true); + } + } \ No newline at end of file diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java index 111680bdcdb..e5bddf725fc 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java @@ -142,7 +142,7 @@ public class TestCompactionUtil { TestData.writeDataAsBatch(TestData.DATA_SET_SINGLE_INSERT, conf); try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(conf)) { - CompactionUtil.scheduleCompaction(writeClient, true, true); + CompactionUtil.scheduleCompaction(writeClient, true, true, Option.empty()); Option<HoodieInstant> pendingCompactionInstant = metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().lastInstant(); assertTrue(pendingCompactionInstant.isPresent(), "A compaction plan expects to be scheduled"); @@ -152,7 +152,7 @@ public class TestCompactionUtil { TimeUnit.SECONDS.sleep(3); // in case the instant time interval is too close writeClient.startCommit(); - CompactionUtil.scheduleCompaction(writeClient, true, false); + CompactionUtil.scheduleCompaction(writeClient, true, false, Option.empty()); int numCompactionCommits = metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().countInstants(); assertThat("Two compaction plan expects to be scheduled", numCompactionCommits, is(2)); } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java index c32d0c6739a..21634fbe3f3 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java @@ -135,7 +135,7 @@ public class TestHoodieCompactor extends HoodieSparkClientTestHarness { HoodieTable table = HoodieSparkTable.create(getConfig(), context, metaClient); String compactionInstantTime = writeClient.createNewInstantTime(); assertThrows(HoodieNotSupportedException.class, () -> { - table.scheduleCompaction(context, compactionInstantTime, Option.empty()); + table.scheduleCompaction(context, compactionInstantTime, Option.empty(), Option.empty()); table.compact(context, compactionInstantTime); }); @@ -158,7 +158,7 @@ public class TestHoodieCompactor extends HoodieSparkClientTestHarness { writeClient.insert(recordsRDD, newCommitTime).collect(); String compactionInstantTime = writeClient.createNewInstantTime(); - Option<HoodieCompactionPlan> plan = table.scheduleCompaction(context, compactionInstantTime, Option.empty()); + Option<HoodieCompactionPlan> plan = table.scheduleCompaction(context, compactionInstantTime, Option.empty(), Option.empty()); assertFalse(plan.isPresent(), "If there is nothing to compact, result will be empty"); // 
Verify compaction.requested, compaction.completed metrics counts.
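[Editor's illustrative sketch, not part of this patch: how the new compaction.schedule.incremental.partitions option could be enabled on a Flink MERGE_ON_READ sink, mirroring the settings used in ITTestDataStreamWrite above. The table path and record key field are placeholder values.]

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.configuration.FlinkOptions;

    public class IncrementalCompactionConfigSketch {
      public static Configuration build() {
        Configuration conf = new Configuration();
        conf.setString(FlinkOptions.PATH, "/tmp/hudi/mor_table");                 // placeholder base path
        conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
        conf.setString(FlinkOptions.RECORD_KEY_FIELD, "uuid");                    // placeholder record key
        conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
        // Only partitions touched by delta commits since the last completed compaction
        // are fed into the compaction plan generator.
        conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_INCREMENTAL_PARTITIONS, true);
        return conf;
      }
    }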

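[Editor's illustrative sketch, not part of this patch: the coordinator-side hook, assuming an already-open HoodieFlinkWriteClient. It mirrors scheduleCompactionInternal above: the partition set collected from the committed write statuses is passed to CompactionUtil.scheduleCompaction, which now reports whether a plan was actually generated.]

    import org.apache.hudi.client.HoodieFlinkWriteClient;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.util.CompactionUtil;

    import java.util.Set;

    public class CoordinatorScheduleSketch {
      // Returns true when a compaction plan was scheduled for the given partitions.
      public static boolean schedule(HoodieFlinkWriteClient<?> writeClient,
                                     Set<String> writtenPartitions,
                                     boolean committed,
                                     boolean deltaTimeCompaction) {
        return CompactionUtil.scheduleCompaction(
            writeClient, deltaTimeCompaction, committed, Option.of(writtenPartitions));
      }
    }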