This is an automated email from the ASF dual-hosted git repository. zhangyue19921010 pushed a commit to branch incremental-TableService in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 85270558328b62913a92a83d87d0d3988cdd080f Author: YueZhang <[email protected]> AuthorDate: Wed Jan 8 17:55:09 2025 +0800 finish base coding && need more tests --- .../action/BaseTableServicePlanActionExecutor.java | 163 +++++++++++++++++++++ ...java => IncrementalPartitionAwareStrategy.java} | 28 ++-- .../cluster/ClusteringPlanActionExecutor.java | 11 +- .../cluster/ClusteringPlanPartitionFilter.java | 14 +- .../cluster/strategy/ClusteringPlanStrategy.java | 3 +- .../PartitionAwareClusteringPlanStrategy.java | 33 +++-- .../compact/ScheduleCompactionActionExecutor.java | 8 +- .../BaseHoodieCompactionPlanGenerator.java | 24 +-- .../generators/HoodieCompactionPlanGenerator.java | 18 ++- .../HoodieLogCompactionPlanGenerator.java | 25 +++- .../strategy/BoundedIOCompactionStrategy.java | 5 +- .../BoundedPartitionAwareCompactionStrategy.java | 33 ++++- .../compact/strategy/CompactionStrategy.java | 20 ++- .../strategy/CompositeCompactionStrategy.java | 20 ++- .../strategy/DayBasedCompactionStrategy.java | 12 +- .../LogFileNumBasedCompactionStrategy.java | 15 +- .../LogFileSizeBasedCompactionStrategy.java | 14 +- .../PartitionRegexBasedCompactionStrategy.java | 9 +- .../strategy/UnBoundedCompactionStrategy.java | 3 +- .../UnBoundedPartitionAwareCompactionStrategy.java | 14 +- .../TestPartitionAwareClusteringPlanStrategy.java | 6 +- ...inkSizeBasedClusteringPlanStrategyRecently.java | 5 +- .../TestSparkClusteringPlanPartitionFilter.java | 10 +- .../strategy/TestHoodieCompactionStrategy.java | 55 ++++--- .../src/main/avro/HoodieClusteringPlan.avsc | 10 +- .../src/main/avro/HoodieCompactionOperation.avsc | 8 + .../hudi/common/testutils/CompactionTestUtils.java | 2 +- .../org/apache/hudi/utils/TestClusteringUtil.java | 2 +- .../org/apache/hudi/utils/TestCompactionUtil.java | 2 +- 29 files changed, 437 insertions(+), 135 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseTableServicePlanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseTableServicePlanActionExecutor.java new file mode 100644 index 00000000000..bc3bbfa6f65 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseTableServicePlanActionExecutor.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action; + +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.TableServiceType; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineUtils; +import org.apache.hudi.common.table.timeline.versioning.v1.InstantComparatorV1; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.CompactionUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieTable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLUSTERING_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; + +public abstract class BaseTableServicePlanActionExecutor<T, I, K, O, R> extends BaseActionExecutor<T, I, K, O, R> { + + private static final Logger LOG = LoggerFactory.getLogger(BaseTableServicePlanActionExecutor.class); + + public BaseTableServicePlanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, + 
HoodieTable<T, I, K, O> table, String instantTime) { + super(context, config, table, instantTime); + } + + /** + * Get partitions, if strategy implement `IncrementalPartitionAwareStrategy` then return incremental partitions, + * otherwise return all partitions of the table + * @param strategy + * @return + */ + public List<String> getPartitions(Object strategy, TableServiceType type) { + if (strategy instanceof IncrementalPartitionAwareStrategy) { + try { + // get incremental partitions. + Set<String> incrementalPartitions = getIncrementalPartitions(type); + if (!incrementalPartitions.isEmpty()) { + return new ArrayList<>(incrementalPartitions); + } + } catch (Exception ex) { + LOG.warn("Failed to get incremental partitions", ex); + } + } + + // fall back to get all partitions + return FSUtils.getAllPartitionPaths(context, table.getMetaClient().getStorage(), + config.getMetadataConfig(), table.getMetaClient().getBasePath()); + } + + private Set<String> getIncrementalPartitions(TableServiceType type) { + Pair<Option<HoodieInstant>, List<String>> missingPair = fetchMissingPartitions(type); + if (!missingPair.getLeft().isPresent()) { + // Last complete table service commit maybe archived. 
+ return Collections.emptySet(); + } + + List<String> missingPartitions = missingPair.getRight(); + + String leftBoundary = missingPair.getLeft().get().requestedTime(); + String rightBoundary = instantTime; + + // compute [leftBoundary, rightBoundary) as time window + HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); + Set<String> partitionsInCommitMeta = table.getActiveTimeline().filterCompletedInstants().getCommitsTimeline().getInstants().stream().flatMap(instant -> { + if (!instant.getAction().equals(CLUSTERING_ACTION)) { + String completionTime = instant.getCompletionTime(); + if (completionTime.compareTo(leftBoundary) <= 0 && completionTime.compareTo(rightBoundary) > 0) { + try { + HoodieCommitMetadata metadata = TimelineUtils.getCommitMetadata(instant, activeTimeline); + return metadata.getWriteStats().stream().map(HoodieWriteStat::getPartitionPath); + } catch (IOException e) { + throw new HoodieIOException("Failed to get commit meta " + instant, e); + } + } + } + return Stream.empty(); + }).collect(Collectors.toSet()); + + partitionsInCommitMeta.addAll(missingPartitions); + return partitionsInCommitMeta; + } + + private Pair<Option<HoodieInstant>, List<String>> fetchMissingPartitions(TableServiceType tableServiceType) { + Option<HoodieInstant> instant = Option.empty(); + List<String> missingPartitions = new ArrayList<>(); + + switch (tableServiceType) { + case COMPACT: + case LOG_COMPACT: { + Option<HoodieInstant> lastCompactionCommitInstant = table.getActiveTimeline() + .filterCompletedInstants().getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION)).lastInstant(); + if (lastCompactionCommitInstant.isPresent()) { + instant = lastCompactionCommitInstant; + String action = tableServiceType.equals(TableServiceType.COMPACT) ? 
HoodieTimeline.COMPACTION_ACTION : HoodieTimeline.LOG_COMPACTION_ACTION; + HoodieInstant compactionPlanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, action, + instant.get().requestedTime(), InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR); + Option<byte[]> details = table.getMetaClient().getActiveTimeline().readCompactionPlanAsBytes(compactionPlanInstant); + HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(table.getMetaClient(), details); + if (compactionPlan.getMissingSchedulePartitions() != null) { + missingPartitions = compactionPlan.getMissingSchedulePartitions(); + } + } + break; + } + case CLUSTER: { + Option<HoodieInstant> lastClusteringInstant = table.getActiveTimeline().filterCompletedInstants().getLastClusteringInstant(); + if (lastClusteringInstant.isPresent()) { + instant = lastClusteringInstant; + Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), lastClusteringInstant.get()); + if (clusteringPlan.isPresent() && clusteringPlan.get().getRight().getMissingSchedulePartitions() != null) { + missingPartitions = clusteringPlan.get().getRight().getMissingSchedulePartitions(); + } + } + break; + } + default: + throw new HoodieException("Un-supported incremental table service " + tableServiceType); + } + + return Pair.of(instant, missingPartitions); + } + +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/IncrementalPartitionAwareStrategy.java similarity index 50% copy from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java copy to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/IncrementalPartitionAwareStrategy.java index ffc437bcd4c..24dfd9cde3c 100644 --- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/IncrementalPartitionAwareStrategy.java @@ -16,25 +16,27 @@ * limitations under the License. */ -package org.apache.hudi.table.action.compact.strategy; +package org.apache.hudi.table.action; -import org.apache.hudi.avro.model.HoodieCompactionOperation; -import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import java.util.List; /** - * UnBoundedCompactionStrategy will not change ordering or filter any compaction. It is a pass-through and will compact - * all the base files which has a log file. This usually means no-intelligence on compaction. + * Marker strategy interface. * - * @see CompactionStrategy + * Any strategy implementing this `IncrementalPartitionAwareStrategy` has the ability to perform incremental partition processing. + * In that case, incremental partitions should be passed to the current strategy. */ -public class UnBoundedCompactionStrategy extends CompactionStrategy { - - @Override - public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig config, - List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionWorkloads) { - return operations; - } +public interface IncrementalPartitionAwareStrategy { + + /** + * Filter the given incremental partitions. + * @param writeConfig + * @param partitions + * @return Pair of final processing partition paths and filtered partitions which will be recorded as missing partitions. + * Different strategies can individually implement whether to record, or which partitions to record as missing partitions.
+ */ + Pair<List<String>, List<String>> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> partitions); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java index d698053d7b5..75625e11592 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java @@ -21,6 +21,7 @@ package org.apache.hudi.table.action.cluster; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.TableServiceType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -31,17 +32,19 @@ import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.action.BaseActionExecutor; +import org.apache.hudi.table.action.BaseTableServicePlanActionExecutor; import org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy; +import org.apache.hudi.util.Lazy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.Map; -public class ClusteringPlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieClusteringPlan>> { +public class ClusteringPlanActionExecutor<T, I, K, O> extends BaseTableServicePlanActionExecutor<T, I, K, O, 
Option<HoodieClusteringPlan>> { private static final Logger LOG = LoggerFactory.getLogger(ClusteringPlanActionExecutor.class); @@ -84,7 +87,9 @@ public class ClusteringPlanActionExecutor<T, I, K, O> extends BaseActionExecutor ClusteringPlanStrategy.checkAndGetClusteringPlanStrategy(config), new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config); - return strategy.generateClusteringPlan(); + Lazy<List<String>> partitions = Lazy.lazily(() -> getPartitions(strategy, TableServiceType.CLUSTER)); + + return strategy.generateClusteringPlan(partitions); } @Override diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java index ecc3706f674..06019d3b2b9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java @@ -42,13 +42,13 @@ import java.util.stream.Stream; */ public class ClusteringPlanPartitionFilter { - public static List<String> filter(List<String> partitions, HoodieWriteConfig config) { + public static List<String> filter(List<String> partitions, HoodieWriteConfig config, ArrayList<String> missingPartitions) { ClusteringPlanPartitionFilterMode mode = config.getClusteringPlanPartitionFilterMode(); switch (mode) { case NONE: return partitions; case RECENT_DAYS: - return recentDaysFilter(partitions, config); + return recentDaysFilter(partitions, config, missingPartitions); case SELECTED_PARTITIONS: return selectedPartitionsFilter(partitions, config); case DAY_ROLLING: @@ -71,12 +71,14 @@ public class ClusteringPlanPartitionFilter { return selectPt; } - private static List<String> recentDaysFilter(List<String> partitions, 
HoodieWriteConfig config) { + private static List<String> recentDaysFilter(List<String> partitions, HoodieWriteConfig config, ArrayList<String> missingPartitions) { int targetPartitionsForClustering = config.getTargetPartitionsForClustering(); int skipPartitionsFromLatestForClustering = config.getSkipPartitionsFromLatestForClustering(); - return partitions.stream() - .sorted(Comparator.reverseOrder()) - .skip(Math.max(skipPartitionsFromLatestForClustering, 0)) + int skipOffset = Math.max(skipPartitionsFromLatestForClustering, 0); + List<String> sortedPartitions = partitions.stream().sorted(Comparator.reverseOrder()).collect(Collectors.toList()); + missingPartitions.addAll(sortedPartitions.subList(0, skipOffset)); + return sortedPartitions.stream() + .skip(skipOffset) .limit(targetPartitionsForClustering > 0 ? targetPartitionsForClustering : partitions.size()) .collect(Collectors.toList()); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java index a6894388f6d..6f4508075eb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java @@ -33,6 +33,7 @@ import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode; +import org.apache.hudi.util.Lazy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,7 +102,7 @@ public abstract class ClusteringPlanStrategy<T,I,K,O> implements Serializable { * * If there is no data available to cluster, return None. 
*/ - public abstract Option<HoodieClusteringPlan> generateClusteringPlan(); + public abstract Option<HoodieClusteringPlan> generateClusteringPlan(Lazy<List<String>> partitions); /** * Check if the clustering can proceed. If not (i.e., return false), the PlanStrategy will generate an empty plan to stop the scheduling. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java index 515d6532871..af238cf900f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java @@ -22,7 +22,6 @@ import org.apache.hudi.avro.model.HoodieClusteringGroup; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieClusteringStrategy; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; @@ -30,14 +29,18 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.IncrementalPartitionAwareStrategy; import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilter; +import org.apache.hudi.util.Lazy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.regex.Pattern; import 
java.util.stream.Collectors; import java.util.stream.Stream; @@ -45,7 +48,7 @@ import java.util.stream.Stream; /** * Scheduling strategy with restriction that clustering groups can only contain files from same partition. */ -public abstract class PartitionAwareClusteringPlanStrategy<T,I,K,O> extends ClusteringPlanStrategy<T,I,K,O> { +public abstract class PartitionAwareClusteringPlanStrategy<T,I,K,O> extends ClusteringPlanStrategy<T,I,K,O> implements IncrementalPartitionAwareStrategy { private static final Logger LOG = LoggerFactory.getLogger(PartitionAwareClusteringPlanStrategy.class); public PartitionAwareClusteringPlanStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { @@ -113,14 +116,15 @@ public abstract class PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus /** * Return list of partition paths to be considered for clustering. */ - protected List<String> filterPartitionPaths(List<String> partitionPaths) { - List<String> filteredPartitions = ClusteringPlanPartitionFilter.filter(partitionPaths, getWriteConfig()); + public Pair<List<String>, List<String>> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> partitions) { + ArrayList<String> missingPartitions = new ArrayList<>(); + List<String> filteredPartitions = ClusteringPlanPartitionFilter.filter(partitions, getWriteConfig(), missingPartitions); LOG.debug("Filtered to the following partitions: " + filteredPartitions); - return filteredPartitions; + return Pair.of(filteredPartitions, missingPartitions); } @Override - public Option<HoodieClusteringPlan> generateClusteringPlan() { + public Option<HoodieClusteringPlan> generateClusteringPlan(Lazy<List<String>> partitions) { if (!checkPrecondition()) { return Option.empty(); } @@ -135,21 +139,24 @@ public abstract class PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus if (StringUtils.isNullOrEmpty(partitionSelected)) { // get matched partitions if set - partitionPaths = 
getRegexPatternMatchedPartitions(config, FSUtils.getAllPartitionPaths( - getEngineContext(), metaClient.getStorage(), config.getMetadataConfig(), metaClient.getBasePath())); + partitionPaths = getRegexPatternMatchedPartitions(config, partitions.get()); // filter the partition paths if needed to reduce list status } else { partitionPaths = Arrays.asList(partitionSelected.split(",")); } - partitionPaths = filterPartitionPaths(partitionPaths); + Pair<List<String>, List<String>> partitionsPair = filterPartitionPaths(getWriteConfig(), partitionPaths); + partitionPaths = partitionsPair.getLeft(); + List<String> missingPartitions = partitionsPair.getRight(); LOG.info("Scheduling clustering partitionPaths: {}", partitionPaths); + LOG.info("Missing Scheduled clustering partitionPaths: {}", missingPartitions); if (partitionPaths.isEmpty()) { // In case no partitions could be picked, return no clustering plan return Option.empty(); } + Set<String> processedPartitions = new HashSet<>(); List<HoodieClusteringGroup> clusteringGroups = getEngineContext() .flatMap( partitionPaths, @@ -160,8 +167,15 @@ public abstract class PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus partitionPaths.size()) .stream() .limit(getWriteConfig().getClusteringMaxNumGroups()) + .peek(clusteringGroup -> { + String partitionPath = clusteringGroup.getSlices().get(0).getPartitionPath(); + processedPartitions.add(partitionPath); + }) .collect(Collectors.toList()); + partitionPaths.removeAll(processedPartitions); + missingPartitions.addAll(partitionPaths); + if (clusteringGroups.isEmpty()) { LOG.warn("No data available to cluster"); return Option.empty(); @@ -178,6 +192,7 @@ public abstract class PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus .setExtraMetadata(getExtraMetadata()) .setVersion(getPlanVersion()) .setPreserveHoodieMetadata(true) + .setMissingSchedulePartitions(missingPartitions) .build()); } diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java index e83800e45da..bd97d40f7fc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java @@ -34,7 +34,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCompactionException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.action.BaseActionExecutor; +import org.apache.hudi.table.action.BaseTableServicePlanActionExecutor; import org.apache.hudi.table.action.compact.plan.generators.BaseHoodieCompactionPlanGenerator; import org.apache.hudi.table.action.compact.plan.generators.HoodieCompactionPlanGenerator; import org.apache.hudi.table.action.compact.plan.generators.HoodieLogCompactionPlanGenerator; @@ -50,7 +50,7 @@ import java.util.Map; import static org.apache.hudi.common.util.CollectionUtils.nonEmpty; import static org.apache.hudi.common.util.ValidationUtils.checkArgument; -public class ScheduleCompactionActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieCompactionPlan>> { +public class ScheduleCompactionActionExecutor<T, I, K, O> extends BaseTableServicePlanActionExecutor<T, I, K, O, Option<HoodieCompactionPlan>> { private static final Logger LOG = LoggerFactory.getLogger(ScheduleCompactionActionExecutor.class); private WriteOperationType operationType; @@ -73,9 +73,9 @@ public class ScheduleCompactionActionExecutor<T, I, K, O> extends BaseActionExec private void initPlanGenerator(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table) { if 
(WriteOperationType.COMPACT.equals(operationType)) { - planGenerator = new HoodieCompactionPlanGenerator(table, context, config); + planGenerator = new HoodieCompactionPlanGenerator(table, context, config, this); } else { - planGenerator = new HoodieLogCompactionPlanGenerator(table, context, config); + planGenerator = new HoodieLogCompactionPlanGenerator(table, context, config, this); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java index 1884f4c5d40..0d392bc5ff9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java @@ -22,7 +22,6 @@ import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.data.HoodieAccumulator; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.CompactionOperation; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; @@ -41,6 +40,7 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.BaseTableServicePlanActionExecutor; import org.apache.hudi.table.action.compact.CompactHelpers; import org.slf4j.Logger; @@ -64,11 +64,14 @@ public abstract class BaseHoodieCompactionPlanGenerator<T extends HoodieRecordPa protected final HoodieTable<T, I, K, O> hoodieTable; protected final 
HoodieWriteConfig writeConfig; protected final transient HoodieEngineContext engineContext; + protected final BaseTableServicePlanActionExecutor executor; - public BaseHoodieCompactionPlanGenerator(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + public BaseHoodieCompactionPlanGenerator(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig, + BaseTableServicePlanActionExecutor executor) { this.hoodieTable = table; this.writeConfig = writeConfig; this.engineContext = engineContext; + this.executor = executor; } @Nullable @@ -82,13 +85,14 @@ public abstract class BaseHoodieCompactionPlanGenerator<T extends HoodieRecordPa // TODO - rollback any compactions in flight HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); CompletionTimeQueryView completionTimeQueryView = metaClient.getTimelineLayout().getTimelineFactory().createCompletionTimeQueryView(metaClient); - List<String> partitionPaths = FSUtils.getAllPartitionPaths( - engineContext, metaClient.getStorage(), writeConfig.getMetadataConfig(), metaClient.getBasePath()); + List<String> partitionPaths = getPartitions(); int allPartitionSize = partitionPaths.size(); // filter the partition paths if needed to reduce list status - partitionPaths = filterPartitionPathsByStrategy(partitionPaths); + Pair<List<String>, List<String>> partitionPair = filterPartitionPathsByStrategy(partitionPaths); + partitionPaths = partitionPair.getLeft(); + LOG.info("Strategy: {} matched {} partition paths from all {} partitions for table {}", writeConfig.getCompactionStrategy().getClass().getSimpleName(), partitionPaths.size(), allPartitionSize, hoodieTable.getConfig().getBasePath()); @@ -169,7 +173,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T extends HoodieRecordPa } // Filter the compactions with the passed in filter. 
This lets us choose most effective compactions only - HoodieCompactionPlan compactionPlan = getCompactionPlan(metaClient, operations); + HoodieCompactionPlan compactionPlan = getCompactionPlan(metaClient, operations, partitionPair); ValidationUtils.checkArgument( compactionPlan.getOperations().stream().noneMatch( op -> fgIdsInPendingCompactionAndClustering.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))), @@ -182,12 +186,14 @@ public abstract class BaseHoodieCompactionPlanGenerator<T extends HoodieRecordPa return compactionPlan; } - protected abstract HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, List<HoodieCompactionOperation> operations); + protected abstract List<String> getPartitions(); + + protected abstract HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, List<HoodieCompactionOperation> operations, Pair<List<String>,List<String>> partitionPair); protected abstract boolean filterLogCompactionOperations(); - protected List<String> filterPartitionPathsByStrategy(List<String> partitionPaths) { - return partitionPaths; + protected Pair<List<String>, List<String>> filterPartitionPathsByStrategy(List<String> partitionPaths) { + return Pair.of(partitionPaths, Collections.emptyList()); } protected boolean filterFileSlice(FileSlice fileSlice, String lastCompletedInstantTime, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java index a93ece710b0..378461f1b98 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java @@ -22,11 +22,13 @@ import 
org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.TableServiceType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.BaseTableServicePlanActionExecutor; import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; import org.slf4j.Logger; @@ -43,22 +45,28 @@ public class HoodieCompactionPlanGenerator<T extends HoodieRecordPayload, I, K, private final CompactionStrategy compactionStrategy; - public HoodieCompactionPlanGenerator(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { - super(table, engineContext, writeConfig); + public HoodieCompactionPlanGenerator(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig, + BaseTableServicePlanActionExecutor executor) { + super(table, engineContext, writeConfig, executor); this.compactionStrategy = writeConfig.getCompactionStrategy(); LOG.info("Compaction Strategy used is: " + compactionStrategy.toString()); } @Override - protected HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, List<HoodieCompactionOperation> operations) { + protected HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, List<HoodieCompactionOperation> operations, Pair<List<String>, List<String>> partitionPair) { // Filter the compactions with the passed in filter. 
This lets us choose most effective // compactions only return compactionStrategy.generateCompactionPlan(writeConfig, operations, - CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList())); + CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList()), getStrategyParams(), partitionPair); } @Override - protected List<String> filterPartitionPathsByStrategy(List<String> partitionPaths) { + protected List<String> getPartitions() { + return executor.getPartitions(compactionStrategy, TableServiceType.COMPACT); + } + + @Override + protected Pair<List<String>, List<String>> filterPartitionPathsByStrategy(List<String> partitionPaths) { return compactionStrategy.filterPartitionPaths(writeConfig, partitionPaths); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java index a81ee663fa9..5d9337ac4e7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java @@ -26,13 +26,16 @@ import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.TableServiceType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner; import org.apache.hudi.common.table.log.InstantRange; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; +import 
org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.BaseTableServicePlanActionExecutor; import org.apache.hudi.table.action.compact.LogCompactionExecutionHelper; import org.slf4j.Logger; @@ -45,25 +48,33 @@ import java.util.stream.Collectors; public class HoodieLogCompactionPlanGenerator<T extends HoodieRecordPayload, I, K, O> extends BaseHoodieCompactionPlanGenerator<T, I, K, O> { private static final Logger LOG = LoggerFactory.getLogger(HoodieLogCompactionPlanGenerator.class); + private final HoodieCompactionStrategy compactionStrategy; - public HoodieLogCompactionPlanGenerator(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { - super(table, engineContext, writeConfig); - } - - @Override - protected HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, List<HoodieCompactionOperation> operations) { - HoodieCompactionStrategy compactionStrategy = HoodieCompactionStrategy.newBuilder() + public HoodieLogCompactionPlanGenerator(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig, + BaseTableServicePlanActionExecutor executor) { + super(table, engineContext, writeConfig, executor); + this.compactionStrategy = HoodieCompactionStrategy.newBuilder() .setStrategyParams(getStrategyParams()) .setCompactorClassName(LogCompactionExecutionHelper.class.getName()) .build(); + } + + @Override + protected HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, List<HoodieCompactionOperation> operations, Pair<List<String>, List<String>> partitionPair) { return HoodieCompactionPlan.newBuilder() .setOperations(operations) .setVersion(CompactionUtils.LATEST_COMPACTION_METADATA_VERSION) .setStrategy(compactionStrategy) + .setMissingSchedulePartitions(partitionPair.getRight()) .setPreserveHoodieMetadata(true) .build(); } + @Override + protected List<String> 
getPartitions() { + return executor.getPartitions(compactionStrategy, TableServiceType.LOG_COMPACT); + } + @Override protected boolean filterFileSlice(FileSlice fileSlice, String lastCompletedInstantTime, Set<HoodieFileGroupId> pendingFileGroupIds, Option<InstantRange> instantRange) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedIOCompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedIOCompactionStrategy.java index d93a50fd673..6d7a17ba536 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedIOCompactionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedIOCompactionStrategy.java @@ -24,6 +24,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import java.util.ArrayList; import java.util.List; +import java.util.Set; /** * CompactionStrategy which looks at total IO to be done for the compaction (read + write) and limits the list of @@ -35,7 +36,7 @@ public class BoundedIOCompactionStrategy extends CompactionStrategy { @Override public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig, - List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) { + List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans, Set<String> missingPartitions) { // Iterate through the operations in order and accept operations as long as we are within the // IO limit // Preserves the original ordering of compactions @@ -46,7 +47,9 @@ public class BoundedIOCompactionStrategy extends CompactionStrategy { + if (targetIORemaining <= 0) { + // IO budget exhausted: skip the operation but record its partition as missed, + // so an incremental schedule can pick it up next time. Previously we returned here. + missingPartitions.add(op.getPartitionPath()); + continue; + } targetIORemaining -= opIo; finalOperations.add(op); - if (targetIORemaining <= 0) { - return finalOperations; - } } return finalOperations; diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedPartitionAwareCompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedPartitionAwareCompactionStrategy.java index 09c19b1aabe..bd858862bc7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedPartitionAwareCompactionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedPartitionAwareCompactionStrategy.java @@ -20,14 +20,17 @@ package org.apache.hudi.table.action.compact.strategy; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Calendar; import java.util.Comparator; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; /** @@ -46,7 +49,7 @@ public class BoundedPartitionAwareCompactionStrategy extends DayBasedCompactionS @Override public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig, - List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) { + List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans, Set<String> missingPartitions) { // The earliest partition to compact - current day minus the target partitions limit String earliestPartitionPathToCompact = dateFormat.get().format(getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction())); @@ -54,19 +57,35 @@ public class BoundedPartitionAwareCompactionStrategy extends DayBasedCompactionS return 
operations.stream().collect(Collectors.groupingBy(HoodieCompactionOperation::getPartitionPath)).entrySet() .stream().sorted(Map.Entry.comparingByKey(DayBasedCompactionStrategy.comparator)) - .filter(e -> DayBasedCompactionStrategy.comparator.compare(earliestPartitionPathToCompact, e.getKey()) >= 0) - .flatMap(e -> e.getValue().stream()).collect(Collectors.toList()); + .filter(e -> { + if (DayBasedCompactionStrategy.comparator.compare(earliestPartitionPathToCompact, e.getKey()) >= 0) { + return true; + } else { + missingPartitions.add(e.getKey()); + return false; + } + }).flatMap(e -> e.getValue().stream()).collect(Collectors.toList()); } @Override - public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> partitionPaths) { + public Pair<List<String>, List<String>> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> partitionPaths) { // The earliest partition to compact - current day minus the target partitions limit String earliestPartitionPathToCompact = dateFormat.get().format(getDateAtOffsetFromToday(-1 * writeConfig.getTargetPartitionsPerDayBasedCompaction())); + + ArrayList<String> partitionsToProcess = new ArrayList<>(); + ArrayList<String> missingPartitions = new ArrayList<>(); + // Get all partitions and sort them - return partitionPaths.stream().map(partition -> partition.replace("/", "-")) - .sorted(Comparator.reverseOrder()).map(partitionPath -> partitionPath.replace("-", "/")) - .filter(e -> DayBasedCompactionStrategy.comparator.compare(earliestPartitionPathToCompact, e) >= 0).collect(Collectors.toList()); + partitionPaths.stream().map(partition -> partition.replace("/", "-")) + .sorted(Comparator.reverseOrder()).map(partitionPath -> partitionPath.replace("-", "/")).forEach(partition -> { + if (DayBasedCompactionStrategy.comparator.compare(earliestPartitionPathToCompact, partition) >= 0) { + partitionsToProcess.add(partition); + } else { + missingPartitions.add(partition); + } + }); + return 
Pair.of(partitionsToProcess, missingPartitions); } public static Date getDateAtOffsetFromToday(int offset) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompactionStrategy.java index 5b2dba7ab7a..ac82fd0eb3a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompactionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompactionStrategy.java @@ -23,13 +23,18 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.utils.FileSliceMetricUtils; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.util.CompactionUtils; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.action.IncrementalPartitionAwareStrategy; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; /** * Strategy for compaction. Pluggable implementation to define how compaction should be done. The over-ridden @@ -37,7 +42,7 @@ import java.util.Map; * compaction operation to run in a single compaction. Implementation of CompactionStrategy cannot hold any state. 
* Difference instantiations can be passed in every time */ -public abstract class CompactionStrategy implements Serializable { +public abstract class CompactionStrategy implements IncrementalPartitionAwareStrategy, Serializable { public static final String TOTAL_IO_READ_MB = "TOTAL_IO_READ_MB"; public static final String TOTAL_IO_WRITE_MB = "TOTAL_IO_WRITE_MB"; @@ -71,10 +76,13 @@ public abstract class CompactionStrategy implements Serializable { * @return Compaction plan to be scheduled. */ public HoodieCompactionPlan generateCompactionPlan(HoodieWriteConfig writeConfig, - List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) { + List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans, Map<String, String> params, Pair<List<String>, List<String>> partitionPair) { // Strategy implementation can overload this method to set specific compactor-id + Set<String> missingPartitions = new HashSet<>(partitionPair.getRight()); + List<HoodieCompactionOperation> operationsToProcess = orderAndFilter(writeConfig, operations, pendingCompactionPlans, missingPartitions); return HoodieCompactionPlan.newBuilder() - .setOperations(orderAndFilter(writeConfig, operations, pendingCompactionPlans)) + .setOperations(operationsToProcess) + .setMissingSchedulePartitions(new ArrayList<>(missingPartitions)) .setVersion(CompactionUtils.LATEST_COMPACTION_METADATA_VERSION).build(); } @@ -88,7 +96,7 @@ public abstract class CompactionStrategy implements Serializable { * @return list of compactions to perform in this run */ public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig, - List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) { + List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans, Set<String> missingPartitions) { return operations; } @@ -99,7 +107,7 @@ public abstract class CompactionStrategy implements 
Serializable { * @param allPartitionPaths * @return */ - public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) { - return allPartitionPaths; + public Pair<List<String>, List<String>> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) { + return Pair.of(allPartitionPaths, Collections.emptyList()); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompositeCompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompositeCompactionStrategy.java index da90269509d..c9583f89ea8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompositeCompactionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompositeCompactionStrategy.java @@ -21,9 +21,12 @@ package org.apache.hudi.table.action.compact.strategy; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; +import java.util.ArrayList; import java.util.List; +import java.util.Set; /** * CompositeCompactionStrategy chains multiple compaction strategies together. 
@@ -39,21 +42,26 @@ public class CompositeCompactionStrategy extends CompactionStrategy { } @Override - public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig, List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) { + public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig, List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans, + Set<String> missingPartitions) { List<HoodieCompactionOperation> finalOperations = operations; for (CompactionStrategy strategy : strategies) { - finalOperations = strategy.orderAndFilter(writeConfig, finalOperations, pendingCompactionPlans); + finalOperations = strategy.orderAndFilter(writeConfig, finalOperations, pendingCompactionPlans, missingPartitions); } return finalOperations; } @Override - public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) { - List<String> finalPartitionPaths = allPartitionPaths; + public Pair<List<String>, List<String>> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) { + List<String> partitionsToProcess = allPartitionPaths; + List<String> missingPartitions = new ArrayList<>(); + for (CompactionStrategy strategy : strategies) { - finalPartitionPaths = strategy.filterPartitionPaths(writeConfig, finalPartitionPaths); + Pair<List<String>, List<String>> innerRes = strategy.filterPartitionPaths(writeConfig, partitionsToProcess); + partitionsToProcess = innerRes.getLeft(); + missingPartitions.addAll(innerRes.getRight()); } - return finalPartitionPaths; + return Pair.of(partitionsToProcess, missingPartitions); } @Override diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/DayBasedCompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/DayBasedCompactionStrategy.java index 
d77d22d05c5..871b1b9fc38 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/DayBasedCompactionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/DayBasedCompactionStrategy.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.action.compact.strategy; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -61,10 +62,13 @@ public class DayBasedCompactionStrategy extends BoundedIOCompactionStrategy { } @Override - public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) { - return allPartitionPaths.stream().sorted(comparator) - .collect(Collectors.toList()).subList(0, Math.min(allPartitionPaths.size(), - writeConfig.getTargetPartitionsPerDayBasedCompaction())); + public Pair<List<String>, List<String>> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) { + List<String> sortedPartitions = allPartitionPaths.stream().sorted(comparator).collect(Collectors.toList()); + int boundary = Math.min(allPartitionPaths.size(), writeConfig.getTargetPartitionsPerDayBasedCompaction()); + + List<String> partitionsToProcess = sortedPartitions.subList(0, boundary); + List<String> missingPartitions = sortedPartitions.subList(boundary, sortedPartitions.size()); + return Pair.of(partitionsToProcess, missingPartitions); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileNumBasedCompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileNumBasedCompactionStrategy.java index 6f79b684d0a..e7024a00ee7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileNumBasedCompactionStrategy.java +++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileNumBasedCompactionStrategy.java @@ -24,6 +24,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import java.util.Comparator; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; /** @@ -34,12 +35,19 @@ public class LogFileNumBasedCompactionStrategy extends BoundedIOCompactionStrate implements Comparator<HoodieCompactionOperation> { @Override - public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig, List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) { + public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig, List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans, + Set<String> missingPartitions) { Long numThreshold = writeConfig.getCompactionLogFileNumThreshold(); List<HoodieCompactionOperation> filterOperator = operations.stream() - .filter(e -> e.getDeltaFilePaths().size() >= numThreshold) - .sorted(this).collect(Collectors.toList()); - return super.orderAndFilter(writeConfig, filterOperator, pendingCompactionPlans); + .filter(e -> { + if (e.getDeltaFilePaths().size() >= numThreshold) { + return true; + } else { + missingPartitions.add(e.getPartitionPath()); + return false; + } + }).sorted(this).collect(Collectors.toList()); + return super.orderAndFilter(writeConfig, filterOperator, pendingCompactionPlans, missingPartitions); } @Override diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java index c165141dfc5..1024f5f9cf0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java +++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java @@ -24,6 +24,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import java.util.Comparator; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; /** @@ -39,14 +40,21 @@ public class LogFileSizeBasedCompactionStrategy extends BoundedIOCompactionStrat @Override public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig, - List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) { + List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans, Set<String> missingPartitions) { // Filter the file group which log files size is greater than the threshold in bytes. // Order the operations based on the reverse size of the logs and limit them by the IO long threshold = writeConfig.getCompactionLogFileSizeThreshold(); return super.orderAndFilter(writeConfig, operations.stream() - .filter(e -> e.getMetrics().getOrDefault(TOTAL_LOG_FILE_SIZE, 0d) >= threshold) + .filter(e -> { + if (e.getMetrics().getOrDefault(TOTAL_LOG_FILE_SIZE, 0d) >= threshold) { + return true; + } else { + missingPartitions.add(e.getPartitionPath()); + return false; + } + }) .sorted(this).collect(Collectors.toList()), - pendingCompactionPlans); + pendingCompactionPlans, missingPartitions); } @Override diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/PartitionRegexBasedCompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/PartitionRegexBasedCompactionStrategy.java index eb3b42ee7ca..1ff5316389f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/PartitionRegexBasedCompactionStrategy.java +++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/PartitionRegexBasedCompactionStrategy.java @@ -18,18 +18,23 @@ package org.apache.hudi.table.action.compact.strategy; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; +import java.util.ArrayList; import java.util.List; import java.util.regex.Pattern; import java.util.stream.Collectors; public class PartitionRegexBasedCompactionStrategy extends CompactionStrategy { + /** + * Regex filtered partitions are not included in missing partitions. + */ @Override - public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) { + public Pair<List<String>, List<String>> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) { String regex = writeConfig.getCompactionSpecifyPartitionPathRegex(); Pattern pattern = Pattern.compile(regex); - return allPartitionPaths.stream().filter(pattern.asPredicate()).collect(Collectors.toList()); + return Pair.of(allPartitionPaths.stream().filter(pattern.asPredicate()).collect(Collectors.toList()), new ArrayList<>()); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java index ffc437bcd4c..fc34a7bd8d1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java @@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.config.HoodieWriteConfig; import java.util.List; +import java.util.Set; /** * UnBoundedCompactionStrategy will not change ordering or filter any compaction. 
It is a pass-through and will compact @@ -34,7 +35,7 @@ public class UnBoundedCompactionStrategy extends CompactionStrategy { @Override public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig config, - List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionWorkloads) { + List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionWorkloads, Set<String> missingPartitions) { return operations; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java index 66b5612e1ac..d51ee5a12c7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java @@ -20,11 +20,13 @@ package org.apache.hudi.table.action.compact.strategy; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; /** @@ -40,26 +42,24 @@ public class UnBoundedPartitionAwareCompactionStrategy extends CompactionStrateg @Override public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig config, - final List<HoodieCompactionOperation> operations, final List<HoodieCompactionPlan> pendingCompactionWorkloads) { + final List<HoodieCompactionOperation> operations, final List<HoodieCompactionPlan> pendingCompactionWorkloads, Set<String> missingPartitions) { 
BoundedPartitionAwareCompactionStrategy boundedPartitionAwareCompactionStrategy = new BoundedPartitionAwareCompactionStrategy(); List<HoodieCompactionOperation> operationsToExclude = - boundedPartitionAwareCompactionStrategy.orderAndFilter(config, operations, pendingCompactionWorkloads); + boundedPartitionAwareCompactionStrategy.orderAndFilter(config, operations, pendingCompactionWorkloads, missingPartitions); List<HoodieCompactionOperation> allOperations = new ArrayList<>(operations); allOperations.removeAll(operationsToExclude); return allOperations; } @Override - public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> partitionPaths) { + public Pair<List<String>, List<String>> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> partitionPaths) { List<String> allPartitionPaths = partitionPaths.stream().map(partition -> partition.replace("/", "-")).sorted(Comparator.reverseOrder()) .map(partitionPath -> partitionPath.replace("-", "/")).collect(Collectors.toList()); BoundedPartitionAwareCompactionStrategy boundedPartitionAwareCompactionStrategy = new BoundedPartitionAwareCompactionStrategy(); - List<String> partitionsToExclude = - boundedPartitionAwareCompactionStrategy.filterPartitionPaths(writeConfig, partitionPaths); - allPartitionPaths.removeAll(partitionsToExclude); - return allPartitionPaths; + Pair<List<String>, List<String>> partitionPair = boundedPartitionAwareCompactionStrategy.filterPartitionPaths(writeConfig, allPartitionPaths); + return Pair.of(partitionPair.getRight(), partitionPair.getLeft()); } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java index a053a961105..3b579f142a3 100644 --- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java @@ -20,6 +20,7 @@ package org.apache.hudi.table.action.cluster.strategy; import org.apache.hudi.avro.model.HoodieClusteringGroup; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; @@ -83,11 +84,6 @@ public class TestPartitionAwareClusteringPlanStrategy { super(table, engineContext, writeConfig); } - @Override - protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List list) { - return null; - } - @Override protected Map<String, String> getStrategyParams() { return null; diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategyRecently.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategyRecently.java index 10046f84f5b..84232cf64fe 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategyRecently.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategyRecently.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.util.Lazy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +62,7 @@ public class 
FlinkSizeBasedClusteringPlanStrategyRecently<T> extends FlinkSizeBa } @Override - public Option<HoodieClusteringPlan> generateClusteringPlan() { + public Option<HoodieClusteringPlan> generateClusteringPlan(Lazy<List<String>> partitions) { if (!checkPrecondition()) { return Option.empty(); } @@ -71,7 +72,7 @@ public class FlinkSizeBasedClusteringPlanStrategyRecently<T> extends FlinkSizeBa List<String> partitionPaths = getPartitionPathInActiveTimeline(hoodieTable); - partitionPaths = filterPartitionPaths(partitionPaths); + partitionPaths = filterPartitionPaths(getWriteConfig(), partitionPaths).getLeft(); if (partitionPaths.isEmpty()) { // In case no partitions could be picked, return no clustering plan diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java index 70643a327d4..c048ac0dedd 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java @@ -63,7 +63,7 @@ public class TestSparkClusteringPlanPartitionFilter { fakeTimeBasedPartitionsPath.add("20210718"); fakeTimeBasedPartitionsPath.add("20210716"); fakeTimeBasedPartitionsPath.add("20210719"); - List list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath); + List list = (List)sg.filterPartitionPaths(null, fakeTimeBasedPartitionsPath).getLeft(); assertEquals(3, list.size()); } @@ -81,7 +81,7 @@ public class TestSparkClusteringPlanPartitionFilter { fakeTimeBasedPartitionsPath.add("20210718"); fakeTimeBasedPartitionsPath.add("20210716"); fakeTimeBasedPartitionsPath.add("20210719"); - List list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath); + List list = 
(List)sg.filterPartitionPaths(null, fakeTimeBasedPartitionsPath).getLeft(); assertEquals(1, list.size()); assertSame("20210718", list.get(0)); } @@ -101,7 +101,7 @@ public class TestSparkClusteringPlanPartitionFilter { fakeTimeBasedPartitionsPath.add("20211221"); fakeTimeBasedPartitionsPath.add("20211222"); fakeTimeBasedPartitionsPath.add("20211224"); - List list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath); + List list = (List)sg.filterPartitionPaths(table.getConfig(), fakeTimeBasedPartitionsPath).getLeft(); assertEquals(1, list.size()); assertSame("20211222", list.get(0)); } @@ -117,7 +117,7 @@ public class TestSparkClusteringPlanPartitionFilter { for (int i = 0; i < 24; i++) { fakeTimeBasedPartitionsPath.add("20220301" + (i >= 10 ? String.valueOf(i) : "0" + i)); } - List filterPartitions = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath); + List filterPartitions = (List)sg.filterPartitionPaths(null, fakeTimeBasedPartitionsPath).getLeft(); assertEquals(1, filterPartitions.size()); assertEquals(fakeTimeBasedPartitionsPath.get(DateTime.now().getHourOfDay()), filterPartitions.get(0)); fakeTimeBasedPartitionsPath = new ArrayList<>(); @@ -125,7 +125,7 @@ public class TestSparkClusteringPlanPartitionFilter { fakeTimeBasedPartitionsPath.add("20220301" + (i >= 10 ? String.valueOf(i) : "0" + i)); fakeTimeBasedPartitionsPath.add("20220302" + (i >= 10 ? 
String.valueOf(i) : "0" + i)); } - filterPartitions = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath); + filterPartitions =(List)sg.filterPartitionPaths(null, fakeTimeBasedPartitionsPath).getLeft(); assertEquals(2, filterPartitions.size()); int hourOfDay = DateTime.now().getHourOfDay(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java index a368d43da25..64d57bfe395 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java @@ -63,7 +63,7 @@ public class TestHoodieCompactionStrategy { HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp") .withCompactionConfig(HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).build()).build(); List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap); - List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>()); + List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>(), Collections.emptySet()); assertEquals(operations, returned, "UnBounded should not re-order or filter"); } @@ -79,7 +79,7 @@ public class TestHoodieCompactionStrategy { HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build()) .build(); List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap); - List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>()); + List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>(), Collections.emptySet()); assertTrue(returned.size() < operations.size(), "BoundedIOCompaction should have resulted in fewer compactions"); assertEquals(2, returned.size(), "BoundedIOCompaction should have resulted in 2 compactions being chosen"); @@ -103,7 +103,7 @@ public class TestHoodieCompactionStrategy { .withLogFileSizeThresholdBasedCompaction(100 * 1024 * 1024).build()) .build(); List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap); - List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>()); + List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>(), Collections.emptySet()); assertTrue(returned.size() < operations.size(), "LogFileSizeBasedCompactionStrategy should have resulted in fewer compactions"); @@ -137,7 +137,7 @@ public class TestHoodieCompactionStrategy { HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(1).build()).build(); - List<String> filterPartitions = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, Arrays.asList(partitionPaths)); + List<String> filterPartitions = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, Arrays.asList(partitionPaths)).getLeft(); assertEquals(1, filterPartitions.size(), "DayBasedCompactionStrategy should have resulted in fewer partitions"); List<HoodieCompactionOperation> operations = createCompactionOperationsForPartition(writeConfig, sizesMap, keyToPartitionMap, filterPartitions); @@ -182,11 +182,11 @@ public class TestHoodieCompactionStrategy { .build()) .build(); - List<String> 
filterPartitions = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, Arrays.asList(partitionPaths)); + List<String> filterPartitions = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, Arrays.asList(partitionPaths)).getLeft(); assertEquals(1, filterPartitions.size(), "DayBasedCompactionStrategy should have resulted in fewer partitions"); List<HoodieCompactionOperation> operations = createCompactionOperationsForPartition(writeConfig, sizesMap, keyToPartitionMap, filterPartitions); - List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>()); + List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>(), Collections.emptySet()); assertEquals(1, returned.size(), "DayBasedAndBoundedIOCompactionStrategy should have resulted in fewer compactions"); @@ -241,7 +241,7 @@ public class TestHoodieCompactionStrategy { HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(2).build()).build(); List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap, keyToPartitionMap); - List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>()); + List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>(), Collections.emptySet()); assertTrue(returned.size() < operations.size(), "BoundedPartitionAwareCompactionStrategy should have resulted in fewer compactions"); @@ -290,7 +290,7 @@ public class TestHoodieCompactionStrategy { HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder() 
.withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(2).build()).build(); List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap, keyToPartitionMap); - List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>()); + List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>(), Collections.emptySet()); assertTrue(returned.size() < operations.size(), "UnBoundedPartitionAwareCompactionStrategy should not include last " @@ -312,7 +312,7 @@ public class TestHoodieCompactionStrategy { .withCompactionLogFileNumThreshold(2).build()) .build(); List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap); - List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>()); + List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>(), Collections.emptySet()); assertTrue(returned.size() < operations.size(), "LogFileLengthBasedCompactionStrategy should have resulted in fewer compactions"); @@ -341,7 +341,7 @@ public class TestHoodieCompactionStrategy { List<String> allPartitionPaths = Arrays.asList( "2017/01/01", "2018/01/02", "2017/02/01" ); - List<String> returned = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, allPartitionPaths); + List<String> returned = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, allPartitionPaths).getLeft(); // filter by num first and then filter by prefix assertEquals(1, returned.size()); assertEquals("2017/01/01", returned.get(0)); @@ -349,7 +349,7 @@ public class TestHoodieCompactionStrategy { writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig( 
HoodieCompactionConfig.newBuilder().withCompactionStrategy(new PrefixStrategy(), new NumStrategy()).withTargetIOPerCompactionInMB(1024) .withCompactionLogFileNumThreshold(2).build()).build(); - returned = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, allPartitionPaths); + returned = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, allPartitionPaths).getLeft(); // filter by prefix first and then filter by num assertEquals(2, returned.size()); assertEquals("2017/01/01", returned.get(0)); @@ -358,15 +358,28 @@ public class TestHoodieCompactionStrategy { public static class NumStrategy extends CompactionStrategy { @Override - public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) { - return allPartitionPaths.stream().limit(2).collect(Collectors.toList()); + public Pair<List<String>, List<String>> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) { + List<String> partitionToProcess = allPartitionPaths.stream().limit(2).collect(Collectors.toList()); + List<String> missingPartitions = allPartitionPaths.stream().skip(2).collect(Collectors.toList()); + return Pair.of(partitionToProcess, missingPartitions); } } public static class PrefixStrategy extends CompactionStrategy { @Override - public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) { - return allPartitionPaths.stream().filter(s -> s.startsWith("2017")).collect(Collectors.toList()); + public Pair<List<String>, List<String>> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) { + ArrayList<String> partitionsToProcess = new ArrayList<>(); + ArrayList<String> missingPartitions = new ArrayList<>(); + + allPartitionPaths.forEach(partition -> { + if (partition.startsWith("2017")) { + partitionsToProcess.add(partition); + } else { + missingPartitions.add(partition); + } + }); + + return Pair.of(partitionsToProcess, 
missingPartitions); } } @@ -381,21 +394,21 @@ public class TestHoodieCompactionStrategy { ); HoodieWriteConfig writeConfig = updateRegex(".*"); - List<String> filteredPartitions = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, partitions); + List<String> filteredPartitions = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, partitions).getLeft(); assertEquals(5, filteredPartitions.size()); writeConfig = updateRegex("2020/01/01"); - filteredPartitions = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, partitions); + filteredPartitions = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, partitions).getLeft(); assertEquals(1, filteredPartitions.size()); assertEquals("2020/01/01", filteredPartitions.get(0)); writeConfig = updateRegex("2020/01/0[1-2]"); - filteredPartitions = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, partitions); + filteredPartitions = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, partitions).getLeft(); assertEquals(2, filteredPartitions.size()); assertEquals("2020/01/01", filteredPartitions.get(0)); assertEquals("2020/01/02", filteredPartitions.get(1)); writeConfig = updateRegex("2020/01/0[1-2]|2020/02/01"); - filteredPartitions = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, partitions); + filteredPartitions = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, partitions).getLeft(); assertEquals(3, filteredPartitions.size()); assertEquals("2020/01/01", filteredPartitions.get(0)); assertEquals("2020/01/02", filteredPartitions.get(1)); @@ -403,13 +416,13 @@ public class TestHoodieCompactionStrategy { writeConfig = updateRegex("2020/.*/01"); - filteredPartitions = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, partitions); + filteredPartitions = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, partitions).getLeft(); assertEquals(2, 
filteredPartitions.size()); assertEquals("2020/01/01", filteredPartitions.get(0)); assertEquals("2020/02/01", filteredPartitions.get(1)); writeConfig = updateRegex(".*/01/.*"); - filteredPartitions = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, partitions); + filteredPartitions = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, partitions).getLeft(); assertEquals(4, filteredPartitions.size()); assertEquals("2020/01/01", filteredPartitions.get(0)); assertEquals("2020/01/02", filteredPartitions.get(1)); diff --git a/hudi-common/src/main/avro/HoodieClusteringPlan.avsc b/hudi-common/src/main/avro/HoodieClusteringPlan.avsc index 87486267d1c..d4a6c207762 100644 --- a/hudi-common/src/main/avro/HoodieClusteringPlan.avsc +++ b/hudi-common/src/main/avro/HoodieClusteringPlan.avsc @@ -50,6 +50,14 @@ "name":"preserveHoodieMetadata", "type":["null", "boolean"], "default": null - } + }, + { + "name":"missingSchedulePartitions", + "type":["null", { + "type":"array", + "items":"string" + }], + "default": null + } ] } diff --git a/hudi-common/src/main/avro/HoodieCompactionOperation.avsc b/hudi-common/src/main/avro/HoodieCompactionOperation.avsc index bab7321f29c..61a96faec87 100644 --- a/hudi-common/src/main/avro/HoodieCompactionOperation.avsc +++ b/hudi-common/src/main/avro/HoodieCompactionOperation.avsc @@ -96,6 +96,14 @@ "name":"preserveHoodieMetadata", "type":["boolean", "null"], "default": false + }, + { + "name":"missingSchedulePartitions", + "type":["null", { + "type":"array", + "items":"string" + }], + "default": null } ] } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/CompactionTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/CompactionTestUtils.java index 2bc8f7fba62..eaed3668c9f 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/CompactionTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/CompactionTestUtils.java @@ -192,7 
+192,7 @@ public class CompactionTestUtils { } }).collect(Collectors.toList()); return new HoodieCompactionPlan(ops.isEmpty() ? null : ops, new HashMap<>(), - CompactionUtils.LATEST_COMPACTION_METADATA_VERSION, null, null); + CompactionUtils.LATEST_COMPACTION_METADATA_VERSION, null, null, null); } /** diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestClusteringUtil.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestClusteringUtil.java index f35a2792731..c40dd2350e5 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestClusteringUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestClusteringUtil.java @@ -131,7 +131,7 @@ public class TestClusteringUtil { private String generateClusteringPlan() { HoodieClusteringGroup group = new HoodieClusteringGroup(); HoodieClusteringPlan plan = new HoodieClusteringPlan(Collections.singletonList(group), - HoodieClusteringStrategy.newBuilder().build(), Collections.emptyMap(), 1, false); + HoodieClusteringStrategy.newBuilder().build(), Collections.emptyMap(), 1, false, null); HoodieRequestedReplaceMetadata metadata = new HoodieRequestedReplaceMetadata(WriteOperationType.CLUSTER.name(), plan, Collections.emptyMap(), 1); String instantTime = table.getMetaClient().createNewInstantTime(); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java index 111680bdcdb..ccfd5965cad 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java @@ -174,7 +174,7 @@ public class TestCompactionUtil { */ private String generateCompactionPlan() { HoodieCompactionOperation operation = new HoodieCompactionOperation(); - HoodieCompactionPlan 
plan = new HoodieCompactionPlan(Collections.singletonList(operation), Collections.emptyMap(), 1, null, null); + HoodieCompactionPlan plan = new HoodieCompactionPlan(Collections.singletonList(operation), Collections.emptyMap(), 1, null, null, null); String instantTime = table.getMetaClient().createNewInstantTime(); HoodieInstant compactionInstant = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime);
