This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b9fb9f616e6 [HUDI-6822] Fix deletes handling in hbase index when 
partition path is updated (#9630)
b9fb9f616e6 is described below

commit b9fb9f616e6585b5e92f796e50ef93747d38fb49
Author: flashJd <[email protected]>
AuthorDate: Tue Dec 5 00:08:35 2023 +0800

    [HUDI-6822] Fix deletes handling in hbase index when partition path is 
updated (#9630)
    
    
    ---------
    
    Co-authored-by: Balaji Varadarajan <[email protected]>
---
 .../org/apache/hudi/index/HoodieIndexUtils.java    |  1 +
 .../metadata/HoodieBackedTableMetadataWriter.java  | 68 +++++-----------
 .../hudi/index/hbase/SparkHoodieHBaseIndex.java    |  4 +
 .../index/hbase/TestSparkHoodieHBaseIndex.java     | 95 ++++++++++++++--------
 .../org/apache/hudi/common/model/HoodieRecord.java | 23 +++++-
 .../hudi/common/model/HoodieRecordDelegate.java    | 32 ++++++--
 .../model/TestHoodieRecordSerialization.scala      | 12 +--
 7 files changed, 140 insertions(+), 95 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 33e8d501943..de3d181ad06 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -323,6 +323,7 @@ public class HoodieIndexUtils {
           } else {
             // merged record has a different partition: issue a delete to the 
old partition and insert the merged record to the new partition
             HoodieRecord<R> deleteRecord = createDeleteRecord(config, 
existing.getKey());
+            deleteRecord.setIgnoreIndexUpdate(true);
             return Arrays.asList(tagRecord(deleteRecord, 
existing.getCurrentLocation()), merged).iterator();
           }
         });
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index ecdf93eda1d..781a9024117 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -29,10 +29,8 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -89,17 +87,14 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import java.util.stream.Stream;
 
 import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS;
@@ -939,8 +934,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
 
       // Updates for record index are created by parsing the WriteStatus which 
is a hudi-client object. Hence, we cannot yet move this code
       // to the HoodieTableMetadataUtil class in hudi-common.
-      HoodieData<HoodieRecord> updatesFromWriteStatuses = 
getRecordIndexUpdates(writeStatus);
-      HoodieData<HoodieRecord> additionalUpdates = 
getRecordIndexAdditionalUpdates(updatesFromWriteStatuses, commitMetadata);
+      HoodieData<HoodieRecord> updatesFromWriteStatuses = 
getRecordIndexUpserts(writeStatus);
+      HoodieData<HoodieRecord> additionalUpdates = 
getRecordIndexAdditionalUpserts(updatesFromWriteStatuses, commitMetadata);
       partitionToRecordMap.put(RECORD_INDEX, 
updatesFromWriteStatuses.union(additionalUpdates));
       updateFunctionalIndexIfPresent(commitMetadata, instantTime, 
partitionToRecordMap);
       return partitionToRecordMap;
@@ -953,7 +948,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     processAndCommit(instantTime, () -> {
       Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionToRecordMap =
           HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, 
commitMetadata, instantTime, getRecordsGenerationParams());
-      HoodieData<HoodieRecord> additionalUpdates = 
getRecordIndexAdditionalUpdates(records, commitMetadata);
+      HoodieData<HoodieRecord> additionalUpdates = 
getRecordIndexAdditionalUpserts(records, commitMetadata);
       partitionToRecordMap.put(RECORD_INDEX, records.union(additionalUpdates));
       updateFunctionalIndexIfPresent(commitMetadata, instantTime, 
partitionToRecordMap);
       return partitionToRecordMap;
@@ -1483,44 +1478,19 @@ public abstract class 
HoodieBackedTableMetadataWriter<I> implements HoodieTableM
   }
 
   /**
-   * Return records that represent update to the record index due to write 
operation on the dataset.
+   * Return records that represent upserts to the record index due to write 
operation on the dataset.
    *
    * @param writeStatuses {@code WriteStatus} from the write operation
    */
-  private HoodieData<HoodieRecord> 
getRecordIndexUpdates(HoodieData<WriteStatus> writeStatuses) {
-    HoodiePairData<String, HoodieRecordDelegate> recordKeyDelegatePairs = null;
-    // if update partition path is true, chances that we might get two records 
(1 delete in older partition and 1 insert to new partition)
-    // and hence we might have to do reduce By key before ingesting to RLI 
partition.
-    if (dataWriteConfig.getRecordIndexUpdatePartitionPath()) {
-      recordKeyDelegatePairs = writeStatuses.map(writeStatus -> 
writeStatus.getWrittenRecordDelegates().stream()
-              .map(recordDelegate -> Pair.of(recordDelegate.getRecordKey(), 
recordDelegate)))
-          .flatMapToPair(Stream::iterator)
-          .reduceByKey((recordDelegate1, recordDelegate2) -> {
-            if 
(recordDelegate1.getRecordKey().equals(recordDelegate2.getRecordKey())) {
-              if (!recordDelegate1.getNewLocation().isPresent() && 
!recordDelegate2.getNewLocation().isPresent()) {
-                throw new HoodieIOException("Both version of records do not 
have location set. Record V1 " + recordDelegate1.toString()
-                    + ", Record V2 " + recordDelegate2.toString());
-              }
-              if (recordDelegate1.getNewLocation().isPresent()) {
-                return recordDelegate1;
-              } else {
-                // if record delegate 1 does not have location set, record 
delegate 2 should have location set.
-                return recordDelegate2;
-              }
-            } else {
-              return recordDelegate1;
-            }
-          }, Math.max(1, writeStatuses.getNumPartitions()));
-    } else {
-      // if update partition path = false, we should get only one entry per 
record key.
-      recordKeyDelegatePairs = writeStatuses.flatMapToPair(
-          (SerializableFunction<WriteStatus, Iterator<? extends Pair<String, 
HoodieRecordDelegate>>>) writeStatus
-              -> writeStatus.getWrittenRecordDelegates().stream().map(rec -> 
Pair.of(rec.getRecordKey(), rec)).iterator());
-    }
-    return recordKeyDelegatePairs
-        .map(writeStatusRecordDelegate -> {
-          HoodieRecordDelegate recordDelegate = 
writeStatusRecordDelegate.getValue();
-          HoodieRecord hoodieRecord = null;
+  private HoodieData<HoodieRecord> 
getRecordIndexUpserts(HoodieData<WriteStatus> writeStatuses) {
+    return writeStatuses.flatMap(writeStatus -> {
+      List<HoodieRecord> recordList = new LinkedList<>();
+      for (HoodieRecordDelegate recordDelegate : 
writeStatus.getWrittenRecordDelegates()) {
+        if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
+          if (recordDelegate.getIgnoreIndexUpdate()) {
+            continue;
+          }
+          HoodieRecord hoodieRecord;
           Option<HoodieRecordLocation> newLocation = 
recordDelegate.getNewLocation();
           if (newLocation.isPresent()) {
             if (recordDelegate.getCurrentLocation().isPresent()) {
@@ -1536,17 +1506,21 @@ public abstract class 
HoodieBackedTableMetadataWriter<I> implements HoodieTableM
               }
               // for updates, we can skip updating RLI partition in MDT
             } else {
+              // Insert new record case
               hoodieRecord = HoodieMetadataPayload.createRecordIndexUpdate(
                   recordDelegate.getRecordKey(), 
recordDelegate.getPartitionPath(),
                   newLocation.get().getFileId(), 
newLocation.get().getInstantTime(), dataWriteConfig.getWritesFileIdEncoding());
+              recordList.add(hoodieRecord);
             }
           } else {
             // Delete existing index for a deleted record
             hoodieRecord = 
HoodieMetadataPayload.createRecordIndexDelete(recordDelegate.getRecordKey());
+            recordList.add(hoodieRecord);
           }
-          return hoodieRecord;
-        })
-        .filter(Objects::nonNull);
+        }
+      }
+      return recordList.iterator();
+    });
   }
 
   private HoodieData<HoodieRecord> 
getRecordIndexReplacedRecords(HoodieReplaceCommitMetadata 
replaceCommitMetadata) {
@@ -1568,7 +1542,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
         this.getClass().getSimpleName());
   }
 
-  private HoodieData<HoodieRecord> 
getRecordIndexAdditionalUpdates(HoodieData<HoodieRecord> 
updatesFromWriteStatuses, HoodieCommitMetadata commitMetadata) {
+  private HoodieData<HoodieRecord> 
getRecordIndexAdditionalUpserts(HoodieData<HoodieRecord> 
updatesFromWriteStatuses, HoodieCommitMetadata commitMetadata) {
     WriteOperationType operationType = commitMetadata.getOperationType();
     if (operationType == WriteOperationType.INSERT_OVERWRITE) {
       // load existing records from replaced filegroups and left anti join 
overwriting records
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
index 039501fbf67..43af6dda0d4 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
@@ -288,6 +288,7 @@ public class SparkHoodieHBaseIndex extends 
HoodieIndex<Object, Object> {
                   new EmptyHoodieRecordPayload());
               emptyRecord.unseal();
               emptyRecord.setCurrentLocation(new 
HoodieRecordLocation(commitTs, fileId));
+              emptyRecord.setIgnoreIndexUpdate(true);
               emptyRecord.seal();
               // insert partition new data record
               currentRecord = new HoodieAvroRecord(new 
HoodieKey(currentRecord.getRecordKey(), currentRecord.getPartitionPath()),
@@ -359,6 +360,9 @@ public class SparkHoodieHBaseIndex extends 
HoodieIndex<Object, Object> {
             // Any calls beyond `multiPutBatchSize` within a second will be 
rate limited
             for (HoodieRecordDelegate recordDelegate : 
writeStatus.getWrittenRecordDelegates()) {
               if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
+                if (recordDelegate.getIgnoreIndexUpdate()) {
+                  continue;
+                }
                 Option<HoodieRecordLocation> loc = 
recordDelegate.getNewLocation();
                 if (loc.isPresent()) {
                   if (recordDelegate.getCurrentLocation().isPresent()) {
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
index 23f1c9203bd..2ad2abaf988 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
@@ -86,6 +86,7 @@ import static 
org.apache.hadoop.hbase.HConstants.ZOOKEEPER_QUORUM;
 import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.atMost;
@@ -221,11 +222,10 @@ public class TestSparkHoodieHBaseIndex extends 
SparkClientFunctionalTestHarness
   }
 
   @Test
-  public void testTagLocationAndPartitionPathUpdate() throws Exception {
+  public void testTagLocationAndPartitionPathUpdateDisabled() throws Exception 
{
     final String newCommitTime = "001";
-    final int numRecords = 10;
     final String oldPartitionPath = "1970/01/01";
-    final String emptyHoodieRecordPayloadClassName = 
EmptyHoodieRecordPayload.class.getName();
+    final int numRecords = 10;
 
     List<HoodieRecord> newRecords = dataGen.generateInserts(newCommitTime, 
numRecords);
     List<HoodieRecord> oldRecords = new LinkedList();
@@ -238,39 +238,68 @@ public class TestSparkHoodieHBaseIndex extends 
SparkClientFunctionalTestHarness
     JavaRDD<HoodieRecord> newWriteRecords = jsc().parallelize(newRecords, 1);
     JavaRDD<HoodieRecord> oldWriteRecords = jsc().parallelize(oldRecords, 1);
 
-    HoodieWriteConfig config = getConfig(true, false);
-    SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(getConfig(true, 
false));
-
-    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) {
-      // allowed path change test
-      metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable hoodieTable = HoodieSparkTable.create(config, context, 
metaClient);
+    HoodieWriteConfig config = getConfigBuilder(100, false, false).build();
+    SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
+    writeClient.startCommitWithTime(newCommitTime);
+    JavaRDD<WriteStatus> writeStatues = writeClient.upsert(oldWriteRecords, 
newCommitTime);
+    writeClient.commit(newCommitTime, writeStatues);
+    assertNoWriteErrors(writeStatues.collect());
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable hoodieTable = HoodieSparkTable.create(config, context, 
metaClient);
+    SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
+    List<HoodieRecord> notAllowPathChangeRecords = tagLocation(index, 
newWriteRecords, hoodieTable).collect();
+    assertEquals(numRecords, notAllowPathChangeRecords.stream().count());
+
+    String newCommitTime1 = "002";
+    writeClient.startCommitWithTime(newCommitTime1);
+    JavaRDD<WriteStatus> writeStatues1 = writeClient.upsert(newWriteRecords, 
newCommitTime1);
+    writeClient.commit(newCommitTime1, writeStatues1);
+    assertNoWriteErrors(writeStatues1.collect());
+    assertEquals(numRecords, writeStatues1.map(writeStatus -> 
writeStatus.getTotalRecords()).reduce(Long::sum));
+    assertEquals(0, writeStatues1.filter(writeStatus -> 
!writeStatus.getPartitionPath().equals(oldPartitionPath)).count());
+  }
 
-      JavaRDD<HoodieRecord> oldHoodieRecord = tagLocation(index, 
oldWriteRecords, hoodieTable);
-      assertEquals(0, oldHoodieRecord.filter(record -> 
record.isCurrentLocationKnown()).count());
-      writeClient.startCommitWithTime(newCommitTime);
-      JavaRDD<WriteStatus> writeStatues = writeClient.upsert(oldWriteRecords, 
newCommitTime);
-      writeClient.commit(newCommitTime, writeStatues);
-      assertNoWriteErrors(writeStatues.collect());
-      updateLocation(index, writeStatues, hoodieTable);
+  @Test
+  public void testTagLocationAndPartitionPathUpdateEnabled() throws Exception {
+    final String newCommitTime = "001";
+    final String oldPartitionPath = "1970/01/01";
+    final int numRecords = 10;
 
-      metaClient = HoodieTableMetaClient.reload(metaClient);
-      hoodieTable = HoodieSparkTable.create(config, context, metaClient);
-      List<HoodieRecord> taggedRecords = tagLocation(index, newWriteRecords, 
hoodieTable).collect();
-      assertEquals(numRecords * 2L, taggedRecords.stream().count());
-      // Verify the number of deleted records
-      assertEquals(numRecords, taggedRecords.stream().filter(record -> 
record.getKey().getPartitionPath().equals(oldPartitionPath)
-          && 
record.getData().getClass().getName().equals(emptyHoodieRecordPayloadClassName)).count());
-      // Verify the number of inserted records
-      assertEquals(numRecords, taggedRecords.stream().filter(record -> 
!record.getKey().getPartitionPath().equals(oldPartitionPath)).count());
-
-      // not allowed path change test
-      index = new SparkHoodieHBaseIndex(getConfig(false, false));
-      List<HoodieRecord> notAllowPathChangeRecords = tagLocation(index, 
newWriteRecords, hoodieTable).collect();
-      assertEquals(numRecords, notAllowPathChangeRecords.stream().count());
-      assertEquals(numRecords, taggedRecords.stream().filter(hoodieRecord -> 
hoodieRecord.isCurrentLocationKnown()
-          && 
hoodieRecord.getKey().getPartitionPath().equals(oldPartitionPath)).count());
+    List<HoodieRecord> newRecords = dataGen.generateInserts(newCommitTime, 
numRecords);
+    List<HoodieRecord> oldRecords = new LinkedList();
+    for (HoodieRecord newRecord: newRecords) {
+      HoodieKey key = new HoodieKey(newRecord.getRecordKey(), 
oldPartitionPath);
+      HoodieRecord hoodieRecord = new HoodieAvroRecord(key, 
(HoodieRecordPayload) newRecord.getData());
+      oldRecords.add(hoodieRecord);
     }
+
+    JavaRDD<HoodieRecord> newWriteRecords = jsc().parallelize(newRecords, 1);
+    JavaRDD<HoodieRecord> oldWriteRecords = jsc().parallelize(oldRecords, 1);
+
+    HoodieWriteConfig config = getConfigBuilder(100, true, false).build();
+    SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
+    writeClient.startCommitWithTime(newCommitTime);
+    JavaRDD<WriteStatus> writeStatues = writeClient.upsert(oldWriteRecords, 
newCommitTime);
+    writeClient.commit(newCommitTime, writeStatues);
+    assertNoWriteErrors(writeStatues.collect());
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable hoodieTable = HoodieSparkTable.create(config, context, 
metaClient);
+    SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
+    List<HoodieRecord> pathChangeRecords = tagLocation(index, newWriteRecords, 
hoodieTable).collect();
+    assertEquals(numRecords * 2, pathChangeRecords.stream().count());
+    assertEquals(numRecords, 
pathChangeRecords.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
+
+    String newCommitTime1 = "002";
+    writeClient.startCommitWithTime(newCommitTime1);
+    JavaRDD<WriteStatus> writeStatues1 = writeClient.upsert(newWriteRecords, 
newCommitTime1);
+    writeClient.commit(newCommitTime1, writeStatues1);
+    assertNoWriteErrors(writeStatues1.collect());
+    assertEquals(numRecords * 2, writeStatues1.map(writeStatus -> 
writeStatus.getTotalRecords()).reduce(Long::sum));
+    assertNotEquals(0, writeStatues1.filter(writeStatus -> 
writeStatus.getPartitionPath().equals(oldPartitionPath)).count());
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+    List<HoodieRecord> pathChangeRecords1 = tagLocation(index, 
newWriteRecords, hoodieTable).collect();
+    assertEquals(numRecords, 
pathChangeRecords1.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
   }
 
   @Test
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 827c143d877..553fb5ce706 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -134,6 +134,11 @@ public abstract class HoodieRecord<T> implements 
HoodieRecordCompatibilityInterf
    */
   protected HoodieRecordLocation newLocation;
 
+  /**
+   * If set, the index will not be updated for this record after it is written.
+   */
+  protected boolean ignoreIndexUpdate;
+
   /**
    * Indicates whether the object is sealed.
    */
@@ -159,6 +164,7 @@ public abstract class HoodieRecord<T> implements 
HoodieRecordCompatibilityInterf
     this.currentLocation = null;
     this.newLocation = null;
     this.sealed = false;
+    this.ignoreIndexUpdate = false;
     this.operation = operation;
     this.metaData = metaData;
   }
@@ -181,6 +187,7 @@ public abstract class HoodieRecord<T> implements 
HoodieRecordCompatibilityInterf
     this.currentLocation = record.currentLocation;
     this.newLocation = record.newLocation;
     this.sealed = record.sealed;
+    this.ignoreIndexUpdate = record.ignoreIndexUpdate;
   }
 
   public HoodieRecord() {}
@@ -256,6 +263,17 @@ public abstract class HoodieRecord<T> implements 
HoodieRecordCompatibilityInterf
     return HoodieRecordLocation.INVALID_POSITION;
   }
 
+  /**
+   * Sets whether the index update should be skipped for this record after it is written.
+   */
+  public void setIgnoreIndexUpdate(boolean ignoreFlag) {
+    this.ignoreIndexUpdate = ignoreFlag;
+  }
+
+  public boolean getIgnoreIndexUpdate() {
+    return this.ignoreIndexUpdate;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -266,7 +284,8 @@ public abstract class HoodieRecord<T> implements 
HoodieRecordCompatibilityInterf
     }
     HoodieRecord that = (HoodieRecord) o;
     return Objects.equals(key, that.key) && Objects.equals(data, that.data)
-        && Objects.equals(currentLocation, that.currentLocation) && 
Objects.equals(newLocation, that.newLocation);
+        && Objects.equals(currentLocation, that.currentLocation) && 
Objects.equals(newLocation, that.newLocation)
+        && Objects.equals(ignoreIndexUpdate, that.ignoreIndexUpdate);
   }
 
   @Override
@@ -343,6 +362,7 @@ public abstract class HoodieRecord<T> implements 
HoodieRecordCompatibilityInterf
     // NOTE: Writing out actual record payload is relegated to the actual
     //       implementation
     writeRecordPayload(data, kryo, output);
+    kryo.writeObjectOrNull(output, ignoreIndexUpdate, Boolean.class);
   }
 
   /**
@@ -358,6 +378,7 @@ public abstract class HoodieRecord<T> implements 
HoodieRecordCompatibilityInterf
     // NOTE: Reading out actual record payload is relegated to the actual
     //       implementation
     this.data = readRecordPayload(kryo, input);
+    this.ignoreIndexUpdate = kryo.readObjectOrNull(input, Boolean.class);
 
     // NOTE: We're always seal object after deserialization
     this.sealed = true;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java
index a9323c15988..f493b3a96f6 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java
@@ -52,52 +52,59 @@ public class HoodieRecordDelegate implements Serializable, 
KryoSerializable {
    */
   private Option<HoodieRecordLocation> newLocation;
 
+  /**
+   * If set, the index will not be updated for this record after it is written.
+   */
+  private boolean ignoreIndexUpdate;
+
   private HoodieRecordDelegate(HoodieKey hoodieKey,
                                @Nullable HoodieRecordLocation currentLocation,
-                               @Nullable HoodieRecordLocation newLocation) {
+                               @Nullable HoodieRecordLocation newLocation,
+                               boolean ignoreIndexUpdate) {
     this.hoodieKey = hoodieKey;
     this.currentLocation = Option.ofNullable(currentLocation);
     this.newLocation = Option.ofNullable(newLocation);
+    this.ignoreIndexUpdate = ignoreIndexUpdate;
   }
 
   public static HoodieRecordDelegate create(String recordKey, String 
partitionPath) {
-    return new HoodieRecordDelegate(new HoodieKey(recordKey, partitionPath), 
null, null);
+    return new HoodieRecordDelegate(new HoodieKey(recordKey, partitionPath), 
null, null, false);
   }
 
   public static HoodieRecordDelegate create(String recordKey,
                                             String partitionPath,
                                             HoodieRecordLocation 
currentLocation) {
-    return new HoodieRecordDelegate(new HoodieKey(recordKey, partitionPath), 
currentLocation, null);
+    return new HoodieRecordDelegate(new HoodieKey(recordKey, partitionPath), 
currentLocation, null, false);
   }
 
   public static HoodieRecordDelegate create(String recordKey,
                                             String partitionPath,
                                             HoodieRecordLocation 
currentLocation,
                                             HoodieRecordLocation newLocation) {
-    return new HoodieRecordDelegate(new HoodieKey(recordKey, partitionPath), 
currentLocation, newLocation);
+    return new HoodieRecordDelegate(new HoodieKey(recordKey, partitionPath), 
currentLocation, newLocation, false);
   }
 
   public static HoodieRecordDelegate create(HoodieKey key) {
-    return new HoodieRecordDelegate(key, null, null);
+    return new HoodieRecordDelegate(key, null, null, false);
   }
 
   public static HoodieRecordDelegate create(HoodieKey key, 
HoodieRecordLocation currentLocation) {
-    return new HoodieRecordDelegate(key, currentLocation, null);
+    return new HoodieRecordDelegate(key, currentLocation, null, false);
   }
 
   public static HoodieRecordDelegate create(HoodieKey key,
                                             HoodieRecordLocation 
currentLocation,
                                             HoodieRecordLocation newLocation) {
-    return new HoodieRecordDelegate(key, currentLocation, newLocation);
+    return new HoodieRecordDelegate(key, currentLocation, newLocation, false);
   }
 
   public static HoodieRecordDelegate fromHoodieRecord(HoodieRecord record) {
-    return new HoodieRecordDelegate(record.getKey(), 
record.getCurrentLocation(), record.getNewLocation());
+    return new HoodieRecordDelegate(record.getKey(), 
record.getCurrentLocation(), record.getNewLocation(), 
record.getIgnoreIndexUpdate());
   }
 
   public static HoodieRecordDelegate fromHoodieRecord(HoodieRecord record,
                                                       @Nullable 
HoodieRecordLocation newLocationOverride) {
-    return new HoodieRecordDelegate(record.getKey(), 
record.getCurrentLocation(), newLocationOverride);
+    return new HoodieRecordDelegate(record.getKey(), 
record.getCurrentLocation(), newLocationOverride, 
record.getIgnoreIndexUpdate());
   }
 
   public String getRecordKey() {
@@ -120,12 +127,17 @@ public class HoodieRecordDelegate implements 
Serializable, KryoSerializable {
     return newLocation;
   }
 
+  public boolean getIgnoreIndexUpdate() {
+    return ignoreIndexUpdate;
+  }
+
   @Override
   public String toString() {
     return "HoodieRecordDelegate{"
         + "hoodieKey=" + hoodieKey
         + ", currentLocation=" + currentLocation
         + ", newLocation=" + newLocation
+        + ", ignoreIndexUpdate=" + ignoreIndexUpdate
         + '}';
   }
 
@@ -135,6 +147,7 @@ public class HoodieRecordDelegate implements Serializable, 
KryoSerializable {
     kryo.writeObjectOrNull(output, hoodieKey, HoodieKey.class);
     kryo.writeClassAndObject(output, currentLocation.isPresent() ? 
currentLocation.get() : null);
     kryo.writeClassAndObject(output, newLocation.isPresent() ? 
newLocation.get() : null);
+    kryo.writeObjectOrNull(output, ignoreIndexUpdate, Boolean.class);
   }
 
   @VisibleForTesting
@@ -143,5 +156,6 @@ public class HoodieRecordDelegate implements Serializable, 
KryoSerializable {
     this.hoodieKey = kryo.readObjectOrNull(input, HoodieKey.class);
     this.currentLocation = Option.ofNullable((HoodieRecordLocation) 
kryo.readClassAndObject(input));
     this.newLocation = Option.ofNullable((HoodieRecordLocation) 
kryo.readClassAndObject(input));
+    this.ignoreIndexUpdate = kryo.readObjectOrNull(input, Boolean.class);
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
index 26a19f9c856..1ce1b3e8fca 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
@@ -79,8 +79,8 @@ class TestHoodieRecordSerialization extends 
SparkClientFunctionalTestHarness {
     val hoodieInternalRow = new HoodieInternalRow(new Array[UTF8String](5), 
unsafeRow, false)
 
     Seq(
-      (unsafeRow, rowSchema, 87),
-      (hoodieInternalRow, addMetaFields(rowSchema), 127)
+      (unsafeRow, rowSchema, 89),
+      (hoodieInternalRow, addMetaFields(rowSchema), 129)
     ) foreach { case (row, schema, expectedSize) => routine(row, schema, 
expectedSize) }
   }
 
@@ -105,13 +105,15 @@ class TestHoodieRecordSerialization extends 
SparkClientFunctionalTestHarness {
     val key = new HoodieKey("rec-key", "part-path")
 
     val legacyRecord = toLegacyAvroRecord(avroRecord, key)
+    legacyRecord.setIgnoreIndexUpdate(true)
     val avroIndexedRecord = new HoodieAvroIndexedRecord(key, avroRecord)
+    avroIndexedRecord.setIgnoreIndexUpdate(true)
 
-    val expectedLagacyRecordSize = if (HoodieSparkUtils.gteqSpark3_4) 534 else 
528
+    val expectedLagacyRecordSize = if (HoodieSparkUtils.gteqSpark3_4) 536 else 
530
 
     Seq(
       (legacyRecord, expectedLagacyRecordSize),
-      (avroIndexedRecord, 389)
+      (avroIndexedRecord, 391)
     ) foreach { case (record, expectedSize) => routine(record, expectedSize) }
   }
 
@@ -130,7 +132,7 @@ class TestHoodieRecordSerialization extends 
SparkClientFunctionalTestHarness {
     }
 
     val key = new HoodieKey("rec-key", "part-path")
-    val expectedEmptyRecordSize = if (HoodieSparkUtils.gteqSpark3_4) 30 else 27
+    val expectedEmptyRecordSize = if (HoodieSparkUtils.gteqSpark3_4) 32 else 29
     Seq(
       (new HoodieEmptyRecord[GenericRecord](key, HoodieOperation.INSERT, 1, 
HoodieRecordType.AVRO),
         expectedEmptyRecordSize),

Reply via email to