This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b634cfc47ee0dadf5da8ae9bca42a739daccca39
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Oct 22 23:00:38 2025 +0800

    fix: Handle deletes and updates properly in secondary index (#14090)
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |  34 +++--
 .../hudi/metadata/MetadataIndexGenerator.java      | 132 -------------------
 .../apache/hudi/metadata/MetadataIndexMapper.java  |  66 ++++++++++
 .../apache/hudi/metadata/RecordIndexMapper.java    |  84 ++++++++++++
 .../apache/hudi/metadata/SecondaryIndexMapper.java |  68 ++++++++++
 .../SecondaryIndexRecordGenerationUtils.java       |  13 +-
 ...Generator.java => TestMetadataIndexMapper.java} |  13 +-
 .../FlinkHoodieBackedTableMetadataWriter.java      |   5 -
 .../JavaHoodieBackedTableMetadataWriter.java       |   5 -
 .../SparkHoodieBackedTableMetadataWriter.java      |   4 -
 ...ieBackedTableMetadataWriterTableVersionSix.java |   5 -
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  40 ++++--
 .../functional/TestSecondaryIndexPruning.scala     | 142 ++++++++++++++++++++-
 13 files changed, 424 insertions(+), 187 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 5d172095affd..51d5a9016081 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -180,7 +180,6 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> 
implements HoodieTab
   boolean initialized = false;
   private HoodieTableFileSystemView metadataView;
   private final boolean streamingWritesEnabled;
-  private final Option<MetadataIndexGenerator> metadataIndexGenerator;
 
   protected HoodieBackedTableMetadataWriter(StorageConfiguration<?> 
storageConf,
                                             HoodieWriteConfig writeConfig,
@@ -225,18 +224,12 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
     }
     ValidationUtils.checkArgument(!initialized || this.metadata != null, "MDT 
Reader should have been opened post initialization");
     this.streamingWritesEnabled = streamingWritesEnabled;
-    this.metadataIndexGenerator = streamingWritesEnabled ? 
Option.of(initializeMetadataIndexGenerator()) : Option.empty();
   }
 
   List<MetadataPartitionType> getEnabledPartitions(HoodieMetadataConfig 
metadataConfig, HoodieTableMetaClient metaClient) {
     return MetadataPartitionType.getEnabledPartitions(metadataConfig, 
metaClient);
   }
 
-  /**
-   * Returns the utilities for metadata index generation.
-   */
-  abstract MetadataIndexGenerator initializeMetadataIndexGenerator();
-
   private void mayBeReinitMetadataReader() {
     if (metadata == null || metadataMetaClient == null || 
metadata.getMetadataFileSystemView() == null) {
       initMetadataReader();
@@ -1239,12 +1232,31 @@ public abstract class 
HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
     }
 
     maybeInitializeNewFileGroupsForPartitionedRLI(writeStatus, instantTime);
-    HoodieData<HoodieRecord> untaggedRecords = writeStatus.flatMap(
-        new 
MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(mdtPartitionsToTag, 
dataWriteConfig));
+
+    Map<MetadataPartitionType, MetadataIndexMapper> indexMapperMap = 
mdtPartitionsToTag.stream()
+        .filter(e -> e.equals(RECORD_INDEX) || e.equals(SECONDARY_INDEX))
+        .collect(Collectors.toMap(
+            key -> key,
+            key -> {
+              if (RECORD_INDEX.equals(key)) {
+                return new RecordIndexMapper(dataWriteConfig);
+              }
+              return new SecondaryIndexMapper(dataWriteConfig);
+            }
+        ));
+
+    if (indexMapperMap.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    HoodieData<HoodieRecord> processedRecords = 
indexMapperMap.values().stream()
+        .map(indexMapper -> 
indexMapper.postProcess(writeStatus.flatMap(indexMapper)))
+        .reduce(HoodieData::union)
+        .get();
 
     // tag records w/ location
-    Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> 
hoodieFileGroupsToUpdateAndTaggedMdtRecords = 
tagRecordsWithLocationForStreamingWrites(untaggedRecords,
-        mdtPartitionPathsToTag);
+    Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> 
hoodieFileGroupsToUpdateAndTaggedMdtRecords =
+        tagRecordsWithLocationForStreamingWrites(processedRecords, 
mdtPartitionPathsToTag);
 
     // write partial writes to MDT table (for those partitions where streaming 
writes are enabled)
     HoodieData<WriteStatus> writeStatusCollection = 
convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(hoodieFileGroupsToUpdateAndTaggedMdtRecords,
 instantTime));
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexGenerator.java
deleted file mode 100644
index 87fbbec05a45..000000000000
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexGenerator.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.metadata;
-
-import org.apache.hudi.client.SecondaryIndexStats;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.function.SerializableFunction;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordDelegate;
-import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieMetadataException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
-import static org.apache.hudi.metadata.MetadataPartitionType.SECONDARY_INDEX;
-
-/**
- * For now this is a placeholder to generate all MDT records in one place.
- * Once <a href="https://github.com/apache/hudi/pull/13226">Refactor MDT 
update logic with Indexer</a> is landed,
- * we will leverage the new abstraction to generate MDT records.
- */
-public class MetadataIndexGenerator implements Serializable {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(MetadataIndexGenerator.class);
-
-  /**
-   * MDT record transformation utility. This function is expected to be 
invoked from a map Partition call, where one spark task will receive
-   * one WriteStatus as input and the output contains prepared MDT table 
records for all eligible partitions that can operate on one
-   * WriteStatus instance only.
-   */
-  static class WriteStatusBasedMetadataIndexMapper implements 
SerializableFunction<WriteStatus, Iterator<HoodieRecord>> {
-    private final List<MetadataPartitionType> enabledPartitionTypes;
-    private final HoodieWriteConfig dataWriteConfig;
-
-    public WriteStatusBasedMetadataIndexMapper(List<MetadataPartitionType> 
enabledPartitionTypes, HoodieWriteConfig dataWriteConfig) {
-      this.enabledPartitionTypes = enabledPartitionTypes;
-      this.dataWriteConfig = dataWriteConfig;
-    }
-
-    @Override
-    public Iterator<HoodieRecord> apply(WriteStatus writeStatus) throws 
Exception {
-      List<HoodieRecord> allRecords = new ArrayList<>();
-      if (enabledPartitionTypes.contains(RECORD_INDEX)) {
-        allRecords.addAll(processWriteStatusForRLI(writeStatus, 
dataWriteConfig));
-      }
-      if (enabledPartitionTypes.contains(SECONDARY_INDEX)) {
-        allRecords.addAll(processWriteStatusForSecondaryIndex(writeStatus));
-      }
-      // yet to add support for more partitions.
-      // bloom filter
-      // secondary index
-      // expression index.
-      return allRecords.iterator();
-    }
-  }
-
-  protected static List<HoodieRecord> processWriteStatusForRLI(WriteStatus 
writeStatus, HoodieWriteConfig dataWriteConfig) {
-    List<HoodieRecord> allRecords = new ArrayList<>();
-    for (HoodieRecordDelegate recordDelegate : 
writeStatus.getIndexStats().getWrittenRecordDelegates()) {
-      if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
-        if (recordDelegate.getIgnoreIndexUpdate()) {
-          continue;
-        }
-        HoodieRecord hoodieRecord;
-        Option<HoodieRecordLocation> newLocation = 
recordDelegate.getNewLocation();
-        if (newLocation.isPresent()) {
-          if (recordDelegate.getCurrentLocation().isPresent()) {
-            // This is an update, no need to update index if the location has 
not changed
-            // newLocation should have the same fileID as currentLocation. The 
instantTimes differ as newLocation's
-            // instantTime refers to the current commit which was completed.
-            if 
(!recordDelegate.getCurrentLocation().get().getFileId().equals(newLocation.get().getFileId()))
 {
-              final String msg = String.format("Detected update in location of 
record with key %s from %s to %s. The fileID should not change.",
-                  recordDelegate, recordDelegate.getCurrentLocation().get(), 
newLocation.get());
-              LOG.error(msg);
-              throw new HoodieMetadataException(msg);
-            }
-            // for updates, we can skip updating RLI partition in MDT
-          } else {
-            // Insert new record case
-            hoodieRecord = HoodieMetadataPayload.createRecordIndexUpdate(
-                recordDelegate.getRecordKey(), 
recordDelegate.getPartitionPath(),
-                newLocation.get().getFileId(), 
newLocation.get().getInstantTime(), dataWriteConfig.getWritesFileIdEncoding());
-            allRecords.add(hoodieRecord);
-          }
-        } else {
-          // Delete existing index for a deleted record
-          hoodieRecord = 
HoodieMetadataPayload.createRecordIndexDelete(recordDelegate.getRecordKey(), 
recordDelegate.getPartitionPath(), 
dataWriteConfig.isPartitionedRecordIndexEnabled());
-          allRecords.add(hoodieRecord);
-        }
-      }
-    }
-    return allRecords;
-  }
-
-  protected static List<HoodieRecord> 
processWriteStatusForSecondaryIndex(WriteStatus writeStatus) {
-    List<HoodieRecord> secondaryIndexRecords = new 
ArrayList<>(writeStatus.getIndexStats().getSecondaryIndexStats().size());
-    for (Map.Entry<String, List<SecondaryIndexStats>> entry : 
writeStatus.getIndexStats().getSecondaryIndexStats().entrySet()) {
-      String indexPartitionName = entry.getKey();
-      List<SecondaryIndexStats> secondaryIndexStats = entry.getValue();
-      for (SecondaryIndexStats stat : secondaryIndexStats) {
-        
secondaryIndexRecords.add(HoodieMetadataPayload.createSecondaryIndexRecord(stat.getRecordKey(),
 stat.getSecondaryKeyValue(), indexPartitionName, stat.isDeleted()));
-      }
-    }
-    return secondaryIndexRecords;
-  }
-}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexMapper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexMapper.java
new file mode 100644
index 000000000000..0b453de2b475
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataIndexMapper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Abstract base class for metadata index mappers. Each index type can extend 
this to implement
+ * its own record generation logic.
+ */
+public abstract class MetadataIndexMapper implements 
SerializableFunction<WriteStatus, Iterator<HoodieRecord>>, Serializable {
+  protected final HoodieWriteConfig dataWriteConfig;
+
+  public MetadataIndexMapper(HoodieWriteConfig dataWriteConfig) {
+    this.dataWriteConfig = dataWriteConfig;
+  }
+
+  /**
+   * Generates metadata index records from a WriteStatus.
+   *
+   * @param writeStatus the write status to process
+   * @return list of metadata records
+   */
+  protected abstract List<HoodieRecord> generateRecords(WriteStatus 
writeStatus);
+
+  /**
+   * Post-processes the generated records. Default implementation returns 
records as-is.
+   * Subclasses can override to add deduplication, validation, or other 
transformations.
+   *
+   * @param records the generated records
+   * @return post-processed records
+   */
+  public HoodieData<HoodieRecord> postProcess(HoodieData<HoodieRecord> 
records) {
+    return records;
+  }
+
+  @Override
+  public final Iterator<HoodieRecord> apply(WriteStatus writeStatus) throws 
Exception {
+    return generateRecords(writeStatus).iterator();
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/RecordIndexMapper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/RecordIndexMapper.java
new file mode 100644
index 000000000000..13029bca3df9
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/RecordIndexMapper.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieMetadataException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Mapper for Record Level Index (RLI).
+ */
+public class RecordIndexMapper extends MetadataIndexMapper {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RecordIndexMapper.class);
+
+  public RecordIndexMapper(HoodieWriteConfig dataWriteConfig) {
+    super(dataWriteConfig);
+  }
+
+  @Override
+  protected List<HoodieRecord> generateRecords(WriteStatus writeStatus) {
+    List<HoodieRecord> allRecords = new ArrayList<>();
+    for (HoodieRecordDelegate recordDelegate : 
writeStatus.getIndexStats().getWrittenRecordDelegates()) {
+      if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
+        if (recordDelegate.getIgnoreIndexUpdate()) {
+          continue;
+        }
+        HoodieRecord hoodieRecord;
+        Option<HoodieRecordLocation> newLocation = 
recordDelegate.getNewLocation();
+        if (newLocation.isPresent()) {
+          if (recordDelegate.getCurrentLocation().isPresent()) {
+            // This is an update, no need to update index if the location has 
not changed
+            // newLocation should have the same fileID as currentLocation. The 
instantTimes differ as newLocation's
+            // instantTime refers to the current commit which was completed.
+            if 
(!recordDelegate.getCurrentLocation().get().getFileId().equals(newLocation.get().getFileId()))
 {
+              final String msg = String.format("Detected update in location of 
record with key %s from %s to %s. The fileID should not change.",
+                  recordDelegate, recordDelegate.getCurrentLocation().get(), 
newLocation.get());
+              LOG.error(msg);
+              throw new HoodieMetadataException(msg);
+            }
+            // for updates, we can skip updating RLI partition in MDT
+          } else {
+            // Insert new record case
+            hoodieRecord = HoodieMetadataPayload.createRecordIndexUpdate(
+                recordDelegate.getRecordKey(), 
recordDelegate.getPartitionPath(),
+                newLocation.get().getFileId(), 
newLocation.get().getInstantTime(), dataWriteConfig.getWritesFileIdEncoding());
+            allRecords.add(hoodieRecord);
+          }
+        } else {
+          // Delete existing index for a deleted record
+          hoodieRecord = 
HoodieMetadataPayload.createRecordIndexDelete(recordDelegate.getRecordKey(), 
recordDelegate.getPartitionPath(), 
dataWriteConfig.isPartitionedRecordIndexEnabled());
+          allRecords.add(hoodieRecord);
+        }
+      }
+    }
+    return allRecords;
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexMapper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexMapper.java
new file mode 100644
index 000000000000..b05901392057
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexMapper.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.client.SecondaryIndexStats;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.reduceByKeys;
+
+public class SecondaryIndexMapper extends MetadataIndexMapper {
+  public SecondaryIndexMapper(HoodieWriteConfig dataWriteConfig) {
+    super(dataWriteConfig);
+  }
+
+  @Override
+  protected List<HoodieRecord> generateRecords(WriteStatus writeStatus) {
+    List<HoodieRecord> secondaryIndexRecords = new 
ArrayList<>(writeStatus.getIndexStats().getSecondaryIndexStats().size());
+    for (Map.Entry<String, List<SecondaryIndexStats>> entry : 
writeStatus.getIndexStats().getSecondaryIndexStats().entrySet()) {
+      String indexPartitionName = entry.getKey();
+      List<SecondaryIndexStats> secondaryIndexStats = entry.getValue();
+      for (SecondaryIndexStats stat : secondaryIndexStats) {
+        
secondaryIndexRecords.add(HoodieMetadataPayload.createSecondaryIndexRecord(stat.getRecordKey(),
 stat.getSecondaryKeyValue(), indexPartitionName, stat.isDeleted()));
+      }
+    }
+    return secondaryIndexRecords;
+  }
+
+  /**
+   * Post-processes secondary index records by deduplicating records with the 
same metadata record key.
+   * This handles partition path updates where the same record generates both 
DELETE and INSERT.
+   * Note that unlike record index, the record may have both partition path 
update and secondary key
+   * update; in this case, the delete record to the secondary index has to be 
honored. This is
+   * guaranteed as part of the reduceByKeys operation as in such a case, the 
DELETE and INSERT
+   * to the secondary index have different metadata record key.
+   */
+  @Override
+  public HoodieData<HoodieRecord> postProcess(HoodieData<HoodieRecord> 
records) {
+    // Deduplicate by metadata record key (secondaryKey$recordKey)
+    // usePartitionInKey = false because SI keys are globally unique
+    int parallelism = Math.max(
+        records.getNumPartitions(), 
dataWriteConfig.getMetadataConfig().getSecondaryIndexParallelism());
+    return reduceByKeys(records, parallelism, false);
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
index 2a6599244e05..1bf39c639d9e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
@@ -84,7 +84,6 @@ public class SecondaryIndexRecordGenerationUtils {
    * @param instantTime     instant time
    * @param indexDefinition secondary index definition
    * @param metadataConfig  metadata config
-   * @param fsView          file system view as of instant time
    * @param dataMetaClient  data table meta client
    * @param engineContext   engine context
    * @param writeConfig     hoodie write config.
@@ -97,8 +96,7 @@ public class SecondaryIndexRecordGenerationUtils {
                                                                                
       HoodieMetadataConfig metadataConfig,
                                                                                
       HoodieTableMetaClient dataMetaClient,
                                                                                
       HoodieEngineContext engineContext,
-                                                                               
       HoodieWriteConfig writeConfig
-                                                                               
       ) {
+                                                                               
       HoodieWriteConfig writeConfig) {
     TypedProperties props = writeConfig.getProps();
     // Secondary index cannot support logs having inserts with current 
offering. So, let's validate that.
     if (allWriteStats.stream().anyMatch(writeStat -> {
@@ -118,7 +116,7 @@ public class SecondaryIndexRecordGenerationUtils {
     int parallelism = Math.max(Math.min(writeStatsByFileId.size(), 
metadataConfig.getSecondaryIndexParallelism()), 1);
 
     ReaderContextFactory<T> readerContextFactory = 
engineContext.getReaderContextFactory(dataMetaClient);
-    return engineContext.parallelize(new 
ArrayList<>(writeStatsByFileId.entrySet()), 
parallelism).flatMap(writeStatsByFileIdEntry -> {
+    HoodieData<HoodieRecord> secondaryIndexRecords = 
engineContext.parallelize(new ArrayList<>(writeStatsByFileId.entrySet()), 
parallelism).flatMap(writeStatsByFileIdEntry -> {
       String fileId = writeStatsByFileIdEntry.getKey();
       List<HoodieWriteStat> writeStats = writeStatsByFileIdEntry.getValue();
       String partition = writeStats.get(0).getPartitionPath();
@@ -199,6 +197,13 @@ public class SecondaryIndexRecordGenerationUtils {
       });
       return records.iterator();
     });
+
+    // Deduplicate secondary index records by grouping by the secondary index 
key
+    // (secondaryKey$recordKey). This handles the case where a record moves 
from one file group to
+    // another (partition path update), which generates both a delete (from 
old fileId) and an
+    // insert (to new fileId). Similar to how Record Level Index handles 
partition path update,
+    // we prefer non-deleted records.
+    return HoodieTableMetadataUtil.reduceByKeys(secondaryIndexRecords, 
parallelism, false);
   }
 
   private static TableFileSystemView.SliceView getSliceView(HoodieWriteConfig 
config, HoodieTableMetaClient dataMetaClient) {
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestMetadataIndexGenerator.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestMetadataIndexMapper.java
similarity index 85%
rename from 
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestMetadataIndexGenerator.java
rename to 
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestMetadataIndexMapper.java
index 188e93099ffe..4fe658ce3dcc 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestMetadataIndexGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestMetadataIndexMapper.java
@@ -32,7 +32,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
@@ -42,7 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class TestMetadataIndexGenerator extends HoodieCommonTestHarness {
+public class TestMetadataIndexMapper extends HoodieCommonTestHarness {
 
   @Test
   public void testRLIIndexMapperWithInsertsAndUpserts() throws Exception {
@@ -66,9 +65,8 @@ public class TestMetadataIndexGenerator extends 
HoodieCommonTestHarness {
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
         .withPath("random")
         .build();
-    MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper 
writeStatusBasedMetadataIndexMapper = new 
MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(
-        Collections.singletonList(MetadataPartitionType.RECORD_INDEX), 
writeConfig);
-    Iterator<HoodieRecord> rliRecords = 
writeStatusBasedMetadataIndexMapper.apply(writeStatus);
+    RecordIndexMapper recordIndexMapper = new RecordIndexMapper(writeConfig);
+    Iterator<HoodieRecord> rliRecords = recordIndexMapper.apply(writeStatus);
     AtomicInteger totalRLIRecords = new AtomicInteger();
     rliRecords.forEachRemaining(rliRecord -> {
       totalRLIRecords.getAndIncrement();
@@ -103,9 +101,8 @@ public class TestMetadataIndexGenerator extends 
HoodieCommonTestHarness {
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
         .withPath("random")
         .build();
-    MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper 
writeStatusBasedMetadataIndexMapper = new 
MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(
-        Collections.singletonList(MetadataPartitionType.RECORD_INDEX), 
writeConfig);
-    Iterator<HoodieRecord> rliRecords = 
writeStatusBasedMetadataIndexMapper.apply(writeStatus);
+    RecordIndexMapper recordIndexMapper = new RecordIndexMapper(writeConfig);
+    Iterator<HoodieRecord> rliRecords = recordIndexMapper.apply(writeStatus);
     AtomicInteger totalRLIRecords = new AtomicInteger();
     rliRecords.forEachRemaining(rliRecord -> {
       totalRLIRecords.getAndIncrement();
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index aa2c04db44e4..e9a947a4d5a4 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -168,11 +168,6 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
     throw new UnsupportedOperationException("Not implemented for Flink engine 
yet");
   }
 
-  @Override
-  MetadataIndexGenerator initializeMetadataIndexGenerator() {
-    throw new UnsupportedOperationException("Streaming writes are not 
supported for Flink");
-  }
-
   @Override
   protected EngineType getEngineType() {
     return EngineType.FLINK;
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
index 16cfe2835d35..41ecca1fe7f1 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
@@ -63,11 +63,6 @@ public class JavaHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetada
     super(storageConf, writeConfig, failedWritesCleaningPolicy, engineContext, 
inflightInstantTimestamp);
   }
 
-  @Override
-  MetadataIndexGenerator initializeMetadataIndexGenerator() {
-    throw new UnsupportedOperationException("Streaming writes are not 
supported for Java");
-  }
-
   public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf,
                                                  HoodieWriteConfig writeConfig,
                                                  HoodieEngineContext context,
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index d36e59728e90..7b79be758776 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -288,10 +288,6 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
     return exprIndexRecords;
   }
 
-  protected MetadataIndexGenerator initializeMetadataIndexGenerator() {
-    return new MetadataIndexGenerator();
-  }
-
   protected SparkRDDMetadataWriteClient 
getSparkWriteClient(Option<BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, 
JavaRDD<WriteStatus>>> writeClientOpt) {
     return ((SparkRDDMetadataWriteClient) 
writeClientOpt.orElse(getWriteClient()));
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
index 587759829f83..70e3cfb7ac3b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
@@ -95,11 +95,6 @@ public class 
SparkHoodieBackedTableMetadataWriterTableVersionSix extends HoodieB
     super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, 
inflightInstantTimestamp);
   }
 
-  @Override
-  MetadataIndexGenerator initializeMetadataIndexGenerator() {
-    throw new UnsupportedOperationException("Streaming writes are not 
supported for Spark table version six");
-  }
-
   @Override
   protected void initRegistry() {
     if (metadataWriteConfig.isMetricsOn()) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index ab999f7589ef..39cd361b7102 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1089,25 +1089,40 @@ public class HoodieTableMetadataUtil {
   }
 
   /**
-   * There are chances that same record key from data table has 2 entries (1 
delete from older partition and 1 insert to newer partition)
-   * So, this method performs reduce by key to ignore the deleted entry.
-   * @param recordIndexRecords hoodie records after rli index lookup.
-   * @param parallelism parallelism to use.
-   * @return
+   * Reduces metadata table index records by their metadata record key to 
ensure only a single
+   * update per metadata record key.
+   * <p>
+   * This is critical for handling scenarios where the same metadata record 
key appears multiple times:
+   * - Partition movements: A data record moving from one partition to another 
generates both
+   * a delete entry (from the old partition) and an insert entry (to the new partition)
+   * - File group movements: A record moving from one file group to another
+   * - When usePartitionInKey is true: Groups by (metadata record key, 
partition path) tuple
+   * - When usePartitionInKey is false: Groups by metadata record key only
+   * <p>
+   * The reduce logic prefers non-deleted records over deleted ones to 
properly handle movements.
+   * For Record Level Index: When both records are non-deleted (indicating 
duplicate inserts for
+   * the same key), an exception is thrown since this represents an invalid 
state.
+   * For Secondary Index: When both records are non-deleted, the later record 
(record2) is preferred.
+   *
+   * @param metadataRecords   {@link HoodieData} of metadata table index 
records to be reduced by keys
+   * @param parallelism       parallelism for the reduce-by operation
+   * @param usePartitionInKey whether to use partition path as part of the 
grouping key
+   *                          (true for partitioned RLI, false for 
non-partitioned RLI and SI)
+   * @return HoodieData of deduplicated metadata records with one record per 
metadata record key
    */
   @VisibleForTesting
-  public static HoodieData<HoodieRecord> reduceByKeys(HoodieData<HoodieRecord> 
recordIndexRecords, int parallelism, boolean isPartitionedRLI) {
-    HoodiePairData<HoodieKey, HoodieRecord> recordIndexRecordsPair;
-    if (isPartitionedRLI) {
-      recordIndexRecordsPair = recordIndexRecords.mapToPair(r -> {
+  public static HoodieData<HoodieRecord> reduceByKeys(HoodieData<HoodieRecord> 
metadataRecords, int parallelism, boolean usePartitionInKey) {
+    HoodiePairData<HoodieKey, HoodieRecord> metadataRecordsPair;
+    if (usePartitionInKey) {
+      metadataRecordsPair = metadataRecords.mapToPair(r -> {
         String recordPartitionPath = r.isDelete(DELETE_CONTEXT, 
CollectionUtils.emptyProps())
             ? ((EmptyHoodieRecordPayloadWithPartition) 
r.getData()).getPartitionPath() : ((HoodieMetadataPayload) 
r.getData()).getDataPartition();
         return Pair.of(new HoodieKey(r.getRecordKey(), recordPartitionPath), 
r);
       });
     } else {
-      recordIndexRecordsPair = recordIndexRecords.mapToPair(r -> 
Pair.of(r.getKey(), r));
+      metadataRecordsPair = metadataRecords.mapToPair(r -> Pair.of(r.getKey(), 
r));
     }
-    return 
recordIndexRecordsPair.reduceByKey((SerializableBiFunction<HoodieRecord, 
HoodieRecord, HoodieRecord>) (record1, record2) -> {
+    return 
metadataRecordsPair.reduceByKey((SerializableBiFunction<HoodieRecord, 
HoodieRecord, HoodieRecord>) (record1, record2) -> {
       boolean isRecord1Deleted = record1.isDelete(DELETE_CONTEXT, 
CollectionUtils.emptyProps());
       boolean isRecord2Deleted = record2.isDelete(DELETE_CONTEXT, 
CollectionUtils.emptyProps());
       if (isRecord1Deleted && !isRecord2Deleted) {
@@ -1118,7 +1133,8 @@ public class HoodieTableMetadataUtil {
         // let's delete just 1 of them
         return record1;
       } else {
-        throw new HoodieIOException("Two HoodieRecord updates to RLI is seen 
for same record key " + record2.getRecordKey() + ", record 1 : "
+        // Both records are non-deleted
+        throw new HoodieIOException("Two HoodieRecord updates to the index is 
seen for same record key " + record2.getRecordKey() + ", record 1 : "
             + record1 + ", record 2 : " + record2);
       }
     }, parallelism).values();
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index 18cbb29163bb..2bbaf74ce588 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -49,7 +49,7 @@ import org.apache.spark.sql.types.StringType
 import org.junit.jupiter.api.{Tag, Test}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, 
ValueSource}
+import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, 
MethodSource, ValueSource}
 import org.junit.jupiter.params.provider.Arguments.arguments
 import org.scalatest.Assertions.{assertResult, assertThrows}
 
@@ -1767,6 +1767,146 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
     )
   }
 
+  /**
+   * Test Secondary Index with partition path update using global record index.
+   * This test validates that when a record moves from one partition (file 
group) to another
+   * using a global index, the secondary index is correctly updated and queries 
work as expected.
+   *
+   * Test flow:
+   * 1. Create a table with global index enabled
+   * 2. Insert records into different partitions with a secondary index
+   * 3. Update partition path of a record (moving it from partition A to B)
+   * 4. Validate secondary index metadata is correct (no duplicates, no 
missing entry)
+   * 5. Validate query results using secondary index pruning
+   */
+  @ParameterizedTest
+  @CsvSource(Array("COPY_ON_WRITE,true", "COPY_ON_WRITE,false", 
"MERGE_ON_READ,true", "MERGE_ON_READ,false"))
+  def testSecondaryIndexWithPartitionPathUpdateUsingGlobalIndex(tableType: 
HoodieTableType,
+                                                                
enableStreamingWrite: Boolean): Unit = {
+    val sqlTableType = if (tableType == HoodieTableType.COPY_ON_WRITE) "cow" 
else "mor"
+    val tableName = "test_secondary_index_with_partition_update_global_index_" 
+ sqlTableType + "_" + enableStreamingWrite
+
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  ts bigint,
+         |  record_key_col string,
+         |  not_record_key_col string,
+         |  partition_key_col string
+         |) using hudi
+         | options (
+         |  primaryKey = 'record_key_col',
+         |  type = '$sqlTableType',
+         |  hoodie.metadata.enable = 'true',
+         |  hoodie.metadata.record.index.enable = 'true',
+         |  hoodie.datasource.write.recordkey.field = 'record_key_col',
+         |  hoodie.enable.data.skipping = 'true',
+         |  hoodie.datasource.write.payload.class = 
"org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
+         |  hoodie.index.type = 'RECORD_INDEX',
+         |  hoodie.record.index.update.partition.path = 'true',
+         |  hoodie.metadata.streaming.write.enabled = '$enableStreamingWrite'
+         | )
+         | partitioned by (partition_key_col)
+         | location '$basePath'
+       """.stripMargin)
+
+    spark.sql("set hoodie.parquet.small.file.limit=0")
+    // Insert initial records into different partitions
+    spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
+    spark.sql(s"insert into $tableName values(2, 'row2', 'def', 'p2')")
+    spark.sql(s"insert into $tableName values(3, 'row3', 'ghi', 'p2')")
+    spark.sql(s"insert into $tableName values(4, 'row4', 'ghi', 'p2')")
+
+    // Create secondary index
+    spark.sql(s"create index idx_not_record_key_col on $tableName 
(not_record_key_col)")
+
+    // Validate index created successfully
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(HoodieTestUtils.getDefaultStorageConf)
+      .build()
+    
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
+
+    // Validate the secondary index records before partition update
+    checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
+      Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false),
+      Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+      Seq(s"ghi${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false),
+      Seq(s"ghi${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row4", false)
+    )
+
+    // Validate data skipping with filters on secondary key column
+    spark.sql("set hoodie.metadata.enable=true")
+    spark.sql("set hoodie.enable.data.skipping=true")
+    spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+    checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col = 'ghi'")(
+      Seq(3, "row3", "ghi", "p2"),
+      Seq(4, "row4", "ghi", "p2")
+    )
+
+    // Update partition path - move row3 from p2 to p3 using MERGE INTO
+    // This moves a record from file group A to file group B
+    spark.sql(
+      s"""
+         |MERGE INTO $tableName AS target
+         |USING (SELECT 3 as ts, 'row3' as record_key_col, 'ghi' as 
not_record_key_col, 'p3' as partition_key_col) AS source
+         |ON target.record_key_col = source.record_key_col
+         |WHEN MATCHED THEN UPDATE SET *
+       """.stripMargin)
+
+    // Validate the secondary index records after partition update
+    checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
+      Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false),
+      Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+      Seq(s"ghi${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false),
+      Seq(s"ghi${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row4", false)
+    )
+
+    // Validate data after partition update
+    checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col = 'ghi'")(
+      Seq(3, "row3", "ghi", "p3"),
+      Seq(4, "row4", "ghi", "p2")
+    )
+
+    // Validate all records are in correct partitions
+    checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName order by record_key_col")(
+      Seq(1, "row1", "abc", "p1"),
+      Seq(2, "row2", "def", "p2"),
+      Seq(3, "row3", "ghi", "p3"),
+      Seq(4, "row4", "ghi", "p2")
+    )
+
+    // Update both secondary column value and partition path
+    // This moves a record from file group A to file group B
+    spark.sql(
+      s"""
+         |MERGE INTO $tableName AS target
+         |USING (SELECT 4 as ts, 'row4' as record_key_col, 'jkl' as 
not_record_key_col, 'p3' as partition_key_col) AS source
+         |ON target.record_key_col = source.record_key_col
+         |WHEN MATCHED THEN UPDATE SET *
+       """.stripMargin)
+
+    checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
+      Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false),
+      Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
+      Seq(s"ghi${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false),
+      Seq(s"jkl${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row4", false)
+    )
+
+    // Validate data after partition update
+    checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName where not_record_key_col = 'jkl'")(
+      Seq(4, "row4", "jkl", "p3")
+    )
+
+    // Validate all records are in correct partitions
+    checkAnswer(s"select ts, record_key_col, not_record_key_col, 
partition_key_col from $tableName order by record_key_col")(
+      Seq(1, "row1", "abc", "p1"),
+      Seq(2, "row2", "def", "p2"),
+      Seq(3, "row3", "ghi", "p3"),
+      Seq(4, "row4", "jkl", "p3")
+    )
+  }
+
   private def deleteLastCompletedCommitFromTimeline(hudiOpts: Map[String, 
String], metaClient: HoodieTableMetaClient): Unit = {
     val lastInstant = 
metaClient.reloadActiveTimeline.getCommitsTimeline.lastInstant().get()
     assertTrue(hoodieStorage().deleteFile(new 
StoragePath(metaClient.getTimelinePath, 
HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR.getFileName(lastInstant))))


Reply via email to