This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ae3d886e991458fb145132357f0c0c490982491c Author: Jon Vexler <[email protected]> AuthorDate: Thu Sep 7 15:09:54 2023 -0400 [HUDI-6736] Fixing rollback completion and commit timeline files removal (#9521) The purpose of the 8849 change was to fix the ordering of rollbacks so that the completion of the rollback instant happens first, followed by the removal of the commit files from the timeline. For example, if t5.c.inflight has partially failed and t6.rb.requested is triggered to roll it back, then towards completion t6.rb is moved to the completed state, and only later are all t5 commit files removed from the timeline. This could lead to dangling commit files (t5) if the process crashes just after moving the t6 rollback to completion. So 8849 also introduced polling of completed rollbacks to ensure we don't trigger another rollback for t5. But we missed that we had already landed 5148, which addressed a similar issue. As per 5148, we first need to delete the commit files from the timeline (t5) and then transition the rollback to completion (t6.rb). So even if there is a crash, re-attempting t6.rb.requested will reach completion without any issues (even if t5 is not in the timeline at all). Hence, this commit reverts some of the core changes added as part of 8849. However, 8849 also added some tests, so the entire patch is not being reverted. 
--------- Co-authored-by: Jonathan Vexler <=> Co-authored-by: sivabalan <[email protected]> --- .../hudi/client/BaseHoodieTableServiceClient.java | 57 ---------------------- .../rollback/BaseRollbackActionExecutor.java | 25 +++++----- .../java/org/apache/hudi/table/TestCleaner.java | 38 +++++++++++++++ .../TestCopyOnWriteRollbackActionExecutor.java | 47 ------------------ .../hudi/testutils/HoodieClientTestBase.java | 44 ----------------- .../hudi/common/testutils/HoodieTestTable.java | 8 --- .../deltastreamer/TestHoodieDeltaStreamer.java | 14 ++++-- 7 files changed, 62 insertions(+), 171 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index 0af2ace25f0..5af681d9a8a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -42,7 +42,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.CollectionUtils; @@ -61,7 +60,6 @@ import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.compact.CompactHelpers; -import org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor; import org.apache.hudi.table.action.rollback.RollbackUtils; import org.apache.hudi.table.marker.WriteMarkersFactory; @@ 
-913,7 +911,6 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl protected Boolean rollbackFailedWrites() { HoodieTable table = createTable(config, hadoopConf); List<String> instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), Option.empty()); - removeInflightFilesAlreadyRolledBack(instantsToRollback, table.getMetaClient()); Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient()); instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty())); rollbackFailedWrites(pendingRollbacks); @@ -978,60 +975,6 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl } } - /** - * This method filters out the instants that are already rolled back, but their pending commit files are left - * because of job failures. In addition to filtering out these instants, it will also cleanup the inflight instants - * from the timeline. - */ - protected void removeInflightFilesAlreadyRolledBack(List<String> instantsToRollback, HoodieTableMetaClient metaClient) { - if (instantsToRollback.isEmpty()) { - return; - } - // Find the oldest inflight timestamp. - String lowestInflightCommitTime = Collections.min(instantsToRollback); - HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - - // RollbackInstantMap should only be created for instants that are > oldest inflight file to be removed. 
- Map<String, String> failedInstantToRollbackCommitMap = activeTimeline.getRollbackTimeline().filterCompletedInstants() - .findInstantsAfter(lowestInflightCommitTime) - .getInstantsAsStream() - .map(rollbackInstant -> { - try { - return Pair.of(TimelineMetadataUtils.deserializeHoodieRollbackMetadata( - activeTimeline.getInstantDetails(rollbackInstant).get()).getInstantsRollback().get(0).getCommitTime(), - rollbackInstant.getTimestamp()); - } catch (IOException e) { - LOG.error("Error reading rollback metadata for instant {}", rollbackInstant, e); - return Pair.of("", rollbackInstant.getTimestamp()); - } - }).collect(Collectors.toMap(Pair::getLeft, Pair::getRight, (v1, v2) -> v1)); - // List of inflight instants that are already completed. - List<String> rollbackCompletedInstants = - instantsToRollback.stream() - .filter(failedInstantToRollbackCommitMap::containsKey) - .collect(Collectors.toList()); - LOG.info("Rollback completed instants {}", rollbackCompletedInstants); - try { - this.txnManager.beginTransaction(Option.empty(), Option.empty()); - rollbackCompletedInstants.forEach(instant -> { - // remove pending commit files. 
- HoodieInstant hoodieInstant = activeTimeline - .filter(instantTime -> - HoodieTimeline.compareTimestamps(instantTime.getTimestamp(), HoodieTimeline.EQUALS, instant)) - .firstInstant().get(); - BaseRollbackActionExecutor.deleteInflightAndRequestedInstant( - true, activeTimeline, metaClient, hoodieInstant); - }); - instantsToRollback.removeAll(rollbackCompletedInstants); - } catch (Exception e) { - LOG.error("Error in deleting the inflight instants that are already rolled back {}", - rollbackCompletedInstants, e); - throw new HoodieRollbackException("Error in deleting the inflight instants that are already rolled back"); - } finally { - this.txnManager.endTransaction(Option.empty()); - } - } - private List<String> getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClient metaClient, Stream<HoodieInstant> inflightInstantsStream) { // Get expired instants, must store them into list before double-checking diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java index 43e3e814bda..662bfe36299 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java @@ -18,7 +18,6 @@ package org.apache.hudi.table.action.rollback; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient; @@ -26,7 +25,6 @@ import org.apache.hudi.client.transaction.TransactionManager; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.bootstrap.index.BootstrapIndex; import org.apache.hudi.common.engine.HoodieEngineContext; -import 
org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -43,6 +41,7 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.BaseActionExecutor; import org.apache.hudi.table.marker.WriteMarkersFactory; +import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -254,17 +253,18 @@ public abstract class BaseRollbackActionExecutor<T, I, K, O> extends BaseActionE // Then transition the inflight rollback to completed state. if (!skipTimelinePublish) { writeTableMetadata(rollbackMetadata); + } + + // Then we delete the inflight instant in the data table timeline if enabled + deleteInflightAndRequestedInstant(deleteInstants, table.getActiveTimeline(), resolvedInstant); + + // If publish the rollback to the timeline, we finally transition the inflight rollback + // to complete in the data table timeline + if (!skipTimelinePublish) { table.getActiveTimeline().transitionRollbackInflightToComplete(inflightInstant, TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata)); LOG.info("Rollback of Commits " + rollbackMetadata.getCommitsRollback() + " is complete"); } - - // Commit to rollback instant files are deleted after the rollback commit is transitioned from inflight to completed - // If job were to fail after transitioning rollback from inflight to complete and before delete the instant files, - // then subsequent retries of the rollback for this instant will see if there is a completed rollback present for this instant - // and then directly delete the files and abort. 
- deleteInflightAndRequestedInstant(deleteInstants, table.getActiveTimeline(), table.getMetaClient(), resolvedInstant); - } catch (IOException e) { throw new HoodieIOException("Error executing rollback at instant " + instantTime, e); } finally { @@ -280,13 +280,14 @@ public abstract class BaseRollbackActionExecutor<T, I, K, O> extends BaseActionE * @param activeTimeline Hoodie active timeline * @param instantToBeDeleted Instant to be deleted */ - public static void deleteInflightAndRequestedInstant(boolean deleteInstant, HoodieActiveTimeline activeTimeline, - HoodieTableMetaClient metaClient, HoodieInstant instantToBeDeleted) { + protected void deleteInflightAndRequestedInstant(boolean deleteInstant, + HoodieActiveTimeline activeTimeline, + HoodieInstant instantToBeDeleted) { // Remove the rolled back inflight commits if (deleteInstant) { LOG.info("Deleting instant=" + instantToBeDeleted); activeTimeline.deletePending(instantToBeDeleted); - if (instantToBeDeleted.isInflight() && !metaClient.getTimelineLayoutVersion().isNullVersion()) { + if (instantToBeDeleted.isInflight() && !table.getMetaClient().getTimelineLayoutVersion().isNullVersion()) { // Delete corresponding requested instant instantToBeDeleted = new HoodieInstant(HoodieInstant.State.REQUESTED, instantToBeDeleted.getAction(), instantToBeDeleted.getTimestamp()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index c2aceae0b52..cb540cd4624 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -22,9 +22,13 @@ import org.apache.hudi.avro.model.HoodieActionInstant; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; +import 
org.apache.hudi.avro.model.HoodieClusteringGroup; +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.avro.model.HoodieClusteringStrategy; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.client.HoodieTimelineArchiver; +import org.apache.hudi.avro.model.HoodieSliceInfo; import org.apache.hudi.client.SparkRDDReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; @@ -40,6 +44,7 @@ import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -60,6 +65,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieArchivalConfig; import org.apache.hudi.config.HoodieCleanConfig; @@ -95,6 +101,7 @@ import java.util.stream.Stream; import scala.Tuple3; +import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.NO_PARTITION_PATH; import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime; import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS; @@ -690,6 +697,37 @@ public class TestCleaner extends HoodieCleanerTestBase { assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); } + private 
Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> generateReplaceCommitMetadata( + String instantTime, String partition, String replacedFileId, String newFileId) { + HoodieRequestedReplaceMetadata requestedReplaceMetadata = new HoodieRequestedReplaceMetadata(); + requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.toString()); + requestedReplaceMetadata.setVersion(1); + HoodieSliceInfo sliceInfo = HoodieSliceInfo.newBuilder().setFileId(replacedFileId).build(); + List<HoodieClusteringGroup> clusteringGroups = new ArrayList<>(); + clusteringGroups.add(HoodieClusteringGroup.newBuilder() + .setVersion(1).setNumOutputFileGroups(1).setMetrics(Collections.emptyMap()) + .setSlices(Collections.singletonList(sliceInfo)).build()); + requestedReplaceMetadata.setExtraMetadata(Collections.emptyMap()); + requestedReplaceMetadata.setClusteringPlan(HoodieClusteringPlan.newBuilder() + .setVersion(1).setExtraMetadata(Collections.emptyMap()) + .setStrategy(HoodieClusteringStrategy.newBuilder().setStrategyClassName("").setVersion(1).build()) + .setInputGroups(clusteringGroups).build()); + + HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata(); + replaceMetadata.addReplaceFileId(partition, replacedFileId); + replaceMetadata.setOperationType(WriteOperationType.CLUSTER); + if (!StringUtils.isNullOrEmpty(newFileId)) { + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setPartitionPath(partition); + writeStat.setPath(partition + "/" + getBaseFilename(instantTime, newFileId)); + writeStat.setFileId(newFileId); + writeStat.setTotalWriteBytes(1); + writeStat.setFileSizeInBytes(1); + replaceMetadata.addWriteStat(partition, writeStat); + } + return Pair.of(requestedReplaceMetadata, replaceMetadata); + } + @Test public void testCleanMetadataUpgradeDowngrade() { String instantTime = "000"; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java index 37266950c04..07dc831578c 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java @@ -468,51 +468,4 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT context, table.getConfig(), table, rollbackInstant, needRollBackInstant, true, false, true); copyOnWriteRollbackActionExecutorForClustering.execute(); } - - /** - * This method tests rollback of completed ingestion commits and replacecommit inflight files - * when there is another replacecommit with greater timestamp already present in the timeline. - */ - @Test - public void testDeletingInflightsWhichAreAlreadyRolledBack() throws Exception { - - // insert data - HoodieWriteConfig writeConfig = getConfigBuilder().withAutoCommit(false).build(); - SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig); - - // Create a base commit. - int numRecords = 200; - String firstCommit = HoodieActiveTimeline.createNewInstantTime(); - String partitionStr = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH; - dataGen = new HoodieTestDataGenerator(new String[]{partitionStr}); - writeBatch(writeClient, firstCommit, "000", Option.of(Arrays.asList("000")), "000", - numRecords, dataGen::generateInserts, SparkRDDWriteClient::insert, true, numRecords, numRecords, - 1, true); - // Create inflight commit. 
- String secondCommit = writeClient.startCommit(); - // Insert completed commit - String thirdCommit = HoodieActiveTimeline.createNewInstantTime(); - writeBatch(writeClient, thirdCommit, firstCommit, Option.of(Arrays.asList("000")), "000", - numRecords, dataGen::generateInserts, SparkRDDWriteClient::insert, false, numRecords, numRecords, - 1, true); - // Rollback secondCommit which is an inflight. - writeClient.rollback(secondCommit); - assertEquals(1, metaClient.reloadActiveTimeline() - .getRollbackTimeline().filterCompletedInstants().getInstants().size()); - assertFalse(metaClient.getActiveTimeline().filterInflightsAndRequested().firstInstant().isPresent()); - - // Create inflight commit back into timeline for testing purposes. - writeClient.startCommitWithTime(secondCommit); - assertTrue(metaClient.reloadActiveTimeline().filterInflightsAndRequested().firstInstant().isPresent()); - - // Insert completed commit - String fourthCommit = HoodieActiveTimeline.createNewInstantTime(); - writeBatch(writeClient, fourthCommit, thirdCommit, Option.of(Arrays.asList("000")), "000", - numRecords, dataGen::generateInserts, SparkRDDWriteClient::insert, false, numRecords, numRecords, - 1, true); - assertEquals(1, metaClient.reloadActiveTimeline() - .getRollbackTimeline().filterCompletedInstants().getInstants().size()); - assertFalse(metaClient.getActiveTimeline().filterInflightsAndRequested().firstInstant().isPresent()); - assertEquals(3, metaClient.getActiveTimeline().getCommitsTimeline().countInstants()); - } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java index 6c68a4ad403..c4a150e7f8f 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java @@ -18,11 +18,6 @@ package 
org.apache.hudi.testutils; -import org.apache.hudi.avro.model.HoodieClusteringGroup; -import org.apache.hudi.avro.model.HoodieClusteringPlan; -import org.apache.hudi.avro.model.HoodieClusteringStrategy; -import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; -import org.apache.hudi.avro.model.HoodieSliceInfo; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; @@ -32,16 +27,11 @@ import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; -import org.apache.hudi.common.model.HoodieWriteStat; -import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; -import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.SparkHoodieIndexFactory; @@ -55,12 +45,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.function.Function; -import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -564,37 +551,6 @@ public class HoodieClientTestBase extends 
HoodieSparkClientTestHarness { return result; } - public static Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> generateReplaceCommitMetadata( - String instantTime, String partition, String replacedFileId, String newFileId) { - HoodieRequestedReplaceMetadata requestedReplaceMetadata = new HoodieRequestedReplaceMetadata(); - requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.toString()); - requestedReplaceMetadata.setVersion(1); - HoodieSliceInfo sliceInfo = HoodieSliceInfo.newBuilder().setFileId(replacedFileId).build(); - List<HoodieClusteringGroup> clusteringGroups = new ArrayList<>(); - clusteringGroups.add(HoodieClusteringGroup.newBuilder() - .setVersion(1).setNumOutputFileGroups(1).setMetrics(Collections.emptyMap()) - .setSlices(Collections.singletonList(sliceInfo)).build()); - requestedReplaceMetadata.setExtraMetadata(Collections.emptyMap()); - requestedReplaceMetadata.setClusteringPlan(HoodieClusteringPlan.newBuilder() - .setVersion(1).setExtraMetadata(Collections.emptyMap()) - .setStrategy(HoodieClusteringStrategy.newBuilder().setStrategyClassName("").setVersion(1).build()) - .setInputGroups(clusteringGroups).build()); - - HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata(); - replaceMetadata.addReplaceFileId(partition, replacedFileId); - replaceMetadata.setOperationType(WriteOperationType.CLUSTER); - if (!StringUtils.isNullOrEmpty(newFileId)) { - HoodieWriteStat writeStat = new HoodieWriteStat(); - writeStat.setPartitionPath(partition); - writeStat.setPath(partition + "/" + getBaseFilename(instantTime, newFileId)); - writeStat.setFileId(newFileId); - writeStat.setTotalWriteBytes(1); - writeStat.setFileSizeInBytes(1); - replaceMetadata.addWriteStat(partition, writeStat); - } - return Pair.of(requestedReplaceMetadata, replaceMetadata); - } - /** * Insert a batch of records without commit(so that the instant is in-flight). 
* diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index b1dfa366dd8..e3e1760eab9 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -1219,14 +1219,6 @@ public class HoodieTestTable { return writeStats; } - public HoodieTestTable addRequestedAndInflightReplaceCommit(String instantTime, HoodieRequestedReplaceMetadata requestedReplaceMetadata, HoodieReplaceCommitMetadata metadata) throws Exception { - createRequestedReplaceCommit(basePath, instantTime, Option.of(requestedReplaceMetadata)); - createInflightReplaceCommit(basePath, instantTime); - currentInstantTime = instantTime; - metaClient = HoodieTableMetaClient.reload(metaClient); - return this; - } - /** * Exception for {@link HoodieTestTable}. */ diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index 5a79295c331..6324fb83fc9 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -1340,9 +1340,17 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { } } - @ParameterizedTest - @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"}) - public void testAsyncClusteringService(HoodieRecordType recordType) throws Exception { + @Disabled("HUDI-6753") + public void testAsyncClusteringServiceSparkRecordType() throws Exception { + testAsyncClusteringService(HoodieRecordType.SPARK); + } + + @Test + public void testAsyncClusteringServiceAvroRecordType() throws Exception { + 
testAsyncClusteringService(HoodieRecordType.AVRO); + } + + private void testAsyncClusteringService(HoodieRecordType recordType) throws Exception { String tableBasePath = basePath + "/asyncClustering"; // Keep it higher than batch-size to test continuous mode int totalRecords = 2000;
