This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 55793b19710 [HUDI-8344] Delete certain indexes for restore operation 
(#12098)
55793b19710 is described below

commit 55793b19710b458929fc50d9d407492ff802c9df
Author: Lin Liu <[email protected]>
AuthorDate: Tue Oct 15 07:50:35 2024 -0700

    [HUDI-8344] Delete certain indexes for restore operation (#12098)
    
    On restore, every metadata partition other than FILES and RECORD_INDEX
    is deleted. `MetadataPartitionType.shouldDeletePartitionOnRestore` holds
    the logic for which partitions to delete. Added tests for column stats,
    partition stats, and secondary index alongside files and record index.
    
    ---------
    
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../metadata/HoodieBackedTableMetadataWriter.java  | 44 ++++++------
 .../hudi/table/action/BaseActionExecutor.java      | 15 ++++
 .../hudi/client/TestJavaHoodieBackedMetadata.java  | 10 ++-
 .../functional/TestHoodieBackedMetadata.java       | 15 ++--
 .../hudi/metadata/MetadataPartitionType.java       |  7 ++
 .../hudi/functional/TestPartitionStatsIndex.scala  | 52 +++++++++++++-
 .../functional/TestSecondaryIndexPruning.scala     | 84 +++++++++++++++++++++-
 7 files changed, 195 insertions(+), 32 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index cc101ed6ade..f340d86d102 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -112,7 +112,10 @@ import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchem
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.readRecordKeysFromBaseFiles;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.readSecondaryKeysFromBaseFiles;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.readSecondaryKeysFromFileSlices;
+import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
+import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
 import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
+import static org.apache.hudi.metadata.MetadataPartitionType.PARTITION_STATS;
 import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
 import static org.apache.hudi.metadata.MetadataPartitionType.fromPartitionPath;
 import static 
org.apache.hudi.metadata.MetadataPartitionType.getEnabledPartitions;
@@ -389,40 +392,47 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
       String commitTimeForPartition = 
generateUniqueCommitInstantTime(initializationTime);
       String partitionTypeName = partitionType.name();
       LOG.info("Initializing MDT partition {} at instant {}", 
partitionTypeName, commitTimeForPartition);
-
+      String partitionName;
       Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
       try {
         switch (partitionType) {
           case FILES:
             fileGroupCountAndRecordsPair = 
initializeFilesPartition(partitionInfoList);
+            partitionName = FILES.getPartitionPath();
             break;
           case BLOOM_FILTERS:
             fileGroupCountAndRecordsPair = 
initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
+            partitionName = BLOOM_FILTERS.getPartitionPath();
             break;
           case COLUMN_STATS:
             fileGroupCountAndRecordsPair = 
initializeColumnStatsPartition(partitionToFilesMap);
+            partitionName = COLUMN_STATS.getPartitionPath();
             break;
           case RECORD_INDEX:
             fileGroupCountAndRecordsPair = initializeRecordIndexPartition();
+            partitionName = RECORD_INDEX.getPartitionPath();
             break;
           case FUNCTIONAL_INDEX:
-            Set<String> functionalIndexPartitionsToInit = 
getFunctionalIndexPartitionsToInit();
+            Set<String> functionalIndexPartitionsToInit = 
getIndexPartitionsToInit(partitionType);
             if (functionalIndexPartitionsToInit.isEmpty()) {
               continue;
             }
             ValidationUtils.checkState(functionalIndexPartitionsToInit.size() 
== 1, "Only one functional index at a time is supported for now");
-            fileGroupCountAndRecordsPair = 
initializeFunctionalIndexPartition(functionalIndexPartitionsToInit.iterator().next());
+            partitionName = functionalIndexPartitionsToInit.iterator().next();
+            fileGroupCountAndRecordsPair = 
initializeFunctionalIndexPartition(partitionName);
             break;
           case PARTITION_STATS:
             fileGroupCountAndRecordsPair = 
initializePartitionStatsIndex(partitionInfoList);
+            partitionName = PARTITION_STATS.getPartitionPath();
             break;
           case SECONDARY_INDEX:
-            Set<String> secondaryIndexPartitionsToInit = 
getSecondaryIndexPartitionsToInit();
+            Set<String> secondaryIndexPartitionsToInit = 
getIndexPartitionsToInit(partitionType);
             if (secondaryIndexPartitionsToInit.size() != 1) {
               LOG.warn("Skipping secondary index initialization as only one 
secondary index bootstrap at a time is supported for now. Provided: {}", 
secondaryIndexPartitionsToInit);
               continue;
             }
-            fileGroupCountAndRecordsPair = 
initializeSecondaryIndexPartition(secondaryIndexPartitionsToInit.iterator().next());
+            partitionName = secondaryIndexPartitionsToInit.iterator().next();
+            fileGroupCountAndRecordsPair = 
initializeSecondaryIndexPartition(partitionName);
             break;
           default:
             throw new HoodieMetadataException(String.format("Unsupported MDT 
partition type: %s", partitionType));
@@ -445,15 +455,14 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
       // Generate the file groups
       final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
       ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for 
MDT partition " + partitionTypeName + " should be > 0");
-      initializeFileGroups(dataMetaClient, partitionType, 
commitTimeForPartition, fileGroupCount);
+      initializeFileGroups(dataMetaClient, partitionType, 
commitTimeForPartition, fileGroupCount, partitionName);
 
       // Perform the commit using bulkCommit
       HoodieData<HoodieRecord> records = 
fileGroupCountAndRecordsPair.getValue();
-      String partitionPath = partitionType.getPartitionPath(dataMetaClient, 
dataWriteConfig.getIndexingConfig().getIndexName());
-      bulkCommit(commitTimeForPartition, partitionPath, records, 
fileGroupCount);
+      bulkCommit(commitTimeForPartition, partitionName, records, 
fileGroupCount);
       metadataMetaClient.reloadActiveTimeline();
 
-      
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, 
partitionPath, true);
+      
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, 
partitionName, true);
       // initialize the metadata reader again so the MDT partition can be read 
after initialization
       initMetadataReader();
       long totalInitTime = partitionInitTimer.endTimer();
@@ -537,16 +546,6 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     return Pair.of(fileGroupCount, 
getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, 
dataMetaClient, parallelism, readerSchema, storageConf));
   }
 
-  private Set<String> getFunctionalIndexPartitionsToInit() {
-    if (dataMetaClient.getIndexMetadata().isEmpty()) {
-      return Collections.emptySet();
-    }
-    Set<String> functionalIndexPartitions = 
dataMetaClient.getIndexMetadata().get().getIndexDefinitions().keySet();
-    Set<String> completedMetadataPartitions = 
dataMetaClient.getTableConfig().getMetadataPartitions();
-    functionalIndexPartitions.removeAll(completedMetadataPartitions);
-    return functionalIndexPartitions;
-  }
-
   private HoodieIndexDefinition getFunctionalIndexDefinition(String indexName) 
{
     Option<HoodieIndexMetadata> functionalIndexMetadata = 
dataMetaClient.getIndexMetadata();
     if (functionalIndexMetadata.isPresent()) {
@@ -556,10 +555,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     }
   }
 
-  private Set<String> getSecondaryIndexPartitionsToInit() {
+  private Set<String> getIndexPartitionsToInit(MetadataPartitionType 
partitionType) {
     Set<String> secondaryIndexPartitions = 
dataMetaClient.getIndexMetadata().get().getIndexDefinitions().values().stream()
         .map(HoodieIndexDefinition::getIndexName)
-        .filter(indexName -> 
indexName.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+        .filter(indexName -> 
indexName.startsWith(partitionType.getPartitionPath()))
         .collect(Collectors.toSet());
     Set<String> completedMetadataPartitions = 
dataMetaClient.getTableConfig().getMetadataPartitions();
     secondaryIndexPartitions.removeAll(completedMetadataPartitions);
@@ -850,8 +849,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
    * record-index-bucket-0000, .... -> ..., record-index-bucket-0009
    */
   private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, 
MetadataPartitionType metadataPartition, String instantTime,
-                                    int fileGroupCount) throws IOException {
-    String partitionName = metadataPartition.getPartitionPath(dataMetaClient, 
dataWriteConfig.getIndexingConfig().getIndexName());
+                                    int fileGroupCount, String partitionName) 
throws IOException {
     // Remove all existing file groups or leftover files in the partition
     final StoragePath partitionPath = new 
StoragePath(metadataWriteConfig.getBasePath(), partitionName);
     HoodieStorage storage = metadataMetaClient.getStorage();
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
index 6de40c63c8c..06f6b67f99f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
@@ -28,7 +28,9 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.table.HoodieTable;
 
@@ -122,6 +124,7 @@ public abstract class BaseActionExecutor<T, I, K, O, R> 
implements Serializable
     Option<HoodieTableMetadataWriter> metadataWriterOpt = 
table.getMetadataWriter(instantTime);
     if (metadataWriterOpt.isPresent()) {
       try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) 
{
+        dropIndexOnRestore();
         metadataWriter.update(metadata, instantTime);
       } catch (Exception e) {
         if (e instanceof HoodieException) {
@@ -132,4 +135,16 @@ public abstract class BaseActionExecutor<T, I, K, O, R> 
implements Serializable
       }
     }
   }
+
+  /**
+   * Deletes each metadata partition in the table config for which {@link MetadataPartitionType#shouldDeletePartitionOnRestore} returns true.
+   */
+  protected final void dropIndexOnRestore() {
+    for (String partitionPath : 
table.getMetaClient().getTableConfig().getMetadataPartitions()) {
+      if (MetadataPartitionType.shouldDeletePartitionOnRestore(partitionPath)) 
{
+        // setting backup to true as this delete is part of restore operation
+        
HoodieTableMetadataUtil.deleteMetadataTablePartition(table.getMetaClient(), 
context, partitionPath, true);
+      }
+    }
+  }
 }
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index fce57a9d066..256035250e8 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -2803,7 +2803,15 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       // Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as 
that function filters all directory
       // in the .hoodie folder.
       List<String> metadataTablePartitions = 
FSUtils.getAllPartitionPaths(engineContext, storage, 
getMetadataTableBasePath(basePath), false);
-      assertEquals(metadataWriter.getEnabledPartitionTypes().size(), 
metadataTablePartitions.size());
+      // check if the last instant is restore, then the metadata table should 
have only the partitions that are not deleted
+      
metaClient.reloadActiveTimeline().getReverseOrderedInstants().findFirst().ifPresent(instant
 -> {
+        if (instant.getAction().equals(HoodieActiveTimeline.RESTORE_ACTION)) {
+          
metadataWriter.getEnabledPartitionTypes().stream().filter(partitionType -> 
!MetadataPartitionType.shouldDeletePartitionOnRestore(partitionType.getPartitionPath()))
+              .forEach(partitionType -> 
assertTrue(metadataTablePartitions.contains(partitionType.getPartitionPath())));
+        } else {
+          assertEquals(metadataWriter.getEnabledPartitionTypes().size(), 
metadataTablePartitions.size());
+        }
+      });
 
       final Map<String, MetadataPartitionType> metadataEnabledPartitionTypes = 
new HashMap<>();
       metadataWriter.getEnabledPartitionTypes().forEach(e -> 
metadataEnabledPartitionTypes.put(e.getPartitionPath(), e));
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 50b5f9a26bf..5d7e6d5aae2 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -2048,7 +2048,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     List<StoragePathInfo> commit3Files = metaFiles.stream()
         .filter(pathInfo ->
             pathInfo.getPath().getName().contains(commit3)
-                && 
pathInfo.getPath().getName().contains(HoodieTimeline.DELTA_COMMIT_ACTION))
+                && 
pathInfo.getPath().getName().endsWith(HoodieTimeline.DELTA_COMMIT_ACTION))
         .collect(Collectors.toList());
     List<StoragePathInfo> rollbackFiles = metaFiles.stream()
         .filter(pathInfo ->
@@ -2057,8 +2057,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
 
     // ensure commit3's delta commit in MDT has last mod time > the actual 
rollback for previous failed commit i.e. commit2.
     // if rollback wasn't eager, rollback's last mod time will be lower than 
the commit3'd delta commit last mod time.
-    assertTrue(
-        commit3Files.get(0).getModificationTime() > 
rollbackFiles.get(0).getModificationTime());
+    assertTrue(commit3Files.get(0).getModificationTime() >= 
rollbackFiles.get(0).getModificationTime());
     client.close();
   }
 
@@ -3678,7 +3677,15 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       // Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as 
that function filters all directory
       // in the .hoodie folder.
       List<String> metadataTablePartitions = 
FSUtils.getAllPartitionPaths(engineContext, storage, 
getMetadataTableBasePath(basePath), false);
-      assertEquals(metadataWriter.getEnabledPartitionTypes().size(), 
metadataTablePartitions.size());
+      // check if the last instant is restore, then the metadata table should 
have only the partitions that are not deleted
+      
metaClient.reloadActiveTimeline().getReverseOrderedInstants().findFirst().ifPresent(instant
 -> {
+        if (instant.getAction().equals(HoodieActiveTimeline.RESTORE_ACTION)) {
+          
metadataWriter.getEnabledPartitionTypes().stream().filter(partitionType -> 
!MetadataPartitionType.shouldDeletePartitionOnRestore(partitionType.getPartitionPath()))
+              .forEach(partitionType -> 
assertTrue(metadataTablePartitions.contains(partitionType.getPartitionPath())));
+        } else {
+          assertEquals(metadataWriter.getEnabledPartitionTypes().size(), 
metadataTablePartitions.size());
+        }
+      });
 
       final Map<String, MetadataPartitionType> metadataEnabledPartitionTypes = 
new HashMap<>();
       metadataWriter.getEnabledPartitionTypes().forEach(e -> 
metadataEnabledPartitionTypes.put(e.getPartitionPath(), e));
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
index c539971162d..7d61a2b7fd9 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
@@ -374,6 +374,13 @@ public enum MetadataPartitionType {
     return newer;
   }
 
+  /**
+   * Whether the partition should be deleted on restore: true unless the path resolves to FILES or RECORD_INDEX.
+   */
+  public static boolean shouldDeletePartitionOnRestore(String partitionPath) {
+    return fromPartitionPath(partitionPath) != FILES && 
fromPartitionPath(partitionPath) != RECORD_INDEX;
+  }
+
   /**
    * Get the metadata partition type for the given record type.
    */
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
index 501613b5f2e..25f3258d8a5 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
@@ -20,6 +20,8 @@
 package org.apache.hudi.functional
 
 import org.apache.hudi.DataSourceWriteOptions.{BULK_INSERT_OPERATION_OPT_VAL, 
PARTITIONPATH_FIELD, UPSERT_OPERATION_OPT_VAL}
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.client.common.HoodieSparkEngineContext
 import 
org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider
 import org.apache.hudi.common.config.HoodieMetadataConfig
@@ -32,11 +34,11 @@ import 
org.apache.hudi.exception.HoodieWriteConflictException
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
HoodieMetadataFileSystemView, MetadataPartitionType}
 import org.apache.hudi.util.{JFunction, JavaConversions}
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieFileIndex}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieFileIndex, PartitionStatsIndexSupport}
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, Literal}
 import org.apache.spark.sql.types.StringType
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.{Tag, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource}
@@ -322,6 +324,52 @@ class TestPartitionStatsIndex extends 
PartitionStatsIndexTestBase {
     verifyQueryPredicate(hudiOpts)
   }
 
+  /**
+   * 1. Enable column_stats, partition_stats and record_index (files already 
enabled by default).
+   * 2. Do an insert and validate the partition stats index initialization.
+   * 3. Do an update and validate the partition stats index.
+   * 4. Do a savepoint and restore, and validate partition_stats and 
column_stats are deleted.
+   * 5. Do an update and validate the partition stats index.
+   */
+  @Test
+  def testPartitionStatsWithRestore(): Unit = {
+    val hudiOpts = commonOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> 
HoodieTableType.MERGE_ON_READ.name(),
+      HoodieMetadataConfig.ENABLE.key() -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key() -> 
"true",
+      HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key() -> "true")
+
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+    val firstCompletedInstant = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+    doWriteAndValidateDataAndPartitionStats(hudiOpts, operation = 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append)
+    // validate files and record_index are present
+    
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.FILES.getPartitionPath))
+    
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath))
+    // Do a savepoint
+    val writeClient = new SparkRDDWriteClient(new 
HoodieSparkEngineContext(jsc), getWriteConfig(hudiOpts))
+    writeClient.savepoint(firstCompletedInstant.get().getTimestamp, 
"testUser", "savepoint to first commit")
+    val savepointTimestamp = 
metaClient.reloadActiveTimeline().getSavePointTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+    assertEquals(firstCompletedInstant.get().getTimestamp, savepointTimestamp)
+    // Restore to savepoint
+    writeClient.restoreToSavepoint(savepointTimestamp)
+    // verify restore completed
+    
assertTrue(metaClient.reloadActiveTimeline().getRestoreTimeline.lastInstant().isPresent)
+    // verify partition stats and column stats are deleted
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    
assertFalse(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.PARTITION_STATS.getPartitionPath))
+    
assertFalse(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath))
+    // do another upsert and validate the partition stats
+    doWriteAndValidateDataAndPartitionStats(hudiOpts, operation = 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append, 
false)
+    val latestDf = spark.read.format("hudi").options(hudiOpts).load(basePath)
+    val partitionStatsIndex = new PartitionStatsIndexSupport(spark, 
latestDf.schema, 
HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexPartitionStats(true).build(),
 metaClient)
+    val partitionStats = 
partitionStatsIndex.loadColumnStatsIndexRecords(targetColumnsToIndex, 
shouldReadInMemory = true).collectAsList()
+    assertTrue(partitionStats.size() > 0)
+  }
+
   /**
    * Test case to do updates and then validate partition stats with MDT 
compaction.
    * Any one table type is enough to test this as we are validating the 
metadata table.
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index d686f850816..395da41dff1 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -20,6 +20,7 @@
 package org.apache.hudi.functional
 
 import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING, 
MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
+import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import 
org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider
@@ -31,7 +32,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, 
HoodieLockConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.{HoodieMetadataIndexException, 
HoodieWriteConflictException}
 import 
org.apache.hudi.functional.TestSecondaryIndexPruning.SecondaryIndexTestCase
-import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
HoodieBackedTableMetadataWriter, HoodieMetadataFileSystemView, 
SparkHoodieBackedTableMetadataWriter}
+import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
HoodieBackedTableMetadataWriter, HoodieMetadataFileSystemView, 
MetadataPartitionType, SparkHoodieBackedTableMetadataWriter}
 import org.apache.hudi.table.HoodieSparkTable
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import 
org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
@@ -41,7 +42,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, Literal}
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.{DataFrame, Row}
-import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.{Tag, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.Arguments.arguments
@@ -698,6 +699,85 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
     }
   }
 
+  /**
+   * 1. Enable record_index (files already enabled by default) and insert a row.
+   * 2. Create a secondary index and validate its initialization.
+   * 3. Do a savepoint and restore to it.
+   * 4. Validate the index partition is deleted while its definition remains.
+   * 5. Do an update and validate the secondary index is recreated.
+   */
+  @Test
+  def testSecondaryIndexWithSavepointAndRestore(): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      val tableName = "test_secondary_index_with_savepoint_and_restore"
+      val hudiOpts = commonOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+        DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+      val sqlTableType = "mor"
+
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  ts bigint,
+           |  record_key_col string,
+           |  not_record_key_col string,
+           |  partition_key_col string
+           |) using hudi
+           | options (
+           |  primaryKey ='record_key_col',
+           |  type = '$sqlTableType',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true',
+           |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+           |  hoodie.enable.data.skipping = 'true'
+           | )
+           | partitioned by(partition_key_col)
+           | location '$basePath'
+       """.stripMargin)
+      // by setting small file limit to 0, each insert will create a new file
+      // need to generate more file for non-partitioned table to test data 
skipping
+      // as the partitioned table will have only one file per partition
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+
+      // create secondary index
+      spark.sql(s"create index idx_not_record_key_col on $tableName using 
secondary_index(not_record_key_col)")
+      // validate index created successfully
+      metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(basePath)
+        .setConf(HoodieTestUtils.getDefaultStorageConf)
+        .build()
+      val secondaryIndexPartition = "secondary_index_idx_not_record_key_col"
+      
assert(metaClient.getTableConfig.getMetadataPartitions.contains(secondaryIndexPartition))
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.recordKey from 
hudi_metadata('$basePath') where type=7")(
+        Seq("abc", "row1")
+      )
+      // Do a savepoint
+      val firstCompletedInstant = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+      val writeClient = new SparkRDDWriteClient(new 
HoodieSparkEngineContext(jsc), getWriteConfig(hudiOpts))
+      writeClient.savepoint(firstCompletedInstant.get().getTimestamp, 
"testUser", "savepoint to first commit")
+      val savepointTimestamp = 
metaClient.reloadActiveTimeline().getSavePointTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+      assertEquals(firstCompletedInstant.get().getTimestamp, 
savepointTimestamp)
+      // Restore to savepoint
+      writeClient.restoreToSavepoint(savepointTimestamp)
+      // verify restore completed
+      
assertTrue(metaClient.reloadActiveTimeline().getRestoreTimeline.lastInstant().isPresent)
+      // verify secondary index partition is deleted
+      metaClient = HoodieTableMetaClient.reload(metaClient)
+      
assertFalse(metaClient.getTableConfig.getMetadataPartitions.contains(secondaryIndexPartition))
+      // however index definition should still be present
+      assertTrue(metaClient.getIndexMetadata.isPresent && 
metaClient.getIndexMetadata.get.getIndexDefinitions.get(secondaryIndexPartition).getIndexType.equals("secondary_index"))
+      // update the secondary key column
+      spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row1'")
+      // validate the secondary index records themselves
+      checkAnswer(s"select key, SecondaryIndexMetadata.recordKey, 
SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
+        Seq("abc", "row1", true),
+        Seq("xyz", "row1", false)
+      )
+    }
+  }
+
   private def checkAnswer(query: String)(expects: Seq[Any]*): Unit = {
     assertResult(expects.map(row => Row(row: 
_*)).toArray.sortBy(_.toString()))(spark.sql(query).collect().sortBy(_.toString()))
   }

Reply via email to