This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4a993dae5df95e15436b6a9f214fb9a6ff64515a Author: Jon Vexler <[email protected]> AuthorDate: Fri Apr 12 00:08:37 2024 -0400 [HUDI-7290] Don't assume ReplaceCommits are always Clustering (#10479) * fix all usages not in tests * do pass through and fix * fix test that didn't actually use a cluster commit * make method private and fix naming * revert write markers changes --------- Co-authored-by: Jonathan Vexler <=> --- .../hudi/client/BaseHoodieTableServiceClient.java | 10 ++++--- .../org/apache/hudi/table/marker/WriteMarkers.java | 2 ++ .../table/timeline/HoodieDefaultTimeline.java | 31 ++++++++++++++++++++-- .../hudi/common/table/timeline/HoodieTimeline.java | 11 ++++++++ .../table/view/AbstractTableFileSystemView.java | 5 +--- .../table/view/TestHoodieTableFileSystemView.java | 30 +++++++++++++++++++-- .../clustering/ClusteringPlanSourceFunction.java | 2 +- .../java/org/apache/hudi/util/ClusteringUtil.java | 2 +- .../apache/hudi/utilities/HoodieClusteringJob.java | 12 ++++----- 9 files changed, 86 insertions(+), 19 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index 909581687d4..e408dc7a779 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -444,8 +444,12 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline(); HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant); if (pendingClusteringTimeline.containsInstant(inflightInstant)) { - table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false)); - table.getMetaClient().reloadActiveTimeline(); + if (pendingClusteringTimeline.isPendingClusterInstant(inflightInstant.getTimestamp())) { + table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false)); + table.getMetaClient().reloadActiveTimeline(); + } else { + throw new HoodieClusteringException("Non clustering replace-commit inflight at timestamp " + clusteringInstant); + } } clusteringTimer = metrics.getClusteringCtx(); LOG.info("Starting clustering at {}", clusteringInstant); @@ -575,7 +579,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl // if just inline schedule is enabled if (!config.inlineClusteringEnabled() && config.scheduleInlineClustering() - && table.getActiveTimeline().filterPendingReplaceTimeline().empty()) { + && !table.getActiveTimeline().getLastPendingClusterInstant().isPresent()) { // proceed only if there are no pending clustering metadata.addMetadata(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), "true"); inlineScheduleClustering(extraMetadata); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java index 01c8c99618a..f8fbd13b1c2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java @@ -87,6 +87,7 @@ public abstract class WriteMarkers implements Serializable { HoodieTimeline pendingReplaceTimeline = activeTimeline.filterPendingReplaceTimeline(); // TODO If current is compact or clustering then create marker directly without early conflict detection. // Need to support early conflict detection between table service and common writers. + // ok to use filterPendingReplaceTimeline().containsInstant because early conflict detection is not relevant for insert overwrite as well if (pendingCompactionTimeline.containsInstant(instantTime) || pendingReplaceTimeline.containsInstant(instantTime)) { return create(partitionPath, fileName, type, false); } @@ -127,6 +128,7 @@ public abstract class WriteMarkers implements Serializable { HoodieTimeline pendingReplaceTimeline = activeTimeline.filterPendingReplaceTimeline(); // TODO If current is compact or clustering then create marker directly without early conflict detection. // Need to support early conflict detection between table service and common writers. + // ok to use filterPendingReplaceTimeline().containsInstant because early conflict detection is not relevant for insert overwrite as well if (pendingCompactionTimeline.containsInstant(instantTime) || pendingReplaceTimeline.containsInstant(instantTime)) { return create(partitionPath, fileName, type, true); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java index a26bed061d6..737ec0ca5d9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java @@ -503,13 +503,40 @@ public class HoodieDefaultTimeline implements HoodieTimeline { .findFirst()); } + @Override + public Option<HoodieInstant> getFirstPendingClusterInstant() { + return getLastOrFirstPendingClusterInstant(false); + } + @Override public Option<HoodieInstant> getLastPendingClusterInstant() { - return Option.fromJavaOptional(filterPendingReplaceTimeline() - .getReverseOrderedInstants() + return getLastOrFirstPendingClusterInstant(true); + } + + private Option<HoodieInstant> getLastOrFirstPendingClusterInstant(boolean isLast) { + HoodieTimeline replaceTimeline = filterPendingReplaceTimeline(); + Stream<HoodieInstant> replaceStream; + if (isLast) { + replaceStream = replaceTimeline.getReverseOrderedInstants(); + } else { + replaceStream = replaceTimeline.getInstantsAsStream(); + } + return Option.fromJavaOptional(replaceStream .filter(i -> ClusteringUtils.isClusteringInstant(this, i)).findFirst()); } + @Override + public boolean isPendingClusterInstant(String instantTime) { + HoodieTimeline potentialTimeline = getCommitsTimeline().filterPendingReplaceTimeline().filter(i -> i.getTimestamp().equals(instantTime)); + if (potentialTimeline.countInstants() == 0) { + return false; + } + if (potentialTimeline.countInstants() > 1) { + throw new IllegalStateException("Multiple instants with same timestamp: " + potentialTimeline); + } + return ClusteringUtils.isClusteringInstant(this, potentialTimeline.firstInstant().get()); + } + @Override public Option<byte[]> getInstantDetails(HoodieInstant instant) { return details.apply(instant); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java index cdbe5b15fc5..a7344fc1512 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java @@ -406,6 +406,17 @@ public interface HoodieTimeline extends Serializable { */ public Option<HoodieInstant> getLastPendingClusterInstant(); + + /** + * get the least recent pending cluster commit if present + */ + public Option<HoodieInstant> getFirstPendingClusterInstant(); + + /** + * return true if instant is a pending clustering commit, otherwise false + */ + public boolean isPendingClusterInstant(String instantTime); + /** * Read the completed instant details. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index 0f0f87c03c7..21ad0426a27 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -524,10 +524,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV * @param baseFile base File */ protected boolean isBaseFileDueToPendingClustering(HoodieBaseFile baseFile) { - List<String> pendingReplaceInstants = - metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()); - - return !pendingReplaceInstants.isEmpty() && pendingReplaceInstants.contains(baseFile.getCommitTime()); + return metaClient.getActiveTimeline().isPendingClusterInstant(baseFile.getCommitTime()); } /** diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java index 216af429335..b9a7b840f36 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.table.view; import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.avro.model.HoodieClusteringStrategy; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieFSPermission; import org.apache.hudi.avro.model.HoodieFileStatus; @@ -57,6 +58,7 @@ import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem; import org.apache.hadoop.fs.FileStatus; @@ -1442,6 +1444,30 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness { } } + private void saveAsCompleteCluster(HoodieActiveTimeline timeline, HoodieInstant inflight, Option<byte[]> data) { + assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, inflight.getAction()); + HoodieInstant clusteringInstant = new HoodieInstant(State.REQUESTED, inflight.getAction(), inflight.getTimestamp()); + HoodieClusteringPlan plan = new HoodieClusteringPlan(); + plan.setExtraMetadata(new HashMap<>()); + plan.setInputGroups(Collections.emptyList()); + plan.setStrategy(HoodieClusteringStrategy.newBuilder().build()); + plan.setVersion(1); + plan.setPreserveHoodieMetadata(false); + try { + HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder() + .setOperationType(WriteOperationType.CLUSTER.name()) + .setExtraMetadata(Collections.emptyMap()) + .setClusteringPlan(plan) + .build(); + timeline.saveToPendingReplaceCommit(clusteringInstant, + TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata)); + } catch (IOException ioe) { + throw new HoodieIOException("Exception scheduling clustering", ioe); + } + timeline.transitionRequestedToInflight(clusteringInstant, Option.empty()); + timeline.saveAsComplete(inflight, data); + } + @Test public void testReplaceWithTimeTravel() throws IOException { String partitionPath1 = "2020/06/27"; @@ -1765,8 +1791,8 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness { List<HoodieWriteStat> writeStats2 = buildWriteStats(partitionToFile2, commitTime2); HoodieCommitMetadata commitMetadata2 = - CommitUtils.buildMetadata(writeStats2, partitionToReplaceFileIds, Option.empty(), WriteOperationType.INSERT_OVERWRITE, "", HoodieTimeline.REPLACE_COMMIT_ACTION); - saveAsComplete(commitTimeline, instant2, Option.of(getUTF8Bytes(commitMetadata2.toJsonString()))); + CommitUtils.buildMetadata(writeStats2, partitionToReplaceFileIds, Option.empty(), WriteOperationType.CLUSTER, "", HoodieTimeline.REPLACE_COMMIT_ACTION); + saveAsCompleteCluster(commitTimeline, instant2, Option.of(getUTF8Bytes(commitMetadata2.toJsonString()))); // another insert commit String commitTime3 = "3"; diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java index ed78e33c10f..292e3bba5cc 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java @@ -76,7 +76,7 @@ public class ClusteringPlanSourceFunction extends AbstractRichFunction implement @Override public void run(SourceContext<ClusteringPlanEvent> sourceContext) throws Exception { - boolean isPending = StreamerUtil.createMetaClient(conf).getActiveTimeline().filterPendingReplaceTimeline().containsInstant(clusteringInstantTime); + boolean isPending = StreamerUtil.createMetaClient(conf).getActiveTimeline().isPendingClusterInstant(clusteringInstantTime); if (isPending) { for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) { LOG.info("Execute clustering plan for instant {} as {} file slices", clusteringInstantTime, clusteringGroup.getSlices().size()); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java index ac81b4e7af4..6f0bb97a053 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java @@ -109,7 +109,7 @@ public class ClusteringUtil { */ public static void rollbackClustering(HoodieFlinkTable<?> table, HoodieFlinkWriteClient<?> writeClient, String instantTime) { HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(instantTime); - if (table.getMetaClient().reloadActiveTimeline().filterPendingReplaceTimeline().containsInstant(inflightInstant)) { + if (table.getMetaClient().reloadActiveTimeline().isPendingClusterInstant(instantTime)) { LOG.warn("Rollback failed clustering instant: [" + instantTime + "]"); table.rollbackInflightClustering(inflightInstant, commitToRollback -> writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false)); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java index 9415a80b4d5..90c7d493705 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieCleanConfig; @@ -216,7 +215,7 @@ public class HoodieClusteringJob { // Instant time is not specified // Find the earliest scheduled clustering instant for execution Option<HoodieInstant> firstClusteringInstant = - metaClient.getActiveTimeline().filterPendingReplaceTimeline().firstInstant(); + metaClient.getActiveTimeline().getFirstPendingClusterInstant(); if (firstClusteringInstant.isPresent()) { cfg.clusteringInstantTime = firstClusteringInstant.get().getTimestamp(); LOG.info("Found the earliest scheduled clustering instant which will be executed: " @@ -262,14 +261,15 @@ public class HoodieClusteringJob { if (cfg.retryLastFailedClusteringJob) { HoodieSparkTable<HoodieRecordPayload> table = HoodieSparkTable.create(client.getConfig(), client.getEngineContext()); - HoodieTimeline inflightHoodieTimeline = table.getActiveTimeline().filterPendingReplaceTimeline().filterInflights(); - if (!inflightHoodieTimeline.empty()) { - HoodieInstant inflightClusteringInstant = inflightHoodieTimeline.lastInstant().get(); + Option<HoodieInstant> lastClusterOpt = table.getActiveTimeline().getLastPendingClusterInstant(); + + if (lastClusterOpt.isPresent()) { + HoodieInstant inflightClusteringInstant = lastClusterOpt.get(); Date clusteringStartTime = HoodieActiveTimeline.parseDateFromInstantTime(inflightClusteringInstant.getTimestamp()); if (clusteringStartTime.getTime() + cfg.maxProcessingTimeMs < System.currentTimeMillis()) { // if there has failed clustering, then we will use the failed clustering instant-time to trigger next clustering action which will rollback and clustering. LOG.info("Found failed clustering instant at : " + inflightClusteringInstant + "; Will rollback the failed clustering and re-trigger again."); - instantTime = Option.of(inflightHoodieTimeline.lastInstant().get().getTimestamp()); + instantTime = Option.of(inflightClusteringInstant.getTimestamp()); } else { LOG.info(inflightClusteringInstant + " might still be in progress, will trigger a new clustering job."); }
