This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5143a982bca [HUDI-7269] Fallback to key based merge if positions are
missing from log block (#11415)
5143a982bca is described below
commit 5143a982bca79fadef1a819c17b73c123f5376ac
Author: Jon Vexler <[email protected]>
AuthorDate: Tue Jun 11 09:09:23 2024 -0400
[HUDI-7269] Fallback to key based merge if positions are missing from log
block (#11415)
Fall back to key-based merging if positions are missing from a log block when
doing a position-based read in the file-group (fg) reader. Changes:
- Make position based buffer extend key based buffer so we can fall back
to key based buffer.
- Move some position based logic from the base buffer into the position
buffer because that is the only place it is used.
- If a log block is found to not have positions, we call a method to convert
the map to use keys instead of positions: fallbackToKeyBasedBuffer().
This conversion is not completely effective because delete records may
not have the record key stored. We set a flag "needToDoHybridStrategy" to
true and then handle this issue when merging with the base file.
---------
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: Sagar Sumit <[email protected]>
---
.../hudi/OverwriteWithLatestSparkMerger.java | 46 +++++
.../hudi/BaseSparkInternalRowReaderContext.java | 3 +
.../hudi/common/model/HoodieAvroIndexedRecord.java | 8 +
.../hudi/common/model/HoodieRecordMerger.java | 17 ++
.../common/model/OverwriteWithLatestMerger.java | 49 +++++
.../hudi/common/table/HoodieTableConfig.java | 11 ++
.../hudi/common/table/HoodieTableMetaClient.java | 14 +-
.../read/HoodieBaseFileGroupRecordBuffer.java | 103 +++--------
.../read/HoodieKeyBasedFileGroupRecordBuffer.java | 14 +-
.../HoodiePositionBasedFileGroupRecordBuffer.java | 198 +++++++++++++++++----
.../common/table/read/CustomPayloadForTesting.java | 35 ++++
.../table/read/TestHoodieFileGroupReaderBase.java | 30 +++-
.../testutils/reader/DataGenerationPlan.java | 18 +-
.../testutils/reader/HoodieFileSliceTestUtils.java | 47 +++--
.../hudi/common/table/read/TestCustomMerger.java | 56 +++++-
.../common/table/read/TestEventTimeMerging.java | 59 +++++-
...ing.java => TestOverwriteWithLatestMerger.java} | 101 +++++++----
.../reader/HoodieFileGroupReaderTestHarness.java | 9 +-
.../org/apache/hudi/HoodieCreateRecordUtils.scala | 8 +-
...stHoodiePositionBasedFileGroupRecordBuffer.java | 34 ++--
.../read/TestHoodieFileGroupReaderOnSpark.scala | 17 +-
.../apache/hudi/utilities/streamer/StreamSync.java | 6 +-
22 files changed, 665 insertions(+), 218 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/OverwriteWithLatestSparkMerger.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/OverwriteWithLatestSparkMerger.java
new file mode 100644
index 00000000000..611f045f645
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/OverwriteWithLatestSparkMerger.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+
+/**
+ * Spark merger that always chooses the newer record
+ */
+public class OverwriteWithLatestSparkMerger extends HoodieSparkRecordMerger {
+
+ @Override
+ public String getMergingStrategy() {
+ return OVERWRITE_MERGER_STRATEGY_UUID;
+ }
+
+ @Override
+ public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws
IOException {
+ return Option.of(Pair.of(newer, newSchema));
+ }
+
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index 7fb4577f896..36bcba9214c 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -48,6 +48,7 @@ import scala.Function1;
import static
org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
import static
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+import static
org.apache.hudi.common.model.HoodieRecordMerger.OVERWRITE_MERGER_STRATEGY_UUID;
import static org.apache.spark.sql.HoodieInternalRowUtils.getCachedSchema;
/**
@@ -65,6 +66,8 @@ public abstract class BaseSparkInternalRowReaderContext
extends HoodieReaderCont
switch (mergerStrategy) {
case DEFAULT_MERGER_STRATEGY_UUID:
return new HoodieSparkRecordMerger();
+ case OVERWRITE_MERGER_STRATEGY_UUID:
+ return new OverwriteWithLatestSparkMerger();
default:
throw new HoodieException("The merger strategy UUID is not supported:
" + mergerStrategy);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index 77f724249c7..96f6700ef91 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -53,6 +53,14 @@ public class HoodieAvroIndexedRecord extends
HoodieRecord<IndexedRecord> {
super(key, data);
}
+ public HoodieAvroIndexedRecord(HoodieKey key, IndexedRecord data,
HoodieRecordLocation currentLocation) {
+ super(key, data, null, currentLocation, null);
+ }
+
+ public HoodieAvroIndexedRecord(IndexedRecord data, HoodieRecordLocation
currentLocation) {
+ super(null, data, null, currentLocation, null);
+ }
+
public HoodieAvroIndexedRecord(
HoodieKey key,
IndexedRecord data,
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
index 62cde38a351..94d6509a7d8 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.model;
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -45,6 +46,8 @@ public interface HoodieRecordMerger extends Serializable {
String DEFAULT_MERGER_STRATEGY_UUID = "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5";
+ String OVERWRITE_MERGER_STRATEGY_UUID =
"ce9acb64-bde0-424c-9b91-f6ebba25356d";
+
/**
* This method converges combineAndGetUpdateValue and precombine from
HoodiePayload.
* It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we
can translate as having 3 versions A, B, C
@@ -163,4 +166,18 @@ public interface HoodieRecordMerger extends Serializable {
* The kind of merging strategy this recordMerger belongs to. An UUID
represents merging strategy.
*/
String getMergingStrategy();
+
+ /**
+ * The record merge mode that corresponds to this record merger
+ */
+ default RecordMergeMode getRecordMergeMode() {
+ switch (getMergingStrategy()) {
+ case DEFAULT_MERGER_STRATEGY_UUID:
+ return RecordMergeMode.EVENT_TIME_ORDERING;
+ case OVERWRITE_MERGER_STRATEGY_UUID:
+ return RecordMergeMode.OVERWRITE_WITH_LATEST;
+ default:
+ return RecordMergeMode.CUSTOM;
+ }
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestMerger.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestMerger.java
new file mode 100644
index 00000000000..2311b030a18
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestMerger.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+
+/**
+ * Avro Merger that always chooses the newer record
+ */
+public class OverwriteWithLatestMerger implements HoodieRecordMerger {
+
+ @Override
+ public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws
IOException {
+ return Option.of(Pair.of(newer, newSchema));
+ }
+
+ @Override
+ public HoodieRecord.HoodieRecordType getRecordType() {
+ return HoodieRecord.HoodieRecordType.AVRO;
+ }
+
+ @Override
+ public String getMergingStrategy() {
+ return OVERWRITE_MERGER_STRATEGY_UUID;
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index b3bf9668d93..a295f14108d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -198,6 +198,17 @@ public class HoodieTableConfig extends HoodieConfig {
public static final ConfigProperty<String> RECORD_MERGER_STRATEGY =
ConfigProperty
.key("hoodie.compaction.record.merger.strategy")
.defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
+ .withInferFunction(cfg -> {
+ switch
(RecordMergeMode.valueOf(cfg.getStringOrDefault(RECORD_MERGE_MODE))) {
+ case EVENT_TIME_ORDERING:
+ return Option.of(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID);
+ case OVERWRITE_WITH_LATEST:
+ return
Option.of(HoodieRecordMerger.OVERWRITE_MERGER_STRATEGY_UUID);
+ case CUSTOM:
+ default:
+ return Option.empty();
+ }
+ })
.sinceVersion("0.13.0")
.withDocumentation("Id of merger strategy. Hudi will pick
HoodieRecordMerger implementations in
hoodie.datasource.write.record.merger.impls which has the same merger strategy
id");
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 250091ecec6..fdf843f078f 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -79,6 +79,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+import static
org.apache.hudi.common.model.HoodieRecordMerger.OVERWRITE_MERGER_STRATEGY_UUID;
import static org.apache.hudi.common.table.HoodieTableConfig.INITIAL_VERSION;
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
@@ -1307,6 +1308,9 @@ public class HoodieTableMetaClient implements
Serializable {
if (recordMergeMode != null) {
tableConfig.setValue(RECORD_MERGE_MODE, recordMergeMode.name());
}
+ if (recordMergerStrategy != null) {
+ tableConfig.setValue(HoodieTableConfig.RECORD_MERGER_STRATEGY,
recordMergerStrategy);
+ }
}
if (null != tableCreateSchema) {
@@ -1415,20 +1419,25 @@ public class HoodieTableMetaClient implements
Serializable {
boolean recordMergerStrategySet = null != recordMergerStrategy;
if (!recordMergerStrategySet
- || recordMergerStrategy.equals(DEFAULT_MERGER_STRATEGY_UUID)) {
+ || recordMergerStrategy.equals(DEFAULT_MERGER_STRATEGY_UUID)
+ || recordMergerStrategy.equals(OVERWRITE_MERGER_STRATEGY_UUID)) {
if (payloadClassNameSet) {
if
(payloadClassName.equals(OverwriteWithLatestAvroPayload.class.getName())) {
recordMergeMode = RecordMergeMode.OVERWRITE_WITH_LATEST;
+ recordMergerStrategy = OVERWRITE_MERGER_STRATEGY_UUID;
} else if
(payloadClassName.equals(DefaultHoodieRecordPayload.class.getName())) {
recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+ recordMergerStrategy = DEFAULT_MERGER_STRATEGY_UUID;
} else {
recordMergeMode = RecordMergeMode.CUSTOM;
}
} else if (payloadTypeSet) {
if
(payloadType.equals(RecordPayloadType.OVERWRITE_LATEST_AVRO.name())) {
recordMergeMode = RecordMergeMode.OVERWRITE_WITH_LATEST;
+ recordMergerStrategy = OVERWRITE_MERGER_STRATEGY_UUID;
} else if
(payloadType.equals(RecordPayloadType.HOODIE_AVRO_DEFAULT.name())) {
recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+ recordMergerStrategy = DEFAULT_MERGER_STRATEGY_UUID;
} else {
recordMergeMode = RecordMergeMode.CUSTOM;
}
@@ -1458,6 +1467,9 @@ public class HoodieTableMetaClient implements
Serializable {
|| (payloadClassNameSet &&
payloadClassName.equals(OverwriteWithLatestAvroPayload.class.getName()))
|| (payloadTypeSet &&
payloadType.equals(RecordPayloadType.OVERWRITE_LATEST_AVRO.name())),
constructMergeConfigErrorMessage());
+ checkArgument(recordMergerStrategySet &&
recordMergerStrategy.equals(OVERWRITE_MERGER_STRATEGY_UUID),
+ "Record merger strategy (" + (recordMergerStrategySet ?
recordMergerStrategy : "null")
+ + ") should be consistent with the record merging mode
OVERWRITE_WITH_LATEST");
break;
case EVENT_TIME_ORDERING:
checkArgument((!payloadClassNameSet && !payloadTypeSet)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index c25fba0e928..9e0763d6116 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.KeySpec;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.InternalSchemaCache;
@@ -39,26 +38,19 @@ import
org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieKeyException;
-import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.avro.Schema;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
import java.io.IOException;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.function.Function;
import static
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_SCHEMA;
@@ -97,6 +89,10 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
this.partitionPathFieldOpt = partitionPathFieldOpt;
this.recordMergeMode = getRecordMergeMode(payloadProps);
this.recordMerger = recordMerger;
+ //Custom merge mode should produce the same results for any merger so we
won't fail if there is a mismatch
+ if (recordMerger.getRecordMergeMode() != this.recordMergeMode &&
this.recordMergeMode != RecordMergeMode.CUSTOM) {
+ throw new IllegalStateException("Record merger is " +
recordMerger.getClass().getName() + " but merge mode is " +
this.recordMergeMode);
+ }
this.payloadProps = payloadProps;
this.internalSchema = readerContext.getSchemaHandler().getInternalSchema();
this.hoodieTableMetaClient = hoodieTableMetaClient;
@@ -293,21 +289,28 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
protected Option<DeleteRecord> doProcessNextDeletedRecord(DeleteRecord
deleteRecord,
Pair<Option<T>,
Map<String, Object>> existingRecordMetadataPair) {
if (existingRecordMetadataPair != null) {
- // Merge and store the merged record. The ordering val is taken to
decide whether the same key record
- // should be deleted or be kept. The old record is kept only if the
DELETE record has smaller ordering val.
- // For same ordering values, uses the natural order(arrival time
semantics).
- Comparable existingOrderingVal = readerContext.getOrderingValue(
- existingRecordMetadataPair.getLeft(),
existingRecordMetadataPair.getRight(), readerSchema,
- payloadProps);
- Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
- // Checks the ordering value does not equal to 0
- // because we use 0 as the default value which means natural order
- boolean chooseExisting = !deleteOrderingVal.equals(0)
- && ReflectionUtils.isSameClass(existingOrderingVal,
deleteOrderingVal)
- && existingOrderingVal.compareTo(deleteOrderingVal) > 0;
- if (chooseExisting) {
- // The DELETE message is obsolete if the old message has greater
orderingVal.
- return Option.empty();
+ switch (recordMergeMode) {
+ case OVERWRITE_WITH_LATEST:
+ return Option.empty();
+ case EVENT_TIME_ORDERING:
+ case CUSTOM:
+ default:
+ Comparable existingOrderingVal = readerContext.getOrderingValue(
+ existingRecordMetadataPair.getLeft(),
existingRecordMetadataPair.getRight(), readerSchema,
+ payloadProps);
+ if
(isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(),
existingOrderingVal)) {
+ return Option.empty();
+ }
+ Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
+ // Checks the ordering value does not equal to 0
+ // because we use 0 as the default value which means natural order
+ boolean chooseExisting = !deleteOrderingVal.equals(0)
+ && ReflectionUtils.isSameClass(existingOrderingVal,
deleteOrderingVal)
+ && existingOrderingVal.compareTo(deleteOrderingVal) > 0;
+ if (chooseExisting) {
+ // The DELETE message is obsolete if the old message has greater
orderingVal.
+ return Option.empty();
+ }
}
}
// Do delete.
@@ -428,60 +431,6 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
}
}
- /**
- * Filter a record for downstream processing when:
- * 1. A set of pre-specified keys exists.
- * 2. The key of the record is not contained in the set.
- */
- protected boolean shouldSkip(T record, String keyFieldName, boolean
isFullKey, Set<String> keys, Schema writerSchema) {
- String recordKey = readerContext.getValue(record, writerSchema,
keyFieldName).toString();
- // Can not extract the record key, throw.
- if (recordKey == null || recordKey.isEmpty()) {
- throw new HoodieKeyException("Can not extract the key for a record");
- }
-
- // No keys are specified. Cannot skip at all.
- if (keys.isEmpty()) {
- return false;
- }
-
- // When the record key matches with one of the keys or key prefixes, can
not skip.
- if ((isFullKey && keys.contains(recordKey))
- || (!isFullKey && keys.stream().anyMatch(recordKey::startsWith))) {
- return false;
- }
-
- // Otherwise, this record is not needed.
- return true;
- }
-
- /**
- * Extract the record positions from a log block header.
- *
- * @param logBlock
- * @return
- * @throws IOException
- */
- protected static List<Long> extractRecordPositions(HoodieLogBlock logBlock)
throws IOException {
- List<Long> blockPositions = new ArrayList<>();
-
- Roaring64NavigableMap positions = logBlock.getRecordPositions();
- if (positions == null || positions.isEmpty()) {
- throw new HoodieValidationException("No record position info is found
when attempt to do position based merge.");
- }
-
- Iterator<Long> iterator = positions.iterator();
- while (iterator.hasNext()) {
- blockPositions.add(iterator.next());
- }
-
- if (blockPositions.isEmpty()) {
- throw new HoodieCorruptedDataException("No positions are extracted.");
- }
-
- return blockPositions;
- }
-
protected boolean hasNextBaseRecord(T baseRecord, Pair<Option<T>,
Map<String, Object>> logRecordInfo) throws IOException {
Map<String, Object> metadata = readerContext.generateMetadataForRecord(
baseRecord, readerSchema);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
index 96d4306afd4..d311923c625 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
@@ -102,7 +102,7 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends
HoodieBaseFileGroupR
}
@Override
- public void processDeleteBlock(HoodieDeleteBlock deleteBlock) {
+ public void processDeleteBlock(HoodieDeleteBlock deleteBlock) throws
IOException {
Iterator<DeleteRecord> it =
Arrays.stream(deleteBlock.getRecordsToDelete()).iterator();
while (it.hasNext()) {
DeleteRecord record = it.next();
@@ -126,17 +126,19 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T>
extends HoodieBaseFileGroupR
return records.containsKey(recordKey);
}
+ protected boolean hasNextBaseRecord(T baseRecord) throws IOException {
+ String recordKey = readerContext.getRecordKey(baseRecord, readerSchema);
+ Pair<Option<T>, Map<String, Object>> logRecordInfo =
records.remove(recordKey);
+ return hasNextBaseRecord(baseRecord, logRecordInfo);
+ }
+
@Override
protected boolean doHasNext() throws IOException {
ValidationUtils.checkState(baseFileIterator != null, "Base file iterator
has not been set yet");
// Handle merging.
while (baseFileIterator.hasNext()) {
- T baseRecord = baseFileIterator.next();
-
- String recordKey = readerContext.getRecordKey(baseRecord, readerSchema);
- Pair<Option<T>, Map<String, Object>> logRecordInfo =
records.remove(recordKey);
- if (hasNextBaseRecord(baseRecord, logRecordInfo)) {
+ if (hasNextBaseRecord(baseFileIterator.next())) {
return true;
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index bc25bb96f5e..79011a81d6b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -27,17 +27,23 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.KeySpec;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieKeyException;
import org.apache.avro.Schema;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -45,17 +51,21 @@ import java.util.Map;
import java.util.Set;
import java.util.function.Function;
-import static
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+import static
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
+
/**
* A buffer that is used to store log records by {@link
org.apache.hudi.common.table.log.HoodieMergedLogRecordReader}
* by calling the {@link #processDataBlock} and {@link #processDeleteBlock}
methods into record position based map.
* Here the position means that record position in the base file. The records
from the base file is accessed from an iterator object. These records are
merged when the
* {@link #hasNext} method is called.
*/
-public class HoodiePositionBasedFileGroupRecordBuffer<T> extends
HoodieBaseFileGroupRecordBuffer<T> {
+public class HoodiePositionBasedFileGroupRecordBuffer<T> extends
HoodieKeyBasedFileGroupRecordBuffer<T> {
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodiePositionBasedFileGroupRecordBuffer.class);
+
private static final String ROW_INDEX_COLUMN_NAME = "row_index";
public static final String ROW_INDEX_TEMPORARY_COLUMN_NAME =
"_tmp_metadata_" + ROW_INDEX_COLUMN_NAME;
private long nextRecordPosition = 0L;
+ private boolean needToDoHybridStrategy = false;
public HoodiePositionBasedFileGroupRecordBuffer(HoodieReaderContext<T>
readerContext,
HoodieTableMetaClient
hoodieTableMetaClient,
@@ -73,11 +83,23 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T>
extends HoodieBaseFileG
@Override
public BufferType getBufferType() {
- return BufferType.POSITION_BASED_MERGE;
+ return readerContext.getShouldMergeUseRecordPosition() ?
BufferType.POSITION_BASED_MERGE : super.getBufferType();
}
@Override
public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec>
keySpecOpt) throws IOException {
+ if (!readerContext.getShouldMergeUseRecordPosition()) {
+ super.processDataBlock(dataBlock, keySpecOpt);
+ return;
+ }
+ // Extract positions from data block.
+ List<Long> recordPositions = extractRecordPositions(dataBlock);
+ if (recordPositions == null) {
+ LOG.warn("Falling back to key based merge for Read");
+ fallbackToKeyBasedBuffer();
+ super.processDataBlock(dataBlock, keySpecOpt);
+ return;
+ }
// Prepare key filters.
Set<String> keys = new HashSet<>();
boolean isFullKey = true;
@@ -94,8 +116,6 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T>
extends HoodieBaseFileG
enablePartialMerging = true;
}
- // Extract positions from data block.
- List<Long> recordPositions = extractRecordPositions(dataBlock);
Pair<Function<T, T>, Schema> schemaTransformerWithEvolvedSchema =
getSchemaTransformerWithEvolvedSchema(dataBlock);
// TODO: Return an iterator that can generate sequence number with the
record.
@@ -123,35 +143,58 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T>
extends HoodieBaseFileG
}
}
- @Override
- public void processNextDataRecord(T record, Map<String, Object> metadata,
Serializable recordPosition) throws IOException {
- Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair =
records.get(recordPosition);
- Option<Pair<T, Map<String, Object>>> mergedRecordAndMetadata =
- doProcessNextDataRecord(record, metadata, existingRecordMetadataPair);
- if (mergedRecordAndMetadata.isPresent()) {
- records.put(recordPosition, Pair.of(
-
Option.ofNullable(readerContext.seal(mergedRecordAndMetadata.get().getLeft())),
- mergedRecordAndMetadata.get().getRight()));
+ private void fallbackToKeyBasedBuffer() {
+ readerContext.setShouldMergeUseRecordPosition(false);
+ //need to make a copy of the keys to avoid concurrent modification
exception
+ ArrayList<Serializable> positions = new ArrayList<>(records.keySet());
+ for (Serializable position : positions) {
+ Pair<Option<T>, Map<String, Object>> entry = records.get(position);
+ Object recordKey = entry.getRight().get(INTERNAL_META_RECORD_KEY);
+ if (entry.getLeft().isPresent() || recordKey != null) {
+
+ records.put((String) recordKey, entry);
+ records.remove(position);
+ } else {
+ //if it's a delete record and the key is null, then we need to still
use positions
+ //this happens when we read the positions using
logBlock.getRecordPositions()
+ //instead of reading the delete records themselves
+ needToDoHybridStrategy = true;
+ }
}
}
@Override
public void processDeleteBlock(HoodieDeleteBlock deleteBlock) throws
IOException {
+ if (!readerContext.getShouldMergeUseRecordPosition()) {
+ super.processDeleteBlock(deleteBlock);
+ return;
+ }
+
List<Long> recordPositions = extractRecordPositions(deleteBlock);
- if
(recordMerger.getMergingStrategy().equals(DEFAULT_MERGER_STRATEGY_UUID)) {
- for (Long recordPosition : recordPositions) {
- records.put(recordPosition,
- Pair.of(Option.empty(),
readerContext.generateMetadataForRecord(null, "", 0)));
- }
+ if (recordPositions == null) {
+ LOG.warn("Falling back to key based merge for Read");
+ fallbackToKeyBasedBuffer();
+ super.processDeleteBlock(deleteBlock);
return;
}
- int recordIndex = 0;
- Iterator<DeleteRecord> it =
Arrays.stream(deleteBlock.getRecordsToDelete()).iterator();
- while (it.hasNext()) {
- DeleteRecord record = it.next();
- long recordPosition = recordPositions.get(recordIndex++);
- processNextDeletedRecord(record, recordPosition);
+ switch (recordMergeMode) {
+ case OVERWRITE_WITH_LATEST:
+ for (Long recordPosition : recordPositions) {
+ records.putIfAbsent(recordPosition,
+ Pair.of(Option.empty(),
readerContext.generateMetadataForRecord(null, "", 0L)));
+ }
+ return;
+ case EVENT_TIME_ORDERING:
+ case CUSTOM:
+ default:
+ int recordIndex = 0;
+ Iterator<DeleteRecord> it =
Arrays.stream(deleteBlock.getRecordsToDelete()).iterator();
+ while (it.hasNext()) {
+ DeleteRecord record = it.next();
+ long recordPosition = recordPositions.get(recordIndex++);
+ processNextDeletedRecord(record, recordPosition);
+ }
}
}
@@ -174,20 +217,97 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T>
extends HoodieBaseFileG
}
@Override
- protected boolean doHasNext() throws IOException {
- ValidationUtils.checkState(baseFileIterator != null, "Base file iterator
has not been set yet");
-
- // Handle merging.
- while (baseFileIterator.hasNext()) {
- T baseRecord = baseFileIterator.next();
- nextRecordPosition = readerContext.extractRecordPosition(baseRecord,
readerSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME, nextRecordPosition);
- Pair<Option<T>, Map<String, Object>> logRecordInfo =
records.remove(nextRecordPosition++);
- if (hasNextBaseRecord(baseRecord, logRecordInfo)) {
- return true;
+ protected boolean hasNextBaseRecord(T baseRecord) throws IOException {
+ if (!readerContext.getShouldMergeUseRecordPosition()) {
+ return doHasNextFallbackBaseRecord(baseRecord);
+ }
+
+ nextRecordPosition = readerContext.extractRecordPosition(baseRecord,
readerSchema,
+ ROW_INDEX_TEMPORARY_COLUMN_NAME, nextRecordPosition);
+ Pair<Option<T>, Map<String, Object>> logRecordInfo =
records.remove(nextRecordPosition++);
+
+ Map<String, Object> metadata = readerContext.generateMetadataForRecord(
+ baseRecord, readerSchema);
+
+ Option<T> resultRecord = logRecordInfo != null
+ ? merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(),
logRecordInfo.getRight())
+ : merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord),
metadata);
+ if (resultRecord.isPresent()) {
+ nextRecord = readerContext.seal(resultRecord.get());
+ return true;
+ }
+ return false;
+ }
+
+ private boolean doHasNextFallbackBaseRecord(T baseRecord) throws IOException
{
+ if (needToDoHybridStrategy) {
+ //see if there is a delete block with record positions
+ nextRecordPosition = readerContext.extractRecordPosition(baseRecord,
readerSchema,
+ ROW_INDEX_TEMPORARY_COLUMN_NAME, nextRecordPosition);
+ Pair<Option<T>, Map<String, Object>> logRecordInfo =
records.remove(nextRecordPosition++);
+ if (logRecordInfo != null) {
+ //We have a delete record that could not be converted to key-based
form. Since it is the newest version, the record is deleted.
+ //remove a key based record if it exists
+ records.remove(readerContext.getRecordKey(baseRecord, readerSchema));
+ return false;
}
}
+ return super.hasNextBaseRecord(baseRecord);
+ }
+
+ /**
+ * Filter out a record from downstream processing when both:
+ * 1. A set of pre-specified keys exists, and
+ * 2. The key of the record is not contained in the set.
+ */
+ protected boolean shouldSkip(T record, String keyFieldName, boolean
isFullKey, Set<String> keys, Schema writerSchema) {
+ // No keys are specified. Cannot skip at all.
+ if (keys.isEmpty()) {
+ return false;
+ }
+
+ String recordKey = readerContext.getValue(record, writerSchema,
keyFieldName).toString();
+ // Cannot extract the record key; throw.
+ if (recordKey == null || recordKey.isEmpty()) {
+ throw new HoodieKeyException("Can not extract the key for a record");
+ }
+
+ // When the record key matches one of the keys or key prefixes, we
cannot skip.
+ if ((isFullKey && keys.contains(recordKey))
+ || (!isFullKey && keys.stream().anyMatch(recordKey::startsWith))) {
+ return false;
+ }
+
+ // Otherwise, this record is not needed.
+ return true;
+ }
+
+ /**
+ * Extract the record positions from a log block header.
+ *
+ * @param logBlock the log block whose record positions should be read
+ * @return the list of record positions, or null if the block carries
no position information
+ * @throws IOException if the record positions cannot be read from the
block
+ */
+ protected static List<Long> extractRecordPositions(HoodieLogBlock logBlock)
throws IOException {
+ List<Long> blockPositions = new ArrayList<>();
+
+ Roaring64NavigableMap positions = logBlock.getRecordPositions();
+ if (positions == null || positions.isEmpty()) {
+ LOG.warn("No record position info is found when attempt to do position
based merge.");
+ return null;
+ }
+
+ Iterator<Long> iterator = positions.iterator();
+ while (iterator.hasNext()) {
+ blockPositions.add(iterator.next());
+ }
+
+ if (blockPositions.isEmpty()) {
+ LOG.warn("No positions are extracted.");
+ return null;
+ }
- // Handle records solely from log files.
- return hasNextLogRecord();
+ return blockPositions;
}
-}
+}
\ No newline at end of file
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/CustomPayloadForTesting.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/CustomPayloadForTesting.java
new file mode 100644
index 00000000000..b69d4421f73
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/CustomPayloadForTesting.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.generic.GenericRecord;
+
+public class CustomPayloadForTesting extends DefaultHoodieRecordPayload {
+ public CustomPayloadForTesting(GenericRecord record, Comparable orderingVal)
{
+ super(record, orderingVal);
+ }
+
+ public CustomPayloadForTesting(Option<GenericRecord> record) {
+ super(record);
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index a8717d8a8e3..8128a32d5ef 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
@@ -55,6 +56,8 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
+import static
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+import static
org.apache.hudi.common.model.HoodieRecordMerger.OVERWRITE_MERGER_STRATEGY_UUID;
import static org.apache.hudi.common.model.WriteOperationType.BULK_INSERT;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
@@ -75,12 +78,16 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
@TempDir
protected java.nio.file.Path tempDir;
+ protected String customRecordMergerStrategy =
HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+
public abstract StorageConfiguration<?> getStorageConf();
public abstract String getBasePath();
public abstract HoodieReaderContext<T> getHoodieReaderContext(String
tablePath, Schema avroSchema, StorageConfiguration<?> storageConf);
+ public abstract String getRecordPayloadForMergeMode(RecordMergeMode
mergeMode);
+
public abstract void commitToTable(List<String> recordList, String operation,
Map<String, String> writeConfigs);
@@ -93,7 +100,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
@Test
public void testCompareToComparable() throws Exception {
- Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs());
+ Map<String, String> writeConfigs = new
HashMap<>(getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING));
// Prepare a table for initializing reader context
try (HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(0xDEEF)) {
commitToTable(recordsToStrings(dataGen.generateInserts("001", 1)),
BULK_INSERT.value(), writeConfigs);
@@ -170,7 +177,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
@ParameterizedTest
@MethodSource("testArguments")
public void testReadFileGroupInMergeOnReadTable(RecordMergeMode
recordMergeMode, String logDataBlockFormat) throws Exception {
- Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs());
+ Map<String, String> writeConfigs = new
HashMap<>(getCommonConfigs(recordMergeMode));
writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),
logDataBlockFormat);
try (HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(0xDEEF)) {
@@ -194,7 +201,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
@ParameterizedTest
@MethodSource("testArguments")
public void testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode
recordMergeMode, String logDataBlockFormat) throws Exception {
- Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs());
+ Map<String, String> writeConfigs = new
HashMap<>(getCommonConfigs(recordMergeMode));
writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),
logDataBlockFormat);
// Use InMemoryIndex to generate log only mor table
writeConfigs.put("hoodie.index.type", "INMEMORY");
@@ -212,7 +219,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
}
}
- private Map<String, String> getCommonConfigs() {
+ private Map<String, String> getCommonConfigs(RecordMergeMode
recordMergeMode) {
Map<String, String> configMapping = new HashMap<>();
configMapping.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
"_row_key");
configMapping.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),
"partition_path");
@@ -225,6 +232,21 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
configMapping.put("hoodie.delete.shuffle.parallelism", "1");
configMapping.put("hoodie.merge.small.file.group.candidates.limit", "0");
configMapping.put("hoodie.compact.inline", "false");
+ configMapping.put(RECORD_MERGE_MODE.key(), recordMergeMode.name());
+ configMapping.put("hoodie.datasource.write.payload.class",
getRecordPayloadForMergeMode(recordMergeMode));
+ switch (recordMergeMode) {
+ case OVERWRITE_WITH_LATEST:
+ configMapping.put("hoodie.datasource.write.record.merger.strategy",
OVERWRITE_MERGER_STRATEGY_UUID);
+ configMapping.put("hoodie.datasource.write.precombine.field", "");
+ break;
+ case CUSTOM:
+ configMapping.put("hoodie.datasource.write.record.merger.strategy",
customRecordMergerStrategy);
+ break;
+ case EVENT_TIME_ORDERING:
+ default:
+ configMapping.put("hoodie.datasource.write.record.merger.strategy",
DEFAULT_MERGER_STRATEGY_UUID);
+ break;
+ }
return configMapping;
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/DataGenerationPlan.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/DataGenerationPlan.java
index 29b0090a5db..666a2ff11ce 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/DataGenerationPlan.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/DataGenerationPlan.java
@@ -39,6 +39,7 @@ public class DataGenerationPlan {
// The operation type of the record.
private final OperationType operationType;
private final String instantTime;
+ private final boolean writePositions;
public enum OperationType {
INSERT,
@@ -50,12 +51,14 @@ public class DataGenerationPlan {
String partitionPath,
long timestamp,
OperationType operationType,
- String instantTime) {
+ String instantTime,
+ boolean writePositions) {
this.recordKeys = recordKeys;
this.partitionPath = partitionPath;
this.timestamp = timestamp;
this.operationType = operationType;
this.instantTime = instantTime;
+ this.writePositions = writePositions;
}
public List<String> getRecordKeys() {
@@ -78,6 +81,10 @@ public class DataGenerationPlan {
return instantTime;
}
+ public boolean getWritePositions() {
+ return writePositions;
+ }
+
public static Builder newBuilder() {
return new Builder();
}
@@ -88,6 +95,7 @@ public class DataGenerationPlan {
private long timestamp;
private OperationType operationType;
private String instantTime;
+ private boolean writePositions;
public Builder withRecordKeys(List<String> recordKeys) {
this.recordKeys = recordKeys;
@@ -114,13 +122,19 @@ public class DataGenerationPlan {
return this;
}
+ public Builder withWritePositions(boolean writePositions) {
+ this.writePositions = writePositions;
+ return this;
+ }
+
public DataGenerationPlan build() {
return new DataGenerationPlan(
recordKeys,
partitionPath,
timestamp,
operationType,
- instantTime);
+ instantTime,
+ writePositions);
}
}
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
index 2ca33678adc..01ce1f168f6 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
@@ -85,6 +86,7 @@ public class HoodieFileSliceTestUtils {
public static final String PARTITION_PATH = "partition_path";
public static final String RIDER = "rider";
public static final String ROW_KEY = "_row_key";
+ public static final int RECORD_KEY_INDEX =
AVRO_SCHEMA.getField(ROW_KEY).pos();
public static final String TIMESTAMP = "timestamp";
public static final HoodieTestDataGenerator DATA_GEN =
new HoodieTestDataGenerator(0xDEED);
@@ -165,21 +167,25 @@ public class HoodieFileSliceTestUtils {
HoodieLogBlock.HoodieLogBlockType dataBlockType,
List<IndexedRecord> records,
Map<HoodieLogBlock.HeaderMetadataType, String> header,
- StoragePath logFilePath
+ StoragePath logFilePath,
+ boolean writePositions,
+ Map<String, Long> keyToPositionMap
) {
return createDataBlock(
dataBlockType,
- records.stream().map(HoodieAvroIndexedRecord::new)
+ records.stream().map(r -> new HoodieAvroIndexedRecord(r, new
HoodieRecordLocation("", "", keyToPositionMap.get(r.get(RECORD_KEY_INDEX)))))
.collect(Collectors.toList()),
header,
- logFilePath);
+ logFilePath,
+ writePositions);
}
private static HoodieDataBlock createDataBlock(
HoodieLogBlock.HoodieLogBlockType dataBlockType,
List<HoodieRecord> records,
Map<HoodieLogBlock.HeaderMetadataType, String> header,
- StoragePath pathForReader
+ StoragePath pathForReader,
+ boolean writePositions
) {
switch (dataBlockType) {
case CDC_DATA_BLOCK:
@@ -190,7 +196,7 @@ public class HoodieFileSliceTestUtils {
case AVRO_DATA_BLOCK:
return new HoodieAvroDataBlock(
records,
- false,
+ writePositions,
header,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
case HFILE_DATA_BLOCK:
@@ -203,7 +209,7 @@ public class HoodieFileSliceTestUtils {
case PARQUET_DATA_BLOCK:
return new HoodieParquetDataBlock(
records,
- false,
+ writePositions,
header,
HoodieRecord.RECORD_KEY_METADATA_FIELD,
PARQUET_COMPRESSION_CODEC_NAME.defaultValue(),
@@ -219,21 +225,23 @@ public class HoodieFileSliceTestUtils {
List<IndexedRecord> records,
Map<HoodieLogBlock.HeaderMetadataType, String> header,
Schema schema,
- Properties props
+ Properties props,
+ boolean writePositions,
+ Map<String, Long> keyToPositionMap
) {
List<HoodieRecord> hoodieRecords = records.stream()
.map(r -> {
String rowKey = (String)
r.get(r.getSchema().getField(ROW_KEY).pos());
String partitionPath = (String)
r.get(r.getSchema().getField(PARTITION_PATH).pos());
- return new HoodieAvroIndexedRecord(new HoodieKey(rowKey,
partitionPath), r);
+ return new HoodieAvroIndexedRecord(new HoodieKey(rowKey,
partitionPath), r, new HoodieRecordLocation("", "",
keyToPositionMap.get(r.get(RECORD_KEY_INDEX))));
})
.collect(Collectors.toList());
return new HoodieDeleteBlock(
hoodieRecords.stream().map(
r -> Pair.of(DeleteRecord.create(
- r.getKey(), r.getOrderingValue(schema, props)), -1L))
+ r.getKey(), r.getOrderingValue(schema, props)),
r.getCurrentLocation().getPosition()))
.collect(Collectors.toList()),
- false,
+ writePositions,
header
);
}
@@ -285,7 +293,9 @@ public class HoodieFileSliceTestUtils {
String fileId,
String logInstantTime,
int version,
- HoodieLogBlock.HoodieLogBlockType blockType
+ HoodieLogBlock.HoodieLogBlockType blockType,
+ boolean writePositions,
+ Map<String, Long> keyToPositionMap
) throws InterruptedException, IOException {
try (HoodieLogFormat.Writer writer =
HoodieLogFormat.newWriterBuilder()
@@ -301,11 +311,11 @@ public class HoodieFileSliceTestUtils {
if (blockType != DELETE_BLOCK) {
HoodieDataBlock dataBlock = getDataBlock(
- blockType, records, header, new StoragePath(logFilePath));
+ blockType, records, header, new StoragePath(logFilePath),
writePositions, keyToPositionMap);
writer.appendBlock(dataBlock);
} else {
HoodieDeleteBlock deleteBlock = getDeleteBlock(
- records, header, schema, PROPERTIES);
+ records, header, schema, PROPERTIES, writePositions,
keyToPositionMap);
writer.appendBlock(deleteBlock);
}
}
@@ -328,6 +338,7 @@ public class HoodieFileSliceTestUtils {
HoodieBaseFile baseFile = null;
List<HoodieLogFile> logFiles = new ArrayList<>();
+ Map<String, Long> keyToPositionMap = new HashMap<>();
// Generate a base file with records.
DataGenerationPlan baseFilePlan = plans.get(0);
if (!baseFilePlan.getRecordKeys().isEmpty()) {
@@ -339,6 +350,9 @@ public class HoodieFileSliceTestUtils {
records,
schema,
baseFilePlan.getInstantTime());
+ for (int i = 0; i < baseFilePlan.getRecordKeys().size(); i++) {
+ keyToPositionMap.put(baseFilePlan.getRecordKeys().get(i), (long) i);
+ }
}
// Rest of plans are for log files.
@@ -361,7 +375,9 @@ public class HoodieFileSliceTestUtils {
fileId,
logFilePlan.getInstantTime(),
i,
- blockType));
+ blockType,
+ logFilePlan.getWritePositions(),
+ keyToPositionMap));
}
// Assemble the FileSlice finally.
@@ -391,6 +407,7 @@ public class HoodieFileSliceTestUtils {
.withPartitionPath(partitionPath)
.withTimeStamp(timestamp)
.withInstantTime(baseInstantTime)
+ .withWritePositions(false)
.build();
plans.add(baseFilePlan);
@@ -412,6 +429,7 @@ public class HoodieFileSliceTestUtils {
List<Long> timestamps,
List<DataGenerationPlan.OperationType> operationTypes,
List<String> instantTimes,
+ List<Boolean> shouldWritePositions,
String basePath,
String partitionPath,
String fileId
@@ -425,6 +443,7 @@ public class HoodieFileSliceTestUtils {
.withRecordKeys(keys)
.withTimeStamp(timestamps.get(i))
.withInstantTime(instantTimes.get(i))
+ .withWritePositions(shouldWritePositions.get(i))
.build());
}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
index 4ec1c0556b0..c6f949f56a7 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
@@ -39,11 +39,16 @@ import org.apache.avro.generic.IndexedRecord;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.stream.Stream;
import static org.apache.hudi.common.model.HoodieRecord.HoodieRecordType.AVRO;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
@@ -103,6 +108,7 @@ public class TestCustomMerger extends
HoodieFileGroupReaderTestHarness {
INSERT, DELETE, UPDATE, DELETE, UPDATE);
instantTimes = Arrays.asList(
"001", "002", "003", "004", "005");
+ shouldWritePositions = Arrays.asList(false, false, false, false, false);
}
@BeforeEach
@@ -115,9 +121,11 @@ public class TestCustomMerger extends
HoodieFileGroupReaderTestHarness {
setUpMockCommits();
}
- @Test
- public void testWithOneLogFile() throws IOException, InterruptedException {
- ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2);
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testWithOneLogFile(boolean useRecordPositions) throws
IOException, InterruptedException {
+ shouldWritePositions = Arrays.asList(useRecordPositions,
useRecordPositions);
+ ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2,
useRecordPositions);
List<String> leftKeysExpected =
Arrays.asList("6", "7", "8", "9", "10");
List<String> leftKeysActual = new ArrayList<>();
@@ -129,9 +137,11 @@ public class TestCustomMerger extends
HoodieFileGroupReaderTestHarness {
assertEquals(leftKeysExpected, leftKeysActual);
}
- @Test
- public void testWithTwoLogFiles() throws IOException, InterruptedException {
- ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(3);
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testWithTwoLogFiles(boolean useRecordPositions) throws
IOException, InterruptedException {
+ shouldWritePositions = Arrays.asList(useRecordPositions,
useRecordPositions, useRecordPositions);
+ ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(3,
useRecordPositions);
List<String> leftKeysExpected =
Arrays.asList("1", "3", "6", "7", "8", "9", "10");
List<String> leftKeysActual = new ArrayList<>();
@@ -143,9 +153,11 @@ public class TestCustomMerger extends
HoodieFileGroupReaderTestHarness {
assertEquals(leftKeysExpected, leftKeysActual);
}
- @Test
- public void testWithThreeLogFiles() throws IOException, InterruptedException
{
- ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4);
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testWithThreeLogFiles(boolean useRecordPositions) throws
IOException, InterruptedException {
+ shouldWritePositions = Arrays.asList(useRecordPositions,
useRecordPositions, useRecordPositions, useRecordPositions);
+ ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4,
useRecordPositions);
List<String> leftKeysExpected =
Arrays.asList("1", "3", "7", "9", "10");
List<String> leftKeysActual = new ArrayList<>();
@@ -171,6 +183,32 @@ public class TestCustomMerger extends
HoodieFileGroupReaderTestHarness {
assertEquals(leftKeysExpected, leftKeysActual);
}
+ @ParameterizedTest
+ @MethodSource("testArgs")
+ public void testPositionMergeFallback(boolean log1haspositions, boolean
log2haspositions,
+ boolean log3haspositions, boolean
log4haspositions) throws IOException, InterruptedException {
+ shouldWritePositions = Arrays.asList(true, log1haspositions,
log2haspositions, log3haspositions, log4haspositions);
+ ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(5, true);
+ List<String> leftKeysExpected =
+ Arrays.asList("1", "3", "5", "7", "9");
+ List<String> leftKeysActual = new ArrayList<>();
+ while (iterator.hasNext()) {
+ leftKeysActual.add(iterator.next()
+ .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+ .toString());
+ }
+ assertEquals(leftKeysExpected, leftKeysActual);
+ }
+
+ //generate all possible combos of 4 booleans
+ private static Stream<Arguments> testArgs() {
+ Stream.Builder<Arguments> b = Stream.builder();
+ for (int i = 0; i < 16; i++) {
+ b.add(Arguments.of(i % 2 == 0, (i / 2) % 2 == 0, (i / 4) % 2 == 0, (i /
8) % 2 == 0));
+ }
+ return b.build();
+ }
+
/**
* This merger is designed to save records whose record key is odd.
* That means, if the record is not a delete record, and its record
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
index 3b3fc3c4359..237bf527be0 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
@@ -35,11 +35,16 @@ import org.apache.avro.generic.IndexedRecord;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.stream.Stream;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
import static
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.DELETE;
@@ -99,6 +104,7 @@ public class TestEventTimeMerging extends
HoodieFileGroupReaderTestHarness {
// Specify the instant time for each file.
instantTimes = Arrays.asList(
"001", "002", "003", "004", "005");
+ shouldWritePositions = Arrays.asList(false, false, false, false, false);
}
@BeforeEach
@@ -111,10 +117,12 @@ public class TestEventTimeMerging extends
HoodieFileGroupReaderTestHarness {
setUpMockCommits();
}
- @Test
- public void testWithOneLogFile() throws IOException, InterruptedException {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testWithOneLogFile(boolean useRecordPositions) throws
IOException, InterruptedException {
+ shouldWritePositions = Arrays.asList(useRecordPositions,
useRecordPositions);
// The FileSlice contains a base file and a log file.
- ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2);
+ ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2,
useRecordPositions);
List<String> leftKeysExpected = Arrays.asList("6", "7", "8", "9", "10");
List<Long> leftTimestampsExpected = Arrays.asList(2L, 2L, 2L, 2L, 2L);
List<String> leftKeysActual = new ArrayList<>();
@@ -128,10 +136,12 @@ public class TestEventTimeMerging extends
HoodieFileGroupReaderTestHarness {
assertEquals(leftTimestampsExpected, leftTimestampsActual);
}
- @Test
- public void testWithTwoLogFiles() throws IOException, InterruptedException {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testWithTwoLogFiles(boolean useRecordPositions) throws
IOException, InterruptedException {
+ shouldWritePositions = Arrays.asList(useRecordPositions,
useRecordPositions, useRecordPositions);
// The FileSlice contains a base file and two log files.
- ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(3);
+ ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(3,
useRecordPositions);
List<String> leftKeysExpected = Arrays.asList("6", "7", "8", "9", "10");
List<Long> leftTimestampsExpected = Arrays.asList(2L, 2L, 2L, 2L, 2L);
List<String> leftKeysActual = new ArrayList<>();
@@ -145,10 +155,12 @@ public class TestEventTimeMerging extends
HoodieFileGroupReaderTestHarness {
assertEquals(leftTimestampsExpected, leftTimestampsActual);
}
- @Test
- public void testWithThreeLogFiles() throws IOException, InterruptedException
{
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testWithThreeLogFiles(boolean useRecordPositions) throws
IOException, InterruptedException {
+ shouldWritePositions = Arrays.asList(useRecordPositions,
useRecordPositions, useRecordPositions, useRecordPositions);
// The FileSlice contains a base file and three log files.
- ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4);
+ ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4,
useRecordPositions);
List<String> leftKeysExpected = Arrays.asList("6", "7", "8", "9", "10");
List<Long> leftTimestampsExpected = Arrays.asList(2L, 2L, 2L, 2L, 2L);
List<String> leftKeysActual = new ArrayList<>();
@@ -178,4 +190,33 @@ public class TestEventTimeMerging extends
HoodieFileGroupReaderTestHarness {
assertEquals(leftKeysExpected, leftKeysActual);
assertEquals(leftTimestampsExpected, leftTimestampsActual);
}
+
+ @ParameterizedTest
+ @MethodSource("testArgs")
+ public void testPositionMergeFallback(boolean log1haspositions, boolean
log2haspositions,
+ boolean log3haspositions, boolean
log4haspositions) throws IOException, InterruptedException {
+ shouldWritePositions = Arrays.asList(true, log1haspositions,
log2haspositions, log3haspositions, log4haspositions);
+ // The FileSlice contains a base file and three log files.
+ ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(5, true);
+ List<String> leftKeysExpected = Arrays.asList("1", "2", "6", "7", "8",
"9", "10");
+ List<Long> leftTimestampsExpected = Arrays.asList(4L, 4L, 2L, 2L, 2L, 2L,
2L);
+ List<String> leftKeysActual = new ArrayList<>();
+ List<Long> leftTimestampsActual = new ArrayList<>();
+ while (iterator.hasNext()) {
+ IndexedRecord record = iterator.next();
+
leftKeysActual.add(record.get(AVRO_SCHEMA.getField(ROW_KEY).pos()).toString());
+ leftTimestampsActual.add((Long)
record.get(AVRO_SCHEMA.getField("timestamp").pos()));
+ }
+ assertEquals(leftKeysExpected, leftKeysActual);
+ assertEquals(leftTimestampsExpected, leftTimestampsActual);
+ }
+
+ //generate all possible combos of 4 booleans
+ private static Stream<Arguments> testArgs() {
+ Stream.Builder<Arguments> b = Stream.builder();
+ for (int i = 0; i < 16; i++) {
+ b.add(Arguments.of(i % 2 == 0, (i / 2) % 2 == 0, (i / 4) % 2 == 0, (i /
8) % 2 == 0));
+ }
+ return b.build();
+ }
}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestOverwriteWithLatestMerger.java
similarity index 63%
copy from
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
copy to
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestOverwriteWithLatestMerger.java
index 3b3fc3c4359..66b5f11460a 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestOverwriteWithLatestMerger.java
@@ -21,12 +21,12 @@ package org.apache.hudi.common.table.read;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.RecordMergeMode;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.OverwriteWithLatestMerger;
import org.apache.hudi.common.testutils.HoodieTestTable;
-import org.apache.hudi.common.testutils.reader.HoodieAvroRecordTestMerger;
import
org.apache.hudi.common.testutils.reader.HoodieFileGroupReaderTestHarness;
import org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils;
-import org.apache.hudi.common.testutils.reader.HoodieRecordTestPayload;
import org.apache.hudi.common.testutils.reader.HoodieTestReaderContext;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -35,11 +35,16 @@ import org.apache.avro.generic.IndexedRecord;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.stream.Stream;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
import static
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.DELETE;
@@ -48,17 +53,15 @@ import static
org.apache.hudi.common.testutils.reader.DataGenerationPlan.Operati
import static
org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils.ROW_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
-public class TestEventTimeMerging extends HoodieFileGroupReaderTestHarness {
+public class TestOverwriteWithLatestMerger extends
HoodieFileGroupReaderTestHarness {
@BeforeAll
public static void setUp() throws IOException {
- // Create dedicated merger to avoid current delete logic holes.
- // TODO: Unify delete logic (HUDI-7240).
- HoodieAvroRecordMerger merger = new HoodieAvroRecordTestMerger();
+ HoodieRecordMerger merger = new OverwriteWithLatestMerger();
readerContext = new HoodieTestReaderContext(
Option.of(merger),
- Option.of(HoodieRecordTestPayload.class.getName()));
+ Option.of(OverwriteWithLatestAvroPayload.class.getName()));
properties.setProperty(
- HoodieCommonConfig.RECORD_MERGE_MODE.key(),
RecordMergeMode.EVENT_TIME_ORDERING.name());
+ HoodieCommonConfig.RECORD_MERGE_MODE.key(),
RecordMergeMode.OVERWRITE_WITH_LATEST.name());
// -------------------------------------------------------------
// The test logic is as follows:
@@ -71,16 +74,14 @@ public class TestEventTimeMerging extends
HoodieFileGroupReaderTestHarness {
// Current existing keys: [6, 7, 8, 9, 10]
// 3. After adding the second log file,
// we tried to add the records with keys from 1 to 3 back,
- // but we cannot since their ordering value is 1 < 3.
- // Current existing keys: [6, 7, 8, 9, 10]
+ // Current existing keys: [1, 2, 3, 6, 7, 8, 9, 10]
// 4. After adding the third log file,
//    we delete records with keys from 6 to 8;
//    the delete wins because overwrite-with-latest ignores ordering values.
- // Current existing keys: [6, 7, 8, 9, 10]
+ // Current existing keys: [1, 2, 3, 9, 10]
// 5. After adding the fourth log file,
- // we tried to add the records with keys from 1 to 2 back,
- // and it worked since their ordering value is 4 > 3.
- // Current existing keys: [1, 2, 6, 7, 8, 9, 10]
+ // we tried to add the records with keys from 2 to 4
+ // Current existing keys: [1, 2, 3, 4, 9, 10]
// -------------------------------------------------------------
// Specify the key column values for each file.
@@ -89,7 +90,7 @@ public class TestEventTimeMerging extends
HoodieFileGroupReaderTestHarness {
new HoodieFileSliceTestUtils.KeyRange(1, 5),
new HoodieFileSliceTestUtils.KeyRange(1, 3),
new HoodieFileSliceTestUtils.KeyRange(6, 8),
- new HoodieFileSliceTestUtils.KeyRange(1, 2));
+ new HoodieFileSliceTestUtils.KeyRange(2, 4));
// Specify the value of `timestamp` column for each file.
timestamps = Arrays.asList(
2L, 3L, 1L, 1L, 4L);
@@ -99,11 +100,12 @@ public class TestEventTimeMerging extends
HoodieFileGroupReaderTestHarness {
// Specify the instant time for each file.
instantTimes = Arrays.asList(
"001", "002", "003", "004", "005");
+ shouldWritePositions = Arrays.asList(false, false, false, false, false);
}
@BeforeEach
public void initialize() throws Exception {
- setTableName(TestEventTimeMerging.class.getName());
+ setTableName(TestOverwriteWithLatestMerger.class.getName());
initPath(tableName);
initMetaClient();
initTestDataGenerator(new String[]{PARTITION_PATH});
@@ -111,10 +113,12 @@ public class TestEventTimeMerging extends
HoodieFileGroupReaderTestHarness {
setUpMockCommits();
}
- @Test
- public void testWithOneLogFile() throws IOException, InterruptedException {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testWithOneLogFile(boolean useRecordPositions) throws
IOException, InterruptedException {
+ shouldWritePositions = Arrays.asList(useRecordPositions,
useRecordPositions);
// The FileSlice contains a base file and a log file.
- ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2);
+ ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2,
useRecordPositions);
List<String> leftKeysExpected = Arrays.asList("6", "7", "8", "9", "10");
List<Long> leftTimestampsExpected = Arrays.asList(2L, 2L, 2L, 2L, 2L);
List<String> leftKeysActual = new ArrayList<>();
@@ -128,12 +132,14 @@ public class TestEventTimeMerging extends
HoodieFileGroupReaderTestHarness {
assertEquals(leftTimestampsExpected, leftTimestampsActual);
}
- @Test
- public void testWithTwoLogFiles() throws IOException, InterruptedException {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testWithTwoLogFiles(boolean useRecordPositions) throws
IOException, InterruptedException {
+ shouldWritePositions = Arrays.asList(useRecordPositions,
useRecordPositions, useRecordPositions);
// The FileSlice contains a base file and two log files.
- ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(3);
- List<String> leftKeysExpected = Arrays.asList("6", "7", "8", "9", "10");
- List<Long> leftTimestampsExpected = Arrays.asList(2L, 2L, 2L, 2L, 2L);
+ ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(3,
useRecordPositions);
+ List<String> leftKeysExpected = Arrays.asList("1", "2", "3", "6", "7",
"8", "9", "10");
+ List<Long> leftTimestampsExpected = Arrays.asList(1L, 1L, 1L, 2L, 2L, 2L,
2L, 2L);
List<String> leftKeysActual = new ArrayList<>();
List<Long> leftTimestampsActual = new ArrayList<>();
while (iterator.hasNext()) {
@@ -145,12 +151,14 @@ public class TestEventTimeMerging extends
HoodieFileGroupReaderTestHarness {
assertEquals(leftTimestampsExpected, leftTimestampsActual);
}
- @Test
- public void testWithThreeLogFiles() throws IOException, InterruptedException
{
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testWithThreeLogFiles(boolean useRecordPositions) throws
IOException, InterruptedException {
+ shouldWritePositions = Arrays.asList(useRecordPositions,
useRecordPositions, useRecordPositions, useRecordPositions);
// The FileSlice contains a base file and three log files.
- ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4);
- List<String> leftKeysExpected = Arrays.asList("6", "7", "8", "9", "10");
- List<Long> leftTimestampsExpected = Arrays.asList(2L, 2L, 2L, 2L, 2L);
+ ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4,
useRecordPositions);
+ List<String> leftKeysExpected = Arrays.asList("1", "2", "3", "9", "10");
+ List<Long> leftTimestampsExpected = Arrays.asList(1L, 1L, 1L, 2L, 2L);
List<String> leftKeysActual = new ArrayList<>();
List<Long> leftTimestampsActual = new ArrayList<>();
while (iterator.hasNext()) {
@@ -166,8 +174,28 @@ public class TestEventTimeMerging extends
HoodieFileGroupReaderTestHarness {
public void testWithFourLogFiles() throws IOException, InterruptedException {
// The FileSlice contains a base file and four log files.
ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(5);
- List<String> leftKeysExpected = Arrays.asList("1", "2", "6", "7", "8",
"9", "10");
- List<Long> leftTimestampsExpected = Arrays.asList(4L, 4L, 2L, 2L, 2L, 2L,
2L);
+ List<String> leftKeysExpected = Arrays.asList("1", "2", "3", "4", "9",
"10");
+ List<Long> leftTimestampsExpected = Arrays.asList(1L, 4L, 4L, 4L, 2L, 2L);
+ List<String> leftKeysActual = new ArrayList<>();
+ List<Long> leftTimestampsActual = new ArrayList<>();
+ while (iterator.hasNext()) {
+ IndexedRecord record = iterator.next();
+
leftKeysActual.add(record.get(AVRO_SCHEMA.getField(ROW_KEY).pos()).toString());
+ leftTimestampsActual.add((Long)
record.get(AVRO_SCHEMA.getField("timestamp").pos()));
+ }
+ assertEquals(leftKeysExpected, leftKeysActual);
+ assertEquals(leftTimestampsExpected, leftTimestampsActual);
+ }
+
+ @ParameterizedTest
+ @MethodSource("testArgs")
+  public void testPositionMergeFallback(boolean log1HasPositions, boolean log2HasPositions,
+                                        boolean log3HasPositions, boolean log4HasPositions) throws IOException, InterruptedException {
+    shouldWritePositions = Arrays.asList(true, log1HasPositions, log2HasPositions, log3HasPositions, log4HasPositions);
+  // The FileSlice contains a base file and four log files.
+ ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(5, true);
+ List<String> leftKeysExpected = Arrays.asList("1", "2", "3", "4", "9",
"10");
+ List<Long> leftTimestampsExpected = Arrays.asList(1L, 4L, 4L, 4L, 2L, 2L);
List<String> leftKeysActual = new ArrayList<>();
List<Long> leftTimestampsActual = new ArrayList<>();
while (iterator.hasNext()) {
@@ -178,4 +206,13 @@ public class TestEventTimeMerging extends
HoodieFileGroupReaderTestHarness {
assertEquals(leftKeysExpected, leftKeysActual);
assertEquals(leftTimestampsExpected, leftTimestampsActual);
}
+
+  // Generates all 16 possible combinations of the four boolean flags.
+ private static Stream<Arguments> testArgs() {
+ Stream.Builder<Arguments> b = Stream.builder();
+ for (int i = 0; i < 16; i++) {
+ b.add(Arguments.of(i % 2 == 0, (i / 2) % 2 == 0, (i / 4) % 2 == 0, (i /
8) % 2 == 0));
+ }
+ return b.build();
+ }
}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
index a2ef6ae3906..ac9fa2d3130 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
@@ -54,6 +54,7 @@ public class HoodieFileGroupReaderTestHarness extends
HoodieCommonTestHarness {
protected static List<DataGenerationPlan.OperationType> operationTypes;
// Set the instantTime for each record set.
protected static List<String> instantTimes;
+ protected static List<Boolean> shouldWritePositions;
// Environmental variables.
protected static StorageConfiguration<?> storageConf;
@@ -92,6 +93,11 @@ public class HoodieFileGroupReaderTestHarness extends
HoodieCommonTestHarness {
protected ClosableIterator<IndexedRecord> getFileGroupIterator(int numFiles)
throws IOException, InterruptedException {
+ return getFileGroupIterator(numFiles, false);
+ }
+
+ protected ClosableIterator<IndexedRecord> getFileGroupIterator(int numFiles,
boolean shouldReadPositions)
+ throws IOException, InterruptedException {
assert (numFiles >= 1 && numFiles <= keyRanges.size());
Option<FileSlice> fileSliceOpt =
@@ -101,6 +107,7 @@ public class HoodieFileGroupReaderTestHarness extends
HoodieCommonTestHarness {
timestamps.subList(0, numFiles),
operationTypes.subList(0, numFiles),
instantTimes.subList(0, numFiles),
+ shouldWritePositions.subList(0, numFiles),
basePath,
PARTITION_PATH,
FILE_ID
@@ -114,7 +121,7 @@ public class HoodieFileGroupReaderTestHarness extends
HoodieCommonTestHarness {
basePath,
"1000", // Not used internally.
AVRO_SCHEMA,
- false,
+ shouldReadPositions,
0L,
Long.MAX_VALUE,
properties,
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
index c98a9a9c0f4..5fa0cc8d1ff 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
@@ -27,9 +27,9 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.{BaseKeyGenerator, KeyGenUtils,
SparkKeyGeneratorInterface}
-
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.common.util.StringUtils
import org.apache.spark.TaskContext
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
@@ -123,6 +123,8 @@ object HoodieCreateRecordUtils {
val consistentLogicalTimestampEnabled = parameters.getOrElse(
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean
+ val precombine = config.getString(PRECOMBINE_FIELD)
+ val precombineEmpty = StringUtils.isNullOrEmpty(precombine)
// handle dropping partition columns
it.map { avroRec =>
@@ -140,8 +142,8 @@ object HoodieCreateRecordUtils {
avroRecWithoutMeta
}
- val hoodieRecord = if (shouldCombine) {
- val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRec,
config.getString(PRECOMBINE_FIELD),
+ val hoodieRecord = if (shouldCombine && !precombineEmpty) {
+ val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRec,
precombine,
false,
consistentLogicalTimestampEnabled).asInstanceOf[Comparable[_]]
DataSourceUtils.createHoodieRecord(processedRecord, orderingVal,
hoodieKey,
config.getString(PAYLOAD_CLASS_NAME), recordLocation)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
index f61db4ee247..c4ff0e7b814 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
@@ -42,7 +42,6 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.avro.Schema;
@@ -66,8 +65,6 @@ import static
org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrin
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodiePositionBasedFileGroupRecordBuffer extends
TestHoodieFileGroupReaderOnSpark {
private final HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(0xDEEF);
@@ -76,12 +73,12 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer
extends TestHoodieFile
private HoodiePositionBasedFileGroupRecordBuffer<InternalRow> buffer;
private String partitionPath;
- public void prepareBuffer(boolean useCustomMerger) throws Exception {
+ public void prepareBuffer(RecordMergeMode mergeMode) throws Exception {
Map<String, String> writeConfigs = new HashMap<>();
writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),
"parquet");
writeConfigs.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
"_row_key");
writeConfigs.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),
"partition_path");
- writeConfigs.put("hoodie.datasource.write.precombine.field", "timestamp");
+    writeConfigs.put("hoodie.datasource.write.precombine.field", mergeMode.equals(RecordMergeMode.OVERWRITE_WITH_LATEST) ? "" : "timestamp");
writeConfigs.put("hoodie.payload.ordering.field", "timestamp");
writeConfigs.put(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "hoodie_test");
writeConfigs.put("hoodie.insert.shuffle.parallelism", "4");
@@ -91,6 +88,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer
extends TestHoodieFile
writeConfigs.put("hoodie.merge.small.file.group.candidates.limit", "0");
writeConfigs.put("hoodie.compact.inline", "false");
writeConfigs.put(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key(), "true");
+ writeConfigs.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(),
getRecordPayloadForMergeMode(mergeMode));
commitToTable(recordsToStrings(dataGen.generateInserts("001", 100)),
INSERT.value(), writeConfigs);
String[] partitionPaths = dataGen.getPartitionPaths();
@@ -111,11 +109,22 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer
extends TestHoodieFile
ctx.setHasBootstrapBaseFile(false);
ctx.setHasLogFiles(true);
ctx.setNeedsBootstrapMerge(false);
- ctx.setRecordMerger(useCustomMerger ? new CustomMerger() : new
HoodieSparkRecordMerger());
+ switch (mergeMode) {
+ case CUSTOM:
+ ctx.setRecordMerger(new CustomMerger());
+ break;
+ case EVENT_TIME_ORDERING:
+ ctx.setRecordMerger(new HoodieSparkRecordMerger());
+ break;
+ case OVERWRITE_WITH_LATEST:
+ default:
+ ctx.setRecordMerger(new OverwriteWithLatestSparkMerger());
+ break;
+ }
ctx.setSchemaHandler(new HoodiePositionBasedSchemaHandler<>(ctx,
avroSchema, avroSchema,
Option.empty(), metaClient.getTableConfig()));
TypedProperties props = new TypedProperties();
- props.put(HoodieCommonConfig.RECORD_MERGE_MODE.key(),
RecordMergeMode.CUSTOM.name());
+ props.put(HoodieCommonConfig.RECORD_MERGE_MODE.key(), mergeMode.name());
buffer = new HoodiePositionBasedFileGroupRecordBuffer<>(
ctx,
metaClient,
@@ -170,7 +179,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer
extends TestHoodieFile
@Test
public void testProcessDeleteBlockWithPositions() throws Exception {
- prepareBuffer(false);
+ prepareBuffer(RecordMergeMode.OVERWRITE_WITH_LATEST);
HoodieDeleteBlock deleteBlock = getDeleteBlockWithPositions();
buffer.processDeleteBlock(deleteBlock);
assertEquals(50, buffer.getLogRecords().size());
@@ -180,7 +189,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer
extends TestHoodieFile
@Test
public void testProcessDeleteBlockWithCustomMerger() throws Exception {
- prepareBuffer(true);
+ prepareBuffer(RecordMergeMode.CUSTOM);
HoodieDeleteBlock deleteBlock = getDeleteBlockWithPositions();
buffer.processDeleteBlock(deleteBlock);
assertEquals(50, buffer.getLogRecords().size());
@@ -189,11 +198,10 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer
extends TestHoodieFile
@Test
public void testProcessDeleteBlockWithoutPositions() throws Exception {
- prepareBuffer(false);
+ prepareBuffer(RecordMergeMode.OVERWRITE_WITH_LATEST);
HoodieDeleteBlock deleteBlock = getDeleteBlockWithoutPositions();
- Exception exception = assertThrows(
- HoodieValidationException.class, () ->
buffer.processDeleteBlock(deleteBlock));
- assertTrue(exception.getMessage().contains("No record position info is
found"));
+ buffer.processDeleteBlock(deleteBlock);
+ assertEquals(50, buffer.getLogRecords().size());
}
public static class CustomMerger implements HoodieRecordMerger {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index e20104858b6..c2dc9091f45 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -21,14 +21,14 @@ package org.apache.hudi.common.table.read
import
org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLED
import org.apache.hudi.common.engine.HoodieReaderContext
-import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord,
HoodieRecordMerger, OverwriteWithLatestAvroPayload, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.storage.StorageConfiguration
-import org.apache.hudi.{HoodieSparkRecordMerger, SparkAdapterSupport,
SparkFileFormatInternalRowReaderContext}
-
+import org.apache.hudi.{HoodieSparkRecordMerger,
OverwriteWithLatestSparkMerger, SparkAdapterSupport,
SparkFileFormatInternalRowReaderContext}
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.common.config.RecordMergeMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Dataset, HoodieInternalRowUtils,
HoodieUnsafeUtils, Row, SaveMode, SparkSession}
@@ -38,7 +38,6 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import java.util
-
import scala.collection.JavaConverters._
/**
@@ -48,6 +47,8 @@ import scala.collection.JavaConverters._
class TestHoodieFileGroupReaderOnSpark extends
TestHoodieFileGroupReaderBase[InternalRow] with SparkAdapterSupport {
var spark: SparkSession = _
+ var customPayloadName: String = classOf[CustomPayloadForTesting].getName
+
@BeforeEach
def setup() {
val sparkConf = new SparkConf
@@ -121,4 +122,12 @@ class TestHoodieFileGroupReaderOnSpark extends
TestHoodieFileGroupReaderBase[Int
override def getComparableUTF8String(value: String): Comparable[_] = {
UTF8String.fromString(value)
}
+
+ override def getRecordPayloadForMergeMode(mergeMode: RecordMergeMode):
String = {
+ mergeMode match {
+ case RecordMergeMode.EVENT_TIME_ORDERING =>
classOf[DefaultHoodieRecordPayload].getName
+ case RecordMergeMode.OVERWRITE_WITH_LATEST =>
classOf[OverwriteWithLatestAvroPayload].getName
+ case RecordMergeMode.CUSTOM => customPayloadName
+ }
+ }
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 9778301f2f6..1581eb873b8 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -344,9 +344,7 @@ public class StreamSync implements Serializable, Closeable {
.setConf(HadoopFSUtils.getStorageConfWithCopy(conf))
.setBasePath(cfg.targetBasePath)
.setPayloadClassName(cfg.payloadClassName)
- .setRecordMergerStrategy(
-
props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(),
- HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue()))
+ .setRecordMergerStrategy(null)
.build();
switch (meta.getTableType()) {
case COPY_ON_WRITE:
@@ -440,7 +438,7 @@ public class StreamSync implements Serializable, Closeable {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(HadoopFSUtils.getStorageConfWithCopy(conf))
.setBasePath(cfg.targetBasePath)
-
.setRecordMergerStrategy(props.getProperty(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key(),
HoodieWriteConfig.RECORD_MERGER_STRATEGY.defaultValue()))
+ .setRecordMergerStrategy(null)
.setTimeGeneratorConfig(HoodieTimeGeneratorConfig.newBuilder().fromProperties(props).withPath(cfg.targetBasePath).build())
.build();
String instantTime = metaClient.createNewInstantTime();