This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 03fdc63ac58 [HUDI-6882] Differentiate between replacecommits in cluster planning (#9755)
03fdc63ac58 is described below
commit 03fdc63ac5810efd9d1036188c2623131c258c08
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Sep 21 17:40:22 2023 -0400
[HUDI-6882] Differentiate between replacecommits in cluster planning (#9755)
Cluster planning runs clustering every n commits. To do this, it gets the
previous clustering instant and then counts the completed commits after it.
However, it located the previous clustering instant simply by taking the
latest replacecommit, and replacecommits are also written by insert_overwrite.
This commit fixes the logic to check the commit metadata and ensure the
instant is actually a cluster commit.
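For illustration, the core of the fix is to inspect a replacecommit's
metadata rather than trust the action type alone. A minimal standalone
sketch of that check follows; the class and method names here are
hypothetical, while the actual change lives in
HoodieDefaultTimeline#getLastClusterCommit below:

    import java.io.IOException;

    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.common.model.WriteOperationType;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;
    import org.apache.hudi.common.table.timeline.TimelineUtils;

    // Hypothetical helper mirroring the check introduced by this commit.
    final class ClusterCommitCheck {
      static boolean isClusterCommit(HoodieInstant instant, HoodieTimeline timeline) {
        try {
          // insert_overwrite also writes replacecommits, so the action type
          // alone is ambiguous; only a CLUSTER write operation marks clustering.
          HoodieCommitMetadata metadata = TimelineUtils.getCommitMetadata(instant, timeline);
          return WriteOperationType.CLUSTER.equals(metadata.getOperationType());
        } catch (IOException e) {
          return false; // unreadable metadata is treated as non-clustering
        }
      }
    }

Treating unreadable metadata as non-clustering keeps planning conservative:
an instant only counts as a clustering boundary when its metadata positively
identifies it as one.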
Co-authored-by: Jonathan Vexler <=>
---
.../cluster/ClusteringPlanActionExecutor.java | 3 +-
.../table/timeline/HoodieDefaultTimeline.java | 16 +++++++
.../apache/hudi/functional/TestCOWDataSource.scala | 54 +++++++++++++++++++++-
3 files changed, 69 insertions(+), 4 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
index 680fd696921..b8c38bd140d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
@@ -57,8 +57,7 @@ public class ClusteringPlanActionExecutor<T, I, K, O> extends BaseActionExecutor
  protected Option<HoodieClusteringPlan> createClusteringPlan() {
    LOG.info("Checking if clustering needs to be run on " + config.getBasePath());
-    Option<HoodieInstant> lastClusteringInstant = table.getActiveTimeline()
-        .filter(s -> s.getAction().equalsIgnoreCase(HoodieTimeline.REPLACE_COMMIT_ACTION)).lastInstant();
+    Option<HoodieInstant> lastClusteringInstant = table.getActiveTimeline().getLastClusterCommit();
    int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
        .findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 21b40cdbf12..264af4683e1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -18,12 +18,15 @@
package org.apache.hudi.common.table.timeline;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
+import java.io.IOException;
import java.io.Serializable;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
@@ -490,6 +493,19 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
    }
    return firstNonSavepointCommit;
  }
+
+  public Option<HoodieInstant> getLastClusterCommit() {
+    return Option.fromJavaOptional(getCommitsTimeline().filter(s -> s.getAction().equalsIgnoreCase(HoodieTimeline.REPLACE_COMMIT_ACTION))
+        .getReverseOrderedInstants()
+        .filter(i -> {
+          try {
+            HoodieCommitMetadata metadata = TimelineUtils.getCommitMetadata(i, this);
+            return metadata.getOperationType().equals(WriteOperationType.CLUSTER);
+          } catch (IOException e) {
+            return false;
+          }
+        }).findFirst());
+  }
@Override
public Option<byte[]> getInstantDetails(HoodieInstant instant) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 104996d5c4f..68227ba074e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -19,7 +19,7 @@ package org.apache.hudi.functional
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hudi.DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME
+import org.apache.hudi.DataSourceWriteOptions.{INLINE_CLUSTERING_ENABLE, KEYGENERATOR_CLASS_NAME}
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs}
import org.apache.hudi.client.common.HoodieSparkEngineContext
@@ -28,7 +28,7 @@ import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
-import org.apache.hudi.common.table.timeline.{HoodieInstant, TimelineUtils}
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline, TimelineUtils}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
@@ -1724,6 +1724,56 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
    val metadata = TimelineUtils.getCommitMetadata(latestCommit.get(), timeline)
    metadata.getOperationType.equals(WriteOperationType.UPSERT)
  }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
+  def testInsertOverwriteCluster(recordType: HoodieRecordType): Unit = {
+    val (writeOpts, _) = getWriterReaderOpts(recordType)
+
+    // Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+    val optsWithCluster = Map(
+      INLINE_CLUSTERING_ENABLE.key() -> "true",
+      "hoodie.clustering.inline.max.commits" -> "2",
+      "hoodie.clustering.plan.strategy.sort.columns" -> "_row_key",
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+    ) ++ writeOpts
+    inputDF.write.format("hudi")
+      .options(optsWithCluster)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    for (i <- 1 until 6) {
+      val records = recordsToStrings(dataGen.generateInsertsForPartition("00" + i, 10, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
+      val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+      inputDF.write.format("hudi")
+        .options(optsWithCluster)
+        .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL)
+        .mode(SaveMode.Append)
+        .save(basePath)
+    }
+
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(hadoopConf)
+      .build()
+    val timeline = metaClient.getActiveTimeline
+    val instants = timeline.getAllCommitsTimeline.filterCompletedInstants.getInstants
+    assertEquals(9, instants.size)
+    val replaceInstants = instants.filter(i => i.getAction.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).toList
+    assertEquals(8, replaceInstants.size)
+    val clusterInstants = replaceInstants.filter(i => {
+      TimelineUtils.getCommitMetadata(i, metaClient.getActiveTimeline).getOperationType.equals(WriteOperationType.CLUSTER)
+    })
+    assertEquals(3, clusterInstants.size)
+  }
}
object TestCOWDataSource {