This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5e4a6c650e0 [HUDI-7531] Consider pending clustering when scheduling a new clustering plan (#10923)
5e4a6c650e0 is described below
commit 5e4a6c650e0016449caa376dd34e81771ec91b6a
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Mar 26 02:13:25 2024 -0700
[HUDI-7531] Consider pending clustering when scheduling a new clustering plan (#10923)
---
.../PreferWriterConflictResolutionStrategy.java | 2 +-
.../cluster/ClusteringPlanActionExecutor.java | 3 +-
.../rollback/BaseRollbackActionExecutor.java | 2 +-
.../action/rollback/RestorePlanActionExecutor.java | 2 +-
.../table/read/IncrementalQueryAnalyzer.java | 2 +-
.../table/timeline/HoodieDefaultTimeline.java | 20 ++---
.../hudi/common/table/timeline/HoodieTimeline.java | 3 +-
.../apache/hudi/common/util/ClusteringUtils.java | 33 ++++----
.../hudi/common/util/TestClusteringUtils.java | 2 +
.../java/org/apache/hudi/util/StreamerUtil.java | 2 +-
.../apache/hudi/functional/TestCOWDataSource.scala | 95 ++++++++++++++++++----
11 files changed, 111 insertions(+), 55 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
index 61ed673fc62..62fbf64a7f9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java
@@ -55,7 +55,7 @@ public class PreferWriterConflictResolutionStrategy
Option<HoodieInstant>
lastSuccessfulInstant) {
HoodieActiveTimeline activeTimeline = metaClient.reloadActiveTimeline();
if ((REPLACE_COMMIT_ACTION.equals(currentInstant.getAction())
- && ClusteringUtils.isClusteringCommit(metaClient, currentInstant))
+ && ClusteringUtils.isClusteringInstant(activeTimeline, currentInstant))
|| COMPACTION_ACTION.equals(currentInstant.getAction())) {
return getCandidateInstantsForTableServicesCommits(activeTimeline,
currentInstant);
} else {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
index 18c98d377f6..6cb8f023ba8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
@@ -57,7 +57,8 @@ public class ClusteringPlanActionExecutor<T, I, K, O> extends BaseActionExecutor
protected Option<HoodieClusteringPlan> createClusteringPlan() {
LOG.info("Checking if clustering needs to be run on " +
config.getBasePath());
- Option<HoodieInstant> lastClusteringInstant =
table.getActiveTimeline().getLastClusterCommit();
+ Option<HoodieInstant> lastClusteringInstant =
+ table.getActiveTimeline().getLastClusteringInstant();
int commitsSinceLastClustering =
table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
.findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"),
Integer.MAX_VALUE)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index 077f956eb7b..51159d3d5c3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -188,7 +188,7 @@ public abstract class BaseRollbackActionExecutor<T, I, K, O> extends BaseActionE
if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION))
{
return true;
}
- return
!ClusteringUtils.isPendingClusteringInstant(table.getMetaClient(), instant);
+ return !ClusteringUtils.isClusteringInstant(table.getActiveTimeline(),
instant);
}).map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
if ((instantTimeToRollback != null) && !inflights.isEmpty()
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
index b3ee11b9836..2f9e96859ff 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java
@@ -71,7 +71,7 @@ public class RestorePlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T,
// rollback pending clustering instants first before other instants (See
HUDI-3362)
List<HoodieInstant> pendingClusteringInstantsToRollback =
table.getActiveTimeline().filterPendingReplaceTimeline()
// filter only clustering related replacecommits (Not
insert_overwrite related commits)
- .filter(instant ->
ClusteringUtils.isPendingClusteringInstant(table.getMetaClient(), instant))
+ .filter(instant ->
ClusteringUtils.isClusteringInstant(table.getActiveTimeline(), instant))
.getReverseOrderedInstants()
.filter(instant ->
HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(),
savepointToRestoreTimestamp))
.collect(Collectors.toList());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
index 03596354bb1..3f0eb32c7e5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
@@ -230,7 +230,7 @@ public class IncrementalQueryAnalyzer {
timeline = timeline.filter(instant ->
!instant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
}
if (skipClustering) {
- timeline = timeline.filter(instant ->
!ClusteringUtils.isClusteringInstant(instant, oriTimeline));
+ timeline = timeline.filter(instant ->
!ClusteringUtils.isCompletedClusteringInstant(instant, oriTimeline));
}
return timeline;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 7af56785906..9b231b4ee85 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -18,8 +18,6 @@
package org.apache.hudi.common.table.timeline;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
@@ -31,7 +29,6 @@ import org.apache.hudi.exception.HoodieException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.io.Serializable;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
@@ -512,25 +509,18 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
}
@Override
- public Option<HoodieInstant> getLastClusterCommit() {
- return Option.fromJavaOptional(getCommitsTimeline().filter(s ->
s.getAction().equalsIgnoreCase(HoodieTimeline.REPLACE_COMMIT_ACTION))
+ public Option<HoodieInstant> getLastClusteringInstant() {
+ return Option.fromJavaOptional(getCommitsTimeline().filter(s ->
s.getAction().equalsIgnoreCase(HoodieTimeline.REPLACE_COMMIT_ACTION))
.getReverseOrderedInstants()
- .filter(i -> {
- try {
- HoodieCommitMetadata metadata = TimelineUtils.getCommitMetadata(i,
this);
- return
metadata.getOperationType().equals(WriteOperationType.CLUSTER);
- } catch (IOException e) {
- LOG.warn("Unable to read commit metadata for " + i + " due to " +
e.getMessage());
- return false;
- }
- }).findFirst());
+ .filter(i -> ClusteringUtils.isClusteringInstant(this, i))
+ .findFirst());
}
@Override
public Option<HoodieInstant> getLastPendingClusterInstant() {
return Option.fromJavaOptional(filterPendingReplaceTimeline()
.getReverseOrderedInstants()
- .filter(i -> ClusteringUtils.isPendingClusteringInstant(this,
i)).findFirst());
+ .filter(i -> ClusteringUtils.isClusteringInstant(this,
i)).findFirst());
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index ac77e1eb606..b02e797f23d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -397,9 +397,8 @@ public interface HoodieTimeline extends Serializable {
/**
* get the most recent cluster commit if present
- *
*/
- public Option<HoodieInstant> getLastClusterCommit();
+ public Option<HoodieInstant> getLastClusteringInstant();
/**
* get the most recent pending cluster commit if present
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index e9ae9cfbdfb..d041b6bcb8f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -75,15 +75,22 @@ public class ClusteringUtils {
}
/**
- * Checks if the replacecommit is clustering commit.
+ * Checks if the requested, inflight, or completed instant of replacecommit
action
+ * is a clustering operation, by checking whether the requested instant
contains
+ * a clustering plan.
+ *
+ * @param timeline Hudi timeline.
+ * @param replaceInstant the instant of replacecommit action to check.
+ * @return whether the instant is a clustering operation.
*/
- public static boolean isClusteringCommit(HoodieTableMetaClient metaClient,
HoodieInstant pendingReplaceInstant) {
- return getClusteringPlan(metaClient, pendingReplaceInstant).isPresent();
+ public static boolean isClusteringInstant(HoodieTimeline timeline,
HoodieInstant replaceInstant) {
+ return getClusteringPlan(timeline, replaceInstant).isPresent();
}
/**
* Get requested replace metadata from timeline.
- * @param timeline used to get the bytes stored in the requested replace
instant in the timeline
+ *
+ * @param timeline used to get the bytes stored in the
requested replace instant in the timeline
* @param pendingReplaceInstant can be in any state, because it will always
be converted to requested state
* @return option of the replace metadata if present, else empty
* @throws IOException
@@ -238,16 +245,8 @@ public class ClusteringUtils {
public static List<HoodieInstant>
getPendingClusteringInstantTimes(HoodieTableMetaClient metaClient) {
return
metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstantsAsStream()
- .filter(instant -> isPendingClusteringInstant(metaClient, instant))
- .collect(Collectors.toList());
- }
-
- public static boolean isPendingClusteringInstant(HoodieTableMetaClient
metaClient, HoodieInstant instant) {
- return getClusteringPlan(metaClient, instant).isPresent();
- }
-
- public static boolean isPendingClusteringInstant(HoodieTimeline timeline,
HoodieInstant instant) {
- return getClusteringPlan(timeline, instant).isPresent();
+ .filter(instant -> isClusteringInstant(metaClient.getActiveTimeline(),
instant))
+ .collect(Collectors.toList());
}
/**
@@ -301,9 +300,11 @@ public class ClusteringUtils {
}
/**
- * Returns whether the given instant {@code instant} is with clustering
operation.
+ * @param instant Hudi instant to check.
+ * @param timeline Hudi timeline.
+ * @return whether the given {@code instant} is a completed clustering
operation.
*/
- public static boolean isClusteringInstant(HoodieInstant instant,
HoodieTimeline timeline) {
+ public static boolean isCompletedClusteringInstant(HoodieInstant instant,
HoodieTimeline timeline) {
if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
return false;
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index d3375fe5e8a..881ebf1fa06 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -133,7 +133,9 @@ public class TestClusteringUtils extends HoodieCommonTestHarness {
String clusterTime1 = "1";
HoodieInstant requestedInstant =
createRequestedReplaceInstant(partitionPath1, clusterTime1, fileIds1);
HoodieInstant inflightInstant =
metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant,
Option.empty());
+
assertTrue(ClusteringUtils.isClusteringInstant(metaClient.getActiveTimeline(),
requestedInstant));
HoodieClusteringPlan requestedClusteringPlan =
ClusteringUtils.getClusteringPlan(metaClient,
requestedInstant).get().getRight();
+
assertTrue(ClusteringUtils.isClusteringInstant(metaClient.getActiveTimeline(),
inflightInstant));
HoodieClusteringPlan inflightClusteringPlan =
ClusteringUtils.getClusteringPlan(metaClient, inflightInstant).get().getRight();
assertEquals(requestedClusteringPlan, inflightClusteringPlan);
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 2b8656f2148..c4587cc2c0b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -514,7 +514,7 @@ public class StreamerUtil {
public static boolean isWriteCommit(HoodieTableType tableType, HoodieInstant
instant, HoodieTimeline timeline) {
return tableType == HoodieTableType.MERGE_ON_READ
? !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION) // not a
compaction
- : !ClusteringUtils.isClusteringInstant(instant, timeline); // not a
clustering
+ : !ClusteringUtils.isCompletedClusteringInstant(instant, timeline);
// not a clustering
}
/**
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 22a61d58881..2014db073d9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -23,9 +23,10 @@ import org.apache.hudi.DataSourceWriteOptions.{INLINE_CLUSTERING_ENABLE, KEYGENE
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.QuickstartUtils.{convertToStringList,
getQuickstartWriteConfigs}
import org.apache.hudi.avro.AvroSchemaCompatibility.SchemaIncompatibilityType
+import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
import
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT,
TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD}
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
@@ -33,7 +34,8 @@ import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline, Tim
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import
org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings,
recordsToStrings}
-import org.apache.hudi.common.util
+import org.apache.hudi.common.util.{ClusteringUtils, Option}
+import org.apache.hudi.common.{HoodiePendingRollbackInfo, util}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.metrics.HoodieMetricsConfig
import org.apache.hudi.exception.ExceptionUtil.getRootCause
@@ -44,6 +46,7 @@ import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.keygen._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.metrics.{Metrics, MetricsReporterType}
+import org.apache.hudi.table.HoodieSparkTable
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.util.JFunction
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions,
DataSourceWriteOptions, HoodieDataSourceHelpers, QuickstartUtils,
ScalaAssertionSupport}
@@ -1819,9 +1822,9 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
}
@ParameterizedTest
- @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO",
"SPARK"))
- def testInsertOverwriteCluster(recordType: HoodieRecordType): Unit = {
- val (writeOpts, _) = getWriterReaderOpts(recordType)
+ @EnumSource(value = classOf[HoodieInstant.State], names = Array("REQUESTED",
"INFLIGHT", "COMPLETED"))
+ def testInsertOverwriteCluster(firstClusteringState: HoodieInstant.State):
Unit = {
+ val (writeOpts, _) = getWriterReaderOpts()
// Insert Operation
val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
@@ -1831,6 +1834,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
INLINE_CLUSTERING_ENABLE.key() -> "true",
"hoodie.clustering.inline.max.commits" -> "2",
"hoodie.clustering.plan.strategy.sort.columns" -> "_row_key",
+ "hoodie.clustering.plan.strategy.max.num.groups" -> "1",
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
@@ -1843,7 +1847,15 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
.mode(SaveMode.Overwrite)
.save(basePath)
- for (i <- 1 until 6) {
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(hadoopConf)
+ .build()
+
+ assertTrue(metaClient.getActiveTimeline.getLastClusteringInstant.isEmpty)
+
+ var lastClustering: HoodieInstant = null
+ for (i <- 1 until 4) {
val records = recordsToStrings(dataGen.generateInsertsForPartition("00"
+ i, 10, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
inputDF.write.format("hudi")
@@ -1851,21 +1863,72 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
.option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)
+ val lastInstant =
metaClient.reloadActiveTimeline.getCommitsTimeline.lastInstant.get
+ if (i == 1 || i == 3) {
+ // Last instant is clustering
+ assertTrue(TimelineUtils.getCommitMetadata(lastInstant,
metaClient.getActiveTimeline)
+ .getOperationType.equals(WriteOperationType.CLUSTER))
+
assertTrue(ClusteringUtils.isClusteringInstant(metaClient.getActiveTimeline,
lastInstant))
+ lastClustering = lastInstant
+ assertEquals(
+ lastClustering,
+ metaClient.getActiveTimeline.getLastClusteringInstant.get)
+ } else {
+ assertTrue(TimelineUtils.getCommitMetadata(lastInstant,
metaClient.getActiveTimeline)
+ .getOperationType.equals(WriteOperationType.INSERT_OVERWRITE))
+
assertFalse(ClusteringUtils.isClusteringInstant(metaClient.getActiveTimeline,
lastInstant))
+ assertEquals(
+ lastClustering,
+ metaClient.getActiveTimeline.getLastClusteringInstant.get)
+ }
+ if (i == 1) {
+ val writeConfig = HoodieWriteConfig.newBuilder()
+ .forTable("hoodie_test")
+ .withPath(basePath)
+ .withProps(optsWithCluster)
+ .build()
+ if (firstClusteringState == HoodieInstant.State.INFLIGHT
+ || firstClusteringState == HoodieInstant.State.REQUESTED) {
+ // Move the clustering to inflight for testing
+ fs.delete(new Path(metaClient.getMetaPath, lastInstant.getFileName),
false)
+ val inflightClustering =
metaClient.reloadActiveTimeline.lastInstant.get
+ assertTrue(inflightClustering.isInflight)
+ assertEquals(
+ inflightClustering,
+ metaClient.getActiveTimeline.getLastClusteringInstant.get)
+ }
+ if (firstClusteringState == HoodieInstant.State.REQUESTED) {
+ val table = HoodieSparkTable.create(writeConfig, context)
+ table.rollbackInflightClustering(
+ metaClient.getActiveTimeline.getLastClusteringInstant.get,
+ new java.util.function.Function[String,
Option[HoodiePendingRollbackInfo]] {
+ override def apply(commitToRollback: String):
Option[HoodiePendingRollbackInfo] = {
+ new SparkRDDWriteClient(context,
writeConfig).getTableServiceClient
+ .getPendingRollbackInfo(table.getMetaClient,
commitToRollback, false)
+ }
+ })
+ val requestedClustering =
metaClient.reloadActiveTimeline.getCommitsTimeline.lastInstant.get
+ assertTrue(requestedClustering.isRequested)
+ assertEquals(
+ requestedClustering,
+ metaClient.getActiveTimeline.getLastClusteringInstant.get)
+ }
+ // This should not schedule any new clustering
+ new SparkRDDWriteClient(context, writeConfig)
+
.scheduleClustering(org.apache.hudi.common.util.Option.of(Map[String,
String]()))
+ assertEquals(lastInstant.getTimestamp,
+
metaClient.reloadActiveTimeline.getCommitsTimeline.lastInstant.get.getTimestamp)
+ }
}
-
- val metaClient = HoodieTableMetaClient.builder()
- .setBasePath(basePath)
- .setConf(hadoopConf)
- .build()
- val timeline = metaClient.getActiveTimeline
- val instants =
timeline.getAllCommitsTimeline.filterCompletedInstants.getInstants
- assertEquals(9, instants.size)
+ val timeline = metaClient.reloadActiveTimeline
+ val instants = timeline.getCommitsTimeline.getInstants
+ assertEquals(6, instants.size)
val replaceInstants = instants.filter(i =>
i.getAction.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).toList
- assertEquals(8, replaceInstants.size)
+ assertEquals(5, replaceInstants.size)
val clusterInstants = replaceInstants.filter(i => {
TimelineUtils.getCommitMetadata(i,
metaClient.getActiveTimeline).getOperationType.equals(WriteOperationType.CLUSTER)
})
- assertEquals(3, clusterInstants.size)
+ assertEquals(2, clusterInstants.size)
}