This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9ae1008d940b fix: Fixing secondary record generation for MDT (#14045)
9ae1008d940b is described below
commit 9ae1008d940b7817a2925820a630dff586ee5679
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Oct 3 11:15:18 2025 -0700
fix: Fixing secondary record generation for MDT (#14045)
* Fixing secondary record generation for MDT
* Addressing feedback and adding async compaction tests
* Fixing checkstyle
---
.../metadata/HoodieBackedTableMetadataWriter.java | 7 +-
.../SecondaryIndexRecordGenerationUtils.java | 94 +++++++++---
.../TestMetadataUtilRLIandSIRecordGeneration.java | 10 +-
.../apache/hudi/functional/TestCOWDataSource.scala | 2 +
.../functional/TestSecondaryIndexPruning.scala | 164 ++++++++++++++++++++-
5 files changed, 243 insertions(+), 34 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index cf6990635291..3975da6bdac9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1531,12 +1531,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
return engineContext.emptyHoodieData();
}
HoodieIndexDefinition indexDefinition = getIndexDefinition(indexPartition);
- // Load file system view for only the affected partitions on the driver.
- // By loading on the driver one time, we avoid loading the same metadata multiple times on the executors.
- HoodieTableFileSystemView fsView = getMetadataView();
- fsView.loadPartitions(new ArrayList<>(commitMetadata.getWritePartitionPaths()));
- return convertWriteStatsToSecondaryIndexRecords(allWriteStats, instantTime, indexDefinition, dataWriteConfig.getMetadataConfig(),
- fsView, dataMetaClient, engineContext, dataWriteConfig.getProps());
+ return convertWriteStatsToSecondaryIndexRecords(allWriteStats, instantTime, indexDefinition, dataWriteConfig.getMetadataConfig(), dataMetaClient, engineContext, dataWriteConfig);
}
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
similarity index 75%
rename from hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
index bde693f82eac..2a6599244e05 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
@@ -23,28 +23,35 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.engine.ReaderContextFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
import org.apache.avro.Schema;
@@ -57,12 +64,12 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
import static org.apache.hudi.metadata.HoodieMetadataPayload.createSecondaryIndexRecord;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.filePath;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.tryResolveSchemaForTable;
/**
@@ -80,7 +87,7 @@ public class SecondaryIndexRecordGenerationUtils {
* @param fsView file system view as of instant time
* @param dataMetaClient data table meta client
* @param engineContext engine context
- * @param props the writer properties
+ * @param writeConfig hoodie write config.
* @return {@link HoodieData} of {@link HoodieRecord} to be updated in the
metadata table for the given secondary index partition
*/
@VisibleForTesting
@@ -88,10 +95,11 @@ public class SecondaryIndexRecordGenerationUtils {
 String instantTime,
 HoodieIndexDefinition indexDefinition,
 HoodieMetadataConfig metadataConfig,
-HoodieTableFileSystemView fsView,
 HoodieTableMetaClient dataMetaClient,
 HoodieEngineContext engineContext,
-TypedProperties props) {
+HoodieWriteConfig writeConfig
+) {
+ TypedProperties props = writeConfig.getProps();
// Secondary index cannot support logs having inserts with current
offering. So, lets validate that.
if (allWriteStats.stream().anyMatch(writeStat -> {
String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
@@ -114,21 +122,60 @@ public class SecondaryIndexRecordGenerationUtils {
String fileId = writeStatsByFileIdEntry.getKey();
List<HoodieWriteStat> writeStats = writeStatsByFileIdEntry.getValue();
String partition = writeStats.get(0).getPartitionPath();
- FileSlice previousFileSliceForFileId =
fsView.getLatestFileSlice(partition, fileId).orElse(null);
+ StoragePath basePath = dataMetaClient.getBasePath();
+
+ // validate that for a given fileId, either we have 1 parquet file or N
log files.
+ AtomicInteger totalParquetFiles = new AtomicInteger();
+ AtomicInteger totalLogFiles = new AtomicInteger();
+ writeStats.stream().forEach(writeStat -> {
+ if (FSUtils.isLogFile(new StoragePath(basePath, writeStat.getPath())))
{
+ totalLogFiles.getAndIncrement();
+ } else {
+ totalParquetFiles.getAndIncrement();
+ }
+ });
+
+ ValidationUtils.checkArgument(!(totalParquetFiles.get() > 0 &&
totalLogFiles.get() > 0), "Only either of base file or log files are expected
for a given file group. "
+ + "Partition " + partition + ", fileId " + fileId);
+ if (totalParquetFiles.get() > 0) {
+ // we should expect only 1 parquet file
+ ValidationUtils.checkArgument(writeStats.size() == 1, "Only one new
parquet file expected per file group per commit");
+ }
+ // Instantiate Remote table FSV
+ TableFileSystemView.SliceView sliceView = getSliceView(writeConfig,
dataMetaClient);
+ Option<FileSlice> fileSliceOption =
sliceView.getLatestMergedFileSliceBeforeOrOn(partition, instantTime, fileId);
Map<String, String> recordKeyToSecondaryKeyForPreviousFileSlice;
- if (previousFileSliceForFileId == null) {
- // new file slice, so empty mapping for previous slice
- recordKeyToSecondaryKeyForPreviousFileSlice = Collections.emptyMap();
- } else {
+ Map<String, String> recordKeyToSecondaryKeyForCurrentFileSlice;
+ if (fileSliceOption.isPresent()) { // if previous file slice is present.
recordKeyToSecondaryKeyForPreviousFileSlice =
- getRecordKeyToSecondaryKey(dataMetaClient,
readerContextFactory.getContext(), previousFileSliceForFileId, tableSchema,
indexDefinition, instantTime, props, false);
- }
- List<FileSlice> latestIncludingInflightFileSlices =
getPartitionLatestFileSlicesIncludingInflight(dataMetaClient, Option.empty(),
partition);
- FileSlice currentFileSliceForFileId =
latestIncludingInflightFileSlices.stream().filter(fs ->
fs.getFileId().equals(fileId)).findFirst()
- .orElseThrow(() -> new HoodieException("Could not find any file
slice for fileId " + fileId));
- Map<String, String> recordKeyToSecondaryKeyForCurrentFileSlice =
- getRecordKeyToSecondaryKey(dataMetaClient,
readerContextFactory.getContext(), currentFileSliceForFileId, tableSchema,
indexDefinition, instantTime, props, true);
- // Need to find what secondary index record should be deleted, and what
should be inserted.
+ getRecordKeyToSecondaryKey(dataMetaClient,
readerContextFactory.getContext(), fileSliceOption.get(), tableSchema,
indexDefinition, instantTime, props, false);
+ // branch out based on whether new parquet file is added or log files
are added.
+ if (totalParquetFiles.get() > 0) { // new base file/file slice is
created in current commit.
+ FileSlice currentFileSliceForFileId = new FileSlice(partition,
instantTime, fileId);
+ HoodieWriteStat stat = writeStats.get(0);
+ StoragePathInfo baseFilePathInfo = new StoragePathInfo(new
StoragePath(basePath, stat.getPath()), stat.getFileSizeInBytes(), false,
(short) 0, 0, 0);
+ currentFileSliceForFileId.setBaseFile(new
HoodieBaseFile(baseFilePathInfo));
+ recordKeyToSecondaryKeyForCurrentFileSlice =
+ getRecordKeyToSecondaryKey(dataMetaClient,
readerContextFactory.getContext(), currentFileSliceForFileId, tableSchema,
indexDefinition, instantTime, props, true);
+ } else { // log files are added in current commit
+ // add new log files to existing latest file slice and compute the
secondary index to primary key mapping.
+ FileSlice latestFileSlice = fileSliceOption.get();
+ writeStats.stream().forEach(writeStat -> {
+ StoragePathInfo logFile = new StoragePathInfo(new
StoragePath(basePath, writeStat.getPath()), writeStat.getFileSizeInBytes(),
false, (short) 0, 0, 0);
+ latestFileSlice.addLogFile(new HoodieLogFile(logFile));
+ });
+ recordKeyToSecondaryKeyForCurrentFileSlice =
+ getRecordKeyToSecondaryKey(dataMetaClient,
readerContextFactory.getContext(), latestFileSlice, tableSchema,
indexDefinition, instantTime, props, true);
+ }
+ } else { // new file group
+ recordKeyToSecondaryKeyForPreviousFileSlice = Collections.emptyMap();
// previous slice is empty.
+ FileSlice currentFileSliceForFileId = new FileSlice(partition,
instantTime, fileId);
+ HoodieWriteStat stat = writeStats.get(0);
+ StoragePathInfo baseFilePathInfo = new StoragePathInfo(new
StoragePath(basePath, stat.getPath()), stat.getFileSizeInBytes(), false,
(short) 0, 0, 0);
+ currentFileSliceForFileId.setBaseFile(new
HoodieBaseFile(baseFilePathInfo));
+ recordKeyToSecondaryKeyForCurrentFileSlice =
+ getRecordKeyToSecondaryKey(dataMetaClient,
readerContextFactory.getContext(), currentFileSliceForFileId, tableSchema,
indexDefinition, instantTime, props, true);
+ } // Need to find what secondary index record should be deleted, and
what should be inserted.
// For each entry in recordKeyToSecondaryKeyForCurrentFileSlice, if it
is not present in recordKeyToSecondaryKeyForPreviousFileSlice, then it should
be inserted.
// For each entry in recordKeyToSecondaryKeyForCurrentFileSlice, if it
is present in recordKeyToSecondaryKeyForPreviousFileSlice, then it should be
updated.
// For each entry in recordKeyToSecondaryKeyForPreviousFileSlice, if it
is not present in recordKeyToSecondaryKeyForCurrentFileSlice, then it should be
deleted.
@@ -154,6 +201,17 @@ public class SecondaryIndexRecordGenerationUtils {
});
}
+ private static TableFileSystemView.SliceView getSliceView(HoodieWriteConfig
config, HoodieTableMetaClient dataMetaClient) {
+ HoodieEngineContext context = new
HoodieLocalEngineContext(dataMetaClient.getStorageConf());
+ FileSystemViewManager viewManager =
FileSystemViewManager.createViewManager(context, config.getMetadataConfig(),
config.getViewStorageConfig(),
+ config.getCommonConfig(), unused -> getMetadataTable(config,
dataMetaClient, context));
+ return viewManager.getFileSystemView(dataMetaClient);
+ }
+
+ private static HoodieTableMetadata getMetadataTable(HoodieWriteConfig
config, HoodieTableMetaClient metaClient, HoodieEngineContext context) {
+ return metaClient.getTableFormat().getMetadataFactory().create(context,
metaClient.getStorage(), config.getMetadataConfig(), config.getBasePath());
+ }
+
public static <T> Map<String, String>
getRecordKeyToSecondaryKey(HoodieTableMetaClient metaClient,
HoodieReaderContext<T> readerContext,
FileSlice
fileSlice,
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
index 0dfda13075d7..0bf079b34279 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
@@ -372,7 +372,7 @@ public class TestMetadataUtilRLIandSIRecordGeneration
extends HoodieClientTestBa
metadataView = new HoodieTableFileSystemView(metadata, metaClient,
metaClient.getActiveTimeline());
List<HoodieWriteStat> allWriteStats =
writeStatusList2.stream().map(WriteStatus::getStat).collect(Collectors.toList());
secondaryIndexRecords =
- convertWriteStatsToSecondaryIndexRecords(allWriteStats,
secondCommitTime, indexDefinition, metadataConfig, metadataView, metaClient,
engineContext, writeConfig.getProps()).collectAsList();
+ convertWriteStatsToSecondaryIndexRecords(allWriteStats,
secondCommitTime, indexDefinition, metadataConfig, metaClient, engineContext,
writeConfig).collectAsList();
client.commit(secondCommitTime, jsc.parallelize(writeStatusList2));
// There should be 3 SI records:
@@ -406,7 +406,7 @@ public class TestMetadataUtilRLIandSIRecordGeneration
extends HoodieClientTestBa
metadataView = new HoodieTableFileSystemView(metadata, metaClient,
metaClient.getActiveTimeline());
allWriteStats =
writeStatusList3.stream().map(WriteStatus::getStat).collect(Collectors.toList());
secondaryIndexRecords =
- convertWriteStatsToSecondaryIndexRecords(allWriteStats,
thirdCommitTime, indexDefinition, metadataConfig, metadataView, metaClient,
engineContext, writeConfig.getProps()).collectAsList();
+ convertWriteStatsToSecondaryIndexRecords(allWriteStats,
thirdCommitTime, indexDefinition, metadataConfig, metaClient, engineContext,
writeConfig).collectAsList();
client.commit(thirdCommitTime, jsc.parallelize(writeStatusList3));
// There should be 1 SI records: 1 delete due to deletes3
@@ -437,7 +437,7 @@ public class TestMetadataUtilRLIandSIRecordGeneration
extends HoodieClientTestBa
metadataView = new HoodieTableFileSystemView(metadata, metaClient,
metaClient.getActiveTimeline());
allWriteStats =
writeStatusList4.stream().map(WriteStatus::getStat).collect(Collectors.toList());
secondaryIndexRecords =
- convertWriteStatsToSecondaryIndexRecords(allWriteStats,
fourthCommitTime, indexDefinition, metadataConfig, metadataView, metaClient,
engineContext, writeConfig.getProps()).collectAsList();
+ convertWriteStatsToSecondaryIndexRecords(allWriteStats,
fourthCommitTime, indexDefinition, metadataConfig, metaClient, engineContext,
writeConfig).collectAsList();
client.commit(fourthCommitTime, jsc.parallelize(writeStatusList4));
// There should be 1 SI records: 1 insert due to inserts4
@@ -460,7 +460,7 @@ public class TestMetadataUtilRLIandSIRecordGeneration
extends HoodieClientTestBa
metadataView = new HoodieTableFileSystemView(metadata, metaClient,
metaClient.getActiveTimeline());
allWriteStats =
writeStatusList5.stream().map(WriteStatus::getStat).collect(Collectors.toList());
secondaryIndexRecords =
- convertWriteStatsToSecondaryIndexRecords(allWriteStats,
fifthCommitTime, indexDefinition, metadataConfig, metadataView, metaClient,
engineContext, writeConfig.getProps()).collectAsList();
+ convertWriteStatsToSecondaryIndexRecords(allWriteStats,
fifthCommitTime, indexDefinition, metadataConfig, metaClient, engineContext,
writeConfig).collectAsList();
client.commit(fifthCommitTime, jsc.parallelize(writeStatusList5));
// There should be 0 SI records because the secondary key field "rider"
value has not changed.
@@ -479,7 +479,7 @@ public class TestMetadataUtilRLIandSIRecordGeneration
extends HoodieClientTestBa
metadataView = new HoodieTableFileSystemView(metadata, metaClient,
metaClient.getActiveTimeline());
allWriteStats = compactionCommitMetadata.getWriteStats();
secondaryIndexRecords = convertWriteStatsToSecondaryIndexRecords(
- allWriteStats, compactionInstantOpt.get(), indexDefinition,
metadataConfig, metadataView, metaClient, engineContext,
writeConfig.getProps()).collectAsList();
+ allWriteStats, compactionInstantOpt.get(), indexDefinition,
metadataConfig, metaClient, engineContext, writeConfig).collectAsList();
// Get valid and deleted secondary index records
List<HoodieRecord> validSecondaryIndexRecords3 = new ArrayList<>();
List<HoodieRecord> deletedSecondaryIndexRecords3 = new ArrayList<>();
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 873803018939..c9f2dd5cd433 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -763,6 +763,8 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
.options(writeOpts)
.option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option("hoodie.write.merge.handle.class",
"org.apache.hudi.io.FileGroupReaderBasedMergeHandle")
+ .option("hoodie.index.type","SIMPLE")
+ .option("hoodie.metadata.enable","false")
.mode(SaveMode.Append)
.save(basePath)
val metaClient =
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index 6e3670203cb6..18cbb29163bb 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -33,7 +33,7 @@ import
org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.config._
import org.apache.hudi.exception.{HoodieMetadataIndexException,
HoodieWriteConflictException}
-import
org.apache.hudi.functional.TestSecondaryIndexPruning.SecondaryIndexTestCase
+import
org.apache.hudi.functional.TestSecondaryIndexPruning.{SecondaryIndexStreamingWritesTestCase,
SecondaryIndexTestCase}
import org.apache.hudi.metadata._
import
org.apache.hudi.metadata.HoodieMetadataPayload.SECONDARY_INDEX_RECORD_KEY_SEPARATOR
import org.apache.hudi.storage.StoragePath
@@ -49,7 +49,7 @@ import org.apache.spark.sql.types.StringType
import org.junit.jupiter.api.{Tag, Test}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource}
+import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource,
ValueSource}
import org.junit.jupiter.params.provider.Arguments.arguments
import org.scalatest.Assertions.{assertResult, assertThrows}
@@ -278,10 +278,11 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
}
@ParameterizedTest
- @MethodSource(Array("testSecondaryIndexPruningParameters"))
- def testSecondaryIndexPruningWithUpdates(testCase: SecondaryIndexTestCase):
Unit = {
+ @MethodSource(Array("testSecondaryIndexPruningStreamingParameters"))
+ def testSecondaryIndexPruningWithUpdates(testCase:
SecondaryIndexStreamingWritesTestCase): Unit = {
val tableType = testCase.tableType
val isPartitioned = testCase.isPartitioned
+ val isStreamingWritesEnabled = testCase.isStreamingWrites
var hudiOpts = commonOpts
hudiOpts = hudiOpts ++ Map(
DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
@@ -300,11 +301,13 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
|) using hudi
| options (
| primaryKey ='record_key_col',
+ | type = '$sqlTableType',
| hoodie.metadata.enable = 'true',
| hoodie.metadata.record.index.enable = 'true',
| hoodie.datasource.write.recordkey.field = 'record_key_col',
| hoodie.enable.data.skipping = 'true',
- | hoodie.datasource.write.payload.class =
"org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"
+ | hoodie.datasource.write.payload.class =
"org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
+ | hoodie.metadata.streaming.write.enabled =
'$isStreamingWritesEnabled'
| )
| $partitionedByClause
| location '$basePath'
@@ -323,6 +326,9 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
.setBasePath(basePath)
.setConf(HoodieTestUtils.getDefaultStorageConf)
.build()
+
+ assertEquals(metaClient.getTableConfig.getTableType.name(), tableType)
+
// validate the secondary index records themselves
checkAnswer(s"select key from hudi_metadata('$basePath') where type=7")(
Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1"),
@@ -351,6 +357,145 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
Seq(1, "row1", "xyz", "p1")
)
verifyQueryPredicate(hudiOpts, "not_record_key_col", "abc")
+
+ // update the secondary key column
+ spark.sql(s"update $tableName set not_record_key_col = 'xyz2' where
record_key_col = 'row1'")
+ // validate the secondary index records themselves
+ checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from
hudi_metadata('$basePath') where type=7")(
+ Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+ Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false),
+ Seq(s"xyz2${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false)
+ )
+ // validate data and data skipping
+ checkAnswer(s"select ts, record_key_col, not_record_key_col,
partition_key_col from $tableName where record_key_col = 'row1'")(
+ Seq(1, "row1", "xyz2", "p1")
+ )
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testSecondaryIndexWithAsyncCompaction(isStreamingWritesEnabled:
Boolean): Unit = {
+ val tableType = "MERGE_ON_READ"
+ val isPartitioned = true
+ var hudiOpts = commonOpts
+ hudiOpts = hudiOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+ val sqlTableType = if
(tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) "cow" else "mor"
+ tableName += "test_async_compaction" + (if (isPartitioned) "_partitioned"
else "") + sqlTableType + isStreamingWritesEnabled
+ val partitionedByClause = if (isPartitioned) "partitioned
by(partition_key_col)" else ""
+
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | ts bigint,
+ | record_key_col string,
+ | not_record_key_col string,
+ | partition_key_col string
+ |) using hudi
+ | options (
+ | primaryKey ='record_key_col',
+ | type = '$sqlTableType',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.metadata.record.index.enable = 'true',
+ | hoodie.datasource.write.recordkey.field = 'record_key_col',
+ | hoodie.enable.data.skipping = 'true',
+ | hoodie.datasource.write.payload.class =
"org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
+ | hoodie.metadata.streaming.write.enabled =
'$isStreamingWritesEnabled'
+ | )
+ | $partitionedByClause
+ | location '$basePath'
+ """.stripMargin)
+ // by setting small file limit to 0, each insert will create a new file
+ // need to generate more file for non-partitioned table to test data
skipping
+ // as the partitioned table will have only one file per partition
+ spark.sql("set hoodie.parquet.small.file.limit=0")
+ spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+ spark.sql(s"insert into $tableName values(2, 'row2', 'cde', 'p2')")
+ spark.sql(s"insert into $tableName values(3, 'row3', 'def', 'p2')")
+ // create secondary index
+ spark.sql(s"create index idx_not_record_key_col on $tableName
(not_record_key_col)")
+ // validate index created successfully
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .build()
+
+ assertEquals(metaClient.getTableConfig.getTableType.name(), tableType)
+
+ // validate the secondary index records themselves
+ checkAnswer(s"select key from hudi_metadata('$basePath') where type=7")(
+ Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1"),
+ Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2"),
+ Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3")
+ )
+ // validate data skipping with filters on secondary key column
+ spark.sql("set hoodie.metadata.enable=true")
+ spark.sql("set hoodie.enable.data.skipping=true")
+ spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+ checkAnswer(s"select ts, record_key_col, not_record_key_col,
partition_key_col from $tableName where not_record_key_col = 'abc'")(
+ Seq(1, "row1", "abc", "p1")
+ )
+ verifyQueryPredicate(hudiOpts, "not_record_key_col")
+
+ // update the secondary key column
+ spark.sql(s"update $tableName set not_record_key_col = 'xyz' where
record_key_col = 'row1'")
+ // validate the secondary index records themselves
+ checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from
hudi_metadata('$basePath') where type=7")(
+ Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+ Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false),
+ Seq(s"xyz${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false)
+ )
+ // validate data and data skipping
+ checkAnswer(s"select ts, record_key_col, not_record_key_col,
partition_key_col from $tableName where record_key_col = 'row1'")(
+ Seq(1, "row1", "xyz", "p1")
+ )
+ verifyQueryPredicate(hudiOpts, "not_record_key_col", "abc")
+
+ // Schedule compaction (inflight)
+ spark.sql(s"refresh table $tableName")
+ spark.sql("set hoodie.compact.inline=false")
+ spark.sql("set hoodie.compact.inline.max.delta.commits=1")
+ spark.sql(s"schedule compaction on $tableName")
+ val compactionRows = spark.sql(s"show compaction on $tableName").collect()
+ val compactionInstant = compactionRows(0).getString(0)
+ assertTrue(compactionRows.length == 1)
+
+ // update the secondary key column
+ spark.sql(s"update $tableName set not_record_key_col = 'xyz3' where
record_key_col = 'row1'")
+ // validate the secondary index records themselves
+ checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from
hudi_metadata('$basePath') where type=7")(
+ Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+ Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false),
+ Seq(s"xyz3${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false)
+ )
+ // validate data and data skipping
+ checkAnswer(s"select ts, record_key_col, not_record_key_col,
partition_key_col from $tableName where record_key_col = 'row1'")(
+ Seq(1, "row1", "xyz3", "p1")
+ )
+
+ // Complete compaction
+ spark.sql(s"run compaction on $tableName at $compactionInstant")
+ spark.sql(s"refresh table $tableName")
+
+ // Verify compaction
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .build()
+
assertTrue(metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants.lastInstant.isPresent)
+
+ // validate the secondary index records themselves
+ checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from
hudi_metadata('$basePath') where type=7")(
+ Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+ Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false),
+ Seq(s"xyz3${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false)
+ )
+ // validate data and data skipping
+ checkAnswer(s"select ts, record_key_col, not_record_key_col,
partition_key_col from $tableName where record_key_col = 'row1'")(
+ Seq(1, "row1", "xyz3", "p1")
+ )
+
}
@ParameterizedTest
@@ -1703,4 +1848,13 @@ object TestSecondaryIndexPruning {
arguments(SecondaryIndexTestCase("MERGE_ON_READ", isPartitioned = false))
)
}
+
+ case class SecondaryIndexStreamingWritesTestCase(tableType: String,
isPartitioned: Boolean, isStreamingWrites: Boolean)
+
+ def testSecondaryIndexPruningStreamingParameters():
java.util.stream.Stream[Arguments] = {
+ java.util.stream.Stream.of(
+ arguments(SecondaryIndexStreamingWritesTestCase("COPY_ON_WRITE",
isPartitioned = true, isStreamingWrites = true)),
+ arguments(SecondaryIndexStreamingWritesTestCase("MERGE_ON_READ",
isPartitioned = true, isStreamingWrites = true))
+ )
+ }
}