This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d9a0ce8e71b [HUDI-8385] Keep ColumnStats consistent during Restore (#13664)
1d9a0ce8e71b is described below

commit 1d9a0ce8e71b2232e233178b80fd71a505ef25ab
Author: Lin Liu <[email protected]>
AuthorDate: Tue Sep 30 17:33:30 2025 -0700

    [HUDI-8385] Keep ColumnStats consistent during Restore (#13664)
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |  53 ++++++-
 .../TestHoodieBackedTableMetadataWriter.java       | 158 +++++++++++++++++++++
 .../hudi/metadata/MetadataPartitionType.java       |   5 +-
 .../hudi/functional/TestPartitionStatsIndex.scala  | 117 +++++++++++----
 4 files changed, 302 insertions(+), 31 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 5eb170c97881..b2ee4d4fc984 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -593,8 +593,8 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
     LOG.info("Indexing {} columns for column stats index", columnsToIndex.size());
 
     // during initialization, we need stats for base and log files.
-    HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
-        engineContext, Collections.emptyMap(), partitionIdToAllFilesMap, dataMetaClient, dataWriteConfig.getMetadataConfig(),
+    HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(engineContext, Collections.emptyMap(), partitionIdToAllFilesMap,
+        dataMetaClient, dataWriteConfig.getMetadataConfig(),
         dataWriteConfig.getColumnStatsIndexParallelism(),
         dataWriteConfig.getMetadataConfig().getMaxReaderBufferSize(),
         columnsToIndex);
@@ -1611,14 +1611,59 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
       // We need to choose a timestamp which would be a validInstantTime for MDT. This is either a commit timestamp completed on the dataset
       // or a new timestamp which we use for MDT clean, compaction etc.
       String syncCommitTime = createRestoreInstantTime();
-      processAndCommit(syncCommitTime, () -> HoodieTableMetadataUtil.convertMissingPartitionRecords(engineContext,
-          partitionsToDelete, partitionFilesToAdd, partitionFilesToDelete, syncCommitTime));
+      processAndCommit(syncCommitTime, () -> {
+        // For Files partition.
+        Map<String, HoodieData<HoodieRecord>> partitionRecords = new HashMap<>();
+        partitionRecords.putAll(HoodieTableMetadataUtil.convertMissingPartitionRecords(engineContext,
+            partitionsToDelete, partitionFilesToAdd, partitionFilesToDelete, syncCommitTime));
+        // For ColumnStats partition if enabled.
+        if (dataMetaClient.getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath())) {
+          partitionRecords.putAll(convertToColumnStatsRecord(
+              partitionFilesToAdd, partitionFilesToDelete, engineContext, dataMetaClient,
+              dataWriteConfig.getMetadataConfig(), Option.of(dataWriteConfig.getRecordMerger().getRecordType()),
+              dataWriteConfig.getMetadataConfig().getColumnStatsIndexParallelism()));
+        }
+        return partitionRecords;
+      });
       closeInternal();
     } catch (IOException e) {
       throw new HoodieMetadataException("IOException during MDT restore sync", e);
     }
   }
 
+  static Map<String, HoodieData<HoodieRecord>> convertToColumnStatsRecord(Map<String, Map<String, Long>> partitionFilesToAdd,
+                                                                          Map<String, List<String>> partitionFilesToDelete,
+                                                                          HoodieEngineContext engineContext,
+                                                                          HoodieTableMetaClient dataMetaClient,
+                                                                          HoodieMetadataConfig metadataConfig,
+                                                                          Option<HoodieRecord.HoodieRecordType> recordTypeOpt,
+                                                                          int columnStatsIndexParallelism) {
+    if (partitionFilesToDelete.isEmpty() && partitionFilesToAdd.isEmpty()) {
+      return Collections.emptyMap();
+    }
+    Lazy<Option<Schema>> tableSchema =
+        Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient));
+    final List<String> columnsToIndex = new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(
+        dataMetaClient.getTableConfig(),
+        metadataConfig,
+        tableSchema,
+        false,
+        recordTypeOpt,
+        HoodieTableMetadataUtil.existingIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS, dataMetaClient)).keySet());
+    if (columnsToIndex.isEmpty()) {
+      LOG.info("Since there are no columns to index, skipping ColumnStats record generation.");
+      return Collections.emptyMap();
+    }
+    HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+        engineContext, partitionFilesToDelete,
+        partitionFilesToAdd, dataMetaClient,
+        metadataConfig,
+        columnStatsIndexParallelism,
+        metadataConfig.getMaxReaderBufferSize(),
+        columnsToIndex);
+    return Collections.singletonMap(COLUMN_STATS.getPartitionPath(), records);
+  }
+
   String createRestoreInstantTime() {
     return writeClient.createNewInstantTime(false);
   }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
index 7b91d197e6da..e583dead5ddc 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
@@ -19,26 +19,40 @@
 package org.apache.hudi.metadata;
 
 import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.versioning.v2.ActiveTimelineV2;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.storage.StorageConfiguration;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 import org.mockito.MockedStatic;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
@@ -47,6 +61,21 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 class TestHoodieBackedTableMetadataWriter {
+  private HoodieEngineContext engineContext;
+  private HoodieTableMetaClient dataMetaClient;
+  private HoodieMetadataConfig metadataConfig;
+  private StorageConfiguration<?> storageConf;
+
+  @BeforeEach
+  void setUp() {
+    engineContext = mock(HoodieEngineContext.class);
+    dataMetaClient = mock(HoodieTableMetaClient.class);
+    metadataConfig = mock(HoodieMetadataConfig.class);
+    storageConf = mock(StorageConfiguration.class);
+
+    when(metadataConfig.getMaxReaderBufferSize()).thenReturn(1024);
+  }
+
   @ParameterizedTest
   @CsvSource(value = {
       "true,true,false,true",
@@ -117,6 +146,135 @@ class TestHoodieBackedTableMetadataWriter {
     assertSame(mockMetaClient, HoodieBackedTableMetadataWriter.rollbackFailedWrites(lazyWriteConfig, mockWriteClient, mockMetaClient));
   }
 
+  @Test
+  void testConvertToColumnStatsRecordWithEmptyInputs() {
+    Map<String, Map<String, Long>> partitionFilesToAdd = new HashMap<>();
+    Map<String, List<String>> partitionFilesToDelete = new HashMap<>();
+
+    Map<String, HoodieData<HoodieRecord>> result =
+        HoodieBackedTableMetadataWriter.convertToColumnStatsRecord(
+            partitionFilesToAdd, partitionFilesToDelete, engineContext, dataMetaClient, metadataConfig, Option.empty(), 4);
+    assertTrue(result.isEmpty());
+  }
+
+  @Test
+  void testConvertToColumnStatsRecordWithEmptyColumnsToIndex() {
+    // Mock HoodieTableMetadataUtil.getColumnsToIndex to return empty list
+    try (MockedStatic<HoodieTableMetadataUtil> mockedUtil = mockStatic(HoodieTableMetadataUtil.class)) {
+      Map<String, Object> emptyColumnsMap = new HashMap<>();
+      mockedUtil.when(() -> HoodieTableMetadataUtil.getColumnsToIndex(
+              any(), any(), any(), eq(false), any()))
+          .thenReturn(emptyColumnsMap);
+
+      Map<String, Map<String, Long>> partitionFilesToAdd = new HashMap<>();
+      Map<String, Long> filesToAdd = new HashMap<>();
+      filesToAdd.put("file1.parquet", 1024L);
+      partitionFilesToAdd.put("partition1", filesToAdd);
+      Map<String, List<String>> partitionFilesToDelete = new HashMap<>();
+
+      Map<String, HoodieData<HoodieRecord>> result = HoodieBackedTableMetadataWriter.convertToColumnStatsRecord(
+          partitionFilesToAdd, partitionFilesToDelete, engineContext, dataMetaClient, metadataConfig,
+          Option.empty(), 4);
+      assertTrue(result.isEmpty());
+    }
+  }
+
+  @Test
+  void testConvertToColumnStatsRecordWithValidColumns() {
+    // Mock HoodieTableMetadataUtil.getColumnsToIndex to return valid columns
+    try (MockedStatic<HoodieTableMetadataUtil> mockedUtil = mockStatic(HoodieTableMetadataUtil.class)) {
+      Map<String, Object> columnsMap = new HashMap<>();
+      columnsMap.put("col1", null);
+      columnsMap.put("col2", null);
+      mockedUtil.when(() -> HoodieTableMetadataUtil.getColumnsToIndex(
+              any(), any(), any(), eq(false), any(), any()))
+          .thenReturn(columnsMap);
+
+      // Mock convertFilesToColumnStatsRecords to return a mock HoodieData
+      HoodieData<HoodieRecord> mockHoodieData = mock(HoodieData.class);
+      mockedUtil.when(() -> HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+              any(), any(), any(), any(), any(), anyInt(), anyInt(), any()))
+          .thenReturn(mockHoodieData);
+
+      Map<String, Map<String, Long>> partitionFilesToAdd = new HashMap<>();
+      partitionFilesToAdd.put("partition1", new HashMap<>());
+      Map<String, List<String>> partitionFilesToDelete = new HashMap<>();
+
+      Map<String, HoodieData<HoodieRecord>> result = HoodieBackedTableMetadataWriter.convertToColumnStatsRecord(
+          partitionFilesToAdd, partitionFilesToDelete, engineContext, dataMetaClient, metadataConfig,
+          Option.empty(), 4);
+
+      // Verify result contains COLUMN_STATS partition
+      assertEquals(1, result.size());
+      assertTrue(result.containsKey(MetadataPartitionType.COLUMN_STATS.getPartitionPath()));
+      assertEquals(mockHoodieData, result.get(MetadataPartitionType.COLUMN_STATS.getPartitionPath()));
+
+      // Verify the method was called with correct parameters
+      mockedUtil.verify(() -> HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+          eq(engineContext),
+          eq(partitionFilesToDelete),
+          eq(partitionFilesToAdd),
+          eq(dataMetaClient),
+          eq(metadataConfig),
+          eq(4),
+          eq(1024),
+          any()
+      ));
+    }
+  }
+
+  @Test
+  void testConvertToColumnStatsRecordWithMixedInputs() {
+    // Mock HoodieTableMetadataUtil.getColumnsToIndex to return valid columns
+    try (MockedStatic<HoodieTableMetadataUtil> mockedUtil = mockStatic(HoodieTableMetadataUtil.class)) {
+      Map<String, Object> columnsMap = new HashMap<>();
+      columnsMap.put("col1", null);
+      columnsMap.put("col2", null);
+      columnsMap.put("col3", null);
+      mockedUtil.when(() -> HoodieTableMetadataUtil.getColumnsToIndex(
+              any(), any(), any(), eq(false), any(), any()))
+          .thenReturn(columnsMap);
+
+      // Mock convertFilesToColumnStatsRecords to return a mock HoodieData
+      HoodieData<HoodieRecord> mockHoodieData = mock(HoodieData.class);
+      mockedUtil.when(() -> HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+              any(), any(), any(), any(), any(), anyInt(), anyInt(), any()))
+          .thenReturn(mockHoodieData);
+
+      Map<String, Map<String, Long>> partitionFilesToAdd = new HashMap<>();
+      Map<String, Long> filesToAdd = new HashMap<>();
+      filesToAdd.put("file1.parquet", 1024L);
+      filesToAdd.put("file2.parquet", 2048L);
+      partitionFilesToAdd.put("partition1", filesToAdd);
+
+      Map<String, List<String>> partitionFilesToDelete = new HashMap<>();
+      List<String> filesToDelete = new ArrayList<>();
+      filesToDelete.add("old_file1.parquet");
+      filesToDelete.add("old_file2.parquet");
+      partitionFilesToDelete.put("partition1", filesToDelete);
+
+      Map<String, HoodieData<HoodieRecord>> result = HoodieBackedTableMetadataWriter.convertToColumnStatsRecord(
+          partitionFilesToAdd, partitionFilesToDelete, engineContext, dataMetaClient, metadataConfig,
+          Option.empty(), 4);
+
+      // Verify result contains COLUMN_STATS partition.
+      assertEquals(1, result.size());
+      assertTrue(result.containsKey(MetadataPartitionType.COLUMN_STATS.getPartitionPath()));
+
+      // Verify the method was called with correct parameters.
+      mockedUtil.verify(() -> HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+          eq(engineContext),
+          eq(partitionFilesToDelete),
+          eq(partitionFilesToAdd),
+          eq(dataMetaClient),
+          eq(metadataConfig),
+          eq(4),
+          eq(1024),
+          any()
+      ));
+    }
+  }
+
   @Test
   void testValidateRollbackForMDT() throws Exception {
     List<HoodieInstant> instants = new ArrayList<>();
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
index 6aa9857b075e..4e52cb378172 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
@@ -422,7 +422,10 @@ public enum MetadataPartitionType {
    * Check if the partition path should be deleted on restore.
    */
   public static boolean shouldDeletePartitionOnRestore(String partitionPath) {
-    return fromPartitionPath(partitionPath) != FILES && fromPartitionPath(partitionPath) != RECORD_INDEX;
+    MetadataPartitionType partitionType = fromPartitionPath(partitionPath);
+    return partitionType != FILES
+        && partitionType != RECORD_INDEX
+        && partitionType != COLUMN_STATS;
   }
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
index 86afa16f6f77..e58fd6c103bd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
@@ -19,8 +19,9 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex, PartitionStatsIndexSupport}
+import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex, PartitionStatsIndexSupport}
 import org.apache.hudi.DataSourceWriteOptions.{BULK_INSERT_OPERATION_OPT_VAL, MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, UPSERT_OPERATION_OPT_VAL}
+import org.apache.hudi.avro.model.HoodieCleanMetadata
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
@@ -29,6 +30,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadataLegacy
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator.recordsToStrings
 import org.apache.hudi.config.{HoodieCleanConfig, HoodieClusteringConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
@@ -395,50 +397,113 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
   }
 
   /**
-   * 1. Enable column_stats, partition_stats and record_index (files already enabled by default).
-   * 2. Do an insert and validate the partition stats index initialization.
-   * 3. Do an update and validate the partition stats index.
-   * 4. Do a savepoint and restore, and validate partition_stats and column_stats are deleted.
-   * 5. Do an update and validate the partition stats index.
+   * 1. Enable column_stats, partition_stats and record_index (files/RLI already enabled by default).
+   * 2. Do two inserts and validate index initialization.
+   * 3. Do a savepoint on the second commit.
+   * 4. Add three more commits to trigger a clean, which removes the files from the first commit.
+   * 5. Restore, and validate that partition_stats is deleted but the column_stats partition still exists.
+   * 6. Validate that column_stats does not contain records with file names from the first commit.
    */
   @Test
   def testPartitionStatsWithRestore(): Unit = {
     val hudiOpts = commonOpts ++ Map(
       DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.MERGE_ON_READ.name(),
-      HoodieMetadataConfig.ENABLE.key() -> "true",
-      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
-      HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key() -> "true",
-      HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key() -> "true")
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "true",
+      HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true")
 
+    // First ingest.
     doWriteAndValidateDataAndPartitionStats(
       hudiOpts,
       operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Overwrite)
     val firstCompletedInstant = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
-    doWriteAndValidateDataAndPartitionStats(hudiOpts, operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append)
-    // validate files and record_index are present
-    assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.FILES.getPartitionPath))
-    assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath))
-    // Do a savepoint
+    // Second ingest.
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val secondCompletedInstant = metaClient.getActiveTimeline
+      .getCommitsTimeline.filterCompletedInstants().getInstants().get(1)
+    // Validate index partitions are present.
+    val initialMetadataPartitions = metaClient.getTableConfig.getMetadataPartitions
+    assertTrue(initialMetadataPartitions.contains(MetadataPartitionType.FILES.getPartitionPath))
+    assertTrue(initialMetadataPartitions.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath))
+    assertTrue(initialMetadataPartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath))
+    assertTrue(initialMetadataPartitions.contains(MetadataPartitionType.PARTITION_STATS.getPartitionPath))
+    // Do a savepoint on the second commit.
     val writeClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), getWriteConfig(hudiOpts))
-    writeClient.savepoint(firstCompletedInstant.get().requestedTime, "testUser", "savepoint to first commit")
+    writeClient.savepoint(secondCompletedInstant.requestedTime, "testUser", "savepoint to second commit")
     writeClient.close()
-    val savepointTimestamp = metaClient.reloadActiveTimeline().getSavePointTimeline.filterCompletedInstants().lastInstant().get().requestedTime
-    assertEquals(firstCompletedInstant.get().requestedTime, savepointTimestamp)
+    val savepointTimestamp = metaClient.reloadActiveTimeline()
+      .getSavePointTimeline.filterCompletedInstants().lastInstant().get().requestedTime
+    assertEquals(secondCompletedInstant.requestedTime, savepointTimestamp)
+
+    // Add more ingests and trigger a clean to remove files from the first ingestion.
+    val writeOpt = hudiOpts ++ Map(
+      HoodieCleanConfig.AUTO_CLEAN.key -> "true",
+      HoodieCleanConfig.CLEAN_MAX_COMMITS.key -> "1",
+      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key -> "2")
+    // Do three more ingestions to trigger a clean operation.
+    for (i <- 0 until 3) {
+      doWriteAndValidateDataAndPartitionStats(
+        writeOpt,
+        operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+        saveMode = SaveMode.Append)
+    }
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    // Clean commit should be triggered.
+    val cleanInstantOpt = metaClient.getActiveTimeline.getCleanerTimeline.lastInstant()
+    assertTrue(cleanInstantOpt.isPresent)
+    val cleanMetadataBytes = metaClient.getActiveTimeline.getInstantDetails(cleanInstantOpt.get)
+    val cleanMetadata = deserializeAvroMetadataLegacy(cleanMetadataBytes.get(), classOf[HoodieCleanMetadata])
+    // This clean operation deletes 6 files created by the first commit.
+    assertTrue(cleanMetadata.getTotalFilesDeleted > 0)
     // Restore to savepoint
     writeClient.restoreToSavepoint(savepointTimestamp)
-    // verify restore completed
+    // Verify restore completed
     assertTrue(metaClient.reloadActiveTimeline().getRestoreTimeline.lastInstant().isPresent)
-    // verify partition stats and column stats are deleted
+    // Verify partition stats is deleted, but other indexes are not.
     metaClient = HoodieTableMetaClient.reload(metaClient)
-    assertFalse(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.PARTITION_STATS.getPartitionPath))
-    assertFalse(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath))
-    // do another upsert and validate the partition stats
-    doWriteAndValidateDataAndPartitionStats(hudiOpts, operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append, false)
+    val metadataPartitions = metaClient.getTableConfig.getMetadataPartitions
+    assertTrue(metadataPartitions.contains(MetadataPartitionType.FILES.getPartitionPath))
+    assertTrue(metadataPartitions.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath))
+    assertFalse(metadataPartitions.contains(MetadataPartitionType.PARTITION_STATS.getPartitionPath))
+    assertTrue(metadataPartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath))
+    // Do another upsert and validate the partition stats
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append,
+      validate = false)
     val latestDf = spark.read.format("hudi").options(hudiOpts).load(basePath)
-    val partitionStatsIndex = new PartitionStatsIndexSupport(spark, latestDf.schema, HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexPartitionStats(true).build(), metaClient)
-    val partitionStats = partitionStatsIndex.loadColumnStatsIndexRecords(targetColumnsToIndex, shouldReadInMemory = true).collectAsList()
+    val partitionStatsIndex = new PartitionStatsIndexSupport(
+      spark, latestDf.schema, HoodieMetadataConfig.newBuilder()
+        .enable(true)
+        .withMetadataIndexPartitionStats(true)
+        .build(),
+      metaClient)
+    val partitionStats = partitionStatsIndex.loadColumnStatsIndexRecords(
+      targetColumnsToIndex,
+      shouldReadInMemory = true)
+      .collectAsList()
     assertTrue(partitionStats.size() > 0)
+    // Assert column stats after restore.
+    val columnStatsIndex = new ColumnStatsIndexSupport(
+      spark, latestDf.schema, HoodieMetadataConfig.newBuilder()
+        .enable(true)
+        .withMetadataIndexPartitionStats(true)
+        .build(),
+      metaClient)
+    val columnStats = columnStatsIndex
+      .loadColumnStatsIndexRecords(targetColumnsToIndex, shouldReadInMemory = true)
+      .collectAsList()
+    // All files from first commit have been removed.
+    for (stats <- columnStats.asScala) {
+      assertFalse(stats.getFileName.contains(firstCompletedInstant.get().requestedTime()))
+    }
   }
 
   /**
