This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4a993dae5df95e15436b6a9f214fb9a6ff64515a
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Apr 12 00:08:37 2024 -0400

    [HUDI-7290] Don't assume ReplaceCommits are always Clustering (#10479)
    
    * fix all usages not in tests
    * do pass through and fix
    * fix test that didn't actually use a cluster commit
    * make method private and fix naming
    * revert write markers changes
    
    ---------
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../hudi/client/BaseHoodieTableServiceClient.java  | 10 ++++---
 .../org/apache/hudi/table/marker/WriteMarkers.java |  2 ++
 .../table/timeline/HoodieDefaultTimeline.java      | 31 ++++++++++++++++++++--
 .../hudi/common/table/timeline/HoodieTimeline.java | 11 ++++++++
 .../table/view/AbstractTableFileSystemView.java    |  5 +---
 .../table/view/TestHoodieTableFileSystemView.java  | 30 +++++++++++++++++++--
 .../clustering/ClusteringPlanSourceFunction.java   |  2 +-
 .../java/org/apache/hudi/util/ClusteringUtil.java  |  2 +-
 .../apache/hudi/utilities/HoodieClusteringJob.java | 12 ++++-----
 9 files changed, 86 insertions(+), 19 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 909581687d4..e408dc7a779 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -444,8 +444,12 @@ public abstract class BaseHoodieTableServiceClient<I, T, 
O> extends BaseHoodieCl
     HoodieTimeline pendingClusteringTimeline = 
table.getActiveTimeline().filterPendingReplaceTimeline();
     HoodieInstant inflightInstant = 
HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
     if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
-      table.rollbackInflightClustering(inflightInstant, commitToRollback -> 
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
-      table.getMetaClient().reloadActiveTimeline();
+      if 
(pendingClusteringTimeline.isPendingClusterInstant(inflightInstant.getTimestamp()))
 {
+        table.rollbackInflightClustering(inflightInstant, commitToRollback -> 
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
+        table.getMetaClient().reloadActiveTimeline();
+      } else {
+        throw new HoodieClusteringException("Non clustering replace-commit 
inflight at timestamp " + clusteringInstant);
+      }
     }
     clusteringTimer = metrics.getClusteringCtx();
     LOG.info("Starting clustering at {}", clusteringInstant);
@@ -575,7 +579,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
 
     // if just inline schedule is enabled
     if (!config.inlineClusteringEnabled() && config.scheduleInlineClustering()
-        && table.getActiveTimeline().filterPendingReplaceTimeline().empty()) {
+        && 
!table.getActiveTimeline().getLastPendingClusterInstant().isPresent()) {
       // proceed only if there are no pending clustering
       
metadata.addMetadata(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), 
"true");
       inlineScheduleClustering(extraMetadata);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java
index 01c8c99618a..f8fbd13b1c2 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java
@@ -87,6 +87,7 @@ public abstract class WriteMarkers implements Serializable {
       HoodieTimeline pendingReplaceTimeline = 
activeTimeline.filterPendingReplaceTimeline();
       // TODO If current is compact or clustering then create marker directly 
without early conflict detection.
       // Need to support early conflict detection between table service and 
common writers.
+      // ok to use filterPendingReplaceTimeline().containsInstant because 
early conflict detection is not relevant for insert overwrite as well
       if (pendingCompactionTimeline.containsInstant(instantTime) || 
pendingReplaceTimeline.containsInstant(instantTime)) {
         return create(partitionPath, fileName, type, false);
       }
@@ -127,6 +128,7 @@ public abstract class WriteMarkers implements Serializable {
       HoodieTimeline pendingReplaceTimeline = 
activeTimeline.filterPendingReplaceTimeline();
       // TODO If current is compact or clustering then create marker directly 
without early conflict detection.
       // Need to support early conflict detection between table service and 
common writers.
+      // ok to use filterPendingReplaceTimeline().containsInstant because 
early conflict detection is not relevant for insert overwrite as well
       if (pendingCompactionTimeline.containsInstant(instantTime) || 
pendingReplaceTimeline.containsInstant(instantTime)) {
         return create(partitionPath, fileName, type, true);
       }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index a26bed061d6..737ec0ca5d9 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -503,13 +503,40 @@ public class HoodieDefaultTimeline implements 
HoodieTimeline {
         .findFirst());
   }
 
+  @Override
+  public Option<HoodieInstant> getFirstPendingClusterInstant() {
+    return getLastOrFirstPendingClusterInstant(false);
+  }
+
   @Override
   public Option<HoodieInstant> getLastPendingClusterInstant() {
-    return  Option.fromJavaOptional(filterPendingReplaceTimeline()
-        .getReverseOrderedInstants()
+    return getLastOrFirstPendingClusterInstant(true);
+  }
+
+  private Option<HoodieInstant> getLastOrFirstPendingClusterInstant(boolean 
isLast) {
+    HoodieTimeline replaceTimeline = filterPendingReplaceTimeline();
+    Stream<HoodieInstant> replaceStream;
+    if (isLast) {
+      replaceStream = replaceTimeline.getReverseOrderedInstants();
+    } else {
+      replaceStream = replaceTimeline.getInstantsAsStream();
+    }
+    return  Option.fromJavaOptional(replaceStream
         .filter(i -> ClusteringUtils.isClusteringInstant(this, 
i)).findFirst());
   }
   
+  @Override
+  public boolean isPendingClusterInstant(String instantTime) {
+    HoodieTimeline potentialTimeline = 
getCommitsTimeline().filterPendingReplaceTimeline().filter(i -> 
i.getTimestamp().equals(instantTime));
+    if (potentialTimeline.countInstants() == 0) {
+      return false;
+    }
+    if (potentialTimeline.countInstants() > 1) {
+      throw new IllegalStateException("Multiple instants with same timestamp: 
" + potentialTimeline);
+    }
+    return ClusteringUtils.isClusteringInstant(this, 
potentialTimeline.firstInstant().get());
+  }
+
   @Override
   public Option<byte[]> getInstantDetails(HoodieInstant instant) {
     return details.apply(instant);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index cdbe5b15fc5..a7344fc1512 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -406,6 +406,17 @@ public interface HoodieTimeline extends Serializable {
    */
   public Option<HoodieInstant> getLastPendingClusterInstant();
 
+
+  /**
+   * get the least recent pending cluster commit if present
+   */
+  public Option<HoodieInstant> getFirstPendingClusterInstant();
+
+  /**
+   * return true if instant is a pending clustering commit, otherwise false
+   */
+  public boolean isPendingClusterInstant(String instantTime);
+
   /**
    * Read the completed instant details.
    */
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 0f0f87c03c7..21ad0426a27 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -524,10 +524,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
    * @param baseFile base File
    */
   protected boolean isBaseFileDueToPendingClustering(HoodieBaseFile baseFile) {
-    List<String> pendingReplaceInstants =
-        
metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
-
-    return !pendingReplaceInstants.isEmpty() && 
pendingReplaceInstants.contains(baseFile.getCommitTime());
+    return 
metaClient.getActiveTimeline().isPendingClusterInstant(baseFile.getCommitTime());
   }
 
   /**
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
index 216af429335..b9a7b840f36 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.table.view;
 
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieClusteringStrategy;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieFSPermission;
 import org.apache.hudi.avro.model.HoodieFileStatus;
@@ -57,6 +58,7 @@ import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -1442,6 +1444,30 @@ public class TestHoodieTableFileSystemView extends 
HoodieCommonTestHarness {
     }
   }
 
+  private void saveAsCompleteCluster(HoodieActiveTimeline timeline, 
HoodieInstant inflight, Option<byte[]> data) {
+    assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, inflight.getAction());
+    HoodieInstant clusteringInstant = new HoodieInstant(State.REQUESTED, 
inflight.getAction(), inflight.getTimestamp());
+    HoodieClusteringPlan plan = new HoodieClusteringPlan();
+    plan.setExtraMetadata(new HashMap<>());
+    plan.setInputGroups(Collections.emptyList());
+    plan.setStrategy(HoodieClusteringStrategy.newBuilder().build());
+    plan.setVersion(1);
+    plan.setPreserveHoodieMetadata(false);
+    try {
+      HoodieRequestedReplaceMetadata requestedReplaceMetadata = 
HoodieRequestedReplaceMetadata.newBuilder()
+          .setOperationType(WriteOperationType.CLUSTER.name())
+          .setExtraMetadata(Collections.emptyMap())
+          .setClusteringPlan(plan)
+          .build();
+      timeline.saveToPendingReplaceCommit(clusteringInstant,
+          
TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Exception scheduling clustering", ioe);
+    }
+    timeline.transitionRequestedToInflight(clusteringInstant, Option.empty());
+    timeline.saveAsComplete(inflight, data);
+  }
+
   @Test
   public void testReplaceWithTimeTravel() throws IOException {
     String partitionPath1 = "2020/06/27";
@@ -1765,8 +1791,8 @@ public class TestHoodieTableFileSystemView extends 
HoodieCommonTestHarness {
     List<HoodieWriteStat> writeStats2 = buildWriteStats(partitionToFile2, 
commitTime2);
 
     HoodieCommitMetadata commitMetadata2 =
-        CommitUtils.buildMetadata(writeStats2, partitionToReplaceFileIds, 
Option.empty(), WriteOperationType.INSERT_OVERWRITE, "", 
HoodieTimeline.REPLACE_COMMIT_ACTION);
-    saveAsComplete(commitTimeline, instant2, 
Option.of(getUTF8Bytes(commitMetadata2.toJsonString())));
+        CommitUtils.buildMetadata(writeStats2, partitionToReplaceFileIds, 
Option.empty(), WriteOperationType.CLUSTER, "", 
HoodieTimeline.REPLACE_COMMIT_ACTION);
+    saveAsCompleteCluster(commitTimeline, instant2, 
Option.of(getUTF8Bytes(commitMetadata2.toJsonString())));
 
     // another insert commit
     String commitTime3 = "3";
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java
index ed78e33c10f..292e3bba5cc 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanSourceFunction.java
@@ -76,7 +76,7 @@ public class ClusteringPlanSourceFunction extends 
AbstractRichFunction implement
 
   @Override
   public void run(SourceContext<ClusteringPlanEvent> sourceContext) throws 
Exception {
-    boolean isPending = 
StreamerUtil.createMetaClient(conf).getActiveTimeline().filterPendingReplaceTimeline().containsInstant(clusteringInstantTime);
+    boolean isPending = 
StreamerUtil.createMetaClient(conf).getActiveTimeline().isPendingClusterInstant(clusteringInstantTime);
     if (isPending) {
       for (HoodieClusteringGroup clusteringGroup : 
clusteringPlan.getInputGroups()) {
         LOG.info("Execute clustering plan for instant {} as {} file slices", 
clusteringInstantTime, clusteringGroup.getSlices().size());
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
index ac81b4e7af4..6f0bb97a053 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
@@ -109,7 +109,7 @@ public class ClusteringUtil {
    */
   public static void rollbackClustering(HoodieFlinkTable<?> table, 
HoodieFlinkWriteClient<?> writeClient, String instantTime) {
     HoodieInstant inflightInstant = 
HoodieTimeline.getReplaceCommitInflightInstant(instantTime);
-    if 
(table.getMetaClient().reloadActiveTimeline().filterPendingReplaceTimeline().containsInstant(inflightInstant))
 {
+    if 
(table.getMetaClient().reloadActiveTimeline().isPendingClusterInstant(instantTime))
 {
       LOG.warn("Rollback failed clustering instant: [" + instantTime + "]");
       table.rollbackInflightClustering(inflightInstant,
           commitToRollback -> 
writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(),
 commitToRollback, false));
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index 9415a80b4d5..90c7d493705 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieCleanConfig;
@@ -216,7 +215,7 @@ public class HoodieClusteringJob {
         // Instant time is not specified
         // Find the earliest scheduled clustering instant for execution
         Option<HoodieInstant> firstClusteringInstant =
-            
metaClient.getActiveTimeline().filterPendingReplaceTimeline().firstInstant();
+            metaClient.getActiveTimeline().getFirstPendingClusterInstant();
         if (firstClusteringInstant.isPresent()) {
           cfg.clusteringInstantTime = 
firstClusteringInstant.get().getTimestamp();
           LOG.info("Found the earliest scheduled clustering instant which will 
be executed: "
@@ -262,14 +261,15 @@ public class HoodieClusteringJob {
 
       if (cfg.retryLastFailedClusteringJob) {
         HoodieSparkTable<HoodieRecordPayload> table = 
HoodieSparkTable.create(client.getConfig(), client.getEngineContext());
-        HoodieTimeline inflightHoodieTimeline = 
table.getActiveTimeline().filterPendingReplaceTimeline().filterInflights();
-        if (!inflightHoodieTimeline.empty()) {
-          HoodieInstant inflightClusteringInstant = 
inflightHoodieTimeline.lastInstant().get();
+        Option<HoodieInstant> lastClusterOpt = 
table.getActiveTimeline().getLastPendingClusterInstant();
+
+        if (lastClusterOpt.isPresent()) {
+          HoodieInstant inflightClusteringInstant = lastClusterOpt.get();
           Date clusteringStartTime = 
HoodieActiveTimeline.parseDateFromInstantTime(inflightClusteringInstant.getTimestamp());
           if (clusteringStartTime.getTime() + cfg.maxProcessingTimeMs < 
System.currentTimeMillis()) {
             // if there has failed clustering, then we will use the failed 
clustering instant-time to trigger next clustering action which will rollback 
and clustering.
             LOG.info("Found failed clustering instant at : " + 
inflightClusteringInstant + "; Will rollback the failed clustering and 
re-trigger again.");
-            instantTime = 
Option.of(inflightHoodieTimeline.lastInstant().get().getTimestamp());
+            instantTime = Option.of(inflightClusteringInstant.getTimestamp());
           } else {
             LOG.info(inflightClusteringInstant + " might still be in progress, 
will trigger a new clustering job.");
           }

Reply via email to