nsivabalan commented on code in PR #13449:
URL: https://github.com/apache/hudi/pull/13449#discussion_r2153513693


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -546,12 +559,75 @@ public List<WriteStatus> close() {
         status.getStat().setFileSizeInBytes(logFileSize);
       }
 
+      // generate Secondary index stats if streaming is enabled.
+      if (!isSecondaryIndexStreamingDisabled()) {
+        // Adds secondary index only for the last log file write status. We do 
not need to add secondary index stats
+        // for every log file written as part of the append handle write. The 
last write status would update the
+        // secondary index considering all the log files.
+        
trackMetadataIndexStatsForStreamingMetadataWrites(fileSliceOpt.or(this::getFileSlice),
 statuses.stream().map(status -> 
status.getStat().getPath()).collect(Collectors.toList()),
+            statuses.get(statuses.size() - 1));
+      }
+
       return statuses;
     } catch (IOException e) {
       throw new HoodieUpsertException("Failed to close UpdateHandle", e);
     }
   }
 
+  private void 
trackMetadataIndexStatsForStreamingMetadataWrites(Option<FileSlice> 
fileSliceOpt, List<String> newLogFiles, WriteStatus status) {
+    // TODO: Optimise the computation for multiple secondary indexes

Review Comment:
   Can you file a JIRA for this and add it to the docs here?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/SingleFileHandleCreateFactory.java:
##########
@@ -36,11 +36,17 @@ public class SingleFileHandleCreateFactory<T, I, K, O> 
extends CreateHandleFacto
   private final AtomicBoolean isHandleCreated = new AtomicBoolean(false);
   private final String fileId;
   private final boolean preserveHoodieMetadata;
+  private final boolean isSecondaryIndexStreamingDisabled;
 
   public SingleFileHandleCreateFactory(String fileId, boolean 
preserveHoodieMetadata) {
+    this(fileId, preserveHoodieMetadata, false);
+  }
+
+  public SingleFileHandleCreateFactory(String fileId, boolean 
preserveHoodieMetadata, boolean isSecondaryIndexStreamingDisabled) {

Review Comment:
   Left a comment above.
   We should try to make this generic.
   Just passing the operationType to the write handle constructor might be a
   better idea.



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java:
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.SecondaryIndexStats;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
+import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.action.commit.HoodieMergeHelper;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests {@link HoodieMergeHandle}.
+ */
+public class TestMergeHandle extends BaseTestHandle {
+
+  @Test
+  public void testMergeHandleRLIStats() throws IOException {
+    // init config and table
+    HoodieWriteConfig config = getConfigBuilder(basePath)
+        .withPopulateMetaFields(false)
+        
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withEnableRecordIndex(true).withStreamingWriteEnabled(true).build())
+        
.withKeyGenerator(KeyGeneratorForDataGeneratorRecords.class.getCanonicalName())
+        .build();
+    HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) 
HoodieSparkTable.create(config, new HoodieLocalEngineContext(storageConf), 
metaClient);
+    // one round per partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+    // init some args
+    String fileId = UUID.randomUUID().toString();
+    String instantTime = "000";
+
+    // Create a parquet file
+    config.setSchema(TRIP_EXAMPLE_SCHEMA);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new 
String[] {partitionPath});
+    Pair<WriteStatus, List<HoodieRecord>> statusListPair = 
createParquetFile(config, table, partitionPath, fileId, instantTime, 
dataGenerator);
+    WriteStatus writeStatus = statusListPair.getLeft();
+    List<HoodieRecord> records = statusListPair.getRight();
+    assertEquals(records.size(), writeStatus.getTotalRecords());
+    assertEquals(0, writeStatus.getTotalErrorRecords());
+
+    instantTime = "001";
+    List<HoodieRecord> updates = 
dataGenerator.generateUniqueUpdates(instantTime, 10);
+    HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config, instantTime, 
table, updates.iterator(), partitionPath, fileId, 
table.getTaskContextSupplier(),
+        new HoodieBaseFile(writeStatus.getStat().getPath()), Option.of(new 
KeyGeneratorForDataGeneratorRecords(config.getProps())));
+    HoodieMergeHelper.newInstance().runMerge(table, mergeHandle);
+    writeStatus = mergeHandle.writeStatus;
+    // verify stats after merge
+    assertEquals(records.size(), writeStatus.getStat().getNumWrites());
+    assertEquals(10, writeStatus.getStat().getNumUpdateWrites());
+  }
+
+  @Test
+  public void testMergeHandleSecondaryIndexStatsWithUpdates() throws Exception 
{
+    // init config and table
+    HoodieWriteConfig config = getConfigBuilder(basePath)
+        .withPopulateMetaFields(false)
+        
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableRecordIndex(true)
+            .withStreamingWriteEnabled(true)
+            .withSecondaryIndexEnabled(true)
+            .withSecondaryIndexName("sec-rider")
+            .withSecondaryIndexForColumn("rider")
+            .build())
+        
.withKeyGenerator(KeyGeneratorForDataGeneratorRecords.class.getCanonicalName())
+        .build();
+    HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) 
HoodieSparkTable.create(config, new HoodieLocalEngineContext(storageConf), 
metaClient);
+    HoodieTableMetadataWriter metadataWriter = 
SparkMetadataWriterFactory.create(storageConf, config, context, 
table.getMetaClient().getTableConfig());
+    metadataWriter.close();
+
+    // one round per partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+    // init some args
+    String fileId = UUID.randomUUID().toString();
+    String instantTime = "000";
+
+    // Create a parquet file
+    config.setSchema(TRIP_EXAMPLE_SCHEMA);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    table = (HoodieSparkCopyOnWriteTable) 
HoodieSparkCopyOnWriteTable.create(config, context, metaClient);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new 
String[] {partitionPath});
+
+    Pair<WriteStatus, List<HoodieRecord>> statusListPair = 
createParquetFile(config, table, partitionPath, fileId, instantTime, 
dataGenerator);
+    WriteStatus writeStatus = statusListPair.getLeft();
+    List<HoodieRecord> records = statusListPair.getRight();
+    assertEquals(records.size(), writeStatus.getTotalRecords());
+    assertEquals(0, writeStatus.getTotalErrorRecords());
+
+    instantTime = "001";
+    List<HoodieRecord> updates = 
dataGenerator.generateUniqueUpdates(instantTime, 10);
+    HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config, instantTime, 
table, updates.iterator(), partitionPath, fileId, new 
LocalTaskContextSupplier(),
+        new HoodieBaseFile(writeStatus.getStat().getPath()), Option.of(new 
KeyGeneratorForDataGeneratorRecords(config.getProps())));
+    HoodieMergeHelper.newInstance().runMerge(table, mergeHandle);
+    writeStatus = mergeHandle.writeStatus;
+    // verify stats after merge
+    assertEquals(records.size(), writeStatus.getStat().getNumWrites());
+    assertEquals(10, writeStatus.getStat().getNumUpdateWrites());
+    // verify secondary index stats
+    assertEquals(1, 
writeStatus.getIndexStats().getSecondaryIndexStats().size());
+    // 10 si records for old secondary keys and 10 for new secondary keys
+    assertEquals(20, 
writeStatus.getIndexStats().getSecondaryIndexStats().values().stream().findFirst().get().size());
+
+    // Validate the secondary index stats returned
+    Map<String, String> deletedRecordAndSecondaryKeys = new HashMap<>();
+    Map<String, String> newRecordAndSecondaryKeys = new HashMap<>();
+    for (SecondaryIndexStats stat : 
writeStatus.getIndexStats().getSecondaryIndexStats().values().stream().findFirst().get())
 {
+      // verify si stat marks record as not deleted
+      if (stat.isDeleted()) {
+        deletedRecordAndSecondaryKeys.put(stat.getRecordKey(), 
stat.getSecondaryKeyValue());
+      } else {
+        newRecordAndSecondaryKeys.put(stat.getRecordKey(), 
stat.getSecondaryKeyValue());
+      }
+      // verify the record key and secondary key is present
+      assertTrue(StringUtils.nonEmpty(stat.getRecordKey()));
+      assertTrue(StringUtils.nonEmpty(stat.getSecondaryKeyValue()));
+    }
+
+    // Ensure that all record keys are unique and match the initial update size
+    // There should be 10 delete and 10 new secondary index records
+    assertEquals(10, deletedRecordAndSecondaryKeys.size());
+    assertEquals(10, newRecordAndSecondaryKeys.size());
+    assertEquals(deletedRecordAndSecondaryKeys.keySet(), 
newRecordAndSecondaryKeys.keySet());
+    for (String recordKey : deletedRecordAndSecondaryKeys.keySet()) {
+      // verify secondary key for deleted and new secondary index records is 
different
+      assertNotEquals(deletedRecordAndSecondaryKeys.get(recordKey), 
newRecordAndSecondaryKeys.get(recordKey));
+    }
+  }
+
+  @Test
+  public void testMergeHandleSecondaryIndexStatsWithDeletes() throws Exception 
{
+    // init config and table
+    HoodieWriteConfig config = getConfigBuilder(basePath)
+        .withPopulateMetaFields(false)
+        
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableRecordIndex(true)
+            .withStreamingWriteEnabled(true)
+            .withSecondaryIndexEnabled(true)
+            .withSecondaryIndexName("sec-rider")
+            .withSecondaryIndexForColumn("rider")
+            .build())
+        
.withKeyGenerator(KeyGeneratorForDataGeneratorRecords.class.getCanonicalName())
+        .build();
+    HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) 
HoodieSparkTable.create(config, new HoodieLocalEngineContext(storageConf), 
metaClient);
+    HoodieTableMetadataWriter metadataWriter = 
SparkMetadataWriterFactory.create(storageConf, config, context, 
table.getMetaClient().getTableConfig());
+    metadataWriter.close();
+
+    // one round per partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+    // init some args
+    String fileId = UUID.randomUUID().toString();
+    String instantTime = "000";
+
+    // Create a parquet file
+    config.setSchema(TRIP_EXAMPLE_SCHEMA);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    table = (HoodieSparkCopyOnWriteTable) 
HoodieSparkCopyOnWriteTable.create(config, context, metaClient);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new 
String[] {partitionPath});
+
+    Pair<WriteStatus, List<HoodieRecord>> statusListPair = 
createParquetFile(config, table, partitionPath, fileId, instantTime, 
dataGenerator);
+    WriteStatus writeStatus = statusListPair.getLeft();
+    List<HoodieRecord> records = statusListPair.getRight();
+    assertEquals(records.size(), writeStatus.getTotalRecords());
+    assertEquals(0, writeStatus.getTotalErrorRecords());
+
+    instantTime = "001";
+    List<HoodieRecord> deletes = 
dataGenerator.generateUniqueDeleteRecords(instantTime, 10);

Review Comment:
   OK, I see it here.
   But why separate tests? Why can't we have just one test with all possible
   cases, with 10 records for each flavor?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -546,12 +559,75 @@ public List<WriteStatus> close() {
         status.getStat().setFileSizeInBytes(logFileSize);
       }
 
+      // generate Secondary index stats if streaming is enabled.
+      if (!isSecondaryIndexStreamingDisabled()) {
+        // Adds secondary index only for the last log file write status. We do 
not need to add secondary index stats
+        // for every log file written as part of the append handle write. The 
last write status would update the
+        // secondary index considering all the log files.
+        
trackMetadataIndexStatsForStreamingMetadataWrites(fileSliceOpt.or(this::getFileSlice),
 statuses.stream().map(status -> 
status.getStat().getPath()).collect(Collectors.toList()),

Review Comment:
   Yes, it's a fair assumption.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -546,12 +559,75 @@ public List<WriteStatus> close() {
         status.getStat().setFileSizeInBytes(logFileSize);
       }
 
+      // generate Secondary index stats if streaming is enabled.
+      if (!isSecondaryIndexStreamingDisabled()) {
+        // Adds secondary index only for the last log file write status. We do 
not need to add secondary index stats
+        // for every log file written as part of the append handle write. The 
last write status would update the
+        // secondary index considering all the log files.
+        
trackMetadataIndexStatsForStreamingMetadataWrites(fileSliceOpt.or(this::getFileSlice),
 statuses.stream().map(status -> 
status.getStat().getPath()).collect(Collectors.toList()),
+            statuses.get(statuses.size() - 1));
+      }
+
       return statuses;
     } catch (IOException e) {
       throw new HoodieUpsertException("Failed to close UpdateHandle", e);
     }
   }
 
+  private void 
trackMetadataIndexStatsForStreamingMetadataWrites(Option<FileSlice> 
fileSliceOpt, List<String> newLogFiles, WriteStatus status) {
+    // TODO: Optimise the computation for multiple secondary indexes
+    HoodieEngineContext engineContext = new 
HoodieLocalEngineContext(hoodieTable.getStorageConf(), taskContextSupplier);
+    HoodieReaderContext readerContext = 
engineContext.getReaderContextFactory(hoodieTable.getMetaClient()).getContext();
+
+    secondaryIndexDefns.forEach(secondaryIndexDefnPair -> {
+      // fetch primary key -> secondary index for prev file slice.
+      Map<String, String> recordKeyToSecondaryKeyForPreviousFileSlice = 
fileSliceOpt.map(fileSlice -> {
+        try {
+          return 
SecondaryIndexRecordGenerationUtils.getRecordKeyToSecondaryKey(hoodieTable.getMetaClient(),
 readerContext, fileSlice, writeSchemaWithMetaFields,
+              secondaryIndexDefnPair.getValue(), instantTime, 
config.getProps(), false);
+        } catch (IOException e) {
+          throw new HoodieIOException("Failed to generate secondary index 
stats ", e);
+        }
+      }).orElse(Collections.emptyMap());
+
+      // fetch primary key -> secondary index for latest file slice including 
inflight.

Review Comment:
   We need updates as well, right? For example: rk1 -> val1 in the base file,
   rk1 -> val2 in log file 1, rk1 -> val3 in log file 2. In this scenario, the
   secondary index value for the primary key could take 3 different values, a
   new one each time. And we need to account for any merge mode / custom merger
   flows as well.

   So at a high level, we take the previous version of the file slice to find
   the primary key -> secondary key value mapping, and we find a similar
   mapping, including the new log files, for the file slice of interest.

   If we really wanted to, we could make some optimizations for the commit-time
   based merge mode. Otherwise, I am not sure we can do much here.
   
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -546,12 +559,75 @@ public List<WriteStatus> close() {
         status.getStat().setFileSizeInBytes(logFileSize);
       }
 
+      // generate Secondary index stats if streaming is enabled.
+      if (!isSecondaryIndexStreamingDisabled()) {
+        // Adds secondary index only for the last log file write status. We do 
not need to add secondary index stats
+        // for every log file written as part of the append handle write. The 
last write status would update the
+        // secondary index considering all the log files.
+        
trackMetadataIndexStatsForStreamingMetadataWrites(fileSliceOpt.or(this::getFileSlice),
 statuses.stream().map(status -> 
status.getStat().getPath()).collect(Collectors.toList()),
+            statuses.get(statuses.size() - 1));
+      }
+
       return statuses;
     } catch (IOException e) {
       throw new HoodieUpsertException("Failed to close UpdateHandle", e);
     }
   }
 
+  private void 
trackMetadataIndexStatsForStreamingMetadataWrites(Option<FileSlice> 
fileSliceOpt, List<String> newLogFiles, WriteStatus status) {
+    // TODO: Optimise the computation for multiple secondary indexes
+    HoodieEngineContext engineContext = new 
HoodieLocalEngineContext(hoodieTable.getStorageConf(), taskContextSupplier);
+    HoodieReaderContext readerContext = 
engineContext.getReaderContextFactory(hoodieTable.getMetaClient()).getContext();
+
+    secondaryIndexDefns.forEach(secondaryIndexDefnPair -> {
+      // fetch primary key -> secondary index for prev file slice.

Review Comment:
   It might involve doing a full table scan. Here we don't know the secondary
   index values, so we might end up reading the entire secondary index
   partition from the metadata table (MDT) and filtering for the file ID of
   interest. And every Spark task might need to do this.
   Comparatively, the current approach we take in this patch seems simple and
   direct, and involves less I/O.
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java:
##########
@@ -27,13 +27,19 @@
 public class CreateHandleFactory<T, I, K, O> extends WriteHandleFactory<T, I, 
K, O> implements Serializable {
 
   private boolean preserveMetadata = false;
+  private final boolean isSecondaryIndexStreamingDisabled;

Review Comment:
   Let's see if we can make this generic.
   For example, we might move more MDT partitions from batch to streaming mode
   in the future.
   So, let's ensure any changes we make here are future-proof, or at least
   generic enough that they do not need additional fixes when we add new MDT
   indexes to the streaming flow.



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestAppendHandle.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.SecondaryIndexStats;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests {@link HoodieAppendHandle}.
+ */
+public class TestAppendHandle extends BaseTestHandle {
+
+  @Test
+  public void testAppendHandleRLIStats() {
+    // init config and table
+    HoodieWriteConfig config = getConfigBuilder(basePath)
+        
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withEnableRecordIndex(true).withStreamingWriteEnabled(true).build())
+        .build();
+
+    HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
+
+    // one round per partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+    // init some args
+    String fileId = UUID.randomUUID().toString();
+    String instantTime = "000";
+
+    config.setSchema(TRIP_EXAMPLE_SCHEMA);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new 
String[] {partitionPath});
+    // create parquet file
+    createParquetFile(config, table, partitionPath, fileId, instantTime, 
dataGenerator);
+    // generate update records
+    instantTime = "001";
+    List<HoodieRecord> records = 
dataGenerator.generateUniqueUpdates(instantTime, 50);
+    HoodieAppendHandle handle = new HoodieAppendHandle(config, instantTime, 
table, partitionPath, fileId, records.iterator(), new 
LocalTaskContextSupplier());
+    Map<String, HoodieRecord> recordMap = new HashMap<>();
+    for (int i = 0; i < records.size(); i++) {
+      recordMap.put(String.valueOf(i), records.get(i));
+    }
+    // write the update records
+    handle.write(recordMap);
+    WriteStatus writeStatus = handle.writeStatus;
+    handle.close();
+
+    assertEquals(records.size(), writeStatus.getTotalRecords());
+    assertEquals(0, writeStatus.getTotalErrorRecords());
+    // validate write status has all record delegates
+    assertEquals(records.size(), 
writeStatus.getIndexStats().getWrittenRecordDelegates().size());
+    for (HoodieRecordDelegate recordDelegate : 
writeStatus.getIndexStats().getWrittenRecordDelegates()) {
+      assertTrue(recordDelegate.getNewLocation().isPresent());
+      assertEquals(fileId, recordDelegate.getNewLocation().get().getFileId());
+      assertEquals(instantTime, 
recordDelegate.getNewLocation().get().getInstantTime());
+    }
+  }
+
+  @Test
+  public void testAppendHandleSecondaryIndexStats() throws Exception {
+    // init config and table
+    HoodieWriteConfig config = getConfigBuilder(basePath)
+        
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableRecordIndex(true)
+            .withStreamingWriteEnabled(true)
+            .withSecondaryIndexEnabled(true)
+            .withSecondaryIndexName("sec-rider")
+            .withSecondaryIndexForColumn("rider")
+            .build())
+        .build();
+
+    HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
+    HoodieTableMetadataWriter metadataWriter = 
SparkMetadataWriterFactory.create(storageConf, config, context, 
table.getMetaClient().getTableConfig());
+    metadataWriter.close();
+
+    // one round per partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+    // init some args
+    String fileId = UUID.randomUUID().toString();
+    String instantTime = "000";
+
+    config.setSchema(TRIP_EXAMPLE_SCHEMA);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new 
String[] {partitionPath});
+    // create parquet file
+    createParquetFile(config, table, partitionPath, fileId, instantTime, 
dataGenerator);
+    // generate update records
+    instantTime = "001";
+    List<HoodieRecord> records = 
dataGenerator.generateUniqueUpdates(instantTime, 50);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    table = HoodieSparkTable.create(config, context, metaClient);
+    HoodieAppendHandle handle = new HoodieAppendHandle(config, instantTime, 
table, partitionPath, fileId, records.iterator(), new 
LocalTaskContextSupplier());
+    Map<String, HoodieRecord> recordMap = new HashMap<>();
+    for (int i = 0; i < records.size(); i++) {
+      recordMap.put(String.valueOf(i), records.get(i));
+    }
+    // write the update records

Review Comment:
   Are we generating delete records as well?
   



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestCreateHandle.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.SecondaryIndexStats;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests {@link HoodieCreateHandle}.
+ */
+public class TestCreateHandle extends BaseTestHandle {
+
+  @Test
+  public void testCreateHandleRLIStats() {
+    // init config and table
+    HoodieWriteConfig config = getConfigBuilder(basePath)
+        
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withEnableRecordIndex(true).withStreamingWriteEnabled(true).build())
+        .build();
+
+    HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
+
+    // one round per partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+
+    // init some args
+    String fileId = UUID.randomUUID().toString();
+    String instantTime = "000";
+
+    config.setSchema(TRIP_EXAMPLE_SCHEMA);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new 
String[] {partitionPath});
+    Pair<WriteStatus, List<HoodieRecord>> statusListPair = 
createParquetFile(config, table, partitionPath, fileId, instantTime, 
dataGenerator);
+    WriteStatus writeStatus = statusListPair.getLeft();
+    List<HoodieRecord> records = statusListPair.getRight();
+
+    assertEquals(records.size(), writeStatus.getTotalRecords());
+    assertEquals(0, writeStatus.getTotalErrorRecords());
+    // validate write status has all record delegates
+    assertEquals(records.size(), 
writeStatus.getIndexStats().getWrittenRecordDelegates().size());
+    for (HoodieRecordDelegate recordDelegate : 
writeStatus.getIndexStats().getWrittenRecordDelegates()) {
+      assertTrue(recordDelegate.getNewLocation().isPresent());
+      assertEquals(fileId, recordDelegate.getNewLocation().get().getFileId());
+      assertEquals(instantTime, 
recordDelegate.getNewLocation().get().getInstantTime());
+    }
+  }
+
+  @Test
+  public void testCreateHandleSecondaryIndexStats() throws Exception {
+    // init config and table
+    HoodieWriteConfig config = getConfigBuilder(basePath)
+        
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableRecordIndex(true)
+            .withStreamingWriteEnabled(true)
+            .withSecondaryIndexEnabled(true)
+            .withSecondaryIndexName("sec-rider")
+            .withSecondaryIndexForColumn("rider")
+            .build())
+        .build();
+
+    HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
+    HoodieTableMetadataWriter metadataWriter = 
SparkMetadataWriterFactory.create(storageConf, config, context, 
table.getMetaClient().getTableConfig());
+    metadataWriter.close();
+
+    // one round per partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+
+    // init some args
+    String fileId = UUID.randomUUID().toString();
+    String instantTime = "000";
+
+    config.setSchema(TRIP_EXAMPLE_SCHEMA);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    table = HoodieSparkTable.create(config, context, metaClient);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new 
String[] {partitionPath});
+    Pair<WriteStatus, List<HoodieRecord>> statusListPair = 
createParquetFile(config, table, partitionPath, fileId, instantTime, 
dataGenerator);
+    WriteStatus writeStatus = statusListPair.getLeft();
+    List<HoodieRecord> records = statusListPair.getRight();
+
+    assertEquals(records.size(), writeStatus.getTotalRecords());
+    assertEquals(0, writeStatus.getTotalErrorRecords());
+    // validate write status has all record delegates

Review Comment:
   same comment as above 



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java:
##########
@@ -214,22 +214,11 @@ public void testRemoveMetadataStats() {
     stats.put("field1", HoodieColumnRangeMetadata.<Comparable>create("f1", 
"field1", 1, 2, 0, 2, 5, 10));
     status.setStat(new HoodieWriteStat());
     status.getStat().putRecordsStats(stats);
-    assertEquals(1, status.getWrittenRecordDelegates().size());
+    assertEquals(1, status.getIndexStats().getWrittenRecordDelegates().size());
     assertEquals(1, status.getStat().getColumnStats().get().size());
 
     // Remove metadata stats
     status.removeMetadataStats();
-    assertEquals(0, status.getWrittenRecordDelegates().size());
-  }
-
-  @Test
-  public void testDropErrorRecords() {

Review Comment:
   The API is not used anymore. I responded to this earlier as well.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -106,6 +114,31 @@ protected HoodieWriteHandle(HoodieWriteConfig config, 
String instantTime, String
     this.recordMerger = config.getRecordMerger();
     this.writeStatus = (WriteStatus) 
ReflectionUtils.loadClass(config.getWriteStatusClassName(),
         hoodieTable.shouldTrackSuccessRecords(), 
config.getWriteStatusFailureFraction(), hoodieTable.isMetadataTable());
+    this.isStreamingWriteToMetadataEnabled = 
config.isMetadataStreamingWritesEnabled(hoodieTable.getMetaClient().getTableConfig().getTableVersion());
+    initMetadataPartitionsToCollectStats();
+  }
+
+  private void initMetadataPartitionsToCollectStats() {
+    if (isStreamingWriteToMetadataEnabled) {
+      // secondary index.
+      if (config.isSecondaryIndexEnabled()) {
+        ValidationUtils.checkArgument(recordMerger.getRecordType() == 
HoodieRecord.HoodieRecordType.AVRO,
+            "Only Avro record type is supported for streaming writes to 
metadata table with write handles");
+        secondaryIndexDefns = 
hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
+            .stream()
+            .filter(mdtPartition -> 
mdtPartition.startsWith(PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+            .map(mdtPartitionPath -> Pair.of(mdtPartitionPath, 
HoodieTableMetadataUtil.getHoodieIndexDefinition(mdtPartitionPath, 
hoodieTable.getMetaClient())))
+            .collect(Collectors.toList());
+        secondaryIndexDefns.forEach(pair -> 
writeStatus.getIndexStats().instantiateSecondaryIndexStatsForIndex(pair.getKey()));
+      }
+    }
+  }
+
+  /**
+   * Returns true if secondary index streaming is disabled for the table.
+   */
+  boolean isSecondaryIndexStreamingDisabled() {

Review Comment:
   Can we convert this to something named
   `streamingMetadataPartitionsEnabled` that returns `Set<MetadataPartitionType>`,
   and let each write handle implementation account for generating stats for
   the different MDT partitions?
   



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestAppendHandle.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.SecondaryIndexStats;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests {@link HoodieAppendHandle}.
+ */
+public class TestAppendHandle extends BaseTestHandle {
+
+  @Test
+  public void testAppendHandleRLIStats() {
+    // init config and table
+    HoodieWriteConfig config = getConfigBuilder(basePath)
+        
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withEnableRecordIndex(true).withStreamingWriteEnabled(true).build())
+        .build();
+
+    HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
+
+    // one round per partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+    // init some args
+    String fileId = UUID.randomUUID().toString();
+    String instantTime = "000";
+
+    config.setSchema(TRIP_EXAMPLE_SCHEMA);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new 
String[] {partitionPath});
+    // create parquet file
+    createParquetFile(config, table, partitionPath, fileId, instantTime, 
dataGenerator);
+    // generate update records
+    instantTime = "001";
+    List<HoodieRecord> records = 
dataGenerator.generateUniqueUpdates(instantTime, 50);
+    HoodieAppendHandle handle = new HoodieAppendHandle(config, instantTime, 
table, partitionPath, fileId, records.iterator(), new 
LocalTaskContextSupplier());
+    Map<String, HoodieRecord> recordMap = new HashMap<>();
+    for (int i = 0; i < records.size(); i++) {
+      recordMap.put(String.valueOf(i), records.get(i));
+    }
+    // write the update records
+    handle.write(recordMap);
+    WriteStatus writeStatus = handle.writeStatus;
+    handle.close();
+
+    assertEquals(records.size(), writeStatus.getTotalRecords());
+    assertEquals(0, writeStatus.getTotalErrorRecords());
+    // validate write status has all record delegates
+    assertEquals(records.size(), 
writeStatus.getIndexStats().getWrittenRecordDelegates().size());
+    for (HoodieRecordDelegate recordDelegate : 
writeStatus.getIndexStats().getWrittenRecordDelegates()) {
+      assertTrue(recordDelegate.getNewLocation().isPresent());
+      assertEquals(fileId, recordDelegate.getNewLocation().get().getFileId());
+      assertEquals(instantTime, 
recordDelegate.getNewLocation().get().getInstantTime());
+    }
+  }
+
+  @Test
+  public void testAppendHandleSecondaryIndexStats() throws Exception {
+    // init config and table
+    HoodieWriteConfig config = getConfigBuilder(basePath)
+        
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableRecordIndex(true)
+            .withStreamingWriteEnabled(true)
+            .withSecondaryIndexEnabled(true)
+            .withSecondaryIndexName("sec-rider")
+            .withSecondaryIndexForColumn("rider")
+            .build())
+        .build();
+
+    HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
+    HoodieTableMetadataWriter metadataWriter = 
SparkMetadataWriterFactory.create(storageConf, config, context, 
table.getMetaClient().getTableConfig());
+    metadataWriter.close();
+
+    // one round per partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+    // init some args
+    String fileId = UUID.randomUUID().toString();
+    String instantTime = "000";
+
+    config.setSchema(TRIP_EXAMPLE_SCHEMA);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new 
String[] {partitionPath});
+    // create parquet file
+    createParquetFile(config, table, partitionPath, fileId, instantTime, 
dataGenerator);
+    // generate update records
+    instantTime = "001";
+    List<HoodieRecord> records = 
dataGenerator.generateUniqueUpdates(instantTime, 50);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    table = HoodieSparkTable.create(config, context, metaClient);
+    HoodieAppendHandle handle = new HoodieAppendHandle(config, instantTime, 
table, partitionPath, fileId, records.iterator(), new 
LocalTaskContextSupplier());
+    Map<String, HoodieRecord> recordMap = new HashMap<>();
+    for (int i = 0; i < records.size(); i++) {
+      recordMap.put(String.valueOf(i), records.get(i));
+    }
+    // write the update records

Review Comment:
   Why do we need two different tests?
   Can't we just have one test and validate both RLI and SI?



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java:
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.SecondaryIndexStats;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.SparkMetadataWriterFactory;
+import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.action.commit.HoodieMergeHelper;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests {@link HoodieMergeHandle}.
+ */
+public class TestMergeHandle extends BaseTestHandle {
+
+  @Test
+  public void testMergeHandleRLIStats() throws IOException {
+    // init config and table
+    HoodieWriteConfig config = getConfigBuilder(basePath)
+        .withPopulateMetaFields(false)
+        
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withEnableRecordIndex(true).withStreamingWriteEnabled(true).build())
+        
.withKeyGenerator(KeyGeneratorForDataGeneratorRecords.class.getCanonicalName())
+        .build();
+    HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) 
HoodieSparkTable.create(config, new HoodieLocalEngineContext(storageConf), 
metaClient);
+    // one round per partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+    // init some args
+    String fileId = UUID.randomUUID().toString();
+    String instantTime = "000";
+
+    // Create a parquet file
+    config.setSchema(TRIP_EXAMPLE_SCHEMA);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new 
String[] {partitionPath});
+    Pair<WriteStatus, List<HoodieRecord>> statusListPair = 
createParquetFile(config, table, partitionPath, fileId, instantTime, 
dataGenerator);
+    WriteStatus writeStatus = statusListPair.getLeft();
+    List<HoodieRecord> records = statusListPair.getRight();
+    assertEquals(records.size(), writeStatus.getTotalRecords());
+    assertEquals(0, writeStatus.getTotalErrorRecords());
+
+    instantTime = "001";
+    List<HoodieRecord> updates = 
dataGenerator.generateUniqueUpdates(instantTime, 10);
+    HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config, instantTime, 
table, updates.iterator(), partitionPath, fileId, 
table.getTaskContextSupplier(),
+        new HoodieBaseFile(writeStatus.getStat().getPath()), Option.of(new 
KeyGeneratorForDataGeneratorRecords(config.getProps())));
+    HoodieMergeHelper.newInstance().runMerge(table, mergeHandle);
+    writeStatus = mergeHandle.writeStatus;
+    // verify stats after merge
+    assertEquals(records.size(), writeStatus.getStat().getNumWrites());
+    assertEquals(10, writeStatus.getStat().getNumUpdateWrites());
+  }
+
+  @Test
+  public void testMergeHandleSecondaryIndexStatsWithUpdates() throws Exception 
{
+    // init config and table
+    HoodieWriteConfig config = getConfigBuilder(basePath)
+        .withPopulateMetaFields(false)
+        
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableRecordIndex(true)
+            .withStreamingWriteEnabled(true)
+            .withSecondaryIndexEnabled(true)
+            .withSecondaryIndexName("sec-rider")
+            .withSecondaryIndexForColumn("rider")
+            .build())
+        
.withKeyGenerator(KeyGeneratorForDataGeneratorRecords.class.getCanonicalName())
+        .build();
+    HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) 
HoodieSparkTable.create(config, new HoodieLocalEngineContext(storageConf), 
metaClient);
+    HoodieTableMetadataWriter metadataWriter = 
SparkMetadataWriterFactory.create(storageConf, config, context, 
table.getMetaClient().getTableConfig());
+    metadataWriter.close();
+
+    // one round per partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+    // init some args
+    String fileId = UUID.randomUUID().toString();
+    String instantTime = "000";
+
+    // Create a parquet file
+    config.setSchema(TRIP_EXAMPLE_SCHEMA);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    table = (HoodieSparkCopyOnWriteTable) 
HoodieSparkCopyOnWriteTable.create(config, context, metaClient);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new 
String[] {partitionPath});
+
+    Pair<WriteStatus, List<HoodieRecord>> statusListPair = 
createParquetFile(config, table, partitionPath, fileId, instantTime, 
dataGenerator);
+    WriteStatus writeStatus = statusListPair.getLeft();
+    List<HoodieRecord> records = statusListPair.getRight();
+    assertEquals(records.size(), writeStatus.getTotalRecords());
+    assertEquals(0, writeStatus.getTotalErrorRecords());
+
+    instantTime = "001";
+    List<HoodieRecord> updates = 
dataGenerator.generateUniqueUpdates(instantTime, 10);

Review Comment:
   Can we add deletes as well?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java:
##########
@@ -173,6 +186,23 @@ record = record.prependMetaFields(schema, 
writeSchemaWithMetaFields, new Metadat
     }
   }
 
+  private void trackMetadataIndexStats(HoodieRecord record) {
+    if (isSecondaryIndexStreamingDisabled()) {
+      return;
+    }
+
+    // Add secondary index records for all the inserted records
+    secondaryIndexDefns.forEach(secondaryIndexPartitionPathFieldPair -> {
+      String secondaryIndexSourceField = 
String.join(".",secondaryIndexPartitionPathFieldPair.getValue().getSourceFields());
+      if (record instanceof HoodieAvroIndexedRecord) {

Review Comment:
   Once we land your other patch, we can incorporate it.
   It's just that, based on our investigation, even in the Spark engine, only
   AVRO seems to have full-fledged support on the writer side. We ran into
   issues while enabling the SPARK merger on the write side for the Spark engine.
   
   That's why we kept it this way,
   and we do ensure that only the Avro record type is enabled for writers.
   
   Check L125 in HoodieWriteHandle:
   
   ```
    ValidationUtils.checkArgument(recordMerger.getRecordType() == 
HoodieRecord.HoodieRecordType.AVRO,
               "Only Avro record type is supported for streaming writes to 
metadata table with write handles"); 
   ```
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -546,12 +559,75 @@ public List<WriteStatus> close() {
         status.getStat().setFileSizeInBytes(logFileSize);
       }
 
+      // generate Secondary index stats if streaming is enabled.
+      if (!isSecondaryIndexStreamingDisabled()) {
+        // Adds secondary index only for the last log file write status. We do 
not need to add secondary index stats
+        // for every log file written as part of the append handle write. The 
last write status would update the
+        // secondary index considering all the log files.
+        
trackMetadataIndexStatsForStreamingMetadataWrites(fileSliceOpt.or(this::getFileSlice),
 statuses.stream().map(status -> 
status.getStat().getPath()).collect(Collectors.toList()),
+            statuses.get(statuses.size() - 1));
+      }
+
       return statuses;
     } catch (IOException e) {
       throw new HoodieUpsertException("Failed to close UpdateHandle", e);
     }
   }
 
+  private void 
trackMetadataIndexStatsForStreamingMetadataWrites(Option<FileSlice> 
fileSliceOpt, List<String> newLogFiles, WriteStatus status) {
+    // TODO: Optimise the computation for multiple secondary indexes

Review Comment:
   Also, can we add Javadocs explaining why we can't generate these stats in a
   streaming manner?
   For instance, in CreateHandle and MergeHandle, we do generate SI stats in a
   streaming way.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java:
##########
@@ -27,13 +27,19 @@
 public class CreateHandleFactory<T, I, K, O> extends WriteHandleFactory<T, I, 
K, O> implements Serializable {
 
   private boolean preserveMetadata = false;
+  private final boolean isSecondaryIndexStreamingDisabled;

Review Comment:
   How about adding the operation type as an argument to the write handle constructor?
   Then most of these might be simplified, right?
   



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java:
##########
@@ -136,4 +136,9 @@ public void write() {
       throw new HoodieUpsertException("Failed to compact file slice: " + 
fileSlice, e);
     }
   }
+
+  @Override
+  boolean isSecondaryIndexStreamingDisabled() {
+    return true;

Review Comment:
   Since clustering and compaction do not make any material changes to
   records, we skip updating SI.
   
   
https://github.com/apache/hudi/blob/cb566b9785fc73a3076a5afabe8ebb38669712f9/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java#L1340
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to