This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e7ee2f4671e5 [HUDI-9789] Call clean and restore methods for pluggable
table format (#13842)
e7ee2f4671e5 is described below
commit e7ee2f4671e5263e2258b449feb657fa3ac815fc
Author: Vinish Reddy <[email protected]>
AuthorDate: Sun Sep 7 19:48:38 2025 +0530
[HUDI-9789] Call clean and restore methods for pluggable table format
(#13842)
---
.../timeline/versioning/v2/TimelineArchiverV2.java | 5 +-
.../table/action/clean/CleanActionExecutor.java | 6 +-
.../action/restore/BaseRestoreActionExecutor.java | 6 +-
...tSavepointRestoreCopyOnWriteWithTestFormat.java | 214 +++++++++++++++++++++
...yOnWriteTableCleanAndArchiveWithTestFormat.java | 107 +++++++++++
.../hudi/testutils/HoodieClientTestBase.java | 3 +-
.../org.apache.hudi.common.HoodieTableFormat | 19 ++
.../org/apache/hudi/common/HoodieTableFormat.java | 22 ++-
.../hudi/tableformat/TestActiveTimeline.java | 7 +-
.../apache/hudi/tableformat/TestTableFormat.java | 9 +-
10 files changed, 386 insertions(+), 12 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
index 9a39a45ead6d..03a16b9cee87 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client.timeline.versioning.v2;
import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.NativeTableFormat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
@@ -214,7 +215,9 @@ public class TimelineArchiverV2<T extends
HoodieAvroPayload, I, K, O> implements
// 4. If metadata table is enabled, do not archive instants which are more
recent than the last compaction on the
// metadata table.
- if (config.isMetadataTableEnabled() &&
table.getMetaClient().getTableConfig().isMetadataTableAvailable()) {
+ if (config.isMetadataTableEnabled()
+ && table.getMetaClient().getTableConfig().isMetadataTableAvailable()
+ &&
table.getMetaClient().getTableFormat().getName().equals(NativeTableFormat.TABLE_FORMAT))
{
try (HoodieTableMetadata tableMetadata =
table.refreshAndGetTableMetadata()) {
Option<String> latestCompactionTime =
tableMetadata.getLatestCompactionTime();
if (!latestCompactionTime.isPresent()) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index bed24daa7e7a..f754b5781e9f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -222,7 +222,11 @@ public class CleanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I, K,
);
this.txnManager.beginStateChange(Option.of(inflightInstant),
Option.empty());
writeTableMetadata(metadata, inflightInstant.requestedTime());
- table.getActiveTimeline().transitionCleanInflightToComplete(false,
inflightInstant, Option.of(metadata));
+ table.getActiveTimeline().transitionCleanInflightToComplete(
+ false,
+ inflightInstant,
+ Option.of(metadata),
+ completedInstant ->
table.getMetaClient().getTableFormat().clean(metadata, completedInstant,
table.getContext(), table.getMetaClient(), table.getViewManager()));
LOG.info("Marked clean started on {} as complete",
inflightInstant.requestedTime());
return metadata;
} finally {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
index 36b4aae550eb..934b7233a0e8 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
@@ -127,7 +127,11 @@ public abstract class BaseRestoreActionExecutor<T, I, K,
O> extends BaseActionEx
instantTime, durationInMs, instantsRolledBack, instantToMetadata);
HoodieInstant restoreInflightInstant =
instantGenerator.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.RESTORE_ACTION, instantTime);
writeToMetadata(restoreMetadata, restoreInflightInstant);
- table.getActiveTimeline().saveAsComplete(restoreInflightInstant,
Option.of(restoreMetadata));
+ table.getActiveTimeline().saveAsComplete(
+ true,
+ restoreInflightInstant,
+ Option.of(restoreMetadata),
+ restoreCompletedInstant ->
table.getMetaClient().getTableFormat().restore(restoreCompletedInstant,
table.getContext(), table.getMetaClient(), table.getViewManager()));
// get all pending rollbacks instants after restore instant time and
delete them.
// if not, rollbacks will be considered not completed and might hinder
metadata table compaction.
List<HoodieInstant> instantsToRollback =
table.getActiveTimeline().getRollbackTimeline()
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWriteWithTestFormat.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWriteWithTestFormat.java
new file mode 100644
index 000000000000..42857d9f888c
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWriteWithTestFormat.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for COPY_ON_WRITE table savepoint restore, run against a table whose
+ * {@code HoodieTableConfig.TABLE_FORMAT} is set to the pluggable "test-format" table format.
+ */
+@Tag("functional")
+public class TestSavepointRestoreCopyOnWriteWithTestFormat extends HoodieClientTestBase {
+
+  /**
+   * Actions: C1, C2, savepoint C2, C3, C4 (inflight or complete), restore.
+   * Should go back to C2,
+   * C3 and C4 should be cleaned up.
+   *
+   * @param commitC4 whether C4 is committed ({@code true}) or left inflight ({@code false})
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testBasicRollback(boolean commitC4) throws Exception {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodieTableConfig.TABLE_FORMAT.key(), "test-format");
+    initMetaClient(getTableType(), props);
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+      final int numRecords = 10;
+      List<String> commitTimestamps = new ArrayList<>();
+      for (int i = 1; i <= 3; i++) {
+        String newCommitTime = WriteClientTestUtils.createNewInstantTime();
+        commitTimestamps.add(newCommitTime);
+        insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, numRecords, SparkRDDWriteClient::insert,
+            false, true, numRecords, numRecords * i, 1, Option.empty(), INSTANT_GENERATOR);
+        prevInstant = newCommitTime;
+        if (i == 2) {
+          // trigger savepoint
+          savepointCommit = newCommitTime;
+          client.savepoint("user1", "Savepoint for 2nd commit");
+        }
+      }
+      // Add C4 - either complete or inflight based on parameter
+      if (commitC4) {
+        String newCommitTime = WriteClientTestUtils.createNewInstantTime();
+        commitTimestamps.add(newCommitTime);
+        insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, numRecords, SparkRDDWriteClient::insert,
+            false, true, numRecords, numRecords * 4, 1, Option.empty(), INSTANT_GENERATOR);
+        assertRowNumberEqualsTo(40);
+      } else {
+        // Leave C4 as inflight
+        String inflightCommit = insertBatchWithoutCommit(numRecords);
+        commitTimestamps.add(inflightCommit);
+        Assertions.assertFalse(metaClient.getActiveTimeline().filterCompletedInstants().containsInstant(inflightCommit));
+        assertRowNumberEqualsTo(30);
+      }
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      Assertions.assertEquals(1, metaClient.reloadActiveTimeline().getRestoreTimeline().filterCompletedInstants().countInstants());
+      // C3 and C4 (list indices 2 and 3) must no longer be on the active timeline after the restore.
+      commitTimestamps.subList(2, 4).forEach(instant -> Assertions.assertFalse(metaClient.getActiveTimeline().containsInstant(instant)));
+      assertRowNumberEqualsTo(20);
+    }
+  }
+
+  /**
+   * Actions: C1, C2, savepoint C2, C3, C4, C5, restore to C2, with metadata table compaction
+   * configured to run every 4 delta commits, so the restore goes past the last MDT compaction.
+   */
+  @Test
+  void testRollbackBeyondLastMDTCompaction() throws Exception {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodieTableConfig.TABLE_FORMAT.key(), "test-format");
+    initMetaClient(getTableType(), props);
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withRollbackUsingMarkers(true)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(4)
+            .build())
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+      final int numRecords = 10;
+      final int iterations = 5;
+      // Loop bound uses `iterations` so it cannot drift from the row-count assertion below.
+      for (int i = 1; i <= iterations; i++) {
+        String newCommitTime = WriteClientTestUtils.createNewInstantTime();
+        insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, numRecords, SparkRDDWriteClient::insert,
+            false, true, numRecords, numRecords * i, 1, Option.empty(), INSTANT_GENERATOR);
+        prevInstant = newCommitTime;
+        if (i == 2) {
+          // trigger savepoint
+          savepointCommit = newCommitTime;
+          client.savepoint("user1", "Savepoint for 2nd commit");
+        }
+      }
+      assertRowNumberEqualsTo(iterations * numRecords);
+      // restore will be forced to rebuild the metadata table
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      Assertions.assertEquals(1, metaClient.reloadActiveTimeline().getRestoreTimeline().filterCompletedInstants().countInstants());
+      assertRowNumberEqualsTo(20);
+      // check if the metadata table is rebuilt
+      String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(hoodieWriteConfig.getBasePath());
+      HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder()
+          .setBasePath(metadataTableBasePath)
+          .setConf(storageConf)
+          .setLoadActiveTimelineOnLoad(true)
+          .build();
+      assertEquals(1, metadataMetaClient.getCommitsTimeline().filter(instant -> !instant.requestedTime().startsWith(SOLO_COMMIT_TIMESTAMP)).countInstants());
+    }
+  }
+
+  /**
+   * The rollbacks(either inflight or complete) beyond the savepoint should be cleaned.
+   *
+   * <p>Actions: C1, C2, savepoint C2, C3, C4 (RB_C3), C5, restore.
+   * Should go back to C2.
+   * C3, C4(RB_C3), C5 should be cleaned up.
+   *
+   * <p>Actions: C1, C2, savepoint C2, C3, C4 (RB_C3) inflight, restore.
+   * Should go back to C2.
+   * C3, C4 (RB_C3) should be cleaned up.
+   *
+   * @param commitRollback whether the rollback of C3 is executed ({@code true}) or only scheduled ({@code false})
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testCleaningRollbackInstants(boolean commitRollback) throws Exception {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodieTableConfig.TABLE_FORMAT.key(), "test-format");
+    initMetaClient(getTableType(), props);
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER)
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+      final int numRecords = 10;
+      for (int i = 1; i <= 2; i++) {
+        String newCommitTime = WriteClientTestUtils.createNewInstantTime();
+        insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, numRecords, SparkRDDWriteClient::insert,
+            false, true, numRecords, numRecords * i, 1, Option.empty(), INSTANT_GENERATOR);
+        prevInstant = newCommitTime;
+        if (i == 2) {
+          // trigger savepoint
+          savepointCommit = newCommitTime;
+          client.savepoint("user1", "Savepoint for 2nd commit");
+        }
+      }
+      assertRowNumberEqualsTo(20);
+      // write another pending instant
+      String inflightCommit = insertBatchWithoutCommit(numRecords);
+      Assertions.assertFalse(metaClient.getActiveTimeline().filterCompletedInstants().containsInstant(inflightCommit));
+      // rollback the pending instant
+      if (commitRollback) {
+        client.rollbackFailedWrites(metaClient);
+      } else {
+        // Only schedule the rollback so that it stays pending on the timeline.
+        HoodieInstant pendingInstant = metaClient.getActiveTimeline().filterPendingExcludingCompaction()
+            .lastInstant().orElseThrow(() -> new HoodieException("Pending instant does not exist"));
+        HoodieSparkTable.create(client.getConfig(), context)
+            .scheduleRollback(context, WriteClientTestUtils.createNewInstantTime(), pendingInstant, false, true, false);
+      }
+      Option<String> rollbackInstant = metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().map(HoodieInstant::requestedTime);
+      assertTrue(rollbackInstant.isPresent(), "The latest instant should be a rollback");
+      // write another batch
+      insertBatch(hoodieWriteConfig, client, WriteClientTestUtils.createNewInstantTime(),
+          rollbackInstant.get(), numRecords, SparkRDDWriteClient::insert,
+          false, true, numRecords, numRecords * 3, 1, Option.empty(), INSTANT_GENERATOR);
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      Assertions.assertEquals(1, metaClient.reloadActiveTimeline().getRestoreTimeline().filterCompletedInstants().countInstants());
+      assertRowNumberEqualsTo(20);
+    }
+  }
+}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableCleanAndArchiveWithTestFormat.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableCleanAndArchiveWithTestFormat.java
new file mode 100644
index 000000000000..40bc7744ee10
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableCleanAndArchiveWithTestFormat.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.functional;
+
+import org.apache.hudi.client.HoodieWriteResult;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+import static
org.apache.hudi.testutils.HoodieClientTestUtils.countRecordsOptionallySince;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Clean and archive tests for COPY_ON_WRITE tables whose {@code HoodieTableConfig.TABLE_FORMAT}
+ * is set to the pluggable "test-format" table format.
+ */
+@Tag("functional")
+public class TestHoodieSparkCopyOnWriteTableCleanAndArchiveWithTestFormat extends SparkClientFunctionalTestHarness {
+
+  /**
+   * Writes one commit into each of the three default partitions, deletes the first two partitions
+   * with a replace commit, then writes six more commits into the third partition so that archival
+   * picks up the replace commit. Verifies the archived instants and that only the third
+   * partition's records remain.
+   *
+   * @param metadataEnabled whether the metadata table is enabled in the write config
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOException {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodieTableConfig.TABLE_FORMAT.key(), "test-format");
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE, props);
+    HoodieWriteConfig writeConfig = getConfigBuilder(true)
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(4, 5).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build())
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
+         HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) {
+
+      // 1st write batch; 3 commits for 3 partitions
+      String instantTime1 = client.startCommit();
+      client.commit(instantTime1, client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime1, 10, DEFAULT_FIRST_PARTITION_PATH), 1), instantTime1));
+      String instantTime2 = client.startCommit();
+      client.commit(instantTime2, client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime2, 10, DEFAULT_SECOND_PARTITION_PATH), 1), instantTime2));
+      String instantTime3 = client.startCommit();
+      client.commit(instantTime3, client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime3, 1, DEFAULT_THIRD_PARTITION_PATH), 1), instantTime3));
+
+      final HoodieTimeline timeline1 = metaClient.getCommitsTimeline().filterCompletedInstants();
+      assertEquals(21, countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline1, Option.empty()));
+
+      // delete the 1st and the 2nd partition; 1 replace commit
+      final String instantTime4 = client.startCommit(HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
+      HoodieWriteResult result = client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4);
+      client.commit(instantTime4, result.getWriteStatuses(), Option.empty(), REPLACE_COMMIT_ACTION, result.getPartitionToReplaceFileIds());
+
+      // 2nd write batch; 6 commits for the 3rd partition; the 6th commit triggers archiving the replace commit
+      for (int i = 5; i < 11; i++) {
+        String instantTime = client.startCommit();
+        client.commit(instantTime, client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime, 1, DEFAULT_THIRD_PARTITION_PATH), 1), instantTime));
+      }
+
+      // verify archived timeline
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      final HoodieTimeline archivedTimeline = metaClient.getArchivedTimeline();
+      assertTrue(archivedTimeline.containsInstant(instantTime1));
+      assertTrue(archivedTimeline.containsInstant(instantTime2));
+      assertTrue(archivedTimeline.containsInstant(instantTime3));
+      assertTrue(archivedTimeline.containsInstant(instantTime4), "should contain the replace commit.");
+
+      // verify records: 1 record from the 1st batch plus 6 from the 2nd batch, all in the 3rd partition
+      final HoodieTimeline timeline2 = metaClient.getCommitTimeline().filterCompletedInstants();
+      assertEquals(7, countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline2, Option.empty()),
+          "should only have the 7 records from the 3rd partition.");
+    }
+  }
+}
\ No newline at end of file
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index f08f1d580b3a..bc5c25867cae 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -639,7 +639,7 @@ public class HoodieClientTestBase extends
HoodieSparkClientTestHarness {
* @param numRecords The number of records to insert
*/
@SuppressWarnings("rawtypes, unchecked")
- protected void insertBatchWithoutCommit(int numRecords) {
+ protected String insertBatchWithoutCommit(int numRecords) {
HoodieWriteConfig hoodieWriteConfig =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
.withRollbackUsingMarkers(true)
.build();
@@ -652,6 +652,7 @@ public class HoodieClientTestBase extends
HoodieSparkClientTestHarness {
List<WriteStatus> statuses = client.insert(writeRecords,
newCommitTime).collect();
assertNoWriteErrors(statuses);
+ return newCommitTime;
}
}
diff --git
a/hudi-client/hudi-spark-client/src/test/resources/META-INF/services/org.apache.hudi.common.HoodieTableFormat
b/hudi-client/hudi-spark-client/src/test/resources/META-INF/services/org.apache.hudi.common.HoodieTableFormat
new file mode 100644
index 000000000000..eb7e6220e5e6
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/resources/META-INF/services/org.apache.hudi.common.HoodieTableFormat
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.hudi.tableformat.TestTableFormat
\ No newline at end of file
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/HoodieTableFormat.java
b/hudi-common/src/main/java/org/apache/hudi/common/HoodieTableFormat.java
index 01b018af9b1b..b3365f995cd0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/HoodieTableFormat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieTableFormat.java
@@ -100,15 +100,15 @@ public interface HoodieTableFormat extends Serializable {
}
/**
- * Called before rolling back the instant in hoodie timeline.
+ * Called before rolling back the instant in hoodie timeline.
*
- * @param completedInstant completed rollback instant in hoodie timeline
+ * @param instantToRollback instant to be rolled back in hoodie timeline.
* @param engineContext engine context used for execution - local,spark or
flink etc.
* @param metaClient metaClient from HoodieTable.
* @param viewManager viewManager from HoodieTable.
*/
default void rollback(
- HoodieInstant completedInstant,
+ HoodieInstant instantToRollback,
HoodieEngineContext engineContext,
HoodieTableMetaClient metaClient,
FileSystemViewManager viewManager) {
@@ -144,6 +144,22 @@ public interface HoodieTableFormat extends Serializable {
FileSystemViewManager viewManager) {
}
+  /**
+   * Called after marking a "restore" action as complete in the hoodie timeline.
+   *
+   * @param restoreCompletedInstant the completed restore instant in hoodie timeline.
+   * @param engineContext           engine context used for execution - local, spark or flink etc.
+   * @param metaClient              metaClient from HoodieTable.
+   * @param viewManager             viewManager from HoodieTable.
+   */
+  default void restore(
+      HoodieInstant restoreCompletedInstant,
+      HoodieEngineContext engineContext,
+      HoodieTableMetaClient metaClient,
+      FileSystemViewManager viewManager) {
+    // No-op by default; table formats that need to react to a completed restore override this.
+  }
+
/**
* Return the timeline factory for table format.
*/
diff --git
a/hudi-common/src/test/java/org/apache/hudi/tableformat/TestActiveTimeline.java
b/hudi-common/src/test/java/org/apache/hudi/tableformat/TestActiveTimeline.java
index bd3106a68e11..f8e4b8f0438d 100644
---
a/hudi-common/src/test/java/org/apache/hudi/tableformat/TestActiveTimeline.java
+++
b/hudi-common/src/test/java/org/apache/hudi/tableformat/TestActiveTimeline.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.versioning.v2.ActiveTimelineV2;
import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
+import org.apache.hudi.common.util.collection.Pair;
import java.util.Collections;
import java.util.List;
@@ -63,15 +64,15 @@ public class TestActiveTimeline extends ActiveTimelineV2 {
HoodieTableMetaClient metaClient,
Set<String> includedExtensions,
boolean applyLayoutFilters) {
- Map<String, HoodieInstant> instantsInTestTableFormat =
TestTableFormat.getRecordedInstants(metaClient.getBasePath().toString())
+ Map<Pair<String, String>, HoodieInstant> instantsInTestTableFormat =
TestTableFormat.getRecordedInstants(metaClient.getBasePath().toString())
.stream()
- .collect(Collectors.toMap(HoodieInstant::requestedTime, instant ->
instant));
+ .collect(Collectors.toMap(instant -> Pair.of(instant.requestedTime(),
instant.getAction()), instant -> instant));
List<HoodieInstant> instantsFromHoodieTimeline =
super.getInstantsFromFileSystem(metaClient, includedExtensions,
applyLayoutFilters);
List<HoodieInstant> inflightInstantsInTestTableFormat =
instantsFromHoodieTimeline.stream()
.filter(
- hoodieInstant ->
!instantsInTestTableFormat.containsKey(hoodieInstant.requestedTime()))
+ hoodieInstant ->
!instantsInTestTableFormat.containsKey(Pair.of(hoodieInstant.requestedTime(),
hoodieInstant.getAction())))
.map(
instant -> {
if (instant.isCompleted()) {
diff --git
a/hudi-common/src/test/java/org/apache/hudi/tableformat/TestTableFormat.java
b/hudi-common/src/test/java/org/apache/hudi/tableformat/TestTableFormat.java
index 43d45af8ad14..9781978a51ef 100644
--- a/hudi-common/src/test/java/org/apache/hudi/tableformat/TestTableFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/tableformat/TestTableFormat.java
@@ -81,9 +81,9 @@ public class TestTableFormat implements HoodieTableFormat {
}
@Override
- public void rollback(HoodieInstant completedInstant, HoodieEngineContext
engineContext,
+ public void rollback(HoodieInstant instantToRollback, HoodieEngineContext
engineContext,
HoodieTableMetaClient metaClient, FileSystemViewManager
viewManager) {
- // No-op.
+ // Stop tracking the rolled-back instant in the instants recorded for this table's base path.
+ // NOTE(review): Collection.remove() relies on HoodieInstant.equals, so an instant passed in a
+ // state different from the recorded one would not be removed; get(...) is also assumed to be
+ // non-null for this base path — confirm both.
+
RECORDED_INSTANTS.get(metaClient.getBasePath().toString()).remove(instantToRollback);
}
@Override
@@ -99,6 +99,11 @@ public class TestTableFormat implements HoodieTableFormat {
RECORDED_INSTANTS.get(metaClient.getBasePath().toString()).add(savepointInstant);
}
+  @Override
+  public void restore(
+      HoodieInstant restoreCompletedInstant,
+      HoodieEngineContext engineContext,
+      HoodieTableMetaClient metaClient,
+      FileSystemViewManager viewManager) {
+    // Record the completed restore instant under this table's base path, next to the other
+    // instants this test format tracks.
+    String tableBasePath = metaClient.getBasePath().toString();
+    RECORDED_INSTANTS.get(tableBasePath).add(restoreCompletedInstant);
+  }
+
@Override
public TimelineFactory getTimelineFactory() {
return new TestTimelineFactory(null);