This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b9fb9f616e6 [HUDI-6822] Fix deletes handling in hbase index when
partition path is updated (#9630)
b9fb9f616e6 is described below
commit b9fb9f616e6585b5e92f796e50ef93747d38fb49
Author: flashJd <[email protected]>
AuthorDate: Tue Dec 5 00:08:35 2023 +0800
[HUDI-6822] Fix deletes handling in hbase index when partition path is
updated (#9630)
---------
Co-authored-by: Balaji Varadarajan <[email protected]>
---
.../org/apache/hudi/index/HoodieIndexUtils.java | 1 +
.../metadata/HoodieBackedTableMetadataWriter.java | 68 +++++-----------
.../hudi/index/hbase/SparkHoodieHBaseIndex.java | 4 +
.../index/hbase/TestSparkHoodieHBaseIndex.java | 95 ++++++++++++++--------
.../org/apache/hudi/common/model/HoodieRecord.java | 23 +++++-
.../hudi/common/model/HoodieRecordDelegate.java | 32 ++++++--
.../model/TestHoodieRecordSerialization.scala | 12 +--
7 files changed, 140 insertions(+), 95 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 33e8d501943..de3d181ad06 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -323,6 +323,7 @@ public class HoodieIndexUtils {
} else {
// merged record has a different partition: issue a delete to the
old partition and insert the merged record to the new partition
HoodieRecord<R> deleteRecord = createDeleteRecord(config,
existing.getKey());
+ deleteRecord.setIgnoreIndexUpdate(true);
return Arrays.asList(tagRecord(deleteRecord,
existing.getCurrentLocation()), merged).iterator();
}
});
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index ecdf93eda1d..781a9024117 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -29,10 +29,8 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -89,17 +87,14 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import java.util.stream.Stream;
import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
import static
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS;
@@ -939,8 +934,8 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
// Updates for record index are created by parsing the WriteStatus which
is a hudi-client object. Hence, we cannot yet move this code
// to the HoodieTableMetadataUtil class in hudi-common.
- HoodieData<HoodieRecord> updatesFromWriteStatuses =
getRecordIndexUpdates(writeStatus);
- HoodieData<HoodieRecord> additionalUpdates =
getRecordIndexAdditionalUpdates(updatesFromWriteStatuses, commitMetadata);
+ HoodieData<HoodieRecord> updatesFromWriteStatuses =
getRecordIndexUpserts(writeStatus);
+ HoodieData<HoodieRecord> additionalUpdates =
getRecordIndexAdditionalUpserts(updatesFromWriteStatuses, commitMetadata);
partitionToRecordMap.put(RECORD_INDEX,
updatesFromWriteStatuses.union(additionalUpdates));
updateFunctionalIndexIfPresent(commitMetadata, instantTime,
partitionToRecordMap);
return partitionToRecordMap;
@@ -953,7 +948,7 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
processAndCommit(instantTime, () -> {
Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionToRecordMap =
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
commitMetadata, instantTime, getRecordsGenerationParams());
- HoodieData<HoodieRecord> additionalUpdates =
getRecordIndexAdditionalUpdates(records, commitMetadata);
+ HoodieData<HoodieRecord> additionalUpdates =
getRecordIndexAdditionalUpserts(records, commitMetadata);
partitionToRecordMap.put(RECORD_INDEX, records.union(additionalUpdates));
updateFunctionalIndexIfPresent(commitMetadata, instantTime,
partitionToRecordMap);
return partitionToRecordMap;
@@ -1483,44 +1478,19 @@ public abstract class
HoodieBackedTableMetadataWriter<I> implements HoodieTableM
}
/**
- * Return records that represent update to the record index due to write
operation on the dataset.
+ * Return records that represent upserts to the record index due to write
operation on the dataset.
*
* @param writeStatuses {@code WriteStatus} from the write operation
*/
- private HoodieData<HoodieRecord>
getRecordIndexUpdates(HoodieData<WriteStatus> writeStatuses) {
- HoodiePairData<String, HoodieRecordDelegate> recordKeyDelegatePairs = null;
- // if update partition path is true, chances that we might get two records
(1 delete in older partition and 1 insert to new partition)
- // and hence we might have to do reduce By key before ingesting to RLI
partition.
- if (dataWriteConfig.getRecordIndexUpdatePartitionPath()) {
- recordKeyDelegatePairs = writeStatuses.map(writeStatus ->
writeStatus.getWrittenRecordDelegates().stream()
- .map(recordDelegate -> Pair.of(recordDelegate.getRecordKey(),
recordDelegate)))
- .flatMapToPair(Stream::iterator)
- .reduceByKey((recordDelegate1, recordDelegate2) -> {
- if
(recordDelegate1.getRecordKey().equals(recordDelegate2.getRecordKey())) {
- if (!recordDelegate1.getNewLocation().isPresent() &&
!recordDelegate2.getNewLocation().isPresent()) {
- throw new HoodieIOException("Both version of records do not
have location set. Record V1 " + recordDelegate1.toString()
- + ", Record V2 " + recordDelegate2.toString());
- }
- if (recordDelegate1.getNewLocation().isPresent()) {
- return recordDelegate1;
- } else {
- // if record delegate 1 does not have location set, record
delegate 2 should have location set.
- return recordDelegate2;
- }
- } else {
- return recordDelegate1;
- }
- }, Math.max(1, writeStatuses.getNumPartitions()));
- } else {
- // if update partition path = false, we should get only one entry per
record key.
- recordKeyDelegatePairs = writeStatuses.flatMapToPair(
- (SerializableFunction<WriteStatus, Iterator<? extends Pair<String,
HoodieRecordDelegate>>>) writeStatus
- -> writeStatus.getWrittenRecordDelegates().stream().map(rec ->
Pair.of(rec.getRecordKey(), rec)).iterator());
- }
- return recordKeyDelegatePairs
- .map(writeStatusRecordDelegate -> {
- HoodieRecordDelegate recordDelegate =
writeStatusRecordDelegate.getValue();
- HoodieRecord hoodieRecord = null;
+ private HoodieData<HoodieRecord>
getRecordIndexUpserts(HoodieData<WriteStatus> writeStatuses) {
+ return writeStatuses.flatMap(writeStatus -> {
+ List<HoodieRecord> recordList = new LinkedList<>();
+ for (HoodieRecordDelegate recordDelegate :
writeStatus.getWrittenRecordDelegates()) {
+ if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
+ if (recordDelegate.getIgnoreIndexUpdate()) {
+ continue;
+ }
+ HoodieRecord hoodieRecord;
Option<HoodieRecordLocation> newLocation =
recordDelegate.getNewLocation();
if (newLocation.isPresent()) {
if (recordDelegate.getCurrentLocation().isPresent()) {
@@ -1536,17 +1506,21 @@ public abstract class
HoodieBackedTableMetadataWriter<I> implements HoodieTableM
}
// for updates, we can skip updating RLI partition in MDT
} else {
+ // Insert new record case
hoodieRecord = HoodieMetadataPayload.createRecordIndexUpdate(
recordDelegate.getRecordKey(),
recordDelegate.getPartitionPath(),
newLocation.get().getFileId(),
newLocation.get().getInstantTime(), dataWriteConfig.getWritesFileIdEncoding());
+ recordList.add(hoodieRecord);
}
} else {
// Delete existing index for a deleted record
hoodieRecord =
HoodieMetadataPayload.createRecordIndexDelete(recordDelegate.getRecordKey());
+ recordList.add(hoodieRecord);
}
- return hoodieRecord;
- })
- .filter(Objects::nonNull);
+ }
+ }
+ return recordList.iterator();
+ });
}
private HoodieData<HoodieRecord>
getRecordIndexReplacedRecords(HoodieReplaceCommitMetadata
replaceCommitMetadata) {
@@ -1568,7 +1542,7 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
this.getClass().getSimpleName());
}
- private HoodieData<HoodieRecord>
getRecordIndexAdditionalUpdates(HoodieData<HoodieRecord>
updatesFromWriteStatuses, HoodieCommitMetadata commitMetadata) {
+ private HoodieData<HoodieRecord>
getRecordIndexAdditionalUpserts(HoodieData<HoodieRecord>
updatesFromWriteStatuses, HoodieCommitMetadata commitMetadata) {
WriteOperationType operationType = commitMetadata.getOperationType();
if (operationType == WriteOperationType.INSERT_OVERWRITE) {
// load existing records from replaced filegroups and left anti join
overwriting records
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
index 039501fbf67..43af6dda0d4 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
@@ -288,6 +288,7 @@ public class SparkHoodieHBaseIndex extends
HoodieIndex<Object, Object> {
new EmptyHoodieRecordPayload());
emptyRecord.unseal();
emptyRecord.setCurrentLocation(new
HoodieRecordLocation(commitTs, fileId));
+ emptyRecord.setIgnoreIndexUpdate(true);
emptyRecord.seal();
// insert partition new data record
currentRecord = new HoodieAvroRecord(new
HoodieKey(currentRecord.getRecordKey(), currentRecord.getPartitionPath()),
@@ -359,6 +360,9 @@ public class SparkHoodieHBaseIndex extends
HoodieIndex<Object, Object> {
// Any calls beyond `multiPutBatchSize` within a second will be
rate limited
for (HoodieRecordDelegate recordDelegate :
writeStatus.getWrittenRecordDelegates()) {
if (!writeStatus.isErrored(recordDelegate.getHoodieKey())) {
+ if (recordDelegate.getIgnoreIndexUpdate()) {
+ continue;
+ }
Option<HoodieRecordLocation> loc =
recordDelegate.getNewLocation();
if (loc.isPresent()) {
if (recordDelegate.getCurrentLocation().isPresent()) {
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
index 23f1c9203bd..2ad2abaf988 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
@@ -86,6 +86,7 @@ import static
org.apache.hadoop.hbase.HConstants.ZOOKEEPER_QUORUM;
import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atMost;
@@ -221,11 +222,10 @@ public class TestSparkHoodieHBaseIndex extends
SparkClientFunctionalTestHarness
}
@Test
- public void testTagLocationAndPartitionPathUpdate() throws Exception {
+ public void testTagLocationAndPartitionPathUpdateDisabled() throws Exception
{
final String newCommitTime = "001";
- final int numRecords = 10;
final String oldPartitionPath = "1970/01/01";
- final String emptyHoodieRecordPayloadClassName =
EmptyHoodieRecordPayload.class.getName();
+ final int numRecords = 10;
List<HoodieRecord> newRecords = dataGen.generateInserts(newCommitTime,
numRecords);
List<HoodieRecord> oldRecords = new LinkedList();
@@ -238,39 +238,68 @@ public class TestSparkHoodieHBaseIndex extends
SparkClientFunctionalTestHarness
JavaRDD<HoodieRecord> newWriteRecords = jsc().parallelize(newRecords, 1);
JavaRDD<HoodieRecord> oldWriteRecords = jsc().parallelize(oldRecords, 1);
- HoodieWriteConfig config = getConfig(true, false);
- SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(getConfig(true,
false));
-
- try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) {
- // allowed path change test
- metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieTable hoodieTable = HoodieSparkTable.create(config, context,
metaClient);
+ HoodieWriteConfig config = getConfigBuilder(100, false, false).build();
+ SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
+ writeClient.startCommitWithTime(newCommitTime);
+ JavaRDD<WriteStatus> writeStatues = writeClient.upsert(oldWriteRecords,
newCommitTime);
+ writeClient.commit(newCommitTime, writeStatues);
+ assertNoWriteErrors(writeStatues.collect());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable hoodieTable = HoodieSparkTable.create(config, context,
metaClient);
+ SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
+ List<HoodieRecord> notAllowPathChangeRecords = tagLocation(index,
newWriteRecords, hoodieTable).collect();
+ assertEquals(numRecords, notAllowPathChangeRecords.stream().count());
+
+ String newCommitTime1 = "002";
+ writeClient.startCommitWithTime(newCommitTime1);
+ JavaRDD<WriteStatus> writeStatues1 = writeClient.upsert(newWriteRecords,
newCommitTime1);
+ writeClient.commit(newCommitTime1, writeStatues1);
+ assertNoWriteErrors(writeStatues1.collect());
+ assertEquals(numRecords, writeStatues1.map(writeStatus ->
writeStatus.getTotalRecords()).reduce(Long::sum));
+ assertEquals(0, writeStatues1.filter(writeStatus ->
!writeStatus.getPartitionPath().equals(oldPartitionPath)).count());
+ }
- JavaRDD<HoodieRecord> oldHoodieRecord = tagLocation(index,
oldWriteRecords, hoodieTable);
- assertEquals(0, oldHoodieRecord.filter(record ->
record.isCurrentLocationKnown()).count());
- writeClient.startCommitWithTime(newCommitTime);
- JavaRDD<WriteStatus> writeStatues = writeClient.upsert(oldWriteRecords,
newCommitTime);
- writeClient.commit(newCommitTime, writeStatues);
- assertNoWriteErrors(writeStatues.collect());
- updateLocation(index, writeStatues, hoodieTable);
+ @Test
+ public void testTagLocationAndPartitionPathUpdateEnabled() throws Exception {
+ final String newCommitTime = "001";
+ final String oldPartitionPath = "1970/01/01";
+ final int numRecords = 10;
- metaClient = HoodieTableMetaClient.reload(metaClient);
- hoodieTable = HoodieSparkTable.create(config, context, metaClient);
- List<HoodieRecord> taggedRecords = tagLocation(index, newWriteRecords,
hoodieTable).collect();
- assertEquals(numRecords * 2L, taggedRecords.stream().count());
- // Verify the number of deleted records
- assertEquals(numRecords, taggedRecords.stream().filter(record ->
record.getKey().getPartitionPath().equals(oldPartitionPath)
- &&
record.getData().getClass().getName().equals(emptyHoodieRecordPayloadClassName)).count());
- // Verify the number of inserted records
- assertEquals(numRecords, taggedRecords.stream().filter(record ->
!record.getKey().getPartitionPath().equals(oldPartitionPath)).count());
-
- // not allowed path change test
- index = new SparkHoodieHBaseIndex(getConfig(false, false));
- List<HoodieRecord> notAllowPathChangeRecords = tagLocation(index,
newWriteRecords, hoodieTable).collect();
- assertEquals(numRecords, notAllowPathChangeRecords.stream().count());
- assertEquals(numRecords, taggedRecords.stream().filter(hoodieRecord ->
hoodieRecord.isCurrentLocationKnown()
- &&
hoodieRecord.getKey().getPartitionPath().equals(oldPartitionPath)).count());
+ List<HoodieRecord> newRecords = dataGen.generateInserts(newCommitTime,
numRecords);
+ List<HoodieRecord> oldRecords = new LinkedList();
+ for (HoodieRecord newRecord: newRecords) {
+ HoodieKey key = new HoodieKey(newRecord.getRecordKey(),
oldPartitionPath);
+ HoodieRecord hoodieRecord = new HoodieAvroRecord(key,
(HoodieRecordPayload) newRecord.getData());
+ oldRecords.add(hoodieRecord);
}
+
+ JavaRDD<HoodieRecord> newWriteRecords = jsc().parallelize(newRecords, 1);
+ JavaRDD<HoodieRecord> oldWriteRecords = jsc().parallelize(oldRecords, 1);
+
+ HoodieWriteConfig config = getConfigBuilder(100, true, false).build();
+ SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
+ writeClient.startCommitWithTime(newCommitTime);
+ JavaRDD<WriteStatus> writeStatues = writeClient.upsert(oldWriteRecords,
newCommitTime);
+ writeClient.commit(newCommitTime, writeStatues);
+ assertNoWriteErrors(writeStatues.collect());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable hoodieTable = HoodieSparkTable.create(config, context,
metaClient);
+ SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
+ List<HoodieRecord> pathChangeRecords = tagLocation(index, newWriteRecords,
hoodieTable).collect();
+ assertEquals(numRecords * 2, pathChangeRecords.stream().count());
+ assertEquals(numRecords,
pathChangeRecords.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
+
+ String newCommitTime1 = "002";
+ writeClient.startCommitWithTime(newCommitTime1);
+ JavaRDD<WriteStatus> writeStatues1 = writeClient.upsert(newWriteRecords,
newCommitTime1);
+ writeClient.commit(newCommitTime1, writeStatues1);
+ assertNoWriteErrors(writeStatues1.collect());
+ assertEquals(numRecords * 2, writeStatues1.map(writeStatus ->
writeStatus.getTotalRecords()).reduce(Long::sum));
+ assertNotEquals(0, writeStatues1.filter(writeStatus ->
writeStatus.getPartitionPath().equals(oldPartitionPath)).count());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+ List<HoodieRecord> pathChangeRecords1 = tagLocation(index,
newWriteRecords, hoodieTable).collect();
+ assertEquals(numRecords,
pathChangeRecords1.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
}
@Test
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 827c143d877..553fb5ce706 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -134,6 +134,11 @@ public abstract class HoodieRecord<T> implements
HoodieRecordCompatibilityInterf
*/
protected HoodieRecordLocation newLocation;
+ /**
+ * If set, the index will not be updated for this record after it is written.
+ */
+ protected boolean ignoreIndexUpdate;
+
/**
* Indicates whether the object is sealed.
*/
@@ -159,6 +164,7 @@ public abstract class HoodieRecord<T> implements
HoodieRecordCompatibilityInterf
this.currentLocation = null;
this.newLocation = null;
this.sealed = false;
+ this.ignoreIndexUpdate = false;
this.operation = operation;
this.metaData = metaData;
}
@@ -181,6 +187,7 @@ public abstract class HoodieRecord<T> implements
HoodieRecordCompatibilityInterf
this.currentLocation = record.currentLocation;
this.newLocation = record.newLocation;
this.sealed = record.sealed;
+ this.ignoreIndexUpdate = record.ignoreIndexUpdate;
}
public HoodieRecord() {}
@@ -256,6 +263,17 @@ public abstract class HoodieRecord<T> implements
HoodieRecordCompatibilityInterf
return HoodieRecordLocation.INVALID_POSITION;
}
+ /**
+ * Sets the flag that controls whether index updates are skipped for this record.
+ */
+ public void setIgnoreIndexUpdate(boolean ignoreFlag) {
+ this.ignoreIndexUpdate = ignoreFlag;
+ }
+
+ public boolean getIgnoreIndexUpdate() {
+ return this.ignoreIndexUpdate;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -266,7 +284,8 @@ public abstract class HoodieRecord<T> implements
HoodieRecordCompatibilityInterf
}
HoodieRecord that = (HoodieRecord) o;
return Objects.equals(key, that.key) && Objects.equals(data, that.data)
- && Objects.equals(currentLocation, that.currentLocation) &&
Objects.equals(newLocation, that.newLocation);
+ && Objects.equals(currentLocation, that.currentLocation) &&
Objects.equals(newLocation, that.newLocation)
+ && Objects.equals(ignoreIndexUpdate, that.ignoreIndexUpdate);
}
@Override
@@ -343,6 +362,7 @@ public abstract class HoodieRecord<T> implements
HoodieRecordCompatibilityInterf
// NOTE: Writing out actual record payload is relegated to the actual
// implementation
writeRecordPayload(data, kryo, output);
+ kryo.writeObjectOrNull(output, ignoreIndexUpdate, Boolean.class);
}
/**
@@ -358,6 +378,7 @@ public abstract class HoodieRecord<T> implements
HoodieRecordCompatibilityInterf
// NOTE: Reading out actual record payload is relegated to the actual
// implementation
this.data = readRecordPayload(kryo, input);
+ this.ignoreIndexUpdate = kryo.readObjectOrNull(input, Boolean.class);
// NOTE: We're always seal object after deserialization
this.sealed = true;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java
index a9323c15988..f493b3a96f6 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java
@@ -52,52 +52,59 @@ public class HoodieRecordDelegate implements Serializable,
KryoSerializable {
*/
private Option<HoodieRecordLocation> newLocation;
+ /**
+ * If set, the index will not be updated for this record after it is written.
+ */
+ private boolean ignoreIndexUpdate;
+
private HoodieRecordDelegate(HoodieKey hoodieKey,
@Nullable HoodieRecordLocation currentLocation,
- @Nullable HoodieRecordLocation newLocation) {
+ @Nullable HoodieRecordLocation newLocation,
+ boolean ignoreIndexUpdate) {
this.hoodieKey = hoodieKey;
this.currentLocation = Option.ofNullable(currentLocation);
this.newLocation = Option.ofNullable(newLocation);
+ this.ignoreIndexUpdate = ignoreIndexUpdate;
}
public static HoodieRecordDelegate create(String recordKey, String
partitionPath) {
- return new HoodieRecordDelegate(new HoodieKey(recordKey, partitionPath),
null, null);
+ return new HoodieRecordDelegate(new HoodieKey(recordKey, partitionPath),
null, null, false);
}
public static HoodieRecordDelegate create(String recordKey,
String partitionPath,
HoodieRecordLocation
currentLocation) {
- return new HoodieRecordDelegate(new HoodieKey(recordKey, partitionPath),
currentLocation, null);
+ return new HoodieRecordDelegate(new HoodieKey(recordKey, partitionPath),
currentLocation, null, false);
}
public static HoodieRecordDelegate create(String recordKey,
String partitionPath,
HoodieRecordLocation
currentLocation,
HoodieRecordLocation newLocation) {
- return new HoodieRecordDelegate(new HoodieKey(recordKey, partitionPath),
currentLocation, newLocation);
+ return new HoodieRecordDelegate(new HoodieKey(recordKey, partitionPath),
currentLocation, newLocation, false);
}
public static HoodieRecordDelegate create(HoodieKey key) {
- return new HoodieRecordDelegate(key, null, null);
+ return new HoodieRecordDelegate(key, null, null, false);
}
public static HoodieRecordDelegate create(HoodieKey key,
HoodieRecordLocation currentLocation) {
- return new HoodieRecordDelegate(key, currentLocation, null);
+ return new HoodieRecordDelegate(key, currentLocation, null, false);
}
public static HoodieRecordDelegate create(HoodieKey key,
HoodieRecordLocation
currentLocation,
HoodieRecordLocation newLocation) {
- return new HoodieRecordDelegate(key, currentLocation, newLocation);
+ return new HoodieRecordDelegate(key, currentLocation, newLocation, false);
}
public static HoodieRecordDelegate fromHoodieRecord(HoodieRecord record) {
- return new HoodieRecordDelegate(record.getKey(),
record.getCurrentLocation(), record.getNewLocation());
+ return new HoodieRecordDelegate(record.getKey(),
record.getCurrentLocation(), record.getNewLocation(),
record.getIgnoreIndexUpdate());
}
public static HoodieRecordDelegate fromHoodieRecord(HoodieRecord record,
@Nullable
HoodieRecordLocation newLocationOverride) {
- return new HoodieRecordDelegate(record.getKey(),
record.getCurrentLocation(), newLocationOverride);
+ return new HoodieRecordDelegate(record.getKey(),
record.getCurrentLocation(), newLocationOverride,
record.getIgnoreIndexUpdate());
}
public String getRecordKey() {
@@ -120,12 +127,17 @@ public class HoodieRecordDelegate implements
Serializable, KryoSerializable {
return newLocation;
}
+ public boolean getIgnoreIndexUpdate() {
+ return ignoreIndexUpdate;
+ }
+
@Override
public String toString() {
return "HoodieRecordDelegate{"
+ "hoodieKey=" + hoodieKey
+ ", currentLocation=" + currentLocation
+ ", newLocation=" + newLocation
+ + ", ignoreIndexUpdate=" + ignoreIndexUpdate
+ '}';
}
@@ -135,6 +147,7 @@ public class HoodieRecordDelegate implements Serializable,
KryoSerializable {
kryo.writeObjectOrNull(output, hoodieKey, HoodieKey.class);
kryo.writeClassAndObject(output, currentLocation.isPresent() ?
currentLocation.get() : null);
kryo.writeClassAndObject(output, newLocation.isPresent() ?
newLocation.get() : null);
+ kryo.writeObjectOrNull(output, ignoreIndexUpdate, Boolean.class);
}
@VisibleForTesting
@@ -143,5 +156,6 @@ public class HoodieRecordDelegate implements Serializable,
KryoSerializable {
this.hoodieKey = kryo.readObjectOrNull(input, HoodieKey.class);
this.currentLocation = Option.ofNullable((HoodieRecordLocation)
kryo.readClassAndObject(input));
this.newLocation = Option.ofNullable((HoodieRecordLocation)
kryo.readClassAndObject(input));
+ this.ignoreIndexUpdate = kryo.readObjectOrNull(input, Boolean.class);
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
index 26a19f9c856..1ce1b3e8fca 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
@@ -79,8 +79,8 @@ class TestHoodieRecordSerialization extends
SparkClientFunctionalTestHarness {
val hoodieInternalRow = new HoodieInternalRow(new Array[UTF8String](5),
unsafeRow, false)
Seq(
- (unsafeRow, rowSchema, 87),
- (hoodieInternalRow, addMetaFields(rowSchema), 127)
+ (unsafeRow, rowSchema, 89),
+ (hoodieInternalRow, addMetaFields(rowSchema), 129)
) foreach { case (row, schema, expectedSize) => routine(row, schema,
expectedSize) }
}
@@ -105,13 +105,15 @@ class TestHoodieRecordSerialization extends
SparkClientFunctionalTestHarness {
val key = new HoodieKey("rec-key", "part-path")
val legacyRecord = toLegacyAvroRecord(avroRecord, key)
+ legacyRecord.setIgnoreIndexUpdate(true)
val avroIndexedRecord = new HoodieAvroIndexedRecord(key, avroRecord)
+ avroIndexedRecord.setIgnoreIndexUpdate(true)
- val expectedLagacyRecordSize = if (HoodieSparkUtils.gteqSpark3_4) 534 else
528
+ val expectedLagacyRecordSize = if (HoodieSparkUtils.gteqSpark3_4) 536 else
530
Seq(
(legacyRecord, expectedLagacyRecordSize),
- (avroIndexedRecord, 389)
+ (avroIndexedRecord, 391)
) foreach { case (record, expectedSize) => routine(record, expectedSize) }
}
@@ -130,7 +132,7 @@ class TestHoodieRecordSerialization extends
SparkClientFunctionalTestHarness {
}
val key = new HoodieKey("rec-key", "part-path")
- val expectedEmptyRecordSize = if (HoodieSparkUtils.gteqSpark3_4) 30 else 27
+ val expectedEmptyRecordSize = if (HoodieSparkUtils.gteqSpark3_4) 32 else 29
Seq(
(new HoodieEmptyRecord[GenericRecord](key, HoodieOperation.INSERT, 1,
HoodieRecordType.AVRO),
expectedEmptyRecordSize),