This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ae1008d940b fix: Fixing secondary record generation for MDT (#14045)
9ae1008d940b is described below

commit 9ae1008d940b7817a2925820a630dff586ee5679
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Oct 3 11:15:18 2025 -0700

    fix: Fixing secondary record generation for MDT (#14045)
    
    * Fixing secondary record generation for MDT
    
    * Addressing feedback and adding async compaction tests
    
    * Fixing checkstyle
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |   7 +-
 .../SecondaryIndexRecordGenerationUtils.java       |  94 +++++++++---
 .../TestMetadataUtilRLIandSIRecordGeneration.java  |  10 +-
 .../apache/hudi/functional/TestCOWDataSource.scala |   2 +
 .../functional/TestSecondaryIndexPruning.scala     | 164 ++++++++++++++++++++-
 5 files changed, 243 insertions(+), 34 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index cf6990635291..3975da6bdac9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1531,12 +1531,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
       return engineContext.emptyHoodieData();
     }
     HoodieIndexDefinition indexDefinition = getIndexDefinition(indexPartition);
-    // Load file system view for only the affected partitions on the driver.
-    // By loading on the driver one time, we avoid loading the same metadata 
multiple times on the executors.
-    HoodieTableFileSystemView fsView = getMetadataView();
-    fsView.loadPartitions(new 
ArrayList<>(commitMetadata.getWritePartitionPaths()));
-    return convertWriteStatsToSecondaryIndexRecords(allWriteStats, 
instantTime, indexDefinition, dataWriteConfig.getMetadataConfig(),
-        fsView, dataMetaClient, engineContext, dataWriteConfig.getProps());
+    return convertWriteStatsToSecondaryIndexRecords(allWriteStats, 
instantTime, indexDefinition, dataWriteConfig.getMetadataConfig(), 
dataMetaClient, engineContext, dataWriteConfig);
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
similarity index 75%
rename from 
hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
rename to 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
index bde693f82eac..2a6599244e05 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
@@ -23,28 +23,35 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.engine.ReaderContextFactory;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.read.HoodieFileGroupReader;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
 
 import org.apache.avro.Schema;
 
@@ -57,12 +64,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
 import static 
org.apache.hudi.metadata.HoodieMetadataPayload.createSecondaryIndexRecord;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.filePath;
-import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.tryResolveSchemaForTable;
 
 /**
@@ -80,7 +87,7 @@ public class SecondaryIndexRecordGenerationUtils {
    * @param fsView          file system view as of instant time
    * @param dataMetaClient  data table meta client
    * @param engineContext   engine context
-   * @param props           the writer properties
+   * @param writeConfig     hoodie write config.
    * @return {@link HoodieData} of {@link HoodieRecord} to be updated in the 
metadata table for the given secondary index partition
    */
   @VisibleForTesting
@@ -88,10 +95,11 @@ public class SecondaryIndexRecordGenerationUtils {
                                                                                
       String instantTime,
                                                                                
       HoodieIndexDefinition indexDefinition,
                                                                                
       HoodieMetadataConfig metadataConfig,
-                                                                               
       HoodieTableFileSystemView fsView,
                                                                                
       HoodieTableMetaClient dataMetaClient,
                                                                                
       HoodieEngineContext engineContext,
-                                                                               
       TypedProperties props) {
+                                                                               
       HoodieWriteConfig writeConfig
+                                                                               
       ) {
+    TypedProperties props = writeConfig.getProps();
     // Secondary index cannot support logs having inserts with current 
offering. So, lets validate that.
     if (allWriteStats.stream().anyMatch(writeStat -> {
       String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
@@ -114,21 +122,60 @@ public class SecondaryIndexRecordGenerationUtils {
       String fileId = writeStatsByFileIdEntry.getKey();
       List<HoodieWriteStat> writeStats = writeStatsByFileIdEntry.getValue();
       String partition = writeStats.get(0).getPartitionPath();
-      FileSlice previousFileSliceForFileId = 
fsView.getLatestFileSlice(partition, fileId).orElse(null);
+      StoragePath basePath = dataMetaClient.getBasePath();
+
+      // validate that for a given fileId, either we have 1 parquet file or N 
log files.
+      AtomicInteger totalParquetFiles = new AtomicInteger();
+      AtomicInteger totalLogFiles = new AtomicInteger();
+      writeStats.stream().forEach(writeStat -> {
+        if (FSUtils.isLogFile(new StoragePath(basePath, writeStat.getPath()))) 
{
+          totalLogFiles.getAndIncrement();
+        } else {
+          totalParquetFiles.getAndIncrement();
+        }
+      });
+
+      ValidationUtils.checkArgument(!(totalParquetFiles.get() > 0 && 
totalLogFiles.get() > 0), "Only either of base file or log files are expected 
for a given file group. "
+          + "Partition " + partition + ", fileId " + fileId);
+      if (totalParquetFiles.get() > 0) {
+        // we should expect only 1 parquet file
+        ValidationUtils.checkArgument(writeStats.size() == 1, "Only one new 
parquet file expected per file group per commit");
+      }
+      // Instantiate Remote table FSV
+      TableFileSystemView.SliceView sliceView = getSliceView(writeConfig,  
dataMetaClient);
+      Option<FileSlice> fileSliceOption = 
sliceView.getLatestMergedFileSliceBeforeOrOn(partition, instantTime, fileId);
       Map<String, String> recordKeyToSecondaryKeyForPreviousFileSlice;
-      if (previousFileSliceForFileId == null) {
-        // new file slice, so empty mapping for previous slice
-        recordKeyToSecondaryKeyForPreviousFileSlice = Collections.emptyMap();
-      } else {
+      Map<String, String> recordKeyToSecondaryKeyForCurrentFileSlice;
+      if (fileSliceOption.isPresent()) { // if previous file slice is present.
         recordKeyToSecondaryKeyForPreviousFileSlice =
-            getRecordKeyToSecondaryKey(dataMetaClient, 
readerContextFactory.getContext(), previousFileSliceForFileId, tableSchema, 
indexDefinition, instantTime, props, false);
-      }
-      List<FileSlice> latestIncludingInflightFileSlices = 
getPartitionLatestFileSlicesIncludingInflight(dataMetaClient, Option.empty(), 
partition);
-      FileSlice currentFileSliceForFileId = 
latestIncludingInflightFileSlices.stream().filter(fs -> 
fs.getFileId().equals(fileId)).findFirst()
-          .orElseThrow(() -> new HoodieException("Could not find any file 
slice for fileId " + fileId));
-      Map<String, String> recordKeyToSecondaryKeyForCurrentFileSlice =
-          getRecordKeyToSecondaryKey(dataMetaClient, 
readerContextFactory.getContext(), currentFileSliceForFileId, tableSchema, 
indexDefinition, instantTime, props, true);
-      // Need to find what secondary index record should be deleted, and what 
should be inserted.
+            getRecordKeyToSecondaryKey(dataMetaClient, 
readerContextFactory.getContext(), fileSliceOption.get(), tableSchema, 
indexDefinition, instantTime, props, false);
+        // branch out based on whether new parquet file is added or log files 
are added.
+        if (totalParquetFiles.get() > 0) { // new base file/file slice is 
created in current commit.
+          FileSlice currentFileSliceForFileId = new FileSlice(partition, 
instantTime, fileId);
+          HoodieWriteStat stat = writeStats.get(0);
+          StoragePathInfo baseFilePathInfo = new StoragePathInfo(new 
StoragePath(basePath, stat.getPath()), stat.getFileSizeInBytes(), false, 
(short) 0, 0, 0);
+          currentFileSliceForFileId.setBaseFile(new 
HoodieBaseFile(baseFilePathInfo));
+          recordKeyToSecondaryKeyForCurrentFileSlice =
+              getRecordKeyToSecondaryKey(dataMetaClient, 
readerContextFactory.getContext(), currentFileSliceForFileId, tableSchema, 
indexDefinition, instantTime, props, true);
+        } else { // log files are added in current commit
+          // add new log files to existing latest file slice and compute the 
secondary index to primary key mapping.
+          FileSlice latestFileSlice = fileSliceOption.get();
+          writeStats.stream().forEach(writeStat -> {
+            StoragePathInfo logFile = new StoragePathInfo(new 
StoragePath(basePath, writeStat.getPath()), writeStat.getFileSizeInBytes(), 
false, (short) 0, 0, 0);
+            latestFileSlice.addLogFile(new HoodieLogFile(logFile));
+          });
+          recordKeyToSecondaryKeyForCurrentFileSlice =
+              getRecordKeyToSecondaryKey(dataMetaClient, 
readerContextFactory.getContext(), latestFileSlice, tableSchema, 
indexDefinition, instantTime, props, true);
+        }
+      } else { // new file group
+        recordKeyToSecondaryKeyForPreviousFileSlice = Collections.emptyMap(); 
// previous slice is empty.
+        FileSlice currentFileSliceForFileId = new FileSlice(partition, 
instantTime, fileId);
+        HoodieWriteStat stat = writeStats.get(0);
+        StoragePathInfo baseFilePathInfo = new StoragePathInfo(new 
StoragePath(basePath, stat.getPath()), stat.getFileSizeInBytes(), false, 
(short) 0, 0, 0);
+        currentFileSliceForFileId.setBaseFile(new 
HoodieBaseFile(baseFilePathInfo));
+        recordKeyToSecondaryKeyForCurrentFileSlice =
+            getRecordKeyToSecondaryKey(dataMetaClient, 
readerContextFactory.getContext(), currentFileSliceForFileId, tableSchema, 
indexDefinition, instantTime, props, true);
+      }   // Need to find what secondary index record should be deleted, and 
what should be inserted.
       // For each entry in recordKeyToSecondaryKeyForCurrentFileSlice, if it 
is not present in recordKeyToSecondaryKeyForPreviousFileSlice, then it should 
be inserted.
       // For each entry in recordKeyToSecondaryKeyForCurrentFileSlice, if it 
is present in recordKeyToSecondaryKeyForPreviousFileSlice, then it should be 
updated.
       // For each entry in recordKeyToSecondaryKeyForPreviousFileSlice, if it 
is not present in recordKeyToSecondaryKeyForCurrentFileSlice, then it should be 
deleted.
@@ -154,6 +201,17 @@ public class SecondaryIndexRecordGenerationUtils {
     });
   }
 
+  private static TableFileSystemView.SliceView getSliceView(HoodieWriteConfig 
config, HoodieTableMetaClient dataMetaClient) {
+    HoodieEngineContext context = new 
HoodieLocalEngineContext(dataMetaClient.getStorageConf());
+    FileSystemViewManager viewManager = 
FileSystemViewManager.createViewManager(context, config.getMetadataConfig(), 
config.getViewStorageConfig(),
+        config.getCommonConfig(), unused -> getMetadataTable(config, 
dataMetaClient, context));
+    return viewManager.getFileSystemView(dataMetaClient);
+  }
+
+  private static HoodieTableMetadata getMetadataTable(HoodieWriteConfig 
config, HoodieTableMetaClient metaClient, HoodieEngineContext context) {
+    return metaClient.getTableFormat().getMetadataFactory().create(context, 
metaClient.getStorage(), config.getMetadataConfig(), config.getBasePath());
+  }
+
   public static <T> Map<String, String> 
getRecordKeyToSecondaryKey(HoodieTableMetaClient metaClient,
                                                                     
HoodieReaderContext<T> readerContext,
                                                                     FileSlice 
fileSlice,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
index 0dfda13075d7..0bf079b34279 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
@@ -372,7 +372,7 @@ public class TestMetadataUtilRLIandSIRecordGeneration 
extends HoodieClientTestBa
       metadataView = new HoodieTableFileSystemView(metadata, metaClient, 
metaClient.getActiveTimeline());
       List<HoodieWriteStat> allWriteStats = 
writeStatusList2.stream().map(WriteStatus::getStat).collect(Collectors.toList());
       secondaryIndexRecords =
-          convertWriteStatsToSecondaryIndexRecords(allWriteStats, 
secondCommitTime, indexDefinition, metadataConfig, metadataView, metaClient, 
engineContext, writeConfig.getProps()).collectAsList();
+          convertWriteStatsToSecondaryIndexRecords(allWriteStats, 
secondCommitTime, indexDefinition, metadataConfig, metaClient, engineContext, 
writeConfig).collectAsList();
       client.commit(secondCommitTime, jsc.parallelize(writeStatusList2));
 
       // There should be 3 SI records:
@@ -406,7 +406,7 @@ public class TestMetadataUtilRLIandSIRecordGeneration 
extends HoodieClientTestBa
       metadataView = new HoodieTableFileSystemView(metadata, metaClient, 
metaClient.getActiveTimeline());
       allWriteStats = 
writeStatusList3.stream().map(WriteStatus::getStat).collect(Collectors.toList());
       secondaryIndexRecords =
-          convertWriteStatsToSecondaryIndexRecords(allWriteStats, 
thirdCommitTime, indexDefinition, metadataConfig, metadataView, metaClient, 
engineContext, writeConfig.getProps()).collectAsList();
+          convertWriteStatsToSecondaryIndexRecords(allWriteStats, 
thirdCommitTime, indexDefinition, metadataConfig, metaClient, engineContext, 
writeConfig).collectAsList();
       client.commit(thirdCommitTime, jsc.parallelize(writeStatusList3));
 
       // There should be 1 SI records: 1 delete due to deletes3
@@ -437,7 +437,7 @@ public class TestMetadataUtilRLIandSIRecordGeneration 
extends HoodieClientTestBa
       metadataView = new HoodieTableFileSystemView(metadata, metaClient, 
metaClient.getActiveTimeline());
       allWriteStats = 
writeStatusList4.stream().map(WriteStatus::getStat).collect(Collectors.toList());
       secondaryIndexRecords =
-          convertWriteStatsToSecondaryIndexRecords(allWriteStats, 
fourthCommitTime, indexDefinition, metadataConfig, metadataView, metaClient, 
engineContext, writeConfig.getProps()).collectAsList();
+          convertWriteStatsToSecondaryIndexRecords(allWriteStats, 
fourthCommitTime, indexDefinition, metadataConfig, metaClient, engineContext, 
writeConfig).collectAsList();
       client.commit(fourthCommitTime, jsc.parallelize(writeStatusList4));
 
       // There should be 1 SI records: 1 insert due to inserts4
@@ -460,7 +460,7 @@ public class TestMetadataUtilRLIandSIRecordGeneration 
extends HoodieClientTestBa
       metadataView = new HoodieTableFileSystemView(metadata, metaClient, 
metaClient.getActiveTimeline());
       allWriteStats = 
writeStatusList5.stream().map(WriteStatus::getStat).collect(Collectors.toList());
       secondaryIndexRecords =
-          convertWriteStatsToSecondaryIndexRecords(allWriteStats, 
fifthCommitTime, indexDefinition, metadataConfig, metadataView, metaClient, 
engineContext, writeConfig.getProps()).collectAsList();
+          convertWriteStatsToSecondaryIndexRecords(allWriteStats, 
fifthCommitTime, indexDefinition, metadataConfig, metaClient, engineContext, 
writeConfig).collectAsList();
       client.commit(fifthCommitTime, jsc.parallelize(writeStatusList5));
 
       // There should be 0 SI records because the secondary key field "rider" 
value has not changed.
@@ -479,7 +479,7 @@ public class TestMetadataUtilRLIandSIRecordGeneration 
extends HoodieClientTestBa
       metadataView = new HoodieTableFileSystemView(metadata, metaClient, 
metaClient.getActiveTimeline());
       allWriteStats = compactionCommitMetadata.getWriteStats();
       secondaryIndexRecords = convertWriteStatsToSecondaryIndexRecords(
-          allWriteStats, compactionInstantOpt.get(), indexDefinition, 
metadataConfig, metadataView, metaClient, engineContext, 
writeConfig.getProps()).collectAsList();
+          allWriteStats, compactionInstantOpt.get(), indexDefinition, 
metadataConfig, metaClient, engineContext, writeConfig).collectAsList();
       // Get valid and deleted secondary index records
       List<HoodieRecord> validSecondaryIndexRecords3 = new ArrayList<>();
       List<HoodieRecord> deletedSecondaryIndexRecords3 = new ArrayList<>();
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 873803018939..c9f2dd5cd433 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -763,6 +763,8 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
       .options(writeOpts)
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option("hoodie.write.merge.handle.class", 
"org.apache.hudi.io.FileGroupReaderBasedMergeHandle")
+      .option("hoodie.index.type","SIMPLE")
+      .option("hoodie.metadata.enable","false")
       .mode(SaveMode.Append)
       .save(basePath)
     val metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build()
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index 6e3670203cb6..18cbb29163bb 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -33,7 +33,7 @@ import 
org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.config._
 import org.apache.hudi.exception.{HoodieMetadataIndexException, 
HoodieWriteConflictException}
-import 
org.apache.hudi.functional.TestSecondaryIndexPruning.SecondaryIndexTestCase
+import 
org.apache.hudi.functional.TestSecondaryIndexPruning.{SecondaryIndexStreamingWritesTestCase,
 SecondaryIndexTestCase}
 import org.apache.hudi.metadata._
 import 
org.apache.hudi.metadata.HoodieMetadataPayload.SECONDARY_INDEX_RECORD_KEY_SEPARATOR
 import org.apache.hudi.storage.StoragePath
@@ -49,7 +49,7 @@ import org.apache.spark.sql.types.StringType
 import org.junit.jupiter.api.{Tag, Test}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource}
+import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, 
ValueSource}
 import org.junit.jupiter.params.provider.Arguments.arguments
 import org.scalatest.Assertions.{assertResult, assertThrows}
 
@@ -278,10 +278,11 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
   }
 
   @ParameterizedTest
-  @MethodSource(Array("testSecondaryIndexPruningParameters"))
-  def testSecondaryIndexPruningWithUpdates(testCase: SecondaryIndexTestCase): 
Unit = {
+  @MethodSource(Array("testSecondaryIndexPruningStreamingParameters"))
+  def testSecondaryIndexPruningWithUpdates(testCase: 
SecondaryIndexStreamingWritesTestCase): Unit = {
     val tableType = testCase.tableType
     val isPartitioned = testCase.isPartitioned
+    val isStreamingWritesEnabled = testCase.isStreamingWrites
     var hudiOpts = commonOpts
     hudiOpts = hudiOpts ++ Map(
       DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
@@ -300,11 +301,13 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
          |) using hudi
          | options (
          |  primaryKey ='record_key_col',
+         |  type = '$sqlTableType',
          |  hoodie.metadata.enable = 'true',
          |  hoodie.metadata.record.index.enable = 'true',
          |  hoodie.datasource.write.recordkey.field = 'record_key_col',
          |  hoodie.enable.data.skipping = 'true',
-         |  hoodie.datasource.write.payload.class = 
"org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"
+         |  hoodie.datasource.write.payload.class = 
"org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
+         |  hoodie.metadata.streaming.write.enabled = 
'$isStreamingWritesEnabled'
          | )
          | $partitionedByClause
          | location '$basePath'
@@ -323,6 +326,9 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
       .setBasePath(basePath)
       .setConf(HoodieTestUtils.getDefaultStorageConf)
       .build()
+
+    assertEquals(metaClient.getTableConfig.getTableType.name(), tableType)
+
     // validate the secondary index records themselves
     checkAnswer(s"select key from hudi_metadata('$basePath') where type=7")(
       Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1"),
@@ -351,6 +357,145 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
       Seq(1, "row1", "xyz", "p1")
     )
     verifyQueryPredicate(hudiOpts, "not_record_key_col", "abc")
+
+    // update the secondary key column
+    spark.sql(s"update $tableName set not_record_key_col = 'xyz2' where 
record_key_col = 'row1'")
+    // validate the secondary index records themselves
+    checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
+      Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+      Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false),
+      Seq(s"xyz2${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false)
+    )
+    // validate data and data skipping
+    checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where record_key_col = 'row1'")(
+      Seq(1, "row1", "xyz2", "p1")
+    )
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testSecondaryIndexWithAsyncCompaction(isStreamingWritesEnabled: 
Boolean): Unit = {
+    val tableType = "MERGE_ON_READ"
+    val isPartitioned = true
+    var hudiOpts = commonOpts
+    hudiOpts = hudiOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+      DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+    val sqlTableType = if 
(tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) "cow" else "mor"
+    tableName += "test_async_compaction" + (if (isPartitioned) "_partitioned" 
else "") + sqlTableType + isStreamingWritesEnabled
+    val partitionedByClause = if (isPartitioned) "partitioned 
by(partition_key_col)" else ""
+
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  ts bigint,
+         |  record_key_col string,
+         |  not_record_key_col string,
+         |  partition_key_col string
+         |) using hudi
+         | options (
+         |  primaryKey ='record_key_col',
+         |  type = '$sqlTableType',
+         |  hoodie.metadata.enable = 'true',
+         |  hoodie.metadata.record.index.enable = 'true',
+         |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+         |  hoodie.enable.data.skipping = 'true',
+         |  hoodie.datasource.write.payload.class = 
"org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
+         |  hoodie.metadata.streaming.write.enabled = 
'$isStreamingWritesEnabled'
+         | )
+         | $partitionedByClause
+         | location '$basePath'
+       """.stripMargin)
+    // by setting small file limit to 0, each insert will create a new file
+    // need to generate more file for non-partitioned table to test data 
skipping
+    // as the partitioned table will have only one file per partition
+    spark.sql("set hoodie.parquet.small.file.limit=0")
+    spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+    spark.sql(s"insert into $tableName values(2, 'row2', 'cde', 'p2')")
+    spark.sql(s"insert into $tableName values(3, 'row3', 'def', 'p2')")
+    // create secondary index
+    spark.sql(s"create index idx_not_record_key_col on $tableName 
(not_record_key_col)")
+    // validate index created successfully
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(HoodieTestUtils.getDefaultStorageConf)
+      .build()
+
+    assertEquals(metaClient.getTableConfig.getTableType.name(), tableType)
+
+    // validate the secondary index records themselves
+    checkAnswer(s"select key from hudi_metadata('$basePath') where type=7")(
+      Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1"),
+      Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2"),
+      Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3")
+    )
+    // validate data skipping with filters on secondary key column
+    spark.sql("set hoodie.metadata.enable=true")
+    spark.sql("set hoodie.enable.data.skipping=true")
+    spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+    checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col = 'abc'")(
+      Seq(1, "row1", "abc", "p1")
+    )
+    verifyQueryPredicate(hudiOpts, "not_record_key_col")
+
+    // update the secondary key column
+    spark.sql(s"update $tableName set not_record_key_col = 'xyz' where 
record_key_col = 'row1'")
+    // validate the secondary index records themselves
+    checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
+      Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+      Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false),
+      Seq(s"xyz${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false)
+    )
+    // validate data and data skipping
+    checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where record_key_col = 'row1'")(
+      Seq(1, "row1", "xyz", "p1")
+    )
+    verifyQueryPredicate(hudiOpts, "not_record_key_col", "abc")
+
+    // Schedule compaction (inflight)
+    spark.sql(s"refresh table $tableName")
+    spark.sql("set hoodie.compact.inline=false")
+    spark.sql("set hoodie.compact.inline.max.delta.commits=1")
+    spark.sql(s"schedule compaction on $tableName")
+    val compactionRows = spark.sql(s"show compaction on $tableName").collect()
+    val compactionInstant = compactionRows(0).getString(0)
+    assertTrue(compactionRows.length == 1)
+
+    // update the secondary key column
+    spark.sql(s"update $tableName set not_record_key_col = 'xyz3' where 
record_key_col = 'row1'")
+    // validate the secondary index records themselves
+    checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
+      Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+      Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false),
+      Seq(s"xyz3${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false)
+    )
+    // validate data and data skipping
+    checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where record_key_col = 'row1'")(
+      Seq(1, "row1", "xyz3", "p1")
+    )
+
+    // Complete compaction
+    spark.sql(s"run compaction on $tableName at $compactionInstant")
+    spark.sql(s"refresh table $tableName")
+
+    // Verify compaction
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(HoodieTestUtils.getDefaultStorageConf)
+      .build()
+    
assertTrue(metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants.lastInstant.isPresent)
+
+    // validate the secondary index records themselves
+    checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
+      Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+      Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false),
+      Seq(s"xyz3${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false)
+    )
+    // validate data and data skipping
+    checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where record_key_col = 'row1'")(
+      Seq(1, "row1", "xyz3", "p1")
+    )
+
   }
 
   @ParameterizedTest
@@ -1703,4 +1848,13 @@ object TestSecondaryIndexPruning {
       arguments(SecondaryIndexTestCase("MERGE_ON_READ", isPartitioned = false))
     )
   }
+
+  case class SecondaryIndexStreamingWritesTestCase(tableType: String, 
isPartitioned: Boolean, isStreamingWrites: Boolean)
+
+  def testSecondaryIndexPruningStreamingParameters(): 
java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      arguments(SecondaryIndexStreamingWritesTestCase("COPY_ON_WRITE", 
isPartitioned = true, isStreamingWrites = true)),
+      arguments(SecondaryIndexStreamingWritesTestCase("MERGE_ON_READ", 
isPartitioned = true, isStreamingWrites = true))
+    )
+  }
 }

Reply via email to