This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e4d8f971d77 [HUDI-6736] Fixing rollback completion and commit timeline
files removal (#9521)
e4d8f971d77 is described below
commit e4d8f971d77885bea935fb685cc161186911677d
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Sep 7 15:09:54 2023 -0400
[HUDI-6736] Fixing rollback completion and commit timeline files removal
(#9521)
The purpose of the 8849 change was to fix the ordering of rollbacks such that
the completion of the rollback instant happens first, followed by removal of
the commit files from the timeline.
For example,
if t5.c.inflight has partially failed and t6.rb.requested is triggered to
roll it back,
then toward completion, t6.rb is moved to the completed state, and later all t5
commit files are removed from the timeline.
This could lead to dangling commit files (t5) if the process crashes just
after moving the t6 rollback to completion. So, 8849 also introduced polling of
completed rollbacks to ensure we don't trigger another rollback for t5.
But we missed that we had already landed 5148, which addressed a similar
issue.
As per 5148, we first need to delete the commit files from the timeline (t5)
and then transition the rollback to completion (t6.rb). So, even if there is a
crash, re-attempting t6.rb.requested will reach completion without any
issues (even if t5 is not in the timeline at all).
Hence, this commit reverts some of the core changes added as part of 8849.
However, 8849 also added some tests, so the entire patch is not reverted.
---------
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: sivabalan <[email protected]>
---
.../hudi/client/BaseHoodieTableServiceClient.java | 57 ----------------------
.../rollback/BaseRollbackActionExecutor.java | 25 +++++-----
.../java/org/apache/hudi/table/TestCleaner.java | 38 +++++++++++++++
.../TestCopyOnWriteRollbackActionExecutor.java | 47 ------------------
.../hudi/testutils/HoodieClientTestBase.java | 44 -----------------
.../hudi/common/testutils/HoodieTestTable.java | 8 ---
.../deltastreamer/TestHoodieDeltaStreamer.java | 14 ++++--
7 files changed, 62 insertions(+), 171 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index ead1571ba26..090fa069c3d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -43,7 +43,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
@@ -62,7 +61,6 @@ import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.CompactHelpers;
-import org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor;
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.hudi.table.marker.WriteMarkersFactory;
@@ -914,7 +912,6 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
protected Boolean rollbackFailedWrites() {
HoodieTable table = createTable(config, hadoopConf);
List<String> instantsToRollback =
getInstantsToRollback(table.getMetaClient(),
config.getFailedWritesCleanPolicy(), Option.empty());
- removeInflightFilesAlreadyRolledBack(instantsToRollback,
table.getMetaClient());
Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks =
getPendingRollbackInfos(table.getMetaClient());
instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry,
Option.empty()));
rollbackFailedWrites(pendingRollbacks);
@@ -979,60 +976,6 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
}
}
- /**
- * This method filters out the instants that are already rolled back, but
their pending commit files are left
- * because of job failures. In addition to filtering out these instants, it
will also cleanup the inflight instants
- * from the timeline.
- */
- protected void removeInflightFilesAlreadyRolledBack(List<String>
instantsToRollback, HoodieTableMetaClient metaClient) {
- if (instantsToRollback.isEmpty()) {
- return;
- }
- // Find the oldest inflight timestamp.
- String lowestInflightCommitTime = Collections.min(instantsToRollback);
- HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-
- // RollbackInstantMap should only be created for instants that are >
oldest inflight file to be removed.
- Map<String, String> failedInstantToRollbackCommitMap =
activeTimeline.getRollbackTimeline().filterCompletedInstants()
- .findInstantsAfter(lowestInflightCommitTime)
- .getInstantsAsStream()
- .map(rollbackInstant -> {
- try {
- return
Pair.of(TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
-
activeTimeline.getInstantDetails(rollbackInstant).get()).getInstantsRollback().get(0).getCommitTime(),
- rollbackInstant.getTimestamp());
- } catch (IOException e) {
- LOG.error("Error reading rollback metadata for instant {}",
rollbackInstant, e);
- return Pair.of("", rollbackInstant.getTimestamp());
- }
- }).collect(Collectors.toMap(Pair::getLeft, Pair::getRight, (v1, v2) ->
v1));
- // List of inflight instants that are already completed.
- List<String> rollbackCompletedInstants =
- instantsToRollback.stream()
- .filter(failedInstantToRollbackCommitMap::containsKey)
- .collect(Collectors.toList());
- LOG.info("Rollback completed instants {}", rollbackCompletedInstants);
- try {
- this.txnManager.beginTransaction(Option.empty(), Option.empty());
- rollbackCompletedInstants.forEach(instant -> {
- // remove pending commit files.
- HoodieInstant hoodieInstant = activeTimeline
- .filter(instantTime ->
- HoodieTimeline.compareTimestamps(instantTime.getTimestamp(),
HoodieTimeline.EQUALS, instant))
- .firstInstant().get();
- BaseRollbackActionExecutor.deleteInflightAndRequestedInstant(
- true, activeTimeline, metaClient, hoodieInstant);
- });
- instantsToRollback.removeAll(rollbackCompletedInstants);
- } catch (Exception e) {
- LOG.error("Error in deleting the inflight instants that are already
rolled back {}",
- rollbackCompletedInstants, e);
- throw new HoodieRollbackException("Error in deleting the inflight
instants that are already rolled back");
- } finally {
- this.txnManager.endTransaction(Option.empty());
- }
- }
-
private List<String>
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClient metaClient,
Stream<HoodieInstant> inflightInstantsStream) {
// Get expired instants, must store them into list before double-checking
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index 43e3e814bda..662bfe36299 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -18,7 +18,6 @@
package org.apache.hudi.table.action.rollback;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
@@ -26,7 +25,6 @@ import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -43,6 +41,7 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.hudi.table.marker.WriteMarkersFactory;
+import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -254,17 +253,18 @@ public abstract class BaseRollbackActionExecutor<T, I, K,
O> extends BaseActionE
// Then transition the inflight rollback to completed state.
if (!skipTimelinePublish) {
writeTableMetadata(rollbackMetadata);
+ }
+
+ // Then we delete the inflight instant in the data table timeline if
enabled
+ deleteInflightAndRequestedInstant(deleteInstants,
table.getActiveTimeline(), resolvedInstant);
+
+ // If publish the rollback to the timeline, we finally transition the
inflight rollback
+ // to complete in the data table timeline
+ if (!skipTimelinePublish) {
table.getActiveTimeline().transitionRollbackInflightToComplete(inflightInstant,
TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata));
LOG.info("Rollback of Commits " +
rollbackMetadata.getCommitsRollback() + " is complete");
}
-
- // Commit to rollback instant files are deleted after the rollback
commit is transitioned from inflight to completed
- // If job were to fail after transitioning rollback from inflight to
complete and before delete the instant files,
- // then subsequent retries of the rollback for this instant will see if
there is a completed rollback present for this instant
- // and then directly delete the files and abort.
- deleteInflightAndRequestedInstant(deleteInstants,
table.getActiveTimeline(), table.getMetaClient(), resolvedInstant);
-
} catch (IOException e) {
throw new HoodieIOException("Error executing rollback at instant " +
instantTime, e);
} finally {
@@ -280,13 +280,14 @@ public abstract class BaseRollbackActionExecutor<T, I, K,
O> extends BaseActionE
* @param activeTimeline Hoodie active timeline
* @param instantToBeDeleted Instant to be deleted
*/
- public static void deleteInflightAndRequestedInstant(boolean deleteInstant,
HoodieActiveTimeline activeTimeline,
- HoodieTableMetaClient
metaClient, HoodieInstant instantToBeDeleted) {
+ protected void deleteInflightAndRequestedInstant(boolean deleteInstant,
+ HoodieActiveTimeline
activeTimeline,
+ HoodieInstant
instantToBeDeleted) {
// Remove the rolled back inflight commits
if (deleteInstant) {
LOG.info("Deleting instant=" + instantToBeDeleted);
activeTimeline.deletePending(instantToBeDeleted);
- if (instantToBeDeleted.isInflight() &&
!metaClient.getTimelineLayoutVersion().isNullVersion()) {
+ if (instantToBeDeleted.isInflight() &&
!table.getMetaClient().getTimelineLayoutVersion().isNullVersion()) {
// Delete corresponding requested instant
instantToBeDeleted = new HoodieInstant(HoodieInstant.State.REQUESTED,
instantToBeDeleted.getAction(),
instantToBeDeleted.getTimestamp());
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 17d8adbe25c..24ced3456f6 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -22,8 +22,12 @@ import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieClusteringStrategy;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSliceInfo;
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -40,6 +44,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -60,6 +65,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
@@ -95,6 +101,7 @@ import java.util.stream.Stream;
import scala.Tuple3;
+import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.NO_PARTITION_PATH;
import static
org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
@@ -690,6 +697,37 @@ public class TestCleaner extends HoodieCleanerTestBase {
assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
}
+ private Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata>
generateReplaceCommitMetadata(
+ String instantTime, String partition, String replacedFileId, String
newFileId) {
+ HoodieRequestedReplaceMetadata requestedReplaceMetadata = new
HoodieRequestedReplaceMetadata();
+
requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.toString());
+ requestedReplaceMetadata.setVersion(1);
+ HoodieSliceInfo sliceInfo =
HoodieSliceInfo.newBuilder().setFileId(replacedFileId).build();
+ List<HoodieClusteringGroup> clusteringGroups = new ArrayList<>();
+ clusteringGroups.add(HoodieClusteringGroup.newBuilder()
+
.setVersion(1).setNumOutputFileGroups(1).setMetrics(Collections.emptyMap())
+ .setSlices(Collections.singletonList(sliceInfo)).build());
+ requestedReplaceMetadata.setExtraMetadata(Collections.emptyMap());
+
requestedReplaceMetadata.setClusteringPlan(HoodieClusteringPlan.newBuilder()
+ .setVersion(1).setExtraMetadata(Collections.emptyMap())
+
.setStrategy(HoodieClusteringStrategy.newBuilder().setStrategyClassName("").setVersion(1).build())
+ .setInputGroups(clusteringGroups).build());
+
+ HoodieReplaceCommitMetadata replaceMetadata = new
HoodieReplaceCommitMetadata();
+ replaceMetadata.addReplaceFileId(partition, replacedFileId);
+ replaceMetadata.setOperationType(WriteOperationType.CLUSTER);
+ if (!StringUtils.isNullOrEmpty(newFileId)) {
+ HoodieWriteStat writeStat = new HoodieWriteStat();
+ writeStat.setPartitionPath(partition);
+ writeStat.setPath(partition + "/" + getBaseFilename(instantTime,
newFileId));
+ writeStat.setFileId(newFileId);
+ writeStat.setTotalWriteBytes(1);
+ writeStat.setFileSizeInBytes(1);
+ replaceMetadata.addWriteStat(partition, writeStat);
+ }
+ return Pair.of(requestedReplaceMetadata, replaceMetadata);
+ }
+
@Test
public void testCleanMetadataUpgradeDowngrade() {
String instantTime = "000";
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index 37266950c04..07dc831578c 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -468,51 +468,4 @@ public class TestCopyOnWriteRollbackActionExecutor extends
HoodieClientRollbackT
context, table.getConfig(), table, rollbackInstant,
needRollBackInstant, true, false, true);
copyOnWriteRollbackActionExecutorForClustering.execute();
}
-
- /**
- * This method tests rollback of completed ingestion commits and
replacecommit inflight files
- * when there is another replacecommit with greater timestamp already
present in the timeline.
- */
- @Test
- public void testDeletingInflightsWhichAreAlreadyRolledBack() throws
Exception {
-
- // insert data
- HoodieWriteConfig writeConfig =
getConfigBuilder().withAutoCommit(false).build();
- SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
-
- // Create a base commit.
- int numRecords = 200;
- String firstCommit = HoodieActiveTimeline.createNewInstantTime();
- String partitionStr = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
- dataGen = new HoodieTestDataGenerator(new String[]{partitionStr});
- writeBatch(writeClient, firstCommit, "000",
Option.of(Arrays.asList("000")), "000",
- numRecords, dataGen::generateInserts, SparkRDDWriteClient::insert,
true, numRecords, numRecords,
- 1, true);
- // Create inflight commit.
- String secondCommit = writeClient.startCommit();
- // Insert completed commit
- String thirdCommit = HoodieActiveTimeline.createNewInstantTime();
- writeBatch(writeClient, thirdCommit, firstCommit,
Option.of(Arrays.asList("000")), "000",
- numRecords, dataGen::generateInserts, SparkRDDWriteClient::insert,
false, numRecords, numRecords,
- 1, true);
- // Rollback secondCommit which is an inflight.
- writeClient.rollback(secondCommit);
- assertEquals(1, metaClient.reloadActiveTimeline()
- .getRollbackTimeline().filterCompletedInstants().getInstants().size());
-
assertFalse(metaClient.getActiveTimeline().filterInflightsAndRequested().firstInstant().isPresent());
-
- // Create inflight commit back into timeline for testing purposes.
- writeClient.startCommitWithTime(secondCommit);
-
assertTrue(metaClient.reloadActiveTimeline().filterInflightsAndRequested().firstInstant().isPresent());
-
- // Insert completed commit
- String fourthCommit = HoodieActiveTimeline.createNewInstantTime();
- writeBatch(writeClient, fourthCommit, thirdCommit,
Option.of(Arrays.asList("000")), "000",
- numRecords, dataGen::generateInserts, SparkRDDWriteClient::insert,
false, numRecords, numRecords,
- 1, true);
- assertEquals(1, metaClient.reloadActiveTimeline()
- .getRollbackTimeline().filterCompletedInstants().getInstants().size());
-
assertFalse(metaClient.getActiveTimeline().filterInflightsAndRequested().firstInstant().isPresent());
- assertEquals(3,
metaClient.getActiveTimeline().getCommitsTimeline().countInstants());
- }
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 6c68a4ad403..c4a150e7f8f 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -18,11 +18,6 @@
package org.apache.hudi.testutils;
-import org.apache.hudi.avro.model.HoodieClusteringGroup;
-import org.apache.hudi.avro.model.HoodieClusteringPlan;
-import org.apache.hudi.avro.model.HoodieClusteringStrategy;
-import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
-import org.apache.hudi.avro.model.HoodieSliceInfo;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -32,16 +27,11 @@ import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
-import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndexFactory;
@@ -55,12 +45,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.function.Function;
-import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -564,37 +551,6 @@ public class HoodieClientTestBase extends
HoodieSparkClientTestHarness {
return result;
}
- public static Pair<HoodieRequestedReplaceMetadata,
HoodieReplaceCommitMetadata> generateReplaceCommitMetadata(
- String instantTime, String partition, String replacedFileId, String
newFileId) {
- HoodieRequestedReplaceMetadata requestedReplaceMetadata = new
HoodieRequestedReplaceMetadata();
-
requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.toString());
- requestedReplaceMetadata.setVersion(1);
- HoodieSliceInfo sliceInfo =
HoodieSliceInfo.newBuilder().setFileId(replacedFileId).build();
- List<HoodieClusteringGroup> clusteringGroups = new ArrayList<>();
- clusteringGroups.add(HoodieClusteringGroup.newBuilder()
-
.setVersion(1).setNumOutputFileGroups(1).setMetrics(Collections.emptyMap())
- .setSlices(Collections.singletonList(sliceInfo)).build());
- requestedReplaceMetadata.setExtraMetadata(Collections.emptyMap());
-
requestedReplaceMetadata.setClusteringPlan(HoodieClusteringPlan.newBuilder()
- .setVersion(1).setExtraMetadata(Collections.emptyMap())
-
.setStrategy(HoodieClusteringStrategy.newBuilder().setStrategyClassName("").setVersion(1).build())
- .setInputGroups(clusteringGroups).build());
-
- HoodieReplaceCommitMetadata replaceMetadata = new
HoodieReplaceCommitMetadata();
- replaceMetadata.addReplaceFileId(partition, replacedFileId);
- replaceMetadata.setOperationType(WriteOperationType.CLUSTER);
- if (!StringUtils.isNullOrEmpty(newFileId)) {
- HoodieWriteStat writeStat = new HoodieWriteStat();
- writeStat.setPartitionPath(partition);
- writeStat.setPath(partition + "/" + getBaseFilename(instantTime,
newFileId));
- writeStat.setFileId(newFileId);
- writeStat.setTotalWriteBytes(1);
- writeStat.setFileSizeInBytes(1);
- replaceMetadata.addWriteStat(partition, writeStat);
- }
- return Pair.of(requestedReplaceMetadata, replaceMetadata);
- }
-
/**
* Insert a batch of records without commit(so that the instant is
in-flight).
*
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 8e59981f09b..4773db0f7ef 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -1237,14 +1237,6 @@ public class HoodieTestTable {
return writeStats;
}
- public HoodieTestTable addRequestedAndInflightReplaceCommit(String
instantTime, HoodieRequestedReplaceMetadata requestedReplaceMetadata,
HoodieReplaceCommitMetadata metadata) throws Exception {
- createRequestedReplaceCommit(basePath, instantTime,
Option.of(requestedReplaceMetadata));
- createInflightReplaceCommit(basePath, instantTime);
- currentInstantTime = instantTime;
- metaClient = HoodieTableMetaClient.reload(metaClient);
- return this;
- }
-
/**
* Exception for {@link HoodieTestTable}.
*/
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 5a79295c331..6324fb83fc9 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1340,9 +1340,17 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
}
}
- @ParameterizedTest
- @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
- public void testAsyncClusteringService(HoodieRecordType recordType) throws
Exception {
+ @Disabled("HUDI-6753")
+ public void testAsyncClusteringServiceSparkRecordType() throws Exception {
+ testAsyncClusteringService(HoodieRecordType.SPARK);
+ }
+
+ @Test
+ public void testAsyncClusteringServiceAvroRecordType() throws Exception {
+ testAsyncClusteringService(HoodieRecordType.AVRO);
+ }
+
+ private void testAsyncClusteringService(HoodieRecordType recordType) throws
Exception {
String tableBasePath = basePath + "/asyncClustering";
// Keep it higher than batch-size to test continuous mode
int totalRecords = 2000;