This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e7ee2f4671e5 [HUDI-9789] Call clean and restore methods for pluggable 
table format (#13842)
e7ee2f4671e5 is described below

commit e7ee2f4671e5263e2258b449feb657fa3ac815fc
Author: Vinish Reddy <[email protected]>
AuthorDate: Sun Sep 7 19:48:38 2025 +0530

    [HUDI-9789] Call clean and restore methods for pluggable table format 
(#13842)
---
 .../timeline/versioning/v2/TimelineArchiverV2.java |   5 +-
 .../table/action/clean/CleanActionExecutor.java    |   6 +-
 .../action/restore/BaseRestoreActionExecutor.java  |   6 +-
 ...tSavepointRestoreCopyOnWriteWithTestFormat.java | 214 +++++++++++++++++++++
 ...yOnWriteTableCleanAndArchiveWithTestFormat.java | 107 +++++++++++
 .../hudi/testutils/HoodieClientTestBase.java       |   3 +-
 .../org.apache.hudi.common.HoodieTableFormat       |  19 ++
 .../org/apache/hudi/common/HoodieTableFormat.java  |  22 ++-
 .../hudi/tableformat/TestActiveTimeline.java       |   7 +-
 .../apache/hudi/tableformat/TestTableFormat.java   |   9 +-
 10 files changed, 386 insertions(+), 12 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
index 9a39a45ead6d..03a16b9cee87 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client.timeline.versioning.v2;
 
 import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
 import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.NativeTableFormat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -214,7 +215,9 @@ public class TimelineArchiverV2<T extends 
HoodieAvroPayload, I, K, O> implements
 
     // 4. If metadata table is enabled, do not archive instants which are more 
recent than the last compaction on the
     // metadata table.
-    if (config.isMetadataTableEnabled() && 
table.getMetaClient().getTableConfig().isMetadataTableAvailable()) {
+    if (config.isMetadataTableEnabled()
+        && table.getMetaClient().getTableConfig().isMetadataTableAvailable()
+        && 
table.getMetaClient().getTableFormat().getName().equals(NativeTableFormat.TABLE_FORMAT))
 {
       try (HoodieTableMetadata tableMetadata = 
table.refreshAndGetTableMetadata()) {
         Option<String> latestCompactionTime = 
tableMetadata.getLatestCompactionTime();
         if (!latestCompactionTime.isPresent()) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index bed24daa7e7a..f754b5781e9f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -222,7 +222,11 @@ public class CleanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I, K,
       );
       this.txnManager.beginStateChange(Option.of(inflightInstant), 
Option.empty());
       writeTableMetadata(metadata, inflightInstant.requestedTime());
-      table.getActiveTimeline().transitionCleanInflightToComplete(false, 
inflightInstant, Option.of(metadata));
+      table.getActiveTimeline().transitionCleanInflightToComplete(
+          false,
+          inflightInstant,
+          Option.of(metadata),
+          completedInstant -> 
table.getMetaClient().getTableFormat().clean(metadata, completedInstant, 
table.getContext(), table.getMetaClient(), table.getViewManager()));
       LOG.info("Marked clean started on {} as complete", 
inflightInstant.requestedTime());
       return metadata;
     } finally {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
index 36b4aae550eb..934b7233a0e8 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
@@ -127,7 +127,11 @@ public abstract class BaseRestoreActionExecutor<T, I, K, 
O> extends BaseActionEx
         instantTime, durationInMs, instantsRolledBack, instantToMetadata);
     HoodieInstant restoreInflightInstant = 
instantGenerator.createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.RESTORE_ACTION, instantTime);
     writeToMetadata(restoreMetadata, restoreInflightInstant);
-    table.getActiveTimeline().saveAsComplete(restoreInflightInstant, 
Option.of(restoreMetadata));
+    table.getActiveTimeline().saveAsComplete(
+        true,
+        restoreInflightInstant,
+        Option.of(restoreMetadata),
+        restoreCompletedInstant -> 
table.getMetaClient().getTableFormat().restore(restoreCompletedInstant, 
table.getContext(), table.getMetaClient(), table.getViewManager()));
     // get all pending rollbacks instants after restore instant time and 
delete them.
     // if not, rollbacks will be considered not completed and might hinder 
metadata table compaction.
     List<HoodieInstant> instantsToRollback = 
table.getActiveTimeline().getRollbackTimeline()
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWriteWithTestFormat.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWriteWithTestFormat.java
new file mode 100644
index 000000000000..42857d9f888c
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWriteWithTestFormat.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static 
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for COPY_ON_WRITE table savepoint restore.
+ */
+@Tag("functional")
+public class TestSavepointRestoreCopyOnWriteWithTestFormat extends 
HoodieClientTestBase {
+
+  /**
+   * Actions: C1, C2, savepoint C2, C3, C4 (inflight or complete), restore.
+   * Should go back to C2,
+   * C3 and C4 should be cleaned up.
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testBasicRollback(boolean commitC4) throws Exception {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodieTableConfig.TABLE_FORMAT.key(), "test-format");
+    initMetaClient(getTableType(), props);
+    HoodieWriteConfig hoodieWriteConfig = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      String savepointCommit = null;
+      String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+      final int numRecords = 10;
+      List<String> commitTimestamps = new ArrayList<>();
+      for (int i = 1; i <= 3; i++) {
+        String newCommitTime = WriteClientTestUtils.createNewInstantTime();
+        commitTimestamps.add(newCommitTime);
+        insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, 
numRecords, SparkRDDWriteClient::insert,
+            false, true, numRecords, numRecords * i, 1, Option.empty(), 
INSTANT_GENERATOR);
+        prevInstant = newCommitTime;
+        if (i == 2) {
+          // trigger savepoint
+          savepointCommit = newCommitTime;
+          client.savepoint("user1", "Savepoint for 2nd commit");
+        }
+      }
+      // Add C4 - either complete or inflight based on parameter
+      if (commitC4) {
+        String newCommitTime = WriteClientTestUtils.createNewInstantTime();
+        commitTimestamps.add(newCommitTime);
+        insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, 
numRecords, SparkRDDWriteClient::insert,
+            false, true, numRecords, numRecords * 4, 1, Option.empty(), 
INSTANT_GENERATOR);
+        assertRowNumberEqualsTo(40);
+      } else {
+        // Leave C4 as inflight
+        String inflightCommit = insertBatchWithoutCommit(numRecords);
+        commitTimestamps.add(inflightCommit);
+        
Assertions.assertFalse(metaClient.getActiveTimeline().filterCompletedInstants().containsInstant(inflightCommit));
+        assertRowNumberEqualsTo(30);
+      }
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, 
"restore commit should not be null"));
+      Assertions.assertEquals(1, 
metaClient.reloadActiveTimeline().getRestoreTimeline().filterCompletedInstants().countInstants());
+      commitTimestamps.subList(2, 4).forEach(instant -> 
Assertions.assertFalse(metaClient.getActiveTimeline().containsInstant(instant)));
+      assertRowNumberEqualsTo(20);
+    }
+  }
+
+  @Test
+  void testRollbackBeyondLastMDTCompaction() throws Exception {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodieTableConfig.TABLE_FORMAT.key(), "test-format");
+    initMetaClient(getTableType(), props);
+    HoodieWriteConfig hoodieWriteConfig = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withRollbackUsingMarkers(true)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(4)
+            .build())
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      String savepointCommit = null;
+      String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+      final int numRecords = 10;
+      final int iterations = 5;
+      for (int i = 1; i <= 5; i++) {
+        String newCommitTime = WriteClientTestUtils.createNewInstantTime();
+        insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, 
numRecords, SparkRDDWriteClient::insert,
+            false, true, numRecords, numRecords * i, 1, Option.empty(), 
INSTANT_GENERATOR);
+        prevInstant = newCommitTime;
+        if (i == 2) {
+          // trigger savepoint
+          savepointCommit = newCommitTime;
+          client.savepoint("user1", "Savepoint for 2nd commit");
+        }
+      }
+      assertRowNumberEqualsTo(iterations * numRecords);
+      // restore will be forced to rebuild the metadata table
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, 
"restore commit should not be null"));
+      Assertions.assertEquals(1, 
metaClient.reloadActiveTimeline().getRestoreTimeline().filterCompletedInstants().countInstants());
+      assertRowNumberEqualsTo(20);
+      // check if the metadata table is rebuilt
+      String metadataTableBasePath = 
HoodieTableMetadata.getMetadataTableBasePath(hoodieWriteConfig.getBasePath());
+      HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder()
+          .setBasePath(metadataTableBasePath)
+          .setConf(storageConf)
+          .setLoadActiveTimelineOnLoad(true)
+          .build();
+      assertEquals(1, metadataMetaClient.getCommitsTimeline().filter(instant 
-> !instant.requestedTime().startsWith(SOLO_COMMIT_TIMESTAMP)).countInstants());
+    }
+  }
+
+  /**
+   * The rollbacks (either inflight or complete) beyond the savepoint should be 
cleaned.
+   *
+   * <p>Actions: C1, C2, savepoint C2, C3, C4 (RB_C3), C5, restore.
+   * Should go back to C2.
+   * C3, C4(RB_C3), C5 should be cleaned up.
+   *
+   * <p>Actions: C1, C2, savepoint C2, C3, C4 (RB_C3) inflight, restore.
+   * Should go back to C2.
+   * C3, C4 (RB_C3) should be cleaned up.
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testCleaningRollbackInstants(boolean commitRollback) throws Exception {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodieTableConfig.TABLE_FORMAT.key(), "test-format");
+    initMetaClient(getTableType(), props);
+    HoodieWriteConfig hoodieWriteConfig = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER)
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      String savepointCommit = null;
+      String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+      final int numRecords = 10;
+      for (int i = 1; i <= 2; i++) {
+        String newCommitTime = WriteClientTestUtils.createNewInstantTime();
+        insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, 
numRecords, SparkRDDWriteClient::insert,
+            false, true, numRecords, numRecords * i, 1, Option.empty(), 
INSTANT_GENERATOR);
+        prevInstant = newCommitTime;
+        if (i == 2) {
+          // trigger savepoint
+          savepointCommit = newCommitTime;
+          client.savepoint("user1", "Savepoint for 2nd commit");
+        }
+      }
+      assertRowNumberEqualsTo(20);
+      // write another pending instant
+      String inflightCommit = insertBatchWithoutCommit(numRecords);
+      
Assertions.assertFalse(metaClient.getActiveTimeline().filterCompletedInstants().containsInstant(inflightCommit));
+      // rollback the pending instant
+      if (commitRollback) {
+        client.rollbackFailedWrites(metaClient);
+      } else {
+        HoodieInstant pendingInstant = 
metaClient.getActiveTimeline().filterPendingExcludingCompaction()
+            .lastInstant().orElseThrow(() -> new HoodieException("Pending 
instant does not exist"));
+        HoodieSparkTable.create(client.getConfig(), context)
+            .scheduleRollback(context, 
WriteClientTestUtils.createNewInstantTime(), pendingInstant, false, true, 
false);
+      }
+      Option<String> rollbackInstant = 
metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().map(HoodieInstant::requestedTime);
+      assertTrue(rollbackInstant.isPresent(), "The latest instant should be a 
rollback");
+      // write another batch
+      insertBatch(hoodieWriteConfig, client, 
WriteClientTestUtils.createNewInstantTime(),
+          rollbackInstant.get(), numRecords, SparkRDDWriteClient::insert,
+          false, true, numRecords, numRecords * 3, 1, Option.empty(), 
INSTANT_GENERATOR);
+      // restore
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, 
"restore commit should not be null"));
+      Assertions.assertEquals(1, 
metaClient.reloadActiveTimeline().getRestoreTimeline().filterCompletedInstants().countInstants());
+      assertRowNumberEqualsTo(20);
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableCleanAndArchiveWithTestFormat.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableCleanAndArchiveWithTestFormat.java
new file mode 100644
index 000000000000..40bc7744ee10
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableCleanAndArchiveWithTestFormat.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.functional;
+
+import org.apache.hudi.client.HoodieWriteResult;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
+import static 
org.apache.hudi.testutils.HoodieClientTestUtils.countRecordsOptionallySince;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Tag("functional")
+public class TestHoodieSparkCopyOnWriteTableCleanAndArchiveWithTestFormat 
extends SparkClientFunctionalTestHarness {
+
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testDeletePartitionAndArchive(boolean metadataEnabled) throws 
IOException {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodieTableConfig.TABLE_FORMAT.key(), "test-format");
+    HoodieTableMetaClient metaClient = 
getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE, props);
+    HoodieWriteConfig writeConfig = getConfigBuilder(true)
+        
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+        
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(4, 
5).build())
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build())
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
+         HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) {
+
+      // 1st write batch; 3 commits for 3 partitions
+      String instantTime1 = client.startCommit();
+      client.commit(instantTime1, 
client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime1,
 10, DEFAULT_FIRST_PARTITION_PATH), 1), instantTime1));
+      String instantTime2 = client.startCommit();
+      client.commit(instantTime2, 
client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime2,
 10, DEFAULT_SECOND_PARTITION_PATH), 1), instantTime2));
+      String instantTime3 = client.startCommit();
+      client.commit(instantTime3, 
client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime3,
 1, DEFAULT_THIRD_PARTITION_PATH), 1), instantTime3));
+
+      final HoodieTimeline timeline1 = 
metaClient.getCommitsTimeline().filterCompletedInstants();
+      assertEquals(21, countRecordsOptionallySince(jsc(), basePath(), 
sqlContext(), timeline1, Option.empty()));
+
+      // delete the 1st and the 2nd partition; 1 replace commit
+      final String instantTime4 = 
client.startCommit(HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
+      HoodieWriteResult result = 
client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, 
DEFAULT_SECOND_PARTITION_PATH), instantTime4);
+      client.commit(instantTime4, result.getWriteStatuses(), Option.empty(), 
REPLACE_COMMIT_ACTION, result.getPartitionToReplaceFileIds());
+
+      // 2nd write batch; 6 commits for the 4th partition; the 6th commit to 
trigger archiving the replace commit
+      for (int i = 5; i < 11; i++) {
+        String instantTime = client.startCommit();
+        client.commit(instantTime, 
client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime,
 1, DEFAULT_THIRD_PARTITION_PATH), 1), instantTime));
+      }
+
+      // verify archived timeline
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      final HoodieTimeline archivedTimeline = metaClient.getArchivedTimeline();
+      assertTrue(archivedTimeline.containsInstant(instantTime1));
+      assertTrue(archivedTimeline.containsInstant(instantTime2));
+      assertTrue(archivedTimeline.containsInstant(instantTime3));
+      assertTrue(archivedTimeline.containsInstant(instantTime4), "should 
contain the replace commit.");
+
+      // verify records
+      final HoodieTimeline timeline2 = 
metaClient.getCommitTimeline().filterCompletedInstants();
+      assertEquals(7, countRecordsOptionallySince(jsc(), basePath(), 
sqlContext(), timeline2, Option.empty()),
+          "should only have the 7 records from the 3rd partition.");
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index f08f1d580b3a..bc5c25867cae 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -639,7 +639,7 @@ public class HoodieClientTestBase extends 
HoodieSparkClientTestHarness {
    * @param numRecords    The number of records to insert
    */
   @SuppressWarnings("rawtypes, unchecked")
-  protected void insertBatchWithoutCommit(int numRecords) {
+  protected String insertBatchWithoutCommit(int numRecords) {
     HoodieWriteConfig hoodieWriteConfig = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
         .withRollbackUsingMarkers(true)
         .build();
@@ -652,6 +652,7 @@ public class HoodieClientTestBase extends 
HoodieSparkClientTestHarness {
 
       List<WriteStatus> statuses = client.insert(writeRecords, 
newCommitTime).collect();
       assertNoWriteErrors(statuses);
+      return newCommitTime;
     }
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/resources/META-INF/services/org.apache.hudi.common.HoodieTableFormat
 
b/hudi-client/hudi-spark-client/src/test/resources/META-INF/services/org.apache.hudi.common.HoodieTableFormat
new file mode 100644
index 000000000000..eb7e6220e5e6
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/resources/META-INF/services/org.apache.hudi.common.HoodieTableFormat
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.hudi.tableformat.TestTableFormat
\ No newline at end of file
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/HoodieTableFormat.java 
b/hudi-common/src/main/java/org/apache/hudi/common/HoodieTableFormat.java
index 01b018af9b1b..b3365f995cd0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/HoodieTableFormat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieTableFormat.java
@@ -100,15 +100,15 @@ public interface HoodieTableFormat extends Serializable {
   }
 
   /**
-   * Called before rolling back the instant  in hoodie timeline.
+   * Called before rolling back the instant in hoodie timeline.
    *
-   * @param completedInstant completed rollback instant in hoodie timeline
+   * @param instantToRollback instant to be rolled back in hoodie timeline.
    * @param engineContext engine context used for execution - local,spark or 
flink etc.
    * @param metaClient metaClient from HoodieTable.
    * @param viewManager viewManager from HoodieTable.
    */
   default void rollback(
-      HoodieInstant completedInstant,
+      HoodieInstant instantToRollback,
       HoodieEngineContext engineContext,
       HoodieTableMetaClient metaClient,
       FileSystemViewManager viewManager) {
@@ -144,6 +144,22 @@ public interface HoodieTableFormat extends Serializable {
       FileSystemViewManager viewManager) {
   }
 
+  /**
+   * Called after marking a "restore" action as complete in the hoodie 
timeline.
+   *
+   * @param restoreCompletedInstant The completed restore instant in hoodie 
timeline.
+   * @param engineContext           engine context used for execution - 
local, spark, or flink, etc.
+   * @param metaClient              metaClient from HoodieTable.
+   * @param viewManager             viewManager from HoodieTable.
+   */
+
+  default void restore(
+      HoodieInstant restoreCompletedInstant,
+      HoodieEngineContext engineContext,
+      HoodieTableMetaClient metaClient,
+      FileSystemViewManager viewManager) {
+  }
+
   /**
    * Return the timeline factory for table format.
    */
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/tableformat/TestActiveTimeline.java 
b/hudi-common/src/test/java/org/apache/hudi/tableformat/TestActiveTimeline.java
index bd3106a68e11..f8e4b8f0438d 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/tableformat/TestActiveTimeline.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/tableformat/TestActiveTimeline.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.versioning.v2.ActiveTimelineV2;
 import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
+import org.apache.hudi.common.util.collection.Pair;
 
 import java.util.Collections;
 import java.util.List;
@@ -63,15 +64,15 @@ public class TestActiveTimeline extends ActiveTimelineV2 {
       HoodieTableMetaClient metaClient,
       Set<String> includedExtensions,
       boolean applyLayoutFilters) {
-    Map<String, HoodieInstant> instantsInTestTableFormat = 
TestTableFormat.getRecordedInstants(metaClient.getBasePath().toString())
+    Map<Pair<String, String>, HoodieInstant> instantsInTestTableFormat = 
TestTableFormat.getRecordedInstants(metaClient.getBasePath().toString())
         .stream()
-        .collect(Collectors.toMap(HoodieInstant::requestedTime, instant -> 
instant));
+        .collect(Collectors.toMap(instant -> Pair.of(instant.requestedTime(), 
instant.getAction()), instant -> instant));
     List<HoodieInstant> instantsFromHoodieTimeline =
         super.getInstantsFromFileSystem(metaClient, includedExtensions, 
applyLayoutFilters);
     List<HoodieInstant> inflightInstantsInTestTableFormat =
         instantsFromHoodieTimeline.stream()
             .filter(
-                hoodieInstant -> 
!instantsInTestTableFormat.containsKey(hoodieInstant.requestedTime()))
+                hoodieInstant -> 
!instantsInTestTableFormat.containsKey(Pair.of(hoodieInstant.requestedTime(), 
hoodieInstant.getAction())))
             .map(
                 instant -> {
                   if (instant.isCompleted()) {
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/tableformat/TestTableFormat.java 
b/hudi-common/src/test/java/org/apache/hudi/tableformat/TestTableFormat.java
index 43d45af8ad14..9781978a51ef 100644
--- a/hudi-common/src/test/java/org/apache/hudi/tableformat/TestTableFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/tableformat/TestTableFormat.java
@@ -81,9 +81,9 @@ public class TestTableFormat implements HoodieTableFormat {
   }
 
   @Override
-  public void rollback(HoodieInstant completedInstant, HoodieEngineContext 
engineContext, 
+  public void rollback(HoodieInstant instantToRollback, HoodieEngineContext 
engineContext,
                       HoodieTableMetaClient metaClient, FileSystemViewManager 
viewManager) {
-    // No-op.
+    
RECORDED_INSTANTS.get(metaClient.getBasePath().toString()).remove(instantToRollback);
   }
 
   @Override
@@ -99,6 +99,11 @@ public class TestTableFormat implements HoodieTableFormat {
     
RECORDED_INSTANTS.get(metaClient.getBasePath().toString()).add(savepointInstant);
   }
 
+  @Override
+  public void restore(HoodieInstant restoreCompletedInstant, 
HoodieEngineContext engineContext, HoodieTableMetaClient metaClient, 
FileSystemViewManager viewManager) {
+    
RECORDED_INSTANTS.get(metaClient.getBasePath().toString()).add(restoreCompletedInstant);
+  }
+
   @Override
   public TimelineFactory getTimelineFactory() {
     return new TestTimelineFactory(null);

Reply via email to