This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 1d9a0ce8e71b [HUDI-8385] Keep ColumnStats consistent during Restore (#13664)
1d9a0ce8e71b is described below
commit 1d9a0ce8e71b2232e233178b80fd71a505ef25ab
Author: Lin Liu <[email protected]>
AuthorDate: Tue Sep 30 17:33:30 2025 -0700
[HUDI-8385] Keep ColumnStats consistent during Restore (#13664)
---
.../metadata/HoodieBackedTableMetadataWriter.java | 53 ++++++-
.../TestHoodieBackedTableMetadataWriter.java | 158 +++++++++++++++++++++
.../hudi/metadata/MetadataPartitionType.java | 5 +-
.../hudi/functional/TestPartitionStatsIndex.scala | 117 +++++++++++----
4 files changed, 302 insertions(+), 31 deletions(-)
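At a glance, the change has two parts: the MDT restore sync now also writes column-stats records for the files added and removed by the restore, and the column_stats partition is no longer dropped on restore. A minimal, self-contained Java sketch of that record-assembly pattern follows; the names (buildRestoreRecords, the string partition keys, the List<String> record placeholder) are illustrative stand-ins, not the Hudi API.

    // Minimal sketch of the pattern introduced by this commit (illustrative names
    // only, not the Hudi API): restore sync assembles records per MDT partition,
    // and adds column-stats records only when that index is enabled.
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class RestoreSyncSketch {
      static Map<String, List<String>> buildRestoreRecords(boolean columnStatsEnabled,
                                                           List<String> filesPartitionRecords,
                                                           List<String> columnStatsRecords) {
        Map<String, List<String>> recordsByPartition = new HashMap<>();
        // The files partition is always refreshed during restore sync.
        recordsByPartition.put("files", filesPartitionRecords);
        // With this change, column_stats is updated in place instead of being dropped.
        if (columnStatsEnabled) {
          recordsByPartition.put("column_stats", columnStatsRecords);
        }
        return recordsByPartition;
      }
    }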
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 5eb170c97881..b2ee4d4fc984 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -593,8 +593,8 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
       LOG.info("Indexing {} columns for column stats index", columnsToIndex.size());
       // during initialization, we need stats for base and log files.
-      HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
-          engineContext, Collections.emptyMap(), partitionIdToAllFilesMap, dataMetaClient, dataWriteConfig.getMetadataConfig(),
+      HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(engineContext, Collections.emptyMap(), partitionIdToAllFilesMap,
+          dataMetaClient, dataWriteConfig.getMetadataConfig(),
           dataWriteConfig.getColumnStatsIndexParallelism(),
           dataWriteConfig.getMetadataConfig().getMaxReaderBufferSize(),
           columnsToIndex);
@@ -1611,14 +1611,59 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
       // We need to choose a timestamp which would be a validInstantTime for MDT. This is either a commit timestamp completed on the dataset
       // or a new timestamp which we use for MDT clean, compaction etc.
       String syncCommitTime = createRestoreInstantTime();
-      processAndCommit(syncCommitTime, () -> HoodieTableMetadataUtil.convertMissingPartitionRecords(engineContext,
-          partitionsToDelete, partitionFilesToAdd, partitionFilesToDelete, syncCommitTime));
+      processAndCommit(syncCommitTime, () -> {
+        // For Files partition.
+        Map<String, HoodieData<HoodieRecord>> partitionRecords = new HashMap<>();
+        partitionRecords.putAll(HoodieTableMetadataUtil.convertMissingPartitionRecords(engineContext,
+            partitionsToDelete, partitionFilesToAdd, partitionFilesToDelete, syncCommitTime));
+        // For ColumnStats partition if enabled.
+        if (dataMetaClient.getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath())) {
+          partitionRecords.putAll(convertToColumnStatsRecord(
+              partitionFilesToAdd, partitionFilesToDelete, engineContext, dataMetaClient,
+              dataWriteConfig.getMetadataConfig(), Option.of(dataWriteConfig.getRecordMerger().getRecordType()),
+              dataWriteConfig.getMetadataConfig().getColumnStatsIndexParallelism()));
+        }
+        return partitionRecords;
+      });
       closeInternal();
     } catch (IOException e) {
       throw new HoodieMetadataException("IOException during MDT restore sync", e);
     }
   }
+  static Map<String, HoodieData<HoodieRecord>> convertToColumnStatsRecord(Map<String, Map<String, Long>> partitionFilesToAdd,
+                                                                          Map<String, List<String>> partitionFilesToDelete,
+                                                                          HoodieEngineContext engineContext,
+                                                                          HoodieTableMetaClient dataMetaClient,
+                                                                          HoodieMetadataConfig metadataConfig,
+                                                                          Option<HoodieRecord.HoodieRecordType> recordTypeOpt,
+                                                                          int columnStatsIndexParallelism) {
+    if (partitionFilesToDelete.isEmpty() && partitionFilesToAdd.isEmpty()) {
+      return Collections.emptyMap();
+    }
+    Lazy<Option<Schema>> tableSchema =
+        Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient));
+    final List<String> columnsToIndex = new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(
+        dataMetaClient.getTableConfig(),
+        metadataConfig,
+        tableSchema,
+        false,
+        recordTypeOpt,
+        HoodieTableMetadataUtil.existingIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS, dataMetaClient)).keySet());
+    if (columnsToIndex.isEmpty()) {
+      LOG.info("Since there are no columns to index, stop generating ColumnStats records.");
+      return Collections.emptyMap();
+    }
+    HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+        engineContext, partitionFilesToDelete,
+        partitionFilesToAdd, dataMetaClient,
+        metadataConfig,
+        columnStatsIndexParallelism,
+        metadataConfig.getMaxReaderBufferSize(),
+        columnsToIndex);
+    return Collections.singletonMap(COLUMN_STATS.getPartitionPath(), records);
+  }
+
   String createRestoreInstantTime() {
     return writeClient.createNewInstantTime(false);
   }
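A minimal, self-contained sketch of the two short-circuits in the new convertToColumnStatsRecord helper above, using plain Java collections instead of HoodieData and the real config types (an assumed simplification, not the Hudi code):

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    class ColumnStatsConversionSketch {
      // Returns an empty map when there is nothing to index, mirroring the early
      // returns above; otherwise the generated records are keyed by the
      // column_stats partition path.
      static Map<String, List<String>> convert(Map<String, List<String>> filesToAdd,
                                               Map<String, List<String>> filesToDelete,
                                               List<String> columnsToIndex,
                                               List<String> generatedRecords) {
        if (filesToAdd.isEmpty() && filesToDelete.isEmpty()) {
          return Collections.emptyMap(); // no file changes from the restore
        }
        if (columnsToIndex.isEmpty()) {
          return Collections.emptyMap(); // nothing indexable, skip record generation
        }
        return Collections.singletonMap("column_stats", generatedRecords);
      }
    }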
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
index 7b91d197e6da..e583dead5ddc 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
@@ -19,26 +19,40 @@
package org.apache.hudi.metadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.v2.ActiveTimelineV2;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.MockedStatic;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
@@ -47,6 +61,21 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class TestHoodieBackedTableMetadataWriter {
+ private HoodieEngineContext engineContext;
+ private HoodieTableMetaClient dataMetaClient;
+ private HoodieMetadataConfig metadataConfig;
+ private StorageConfiguration<?> storageConf;
+
+ @BeforeEach
+ void setUp() {
+ engineContext = mock(HoodieEngineContext.class);
+ dataMetaClient = mock(HoodieTableMetaClient.class);
+ metadataConfig = mock(HoodieMetadataConfig.class);
+ storageConf = mock(StorageConfiguration.class);
+
+ when(metadataConfig.getMaxReaderBufferSize()).thenReturn(1024);
+ }
+
@ParameterizedTest
@CsvSource(value = {
"true,true,false,true",
@@ -117,6 +146,135 @@ class TestHoodieBackedTableMetadataWriter {
    assertSame(mockMetaClient, HoodieBackedTableMetadataWriter.rollbackFailedWrites(lazyWriteConfig, mockWriteClient, mockMetaClient));
}
+ @Test
+ void testConvertToColumnStatsRecordWithEmptyInputs() {
+ Map<String, Map<String, Long>> partitionFilesToAdd = new HashMap<>();
+ Map<String, List<String>> partitionFilesToDelete = new HashMap<>();
+
+    Map<String, HoodieData<HoodieRecord>> result =
+        HoodieBackedTableMetadataWriter.convertToColumnStatsRecord(
+            partitionFilesToAdd, partitionFilesToDelete, engineContext, dataMetaClient, metadataConfig, Option.empty(), 4);
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ void testConvertToColumnStatsRecordWithEmptyColumnsToIndex() {
+ // Mock HoodieTableMetadataUtil.getColumnsToIndex to return empty list
+    try (MockedStatic<HoodieTableMetadataUtil> mockedUtil = mockStatic(HoodieTableMetadataUtil.class)) {
+ Map<String, Object> emptyColumnsMap = new HashMap<>();
+ mockedUtil.when(() -> HoodieTableMetadataUtil.getColumnsToIndex(
+ any(), any(), any(), eq(false), any()))
+ .thenReturn(emptyColumnsMap);
+
+ Map<String, Map<String, Long>> partitionFilesToAdd = new HashMap<>();
+ Map<String, Long> filesToAdd = new HashMap<>();
+ filesToAdd.put("file1.parquet", 1024L);
+ partitionFilesToAdd.put("partition1", filesToAdd);
+ Map<String, List<String>> partitionFilesToDelete = new HashMap<>();
+
+      Map<String, HoodieData<HoodieRecord>> result = HoodieBackedTableMetadataWriter.convertToColumnStatsRecord(
+          partitionFilesToAdd, partitionFilesToDelete, engineContext, dataMetaClient, metadataConfig,
+          Option.empty(), 4);
+ assertTrue(result.isEmpty());
+ }
+ }
+
+ @Test
+ void testConvertToColumnStatsRecordWithValidColumns() {
+ // Mock HoodieTableMetadataUtil.getColumnsToIndex to return valid columns
+    try (MockedStatic<HoodieTableMetadataUtil> mockedUtil = mockStatic(HoodieTableMetadataUtil.class)) {
+ Map<String, Object> columnsMap = new HashMap<>();
+ columnsMap.put("col1", null);
+ columnsMap.put("col2", null);
+ mockedUtil.when(() -> HoodieTableMetadataUtil.getColumnsToIndex(
+ any(), any(), any(), eq(false), any(), any()))
+ .thenReturn(columnsMap);
+
+ // Mock convertFilesToColumnStatsRecords to return empty HoodieData
+ HoodieData<HoodieRecord> mockHoodieData = mock(HoodieData.class);
+      mockedUtil.when(() -> HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+ any(), any(), any(), any(), any(), anyInt(), anyInt(), any()))
+ .thenReturn(mockHoodieData);
+
+ Map<String, Map<String, Long>> partitionFilesToAdd = new HashMap<>();
+ partitionFilesToAdd.put("partition1", new HashMap<>());
+ Map<String, List<String>> partitionFilesToDelete = new HashMap<>();
+
+      Map<String, HoodieData<HoodieRecord>> result = HoodieBackedTableMetadataWriter.convertToColumnStatsRecord(
+          partitionFilesToAdd, partitionFilesToDelete, engineContext, dataMetaClient, metadataConfig,
+          Option.empty(), 4);
+
+ // Verify result contains COLUMN_STATS partition
+ assertEquals(1, result.size());
+      assertTrue(result.containsKey(MetadataPartitionType.COLUMN_STATS.getPartitionPath()));
+      assertEquals(mockHoodieData, result.get(MetadataPartitionType.COLUMN_STATS.getPartitionPath()));
+
+ // Verify the method was called with correct parameters
+      mockedUtil.verify(() -> HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+ eq(engineContext),
+ eq(partitionFilesToDelete),
+ eq(partitionFilesToAdd),
+ eq(dataMetaClient),
+ eq(metadataConfig),
+ eq(4),
+ eq(1024),
+ any()
+ ));
+ }
+ }
+
+ @Test
+ void testConvertToColumnStatsRecordWithMixedInputs() {
+ // Mock HoodieTableMetadataUtil.getColumnsToIndex to return valid columns
+    try (MockedStatic<HoodieTableMetadataUtil> mockedUtil = mockStatic(HoodieTableMetadataUtil.class)) {
+ Map<String, Object> columnsMap = new HashMap<>();
+ columnsMap.put("col1", null);
+ columnsMap.put("col2", null);
+ columnsMap.put("col3", null);
+ mockedUtil.when(() -> HoodieTableMetadataUtil.getColumnsToIndex(
+ any(), any(), any(), eq(false), any(), any()))
+ .thenReturn(columnsMap);
+
+ // Mock convertFilesToColumnStatsRecords to return empty HoodieData
+ HoodieData<HoodieRecord> mockHoodieData = mock(HoodieData.class);
+      mockedUtil.when(() -> HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+ any(), any(), any(), any(), any(), anyInt(), anyInt(), any()))
+ .thenReturn(mockHoodieData);
+
+ Map<String, Map<String, Long>> partitionFilesToAdd = new HashMap<>();
+ Map<String, Long> filesToAdd = new HashMap<>();
+ filesToAdd.put("file1.parquet", 1024L);
+ filesToAdd.put("file2.parquet", 2048L);
+ partitionFilesToAdd.put("partition1", filesToAdd);
+
+ Map<String, List<String>> partitionFilesToDelete = new HashMap<>();
+ List<String> filesToDelete = new ArrayList<>();
+ filesToDelete.add("old_file1.parquet");
+ filesToDelete.add("old_file2.parquet");
+ partitionFilesToDelete.put("partition1", filesToDelete);
+
+      Map<String, HoodieData<HoodieRecord>> result = HoodieBackedTableMetadataWriter.convertToColumnStatsRecord(
+          partitionFilesToAdd, partitionFilesToDelete, engineContext, dataMetaClient, metadataConfig,
+          Option.empty(), 4);
+
+ // Verify result contains COLUMN_STATS partition.
+ assertEquals(1, result.size());
+      assertTrue(result.containsKey(MetadataPartitionType.COLUMN_STATS.getPartitionPath()));
+
+ // Verify the method was called with correct parameters.
+      mockedUtil.verify(() -> HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+ eq(engineContext),
+ eq(partitionFilesToDelete),
+ eq(partitionFilesToAdd),
+ eq(dataMetaClient),
+ eq(metadataConfig),
+ eq(4),
+ eq(1024),
+ any()
+ ));
+ }
+ }
+
@Test
void testValidateRollbackForMDT() throws Exception {
List<HoodieInstant> instants = new ArrayList<>();
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
index 6aa9857b075e..4e52cb378172 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
@@ -422,7 +422,10 @@ public enum MetadataPartitionType {
* Check if the partition path should be deleted on restore.
*/
public static boolean shouldDeletePartitionOnRestore(String partitionPath) {
-    return fromPartitionPath(partitionPath) != FILES && fromPartitionPath(partitionPath) != RECORD_INDEX;
+ MetadataPartitionType partitionType = fromPartitionPath(partitionPath);
+ return partitionType != FILES
+ && partitionType != RECORD_INDEX
+ && partitionType != COLUMN_STATS;
}
/**
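The restore rule after this change, expressed as a small self-contained sketch (the enum below is a stand-in for MetadataPartitionType with a few representative values, not the Hudi enum):

    import java.util.EnumSet;

    class RestoreDeletionRuleSketch {
      enum PartitionType { FILES, RECORD_INDEX, COLUMN_STATS, PARTITION_STATS, BLOOM_FILTERS }

      // FILES, RECORD_INDEX and, with this commit, COLUMN_STATS survive a restore;
      // the other metadata partitions are still deleted on restore.
      static boolean shouldDeleteOnRestore(PartitionType type) {
        return !EnumSet.of(PartitionType.FILES, PartitionType.RECORD_INDEX, PartitionType.COLUMN_STATS).contains(type);
      }

      public static void main(String[] args) {
        System.out.println(shouldDeleteOnRestore(PartitionType.COLUMN_STATS));    // false: kept after this change
        System.out.println(shouldDeleteOnRestore(PartitionType.PARTITION_STATS)); // true: still deleted on restore
      }
    }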
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
index 86afa16f6f77..e58fd6c103bd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
@@ -19,8 +19,9 @@
 package org.apache.hudi.functional
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex, PartitionStatsIndexSupport}
+import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex, PartitionStatsIndexSupport}
 import org.apache.hudi.DataSourceWriteOptions.{BULK_INSERT_OPERATION_OPT_VAL, MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, UPSERT_OPERATION_OPT_VAL}
+import org.apache.hudi.avro.model.HoodieCleanMetadata
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
@@ -29,6 +30,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadataLegacy
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator.recordsToStrings
 import org.apache.hudi.config.{HoodieCleanConfig, HoodieClusteringConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
@@ -395,50 +397,113 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
   }
   /**
-   * 1. Enable column_stats, partition_stats and record_index (files already enabled by default).
-   * 2. Do an insert and validate the partition stats index initialization.
-   * 3. Do an update and validate the partition stats index.
-   * 4. Do a savepoint and restore, and validate partition_stats and column_stats are deleted.
-   * 5. Do an update and validate the partition stats index.
+   * 1. Enable column_stats, partition_stats and record_index (files/RLI already enabled by default).
+   * 2. Do two inserts and validate index initialization.
+   * 3. Do a savepoint on the second commit.
+   * 4. Add three more commits to trigger a clean, which removes the files from the first commit.
+   * 5. Restore, and validate that partition_stats is deleted but the column_stats partition still exists.
+   * 6. Validate that column_stats does not contain records with file names from the first commit.
    */
  @Test
  def testPartitionStatsWithRestore(): Unit = {
    val hudiOpts = commonOpts ++ Map(
      DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.MERGE_ON_READ.name(),
-      HoodieMetadataConfig.ENABLE.key() -> "true",
-      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
-      HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key() -> "true",
-      HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key() -> "true")
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "true",
+      HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true")
+    // First ingest.
    doWriteAndValidateDataAndPartitionStats(
      hudiOpts,
      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
      saveMode = SaveMode.Overwrite)
    val firstCompletedInstant = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
-    doWriteAndValidateDataAndPartitionStats(hudiOpts, operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append)
-    // validate files and record_index are present
-    assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.FILES.getPartitionPath))
-    assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath))
-    // Do a savepoint
+    // Second ingest.
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val secondCompletedInstant = metaClient.getActiveTimeline
+      .getCommitsTimeline.filterCompletedInstants().getInstants().get(1)
+    // Validate index partitions are present.
+    val initialMetadataPartitions = metaClient.getTableConfig.getMetadataPartitions
+    assertTrue(initialMetadataPartitions.contains(MetadataPartitionType.FILES.getPartitionPath))
+    assertTrue(initialMetadataPartitions.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath))
+    assertTrue(initialMetadataPartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath))
+    assertTrue(initialMetadataPartitions.contains(MetadataPartitionType.PARTITION_STATS.getPartitionPath))
+    // Do a savepoint on the second commit.
    val writeClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), getWriteConfig(hudiOpts))
-    writeClient.savepoint(firstCompletedInstant.get().requestedTime, "testUser", "savepoint to first commit")
+    writeClient.savepoint(secondCompletedInstant.requestedTime, "testUser", "savepoint to second commit")
    writeClient.close()
-    val savepointTimestamp = metaClient.reloadActiveTimeline().getSavePointTimeline.filterCompletedInstants().lastInstant().get().requestedTime
-    assertEquals(firstCompletedInstant.get().requestedTime, savepointTimestamp)
+    val savepointTimestamp = metaClient.reloadActiveTimeline()
+      .getSavePointTimeline.filterCompletedInstants().lastInstant().get().requestedTime
+    assertEquals(secondCompletedInstant.requestedTime, savepointTimestamp)
+
+    // Add more ingestions and trigger a clean to remove files from the first ingestion.
+    val writeOpt = hudiOpts ++ Map(
+      HoodieCleanConfig.AUTO_CLEAN.key -> "true",
+      HoodieCleanConfig.CLEAN_MAX_COMMITS.key -> "1",
+      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key -> "2")
+    // Do three more ingestions to trigger a clean operation.
+    for (i <- 0 until 3) {
+      doWriteAndValidateDataAndPartitionStats(
+        writeOpt,
+        operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+        saveMode = SaveMode.Append)
+    }
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    // A clean commit should have been triggered.
+    val cleanInstantOpt = metaClient.getActiveTimeline.getCleanerTimeline.lastInstant()
+    assertTrue(cleanInstantOpt.isPresent)
+    val cleanMetadataBytes = metaClient.getActiveTimeline.getInstantDetails(cleanInstantOpt.get)
+    val cleanMetadata = deserializeAvroMetadataLegacy(cleanMetadataBytes.get(), classOf[HoodieCleanMetadata])
+    // This clean operation deletes 6 files created by the first commit.
+    assertTrue(cleanMetadata.getTotalFilesDeleted > 0)
    // Restore to savepoint
    writeClient.restoreToSavepoint(savepointTimestamp)
-    // verify restore completed
+    // Verify restore completed
    assertTrue(metaClient.reloadActiveTimeline().getRestoreTimeline.lastInstant().isPresent)
-    // verify partition stats and column stats are deleted
+    // Verify partition stats is deleted, but the other indexes are not.
    metaClient = HoodieTableMetaClient.reload(metaClient)
-    assertFalse(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.PARTITION_STATS.getPartitionPath))
-    assertFalse(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath))
-    // do another upsert and validate the partition stats
-    doWriteAndValidateDataAndPartitionStats(hudiOpts, operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append, false)
+    val metadataPartitions = metaClient.getTableConfig.getMetadataPartitions
+    assertTrue(metadataPartitions.contains(MetadataPartitionType.FILES.getPartitionPath))
+    assertTrue(metadataPartitions.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath))
+    assertFalse(metadataPartitions.contains(MetadataPartitionType.PARTITION_STATS.getPartitionPath))
+    assertTrue(metadataPartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath))
+    // Do another upsert and validate the partition stats
+    doWriteAndValidateDataAndPartitionStats(
+      hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append,
+      validate = false)
    val latestDf = spark.read.format("hudi").options(hudiOpts).load(basePath)
-    val partitionStatsIndex = new PartitionStatsIndexSupport(spark, latestDf.schema, HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexPartitionStats(true).build(), metaClient)
-    val partitionStats = partitionStatsIndex.loadColumnStatsIndexRecords(targetColumnsToIndex, shouldReadInMemory = true).collectAsList()
+    val partitionStatsIndex = new PartitionStatsIndexSupport(
+      spark, latestDf.schema, HoodieMetadataConfig.newBuilder()
+        .enable(true)
+        .withMetadataIndexPartitionStats(true)
+        .build(),
+      metaClient)
+    val partitionStats = partitionStatsIndex.loadColumnStatsIndexRecords(
+      targetColumnsToIndex,
+      shouldReadInMemory = true)
+      .collectAsList()
    assertTrue(partitionStats.size() > 0)
+    // Assert column stats after restore.
+    val columnStatsIndex = new ColumnStatsIndexSupport(
+      spark, latestDf.schema, HoodieMetadataConfig.newBuilder()
+        .enable(true)
+        .withMetadataIndexPartitionStats(true)
+        .build(),
+      metaClient)
+    val columnStats = columnStatsIndex
+      .loadColumnStatsIndexRecords(targetColumnsToIndex, shouldReadInMemory = true)
+      .collectAsList()
+    // All files from the first commit have been removed.
+    for (stats <- columnStats.asScala) {
+      assertFalse(stats.getFileName.contains(firstCompletedInstant.get().requestedTime()))
+    }
  }
/**