This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 55793b19710 [HUDI-8344] Delete certain indexes for restore operation
(#12098)
55793b19710 is described below
commit 55793b19710b458929fc50d9d407492ff802c9df
Author: Lin Liu <[email protected]>
AuthorDate: Tue Oct 15 07:50:35 2024 -0700
[HUDI-8344] Delete certain indexes for restore operation (#12098)
Any index other than FILES and RECORD_INDEX is deleted.
`MetadataPartitionType.shouldDeletePartitionOnRestore` holds
the logic for which partitions to delete. Added a test for column
stats, partition stats, and secondary index with files and record index.
---------
Co-authored-by: Sagar Sumit <[email protected]>
---
.../metadata/HoodieBackedTableMetadataWriter.java | 44 ++++++------
.../hudi/table/action/BaseActionExecutor.java | 15 ++++
.../hudi/client/TestJavaHoodieBackedMetadata.java | 10 ++-
.../functional/TestHoodieBackedMetadata.java | 15 ++--
.../hudi/metadata/MetadataPartitionType.java | 7 ++
.../hudi/functional/TestPartitionStatsIndex.scala | 52 +++++++++++++-
.../functional/TestSecondaryIndexPruning.scala | 84 +++++++++++++++++++++-
7 files changed, 195 insertions(+), 32 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index cc101ed6ade..f340d86d102 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -112,7 +112,10 @@ import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchem
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.readRecordKeysFromBaseFiles;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.readSecondaryKeysFromBaseFiles;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.readSecondaryKeysFromFileSlices;
+import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
+import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
+import static org.apache.hudi.metadata.MetadataPartitionType.PARTITION_STATS;
import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
import static org.apache.hudi.metadata.MetadataPartitionType.fromPartitionPath;
import static
org.apache.hudi.metadata.MetadataPartitionType.getEnabledPartitions;
@@ -389,40 +392,47 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
String commitTimeForPartition =
generateUniqueCommitInstantTime(initializationTime);
String partitionTypeName = partitionType.name();
LOG.info("Initializing MDT partition {} at instant {}",
partitionTypeName, commitTimeForPartition);
-
+ String partitionName;
Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
try {
switch (partitionType) {
case FILES:
fileGroupCountAndRecordsPair =
initializeFilesPartition(partitionInfoList);
+ partitionName = FILES.getPartitionPath();
break;
case BLOOM_FILTERS:
fileGroupCountAndRecordsPair =
initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
+ partitionName = BLOOM_FILTERS.getPartitionPath();
break;
case COLUMN_STATS:
fileGroupCountAndRecordsPair =
initializeColumnStatsPartition(partitionToFilesMap);
+ partitionName = COLUMN_STATS.getPartitionPath();
break;
case RECORD_INDEX:
fileGroupCountAndRecordsPair = initializeRecordIndexPartition();
+ partitionName = RECORD_INDEX.getPartitionPath();
break;
case FUNCTIONAL_INDEX:
- Set<String> functionalIndexPartitionsToInit =
getFunctionalIndexPartitionsToInit();
+ Set<String> functionalIndexPartitionsToInit =
getIndexPartitionsToInit(partitionType);
if (functionalIndexPartitionsToInit.isEmpty()) {
continue;
}
ValidationUtils.checkState(functionalIndexPartitionsToInit.size()
== 1, "Only one functional index at a time is supported for now");
- fileGroupCountAndRecordsPair =
initializeFunctionalIndexPartition(functionalIndexPartitionsToInit.iterator().next());
+ partitionName = functionalIndexPartitionsToInit.iterator().next();
+ fileGroupCountAndRecordsPair =
initializeFunctionalIndexPartition(partitionName);
break;
case PARTITION_STATS:
fileGroupCountAndRecordsPair =
initializePartitionStatsIndex(partitionInfoList);
+ partitionName = PARTITION_STATS.getPartitionPath();
break;
case SECONDARY_INDEX:
- Set<String> secondaryIndexPartitionsToInit =
getSecondaryIndexPartitionsToInit();
+ Set<String> secondaryIndexPartitionsToInit =
getIndexPartitionsToInit(partitionType);
if (secondaryIndexPartitionsToInit.size() != 1) {
LOG.warn("Skipping secondary index initialization as only one
secondary index bootstrap at a time is supported for now. Provided: {}",
secondaryIndexPartitionsToInit);
continue;
}
- fileGroupCountAndRecordsPair =
initializeSecondaryIndexPartition(secondaryIndexPartitionsToInit.iterator().next());
+ partitionName = secondaryIndexPartitionsToInit.iterator().next();
+ fileGroupCountAndRecordsPair =
initializeSecondaryIndexPartition(partitionName);
break;
default:
throw new HoodieMetadataException(String.format("Unsupported MDT
partition type: %s", partitionType));
@@ -445,15 +455,14 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
// Generate the file groups
final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for
MDT partition " + partitionTypeName + " should be > 0");
- initializeFileGroups(dataMetaClient, partitionType,
commitTimeForPartition, fileGroupCount);
+ initializeFileGroups(dataMetaClient, partitionType,
commitTimeForPartition, fileGroupCount, partitionName);
// Perform the commit using bulkCommit
HoodieData<HoodieRecord> records =
fileGroupCountAndRecordsPair.getValue();
- String partitionPath = partitionType.getPartitionPath(dataMetaClient,
dataWriteConfig.getIndexingConfig().getIndexName());
- bulkCommit(commitTimeForPartition, partitionPath, records,
fileGroupCount);
+ bulkCommit(commitTimeForPartition, partitionName, records,
fileGroupCount);
metadataMetaClient.reloadActiveTimeline();
-
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient,
partitionPath, true);
+
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient,
partitionName, true);
// initialize the metadata reader again so the MDT partition can be read
after initialization
initMetadataReader();
long totalInitTime = partitionInitTimer.endTimer();
@@ -537,16 +546,6 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
return Pair.of(fileGroupCount,
getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition,
dataMetaClient, parallelism, readerSchema, storageConf));
}
- private Set<String> getFunctionalIndexPartitionsToInit() {
- if (dataMetaClient.getIndexMetadata().isEmpty()) {
- return Collections.emptySet();
- }
- Set<String> functionalIndexPartitions =
dataMetaClient.getIndexMetadata().get().getIndexDefinitions().keySet();
- Set<String> completedMetadataPartitions =
dataMetaClient.getTableConfig().getMetadataPartitions();
- functionalIndexPartitions.removeAll(completedMetadataPartitions);
- return functionalIndexPartitions;
- }
-
private HoodieIndexDefinition getFunctionalIndexDefinition(String indexName)
{
Option<HoodieIndexMetadata> functionalIndexMetadata =
dataMetaClient.getIndexMetadata();
if (functionalIndexMetadata.isPresent()) {
@@ -556,10 +555,10 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
}
}
- private Set<String> getSecondaryIndexPartitionsToInit() {
+ private Set<String> getIndexPartitionsToInit(MetadataPartitionType
partitionType) {
Set<String> secondaryIndexPartitions =
dataMetaClient.getIndexMetadata().get().getIndexDefinitions().values().stream()
.map(HoodieIndexDefinition::getIndexName)
- .filter(indexName ->
indexName.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+ .filter(indexName ->
indexName.startsWith(partitionType.getPartitionPath()))
.collect(Collectors.toSet());
Set<String> completedMetadataPartitions =
dataMetaClient.getTableConfig().getMetadataPartitions();
secondaryIndexPartitions.removeAll(completedMetadataPartitions);
@@ -850,8 +849,7 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
* record-index-bucket-0000, .... -> ..., record-index-bucket-0009
*/
private void initializeFileGroups(HoodieTableMetaClient dataMetaClient,
MetadataPartitionType metadataPartition, String instantTime,
- int fileGroupCount) throws IOException {
- String partitionName = metadataPartition.getPartitionPath(dataMetaClient,
dataWriteConfig.getIndexingConfig().getIndexName());
+ int fileGroupCount, String partitionName)
throws IOException {
// Remove all existing file groups or leftover files in the partition
final StoragePath partitionPath = new
StoragePath(metadataWriteConfig.getBasePath(), partitionName);
HoodieStorage storage = metadataMetaClient.getStorage();
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
index 6de40c63c8c..06f6b67f99f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
@@ -28,7 +28,9 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.table.HoodieTable;
@@ -122,6 +124,7 @@ public abstract class BaseActionExecutor<T, I, K, O, R>
implements Serializable
Option<HoodieTableMetadataWriter> metadataWriterOpt =
table.getMetadataWriter(instantTime);
if (metadataWriterOpt.isPresent()) {
try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get())
{
+ dropIndexOnRestore();
metadataWriter.update(metadata, instantTime);
} catch (Exception e) {
if (e instanceof HoodieException) {
@@ -132,4 +135,16 @@ public abstract class BaseActionExecutor<T, I, K, O, R>
implements Serializable
}
}
}
+
+ /**
+ * Drop metadata partition, for restore operation for certain metadata
partitions.
+ */
+ protected final void dropIndexOnRestore() {
+ for (String partitionPath :
table.getMetaClient().getTableConfig().getMetadataPartitions()) {
+ if (MetadataPartitionType.shouldDeletePartitionOnRestore(partitionPath))
{
+ // setting backup to true as this delete is part of restore operation
+
HoodieTableMetadataUtil.deleteMetadataTablePartition(table.getMetaClient(),
context, partitionPath, true);
+ }
+ }
+ }
}
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index fce57a9d066..256035250e8 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -2803,7 +2803,15 @@ public class TestJavaHoodieBackedMetadata extends
TestHoodieMetadataBase {
// Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as
that function filters all directory
// in the .hoodie folder.
List<String> metadataTablePartitions =
FSUtils.getAllPartitionPaths(engineContext, storage,
getMetadataTableBasePath(basePath), false);
- assertEquals(metadataWriter.getEnabledPartitionTypes().size(),
metadataTablePartitions.size());
+ // check if the last instant is restore, then the metadata table should
have only the partitions that are not deleted
+
metaClient.reloadActiveTimeline().getReverseOrderedInstants().findFirst().ifPresent(instant
-> {
+ if (instant.getAction().equals(HoodieActiveTimeline.RESTORE_ACTION)) {
+
metadataWriter.getEnabledPartitionTypes().stream().filter(partitionType ->
!MetadataPartitionType.shouldDeletePartitionOnRestore(partitionType.getPartitionPath()))
+ .forEach(partitionType ->
assertTrue(metadataTablePartitions.contains(partitionType.getPartitionPath())));
+ } else {
+ assertEquals(metadataWriter.getEnabledPartitionTypes().size(),
metadataTablePartitions.size());
+ }
+ });
final Map<String, MetadataPartitionType> metadataEnabledPartitionTypes =
new HashMap<>();
metadataWriter.getEnabledPartitionTypes().forEach(e ->
metadataEnabledPartitionTypes.put(e.getPartitionPath(), e));
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 50b5f9a26bf..5d7e6d5aae2 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -2048,7 +2048,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
List<StoragePathInfo> commit3Files = metaFiles.stream()
.filter(pathInfo ->
pathInfo.getPath().getName().contains(commit3)
- &&
pathInfo.getPath().getName().contains(HoodieTimeline.DELTA_COMMIT_ACTION))
+ &&
pathInfo.getPath().getName().endsWith(HoodieTimeline.DELTA_COMMIT_ACTION))
.collect(Collectors.toList());
List<StoragePathInfo> rollbackFiles = metaFiles.stream()
.filter(pathInfo ->
@@ -2057,8 +2057,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
// ensure commit3's delta commit in MDT has last mod time > the actual
rollback for previous failed commit i.e. commit2.
// if rollback wasn't eager, rollback's last mod time will be lower than
the commit3'd delta commit last mod time.
- assertTrue(
- commit3Files.get(0).getModificationTime() >
rollbackFiles.get(0).getModificationTime());
+ assertTrue(commit3Files.get(0).getModificationTime() >=
rollbackFiles.get(0).getModificationTime());
client.close();
}
@@ -3678,7 +3677,15 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
// Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as
that function filters all directory
// in the .hoodie folder.
List<String> metadataTablePartitions =
FSUtils.getAllPartitionPaths(engineContext, storage,
getMetadataTableBasePath(basePath), false);
- assertEquals(metadataWriter.getEnabledPartitionTypes().size(),
metadataTablePartitions.size());
+ // check if the last instant is restore, then the metadata table should
have only the partitions that are not deleted
+
metaClient.reloadActiveTimeline().getReverseOrderedInstants().findFirst().ifPresent(instant
-> {
+ if (instant.getAction().equals(HoodieActiveTimeline.RESTORE_ACTION)) {
+
metadataWriter.getEnabledPartitionTypes().stream().filter(partitionType ->
!MetadataPartitionType.shouldDeletePartitionOnRestore(partitionType.getPartitionPath()))
+ .forEach(partitionType ->
assertTrue(metadataTablePartitions.contains(partitionType.getPartitionPath())));
+ } else {
+ assertEquals(metadataWriter.getEnabledPartitionTypes().size(),
metadataTablePartitions.size());
+ }
+ });
final Map<String, MetadataPartitionType> metadataEnabledPartitionTypes =
new HashMap<>();
metadataWriter.getEnabledPartitionTypes().forEach(e ->
metadataEnabledPartitionTypes.put(e.getPartitionPath(), e));
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
index c539971162d..7d61a2b7fd9 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
@@ -374,6 +374,13 @@ public enum MetadataPartitionType {
return newer;
}
+ /**
+ * Check if the partition path should be deleted on restore.
+ */
+ public static boolean shouldDeletePartitionOnRestore(String partitionPath) {
+ return fromPartitionPath(partitionPath) != FILES &&
fromPartitionPath(partitionPath) != RECORD_INDEX;
+ }
+
/**
* Get the metadata partition type for the given record type.
*/
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
index 501613b5f2e..25f3258d8a5 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
@@ -20,6 +20,8 @@
package org.apache.hudi.functional
import org.apache.hudi.DataSourceWriteOptions.{BULK_INSERT_OPERATION_OPT_VAL,
PARTITIONPATH_FIELD, UPSERT_OPERATION_OPT_VAL}
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.client.common.HoodieSparkEngineContext
import
org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
import org.apache.hudi.client.transaction.lock.InProcessLockProvider
import org.apache.hudi.common.config.HoodieMetadataConfig
@@ -32,11 +34,11 @@ import
org.apache.hudi.exception.HoodieWriteConflictException
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.metadata.{HoodieBackedTableMetadata,
HoodieMetadataFileSystemView, MetadataPartitionType}
import org.apache.hudi.util.{JFunction, JavaConversions}
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions,
HoodieFileIndex}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions,
HoodieFileIndex, PartitionStatsIndexSupport}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo,
Expression, Literal}
import org.apache.spark.sql.types.StringType
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{Tag, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource}
@@ -322,6 +324,52 @@ class TestPartitionStatsIndex extends
PartitionStatsIndexTestBase {
verifyQueryPredicate(hudiOpts)
}
+ /**
+ * 1. Enable column_stats, partition_stats and record_index (files already
enabled by default).
+ * 2. Do an insert and validate the partition stats index initialization.
+ * 3. Do an update and validate the partition stats index.
+ * 4. Do a savepoint and restore, and validate partition_stats and
column_stats are deleted.
+ * 5. Do an update and validate the partition stats index.
+ */
+ @Test
+ def testPartitionStatsWithRestore(): Unit = {
+ val hudiOpts = commonOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key ->
HoodieTableType.MERGE_ON_READ.name(),
+ HoodieMetadataConfig.ENABLE.key() -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key() ->
"true",
+ HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key() -> "true")
+
+ doWriteAndValidateDataAndPartitionStats(
+ hudiOpts,
+ operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite)
+ val firstCompletedInstant =
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+ doWriteAndValidateDataAndPartitionStats(hudiOpts, operation =
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append)
+ // validate files and record_index are present
+
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.FILES.getPartitionPath))
+
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath))
+ // Do a savepoint
+ val writeClient = new SparkRDDWriteClient(new
HoodieSparkEngineContext(jsc), getWriteConfig(hudiOpts))
+ writeClient.savepoint(firstCompletedInstant.get().getTimestamp,
"testUser", "savepoint to first commit")
+ val savepointTimestamp =
metaClient.reloadActiveTimeline().getSavePointTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+ assertEquals(firstCompletedInstant.get().getTimestamp, savepointTimestamp)
+ // Restore to savepoint
+ writeClient.restoreToSavepoint(savepointTimestamp)
+ // verify restore completed
+
assertTrue(metaClient.reloadActiveTimeline().getRestoreTimeline.lastInstant().isPresent)
+ // verify partition stats and column stats are deleted
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+
assertFalse(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.PARTITION_STATS.getPartitionPath))
+
assertFalse(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath))
+ // do another upsert and validate the partition stats
+ doWriteAndValidateDataAndPartitionStats(hudiOpts, operation =
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append,
false)
+ val latestDf = spark.read.format("hudi").options(hudiOpts).load(basePath)
+ val partitionStatsIndex = new PartitionStatsIndexSupport(spark,
latestDf.schema,
HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexPartitionStats(true).build(),
metaClient)
+ val partitionStats =
partitionStatsIndex.loadColumnStatsIndexRecords(targetColumnsToIndex,
shouldReadInMemory = true).collectAsList()
+ assertTrue(partitionStats.size() > 0)
+ }
+
/**
* Test case to do updates and then validate partition stats with MDT
compaction.
* Any one table type is enough to test this as we are validating the
metadata table.
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index d686f850816..395da41dff1 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -20,6 +20,7 @@
package org.apache.hudi.functional
import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING,
MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
+import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import
org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
import org.apache.hudi.client.transaction.lock.InProcessLockProvider
@@ -31,7 +32,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig,
HoodieLockConfig, HoodieWriteConfig}
import org.apache.hudi.exception.{HoodieMetadataIndexException,
HoodieWriteConflictException}
import
org.apache.hudi.functional.TestSecondaryIndexPruning.SecondaryIndexTestCase
-import org.apache.hudi.metadata.{HoodieBackedTableMetadata,
HoodieBackedTableMetadataWriter, HoodieMetadataFileSystemView,
SparkHoodieBackedTableMetadataWriter}
+import org.apache.hudi.metadata.{HoodieBackedTableMetadata,
HoodieBackedTableMetadataWriter, HoodieMetadataFileSystemView,
MetadataPartitionType, SparkHoodieBackedTableMetadataWriter}
import org.apache.hudi.table.HoodieSparkTable
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import
org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
@@ -41,7 +42,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo,
Expression, Literal}
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{DataFrame, Row}
-import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{Tag, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments.arguments
@@ -698,6 +699,85 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
}
}
+ /**
+ * 1. Enable secondary index and record_index (files already enabled by
default).
+ * 2. Do an insert and validate the secondary index initialization.
+ * 3. Do an update and validate the secondary index.
+ * 4. Do a savepoint and restore, and validate secondary index deleted.
+ * 5. Do an update and validate the secondary index is recreated.
+ */
+ @Test
+ def testSecondaryIndexWithSavepointAndRestore(): Unit = {
+ if (HoodieSparkUtils.gteqSpark3_3) {
+ val tableName = "test_secondary_index_with_savepoint_and_restore"
+ val hudiOpts = commonOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+ val sqlTableType = "mor"
+
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | ts bigint,
+ | record_key_col string,
+ | not_record_key_col string,
+ | partition_key_col string
+ |) using hudi
+ | options (
+ | primaryKey ='record_key_col',
+ | type = '$sqlTableType',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.metadata.record.index.enable = 'true',
+ | hoodie.datasource.write.recordkey.field = 'record_key_col',
+ | hoodie.enable.data.skipping = 'true'
+ | )
+ | partitioned by(partition_key_col)
+ | location '$basePath'
+ """.stripMargin)
+ // by setting small file limit to 0, each insert will create a new file
+ // need to generate more file for non-partitioned table to test data
skipping
+ // as the partitioned table will have only one file per partition
+ spark.sql("set hoodie.parquet.small.file.limit=0")
+ spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+
+ // create secondary index
+ spark.sql(s"create index idx_not_record_key_col on $tableName using
secondary_index(not_record_key_col)")
+ // validate index created successfully
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .build()
+ val secondaryIndexPartition = "secondary_index_idx_not_record_key_col"
+
assert(metaClient.getTableConfig.getMetadataPartitions.contains(secondaryIndexPartition))
+ // validate the secondary index records themselves
+ checkAnswer(s"select key, SecondaryIndexMetadata.recordKey from
hudi_metadata('$basePath') where type=7")(
+ Seq("abc", "row1")
+ )
+ // Do a savepoint
+ val firstCompletedInstant =
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant()
+ val writeClient = new SparkRDDWriteClient(new
HoodieSparkEngineContext(jsc), getWriteConfig(hudiOpts))
+ writeClient.savepoint(firstCompletedInstant.get().getTimestamp,
"testUser", "savepoint to first commit")
+ val savepointTimestamp =
metaClient.reloadActiveTimeline().getSavePointTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+ assertEquals(firstCompletedInstant.get().getTimestamp,
savepointTimestamp)
+ // Restore to savepoint
+ writeClient.restoreToSavepoint(savepointTimestamp)
+ // verify restore completed
+
assertTrue(metaClient.reloadActiveTimeline().getRestoreTimeline.lastInstant().isPresent)
+ // verify secondary index partition is deleted
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+
assertFalse(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.PARTITION_STATS.getPartitionPath))
+ // however index definition should still be present
+ assertTrue(metaClient.getIndexMetadata.isPresent &&
metaClient.getIndexMetadata.get.getIndexDefinitions.get(secondaryIndexPartition).getIndexType.equals("secondary_index"))
+ // update the secondary key column
+ spark.sql(s"update $tableName set not_record_key_col = 'xyz' where
record_key_col = 'row1'")
+ // validate the secondary index records themselves
+ checkAnswer(s"select key, SecondaryIndexMetadata.recordKey,
SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
+ Seq("abc", "row1", true),
+ Seq("xyz", "row1", false)
+ )
+ }
+ }
+
private def checkAnswer(query: String)(expects: Seq[Any]*): Unit = {
assertResult(expects.map(row => Row(row:
_*)).toArray.sortBy(_.toString()))(spark.sql(query).collect().sortBy(_.toString()))
}