This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new 506f106cc2f [HUDI-7507] Adding timestamp ordering validation before creating requested instant (#11580)
506f106cc2f is described below
commit 506f106cc2f17021342a017cac023a43b15b9e01
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Oct 7 14:36:21 2024 -0700
[HUDI-7507] Adding timestamp ordering validation before creating requested instant (#11580)
- Adding timestamp ordering validation before creating requested timeline file
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 26 ++++-
.../hudi/client/timeline/TimestampUtils.java | 37 +++++++
.../org/apache/hudi/config/HoodieWriteConfig.java | 17 +++
.../java/org/apache/hudi/table/HoodieTable.java | 13 +++
.../action/clean/CleanPlanActionExecutor.java | 1 +
.../cluster/ClusteringPlanActionExecutor.java | 1 +
.../compact/ScheduleCompactionActionExecutor.java | 1 +
.../rollback/BaseRollbackPlanActionExecutor.java | 1 +
.../apache/hudi/client/HoodieFlinkWriteClient.java | 5 +
.../hudi/table/HoodieFlinkCopyOnWriteTable.java | 5 +
.../apache/hudi/client/HoodieJavaWriteClient.java | 5 +
.../hudi/table/HoodieJavaCopyOnWriteTable.java | 5 +
.../hudi/client/TestJavaHoodieBackedMetadata.java | 6 +-
.../TestHoodieJavaClientOnCopyOnWriteStorage.java | 63 +++++++----
.../apache/hudi/client/SparkRDDWriteClient.java | 5 +
.../hudi/table/HoodieSparkCopyOnWriteTable.java | 5 +
.../TestMultiWriterWithPreferWriterIngestion.java | 121 ++++++++++++++++-----
.../functional/TestHoodieBackedMetadata.java | 6 +-
.../TestHoodieClientOnCopyOnWriteStorage.java | 90 +++++++++++----
.../java/org/apache/hudi/table/TestCleaner.java | 46 ++++----
.../clean/TestCleanerInsertAndCleanByVersions.java | 2 +-
.../TestCopyOnWriteRollbackActionExecutor.java | 9 +-
.../table/functional/MockCompactionStrategy.java | 41 +++++++
.../table/functional/TestCleanPlanExecutor.java | 61 ++++++-----
.../TestHoodieSparkMergeOnReadTableCompaction.java | 106 ++++++++++++++++++
.../hudi/testutils/HoodieCleanerTestBase.java | 22 ++--
.../hudi/functional/RecordLevelIndexTestBase.scala | 28 +++--
.../TestHoodieBulkInsertDataInternalWriter.java | 7 +-
.../TestHoodieDataSourceInternalWriter.java | 11 +-
.../TestHoodieBulkInsertDataInternalWriter.java | 5 +-
.../TestHoodieDataSourceInternalBatchWrite.java | 8 +-
.../sources/TestGcsEventsHoodieIncrSource.java | 2 +-
.../utilities/sources/TestHoodieIncrSource.java | 24 ++--
.../sources/TestS3EventsHoodieIncrSource.java | 2 +-
34 files changed, 605 insertions(+), 182 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index b9da3387654..780d48d16bc 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -30,6 +30,7 @@ import
org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
+import org.apache.hudi.client.timeline.TimestampUtils;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.config.HoodieCommonConfig;
@@ -331,6 +332,21 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig
config, Configuration hadoopConf, HoodieTableMetaClient metaClient);
+ /**
+ * Validate timestamp for new commits. Here we validate that the new instant time is the latest among all entries in the timeline.
+ * This ensures timestamps are monotonically increasing. With multi-writers, out-of-order commit completions are still possible; the guarantee
+ * is that when a new commit starts, it always gets the highest commit time compared to other instants in the timeline.
+ * @param metaClient instance of {@link HoodieTableMetaClient} to be used.
+ * @param instantTime instant time of the current commit that is in progress.
+ */
+ protected abstract void validateTimestamp(HoodieTableMetaClient metaClient,
String instantTime);
+
+ protected void validateTimestampInternal(HoodieTableMetaClient metaClient,
String instantTime) {
+ if (config.shouldEnableTimestampOrderingValidation() &&
config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+ TimestampUtils.validateForLatestTimestamp(metaClient, instantTime);
+ }
+ }
+
void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata,
String actionType) {
if (writeTimer != null) {
long durationInMs = metrics.getDurationInMs(writeTimer.stop());
@@ -932,6 +948,15 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
+ "table could be in an inconsistent state. Pending restores: " +
Arrays.toString(inflightRestoreTimeline.getInstantsAsStream()
.map(instant ->
instant.getTimestamp()).collect(Collectors.toList()).toArray()));
+ HoodieInstant requestedInstant = new HoodieInstant(State.REQUESTED,
instantTime, actionType);
+ this.txnManager.beginTransaction(Option.of(requestedInstant),
+ lastCompletedTxnAndMetadata.isPresent() ?
Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
+ try {
+ validateTimestamp(metaClient, instantTime);
+ } finally {
+ txnManager.endTransaction(Option.of(requestedInstant));
+ }
+
// if there are pending compactions, their instantTime must not be greater
than that of this instant time
metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending
->
ValidationUtils.checkArgument(
@@ -941,7 +966,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
if (config.getFailedWritesCleanPolicy().isLazy()) {
this.heartbeatClient.start(instantTime);
}
-
if (actionType.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
metaClient.getActiveTimeline().createRequestedReplaceCommit(instantTime,
actionType);
} else {
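For context, the requested-instant creation above is now preceded by a validation that runs inside a short transaction, so the latest-timestamp check cannot race with a concurrent writer creating a newer instant. A minimal standalone sketch of that guard shape, using simplified stand-ins rather than Hudi types:

    // Sketch of the lock-guarded check added to startCommit above: validation
    // runs under the transaction lock and the lock is always released, even
    // when validation throws. Txn is a stand-in for the transaction manager.
    final class GuardedValidation {
      interface Txn { void begin(); void end(); }

      static void validateUnderLock(Txn txn, Runnable validate) {
        txn.begin();
        try {
          validate.run();   // throws IllegalArgumentException on an out-of-order time
        } finally {
          txn.end();        // mirrors the try/finally around validateTimestamp
        }
      }
    }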
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/TimestampUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/TimestampUtils.java
new file mode 100644
index 00000000000..dfc1578290b
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/TimestampUtils.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.timeline;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ValidationUtils;
+
+public class TimestampUtils {
+
+ public static void validateForLatestTimestamp(HoodieTableMetaClient
metaClient, String instantTime) {
+ // validate that the instant whose requested file is about to be created is
the latest in the timeline.
+ if (!metaClient.isMetadataTable()) { // let's validate, for the data table,
that timestamps are generated in monotonically increasing order.
+ HoodieTableMetaClient reloadedMetaClient =
HoodieTableMetaClient.reload(metaClient);
+
reloadedMetaClient.getActiveTimeline().getWriteTimeline().lastInstant().ifPresent(entry
-> {
+
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(entry.getTimestamp(),
HoodieTimeline.LESSER_THAN_OR_EQUALS, instantTime),
+ "Found later commit time " + entry + ", compared to the current
instant " + instantTime + ", hence failing to create requested commit meta
file");
+ });
+ }
+ }
+}
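The utility reloads the meta client so the check observes instants created by concurrent writers, compares against the last instant of the write timeline, and skips the metadata table. A self-contained sketch of the ordering rule itself, assuming instant times are lexicographically ordered timestamp strings (illustrative names, not Hudi APIs):

    import java.util.List;

    class TimestampOrderingSketch {
      // Fails if any existing instant on the timeline is newer than the instant
      // we are about to create, mirroring validateForLatestTimestamp above.
      static void validate(List<String> existingInstants, String newInstant) {
        existingInstants.stream()
            .max(String::compareTo)
            .ifPresent(latest -> {
              if (latest.compareTo(newInstant) > 0) {
                throw new IllegalArgumentException(
                    "Found later commit time " + latest
                        + ", compared to the current instant " + newInstant);
              }
            });
      }

      public static void main(String[] args) {
        validate(List.of("001", "002"), "003"); // ok: monotonically increasing
        validate(List.of("001", "004"), "003"); // throws IllegalArgumentException
      }
    }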
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 01a2b9c0c86..f4b2be343cb 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -741,6 +741,14 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "The class must be a subclass of
`org.apache.hudi.callback.HoodieClientInitCallback`."
+ "By default, no Hudi client init callback is executed.");
+ public static final ConfigProperty<Boolean>
ENABLE_TIMESTAMP_ORDERING_VALIDATION = ConfigProperty
+ .key("hoodie.timestamp.ordering.validate.enable")
+ .defaultValue(false)
+ .markAdvanced()
+ .sinceVersion("0.15.1")
+ .withDocumentation("Enable validation for commit time generation to
ensure the newly generated commit time is always the latest among all entries in the timeline. "
+ + "This is an additional safety to always generate monotonically
increasing commit times (for the ingestion writer, table services, etc.).");
+
/**
* Config key with boolean value that indicates whether record being written
during MERGE INTO Spark SQL
* operation are already prepped.
@@ -2621,6 +2629,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return props.getInteger(WRITES_FILEID_ENCODING,
HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID_ENCODING_UUID);
}
+ public Boolean shouldEnableTimestampOrderingValidation() {
+ return getBoolean(ENABLE_TIMESTAMP_ORDERING_VALIDATION);
+ }
+
public static class Builder {
protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -3135,6 +3147,11 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
+ public Builder withEnableTimestampOrderingValidation(boolean
enableTimestampOrderingValidation) {
+ writeConfig.setValue(ENABLE_TIMESTAMP_ORDERING_VALIDATION,
Boolean.toString(enableTimestampOrderingValidation));
+ return this;
+ }
+
protected void setDefaults() {
writeConfig.setDefaultValue(MARKERS_TYPE,
getDefaultMarkersType(engineType));
// Check for mandatory properties
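The new flag is off by default and, per the guard in shouldEnableTimestampOrderingValidation(), only takes effect when the write concurrency mode supports optimistic concurrency control. A hedged sketch of enabling it through the builder; the builder calls are the ones used in this patch, while import paths are assumed from typical Hudi layouts:

    import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
    import org.apache.hudi.common.model.WriteConcurrencyMode;
    import org.apache.hudi.config.HoodieLockConfig;
    import org.apache.hudi.config.HoodieWriteConfig;

    class EnableOrderingValidation {
      static HoodieWriteConfig build(String basePath) {
        return HoodieWriteConfig.newBuilder()
            .withPath(basePath) // table base path
            .withLockConfig(HoodieLockConfig.newBuilder()
                .withLockProvider(InProcessLockProvider.class).build())
            .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
            .withEnableTimestampOrderingValidation(true) // added by this patch
            .build();
      }
    }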
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 5d1279af2f7..8ff7b279024 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -31,6 +31,7 @@ import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.client.timeline.TimestampUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -896,6 +897,18 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
}
}
+ /**
+ * Validates that the instantTime is the latest in the write timeline.
+ * @param instantTime instant time of interest.
+ */
+ public abstract void validateForLatestTimestamp(String instantTime);
+
+ protected void validateForLatestTimestampInternal(String instantTime) {
+ if (this.config.shouldEnableTimestampOrderingValidation() &&
config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+ TimestampUtils.validateForLatestTimestamp(metaClient, instantTime);
+ }
+ }
+
public HoodieFileFormat getBaseFileFormat() {
return metaClient.getTableConfig().getBaseFileFormat();
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index 0329fc8ddc6..2cab17aab90 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -179,6 +179,7 @@ public class CleanPlanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I
final HoodieInstant cleanInstant = new
HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION,
startCleanTime);
// Save to both aux and timeline folder
try {
+ table.validateForLatestTimestamp(cleanInstant.getTimestamp());
table.getActiveTimeline().saveToCleanRequested(cleanInstant,
TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
LOG.info("Requesting Cleaning with instant time " + cleanInstant);
} catch (IOException e) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
index 54df15d6e80..91f64b1cee9 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
@@ -98,6 +98,7 @@ public class ClusteringPlanActionExecutor<T, I, K, O> extends
BaseActionExecutor
.setExtraMetadata(extraMetadata.orElse(Collections.emptyMap()))
.setClusteringPlan(planOption.get())
.build();
+ table.validateForLatestTimestamp(clusteringInstant.getTimestamp());
table.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant,
TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
} catch (IOException ioe) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index 77178c55455..e28c1ccd143 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -114,6 +114,7 @@ public class ScheduleCompactionActionExecutor<T, I, K, O>
extends BaseActionExec
Option<HoodieCompactionPlan> option = Option.empty();
if (plan != null && nonEmpty(plan.getOperations())) {
extraMetadata.ifPresent(plan::setExtraMetadata);
+ table.validateForLatestTimestamp(instantTime);
try {
if (operationType.equals(WriteOperationType.COMPACT)) {
HoodieInstant compactionInstant = new
HoodieInstant(HoodieInstant.State.REQUESTED,
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
index 48af32751a3..045e8a5a720 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
@@ -113,6 +113,7 @@ public class BaseRollbackPlanActionExecutor<T, I, K, O>
extends BaseActionExecut
HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan(new
HoodieInstantInfo(instantToRollback.getTimestamp(),
instantToRollback.getAction()), rollbackRequests,
LATEST_ROLLBACK_PLAN_VERSION);
if (!skipTimelinePublish) {
+ table.validateForLatestTimestamp(rollbackInstant.getTimestamp());
if
(table.getRollbackTimeline().filterInflightsAndRequested().containsInstant(rollbackInstant.getTimestamp()))
{
LOG.warn("Request Rollback found with instant time " +
rollbackInstant + ", hence skipping scheduling rollback");
} else {
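Each of the four plan executors above (clean, cluster, compact, rollback) now calls table.validateForLatestTimestamp(...) immediately before persisting the requested plan, so a table service scheduled with a stale instant time fails fast instead of writing an out-of-order requested file. The shared shape, as a standalone sketch with stand-in types:

    import java.util.function.Consumer;

    class SchedulePlanSketch {
      // Validate first, persist second: the ordering check runs before the
      // .requested plan file is written, so a stale instant time fails fast.
      static void schedule(String instantTime,
                           Consumer<String> validateLatest,   // e.g. table::validateForLatestTimestamp
                           Consumer<String> persistRequested) {
        validateLatest.accept(instantTime);   // throws if a later instant already exists
        persistRequested.accept(instantTime); // only then write the requested plan file
      }
    }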
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index b31d4470d43..d762d52d9ee 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -123,6 +123,11 @@ public class HoodieFlinkWriteClient<T> extends
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context,
metaClient);
}
+ @Override
+ protected void validateTimestamp(HoodieTableMetaClient metaClient, String
instantTime) {
+ // no op
+ }
+
@Override
public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>>
hoodieRecords) {
// Create a Hoodie table which encapsulated the commits and files visible
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 705299e6f97..e65b89ab43a 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -92,6 +92,11 @@ public class HoodieFlinkCopyOnWriteTable<T>
super(config, context, metaClient);
}
+ @Override
+ public void validateForLatestTimestamp(String instantTime) {
+ // no-op
+ }
+
/**
* Upsert a batch of new records into Hoodie table at the supplied
instantTime.
*
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index 596767e8cc6..ca55244128d 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -103,6 +103,11 @@ public class HoodieJavaWriteClient<T> extends
return HoodieJavaTable.create(config, context, metaClient);
}
+ @Override
+ protected void validateTimestamp(HoodieTableMetaClient metaClient, String
instantTime) {
+ validateTimestampInternal(metaClient, instantTime);
+ }
+
@Override
public List<WriteStatus> upsert(List<HoodieRecord<T>> records,
String instantTime) {
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 525f153a395..cc91d5d3436 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -92,6 +92,11 @@ public class HoodieJavaCopyOnWriteTable<T>
super(config, context, metaClient);
}
+ @Override
+ public void validateForLatestTimestamp(String instantTime) {
+ validateForLatestTimestampInternal(instantTime);
+ }
+
@Override
public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext
context,
String instantTime,
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 5d58059b573..d2f069afbe4 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -1786,7 +1786,7 @@ public class TestJavaHoodieBackedMetadata extends
TestHoodieMetadataBase {
HoodieJavaWriteClient client = getHoodieWriteClient(config);
// Write 1 (Bulk insert)
- String newCommitTime = "0000001";
+ String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.insert(records, newCommitTime);
@@ -1794,7 +1794,7 @@ public class TestJavaHoodieBackedMetadata extends
TestHoodieMetadataBase {
validateMetadata(client);
// Write 2 (inserts)
- newCommitTime = "0000002";
+ newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 20);
writeStatuses = client.insert(records, newCommitTime);
@@ -1827,7 +1827,7 @@ public class TestJavaHoodieBackedMetadata extends
TestHoodieMetadataBase {
replacedFileIds.add(new HoodieFileGroupId(partitionFiles.getKey(),
file))));
// trigger new write to mimic other writes succeeding before re-attempt.
- newCommitTime = "0000003";
+ newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 20);
writeStatuses = client.insert(records, newCommitTime);
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
index 7dee7d2b29c..5fc2c36acfc 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
@@ -1319,25 +1319,29 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage
extends HoodieJavaClientTe
HoodieJavaWriteClient client = new HoodieJavaWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
// perform 1 successful commit
- writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
+ String commit1 = HoodieActiveTimeline.createNewInstantTime();
+ writeBatch(client, commit1, commit1, Option.of(Arrays.asList(commit1)),
commit1,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert,
false, 100, 300,
0, true);
// Perform 2 failed writes to table
- writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "100",
+ String commit2 = HoodieActiveTimeline.createNewInstantTime();
+ writeBatch(client, commit2, commit1, Option.of(Arrays.asList(commit2)),
commit1,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert,
false, 100, 300,
0, false);
client.close();
+ String commit3 = HoodieActiveTimeline.createNewInstantTime();
client = new HoodieJavaWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
- writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
+ writeBatch(client, commit3, commit2, Option.of(Arrays.asList(commit3)),
commit3,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert,
false, 100, 300,
0, false);
client.close();
// refresh data generator to delete records generated from failed commits
dataGen = new HoodieTestDataGenerator();
// Perform 1 successful write
+ String commit4 = HoodieActiveTimeline.createNewInstantTime();
client = new HoodieJavaWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
- writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
+ writeBatch(client, commit4, commit3, Option.of(Arrays.asList(commit4)),
commit4,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert,
false, 100, 300,
0, true);
HoodieTableMetaClient metaClient = createMetaClient();
@@ -1349,13 +1353,14 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage
extends HoodieJavaClientTe
// Await till enough time passes such that the first 2 failed commits
heartbeats are expired
boolean conditionMet = false;
while (!conditionMet) {
- conditionMet = client.getHeartbeatClient().isHeartbeatExpired("300");
+ conditionMet = client.getHeartbeatClient().isHeartbeatExpired(commit3);
Thread.sleep(2000);
}
client.close();
client = new HoodieJavaWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
// Perform 1 successful write
- writeBatch(client, "500", "400", Option.of(Arrays.asList("500")), "500",
+ String commit5 = HoodieActiveTimeline.createNewInstantTime();
+ writeBatch(client, commit5, commit4, Option.of(Arrays.asList(commit5)),
commit5,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert,
false, 100, 300,
0, true);
client.clean();
@@ -1396,47 +1401,53 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage
extends HoodieJavaClientTe
HoodieTestUtils.init(storageConf, basePath);
HoodieFailedWritesCleaningPolicy cleaningPolicy = EAGER;
HoodieJavaWriteClient client = new HoodieJavaWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, true));
+ String commit1 = HoodieActiveTimeline.createNewInstantTime();
// Perform 1 successful writes to table
- writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
+ writeBatch(client, commit1, commit1, Option.of(Arrays.asList(commit1)),
commit1,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert,
false, 100, 300,
0, true);
// Perform 1 failed writes to table
- writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200",
+ String commit2 = HoodieActiveTimeline.createNewInstantTime();
+ writeBatch(client, commit2, commit1, Option.of(Arrays.asList(commit2)),
commit2,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert,
false, 100, 300,
0, false);
client.close();
// Toggle cleaning policy to LAZY
cleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
+ String commit3 = HoodieActiveTimeline.createNewInstantTime();
// Perform 2 failed writes to table
client = new HoodieJavaWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, true));
- writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
+ writeBatch(client, commit3, commit2, Option.of(Arrays.asList(commit3)),
commit3,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert,
false, 100, 300,
0, false);
client.close();
+ String commit4 = HoodieActiveTimeline.createNewInstantTime();
client = new HoodieJavaWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, true));
- writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
+ writeBatch(client, commit4, commit3, Option.of(Arrays.asList(commit4)),
commit4,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert,
false, 100, 300,
0, false);
client.close();
// Await till enough time passes such that the 2 failed commits heartbeats
are expired
boolean conditionMet = false;
while (!conditionMet) {
- conditionMet = client.getHeartbeatClient().isHeartbeatExpired("400");
+ conditionMet = client.getHeartbeatClient().isHeartbeatExpired(commit4);
Thread.sleep(2000);
}
client.clean();
HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
assertTrue(timeline.getTimelineOfActions(
CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 3);
+ String commit5 = HoodieActiveTimeline.createNewInstantTime();
// Perform 2 failed commits
client = new HoodieJavaWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, true));
- writeBatch(client, "500", "400", Option.of(Arrays.asList("300")), "300",
+ writeBatch(client, commit5, commit4, Option.of(Arrays.asList(commit3)),
commit3,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert,
false, 100, 300,
0, false);
client.close();
+ String commit6 = HoodieActiveTimeline.createNewInstantTime();
client = new HoodieJavaWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, true));
- writeBatch(client, "600", "500", Option.of(Arrays.asList("400")), "400",
+ writeBatch(client, commit6, commit5, Option.of(Arrays.asList(commit4)),
commit4,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert,
false, 100, 300,
0, false);
client.close();
@@ -1458,28 +1469,33 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage
extends HoodieJavaClientTe
ExecutorService service = Executors.newFixedThreadPool(2);
HoodieTestUtils.init(storageConf, basePath);
HoodieJavaWriteClient client = new HoodieJavaWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, true));
+ String commit1 = HoodieActiveTimeline.createNewInstantTime();
// perform 1 successful write
- writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
+ writeBatch(client, commit1, commit1, Option.of(Arrays.asList(commit1)),
commit1,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert,
false, 100, 100,
0, true);
// Perform 2 failed writes to table
- writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200",
+ String commit2 = HoodieActiveTimeline.createNewInstantTime();
+ writeBatch(client, commit2, commit1, Option.of(Arrays.asList(commit2)),
commit2,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert,
false, 100, 100,
0, false);
client.close();
+
+ String commit3 = HoodieActiveTimeline.createNewInstantTime();
client = new HoodieJavaWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, true));
- writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
+ writeBatch(client, commit3, commit2, Option.of(Arrays.asList(commit3)),
commit3,
100, dataGen::generateInserts, HoodieJavaWriteClient::bulkInsert,
false, 100, 100,
0, false);
client.close();
// refresh data generator to delete records generated from failed commits
dataGen = new HoodieTestDataGenerator();
+ String commit4 = HoodieActiveTimeline.createNewInstantTime();
// Create a successful commit
- Future<List<WriteStatus>> commit3 = service.submit(() -> writeBatch(new
HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy,
true)),
- "400", "300", Option.of(Arrays.asList("400")), "300", 100,
dataGen::generateInserts,
+ Future<List<WriteStatus>> commit4Future = service.submit(() ->
writeBatch(new HoodieJavaWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, true)),
+ commit4, commit3, Option.of(Arrays.asList(commit4)), commit3, 100,
dataGen::generateInserts,
HoodieJavaWriteClient::bulkInsert, false, 100, 100, 0, true));
- commit3.get();
+ commit4Future.get();
HoodieTableMetaClient metaClient = createMetaClient();
assertTrue(metaClient.getActiveTimeline().getTimelineOfActions(
@@ -1490,14 +1506,15 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage
extends HoodieJavaClientTe
// Await till enough time passes such that the first 2 failed commits
heartbeats are expired
boolean conditionMet = false;
while (!conditionMet) {
- conditionMet = client.getHeartbeatClient().isHeartbeatExpired("300");
+ conditionMet = client.getHeartbeatClient().isHeartbeatExpired(commit3);
Thread.sleep(2000);
}
- Future<List<WriteStatus>> commit4 = service.submit(() -> writeBatch(new
HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy,
true)),
- "500", "400", Option.of(Arrays.asList("500")), "500", 100,
dataGen::generateInserts,
+ String commit5 = HoodieActiveTimeline.createNewInstantTime();
+ Future<List<WriteStatus>> commit5Future = service.submit(() ->
writeBatch(new HoodieJavaWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, true)),
+ commit5, commit4, Option.of(Arrays.asList(commit5)), commit5, 100,
dataGen::generateInserts,
HoodieJavaWriteClient::bulkInsert, false, 100, 100, 0, true));
Future<HoodieCleanMetadata> clean1 = service.submit(() -> new
HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy,
true)).clean());
- commit4.get();
+ commit5Future.get();
clean1.get();
client.close();
HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index bbdd34835ad..ef6cdde67bd 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -129,6 +129,11 @@ public class SparkRDDWriteClient<T> extends
return HoodieSparkTable.create(config, context, metaClient);
}
+ @Override
+ protected void validateTimestamp(HoodieTableMetaClient metaClient, String
instantTime) {
+ validateTimestampInternal(metaClient, instantTime);
+ }
+
@Override
public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>>
hoodieRecords) {
// Create a Hoodie table which encapsulated the commits and files visible
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 441ac9eb1ec..fea0a19dcdf 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -104,6 +104,11 @@ public class HoodieSparkCopyOnWriteTable<T>
super(config, context, metaClient);
}
+ @Override
+ public void validateForLatestTimestamp(String instantTime) {
+ validateForLatestTimestampInternal(instantTime);
+ }
+
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>>
upsert(HoodieEngineContext context, String instantTime,
HoodieData<HoodieRecord<T>> records) {
return new SparkUpsertCommitActionExecutor<>((HoodieSparkEngineContext)
context, config, this, instantTime, records).execute();
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
index 68aadf0cccf..0a8130f0788 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
@@ -56,6 +56,7 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static
org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
@@ -82,7 +83,7 @@ public class TestMultiWriterWithPreferWriterIngestion extends
HoodieClientTestBa
}
@ParameterizedTest
- @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE",
"MERGE_ON_READ"})
+ @EnumSource(value = HoodieTableType.class, names = {"MERGE_ON_READ"})
public void
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType)
throws Exception {
// create inserts X 1
if (tableType == HoodieTableType.MERGE_ON_READ) {
@@ -123,67 +124,123 @@ public class TestMultiWriterWithPreferWriterIngestion
extends HoodieClientTestBa
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
// Create upserts, schedule cleaning, schedule compaction in parallel
- String instant4 = HoodieActiveTimeline.createNewInstantTime();
Future future1 = executors.submit(() -> {
int numRecords = 100;
String commitTimeBetweenPrevAndNew = instantTime2;
try {
- // For both COW and MOR table types the commit should not be blocked,
since we are giving preference to ingestion.
- createCommitWithUpserts(cfg, client1, instantTime3,
commitTimeBetweenPrevAndNew, instant4, numRecords);
- validInstants.add(instant4);
+ int counter = 0;
+ while (counter++ < 5) { // we can't really time which writer triggers
first and which one triggers later. So, lets add few rounds of retries.
+ // For both COW and MOR table types the commit should not be
blocked, since we are giving preference to ingestion.
+ try {
+ String instant4 = HoodieActiveTimeline.createNewInstantTime();
+ createCommitWithUpserts(cfg, client1, instantTime3,
commitTimeBetweenPrevAndNew, instant4, numRecords);
+ validInstants.add(instant4);
+ break;
+ } catch (IllegalArgumentException e) {
+ if (!e.getMessage().contains("Found later commit
time")) {
+ throw e;
+ }
+ }
+ }
} catch (Exception e1) {
throw new RuntimeException(e1);
}
});
- String instant5 = HoodieActiveTimeline.createNewInstantTime();
+ AtomicReference<String> instant5Ref = new AtomicReference<>();
Future future2 = executors.submit(() -> {
try {
- client2.scheduleTableService(instant5, Option.empty(),
TableServiceType.COMPACT);
+ int counter = 0;
+ while (counter++ < 5) { // we can't really control which writer triggers
first and which one triggers later. So, let's add a few rounds of retries.
+ try {
+ String instant5 = HoodieActiveTimeline.createNewInstantTime();
+ client2.scheduleTableService(instant5, Option.empty(),
TableServiceType.COMPACT);
+ instant5Ref.set(instant5);
+ break;
+ } catch (IllegalArgumentException e) {
+ if (!e.getMessage().contains("Found later commit
time")) {
+ throw e;
+ }
+ }
+ }
} catch (Exception e2) {
if (tableType == HoodieTableType.MERGE_ON_READ) {
throw new RuntimeException(e2);
}
}
});
- String instant6 = HoodieActiveTimeline.createNewInstantTime();
+ AtomicReference<String> instant6Ref = new AtomicReference<>();
Future future3 = executors.submit(() -> {
- try {
- client2.scheduleTableService(instant6, Option.empty(),
TableServiceType.CLEAN);
- } catch (Exception e2) {
- throw new RuntimeException(e2);
+ int counter = 0;
+ while (counter++ < 5) { // we can't really control which writer triggers
first and which one triggers later. So, let's add a few rounds of retries.
+ try {
+ String instant6 = HoodieActiveTimeline.createNewInstantTime();
+ client2.scheduleTableService(instant6, Option.empty(),
TableServiceType.CLEAN);
+ instant6Ref.set(instant6);
+ break;
+ } catch (IllegalArgumentException e) {
+ if (!e.getMessage().contains("Found later commit time")) {
+ throw e;
+ }
+ } catch (Exception e2) {
+ throw new RuntimeException(e2);
+ }
}
});
future1.get();
future2.get();
future3.get();
// Create inserts, run cleaning, run compaction in parallel
- String instant7 = HoodieActiveTimeline.createNewInstantTime();
future1 = executors.submit(() -> {
int numRecords = 100;
- try {
- createCommitWithInserts(cfg, client1, instantTime3, instant7,
numRecords);
- validInstants.add(instant7);
- } catch (Exception e1) {
- throw new RuntimeException(e1);
+ int counter = 0;
+ while (counter++ < 5) { // we can't really control which writer triggers
first and which one triggers later. So, let's add a few rounds of retries.
+ try {
+ String instant7 = HoodieActiveTimeline.createNewInstantTime();
+ createCommitWithInserts(cfg, client1, instantTime3, instant7,
numRecords);
+ validInstants.add(instant7);
+ break;
+ } catch (IllegalArgumentException e) {
+ if (!e.getMessage().contains("Found later commit time")) {
+ throw e;
+ }
+ } catch (Exception e1) {
+ throw new RuntimeException(e1);
+ }
}
});
future2 = executors.submit(() -> {
- try {
- HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata =
client2.compact(instant5);
- client2.commitCompaction(instant5,
compactionMetadata.getCommitMetadata().get(), Option.empty());
- validInstants.add(instant5);
- } catch (Exception e2) {
- if (tableType == HoodieTableType.MERGE_ON_READ) {
- Assertions.assertTrue(e2 instanceof HoodieWriteConflictException);
+ int counter = 0;
+ while (counter++ < 5) { // we can't really control which writer triggers
first and which one triggers later. So, let's add a few rounds of retries.
+ try {
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata =
client2.compact(instant5Ref.get());
+ client2.commitCompaction(instant5Ref.get(),
compactionMetadata.getCommitMetadata().get(), Option.empty());
+ validInstants.add(instant5Ref.get());
+ break;
+ } catch (IllegalArgumentException e) {
+ if (!e.getMessage().contains("Found later commit time")) {
+ throw e;
+ }
+ } catch (Exception e2) {
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ Assertions.assertTrue(e2 instanceof HoodieWriteConflictException);
+ }
}
}
});
future3 = executors.submit(() -> {
- try {
- client2.clean(instant6, false);
- validInstants.add(instant6);
- } catch (Exception e2) {
- throw new RuntimeException(e2);
+ int counter = 0;
+ while (counter++ < 5) { // we can't really control which writer triggers
first and which one triggers later. So, let's add a few rounds of retries.
+ try {
+ client2.clean(instant6Ref.get(), false);
+ validInstants.add(instant6Ref.get());
+ break;
+ } catch (IllegalArgumentException e) {
+ if (!e.getMessage().contains("Found later commit time")) {
+ throw e;
+ }
+ } catch (Exception e2) {
+ throw new RuntimeException(e2);
+ }
}
});
future1.get();
@@ -195,6 +252,10 @@ public class TestMultiWriterWithPreferWriterIngestion
extends HoodieClientTestBa
Assertions.assertTrue(validInstants.containsAll(completedInstants));
}
+ private String getNextCommitTime() {
+ return HoodieActiveTimeline.createNewInstantTime();
+ }
+
@ParameterizedTest
@EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE",
"MERGE_ON_READ"})
public void testHoodieClientMultiWriterWithClustering(HoodieTableType
tableType) throws Exception {
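Because the validation can now reject a writer whose instant time was drawn before a concurrent writer committed, the multi-writer tests above wrap each attempt in a bounded retry that regenerates the instant time on a "Found later commit time" rejection. A standalone sketch of that retry shape; the supplier and action are stand-ins for HoodieActiveTimeline.createNewInstantTime() and the client call being retried:

    import java.util.function.Consumer;
    import java.util.function.Supplier;

    class RetryOnOrderingConflict {
      static String runWithRetries(Supplier<String> newInstantTime, Consumer<String> action) {
        for (int attempt = 0; attempt < 5; attempt++) {
          String instant = newInstantTime.get();
          try {
            action.accept(instant);  // e.g. scheduleTableService or a write batch
            return instant;          // success: caller records the valid instant
          } catch (IllegalArgumentException e) {
            if (!e.getMessage().contains("Found later commit time")) {
              throw e;               // unrelated failure: rethrow
            }
            // ordering conflict: another writer got a newer instant; retry
          }
        }
        throw new IllegalStateException("Exhausted retries while scheduling instant");
      }
    }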
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 196ab9cb434..3af0d3af9d0 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -2202,7 +2202,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
SparkRDDWriteClient client = getHoodieWriteClient(config);
// Write 1 (Bulk insert)
- String newCommitTime = "0000001";
+ String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records,
1), newCommitTime).collect();
@@ -2210,7 +2210,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
validateMetadata(client);
// Write 2 (inserts)
- newCommitTime = "0000002";
+ newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 20);
writeStatuses = client.insert(jsc.parallelize(records, 1),
newCommitTime).collect();
@@ -2240,7 +2240,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
replacedFileIds.add(new HoodieFileGroupId(partitionFiles.getKey(),
file))));
// trigger new write to mimic other writes succeeding before re-attempt.
- newCommitTime = "0000003";
+ newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 20);
writeStatuses = client.insert(jsc.parallelize(records, 1),
newCommitTime).collect();
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index e9020034c07..624570fdd59 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -945,6 +945,34 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testOutOfOrderCommitTimestamps(boolean enableValidation) {
+ HoodieWriteConfig config = getConfigBuilder()
+
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ .withEnableTimestampOrderingValidation(enableValidation)
+ .build();
+ try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
+ String commit1 = HoodieActiveTimeline.createNewInstantTime();
+ client.startCommitWithTime(commit1);
+ String commit2 = HoodieActiveTimeline.createNewInstantTime();
+ client.startCommitWithTime(commit2);
+ String commit3 = HoodieActiveTimeline.createNewInstantTime();
+ client.startCommitWithTime(commit3);
+
+ // generate commit4's time before commit5's, but start commit4 after commit5.
commit4 creation should fail when timestamp ordering validation is enabled.
+ String commit4 = HoodieActiveTimeline.createNewInstantTime();
+ String commit5 = HoodieActiveTimeline.createNewInstantTime();
+ client.startCommitWithTime(commit5);
+ if (enableValidation) {
+ assertThrows(IllegalArgumentException.class, () ->
client.startCommitWithTime(commit4));
+ } else {
+ client.startCommitWithTime(commit4);
+ }
+ }
+ }
+
@Test
public void testPendingRestore() throws IOException {
HoodieWriteConfig config = getConfigBuilder().withMetadataConfig(
@@ -2437,25 +2465,29 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
SparkRDDWriteClient client = new SparkRDDWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
// perform 1 successful commit
- writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
+ String commit1 = HoodieActiveTimeline.createNewInstantTime();
+ writeBatch(client, commit1, commit1, Option.of(Arrays.asList(commit1)),
commit1,
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false,
100, 300,
0, true);
// Perform 2 failed writes to table
- writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "100",
+ String commit2 = HoodieActiveTimeline.createNewInstantTime();
+ writeBatch(client, commit2, commit1, Option.of(Arrays.asList(commit2)),
commit1,
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false,
100, 300,
0, false);
client.close();
+ String commit3 = HoodieActiveTimeline.createNewInstantTime();
client = new SparkRDDWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
- writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
+ writeBatch(client, commit3, commit2, Option.of(Arrays.asList(commit3)),
commit3,
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false,
100, 300,
0, false);
client.close();
// refresh data generator to delete records generated from failed commits
dataGen = new HoodieTestDataGenerator();
// Perform 1 successful write
+ String commit4 = HoodieActiveTimeline.createNewInstantTime();
client = new SparkRDDWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
- writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
+ writeBatch(client, commit4, commit3, Option.of(Arrays.asList(commit4)),
commit4,
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false,
100, 300,
0, true);
HoodieTableMetaClient metaClient = createMetaClient(basePath);
@@ -2467,12 +2499,13 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
// Await till enough time passes such that the first 2 failed commits
heartbeats are expired
boolean conditionMet = false;
while (!conditionMet) {
- conditionMet = client.getHeartbeatClient().isHeartbeatExpired("300");
+ conditionMet = client.getHeartbeatClient().isHeartbeatExpired(commit3);
Thread.sleep(2000);
}
client = new SparkRDDWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
// Perform 1 successful write
- writeBatch(client, "500", "400", Option.of(Arrays.asList("500")), "500",
+ String commit5 = HoodieActiveTimeline.createNewInstantTime();
+ writeBatch(client, commit5, commit4, Option.of(Arrays.asList(commit5)),
commit5,
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false,
100, 300,
0, true);
client.clean();
@@ -2515,32 +2548,36 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
HoodieFailedWritesCleaningPolicy cleaningPolicy = EAGER;
SparkRDDWriteClient client = new SparkRDDWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
// Perform 1 successful writes to table
- writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
+ String commit1 = HoodieActiveTimeline.createNewInstantTime();
+ writeBatch(client, commit1, commit1, Option.of(Arrays.asList(commit1)),
commit1,
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false,
100, 300,
0, true);
// Perform 1 failed writes to table
- writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200",
+ String commit2 = HoodieActiveTimeline.createNewInstantTime();
+ writeBatch(client, commit2, commit1, Option.of(Arrays.asList(commit2)),
commit2,
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false,
100, 300,
0, false);
client.close();
// Toggle cleaning policy to LAZY
cleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
// Perform 2 failed writes to table
+ String commit3 = HoodieActiveTimeline.createNewInstantTime();
client = new SparkRDDWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
- writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
+ writeBatch(client, commit3, commit2, Option.of(Arrays.asList(commit3)),
commit3,
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false,
100, 300,
0, false);
client.close();
+ String commit4 = HoodieActiveTimeline.createNewInstantTime();
client = new SparkRDDWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
- writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
+ writeBatch(client, commit4, commit3, Option.of(Arrays.asList(commit4)),
commit4,
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false,
100, 300,
0, false);
client.close();
// Await till enough time passes such that the 2 failed commits heartbeats
are expired
boolean conditionMet = false;
while (!conditionMet) {
- conditionMet = client.getHeartbeatClient().isHeartbeatExpired("400");
+ conditionMet = client.getHeartbeatClient().isHeartbeatExpired(commit4);
Thread.sleep(2000);
}
client.clean();
@@ -2548,13 +2585,15 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
assertTrue(timeline.getTimelineOfActions(
CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 3);
// Perform 2 failed commits
+ String commit5 = HoodieActiveTimeline.createNewInstantTime();
client = new SparkRDDWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
- writeBatch(client, "500", "400", Option.of(Arrays.asList("300")), "300",
+ writeBatch(client, commit5, commit4, Option.of(Arrays.asList(commit3)),
commit3,
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false,
100, 300,
0, false);
client.close();
+ String commit6 = HoodieActiveTimeline.createNewInstantTime();
client = new SparkRDDWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
- writeBatch(client, "600", "500", Option.of(Arrays.asList("400")), "400",
+ writeBatch(client, commit6, commit5, Option.of(Arrays.asList(commit4)),
commit4,
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false,
100, 300,
0, false);
client.close();
@@ -2577,27 +2616,31 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
HoodieTestUtils.init(storageConf, basePath);
SparkRDDWriteClient client = new SparkRDDWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, true));
// perform 1 successful write
- writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
+ String commit1 = HoodieActiveTimeline.createNewInstantTime();
+ writeBatch(client, commit1, commit1, Option.of(Arrays.asList(commit1)),
commit1,
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false,
100, 100,
0, true);
// Perform 2 failed writes to table
- writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200",
+ String commit2 = HoodieActiveTimeline.createNewInstantTime();
+ writeBatch(client, commit2, commit1, Option.of(Arrays.asList(commit2)),
commit2,
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false,
100, 100,
0, false);
client.close();
+ String commit3 = HoodieActiveTimeline.createNewInstantTime();
client = new SparkRDDWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, true));
- writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
+ writeBatch(client, commit3, commit2, Option.of(Arrays.asList(commit3)),
commit3,
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false,
100, 100,
0, false);
client.close();
// refresh data generator to delete records generated from failed commits
dataGen = new HoodieTestDataGenerator();
// Create a successful commit
- Future<JavaRDD<WriteStatus>> commit3 = service.submit(() -> writeBatch(new
SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy,
true)),
- "400", "300", Option.of(Arrays.asList("400")), "300", 100,
dataGen::generateInserts,
+ String commit4 = HoodieActiveTimeline.createNewInstantTime();
+ Future<JavaRDD<WriteStatus>> commit4Future = service.submit(() ->
writeBatch(new SparkRDDWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, true)),
+ commit4, commit3, Option.of(Arrays.asList(commit4)), commit3, 100,
dataGen::generateInserts,
SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true));
- commit3.get();
+ commit4Future.get();
HoodieTableMetaClient metaClient = createMetaClient(basePath);
assertTrue(metaClient.getActiveTimeline().getTimelineOfActions(
@@ -2608,14 +2651,15 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
// Await till enough time passes such that the first 2 failed commits
heartbeats are expired
boolean conditionMet = false;
while (!conditionMet) {
- conditionMet = client.getHeartbeatClient().isHeartbeatExpired("300");
+ conditionMet = client.getHeartbeatClient().isHeartbeatExpired(commit3);
Thread.sleep(2000);
}
- Future<JavaRDD<WriteStatus>> commit4 = service.submit(() -> writeBatch(new
SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy,
true)),
- "500", "400", Option.of(Arrays.asList("500")), "500", 100,
dataGen::generateInserts,
+ String commit5 = HoodieActiveTimeline.createNewInstantTime();
+ Future<JavaRDD<WriteStatus>> commit5Future = service.submit(() ->
writeBatch(new SparkRDDWriteClient(context,
getParallelWritingWriteConfig(cleaningPolicy, true)),
+ commit5, commit4, Option.of(Arrays.asList(commit5)), commit5, 100,
dataGen::generateInserts,
SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true));
Future<HoodieCleanMetadata> clean1 = service.submit(() -> new
SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy,
true)).clean());
- commit4.get();
+ commit5Future.get();
clean1.get();
HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
assertTrue(timeline.getTimelineOfActions(
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index bce99444dcf..d94fdab3f25 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -154,7 +154,7 @@ public class TestCleaner extends HoodieCleanerTestBase {
assertNoWriteErrors(statuses.collect());
// verify that there is a commit
metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieTimeline timeline = new
HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline();
+ HoodieTimeline timeline = new
HoodieActiveTimeline(metaClient).getCommitsTimeline();
assertEquals(1, timeline.findInstantsAfter("000",
Integer.MAX_VALUE).countInstants(), "Expecting a single commit.");
// Should have 100 records in table (check using Index), all in locations
marked at commit
HoodieTable table = HoodieSparkTable.create(client.getConfig(), context,
metaClient);
@@ -1084,7 +1084,7 @@ public class TestCleaner extends HoodieCleanerTestBase {
testTable.addReplaceCommit("00000000000004",
Option.of(replaceMetadata.getKey()), Option.empty(),
replaceMetadata.getValue());
// run cleaner with failures
- List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, true,
simulateMetadataFailure, 5, true);
+ List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, true,
simulateMetadataFailure, 5, true, Option.empty());
assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
@@ -1131,16 +1131,16 @@ public class TestCleaner extends HoodieCleanerTestBase {
put(p1, CollectionUtils.createImmutableList(file1P1, file2P1));
}
});
- commitWithMdt("10", part1ToFileId, testTable, metadataWriter);
- testTable.addClean("15");
- commitWithMdt("20", part1ToFileId, testTable, metadataWriter);
+ commitWithMdt(makeNewCommitTime(10, "%09d"), part1ToFileId, testTable,
metadataWriter);
+ testTable.addClean(makeNewCommitTime(15, "%09d"));
+ commitWithMdt(makeNewCommitTime(20, "%09d"), part1ToFileId, testTable,
metadataWriter);
// add clean instant
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new
HoodieActionInstant("", "", ""),
"", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new
HashMap<>(), new ArrayList<>(), Collections.emptyMap());
HoodieCleanMetadata cleanMeta = new HoodieCleanMetadata("", 0L, 0,
- "20", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new
HashMap<>(), Collections.emptyMap());
- testTable.addClean("30", cleanerPlan, cleanMeta);
+ makeNewCommitTime(20, "%09d"), "", new HashMap<>(),
CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), Collections.emptyMap());
+ testTable.addClean(makeNewCommitTime(30, "%09d"), cleanerPlan,
cleanMeta);
// add files in partition "part_2"
String file3P2 = UUID.randomUUID().toString();
@@ -1150,8 +1150,8 @@ public class TestCleaner extends HoodieCleanerTestBase {
put(p2, CollectionUtils.createImmutableList(file3P2, file4P2));
}
});
- commitWithMdt("30", part2ToFileId, testTable, metadataWriter);
- commitWithMdt("40", part2ToFileId, testTable, metadataWriter);
+ commitWithMdt(makeNewCommitTime(30, "%09d"), part2ToFileId, testTable,
metadataWriter);
+ commitWithMdt(makeNewCommitTime(40, "%09d"), part2ToFileId, testTable,
metadataWriter);
// empty commits
String file5P2 = UUID.randomUUID().toString();
@@ -1161,25 +1161,25 @@ public class TestCleaner extends HoodieCleanerTestBase {
put(p2, CollectionUtils.createImmutableList(file5P2, file6P2));
}
});
- commitWithMdt("50", part2ToFileId, testTable, metadataWriter);
- commitWithMdt("60", part2ToFileId, testTable, metadataWriter);
+ commitWithMdt(makeNewCommitTime(50, "%09d"), part2ToFileId, testTable,
metadataWriter);
+ commitWithMdt(makeNewCommitTime(60, "%09d"), part2ToFileId, testTable,
metadataWriter);
// archive commit 1, 2
new HoodieTimelineArchiver<>(config, HoodieSparkTable.create(config,
context, metaClient))
.archiveIfRequired(context, false);
metaClient = HoodieTableMetaClient.reload(metaClient);
- assertFalse(metaClient.getActiveTimeline().containsInstant("10"));
- assertFalse(metaClient.getActiveTimeline().containsInstant("20"));
-
- runCleaner(config);
- assertFalse(testTable.baseFileExists(p1, "10", file1P1), "Clean old
FileSlice in p1 by fallback to full clean");
- assertFalse(testTable.baseFileExists(p1, "10", file2P1), "Clean old
FileSlice in p1 by fallback to full clean");
- assertFalse(testTable.baseFileExists(p2, "30", file3P2), "Clean old
FileSlice in p2");
- assertFalse(testTable.baseFileExists(p2, "30", file4P2), "Clean old
FileSlice in p2");
- assertTrue(testTable.baseFileExists(p1, "20", file1P1), "Latest
FileSlice exists");
- assertTrue(testTable.baseFileExists(p1, "20", file2P1), "Latest
FileSlice exists");
- assertTrue(testTable.baseFileExists(p2, "40", file3P2), "Latest
FileSlice exists");
- assertTrue(testTable.baseFileExists(p2, "40", file4P2), "Latest
FileSlice exists");
+
assertFalse(metaClient.getActiveTimeline().containsInstant(makeNewCommitTime(10,
"%09d")));
+
assertFalse(metaClient.getActiveTimeline().containsInstant(makeNewCommitTime(20,
"%09d")));
+
+ runCleaner(config, false, false, 80, false, Option.empty());
+ assertFalse(testTable.baseFileExists(p1, makeNewCommitTime(10, "%09d"),
file1P1), "Clean old FileSlice in p1 by fallback to full clean");
+ assertFalse(testTable.baseFileExists(p1, makeNewCommitTime(10, "%09d"),
file2P1), "Clean old FileSlice in p1 by fallback to full clean");
+ assertFalse(testTable.baseFileExists(p2, makeNewCommitTime(30, "%09d"),
file3P2), "Clean old FileSlice in p2");
+ assertFalse(testTable.baseFileExists(p2, makeNewCommitTime(30, "%09d"),
file4P2), "Clean old FileSlice in p2");
+ assertTrue(testTable.baseFileExists(p1, makeNewCommitTime(20, "%09d"),
file1P1), "Latest FileSlice exists");
+ assertTrue(testTable.baseFileExists(p1, makeNewCommitTime(20, "%09d"),
file2P1), "Latest FileSlice exists");
+ assertTrue(testTable.baseFileExists(p2, makeNewCommitTime(40, "%09d"),
file3P2), "Latest FileSlice exists");
+ assertTrue(testTable.baseFileExists(p2, makeNewCommitTime(40, "%09d"),
file4P2), "Latest FileSlice exists");
}
}
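
For readers unfamiliar with makeNewCommitTime, the "%09d" format zero-pads a
sequence number into a fixed-width instant string, which keeps test instants
lexicographically ordered. A rough reduction of what the helper amounts to; the
real implementation lives in HoodieTestTable and may differ:

  // Hypothetical reduction of HoodieTestTable.makeNewCommitTime(int, String):
  // zero-padding keeps "000000010" < "000000020" in lexicographic order, which
  // is the ordering the timeline (and the new validation) compares.
  static String makeCommitTime(int sequence, String format) {
    return String.format(format, sequence); // e.g. ("%09d", 10) -> "000000010"
  }
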
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java
index f0cc4c3c789..bd97c22c3cf 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java
@@ -141,7 +141,7 @@ public class TestCleanerInsertAndCleanByVersions extends
SparkClientFunctionalTe
? wrapRecordsGenFunctionForPreppedCalls(basePath(), storageConf(),
context(), cfg, dataGen::generateUniqueUpdates)
: dataGen::generateUniqueUpdates;
- HoodieTableMetaClient metaClient =
getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
+ HoodieTableMetaClient metaClient =
getHoodieMetaClient(HoodieTableType.MERGE_ON_READ);
insertFirstBigBatchForClientCleanerTest(context(), metaClient, client,
recordInsertGenWrappedFunction, insertFn);
Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice =
new HashMap<>();
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index e78ed757e8f..99f83e2ad43 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -443,15 +443,12 @@ public class TestCopyOnWriteRollbackActionExecutor
extends HoodieClientRollbackT
SparkRDDWriteClient clusteringClient = getHoodieWriteClient(
ClusteringTestUtils.getClusteringConfig(basePath,
HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, properties));
- // Save an older instant for us to run clustering.
- String clusteringInstant1 = HoodieActiveTimeline.createNewInstantTime();
+ // Let's just trigger scheduling for a clustering instant.
+ String clusteringInstant1 =
ClusteringTestUtils.runClustering(clusteringClient, false, false);
- // Create completed clustering commit
+ // Create another clustering commit (and complete it)
ClusteringTestUtils.runClustering(clusteringClient, false, true);
- // Now execute clustering on the saved instant and do not allow it to
commit.
- ClusteringTestUtils.runClusteringOnInstant(clusteringClient, false, false,
clusteringInstant1);
-
HoodieTable table = this.getHoodieTable(metaClient,
getConfigBuilder().withEmbeddedTimelineServerEnabled(false).build());
HoodieInstant needRollBackInstant = new HoodieInstant(false,
HoodieTimeline.COMMIT_ACTION, secondCommit);
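
The rewritten setup relies on ClusteringTestUtils to produce a pending clustering
instant directly, instead of pre-allocating an older instant time and executing on
it later, which would now trip the ordering validation. Assuming the boolean flags
keep the meaning implied by the surrounding usage, the shape is:

  // Schedule clustering but do not complete it: leaves a pending instant on the timeline.
  String pendingInstant = ClusteringTestUtils.runClustering(clusteringClient, false, false);
  // Schedule and complete a second clustering commit.
  ClusteringTestUtils.runClustering(clusteringClient, false, true);
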
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/MockCompactionStrategy.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/MockCompactionStrategy.java
new file mode 100644
index 00000000000..8572f444f2e
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/MockCompactionStrategy.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.functional;
+
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.config.HoodieWriteConfig;
+import
org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
+
+import java.util.List;
+
+public class MockCompactionStrategy extends LogFileSizeBasedCompactionStrategy
{
+
+ @Override
+ public HoodieCompactionPlan generateCompactionPlan(HoodieWriteConfig
writeConfig,
+
List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan>
pendingCompactionPlans) {
+ HoodieCompactionPlan compactionPlan =
super.generateCompactionPlan(writeConfig, operations, pendingCompactionPlans);
+ try {
+ // deliberately stall plan generation so a concurrent scheduler can win the race
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return compactionPlan;
+ }
+}
\ No newline at end of file
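
The mock exists purely to widen the race window: by stalling plan generation for
ten seconds, it lets a second writer schedule a later compaction instant first.
Wiring it in mirrors the config builder added later in this commit (basePath
stands in for the test's table path):

  HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
      .withPath(basePath)
      .withCompactionConfig(HoodieCompactionConfig.newBuilder()
          .withCompactionStrategy(new MockCompactionStrategy()) // delays plan generation ~10s
          .withMaxNumDeltaCommitsBeforeCompaction(1)
          .build())
      .build();
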
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
index f9c2c82809e..96adcf69756 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
@@ -35,6 +35,7 @@ import
org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCleanConfig;
@@ -62,6 +63,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;
+import static
org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -139,7 +141,7 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
metaClient = HoodieTableMetaClient.reload(metaClient);
List<HoodieCleanStat> hoodieCleanStatsOne =
- runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 2,
true);
+ runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 2,
true, Option.empty());
assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any
partitions and clean any files");
assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
@@ -158,7 +160,7 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
metaClient = HoodieTableMetaClient.reload(metaClient);
List<HoodieCleanStat> hoodieCleanStatsTwo =
- runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 4,
true);
+ runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 4,
true, Option.empty());
assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any
partitions and clean any files");
assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
assertTrue(testTable.baseFileExists(p1, "00000000000003", file2P1C1));
@@ -176,7 +178,7 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
metaClient = HoodieTableMetaClient.reload(metaClient);
List<HoodieCleanStat> hoodieCleanStatsThree =
- runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 6,
true);
+ runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 6,
true, Option.empty());
assertEquals(0, hoodieCleanStatsThree.size(),
"Must not clean any file. We have to keep 1 version before the latest commit time");
assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
@@ -192,7 +194,7 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
metaClient = HoodieTableMetaClient.reload(metaClient);
List<HoodieCleanStat> hoodieCleanStatsFour =
- runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 8,
true);
+ runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 8,
true, Option.empty());
// enableBootstrapSourceClean would delete the bootstrap base file at
the same time
HoodieCleanStat partitionCleanStat = getCleanStat(hoodieCleanStatsFour,
p0);
@@ -222,7 +224,7 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
metaClient = HoodieTableMetaClient.reload(metaClient);
List<HoodieCleanStat> hoodieCleanStatsFive =
- runCleaner(config, simulateFailureRetry, simulateMetadataFailure,
10, true);
+ runCleaner(config, simulateFailureRetry, simulateMetadataFailure,
10, true, Option.empty());
assertEquals(0, hoodieCleanStatsFive.size(), "Must not clean any files
since at least 2 commits are needed from last clean operation before "
+ "clean can be scheduled again");
@@ -243,7 +245,7 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
new HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.COMMIT_ACTION, "00000000000011"),
Option.of(getUTF8Bytes(commitMetadata.toJsonString())));
List<HoodieCleanStat> hoodieCleanStatsFive2 =
- runCleaner(config, simulateFailureRetry, simulateMetadataFailure,
12, true);
+ runCleaner(config, simulateFailureRetry, simulateMetadataFailure,
12, true, Option.empty());
HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive2, p0);
assertNull(cleanStat, "Must not clean any files");
assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
@@ -455,28 +457,28 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
String p0 = "2020/01/01";
// Make 3 files, one base file and 2 log files associated with base file
- String file1P0 =
testTable.addDeltaCommit("000").getFileIdsWithBaseFilesInPartitions(p0).get(p0);
+ String file1P0 = testTable.addDeltaCommit(makeNewCommitTime(0,
"%09d")).getFileIdsWithBaseFilesInPartitions(p0).get(p0);
Map<String, List<String>> part1ToFileId =
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
{
put(p0, CollectionUtils.createImmutableList(file1P0));
}
});
- commitWithMdt("000", part1ToFileId, testTable, metadataWriter, true,
true);
+ commitWithMdt(makeNewCommitTime(0, "%09d"), part1ToFileId, testTable,
metadataWriter, true, true);
// Make 2 files, one base file and 1 log files associated with base file
- testTable.addDeltaCommit("001")
+ testTable.addDeltaCommit(makeNewCommitTime(1, "%09d"))
.withBaseFilesInPartition(p0, file1P0).getLeft()
.withLogFile(p0, file1P0, 3);
- commitWithMdt("001", part1ToFileId, testTable, metadataWriter, true,
true);
+ commitWithMdt(makeNewCommitTime(1, "%09d"), part1ToFileId, testTable,
metadataWriter, true, true);
- List<HoodieCleanStat> hoodieCleanStats = runCleaner(config);
+ List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, 3, false);
assertEquals(3,
getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles()
.size(), "Must clean three files, one base and 2 log files");
- assertFalse(testTable.baseFileExists(p0, "000", file1P0));
- assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2));
- assertTrue(testTable.baseFileExists(p0, "001", file1P0));
- assertTrue(testTable.logFileExists(p0, "001", file1P0, 3));
+ assertFalse(testTable.baseFileExists(p0, makeNewCommitTime(0, "%09d"),
file1P0));
+ assertFalse(testTable.logFilesExist(p0, makeNewCommitTime(0, "%09d"),
file1P0, 1, 2));
+ assertTrue(testTable.baseFileExists(p0, makeNewCommitTime(1, "%09d"),
file1P0));
+ assertTrue(testTable.logFileExists(p0, makeNewCommitTime(1, "%09d"),
file1P0, 3));
}
}
@@ -506,30 +508,30 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
put(p0, CollectionUtils.createImmutableList(file1P0));
}
});
- commitWithMdt("000", part1ToFileId, testTable, metadataWriter, true,
true);
+ commitWithMdt(makeNewCommitTime(0, "%09d"), part1ToFileId, testTable,
metadataWriter, true, true);
// Make 2 files, one base file and 1 log files associated with base file
- testTable.addDeltaCommit("001")
+ testTable.addDeltaCommit(makeNewCommitTime(1, "%09d"))
.withBaseFilesInPartition(p0, file1P0).getLeft()
.withLogFile(p0, file1P0, 3);
- commitWithMdt("001", part1ToFileId, testTable, metadataWriter, true,
true);
+ commitWithMdt(makeNewCommitTime(1, "%09d"), part1ToFileId, testTable,
metadataWriter, true, true);
// Make 2 files, one base file and 1 log files associated with base file
- testTable.addDeltaCommit("002")
+ testTable.addDeltaCommit(makeNewCommitTime(2, "%09d"))
.withBaseFilesInPartition(p0, file1P0).getLeft()
.withLogFile(p0, file1P0, 4);
- commitWithMdt("002", part1ToFileId, testTable, metadataWriter, true,
true);
+ commitWithMdt(makeNewCommitTime(2, "%09d"), part1ToFileId, testTable,
metadataWriter, true, true);
- List<HoodieCleanStat> hoodieCleanStats = runCleaner(config);
+ List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, false,
false, 3, false, Option.empty());
assertEquals(3,
getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles()
.size(), "Must clean three files, one base and 2 log files");
- assertFalse(testTable.baseFileExists(p0, "000", file1P0));
- assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2));
- assertTrue(testTable.baseFileExists(p0, "001", file1P0));
- assertTrue(testTable.logFileExists(p0, "001", file1P0, 3));
- assertTrue(testTable.baseFileExists(p0, "002", file1P0));
- assertTrue(testTable.logFileExists(p0, "002", file1P0, 4));
+ assertFalse(testTable.baseFileExists(p0, makeNewCommitTime(0, "%09d"),
file1P0));
+ assertFalse(testTable.logFilesExist(p0, makeNewCommitTime(0, "%09d"),
file1P0, 1, 2));
+ assertTrue(testTable.baseFileExists(p0, makeNewCommitTime(1, "%09d"),
file1P0));
+ assertTrue(testTable.logFileExists(p0, makeNewCommitTime(1, "%09d"),
file1P0, 3));
+ assertTrue(testTable.baseFileExists(p0, makeNewCommitTime(2, "%09d"),
file1P0));
+ assertTrue(testTable.logFileExists(p0, makeNewCommitTime(2, "%09d"),
file1P0, 4));
}
}
@@ -600,7 +602,7 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
testTable.addDeletePartitionCommit(deleteInstant1, p1,
Arrays.asList(file1P1, file2P1));
testTable.addDeletePartitionCommit(deleteInstant2, p2,
Arrays.asList(file1P2, file2P2));
- runCleaner(config);
+ runCleaner(config, Option.of((Functions.Function0<String>) () ->
HoodieActiveTimeline.createNewInstantTime()));
assertFalse(testTable.baseFileExists(p1, commitInstant, file1P1), "p1
cleaned");
assertFalse(testTable.baseFileExists(p1, commitInstant, file2P1), "p1
cleaned");
@@ -693,7 +695,8 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
commitWithMdt(thirdCommitTs, part3ToFileId, testTable, metadataWriter,
true, true);
metaClient = HoodieTableMetaClient.reload(metaClient);
- List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config,
simulateFailureRetry, simulateMetadataFailure);
+ List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config,
simulateFailureRetry, simulateMetadataFailure, 0,
+ false, Option.of((Functions.Function0) () ->
HoodieActiveTimeline.createNewInstantTime()));
metaClient = HoodieTableMetaClient.reload(metaClient);
assertEquals(2, hoodieCleanStatsThree.size(), "Should clean one file
each from both the partitions");
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
index 85a9c6759c2..d05e6b5548b 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
@@ -47,12 +47,15 @@ import org.apache.hudi.config.HoodieLayoutConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieWriteConflictException;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
import org.apache.hudi.table.action.rollback.RollbackUtils;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+import
org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
import org.apache.hudi.table.storage.HoodieStorageLayout;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
@@ -61,6 +64,7 @@ import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
@@ -72,6 +76,13 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -81,6 +92,7 @@ import static
org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
@Tag("functional")
public class TestHoodieSparkMergeOnReadTableCompaction extends
SparkClientFunctionalTestHarness {
@@ -163,6 +175,100 @@ public class TestHoodieSparkMergeOnReadTableCompaction
extends SparkClientFuncti
assertEquals(300, readTableTotalRecordsNum());
}
+ @Test
+ public void testOutOfOrderCompactionSchedules() throws IOException,
ExecutionException, InterruptedException {
+ HoodieWriteConfig config = getWriteConfigWithMockCompactionStrategy();
+ HoodieWriteConfig config2 = getWriteConfig();
+
+ metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, new
Properties());
+ client = getHoodieWriteClient(config);
+
+ // write data and commit
+ writeData(HoodieActiveTimeline.createNewInstantTime(), 100, true, false);
+
+ // 2nd batch
+ String commit2 = HoodieActiveTimeline.createNewInstantTime();
+ JavaRDD records = jsc().parallelize(dataGen.generateUniqueUpdates(commit2,
50), 2);
+ client.startCommitWithTime(commit2);
+ List<WriteStatus> writeStatuses = client.upsert(records,
commit2).collect();
+ List<HoodieWriteStat> writeStats =
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
+ client.commitStats(commit2, context().parallelize(writeStatuses, 1),
writeStats, Option.empty(), metaClient.getCommitActionType());
+
+ // schedule compaction
+ String compactionInstant1 = HoodieActiveTimeline.createNewInstantTime();
+ String compactionInstant2 = HoodieActiveTimeline.createNewInstantTime(10 *
1000);
+ String commit3 = HoodieActiveTimeline.createNewInstantTime(15 * 1000);
+
+ final AtomicBoolean writer1Completed = new AtomicBoolean(false);
+ final AtomicBoolean writer2Completed = new AtomicBoolean(false);
+ final ExecutorService executors = Executors.newFixedThreadPool(2);
+ try {
+ final SparkRDDWriteClient client2 = getHoodieWriteClient(config2);
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ Future future1 = executors.submit(() -> {
+ try {
+ if (countDownLatch.await(20, TimeUnit.SECONDS)) {
+ List<WriteStatus> writeStatuses1 = client.upsert(records,
commit3).collect();
+ List<HoodieWriteStat> writeStats1 =
writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList());
+ client.commitStats(commit3, context().parallelize(writeStatuses1,
1), writeStats1, Option.empty(), metaClient.getCommitActionType());
+
+ client.scheduleCompactionAtInstant(compactionInstant1,
Option.empty());
+ // since compactionInstant1 is earlier than compactionInstant2, and the
compaction strategy sleeps for 10s, this call is expected to throw.
+ fail("Should not have reached here");
+ } else {
+ fail("Should not have reached here");
+ }
+ } catch (Exception e) {
+ writer1Completed.set(true);
+ }
+ });
+
+ Future future2 = executors.submit(() -> {
+ try {
+ assertTrue(client2.scheduleCompactionAtInstant(compactionInstant2,
Option.empty()));
+ writer2Completed.set(true);
+ countDownLatch.countDown();
+ } catch (Exception e) {
+ throw new HoodieException("Should not have reached here");
+ }
+ });
+
+ future2.get();
+ future1.get();
+
+ // both writers should have completed: writer2 scheduled its compaction
successfully, and writer1 hit the expected ordering failure.
+ assertTrue(writer1Completed.get() && writer2Completed.get());
+ client.close();
+ client2.close();
+ } finally {
+ executors.shutdownNow();
+ }
+ }
+
+ private HoodieWriteConfig getWriteConfigWithMockCompactionStrategy() {
+ MockCompactionStrategy compactionStrategy = new MockCompactionStrategy();
+ return getWriteConfig(compactionStrategy);
+ }
+
+ private HoodieWriteConfig getWriteConfig() {
+ return getWriteConfig(new LogFileSizeBasedCompactionStrategy());
+ }
+
+ private HoodieWriteConfig getWriteConfig(CompactionStrategy
compactionStrategy) {
+ return HoodieWriteConfig.newBuilder()
+ .forTable("test-trip-table")
+ .withPath(basePath())
+ .withSchema(TRIP_EXAMPLE_SCHEMA)
+ .withAutoCommit(false)
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withCompactionStrategy(compactionStrategy)
+ .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+ .withEnableTimestampOrderingValidation(true)
+ .build();
+ }
+
@ParameterizedTest
@MethodSource("writeLogTest")
public void testWriteLogDuringCompaction(boolean enableMetadataTable,
boolean enableTimelineServer) throws IOException {
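
Stripped of Hudi specifics, testOutOfOrderCompactionSchedules is a
latch-coordinated ordering test: the writer holding the later instant schedules
first and releases the latch, then the writer holding the earlier instant
attempts to schedule and must be rejected. A distilled sketch of the handshake,
where scheduleAt is a hypothetical stand-in for scheduleCompactionAtInstant and
the snippet sits in a test method that declares throws Exception:

  CountDownLatch latch = new CountDownLatch(1);
  ExecutorService pool = Executors.newFixedThreadPool(2);
  // Writer 2: schedule the later compaction instant first, then release the latch.
  Future<?> writer2 = pool.submit(() -> {
    scheduleAt(laterInstant); // hypothetical stand-in
    latch.countDown();
  });
  // Writer 1: wait for writer 2, then try the earlier instant; it must be rejected.
  Future<?> writer1 = pool.submit(() -> {
    try {
      if (latch.await(20, TimeUnit.SECONDS)) {
        assertThrows(Exception.class, () -> scheduleAt(earlierInstant));
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
  });
  writer2.get();
  writer1.get();
  pool.shutdownNow();
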
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
index ceeae9d107f..9bb44985b81 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
@@ -71,30 +72,34 @@ public class HoodieCleanerTestBase extends
HoodieClientTestBase {
return metadata;
}
+ protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config) throws
IOException {
+ return runCleaner(config, Option.empty());
+ }
+
/**
* Helper to run cleaner and collect Clean Stats.
*
* @param config HoodieWriteConfig
*/
- protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config) throws
IOException {
- return runCleaner(config, false, false, 1, false);
+ protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config,
Option<Functions.Function0<String>> commitTimeGenerationFunc) throws
IOException {
+ return runCleaner(config, false, false, 1, false,
commitTimeGenerationFunc);
}
protected List<HoodieCleanStat>
runCleanerWithInstantFormat(HoodieWriteConfig config, boolean
needInstantInHudiFormat) throws IOException {
- return runCleaner(config, false, false, 1, needInstantInHudiFormat);
+ return runCleaner(config, false, false, 1, needInstantInHudiFormat,
Option.empty());
}
protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, int
firstCommitSequence, boolean needInstantInHudiFormat) throws IOException {
- return runCleaner(config, false, false, firstCommitSequence,
needInstantInHudiFormat);
+ return runCleaner(config, false, false, firstCommitSequence,
needInstantInHudiFormat, Option.empty());
}
protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean
simulateRetryFailure) throws IOException {
- return runCleaner(config, simulateRetryFailure, false, 1, false);
+ return runCleaner(config, simulateRetryFailure, false, 1, false,
Option.empty());
}
protected List<HoodieCleanStat> runCleaner(
HoodieWriteConfig config, boolean simulateRetryFailure, boolean
simulateMetadataFailure) throws IOException {
- return runCleaner(config, simulateRetryFailure, simulateMetadataFailure,
1, false);
+ return runCleaner(config, simulateRetryFailure, simulateMetadataFailure,
1, false, Option.empty());
}
/**
@@ -104,9 +109,10 @@ public class HoodieCleanerTestBase extends
HoodieClientTestBase {
*/
protected List<HoodieCleanStat> runCleaner(
HoodieWriteConfig config, boolean simulateRetryFailure, boolean
simulateMetadataFailure,
- Integer firstCommitSequence, boolean needInstantInHudiFormat) throws
IOException {
+ Integer firstCommitSequence, boolean needInstantInHudiFormat,
Option<Functions.Function0<String>> commitTimeGenerationFunc) throws
IOException {
SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(config);
- String cleanInstantTs = needInstantInHudiFormat ?
makeNewCommitTime(firstCommitSequence, "%014d") :
makeNewCommitTime(firstCommitSequence, "%09d");
+ String cleanInstantTs = commitTimeGenerationFunc.isPresent() ?
commitTimeGenerationFunc.get().apply() :
+ (needInstantInHudiFormat ? makeNewCommitTime(firstCommitSequence,
"%014d") : makeNewCommitTime(firstCommitSequence, "%09d"));
HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs);
if (null == cleanMetadata1) {
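
The new Option<Functions.Function0<String>> parameter lets individual tests opt
into real, timestamp-ordered clean instants instead of the fixed "%09d"/"%014d"
sequences; when the option is empty, behavior is unchanged. Callers that need
ordering-safe instants pass a generator, as TestCleanPlanExecutor does above:

  // Supply a real instant-time generator so the clean instant sorts after prior commits.
  runCleaner(config, Option.of((Functions.Function0<String>) () ->
      HoodieActiveTimeline.createNewInstantTime()));
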
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index 96853950d50..00fcbfbe044 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -28,11 +28,11 @@ import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieException
import org.apache.hudi.metadata.{HoodieBackedTableMetadata,
HoodieTableMetadataUtil, MetadataPartitionType}
import org.apache.hudi.storage.StoragePath
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.util.JavaConversions
-
import org.apache.spark.sql._
import org.apache.spark.sql.{DataFrame, _}
import org.apache.spark.sql.functions.{col, not}
@@ -41,7 +41,6 @@ import org.junit.jupiter.api._
import java.util.concurrent.atomic.AtomicInteger
import java.util.stream.Collectors
-
import scala.collection.JavaConverters._
import scala.collection.{JavaConverters, mutable}
@@ -178,11 +177,26 @@ class RecordLevelIndexTestBase extends
HoodieSparkClientTestBase {
}
val latestBatchDf =
spark.read.json(spark.sparkContext.parallelize(latestBatch.toSeq, 2))
latestBatchDf.cache()
- latestBatchDf.write.format("org.apache.hudi")
- .options(hudiOpts)
- .option(DataSourceWriteOptions.OPERATION.key, operation)
- .mode(saveMode)
- .save(basePath)
+ var counter = 0
+ var succeeded = false
+ while (!succeeded && counter < 5) {
+ try {
+ latestBatchDf.write.format("org.apache.hudi")
+ .options(hudiOpts)
+ .option(DataSourceWriteOptions.OPERATION.key, operation)
+ .mode(saveMode)
+ .save(basePath)
+ succeeded = true
+ } catch {
+ case e: IllegalArgumentException => { // with HUDI-7507, multi-writer runs
+ // can hit an IllegalArgumentException on out-of-order instants, hence the retries.
+ if (!e.getMessage.contains("Found later commit time")) {
+ throw new HoodieException("Unexpected exception thrown", e)
+ }
+ counter += 1
+ }
+ }
+ }
val deletedDf = calculateMergedDf(latestBatchDf, operation)
deletedDf.cache()
if (validate) {
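
The same bounded retry can be expressed in Java for the client-side tests; the
matched message ("Found later commit time") is taken from the Scala change above
and is assumed to be the text the new validation raises:

  // Retry a write that may fail the timestamp ordering validation under multi-writer
  // races; only the known validation message is retried, anything else is rethrown.
  int attempts = 0;
  boolean succeeded = false;
  while (!succeeded && attempts < 5) {
    try {
      doWrite(); // hypothetical stand-in for the datasource write
      succeeded = true;
    } catch (IllegalArgumentException e) {
      if (!e.getMessage().contains("Found later commit time")) {
        throw e;
      }
      attempts++;
    }
  }
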
diff --git
a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
index 8e87755c294..171dd8cb6bd 100644
---
a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
+++
b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
@@ -18,6 +18,7 @@
package org.apache.hudi.internal;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -74,7 +75,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
// execute N rounds
for (int i = 0; i < 2; i++) {
- String instantTime = "00" + i;
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
// init writer
HoodieBulkInsertDataInternalWriter writer = new
HoodieBulkInsertDataInternalWriter(table, cfg, instantTime,
RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(),
STRUCT_TYPE, populateMetaFields, sorted);
@@ -116,7 +117,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends
HoodieWriteConfig cfg = getWriteConfig(populateMetaFields, "true");
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
for (int i = 0; i < 1; i++) {
- String instantTime = "00" + i;
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
// init writer
HoodieBulkInsertDataInternalWriter writer = new
HoodieBulkInsertDataInternalWriter(table, cfg, instantTime,
RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(),
STRUCT_TYPE, populateMetaFields, sorted);
@@ -161,7 +162,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
- String instantTime = "001";
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
HoodieBulkInsertDataInternalWriter writer = new
HoodieBulkInsertDataInternalWriter(table, cfg, instantTime,
RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(),
STRUCT_TYPE, true, false);
diff --git
a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
index 61ceaebaee6..8c786068943 100644
---
a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
+++
b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
@@ -20,6 +20,7 @@ package org.apache.hudi.internal;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -75,7 +76,7 @@ public class TestHoodieDataSourceInternalWriter extends
throws Exception {
// init config and table
HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
- String instantTime = "001";
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
// init writer
HoodieDataSourceInternalWriter dataSourceInternalWriter =
new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE,
sqlContext.sparkSession(), storageConf, new DataSourceOptions(extraMetadata),
populateMetaFields, false);
@@ -163,7 +164,7 @@ public class TestHoodieDataSourceInternalWriter extends
// execute N rounds
for (int i = 0; i < 2; i++) {
- String instantTime = "00" + i;
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
// init writer
HoodieDataSourceInternalWriter dataSourceInternalWriter =
new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE,
sqlContext.sparkSession(), storageConf, new
DataSourceOptions(Collections.EMPTY_MAP), populateMetaFields, false);
@@ -210,7 +211,7 @@ public class TestHoodieDataSourceInternalWriter extends
// execute N rounds
for (int i = 0; i < 3; i++) {
- String instantTime = "00" + i;
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
// init writer
HoodieDataSourceInternalWriter dataSourceInternalWriter =
new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE,
sqlContext.sparkSession(), storageConf, new
DataSourceOptions(Collections.EMPTY_MAP), populateMetaFields, false);
@@ -258,7 +259,7 @@ public class TestHoodieDataSourceInternalWriter extends
// init config and table
HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
- String instantTime0 = "00" + 0;
+ String instantTime0 = HoodieActiveTimeline.createNewInstantTime();
// init writer
HoodieDataSourceInternalWriter dataSourceInternalWriter =
new HoodieDataSourceInternalWriter(instantTime0, cfg, STRUCT_TYPE,
sqlContext.sparkSession(), storageConf, new
DataSourceOptions(Collections.EMPTY_MAP), populateMetaFields, false);
@@ -298,7 +299,7 @@ public class TestHoodieDataSourceInternalWriter extends
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches,
size, Option.empty(), Option.empty());
// 2nd batch. abort in the end
- String instantTime1 = "00" + 1;
+ String instantTime1 = HoodieActiveTimeline.createNewInstantTime();
dataSourceInternalWriter =
new HoodieDataSourceInternalWriter(instantTime1, cfg, STRUCT_TYPE,
sqlContext.sparkSession(), storageConf,
new DataSourceOptions(Collections.EMPTY_MAP), populateMetaFields,
false);
diff --git
a/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java
b/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java
index 206d4931b15..1050173ff11 100644
---
a/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java
+++
b/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java
@@ -19,6 +19,7 @@
package org.apache.hudi.spark3.internal;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -77,7 +78,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
// execute N rounds
for (int i = 0; i < 2; i++) {
- String instantTime = "00" + i;
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
// init writer
HoodieBulkInsertDataInternalWriter writer = new
HoodieBulkInsertDataInternalWriter(table, cfg, instantTime,
RANDOM.nextInt(100000),
RANDOM.nextLong(), STRUCT_TYPE, populateMetaFields, sorted);
@@ -123,7 +124,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
- String instantTime = "001";
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
HoodieBulkInsertDataInternalWriter writer = new
HoodieBulkInsertDataInternalWriter(table, cfg, instantTime,
RANDOM.nextInt(100000),
RANDOM.nextLong(), STRUCT_TYPE, true, false);
diff --git
a/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java
b/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java
index 64042f2ebbb..0db60439075 100644
---
a/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java
+++
b/hudi-spark-datasource/hudi-spark3.2.x/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java
@@ -21,6 +21,7 @@ package org.apache.hudi.spark3.internal;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -77,8 +78,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends
private void testDataSourceWriterInternal(Map<String, String> extraMetadata,
Map<String, String> expectedExtraMetadata, boolean populateMetaFields) throws
Exception {
// init config and table
HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
- HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
- String instantTime = "001";
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
// init writer
HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE,
sqlContext.sparkSession(), storageConf, extraMetadata, populateMetaFields,
false);
@@ -167,7 +167,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends
// execute N rounds
for (int i = 0; i < 2; i++) {
- String instantTime = "00" + i;
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
// init writer
HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
new HoodieDataSourceInternalBatchWrite(instantTime, cfg,
STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.emptyMap(),
populateMetaFields, false);
@@ -214,7 +214,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends
// execute N rounds
for (int i = 0; i < 3; i++) {
- String instantTime = "00" + i;
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
// init writer
HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
new HoodieDataSourceInternalBatchWrite(instantTime, cfg,
STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.emptyMap(),
populateMetaFields, false);
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index 41ab16d7bfd..881b6599f77 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -403,7 +403,7 @@ public class TestGcsEventsHoodieIncrSource extends
SparkClientFunctionalTestHarn
private HoodieWriteConfig getWriteConfig() {
return getConfigBuilder(basePath(), metaClient)
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2,
3).build())
-
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(5).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(1).build())
.build();
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index 319aa8540a4..696bea22334 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -105,22 +105,28 @@ public class TestHoodieIncrSource extends
SparkClientFunctionalTestHarness {
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(false).build())
.build();
+ String commit1 = HoodieActiveTimeline.createNewInstantTime();
+ String commit2 = HoodieActiveTimeline.createNewInstantTime(10);
+ String commit3 = HoodieActiveTimeline.createNewInstantTime(100);
+ String commit4 = HoodieActiveTimeline.createNewInstantTime(200);
+ String commit5 = HoodieActiveTimeline.createNewInstantTime(300);
+ String commit6 = HoodieActiveTimeline.createNewInstantTime(400);
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
- Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient,
INSERT, null, "100");
- Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient,
INSERT, null, "200");
- Pair<String, List<HoodieRecord>> inserts3 = writeRecords(writeClient,
INSERT, null, "300");
- Pair<String, List<HoodieRecord>> inserts4 = writeRecords(writeClient,
INSERT, null, "400");
- Pair<String, List<HoodieRecord>> inserts5 = writeRecords(writeClient,
INSERT, null, "500");
+ Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient,
INSERT, null, commit1);
+ Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient,
INSERT, null, commit2);
+ Pair<String, List<HoodieRecord>> inserts3 = writeRecords(writeClient,
INSERT, null, commit3);
+ Pair<String, List<HoodieRecord>> inserts4 = writeRecords(writeClient,
INSERT, null, commit4);
+ Pair<String, List<HoodieRecord>> inserts5 = writeRecords(writeClient,
INSERT, null, commit5);
// read everything up to latest
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
Option.empty(), 500, inserts5.getKey());
- // even if the begin timestamp is archived (100), full table scan should
kick in, but should filter for records having commit time > 100
-
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
Option.of("100"), 400, inserts5.getKey());
+ // even if the begin timestamp is archived (commit1), full table scan
should kick in, but should filter for records having commit time > commit1
+
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
Option.of(commit1), 400, inserts5.getKey());
// even if read up to latest is set, if the begin timestamp is in the active
timeline, only incremental read should kick in.
-
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
Option.of("400"), 100, inserts5.getKey());
+
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
Option.of(commit4), 100, inserts5.getKey());
// read just the latest
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
Option.empty(), 100, inserts5.getKey());
@@ -128,7 +134,7 @@ public class TestHoodieIncrSource extends
SparkClientFunctionalTestHarness {
// ensure checkpoint does not move
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
Option.of(inserts5.getKey()), 0, inserts5.getKey());
- Pair<String, List<HoodieRecord>> inserts6 = writeRecords(writeClient,
INSERT, null, "600");
+ Pair<String, List<HoodieRecord>> inserts6 = writeRecords(writeClient,
INSERT, null, commit6);
// insert new batch and ensure the checkpoint moves
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
Option.of(inserts5.getKey()), 100, inserts6.getKey());
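
The offsets passed to createNewInstantTime (10, 100, 200, ...) space the
generated instants apart; assuming the long argument is a millisecond offset
from now, as the progression suggests, each commit is distinct and strictly
increasing:

  // Assumed semantics: createNewInstantTime(offsetMs) formats (now + offsetMs) as an instant.
  String first = HoodieActiveTimeline.createNewInstantTime();    // now
  String second = HoodieActiveTimeline.createNewInstantTime(10); // now + 10 ms, sorts after first
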
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index 2a011cd9812..dd1d15940da 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -220,7 +220,7 @@ public class TestS3EventsHoodieIncrSource extends
SparkClientFunctionalTestHarne
private HoodieWriteConfig getWriteConfig() {
return getConfigBuilder(basePath(), metaClient)
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2,
3).build())
-
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(5).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(1).build())
.build();