This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 03fdc63ac58 [HUDI-6882] Differentiate between replacecommits in 
cluster planning (#9755)
03fdc63ac58 is described below

commit 03fdc63ac5810efd9d1036188c2623131c258c08
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Sep 21 17:40:22 2023 -0400

    [HUDI-6882] Differentiate between replacecommits in cluster planning (#9755)
    
    Cluster planning schedules clustering every n commits. To do this, it 
gets the previous clustering instant and then counts the completed commits 
after it. However, it was finding the previous clustering instant simply by 
taking the latest replacecommit. Replacecommit is also used for 
insert_overwrite, so an insert_overwrite could be mistaken for the last 
clustering instant. This commit fixes the logic to check the commit metadata 
to ensure the instant is a cluster commit.
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../cluster/ClusteringPlanActionExecutor.java      |  3 +-
 .../table/timeline/HoodieDefaultTimeline.java      | 16 +++++++
 .../apache/hudi/functional/TestCOWDataSource.scala | 54 +++++++++++++++++++++-
 3 files changed, 69 insertions(+), 4 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
index 680fd696921..b8c38bd140d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
@@ -57,8 +57,7 @@ public class ClusteringPlanActionExecutor<T, I, K, O> extends 
BaseActionExecutor
 
   protected Option<HoodieClusteringPlan> createClusteringPlan() {
     LOG.info("Checking if clustering needs to be run on " + 
config.getBasePath());
-    Option<HoodieInstant> lastClusteringInstant = table.getActiveTimeline()
-        .filter(s -> 
s.getAction().equalsIgnoreCase(HoodieTimeline.REPLACE_COMMIT_ACTION)).lastInstant();
+    Option<HoodieInstant> lastClusteringInstant = 
table.getActiveTimeline().getLastClusterCommit();
 
     int commitsSinceLastClustering = 
table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
         
.findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"),
 Integer.MAX_VALUE)
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 21b40cdbf12..264af4683e1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -18,12 +18,15 @@
 
 package org.apache.hudi.common.table.timeline;
 
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
@@ -490,6 +493,19 @@ public class HoodieDefaultTimeline implements 
HoodieTimeline {
     }
     return firstNonSavepointCommit;
   }
+
+  public Option<HoodieInstant> getLastClusterCommit() {
+    return  Option.fromJavaOptional(getCommitsTimeline().filter(s -> 
s.getAction().equalsIgnoreCase(HoodieTimeline.REPLACE_COMMIT_ACTION))
+        .getReverseOrderedInstants()
+        .filter(i -> {
+          try {
+            HoodieCommitMetadata metadata = TimelineUtils.getCommitMetadata(i, 
this);
+            return 
metadata.getOperationType().equals(WriteOperationType.CLUSTER);
+          } catch (IOException e) {
+            return false;
+          }
+        }).findFirst());
+  }
   
   @Override
   public Option<byte[]> getInstantDetails(HoodieInstant instant) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 104996d5c4f..68227ba074e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -19,7 +19,7 @@ package org.apache.hudi.functional
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hudi.DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME
+import org.apache.hudi.DataSourceWriteOptions.{INLINE_CLUSTERING_ENABLE, 
KEYGENERATOR_CLASS_NAME}
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.QuickstartUtils.{convertToStringList, 
getQuickstartWriteConfigs}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
@@ -28,7 +28,7 @@ import org.apache.hudi.common.config.{HoodieCommonConfig, 
HoodieMetadataConfig}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
-import org.apache.hudi.common.table.timeline.{HoodieInstant, TimelineUtils}
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline, 
TimelineUtils}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import 
org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, 
recordsToStrings}
@@ -1724,6 +1724,56 @@ class TestCOWDataSource extends 
HoodieSparkClientTestBase with ScalaAssertionSup
     val metadata = TimelineUtils.getCommitMetadata(latestCommit.get(), 
timeline)
     metadata.getOperationType.equals(WriteOperationType.UPSERT)
   }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", 
"SPARK"))
+  def testInsertOverwriteCluster(recordType: HoodieRecordType): Unit = {
+    val (writeOpts, _) = getWriterReaderOpts(recordType)
+
+    // Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+    val optsWithCluster = Map(
+      INLINE_CLUSTERING_ENABLE.key() -> "true",
+      "hoodie.clustering.inline.max.commits" -> "2",
+      "hoodie.clustering.plan.strategy.sort.columns" -> "_row_key",
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+    ) ++ writeOpts
+    inputDF.write.format("hudi")
+      .options(optsWithCluster)
+      .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    for (i <- 1 until 6) {
+      val records = recordsToStrings(dataGen.generateInsertsForPartition("00" 
+ i, 10, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
+      val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+      inputDF.write.format("hudi")
+        .options(optsWithCluster)
+        .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL)
+        .mode(SaveMode.Append)
+        .save(basePath)
+    }
+
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(hadoopConf)
+      .build()
+    val timeline = metaClient.getActiveTimeline
+    val instants = 
timeline.getAllCommitsTimeline.filterCompletedInstants.getInstants
+    assertEquals(9, instants.size)
+    val replaceInstants = instants.filter(i => 
i.getAction.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).toList
+    assertEquals(8, replaceInstants.size)
+    val clusterInstants = replaceInstants.filter(i => {
+      TimelineUtils.getCommitMetadata(i, 
metaClient.getActiveTimeline).getOperationType.equals(WriteOperationType.CLUSTER)
+    })
+    assertEquals(3, clusterInstants.size)
+  }
 }
 
 object TestCOWDataSource {

Reply via email to