This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c574654aac0 [HUDI-6368] Strength avro record merger (#8953)
c574654aac0 is described below
commit c574654aac0f10f5e14673c5112dbd21d1aa2bd5
Author: Danny Chan <[email protected]>
AuthorDate: Wed Jun 14 20:28:24 2023 +0800
[HUDI-6368] Strength avro record merger (#8953)
* move the merging operation decision (preCombining or base/delta merge) to
the invoker; it is deterministic when the record merger is initialized
(see the sketch after this list)
* expose the operation mode as an interface so that mergers like
HoodieSparkValidateDuplicateKeyRecordMerger can plug in custom logic
* pre-register the record merger instance in the object cache to reduce the
complexity
* fix the equality check of the payload within FlinkWriteHelper
* add a default merger type for the merged/unmerged log scanners
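
A minimal, illustrative sketch (not part of the patch) of how an invoker is now
expected to choose the merging operation: the classes and methods used below come
from the diff that follows, while the wrapping example class is hypothetical.

    import org.apache.hudi.common.model.HoodieAvroRecordMerger;
    import org.apache.hudi.common.model.HoodieRecordMerger;
    import org.apache.hudi.common.util.HoodieRecordUtils;

    public class MergerModeSketch {
      public static void main(String[] args) {
        // The configured merger is used as-is for base/delta (combining) merges.
        HoodieRecordMerger combining = HoodieAvroRecordMerger.INSTANCE;

        // For deduplication of incoming records, the invoker switches the merger
        // into preCombining mode up front; mergers that do not implement
        // OperationModeAwareness are returned unchanged.
        HoodieRecordMerger preCombining = HoodieRecordUtils.mergerToPreCombineMode(combining);

        // prints "HoodiePreCombineAvroRecordMerger"
        System.out.println(preCombining.getClass().getSimpleName());
      }
    }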
---
.../hudi/cli/commands/HoodieLogFileCommand.java | 23 ++++----
.../cli/commands/TestHoodieLogFileCommand.java | 5 +-
.../org/apache/hudi/index/HoodieIndexUtils.java | 13 ++++-
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 2 +-
.../hudi/table/action/commit/BaseWriteHelper.java | 13 +----
.../table/action/commit/HoodieWriteHelper.java | 2 +-
.../hudi/table/action/commit/FlinkWriteHelper.java | 4 +-
.../hudi/table/action/commit/JavaWriteHelper.java | 2 +-
.../bootstrap/ParquetBootstrapMetadataHandler.java | 7 +--
.../TestHoodieClientOnCopyOnWriteStorage.java | 12 ++--
.../TestHoodieClientOnMergeOnReadStorage.java | 4 --
.../hudi/common/model/HoodieAvroRecordMerger.java | 68 +++++-----------------
.../model/HoodiePreCombineAvroRecordMerger.java | 56 ++++++++++++++++++
.../hudi/common/model/OperationModeAwareness.java | 34 +++++++++++
.../table/log/AbstractHoodieLogRecordReader.java | 4 +-
.../table/log/HoodieMergedLogRecordScanner.java | 6 +-
.../table/log/HoodieUnMergedLogRecordScanner.java | 6 +-
.../apache/hudi/common/util/HoodieRecordUtils.java | 20 ++++---
.../hudi/io/storage/HoodieAvroParquetReader.java | 2 -
.../metadata/HoodieMetadataLogRecordReader.java | 4 +-
.../common/functional/TestHoodieLogFormat.java | 20 ++-----
...rdUtilsTest.java => TestHoodieRecordUtils.java} | 2 +-
.../examples/quickstart/TestQuickstartData.java | 3 -
.../org/apache/hudi/sink/StreamWriteFunction.java | 3 +-
.../apache/hudi/table/format/TestInputFormat.java | 38 ++++++++++++
.../test/java/org/apache/hudi/utils/TestData.java | 14 ++++-
.../realtime/RealtimeCompactedRecordReader.java | 9 +--
.../realtime/RealtimeUnmergedRecordReader.java | 6 +-
.../apache/spark/sql/hudi/HoodieOptionConfig.scala | 4 +-
...odieSparkValidateDuplicateKeyRecordMerger.scala | 23 +++-----
.../ShowHoodieLogFileRecordsProcedure.scala | 5 +-
.../spark/sql/hudi/HoodieSparkSqlTestBase.scala | 3 +-
.../spark/sql/hudi/TestHoodieOptionConfig.scala | 3 +-
33 files changed, 237 insertions(+), 183 deletions(-)
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 187e24539bb..82f9c1a6468 100644
---
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -18,12 +18,6 @@
package org.apache.hudi.cli.commands;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.HoodieTableHeaderFields;
@@ -31,7 +25,6 @@ import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
@@ -45,17 +38,21 @@ import
org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
-import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.springframework.shell.standard.ShellComponent;
import org.springframework.shell.standard.ShellMethod;
import org.springframework.shell.standard.ShellOption;
-import scala.Tuple2;
-import scala.Tuple3;
import java.io.IOException;
import java.util.ArrayList;
@@ -68,6 +65,9 @@ import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import scala.Tuple2;
+import scala.Tuple3;
+
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
/**
@@ -221,7 +221,6 @@ public class HoodieLogFileCommand {
.withSpillableMapBasePath(HoodieMemoryConfig.getDefaultSpillableMapBasePath())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
.build();
for (HoodieRecord hoodieRecord : scanner) {
diff --git
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index d964a127683..25298876c42 100644
---
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -28,9 +28,8 @@ import
org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
import org.apache.hudi.cli.testutils.ShellEvaluationResultUtil;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
@@ -40,7 +39,6 @@ import
org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.SchemaTestUtil;
-import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
@@ -232,7 +230,6 @@ public class TestHoodieLogFileCommand extends
CLIFunctionalTestHarness {
.withSpillableMapBasePath(HoodieMemoryConfig.getDefaultSpillableMapBasePath())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
.build();
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 89cbf8d9a1e..46ad232022d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -230,7 +231,12 @@ public class HoodieIndexUtils {
/**
* Merge the incoming record with the matching existing record loaded via
{@link HoodieMergedReadHandle}. The existing record is the latest version in
the table.
*/
- private static <R> Option<HoodieRecord<R>>
mergeIncomingWithExistingRecord(HoodieRecord<R> incoming, HoodieRecord<R>
existing, Schema writeSchema, HoodieWriteConfig config) throws IOException {
+ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecord(
+ HoodieRecord<R> incoming,
+ HoodieRecord<R> existing,
+ Schema writeSchema,
+ HoodieWriteConfig config,
+ HoodieRecordMerger recordMerger) throws IOException {
Schema existingSchema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(config.getSchema()),
config.allowOperationMetadataField());
Schema writeSchemaWithMetaFields =
HoodieAvroUtils.addMetadataFields(writeSchema,
config.allowOperationMetadataField());
// prepend the hoodie meta fields as the incoming record does not have them
@@ -239,7 +245,7 @@ public class HoodieIndexUtils {
// after prepend the meta fields, convert the record back to the original
payload
HoodieRecord incomingWithMetaFields = incomingPrepended
.wrapIntoHoodieRecordPayloadWithParams(writeSchema, config.getProps(),
Option.empty(), config.allowOperationMetadataField(), Option.empty(), false,
Option.empty());
- Option<Pair<HoodieRecord, Schema>> mergeResult = config.getRecordMerger()
+ Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger
.merge(existing, existingSchema, incomingWithMetaFields,
writeSchemaWithMetaFields, config.getProps());
if (mergeResult.isPresent()) {
// the merged record needs to be converted back to the original payload
@@ -270,6 +276,7 @@ public class HoodieIndexUtils {
// merged existing records with current locations being set
HoodieData<HoodieRecord<R>> existingRecords =
getExistingRecords(partitionLocations, config, hoodieTable);
+ final HoodieRecordMerger recordMerger = config.getRecordMerger();
HoodieData<HoodieRecord<R>> taggedUpdatingRecords =
updatingRecords.mapToPair(r -> Pair.of(r.getRecordKey(), r))
.leftOuterJoin(existingRecords.mapToPair(r ->
Pair.of(r.getRecordKey(), r)))
.values().flatMap(entry -> {
@@ -286,7 +293,7 @@ public class HoodieIndexUtils {
return
Collections.singletonList(getTaggedRecord(incoming.newInstance(existing.getKey()),
Option.of(existing.getCurrentLocation()))).iterator();
}
- Option<HoodieRecord<R>> mergedOpt =
mergeIncomingWithExistingRecord(incoming, existing, writeSchema, config);
+ Option<HoodieRecord<R>> mergedOpt =
mergeIncomingWithExistingRecord(incoming, existing, writeSchema, config,
recordMerger);
if (!mergedOpt.isPresent()) {
// merge resulted in delete: force tag the incoming to the old
partition
return
Collections.singletonList(getTaggedRecord(incoming.newInstance(existing.getKey()),
Option.of(existing.getCurrentLocation()))).iterator();
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 416e3ebfe5e..cfe11b1fd8d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -456,7 +456,7 @@ public class HoodieMergeHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O>
}
long oldNumWrites = 0;
- try (HoodieFileReader reader =
HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType()).getFileReader(hoodieTable.getHadoopConf(),
oldFilePath)) {
+ try (HoodieFileReader reader =
HoodieFileReaderFactory.getReaderFactory(this.recordMerger.getRecordType()).getFileReader(hoodieTable.getHadoopConf(),
oldFilePath)) {
oldNumWrites = reader.getTotalRecords();
} catch (IOException e) {
throw new HoodieUpsertException("Failed to check for merge data
validation", e);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index 6e1c1f8d9ff..8d8978927f6 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -21,9 +21,9 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableFunctionUnchecked;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
@@ -32,7 +32,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import java.time.Duration;
import java.time.Instant;
-import java.util.Properties;
public abstract class BaseWriteHelper<T, I, K, O, R> extends
ParallelismHelper<I> {
@@ -92,15 +91,9 @@ public abstract class BaseWriteHelper<T, I, K, O, R> extends
ParallelismHelper<I
* @return Collection of HoodieRecord already be deduplicated
*/
public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int
parallelism) {
- HoodieRecordMerger recordMerger = table.getConfig().getRecordMerger();
+ HoodieRecordMerger recordMerger =
HoodieRecordUtils.mergerToPreCombineMode(table.getConfig().getRecordMerger());
return deduplicateRecords(records, table.getIndex(), parallelism,
table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
}
- public I deduplicateRecords(I records, HoodieIndex<?, ?> index, int
parallelism, String schema, Properties props, HoodieRecordMerger merger) {
- TypedProperties updatedProps =
HoodieAvroRecordMerger.Config.withLegacyOperatingModePreCombining(props);
- return doDeduplicateRecords(records, index, parallelism, schema,
updatedProps, merger);
- }
-
- protected abstract I doDeduplicateRecords(
- I records, HoodieIndex<?, ?> index, int parallelism, String schema,
TypedProperties props, HoodieRecordMerger merger);
+ public abstract I deduplicateRecords(I records, HoodieIndex<?, ?> index, int
parallelism, String schema, TypedProperties props, HoodieRecordMerger merger);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index 47793d7155e..d7640c28e50 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -56,7 +56,7 @@ public class HoodieWriteHelper<T, R> extends
BaseWriteHelper<T, HoodieData<Hoodi
}
@Override
- protected HoodieData<HoodieRecord<T>> doDeduplicateRecords(
+ public HoodieData<HoodieRecord<T>> deduplicateRecords(
HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int
parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger
merger) {
boolean isIndexingGlobal = index.isGlobal();
final SerializableSchema schema = new SerializableSchema(schemaStr);
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index 6cdb17715de..b6f8541f50e 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -91,7 +91,7 @@ public class FlinkWriteHelper<T, R> extends
BaseWriteHelper<T, List<HoodieRecord
}
@Override
- protected List<HoodieRecord<T>> doDeduplicateRecords(
+ public List<HoodieRecord<T>> deduplicateRecords(
List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism,
String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
// If index used is global, then records are expected to differ in their
partitionPath
Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
@@ -110,7 +110,7 @@ public class FlinkWriteHelper<T, R> extends
BaseWriteHelper<T, List<HoodieRecord
// we cannot allow the user to change the key or partitionPath, since
that will affect
// everything
// so pick it from one of the records.
- boolean choosePrev = rec1 == reducedRecord;
+ boolean choosePrev = rec1.getData() == reducedRecord.getData();
HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
HoodieOperation operation = choosePrev ? rec1.getOperation() :
rec2.getOperation();
HoodieRecord<T> hoodieRecord = reducedRecord.newInstance(reducedKey,
operation);
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
index 04befe4ea16..beb1d14f329 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -58,7 +58,7 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T,
List<HoodieRecord<T
}
@Override
- protected List<HoodieRecord<T>> doDeduplicateRecords(
+ public List<HoodieRecord<T>> deduplicateRecords(
List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism,
String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
boolean isIndexingGlobal = index.isGlobal();
Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords =
records.stream().map(record -> {
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index 8ef6ab8f5cf..2c3ddfdcda2 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -23,7 +23,6 @@ import
org.apache.hudi.client.bootstrap.BootstrapRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -78,16 +77,16 @@ class ParquetBootstrapMetadataHandler extends
BaseBootstrapMetadataHandler {
KeyGeneratorInterface keyGenerator,
String partitionPath,
Schema schema) throws Exception {
- HoodieRecordMerger recordMerger = table.getConfig().getRecordMerger();
+ HoodieRecord.HoodieRecordType recordType =
table.getConfig().getRecordMerger().getRecordType();
- HoodieFileReader reader =
HoodieFileReaderFactory.getReaderFactory(recordMerger.getRecordType())
+ HoodieFileReader reader =
HoodieFileReaderFactory.getReaderFactory(recordType)
.getFileReader(table.getHadoopConf(), sourceFilePath);
HoodieExecutor<Void> executor = null;
try {
Function<HoodieRecord, HoodieRecord> transformer = record -> {
String recordKey = record.getRecordKey(schema,
Option.of(keyGenerator));
- return createNewMetadataBootstrapRecord(recordKey, partitionPath,
recordMerger.getRecordType())
+ return createNewMetadataBootstrapRecord(recordKey, partitionPath,
recordType)
// NOTE: Record have to be cloned here to make sure if it holds
low-level engine-specific
// payload pointing into a shared, mutable (underlying)
buffer we get a clean copy of
// it since these records will be inserted into the queue
later.
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index f42b799bff5..8e26fcab77a 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -46,15 +46,14 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -80,7 +79,6 @@ import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FileIOUtils;
-import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -487,11 +485,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
// Global dedup should be done based on recordKey only
HoodieIndex index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(true);
- HoodieRecordMerger recordMerger =
HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());
int dedupParallelism = records.getNumPartitions() + 100;
HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd =
(HoodieData<HoodieRecord<RawTripTestPayload>>)
HoodieWriteHelper.newInstance()
- .deduplicateRecords(records, index, dedupParallelism,
writeConfig.getSchema(), writeConfig.getProps(), recordMerger);
+ .deduplicateRecords(records, index, dedupParallelism,
writeConfig.getSchema(), writeConfig.getProps(),
HoodiePreCombineAvroRecordMerger.INSTANCE);
List<HoodieRecord<RawTripTestPayload>> dedupedRecs =
dedupedRecsRdd.collectAsList();
assertEquals(records.getNumPartitions(),
dedupedRecsRdd.getNumPartitions());
assertEquals(1, dedupedRecs.size());
@@ -503,7 +500,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
when(index.isGlobal()).thenReturn(false);
dedupedRecsRdd =
(HoodieData<HoodieRecord<RawTripTestPayload>>)
HoodieWriteHelper.newInstance()
- .deduplicateRecords(records, index, dedupParallelism,
writeConfig.getSchema(), writeConfig.getProps(), recordMerger);
+ .deduplicateRecords(records, index, dedupParallelism,
writeConfig.getSchema(), writeConfig.getProps(),
HoodiePreCombineAvroRecordMerger.INSTANCE);
dedupedRecs = dedupedRecsRdd.collectAsList();
assertEquals(records.getNumPartitions(),
dedupedRecsRdd.getNumPartitions());
assertEquals(2, dedupedRecs.size());
@@ -555,11 +552,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
// Global dedup should be done based on recordKey only
HoodieIndex index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(true);
- HoodieRecordMerger recordMerger =
HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());
int dedupParallelism = records.getNumPartitions() + 100;
HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd =
(HoodieData<HoodieRecord<RawTripTestPayload>>)
HoodieWriteHelper.newInstance()
- .deduplicateRecords(records, index, dedupParallelism,
writeConfig.getSchema(), writeConfig.getProps(), recordMerger);
+ .deduplicateRecords(records, index, dedupParallelism,
writeConfig.getSchema(), writeConfig.getProps(),
HoodiePreCombineAvroRecordMerger.INSTANCE);
List<HoodieRecord<RawTripTestPayload>> dedupedRecs =
dedupedRecsRdd.collectAsList();
assertEquals(dedupedRecs.get(0).getOperation(),
recordThree.getOperation());
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index 4073e63bd12..90a529e33a2 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -21,7 +21,6 @@ package org.apache.hudi.client.functional;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -35,7 +34,6 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
-import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
@@ -448,7 +446,6 @@ public class TestHoodieClientOnMergeOnReadStorage extends
HoodieClientTestBase {
.withLatestInstantTime(instant)
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withOptimizedLogBlocksScan(true)
-
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
scanner.scan(true);
List<String> prevInstants = scanner.getValidBlockInstants();
@@ -462,7 +459,6 @@ public class TestHoodieClientOnMergeOnReadStorage extends
HoodieClientTestBase {
.withLatestInstantTime(currentInstant)
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withOptimizedLogBlocksScan(true)
-
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
scanner2.scan(true);
List<String> currentInstants = scanner2.getValidBlockInstants();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
index 252e11135af..a7bba4ebd9c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
@@ -21,19 +21,21 @@ package org.apache.hudi.common.model;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import java.io.IOException;
import java.util.Properties;
-import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
-
-public class HoodieAvroRecordMerger implements HoodieRecordMerger {
+/**
+ * Record merger for Hoodie avro record.
+ *
+ * <p>It should only be used for base record from disk to merge with incoming
record.
+ */
+public class HoodieAvroRecordMerger implements HoodieRecordMerger,
OperationModeAwareness {
+ public static final HoodieAvroRecordMerger INSTANCE = new
HoodieAvroRecordMerger();
@Override
public String getMergingStrategy() {
@@ -42,27 +44,8 @@ public class HoodieAvroRecordMerger implements
HoodieRecordMerger {
@Override
public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws
IOException {
- ValidationUtils.checkArgument(older.getRecordType() ==
HoodieRecordType.AVRO);
- ValidationUtils.checkArgument(newer.getRecordType() ==
HoodieRecordType.AVRO);
- Config.LegacyOperationMode legacyOperatingMode =
Config.LegacyOperationMode.valueOf(
- props.getString(Config.LEGACY_OPERATING_MODE.key(),
Config.LEGACY_OPERATING_MODE.defaultValue()));
-
- switch (legacyOperatingMode) {
- case PRE_COMBINING:
- HoodieRecord res = preCombine(older, newer, newSchema, props);
- if (res == older) {
- return Option.of(Pair.of(res, oldSchema));
- } else {
- return Option.of(Pair.of(res, newSchema));
- }
-
- case COMBINING:
- return combineAndGetUpdateValue(older, newer, newSchema, props)
- .map(r -> Pair.of(r, (((HoodieAvroIndexedRecord)
r).getData()).getSchema()));
-
- default:
- throw new UnsupportedOperationException(String.format("Unsupported
legacy operating mode (%s)", legacyOperatingMode));
- }
+ return combineAndGetUpdateValue(older, newer, newSchema, props)
+ .map(r -> Pair.of(new HoodieAvroIndexedRecord(r), r.getSchema()));
}
@Override
@@ -70,40 +53,17 @@ public class HoodieAvroRecordMerger implements
HoodieRecordMerger {
return HoodieRecordType.AVRO;
}
- private HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer,
Schema schema, Properties props) {
- HoodieRecordPayload payload = unsafeCast(((HoodieAvroRecord)
newer).getData().preCombine(((HoodieAvroRecord) older).getData(), schema,
props));
- return new HoodieAvroRecord(newer.getKey(), payload, newer.getOperation());
- }
-
- private Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older,
HoodieRecord newer, Schema schema, Properties props) throws IOException {
+ private Option<IndexedRecord> combineAndGetUpdateValue(HoodieRecord older,
HoodieRecord newer, Schema schema, Properties props) throws IOException {
Option<IndexedRecord> previousAvroData = older.toIndexedRecord(schema,
props).map(HoodieAvroIndexedRecord::getData);
if (!previousAvroData.isPresent()) {
return Option.empty();
}
- return ((HoodieAvroRecord)
newer).getData().combineAndGetUpdateValue(previousAvroData.get(), schema, props)
- .map(combinedAvroPayload -> new
HoodieAvroIndexedRecord((IndexedRecord) combinedAvroPayload));
+ return ((HoodieAvroRecord)
newer).getData().combineAndGetUpdateValue(previousAvroData.get(), schema,
props);
}
- public static class Config {
-
- public enum LegacyOperationMode {
- PRE_COMBINING,
- COMBINING
- }
-
- public static ConfigProperty<String> LEGACY_OPERATING_MODE =
-
ConfigProperty.key("hoodie.datasource.write.record.merger.legacy.operation")
- .defaultValue(LegacyOperationMode.COMBINING.name())
- .withDocumentation("Controls the mode of the merging operation
performed by `HoodieAvroRecordMerger`. "
- + "This is required to maintain backward-compatibility w/ the
existing semantic of `HoodieRecordPayload` "
- + "implementations providing `preCombine` and
`combineAndGetUpdateValue` methods.");
-
- public static TypedProperties
withLegacyOperatingModePreCombining(Properties props) {
- TypedProperties newProps = new TypedProperties();
- newProps.putAll(props);
- newProps.setProperty(Config.LEGACY_OPERATING_MODE.key(),
Config.LegacyOperationMode.PRE_COMBINING.name());
- return newProps;
- }
+ @Override
+ public HoodieRecordMerger asPreCombiningMode() {
+ return HoodiePreCombineAvroRecordMerger.INSTANCE;
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePreCombineAvroRecordMerger.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePreCombineAvroRecordMerger.java
new file mode 100644
index 00000000000..904f6847441
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePreCombineAvroRecordMerger.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+
+/**
+ * Record merger for Hoodie avro record.
+ *
+ * <p>It should only be used for deduplication among incoming records.
+ */
+public class HoodiePreCombineAvroRecordMerger extends HoodieAvroRecordMerger {
+ public static final HoodiePreCombineAvroRecordMerger INSTANCE = new
HoodiePreCombineAvroRecordMerger();
+
+ @Override
+ public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws
IOException {
+ return Option.of(preCombine(older, oldSchema, newer, newSchema, props));
+ }
+
+ @SuppressWarnings("rawtypes, unchecked")
+ private Pair<HoodieRecord, Schema> preCombine(HoodieRecord older, Schema
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) {
+ HoodieRecordPayload newerPayload = ((HoodieAvroRecord) newer).getData();
+ HoodieRecordPayload olderPayload = ((HoodieAvroRecord) older).getData();
+ HoodieRecordPayload payload = newerPayload.preCombine(olderPayload,
newSchema, props);
+ if (payload == olderPayload) {
+ return Pair.of(older, oldSchema);
+ } else if (payload == newerPayload) {
+ return Pair.of(newer, newSchema);
+ } else {
+ HoodieRecord mergedRecord = new HoodieAvroRecord(newer.getKey(),
payload, newer.getOperation());
+ return Pair.of(mergedRecord, newSchema);
+ }
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/OperationModeAwareness.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/OperationModeAwareness.java
new file mode 100644
index 00000000000..95d12dfc149
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/OperationModeAwareness.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+/**
+ * In some cases, the Hudi engine needs to know what operation mode the
current merging belongs to.
+ * {@link HoodieRecordMerger} that wants to distinguish the operation mode
should implement this interface.
+ */
+public interface OperationModeAwareness {
+ /**
+ * Specifies the legacy operation mode as preCombining.
+ *
+ * <p>The preCombining takes place in two cases:
+ * i). In memory records merging during data ingestion;
+ * ii). Log records merging for MOR reader.
+ */
+ HoodieRecordMerger asPreCombiningMode();
+}
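
As an illustration of the interface above, here is a hedged sketch of a merger
opting into operation-mode awareness; the class name CustomValidatingMerger and
its body are hypothetical, only OperationModeAwareness, HoodieRecordMerger,
HoodieAvroRecordMerger and HoodiePreCombineAvroRecordMerger come from this patch.

    import org.apache.hudi.common.model.HoodieAvroRecordMerger;
    import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
    import org.apache.hudi.common.model.HoodieRecordMerger;

    /**
     * Hypothetical merger that keeps its own combining logic for base/delta merges
     * and falls back to the stock pre-combining merger when the invoker asks for
     * deduplication semantics. A merger such as
     * HoodieSparkValidateDuplicateKeyRecordMerger could return custom logic here instead.
     */
    public class CustomValidatingMerger extends HoodieAvroRecordMerger {
      @Override
      public HoodieRecordMerger asPreCombiningMode() {
        // Invoked by HoodieRecordUtils.mergerToPreCombineMode(...) because
        // HoodieAvroRecordMerger implements OperationModeAwareness.
        return HoodiePreCombineAvroRecordMerger.INSTANCE;
      }
    }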
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 7304cc3d1fb..1e5bef103f4 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -20,7 +20,6 @@ package org.apache.hudi.common.table.log;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.DeleteRecord;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePayloadProps;
import org.apache.hudi.common.model.HoodieRecord;
@@ -60,7 +59,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
@@ -167,7 +165,7 @@ public abstract class AbstractHoodieLogRecordReader {
this.payloadClassFQN = tableConfig.getPayloadClass();
this.preCombineField = tableConfig.getPreCombineField();
// Log scanner merge log with precombine
- TypedProperties props =
HoodieAvroRecordMerger.Config.withLegacyOperatingModePreCombining(new
Properties());
+ TypedProperties props = new TypedProperties();
if (this.preCombineField != null) {
props.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY,
this.preCombineField);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index ef4eec1ba07..c150ba9a3e6 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieRecordMerger;
@@ -29,6 +30,7 @@ import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
@@ -333,7 +335,7 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordReader
// By default, we're doing a full-scan
private boolean forceFullScan = true;
private boolean enableOptimizedLogBlocksScan = false;
- private HoodieRecordMerger recordMerger;
+ private HoodieRecordMerger recordMerger =
HoodiePreCombineAvroRecordMerger.INSTANCE;
@Override
public Builder withFileSystem(FileSystem fs) {
@@ -436,7 +438,7 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordReader
@Override
public Builder withRecordMerger(HoodieRecordMerger recordMerger) {
- this.recordMerger = recordMerger;
+ this.recordMerger =
HoodieRecordUtils.mergerToPreCombineMode(recordMerger);
return this;
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 726172e5ee0..51044c0814c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -19,9 +19,11 @@
package org.apache.hudi.common.table.log;
import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.internal.schema.InternalSchema;
@@ -106,7 +108,7 @@ public class HoodieUnMergedLogRecordScanner extends
AbstractHoodieLogRecordReade
// specific configurations
private LogRecordScannerCallback callback;
private boolean enableOptimizedLogBlocksScan;
- private HoodieRecordMerger recordMerger;
+ private HoodieRecordMerger recordMerger =
HoodiePreCombineAvroRecordMerger.INSTANCE;
public Builder withFileSystem(FileSystem fs) {
this.fs = fs;
@@ -174,7 +176,7 @@ public class HoodieUnMergedLogRecordScanner extends
AbstractHoodieLogRecordReade
@Override
public Builder withRecordMerger(HoodieRecordMerger recordMerger) {
- this.recordMerger = recordMerger;
+ this.recordMerger =
HoodieRecordUtils.mergerToPreCombineMode(recordMerger);
return this;
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
index f7790837d9c..4bd7ffccbab 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.OperationModeAwareness;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -39,9 +40,12 @@ import java.util.Objects;
* A utility class for HoodieRecord.
*/
public class HoodieRecordUtils {
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieRecordUtils.class);
private static final Map<String, Object> INSTANCE_CACHE = new HashMap<>();
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieRecordUtils.class);
+ static {
+ INSTANCE_CACHE.put(HoodieAvroRecordMerger.class.getName(),
HoodieAvroRecordMerger.INSTANCE);
+ }
/**
* Instantiate a given class with a record merge.
@@ -71,7 +75,7 @@ public class HoodieRecordUtils {
public static HoodieRecordMerger createRecordMerger(String basePath,
EngineType engineType,
List<String> mergerClassList, String recordMergerStrategy) {
if (mergerClassList.isEmpty() ||
HoodieTableMetadata.isMetadataTable(basePath)) {
- return
HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());
+ return HoodieAvroRecordMerger.INSTANCE;
} else {
return mergerClassList.stream()
.map(clazz -> {
@@ -86,7 +90,7 @@ public class HoodieRecordUtils {
.filter(merger ->
merger.getMergingStrategy().equals(recordMergerStrategy))
.filter(merger -> recordTypeCompatibleEngine(merger.getRecordType(),
engineType))
.findFirst()
-
.orElse(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()));
+ .orElse(HoodieAvroRecordMerger.INSTANCE);
}
}
@@ -105,10 +109,10 @@ public class HoodieRecordUtils {
}
public static boolean recordTypeCompatibleEngine(HoodieRecordType
recordType, EngineType engineType) {
- if (engineType == EngineType.SPARK && recordType ==
HoodieRecordType.SPARK) {
- return true;
- } else {
- return false;
- }
+ return engineType == EngineType.SPARK && recordType ==
HoodieRecordType.SPARK;
+ }
+
+ public static HoodieRecordMerger mergerToPreCombineMode(HoodieRecordMerger
merger) {
+ return merger instanceof OperationModeAwareness ?
((OperationModeAwareness) merger).asPreCombiningMode() : merger;
}
}
\ No newline at end of file
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
index d9eb0c54a84..0b97d052cbb 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
@@ -47,8 +47,6 @@ import static
org.apache.hudi.common.util.TypeUtils.unsafeCast;
/**
* {@link HoodieFileReader} implementation for parquet format.
- *
- * @param <R> Record implementation that permits field access by integer index.
*/
public class HoodieAvroParquetReader extends HoodieAvroFileReaderBase {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
index befb518faca..18c389a1d1b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
@@ -20,7 +20,6 @@ package org.apache.hudi.metadata;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.InstantRange;
@@ -136,10 +135,9 @@ public class HoodieMetadataLogRecordReader implements
Closeable {
private final HoodieMergedLogRecordScanner.Builder scannerBuilder =
new HoodieMergedLogRecordScanner.Builder()
.withKeyFiledOverride(HoodieMetadataPayload.KEY_FIELD_NAME)
- // NOTE: Merging of Metadata Table's records is currently handled
using {@code HoodieAvroRecordMerger}
+ // NOTE: Merging of Metadata Table's records is currently handled
using {@code HoodiePreCombineAvroRecordMerger}
// for compatibility purposes; In the future it {@code
HoodieMetadataPayload} semantic
// will be migrated to its own custom instance of {@code
RecordMerger}
- .withRecordMerger(new HoodieAvroRecordMerger())
.withReadBlocksLazily(true)
.withReverseReader(false)
.withOperationField(false);
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index af4e5d72d85..5c946ec9c5a 100755
---
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
@@ -55,7 +54,6 @@ import
org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
-import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
@@ -663,7 +661,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
-
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
List<IndexedRecord> scannedRecords = new ArrayList<>();
@@ -710,7 +707,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
-
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.withForceFullScan(false)
.build();
@@ -801,7 +797,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
-
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.withForceFullScan(false)
.build();
@@ -1300,7 +1295,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
-
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200
records");
@@ -1348,7 +1342,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.build();
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -1377,9 +1370,9 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
@ParameterizedTest
@MethodSource("testArguments")
public void
testAvroLogRecordReaderWithCommitBeforeAndAfterRollback(ExternalSpillableMap.DiskMapType
diskMapType,
- boolean
isCompressionEnabled,
- boolean
readBlocksLazily,
- boolean
enableOptimizedLogBlocksScan)
+ boolean
isCompressionEnabled,
+ boolean
readBlocksLazily,
+ boolean
enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
@@ -1458,7 +1451,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
-
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
final List<Boolean> newEmptyPayloads = new ArrayList<>();
@@ -1573,7 +1565,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200
records");
@@ -2130,7 +2121,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withOptimizedLogBlocksScan(true)
-
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
assertEquals(600, scanner.getTotalLogRecords(), "We would read 600 records
from scanner");
final List<String> readKeys = new ArrayList<>();
@@ -2220,7 +2210,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
-
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
assertEquals(Math.max(numRecordsInLog1, numRecordsInLog2),
scanner.getNumMergedRecordsInLog(),
@@ -2734,8 +2723,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
-
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()));
+ .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan);
try (HoodieMergedLogRecordScanner scanner = builder.build()) {
assertEquals(expectedTotalRecords, scanner.getTotalLogRecords(), "There
should be " + expectedTotalRecords + " records");
final Set<String> readKeys = new HashSet<>();
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/HoodieRecordUtilsTest.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java
similarity index 98%
rename from
hudi-common/src/test/java/org/apache/hudi/common/util/HoodieRecordUtilsTest.java
rename to
hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java
index 4dcabecccb7..7cc1f2b0b8f 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/util/HoodieRecordUtilsTest.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java
@@ -27,7 +27,7 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
-class HoodieRecordUtilsTest {
+class TestHoodieRecordUtils {
@Test
void loadHoodieMerge() {
diff --git
a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
index ab929907186..7fc93c776f5 100644
---
a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
+++
b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
@@ -38,9 +38,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
-import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.examples.quickstart.utils.QuickstartConfigurations;
import org.apache.parquet.Strings;
import org.apache.parquet.avro.AvroParquetReader;
@@ -362,7 +360,6 @@ public class TestQuickstartData {
.withSpillableMapBasePath("/tmp/")
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 7dc6ea9c0f4..5087c814030 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.ObjectSizeCalculator;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
@@ -201,7 +202,7 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
}
private void initMergeClass() {
- recordMerger = writeClient.getConfig().getRecordMerger();
+ recordMerger =
HoodieRecordUtils.mergerToPreCombineMode(writeClient.getConfig().getRecordMerger());
LOG.info("init hoodie merge with class [{}]",
recordMerger.getClass().getName());
}
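For reference, the change above now routes the configured merger through HoodieRecordUtils.mergerToPreCombineMode(...) before it is used for pre-combining. A minimal sketch of what that utility plausibly does, assuming it just dispatches on the new OperationModeAwareness interface introduced by this commit (the real implementation lives in HoodieRecordUtils and is not shown in this hunk):

    // Sketch only; assumes org.apache.hudi.common.model.HoodieRecordMerger and
    // org.apache.hudi.common.model.OperationModeAwareness are imported.
    public static HoodieRecordMerger mergerToPreCombineMode(HoodieRecordMerger merger) {
      // If the merger is aware of the operation mode, ask it for its
      // pre-combining variant; otherwise use the configured merger unchanged.
      return merger instanceof OperationModeAwareness
          ? ((OperationModeAwareness) merger).asPreCombiningMode()
          : merger;
    }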
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 7c076344166..f4ecb3e67d0 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -56,7 +56,9 @@ import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
@@ -68,6 +70,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
@@ -962,6 +965,28 @@ public class TestInputFormat {
assertThat(baseMergeLogFileResult2, is("[]"));
}
+  /**
+   * This test checks 2 cases of record preCombining.
+   * When preCombine is true, the writer combines incoming records in memory;
+   * when preCombine is false, the merged log reader combines them while reading.
+   * With out-of-order (disorder) deletes, we can verify that the
+   * '_hoodie_operation' field is correctly set up.
+   */
+ @ParameterizedTest
+ @MethodSource("preCombiningAndChangelogModeParams")
+ void testMergeOnReadDisorderDeleteMerging(boolean preCombine, boolean
changelogMode) throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PRE_COMBINE.key(), preCombine + "");
+ options.put(FlinkOptions.CHANGELOG_ENABLED.key(), changelogMode + "");
+ beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+ // write log file with disorder deletes
+ TestData.writeData(TestData.DATA_SET_DISORDER_INSERT_DELETE, conf);
+ InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+ final String baseResult = TestData.rowDataToString(readData(inputFormat));
+ String expected = "[+I[id1, Danny, 22, 1970-01-01T00:00:00.004, par1]]";
+ assertThat(baseResult, is(expected));
+ }
+
@Test
void testReadArchivedCommitsIncrementally() throws Exception {
Map<String, String> options = new HashMap<>();
@@ -1085,6 +1110,19 @@ public class TestInputFormat {
// Utilities
// -------------------------------------------------------------------------
+ /**
+ * Return test params => (preCombining, changelog mode).
+ */
+ private static Stream<Arguments> preCombiningAndChangelogModeParams() {
+ Object[][] data =
+ new Object[][] {
+ {true, true},
+ {true, false},
+ {false, true},
+ {false, false}};
+ return Stream.of(data).map(Arguments::of);
+ }
+
private HoodieTableSource getTableSource(Configuration conf) {
return new HoodieTableSource(
TestConfigurations.TABLE_SCHEMA,
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 7f9ad108941..8c8ecf71061 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -24,13 +24,11 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
@@ -345,6 +343,17 @@ public class TestData {
TimestampData.fromEpochMillis(1), StringData.fromString("par1"))
);
+ public static List<RowData> DATA_SET_DISORDER_INSERT_DELETE = Arrays.asList(
+ insertRow(StringData.fromString("id1"), StringData.fromString("Danny"),
23,
+ TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
+ insertRow(StringData.fromString("id1"), StringData.fromString("Danny"),
22,
+ TimestampData.fromEpochMillis(4), StringData.fromString("par1")),
+ insertRow(StringData.fromString("id1"), StringData.fromString("Danny"),
23,
+ TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+ deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"),
23,
+ TimestampData.fromEpochMillis(1), StringData.fromString("par1"))
+ );
+
public static List<RowData> DATA_SET_DISORDER_UPDATE_DELETE = Arrays.asList(
// DISORDER UPDATE
updateAfterRow(StringData.fromString("id1"),
StringData.fromString("Danny"), 21,
@@ -926,7 +935,6 @@ public class TestData {
.withSpillableMapBasePath("/tmp/")
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 8701939ee25..2cf372384e9 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -25,9 +25,7 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
-import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
@@ -62,7 +60,6 @@ public class RealtimeCompactedRecordReader extends
AbstractRealtimeRecordReader
private final Set<String> deltaRecordKeys;
private final HoodieMergedLogRecordScanner mergedLogRecordScanner;
- private final HoodieRecordMerger recordMerger;
private final int recordKeyIndex;
private Iterator<String> deltaItr;
@@ -76,7 +73,6 @@ public class RealtimeCompactedRecordReader extends
AbstractRealtimeRecordReader
this.recordKeyIndex = split.getVirtualKeyInfo()
.map(HoodieVirtualKeyInfo::getRecordKeyFieldIndex)
.orElse(HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS);
- this.recordMerger =
HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());
}
/**
@@ -103,7 +99,6 @@ public class RealtimeCompactedRecordReader extends
AbstractRealtimeRecordReader
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
.withOptimizedLogBlocksScan(jobConf.getBoolean(HoodieRealtimeConfig.USE_LOG_RECORD_READER_SCAN_V2,
false))
.withInternalSchema(schemaEvolutionContext.internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
-
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
}
@@ -191,10 +186,10 @@ public class RealtimeCompactedRecordReader extends
AbstractRealtimeRecordReader
// for hive engine, the hiveSchema will be: col1,col2,par, and the
writerSchema will be col1,col2,par
// for presto engine, the hiveSchema will be: col1,col2, but the
writerSchema will be col1,col2,par
// so to be compatible with hive and presto, we should rewrite oldRecord
before we call combineAndGetUpdateValue,
- // once presto on hudi have it's own mor reader, we can remove the rewrite
logical.
+ // once presto on hudi have its own mor reader, we can remove the rewrite
logical.
GenericRecord genericRecord =
HiveAvroSerializer.rewriteRecordIgnoreResultCheck(oldRecord,
getLogScannerReaderSchema());
HoodieRecord record = new HoodieAvroIndexedRecord(genericRecord);
- Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(record,
+ Option<Pair<HoodieRecord, Schema>> mergeResult =
HoodieAvroRecordMerger.INSTANCE.merge(record,
genericRecord.getSchema(), newRecord, getLogScannerReaderSchema(), new
TypedProperties(payloadProps));
return mergeResult.map(p -> (HoodieAvroIndexedRecord) p.getLeft());
}
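To make the merge call in the hunk above easier to read, here it is written out in full (taken directly from the added lines, no new behavior):

    GenericRecord genericRecord =
        HiveAvroSerializer.rewriteRecordIgnoreResultCheck(oldRecord, getLogScannerReaderSchema());
    HoodieRecord record = new HoodieAvroIndexedRecord(genericRecord);
    Option<Pair<HoodieRecord, Schema>> mergeResult = HoodieAvroRecordMerger.INSTANCE.merge(
        record, genericRecord.getSchema(),        // base-file record and its schema
        newRecord, getLogScannerReaderSchema(),   // log record and the reader schema
        new TypedProperties(payloadProps));       // merge/payload properties
    return mergeResult.map(p -> (HoodieAvroIndexedRecord) p.getLeft());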
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
index 427b758db34..a40519df92d 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
@@ -19,11 +19,9 @@
package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Functions;
-import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
@@ -85,9 +83,7 @@ class RealtimeUnmergedRecordReader extends
AbstractRealtimeRecordReader
.withLatestInstantTime(split.getMaxCommitTime())
.withReadBlocksLazily(Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
.withReverseReader(false)
-
.withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
-
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()));
-
+
.withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE));
this.executor = new BoundedInMemoryExecutor<>(
HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf),
getParallelProducers(scannerBuilder),
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index 239ec31eb65..a541bbb8e52 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -19,9 +19,9 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.avro.HoodieAvroUtils.getRootLevelFieldName
-import org.apache.hudi.common.model.{HoodieAvroRecordMerger,
HoodieRecordMerger}
+import org.apache.hudi.common.model.HoodieRecordMerger
import org.apache.hudi.common.table.HoodieTableConfig
-import org.apache.hudi.common.util.{StringUtils, ValidationUtils}
+import org.apache.hudi.common.util.ValidationUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/HoodieSparkValidateDuplicateKeyRecordMerger.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/HoodieSparkValidateDuplicateKeyRecordMerger.scala
index f59b62d7bec..8127cb73414 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/HoodieSparkValidateDuplicateKeyRecordMerger.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/HoodieSparkValidateDuplicateKeyRecordMerger.scala
@@ -20,9 +20,8 @@ package org.apache.spark.sql.hudi.command
import org.apache.avro.Schema
import org.apache.hudi.HoodieSparkRecordMerger
import org.apache.hudi.common.config.TypedProperties
-import org.apache.hudi.common.model.HoodieAvroRecordMerger.Config
-import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.util.{collection, Option => HOption}
+import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordMerger,
OperationModeAwareness}
+import org.apache.hudi.common.util.{HoodieRecordUtils, collection, Option =>
HOption}
import org.apache.hudi.exception.HoodieDuplicateKeyException
/**
@@ -30,18 +29,14 @@ import org.apache.hudi.exception.HoodieDuplicateKeyException
* config.
* @see org.apache.spark.sql.hudi.command.ValidateDuplicateKeyPayload
*/
-class HoodieSparkValidateDuplicateKeyRecordMerger extends
HoodieSparkRecordMerger {
+class HoodieSparkValidateDuplicateKeyRecordMerger extends
HoodieSparkRecordMerger with OperationModeAwareness {
override def merge(older: HoodieRecord[_], oldSchema: Schema, newer:
HoodieRecord[_], newSchema: Schema, props: TypedProperties):
HOption[collection.Pair[HoodieRecord[_], Schema]] = {
- val legacyOperatingMode =
Config.LegacyOperationMode.valueOf(props.getString(Config.LEGACY_OPERATING_MODE.key,
Config.LEGACY_OPERATING_MODE.defaultValue))
- legacyOperatingMode match {
- case Config.LegacyOperationMode.PRE_COMBINING =>
- super.merge(older, oldSchema, newer, newSchema, props)
- case Config.LegacyOperationMode.COMBINING =>
- val key = older.getRecordKey(oldSchema,
HoodieRecord.RECORD_KEY_METADATA_FIELD)
- throw new HoodieDuplicateKeyException(key)
- case _ =>
- throw new UnsupportedOperationException(String.format("Unsupported
legacy operating mode (%s)", legacyOperatingMode))
- }
+ val key = older.getRecordKey(oldSchema,
HoodieRecord.RECORD_KEY_METADATA_FIELD)
+ throw new HoodieDuplicateKeyException(key)
+ }
+
+ override def asPreCombiningMode(): HoodieRecordMerger = {
+
HoodieRecordUtils.loadRecordMerger(classOf[HoodieSparkRecordMerger].getName)
}
}
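The practical effect of the OperationModeAwareness hook above: merge(...) now always rejects a duplicate key, and the pre-combining behavior is obtained by converting the merger first. A hedged sketch of the caller-side view (assuming mergerToPreCombineMode dispatches on the interface as sketched earlier):

    HoodieRecordMerger merger = new HoodieSparkValidateDuplicateKeyRecordMerger();
    // Base/delta merge path: merge(...) throws HoodieDuplicateKeyException for the duplicate key.
    // Pre-combine path: convert first; asPreCombiningMode() hands back a plain
    // HoodieSparkRecordMerger, so in-memory dedup of incoming records does not throw.
    HoodieRecordMerger preCombineMerger = HoodieRecordUtils.mergerToPreCombineMode(merger);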
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index 5761a75383a..36a9a882750 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -21,11 +21,11 @@ import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.config.HoodieCommonConfig
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieAvroRecordMerger, HoodieLogFile,
HoodieRecordPayload}
+import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecordPayload}
import org.apache.hudi.common.table.log.block.HoodieDataBlock
import org.apache.hudi.common.table.log.{HoodieLogFormat,
HoodieMergedLogRecordScanner}
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
-import org.apache.hudi.common.util.{HoodieRecordUtils, ValidationUtils}
+import org.apache.hudi.common.util.ValidationUtils
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieMemoryConfig}
import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.spark.sql.Row
@@ -76,7 +76,6 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure
with ProcedureBuil
.withSpillableMapBasePath(HoodieMemoryConfig.getDefaultSpillableMapBasePath)
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue)
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue)
-
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(classOf[HoodieAvroRecordMerger].getName))
.build
scanner.asScala.foreach(hoodieRecord => {
val record =
hoodieRecord.getData.asInstanceOf[HoodieRecordPayload[_]].getInsertValue(schema).get()
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
index 804a6c5f63b..c05a4055447 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
@@ -18,8 +18,7 @@
package org.apache.spark.sql.hudi
import org.apache.hadoop.fs.Path
-import org.apache.hudi.avro.model.HoodieCleanMetadata
-import org.apache.hudi.{HoodieSparkRecordMerger, HoodieSparkUtils}
+import org.apache.hudi.HoodieSparkRecordMerger
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.model.HoodieAvroRecordMerger
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
index 35ebe872e40..43fcb79ecf9 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
@@ -17,9 +17,8 @@
package org.apache.spark.sql.hudi
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload,
HoodieAvroRecordMerger, HoodieRecordMerger, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload,
HoodieRecordMerger, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.table.HoodieTableConfig
-import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.assertTrue