This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e4a6c650e0 [HUDI-7531] Consider pending clustering when scheduling a new clustering plan (#10923)
5e4a6c650e0 is described below

commit 5e4a6c650e0016449caa376dd34e81771ec91b6a
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Mar 26 02:13:25 2024 -0700

    [HUDI-7531] Consider pending clustering when scheduling a new clustering plan (#10923)
---
 .../PreferWriterConflictResolutionStrategy.java    |  2 +-
 .../cluster/ClusteringPlanActionExecutor.java      |  3 +-
 .../rollback/BaseRollbackActionExecutor.java       |  2 +-
 .../action/rollback/RestorePlanActionExecutor.java |  2 +-
 .../table/read/IncrementalQueryAnalyzer.java       |  2 +-
 .../table/timeline/HoodieDefaultTimeline.java      | 20 ++---
 .../hudi/common/table/timeline/HoodieTimeline.java |  3 +-
 .../apache/hudi/common/util/ClusteringUtils.java   | 33 ++++----
 .../hudi/common/util/TestClusteringUtils.java      |  2 +
 .../java/org/apache/hudi/util/StreamerUtil.java    |  2 +-
 .../apache/hudi/functional/TestCOWDataSource.scala | 95 ++++++++++++++++++----
 11 files changed, 111 insertions(+), 55 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
index 61ed673fc62..62fbf64a7f9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
@@ -55,7 +55,7 @@ public class PreferWriterConflictResolutionStrategy
                                                     Option<HoodieInstant> lastSuccessfulInstant) {
     HoodieActiveTimeline activeTimeline = metaClient.reloadActiveTimeline();
     if ((REPLACE_COMMIT_ACTION.equals(currentInstant.getAction())
-        && ClusteringUtils.isClusteringCommit(metaClient, currentInstant))
+        && ClusteringUtils.isClusteringInstant(activeTimeline, currentInstant))
         || COMPACTION_ACTION.equals(currentInstant.getAction())) {
      return getCandidateInstantsForTableServicesCommits(activeTimeline, currentInstant);
     } else {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
index 18c98d377f6..6cb8f023ba8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
@@ -57,7 +57,8 @@ public class ClusteringPlanActionExecutor<T, I, K, O> extends BaseActionExecutor
 
   protected Option<HoodieClusteringPlan> createClusteringPlan() {
     LOG.info("Checking if clustering needs to be run on " + config.getBasePath());
-    Option<HoodieInstant> lastClusteringInstant = table.getActiveTimeline().getLastClusterCommit();
+    Option<HoodieInstant> lastClusteringInstant =
+        table.getActiveTimeline().getLastClusteringInstant();
 
     int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
         .findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index 077f956eb7b..51159d3d5c3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -188,7 +188,7 @@ public abstract class BaseRollbackActionExecutor<T, I, K, O> extends BaseActionE
         if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
           return true;
         }
-        return !ClusteringUtils.isPendingClusteringInstant(table.getMetaClient(), instant);
+        return !ClusteringUtils.isClusteringInstant(table.getActiveTimeline(), instant);
       }).map(HoodieInstant::getTimestamp)
           .collect(Collectors.toList());
       if ((instantTimeToRollback != null) && !inflights.isEmpty()
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
index b3ee11b9836..2f9e96859ff 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
@@ -71,7 +71,7 @@ public class RestorePlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T,
       // rollback pending clustering instants first before other instants (See HUDI-3362)
       List<HoodieInstant> pendingClusteringInstantsToRollback = table.getActiveTimeline().filterPendingReplaceTimeline()
               // filter only clustering related replacecommits (Not insert_overwrite related commits)
-              .filter(instant -> ClusteringUtils.isPendingClusteringInstant(table.getMetaClient(), instant))
+              .filter(instant -> ClusteringUtils.isClusteringInstant(table.getActiveTimeline(), instant))
               .getReverseOrderedInstants()
               .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), savepointToRestoreTimestamp))
               .collect(Collectors.toList());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
index 03596354bb1..3f0eb32c7e5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
@@ -230,7 +230,7 @@ public class IncrementalQueryAnalyzer {
      timeline = timeline.filter(instant -> !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
     }
     if (skipClustering) {
-      timeline = timeline.filter(instant -> !ClusteringUtils.isClusteringInstant(instant, oriTimeline));
+      timeline = timeline.filter(instant -> !ClusteringUtils.isCompletedClusteringInstant(instant, oriTimeline));
     }
     return timeline;
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 7af56785906..9b231b4ee85 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.common.table.timeline;
 
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CollectionUtils;
@@ -31,7 +29,6 @@ import org.apache.hudi.exception.HoodieException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
@@ -512,25 +509,18 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
   }
 
   @Override
-  public Option<HoodieInstant> getLastClusterCommit() {
-    return  Option.fromJavaOptional(getCommitsTimeline().filter(s -> s.getAction().equalsIgnoreCase(HoodieTimeline.REPLACE_COMMIT_ACTION))
+  public Option<HoodieInstant> getLastClusteringInstant() {
+    return Option.fromJavaOptional(getCommitsTimeline().filter(s -> s.getAction().equalsIgnoreCase(HoodieTimeline.REPLACE_COMMIT_ACTION))
         .getReverseOrderedInstants()
-        .filter(i -> {
-          try {
-            HoodieCommitMetadata metadata = TimelineUtils.getCommitMetadata(i, this);
-            return metadata.getOperationType().equals(WriteOperationType.CLUSTER);
-          } catch (IOException e) {
-            LOG.warn("Unable to read commit metadata for " + i + " due to " + e.getMessage());
-            return false;
-          }
-        }).findFirst());
+        .filter(i -> ClusteringUtils.isClusteringInstant(this, i))
+        .findFirst());
   }
 
   @Override
   public Option<HoodieInstant> getLastPendingClusterInstant() {
     return  Option.fromJavaOptional(filterPendingReplaceTimeline()
         .getReverseOrderedInstants()
-        .filter(i -> ClusteringUtils.isPendingClusteringInstant(this, i)).findFirst());
+        .filter(i -> ClusteringUtils.isClusteringInstant(this, i)).findFirst());
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index ac77e1eb606..b02e797f23d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -397,9 +397,8 @@ public interface HoodieTimeline extends Serializable {
 
   /**
    * get the most recent cluster commit if present
-   *
    */
-  public Option<HoodieInstant> getLastClusterCommit();
+  public Option<HoodieInstant> getLastClusteringInstant();
 
   /**
    * get the most recent pending cluster commit if present
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index e9ae9cfbdfb..d041b6bcb8f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -75,15 +75,22 @@ public class ClusteringUtils {
   }
 
   /**
-   * Checks if the replacecommit is clustering commit.
+   * Checks if the requested, inflight, or completed instant of replacecommit action
+   * is a clustering operation, by checking whether the requested instant contains
+   * a clustering plan.
+   *
+   * @param timeline       Hudi timeline.
+   * @param replaceInstant the instant of replacecommit action to check.
+   * @return whether the instant is a clustering operation.
    */
-  public static boolean isClusteringCommit(HoodieTableMetaClient metaClient, HoodieInstant pendingReplaceInstant) {
-    return getClusteringPlan(metaClient, pendingReplaceInstant).isPresent();
+  public static boolean isClusteringInstant(HoodieTimeline timeline, HoodieInstant replaceInstant) {
+    return getClusteringPlan(timeline, replaceInstant).isPresent();
   }
 
   /**
    * Get requested replace metadata from timeline.
-   * @param timeline used to get the bytes stored in the requested replace instant in the timeline
+   *
+   * @param timeline              used to get the bytes stored in the requested replace instant in the timeline
    * @param pendingReplaceInstant can be in any state, because it will always be converted to requested state
    * @return option of the replace metadata if present, else empty
    * @throws IOException
@@ -238,16 +245,8 @@ public class ClusteringUtils {
 
  public static List<HoodieInstant> getPendingClusteringInstantTimes(HoodieTableMetaClient metaClient) {
    return metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstantsAsStream()
-            .filter(instant -> isPendingClusteringInstant(metaClient, instant))
-            .collect(Collectors.toList());
-  }
-
-  public static boolean isPendingClusteringInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) {
-    return getClusteringPlan(metaClient, instant).isPresent();
-  }
-
-  public static boolean isPendingClusteringInstant(HoodieTimeline timeline, HoodieInstant instant) {
-    return getClusteringPlan(timeline, instant).isPresent();
+        .filter(instant -> isClusteringInstant(metaClient.getActiveTimeline(), instant))
+        .collect(Collectors.toList());
   }
 
   /**
@@ -301,9 +300,11 @@ public class ClusteringUtils {
   }
 
   /**
-   * Returns whether the given instant {@code instant} is with clustering operation.
+   * @param instant  Hudi instant to check.
+   * @param timeline Hudi timeline.
+   * @return whether the given {@code instant} is a completed clustering operation.
    */
-  public static boolean isClusteringInstant(HoodieInstant instant, HoodieTimeline timeline) {
+  public static boolean isCompletedClusteringInstant(HoodieInstant instant, HoodieTimeline timeline) {
     if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
       return false;
     }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index d3375fe5e8a..881ebf1fa06 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -133,7 +133,9 @@ public class TestClusteringUtils extends HoodieCommonTestHarness {
     String clusterTime1 = "1";
     HoodieInstant requestedInstant = createRequestedReplaceInstant(partitionPath1, clusterTime1, fileIds1);
     HoodieInstant inflightInstant = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant, Option.empty());
+    assertTrue(ClusteringUtils.isClusteringInstant(metaClient.getActiveTimeline(), requestedInstant));
     HoodieClusteringPlan requestedClusteringPlan = ClusteringUtils.getClusteringPlan(metaClient, requestedInstant).get().getRight();
+    assertTrue(ClusteringUtils.isClusteringInstant(metaClient.getActiveTimeline(), inflightInstant));
     HoodieClusteringPlan inflightClusteringPlan = ClusteringUtils.getClusteringPlan(metaClient, inflightInstant).get().getRight();
     assertEquals(requestedClusteringPlan, inflightClusteringPlan);
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 2b8656f2148..c4587cc2c0b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -514,7 +514,7 @@ public class StreamerUtil {
  public static boolean isWriteCommit(HoodieTableType tableType, HoodieInstant instant, HoodieTimeline timeline) {
     return tableType == HoodieTableType.MERGE_ON_READ
         ? !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION) // not a compaction
-        : !ClusteringUtils.isClusteringInstant(instant, timeline);   // not a clustering
+        : !ClusteringUtils.isCompletedClusteringInstant(instant, timeline);   // not a clustering
   }
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 22a61d58881..2014db073d9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -23,9 +23,10 @@ import org.apache.hudi.DataSourceWriteOptions.{INLINE_CLUSTERING_ENABLE, KEYGENE
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs}
 import org.apache.hudi.avro.AvroSchemaCompatibility.SchemaIncompatibilityType
+import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD}
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
@@ -33,7 +34,8 @@ import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline, Tim
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
-import org.apache.hudi.common.util
+import org.apache.hudi.common.util.{ClusteringUtils, Option}
+import org.apache.hudi.common.{HoodiePendingRollbackInfo, util}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.metrics.HoodieMetricsConfig
 import org.apache.hudi.exception.ExceptionUtil.getRootCause
@@ -44,6 +46,7 @@ import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.metrics.{Metrics, MetricsReporterType}
+import org.apache.hudi.table.HoodieSparkTable
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
 import org.apache.hudi.util.JFunction
 import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, QuickstartUtils, ScalaAssertionSupport}
@@ -1819,9 +1822,9 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
   }
 
   @ParameterizedTest
-  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
-  def testInsertOverwriteCluster(recordType: HoodieRecordType): Unit = {
-    val (writeOpts, _) = getWriterReaderOpts(recordType)
+  @EnumSource(value = classOf[HoodieInstant.State], names = Array("REQUESTED", "INFLIGHT", "COMPLETED"))
+  def testInsertOverwriteCluster(firstClusteringState: HoodieInstant.State): Unit = {
+    val (writeOpts, _) = getWriterReaderOpts()
 
     // Insert Operation
     val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
@@ -1831,6 +1834,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
       INLINE_CLUSTERING_ENABLE.key() -> "true",
       "hoodie.clustering.inline.max.commits" -> "2",
       "hoodie.clustering.plan.strategy.sort.columns" -> "_row_key",
+      "hoodie.clustering.plan.strategy.max.num.groups" -> "1",
       "hoodie.insert.shuffle.parallelism" -> "4",
       "hoodie.upsert.shuffle.parallelism" -> "4",
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
@@ -1843,7 +1847,15 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
-    for (i <- 1 until 6) {
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(hadoopConf)
+      .build()
+
+    assertTrue(metaClient.getActiveTimeline.getLastClusteringInstant.isEmpty)
+
+    var lastClustering: HoodieInstant = null
+    for (i <- 1 until 4) {
       val records = recordsToStrings(dataGen.generateInsertsForPartition("00" + i, 10, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
       val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
       inputDF.write.format("hudi")
@@ -1851,21 +1863,72 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
         .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL)
         .mode(SaveMode.Append)
         .save(basePath)
+      val lastInstant = metaClient.reloadActiveTimeline.getCommitsTimeline.lastInstant.get
+      if (i == 1 || i == 3) {
+        // Last instant is clustering
+        assertTrue(TimelineUtils.getCommitMetadata(lastInstant, metaClient.getActiveTimeline)
+          .getOperationType.equals(WriteOperationType.CLUSTER))
+        assertTrue(ClusteringUtils.isClusteringInstant(metaClient.getActiveTimeline, lastInstant))
+        lastClustering = lastInstant
+        assertEquals(
+          lastClustering,
+          metaClient.getActiveTimeline.getLastClusteringInstant.get)
+      } else {
+        assertTrue(TimelineUtils.getCommitMetadata(lastInstant, metaClient.getActiveTimeline)
+          .getOperationType.equals(WriteOperationType.INSERT_OVERWRITE))
+        assertFalse(ClusteringUtils.isClusteringInstant(metaClient.getActiveTimeline, lastInstant))
+        assertEquals(
+          lastClustering,
+          metaClient.getActiveTimeline.getLastClusteringInstant.get)
+      }
+      if (i == 1) {
+        val writeConfig = HoodieWriteConfig.newBuilder()
+          .forTable("hoodie_test")
+          .withPath(basePath)
+          .withProps(optsWithCluster)
+          .build()
+        if (firstClusteringState == HoodieInstant.State.INFLIGHT
+          || firstClusteringState == HoodieInstant.State.REQUESTED) {
+          // Move the clustering to inflight for testing
+          fs.delete(new Path(metaClient.getMetaPath, lastInstant.getFileName), false)
+          val inflightClustering = metaClient.reloadActiveTimeline.lastInstant.get
+          assertTrue(inflightClustering.isInflight)
+          assertEquals(
+            inflightClustering,
+            metaClient.getActiveTimeline.getLastClusteringInstant.get)
+        }
+        if (firstClusteringState == HoodieInstant.State.REQUESTED) {
+          val table = HoodieSparkTable.create(writeConfig, context)
+          table.rollbackInflightClustering(
+            metaClient.getActiveTimeline.getLastClusteringInstant.get,
+            new java.util.function.Function[String, Option[HoodiePendingRollbackInfo]] {
+              override def apply(commitToRollback: String): Option[HoodiePendingRollbackInfo] = {
+                new SparkRDDWriteClient(context, writeConfig).getTableServiceClient
+                  .getPendingRollbackInfo(table.getMetaClient, commitToRollback, false)
+              }
+            })
+          val requestedClustering = metaClient.reloadActiveTimeline.getCommitsTimeline.lastInstant.get
+          assertTrue(requestedClustering.isRequested)
+          assertEquals(
+            requestedClustering,
+            metaClient.getActiveTimeline.getLastClusteringInstant.get)
+        }
+        // This should not schedule any new clustering
+        new SparkRDDWriteClient(context, writeConfig)
+          .scheduleClustering(org.apache.hudi.common.util.Option.of(Map[String, String]()))
+        assertEquals(lastInstant.getTimestamp,
+          metaClient.reloadActiveTimeline.getCommitsTimeline.lastInstant.get.getTimestamp)
+      }
     }
-
-    val metaClient = HoodieTableMetaClient.builder()
-      .setBasePath(basePath)
-      .setConf(hadoopConf)
-      .build()
-    val timeline = metaClient.getActiveTimeline
-    val instants = timeline.getAllCommitsTimeline.filterCompletedInstants.getInstants
-    assertEquals(9, instants.size)
+    val timeline = metaClient.reloadActiveTimeline
+    val instants = timeline.getCommitsTimeline.getInstants
+    assertEquals(6, instants.size)
     val replaceInstants = instants.filter(i => i.getAction.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).toList
-    assertEquals(8, replaceInstants.size)
+    assertEquals(5, replaceInstants.size)
     val clusterInstants = replaceInstants.filter(i => {
       TimelineUtils.getCommitMetadata(i, metaClient.getActiveTimeline).getOperationType.equals(WriteOperationType.CLUSTER)
     })
-    assertEquals(3, clusterInstants.size)
+    assertEquals(2, clusterInstants.size)
   }
 
 

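For readers tracking the API change, below is a minimal usage sketch (not part of the commit) of the helpers this patch consolidates. It assumes an already-built HoodieTableMetaClient named metaClient; the class and method names ClusteringInstantCheck and printLastClustering are illustrative only and do not exist in Hudi.

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;

public class ClusteringInstantCheck {
  // Sketch: find the latest clustering instant (pending or completed) and
  // confirm it carries a clustering plan via the consolidated predicate.
  public static void printLastClustering(HoodieTableMetaClient metaClient) {
    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
    // getLastClusteringInstant() (renamed from getLastClusterCommit()) also
    // surfaces requested/inflight clustering, which is what lets the planner
    // account for pending clustering when scheduling a new plan.
    Option<HoodieInstant> lastClustering = timeline.getLastClusteringInstant();
    if (lastClustering.isPresent()) {
      HoodieInstant instant = lastClustering.get();
      // isClusteringInstant(timeline, instant) replaces both
      // isClusteringCommit(metaClient, instant) and
      // isPendingClusteringInstant(metaClient/timeline, instant).
      boolean hasPlan = ClusteringUtils.isClusteringInstant(timeline, instant);
      System.out.println("Last clustering instant: " + instant.getTimestamp()
          + ", state: " + instant.getState() + ", has clustering plan: " + hasPlan);
    } else {
      System.out.println("No clustering instant found on the timeline.");
    }
  }
}

As the diff's Javadoc notes, the single predicate now answers the clustering question for requested, inflight, and completed replacecommits, while isCompletedClusteringInstant() covers the completed-only check previously named isClusteringInstant(instant, timeline).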