This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ae3d886e991458fb145132357f0c0c490982491c
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Sep 7 15:09:54 2023 -0400

    [HUDI-6736] Fixing rollback completion and commit timeline files removal 
(#9521)
    
    The purpose of the 8849 change was to fix the ordering of rollbacks such 
that the completion of the rollback instant happens first, followed by removal 
of the commit files from the timeline.
    For example,
    if t5.c.inflight has partially failed and t6.rb.requested is triggered to 
roll it back.
    Towards completion, t6.rb is moved to the completed state, and later all t5 
commit files are removed from the timeline.
    This could leave dangling commit files (t5) if the process crashes just 
after moving the t6 rollback to completion. So, 8849 also introduced polling of 
completed rollbacks to ensure we don't trigger another rollback for t5.
    
    But we missed that we had already landed 5148, which addressed a similar 
issue.
    As per 5148, we first need to delete the commit files from the timeline (t5) 
and then transition the rollback to completion (t6.rb). So, even if there is a 
crash, re-attempting t6.rb.requested will reach completion without any 
issues (even if t5 is not in the timeline at all).
    Hence we are reverting some of the core changes added as part of 8849. 
However, 8849 also added some tests, so we are not reverting the entire patch.
    
    ---------
    
    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: sivabalan <[email protected]>
---
 .../hudi/client/BaseHoodieTableServiceClient.java  | 57 ----------------------
 .../rollback/BaseRollbackActionExecutor.java       | 25 +++++-----
 .../java/org/apache/hudi/table/TestCleaner.java    | 38 +++++++++++++++
 .../TestCopyOnWriteRollbackActionExecutor.java     | 47 ------------------
 .../hudi/testutils/HoodieClientTestBase.java       | 44 -----------------
 .../hudi/common/testutils/HoodieTestTable.java     |  8 ---
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 14 ++++--
 7 files changed, 62 insertions(+), 171 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 0af2ace25f0..5af681d9a8a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -42,7 +42,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CollectionUtils;
@@ -61,7 +60,6 @@ import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.compact.CompactHelpers;
-import org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor;
 import org.apache.hudi.table.action.rollback.RollbackUtils;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 
@@ -913,7 +911,6 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
   protected Boolean rollbackFailedWrites() {
     HoodieTable table = createTable(config, hadoopConf);
     List<String> instantsToRollback = 
getInstantsToRollback(table.getMetaClient(), 
config.getFailedWritesCleanPolicy(), Option.empty());
-    removeInflightFilesAlreadyRolledBack(instantsToRollback, 
table.getMetaClient());
     Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = 
getPendingRollbackInfos(table.getMetaClient());
     instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, 
Option.empty()));
     rollbackFailedWrites(pendingRollbacks);
@@ -978,60 +975,6 @@ public abstract class BaseHoodieTableServiceClient<I, T, 
O> extends BaseHoodieCl
     }
   }
 
-  /**
-   * This method filters out the instants that are already rolled back, but 
their pending commit files are left
-   * because of job failures. In addition to filtering out these instants, it 
will also cleanup the inflight instants
-   * from the timeline.
-   */
-  protected void removeInflightFilesAlreadyRolledBack(List<String> 
instantsToRollback, HoodieTableMetaClient metaClient) {
-    if (instantsToRollback.isEmpty()) {
-      return;
-    }
-    // Find the oldest inflight timestamp.
-    String lowestInflightCommitTime = Collections.min(instantsToRollback);
-    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-
-    // RollbackInstantMap should only be created for instants that are > 
oldest inflight file to be removed.
-    Map<String, String> failedInstantToRollbackCommitMap = 
activeTimeline.getRollbackTimeline().filterCompletedInstants()
-        .findInstantsAfter(lowestInflightCommitTime)
-        .getInstantsAsStream()
-        .map(rollbackInstant -> {
-          try {
-            return 
Pair.of(TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
-                    
activeTimeline.getInstantDetails(rollbackInstant).get()).getInstantsRollback().get(0).getCommitTime(),
-                rollbackInstant.getTimestamp());
-          } catch (IOException e) {
-            LOG.error("Error reading rollback metadata for instant {}", 
rollbackInstant, e);
-            return Pair.of("", rollbackInstant.getTimestamp());
-          }
-        }).collect(Collectors.toMap(Pair::getLeft, Pair::getRight, (v1, v2) -> 
v1));
-    // List of inflight instants that are already completed.
-    List<String> rollbackCompletedInstants =
-        instantsToRollback.stream()
-            .filter(failedInstantToRollbackCommitMap::containsKey)
-            .collect(Collectors.toList());
-    LOG.info("Rollback completed instants {}", rollbackCompletedInstants);
-    try {
-      this.txnManager.beginTransaction(Option.empty(), Option.empty());
-      rollbackCompletedInstants.forEach(instant -> {
-        // remove pending commit files.
-        HoodieInstant hoodieInstant = activeTimeline
-            .filter(instantTime ->
-                HoodieTimeline.compareTimestamps(instantTime.getTimestamp(), 
HoodieTimeline.EQUALS, instant))
-            .firstInstant().get();
-        BaseRollbackActionExecutor.deleteInflightAndRequestedInstant(
-            true, activeTimeline, metaClient, hoodieInstant);
-      });
-      instantsToRollback.removeAll(rollbackCompletedInstants);
-    } catch (Exception e) {
-      LOG.error("Error in deleting the inflight instants that are already 
rolled back {}",
-          rollbackCompletedInstants, e);
-      throw new HoodieRollbackException("Error in deleting the inflight 
instants that are already rolled back");
-    } finally {
-      this.txnManager.endTransaction(Option.empty());
-    }
-  }
-
   private List<String> 
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClient metaClient,
                                                                
Stream<HoodieInstant> inflightInstantsStream) {
     // Get expired instants, must store them into list before double-checking
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index 43e3e814bda..662bfe36299 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.table.action.rollback;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
@@ -26,7 +25,6 @@ import org.apache.hudi.client.transaction.TransactionManager;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -43,6 +41,7 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -254,17 +253,18 @@ public abstract class BaseRollbackActionExecutor<T, I, K, 
O> extends BaseActionE
       // Then transition the inflight rollback to completed state.
       if (!skipTimelinePublish) {
         writeTableMetadata(rollbackMetadata);
+      }
+
+      // Then we delete the inflight instant in the data table timeline if 
enabled
+      deleteInflightAndRequestedInstant(deleteInstants, 
table.getActiveTimeline(), resolvedInstant);
+
+      // If publish the rollback to the timeline, we finally transition the 
inflight rollback
+      // to complete in the data table timeline
+      if (!skipTimelinePublish) {
         
table.getActiveTimeline().transitionRollbackInflightToComplete(inflightInstant,
             TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata));
         LOG.info("Rollback of Commits " + 
rollbackMetadata.getCommitsRollback() + " is complete");
       }
-
-      // Commit to rollback instant files are deleted after the rollback 
commit is transitioned from inflight to completed
-      // If job were to fail after transitioning rollback from inflight to 
complete and before delete the instant files,
-      // then subsequent retries of the rollback for this instant will see if 
there is a completed rollback present for this instant
-      // and then directly delete the files and abort.
-      deleteInflightAndRequestedInstant(deleteInstants, 
table.getActiveTimeline(), table.getMetaClient(), resolvedInstant);
-
     } catch (IOException e) {
       throw new HoodieIOException("Error executing rollback at instant " + 
instantTime, e);
     } finally {
@@ -280,13 +280,14 @@ public abstract class BaseRollbackActionExecutor<T, I, K, 
O> extends BaseActionE
    * @param activeTimeline Hoodie active timeline
    * @param instantToBeDeleted Instant to be deleted
    */
-  public static void deleteInflightAndRequestedInstant(boolean deleteInstant, 
HoodieActiveTimeline activeTimeline,
-                                                       HoodieTableMetaClient 
metaClient, HoodieInstant instantToBeDeleted) {
+  protected void deleteInflightAndRequestedInstant(boolean deleteInstant,
+                                                   HoodieActiveTimeline 
activeTimeline,
+                                                   HoodieInstant 
instantToBeDeleted) {
     // Remove the rolled back inflight commits
     if (deleteInstant) {
       LOG.info("Deleting instant=" + instantToBeDeleted);
       activeTimeline.deletePending(instantToBeDeleted);
-      if (instantToBeDeleted.isInflight() && 
!metaClient.getTimelineLayoutVersion().isNullVersion()) {
+      if (instantToBeDeleted.isInflight() && 
!table.getMetaClient().getTimelineLayoutVersion().isNullVersion()) {
         // Delete corresponding requested instant
         instantToBeDeleted = new HoodieInstant(HoodieInstant.State.REQUESTED, 
instantToBeDeleted.getAction(),
             instantToBeDeleted.getTimestamp());
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index c2aceae0b52..cb540cd4624 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -22,9 +22,13 @@ import org.apache.hudi.avro.model.HoodieActionInstant;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieClusteringStrategy;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.HoodieTimelineArchiver;
+import org.apache.hudi.avro.model.HoodieSliceInfo;
 import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -40,6 +44,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -60,6 +65,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
@@ -95,6 +101,7 @@ import java.util.stream.Stream;
 
 import scala.Tuple3;
 
+import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename;
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.NO_PARTITION_PATH;
 import static 
org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
@@ -690,6 +697,37 @@ public class TestCleaner extends HoodieCleanerTestBase {
     assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
   }
 
+  private Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> 
generateReplaceCommitMetadata(
+      String instantTime, String partition, String replacedFileId, String 
newFileId) {
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = new 
HoodieRequestedReplaceMetadata();
+    
requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.toString());
+    requestedReplaceMetadata.setVersion(1);
+    HoodieSliceInfo sliceInfo = 
HoodieSliceInfo.newBuilder().setFileId(replacedFileId).build();
+    List<HoodieClusteringGroup> clusteringGroups = new ArrayList<>();
+    clusteringGroups.add(HoodieClusteringGroup.newBuilder()
+        
.setVersion(1).setNumOutputFileGroups(1).setMetrics(Collections.emptyMap())
+        .setSlices(Collections.singletonList(sliceInfo)).build());
+    requestedReplaceMetadata.setExtraMetadata(Collections.emptyMap());
+    
requestedReplaceMetadata.setClusteringPlan(HoodieClusteringPlan.newBuilder()
+        .setVersion(1).setExtraMetadata(Collections.emptyMap())
+        
.setStrategy(HoodieClusteringStrategy.newBuilder().setStrategyClassName("").setVersion(1).build())
+        .setInputGroups(clusteringGroups).build());
+
+    HoodieReplaceCommitMetadata replaceMetadata = new 
HoodieReplaceCommitMetadata();
+    replaceMetadata.addReplaceFileId(partition, replacedFileId);
+    replaceMetadata.setOperationType(WriteOperationType.CLUSTER);
+    if (!StringUtils.isNullOrEmpty(newFileId)) {
+      HoodieWriteStat writeStat = new HoodieWriteStat();
+      writeStat.setPartitionPath(partition);
+      writeStat.setPath(partition + "/" + getBaseFilename(instantTime, 
newFileId));
+      writeStat.setFileId(newFileId);
+      writeStat.setTotalWriteBytes(1);
+      writeStat.setFileSizeInBytes(1);
+      replaceMetadata.addWriteStat(partition, writeStat);
+    }
+    return Pair.of(requestedReplaceMetadata, replaceMetadata);
+  }
+
   @Test
   public void testCleanMetadataUpgradeDowngrade() {
     String instantTime = "000";
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index 37266950c04..07dc831578c 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -468,51 +468,4 @@ public class TestCopyOnWriteRollbackActionExecutor extends 
HoodieClientRollbackT
         context, table.getConfig(), table, rollbackInstant, 
needRollBackInstant, true, false, true);
     copyOnWriteRollbackActionExecutorForClustering.execute();
   }
-
-  /**
-   * This method tests rollback of completed ingestion commits and 
replacecommit inflight files
-   * when there is another replacecommit with greater timestamp already 
present in the timeline.
-   */
-  @Test
-  public void testDeletingInflightsWhichAreAlreadyRolledBack() throws 
Exception {
-
-    // insert data
-    HoodieWriteConfig writeConfig = 
getConfigBuilder().withAutoCommit(false).build();
-    SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
-
-    // Create a base commit.
-    int numRecords = 200;
-    String firstCommit = HoodieActiveTimeline.createNewInstantTime();
-    String partitionStr = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
-    dataGen = new HoodieTestDataGenerator(new String[]{partitionStr});
-    writeBatch(writeClient, firstCommit, "000", 
Option.of(Arrays.asList("000")), "000",
-        numRecords, dataGen::generateInserts, SparkRDDWriteClient::insert, 
true, numRecords, numRecords,
-        1, true);
-    // Create inflight commit.
-    String secondCommit = writeClient.startCommit();
-    // Insert completed commit
-    String thirdCommit = HoodieActiveTimeline.createNewInstantTime();
-    writeBatch(writeClient, thirdCommit, firstCommit, 
Option.of(Arrays.asList("000")), "000",
-        numRecords, dataGen::generateInserts, SparkRDDWriteClient::insert, 
false, numRecords, numRecords,
-        1, true);
-    // Rollback secondCommit which is an inflight.
-    writeClient.rollback(secondCommit);
-    assertEquals(1, metaClient.reloadActiveTimeline()
-        .getRollbackTimeline().filterCompletedInstants().getInstants().size());
-    
assertFalse(metaClient.getActiveTimeline().filterInflightsAndRequested().firstInstant().isPresent());
-
-    // Create inflight commit back into timeline for testing purposes.
-    writeClient.startCommitWithTime(secondCommit);
-    
assertTrue(metaClient.reloadActiveTimeline().filterInflightsAndRequested().firstInstant().isPresent());
-
-    // Insert completed commit
-    String fourthCommit = HoodieActiveTimeline.createNewInstantTime();
-    writeBatch(writeClient, fourthCommit, thirdCommit, 
Option.of(Arrays.asList("000")), "000",
-        numRecords, dataGen::generateInserts, SparkRDDWriteClient::insert, 
false, numRecords, numRecords,
-        1, true);
-    assertEquals(1, metaClient.reloadActiveTimeline()
-        .getRollbackTimeline().filterCompletedInstants().getInstants().size());
-    
assertFalse(metaClient.getActiveTimeline().filterInflightsAndRequested().firstInstant().isPresent());
-    assertEquals(3, 
metaClient.getActiveTimeline().getCommitsTimeline().countInstants());
-  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 6c68a4ad403..c4a150e7f8f 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -18,11 +18,6 @@
 
 package org.apache.hudi.testutils;
 
-import org.apache.hudi.avro.model.HoodieClusteringGroup;
-import org.apache.hudi.avro.model.HoodieClusteringPlan;
-import org.apache.hudi.avro.model.HoodieClusteringStrategy;
-import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
-import org.apache.hudi.avro.model.HoodieSliceInfo;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -32,16 +27,11 @@ import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
-import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.SparkHoodieIndexFactory;
@@ -55,12 +45,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.function.Function;
 
-import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -564,37 +551,6 @@ public class HoodieClientTestBase extends 
HoodieSparkClientTestHarness {
     return result;
   }
 
-  public static Pair<HoodieRequestedReplaceMetadata, 
HoodieReplaceCommitMetadata> generateReplaceCommitMetadata(
-      String instantTime, String partition, String replacedFileId, String 
newFileId) {
-    HoodieRequestedReplaceMetadata requestedReplaceMetadata = new 
HoodieRequestedReplaceMetadata();
-    
requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.toString());
-    requestedReplaceMetadata.setVersion(1);
-    HoodieSliceInfo sliceInfo = 
HoodieSliceInfo.newBuilder().setFileId(replacedFileId).build();
-    List<HoodieClusteringGroup> clusteringGroups = new ArrayList<>();
-    clusteringGroups.add(HoodieClusteringGroup.newBuilder()
-        
.setVersion(1).setNumOutputFileGroups(1).setMetrics(Collections.emptyMap())
-        .setSlices(Collections.singletonList(sliceInfo)).build());
-    requestedReplaceMetadata.setExtraMetadata(Collections.emptyMap());
-    
requestedReplaceMetadata.setClusteringPlan(HoodieClusteringPlan.newBuilder()
-        .setVersion(1).setExtraMetadata(Collections.emptyMap())
-        
.setStrategy(HoodieClusteringStrategy.newBuilder().setStrategyClassName("").setVersion(1).build())
-        .setInputGroups(clusteringGroups).build());
-
-    HoodieReplaceCommitMetadata replaceMetadata = new 
HoodieReplaceCommitMetadata();
-    replaceMetadata.addReplaceFileId(partition, replacedFileId);
-    replaceMetadata.setOperationType(WriteOperationType.CLUSTER);
-    if (!StringUtils.isNullOrEmpty(newFileId)) {
-      HoodieWriteStat writeStat = new HoodieWriteStat();
-      writeStat.setPartitionPath(partition);
-      writeStat.setPath(partition + "/" + getBaseFilename(instantTime, 
newFileId));
-      writeStat.setFileId(newFileId);
-      writeStat.setTotalWriteBytes(1);
-      writeStat.setFileSizeInBytes(1);
-      replaceMetadata.addWriteStat(partition, writeStat);
-    }
-    return Pair.of(requestedReplaceMetadata, replaceMetadata);
-  }
-
   /**
    * Insert a batch of records without commit(so that the instant is 
in-flight).
    *
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index b1dfa366dd8..e3e1760eab9 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -1219,14 +1219,6 @@ public class HoodieTestTable {
     return writeStats;
   }
 
-  public HoodieTestTable addRequestedAndInflightReplaceCommit(String 
instantTime, HoodieRequestedReplaceMetadata requestedReplaceMetadata, 
HoodieReplaceCommitMetadata metadata) throws Exception {
-    createRequestedReplaceCommit(basePath, instantTime, 
Option.of(requestedReplaceMetadata));
-    createInflightReplaceCommit(basePath, instantTime);
-    currentInstantTime = instantTime;
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    return this;
-  }
-
   /**
    * Exception for {@link HoodieTestTable}.
    */
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 5a79295c331..6324fb83fc9 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1340,9 +1340,17 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     }
   }
 
-  @ParameterizedTest
-  @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
-  public void testAsyncClusteringService(HoodieRecordType recordType) throws 
Exception {
+  @Disabled("HUDI-6753")
+  public void testAsyncClusteringServiceSparkRecordType() throws Exception {
+    testAsyncClusteringService(HoodieRecordType.SPARK);
+  }
+
+  @Test
+  public void testAsyncClusteringServiceAvroRecordType() throws Exception {
+    testAsyncClusteringService(HoodieRecordType.AVRO);
+  }
+
+  private void testAsyncClusteringService(HoodieRecordType recordType) throws 
Exception {
     String tableBasePath = basePath + "/asyncClustering";
     // Keep it higher than batch-size to test continuous mode
     int totalRecords = 2000;

Reply via email to