This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4ebc2545245e [HUDI-9602] Use BufferedRecordMerger for dedup and global index path (#13600)
4ebc2545245e is described below
commit 4ebc2545245e4a82605215229d5e5764f2660b30
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Aug 5 19:53:05 2025 +0530
[HUDI-9602] Use BufferedRecordMerger for dedup and global index path (#13600)
* Use the BufferedRecordMerger to deduplicate inputs for the COW and index write paths;
* Add a new sub-merger for the MERGE INTO (MIT) expression payload.
---------
Co-authored-by: Lokesh Jain <[email protected]>
Co-authored-by: Timothy Brown <[email protected]>
Co-authored-by: danny0405 <[email protected]>
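For orientation, the heart of the new dedup path is the per-key reduce step this patch adds to BaseWriteHelper: duplicate records are wrapped into BufferedRecord instances and combined via BufferedRecordMerger#deltaMerge instead of the old HoodieRecordMerger#merge call. Below is a minimal, self-contained sketch of that step; names and signatures are taken from the diff, while the surrounding class, config wiring, and the engine-specific reduceByKey plumbing are omitted.

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.RecordContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.table.read.BufferedRecordMerger;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;

final class DedupSketch {
  // Mirrors BaseWriteHelper#reduceRecords from this patch: combine two records that
  // share a key and return the survivor with a consistent key and operation.
  static <T> HoodieRecord<T> reduce(TypedProperties props,
                                    BufferedRecordMerger<T> merger,
                                    String[] orderingFieldNames,
                                    HoodieRecord<T> previous,
                                    HoodieRecord<T> next,
                                    Schema schema,
                                    RecordContext<T> recordContext) {
    try {
      // Wrap both sides so the merger can read ordering values and delete flags.
      BufferedRecord<T> newer = BufferedRecord.forRecordWithContext(next, schema, recordContext, props, orderingFieldNames);
      BufferedRecord<T> older = BufferedRecord.forRecordWithContext(previous, schema, recordContext, props, orderingFieldNames);
      // deltaMerge returns empty when the previous record should win.
      Option<BufferedRecord<T>> merged = merger.deltaMerge(newer, older);
      HoodieRecord<T> reduced = merged
          .map(r -> recordContext.constructHoodieRecord(r, next.getPartitionPath()))
          .orElse(previous);
      boolean choosePrevious = merged.isEmpty();
      HoodieKey key = choosePrevious ? previous.getKey() : next.getKey();
      HoodieOperation op = choosePrevious ? previous.getOperation() : next.getOperation();
      return reduced.newInstance(key, op);
    } catch (IOException e) {
      throw new HoodieException("Failed to merge records " + previous + " and " + next, e);
    }
  }
}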
---
.../org/apache/hudi/index/HoodieIndexUtils.java | 180 ++++++++++++++-------
.../hudi/table/action/commit/BaseWriteHelper.java | 83 +++++++++-
.../table/action/commit/HoodieWriteHelper.java | 34 ++--
.../hudi/utils/HoodieWriterClientTestHarness.java | 55 +++++--
.../hudi/table/action/commit/FlinkWriteHelper.java | 37 ++---
.../hudi/table/action/commit/JavaWriteHelper.java | 30 ++--
.../commit/BaseSparkCommitActionExecutor.java | 6 +-
.../hudi/BaseSparkInternalRecordContext.java | 2 +-
.../org/apache/hudi/avro/AvroRecordContext.java | 53 ++++--
.../apache/hudi/avro/HoodieAvroReaderContext.java | 29 +++-
.../common/engine/AvroReaderContextFactory.java | 11 +-
.../hudi/common/engine/HoodieEngineContext.java | 17 +-
.../hudi/common/engine/HoodieReaderContext.java | 26 ++-
.../apache/hudi/common/engine/RecordContext.java | 58 +++++--
.../apache/hudi/common/model/BaseAvroPayload.java | 10 ++
.../apache/hudi/common/model/HoodieAvroRecord.java | 2 +-
.../hudi/common/model/HoodieAvroRecordMerger.java | 8 +-
.../hudi/common/model/HoodieRecordPayload.java | 23 ++-
.../hudi/common/table/read/BufferedRecord.java | 13 +-
.../table/read/BufferedRecordMergerFactory.java | 127 +++++++++++----
.../common/table/read/HoodieFileGroupReader.java | 7 +-
.../apache/hudi/common/util/HoodieRecordUtils.java | 13 ++
.../common/table/read/SchemaHandlerTestBase.java | 2 +-
.../read/buffer/BaseTestFileGroupRecordBuffer.java | 6 +
.../read/buffer/TestFileGroupRecordBuffer.java | 3 +-
.../buffer/TestReusableKeyBasedRecordBuffer.java | 3 +-
.../TestSortedKeyBasedFileGroupRecordBuffer.java | 3 +-
.../common/testutils/HoodieTestDataGenerator.java | 6 +-
.../hudi/common/testutils/RawTripTestPayload.java | 4 +-
.../hudi/common/util/TestHoodieRecordUtils.java | 27 ++++
.../org/apache/hudi/sink/StreamWriteFunction.java | 33 +++-
.../hudi/table/format/FlinkRecordContext.java | 2 +-
.../org/apache/hudi/hadoop/HiveRecordContext.java | 2 +-
.../org/apache/hudi/HoodieMergeOnReadRDDV2.scala | 3 +-
.../hudi/command/payload/ExpressionPayload.scala | 11 ++
.../hudi/functional/TestBufferedRecordMerger.java | 2 +-
.../apache/hudi/functional/TestHoodieIndex.java | 4 +
.../dml/others/TestPartialUpdateForMergeInto.scala | 5 +-
38 files changed, 704 insertions(+), 236 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 5f6d6e520861..658eb1c436d6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -20,14 +20,17 @@ package org.apache.hudi.index;
import org.apache.hudi.avro.AvroSchemaCache;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.engine.ReaderContextFactory;
+import org.apache.hudi.common.engine.RecordContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieIndexDefinition;
@@ -43,7 +46,11 @@ import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.read.MergeResult;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.HoodieTimer;
@@ -69,7 +76,6 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,6 +93,7 @@ import static java.util.stream.Collectors.toList;
import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
import static org.apache.hudi.common.config.HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP;
import static org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
+import static org.apache.hudi.common.util.HoodieRecordUtils.getOrderingFieldNames;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.index.expression.HoodieExpressionIndex.EXPRESSION_OPTION;
import static org.apache.hudi.index.expression.HoodieExpressionIndex.IDENTITY_TRANSFORM;
@@ -331,7 +338,7 @@ public class HoodieIndexUtils {
* @return {@link HoodieRecord}s that have the current location being set.
*/
private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
- HoodieData<Pair<String, String>> partitionLocations, HoodieWriteConfig
config, HoodieTable hoodieTable) {
+ HoodieData<Pair<String, String>> partitionLocations, HoodieWriteConfig
config, HoodieTable hoodieTable, ReaderContextFactory<R> readerContextFactory,
Schema dataSchema) {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
final Option<String> instantTime = metaClient
.getActiveTimeline() // we need to include all actions and completed
@@ -341,7 +348,6 @@ public class HoodieIndexUtils {
if (instantTime.isEmpty()) {
return hoodieTable.getContext().emptyHoodieData();
}
- ReaderContextFactory<R> readerContextFactory =
hoodieTable.getContext().getReaderContextFactory(metaClient);
return partitionLocations.flatMap(p -> {
Option<FileSlice> fileSliceOption = Option.fromJavaOptional(hoodieTable
.getHoodieView()
@@ -351,7 +357,6 @@ public class HoodieIndexUtils {
if (fileSliceOption.isEmpty()) {
return Collections.emptyIterator();
}
- Schema dataSchema =
AvroSchemaCache.intern(HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(config.getWriteSchema()),
config.allowOperationMetadataField()));
Option<InternalSchema> internalSchemaOption =
SerDeHelper.fromJson(config.getInternalSchema());
FileSlice fileSlice = fileSliceOption.get();
HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
@@ -386,20 +391,20 @@ public class HoodieIndexUtils {
* We also need the keygenerator so we can figure out the partition path
after expression payload
* evaluates the merge.
*/
- private static Pair<HoodieWriteConfig, Option<BaseKeyGenerator>>
getKeygenAndUpdatedWriteConfig(HoodieWriteConfig config, HoodieTableConfig
tableConfig) {
- if
(config.getPayloadClass().equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload"))
{
+ private static Pair<HoodieWriteConfig, BaseKeyGenerator>
getKeygenAndUpdatedWriteConfig(HoodieWriteConfig config, HoodieTableConfig
tableConfig, boolean isExpressionPayload) {
+ HoodieWriteConfig writeConfig = config;
+ if (isExpressionPayload) {
TypedProperties typedProperties =
TypedProperties.copy(config.getProps());
// set the payload class to table's payload class and not expresison
payload. this will be used to read the existing records
typedProperties.setProperty(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(),
tableConfig.getPayloadClass());
typedProperties.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(),
tableConfig.getPayloadClass());
- HoodieWriteConfig writeConfig =
HoodieWriteConfig.newBuilder().withProperties(typedProperties).build();
- try {
- return Pair.of(writeConfig, Option.of((BaseKeyGenerator)
HoodieAvroKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps())));
- } catch (IOException e) {
- throw new RuntimeException("KeyGenerator must inherit from
BaseKeyGenerator to update a records partition path using spark sql merge
into", e);
- }
+ writeConfig =
HoodieWriteConfig.newBuilder().withProperties(typedProperties).build();
+ }
+ try {
+ return Pair.of(writeConfig, (BaseKeyGenerator)
HoodieAvroKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
+ } catch (IOException e) {
+ throw new RuntimeException("KeyGenerator must inherit from
BaseKeyGenerator to update a records partition path using spark sql merge
into", e);
}
- return Pair.of(config, Option.empty());
}
/**
@@ -411,30 +416,53 @@ public class HoodieIndexUtils {
HoodieRecord<R> incoming,
HoodieRecord<R> existing,
Schema writeSchema,
- Schema existingSchema,
Schema writeSchemaWithMetaFields,
HoodieWriteConfig config,
- HoodieRecordMerger recordMerger,
- BaseKeyGenerator keyGenerator) throws IOException {
- Option<Pair<HoodieRecord, Schema>> mergeResult =
recordMerger.merge(existing, existingSchema,
- incoming, writeSchemaWithMetaFields, config.getProps());
- if (!mergeResult.isPresent()) {
+ BufferedRecordMerger<R> recordMerger,
+ BaseKeyGenerator keyGenerator,
+ RecordContext<R> incomingRecordContext,
+ RecordContext<R> existingRecordContext,
+ String[] orderingFieldNames) throws IOException {
+ BufferedRecord<R> incomingBufferedRecord =
BufferedRecord.forRecordWithContext(incoming, writeSchemaWithMetaFields,
incomingRecordContext, config.getProps(), orderingFieldNames);
+ BufferedRecord<R> existingBufferedRecord =
BufferedRecord.forRecordWithContext(existing, writeSchemaWithMetaFields,
existingRecordContext, config.getProps(), orderingFieldNames);
+ MergeResult<R> mergeResult =
recordMerger.finalMerge(existingBufferedRecord, incomingBufferedRecord);
+ if (mergeResult.isDelete()) {
//the record was deleted
return Option.empty();
}
- HoodieRecord<R> result = mergeResult.get().getLeft();
- if (result.getData().equals(HoodieRecord.SENTINEL)) {
+ if (mergeResult.getMergedRecord() == null) {
+ // SENTINEL case: the record did not match and merge case and should not
be modified
+ return Option.of((HoodieRecord<R>) new
HoodieAvroIndexedRecord(HoodieRecord.SENTINEL));
+ }
+
+ BufferedRecord<R> resultingBufferedRecord =
BufferedRecord.forRecordWithContext(mergeResult.getMergedRecord(),
writeSchemaWithMetaFields,
+ existingRecordContext, orderingFieldNames, existing.getRecordKey(),
false);
+
+ if (resultingBufferedRecord.getRecord().equals(HoodieRecord.SENTINEL)) {
//the record did not match and merge case and should not be modified
- return Option.of(result);
+ return
Option.of(existingRecordContext.constructHoodieRecord(resultingBufferedRecord,
incoming.getPartitionPath()));
}
//record is inserted or updated
- String partitionPath = keyGenerator.getPartitionPath((GenericRecord)
result.getData());
+ String partitionPath = inferPartitionPath(incoming, existing,
writeSchemaWithMetaFields, keyGenerator, existingRecordContext,
resultingBufferedRecord);
+ HoodieRecord<R> result =
existingRecordContext.constructHoodieRecord(resultingBufferedRecord,
partitionPath);
HoodieRecord<R> withMeta = result.prependMetaFields(writeSchema,
writeSchemaWithMetaFields,
- new
MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(partitionPath),
config.getProps());
+ new
MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(partitionPath),
config.getProps());
return
Option.of(withMeta.wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields,
config.getProps(), Option.empty(),
config.allowOperationMetadataField(), Option.empty(), false,
Option.of(writeSchema)));
+ }
+ private static <R> String inferPartitionPath(HoodieRecord<R> incoming,
HoodieRecord<R> existing, Schema recordSchema, BaseKeyGenerator keyGenerator,
+ RecordContext<R> recordContext,
BufferedRecord<R> resultingBufferedRecord) {
+ R record = resultingBufferedRecord.getRecord();
+ if (record == incoming.getData()) {
+ return incoming.getPartitionPath();
+ } else if (record == existing.getData()) {
+ return existing.getPartitionPath();
+ } else {
+ // the merged record is not the same as either incoming or existing, so
we need to compute the partition path
+ return
keyGenerator.getPartitionPath(recordContext.convertToAvroRecord(record,
recordSchema));
+ }
}
/**
@@ -444,32 +472,37 @@ public class HoodieIndexUtils {
HoodieRecord<R> incoming,
HoodieRecord<R> existing,
Schema writeSchema,
+ Schema writeSchemaWithMetaFields,
HoodieWriteConfig config,
- HoodieRecordMerger recordMerger,
- Option<BaseKeyGenerator> expressionPayloadKeygen) throws IOException {
- Schema existingSchema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(config.getSchema()),
config.allowOperationMetadataField());
- Schema writeSchemaWithMetaFields =
HoodieAvroUtils.addMetadataFields(writeSchema,
config.allowOperationMetadataField());
- if (expressionPayloadKeygen.isPresent()) {
+ BufferedRecordMerger<R> recordMerger,
+ BaseKeyGenerator keyGenerator,
+ RecordContext<R> incomingRecordContext,
+ RecordContext<R> existingRecordContext,
+ String[] orderingFieldNames,
+ boolean isExpressionPayload) throws IOException {
+ if (isExpressionPayload) {
return mergeIncomingWithExistingRecordWithExpressionPayload(incoming,
existing, writeSchema,
- existingSchema, writeSchemaWithMetaFields, config, recordMerger,
expressionPayloadKeygen.get());
+ writeSchemaWithMetaFields, config, recordMerger, keyGenerator,
incomingRecordContext, existingRecordContext, orderingFieldNames);
} else {
// prepend the hoodie meta fields as the incoming record does not have
them
HoodieRecord incomingPrepended = incoming
.prependMetaFields(writeSchema, writeSchemaWithMetaFields, new
MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()),
config.getProps());
- // after prepend the meta fields, convert the record back to the
original payload
- HoodieRecord incomingWithMetaFields = incomingPrepended
- .wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields,
config.getProps(), Option.empty(), config.allowOperationMetadataField(),
Option.empty(), false, Option.empty());
- Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger
- .merge(existing, existingSchema, incomingWithMetaFields,
writeSchemaWithMetaFields, config.getProps());
- if (mergeResult.isPresent()) {
- // the merged record needs to be converted back to the original payload
- HoodieRecord<R> merged =
mergeResult.get().getLeft().wrapIntoHoodieRecordPayloadWithParams(
- writeSchemaWithMetaFields, config.getProps(), Option.empty(),
- config.allowOperationMetadataField(), Option.empty(), false,
Option.of(writeSchema));
- return Option.of(merged);
- } else {
+ BufferedRecord<R> incomingBufferedRecord =
BufferedRecord.forRecordWithContext(incomingPrepended,
writeSchemaWithMetaFields, incomingRecordContext, config.getProps(),
orderingFieldNames);
+ BufferedRecord<R> existingBufferedRecord =
BufferedRecord.forRecordWithContext(existing, writeSchemaWithMetaFields,
existingRecordContext, config.getProps(), orderingFieldNames);
+ MergeResult<R> mergeResult =
recordMerger.finalMerge(existingBufferedRecord, incomingBufferedRecord);
+
+ if (mergeResult.isDelete()) {
+ // the record was deleted
return Option.empty();
}
+ R mergedRecord = mergeResult.getMergedRecord();
+ BufferedRecord<R> resultingBufferedRecord =
BufferedRecord.forRecordWithContext(mergedRecord, writeSchemaWithMetaFields,
existingRecordContext,
+ orderingFieldNames, existing.getRecordKey(), mergedRecord == null);
+ String partitionPath = inferPartitionPath(incoming, existing,
writeSchemaWithMetaFields, keyGenerator, existingRecordContext,
resultingBufferedRecord);
+ HoodieRecord<R> result =
existingRecordContext.constructHoodieRecord(resultingBufferedRecord,
partitionPath);
+ // the merged record needs to be converted back to the original payload
+ return
Option.of(result.wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields,
config.getProps(), Option.empty(),
+ config.allowOperationMetadataField(), Option.empty(), false,
Option.of(writeSchema)));
}
}
@@ -477,10 +510,14 @@ public class HoodieIndexUtils {
* Merge tagged incoming records with existing records in case of partition
path updated.
*/
public static <R> HoodieData<HoodieRecord<R>>
mergeForPartitionUpdatesIfNeeded(
- HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>>
incomingRecordsAndLocations, HoodieWriteConfig config, HoodieTable hoodieTable)
{
- Pair<HoodieWriteConfig, Option<BaseKeyGenerator>>
keyGeneratorWriteConfigOpt = getKeygenAndUpdatedWriteConfig(config,
hoodieTable.getMetaClient().getTableConfig());
+ HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>>
incomingRecordsAndLocations,
+ HoodieWriteConfig config,
+ HoodieTable hoodieTable) {
+ boolean isExpressionPayload =
config.getPayloadClass().equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload");
+ Pair<HoodieWriteConfig, BaseKeyGenerator> keyGeneratorWriteConfigOpt =
+ getKeygenAndUpdatedWriteConfig(config,
hoodieTable.getMetaClient().getTableConfig(), isExpressionPayload);
HoodieWriteConfig updatedConfig = keyGeneratorWriteConfigOpt.getLeft();
- Option<BaseKeyGenerator> expressionPayloadKeygen =
keyGeneratorWriteConfigOpt.getRight();
+ BaseKeyGenerator keyGenerator = keyGeneratorWriteConfigOpt.getRight();
// completely new records
HoodieData<HoodieRecord<R>> taggedNewRecords =
incomingRecordsAndLocations.filter(p ->
!p.getRight().isPresent()).map(Pair::getLeft);
// the records found in existing base files
@@ -493,10 +530,38 @@ public class HoodieIndexUtils {
.filter(p -> p.getRight().isPresent())
.map(p -> Pair.of(p.getRight().get().getPartitionPath(),
p.getRight().get().getFileId()))
.distinct(updatedConfig.getGlobalIndexReconcileParallelism());
+ // define the buffered record merger.
+ ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>)
hoodieTable.getContext()
+ .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(),
config.getRecordMerger().getRecordType(), config.getProps());
+ HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
+ RecordContext<R> incomingRecordContext = readerContext.getRecordContext();
+ readerContext.initRecordMergerForIngestion(config.getProps());
+ // Create a reader context for the existing records. In the case of
merge-into commands, the incoming records
+ // can be using an expression payload so here we rely on the table's
configured payload class if it is required.
+ ReaderContextFactory<R> readerContextFactoryForExistingRecords =
(ReaderContextFactory<R>) hoodieTable.getContext()
+ .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(),
config.getRecordMerger().getRecordType(),
hoodieTable.getMetaClient().getTableConfig().getProps());
+ RecordContext<R> existingRecordContext =
readerContextFactoryForExistingRecords.getContext().getRecordContext();
// merged existing records with current locations being set
- HoodieData<HoodieRecord<R>> existingRecords =
getExistingRecords(globalLocations, keyGeneratorWriteConfigOpt.getLeft(),
hoodieTable);
-
- final HoodieRecordMerger recordMerger = updatedConfig.getRecordMerger();
+ SerializableSchema writerSchema = new
SerializableSchema(hoodieTable.getConfig().getWriteSchema());
+ SerializableSchema writerSchemaWithMetaFields = new
SerializableSchema(HoodieAvroUtils.addMetadataFields(writerSchema.get(),
updatedConfig.allowOperationMetadataField()));
+ AvroSchemaCache.intern(writerSchema.get());
+ AvroSchemaCache.intern(writerSchemaWithMetaFields.get());
+ // Read the existing records with the meta fields and current writer
schema as the output schema
+ HoodieData<HoodieRecord<R>> existingRecords =
+ getExistingRecords(globalLocations,
keyGeneratorWriteConfigOpt.getLeft(), hoodieTable,
readerContextFactoryForExistingRecords, writerSchemaWithMetaFields.get());
+ List<String> orderingFieldNames = getOrderingFieldNames(
+ readerContext.getMergeMode(), hoodieTable.getConfig().getProps(),
hoodieTable.getMetaClient());
+ BufferedRecordMerger<R> recordMerger = BufferedRecordMergerFactory.create(
+ readerContext,
+ readerContext.getMergeMode(),
+ false,
+ readerContext.getRecordMerger(),
+ orderingFieldNames,
+ writerSchema.get(),
+
Option.ofNullable(Pair.of(hoodieTable.getMetaClient().getTableConfig().getPayloadClass(),
hoodieTable.getConfig().getPayloadClass())),
+ hoodieTable.getConfig().getProps(),
+ hoodieTable.getMetaClient().getTableConfig().getPartialUpdateMode());
+ String[] orderingFieldsArray = orderingFieldNames.toArray(new String[0]);
HoodieData<HoodieRecord<R>> taggedUpdatingRecords =
untaggedUpdatingRecords.mapToPair(r -> Pair.of(r.getRecordKey(), r))
.leftOuterJoin(existingRecords.mapToPair(r ->
Pair.of(r.getRecordKey(), r)))
.values().flatMap(entry -> {
@@ -507,13 +572,14 @@ public class HoodieIndexUtils {
return Collections.singletonList(incoming).iterator();
}
HoodieRecord<R> existing = existingOpt.get();
- Schema writeSchema = new
Schema.Parser().parse(updatedConfig.getWriteSchema());
+ Schema writeSchema = writerSchema.get();
if (incoming.isDelete(writeSchema, updatedConfig.getProps())) {
// incoming is a delete: force tag the incoming to the old
partition
return
Collections.singletonList(tagRecord(incoming.newInstance(existing.getKey()),
existing.getCurrentLocation())).iterator();
}
-
- Option<HoodieRecord<R>> mergedOpt =
mergeIncomingWithExistingRecord(incoming, existing, writeSchema, updatedConfig,
recordMerger, expressionPayloadKeygen);
+ Option<HoodieRecord<R>> mergedOpt = mergeIncomingWithExistingRecord(
+ incoming, existing, writeSchema,
writerSchemaWithMetaFields.get(), updatedConfig,
+ recordMerger, keyGenerator, incomingRecordContext,
existingRecordContext, orderingFieldsArray, isExpressionPayload);
if (!mergedOpt.isPresent()) {
// merge resulted in delete: force tag the incoming to the old
partition
return
Collections.singletonList(tagRecord(incoming.newInstance(existing.getKey()),
existing.getCurrentLocation())).iterator();
@@ -652,7 +718,7 @@ public class HoodieIndexUtils {
Schema tableSchema = new
TableSchemaResolver(metaClient).getTableAvroSchema();
List<String> sourceFields = new ArrayList<>(columns.keySet());
String columnName = sourceFields.get(0); // We know there's only one
column from the check above
-
+
// First check if the field exists
try {
getNestedFieldSchemaFromWriteSchema(tableSchema, columnName);
@@ -663,7 +729,7 @@ public class HoodieIndexUtils {
indexType.equals(PARTITION_NAME_SECONDARY_INDEX) ? "secondary" :
"expression",
userIndexName, columnName));
}
-
+
// Check for complex types (RECORD, ARRAY, MAP) - not supported for any
index type
if (!validateDataTypeForSecondaryOrExpressionIndex(sourceFields,
tableSchema)) {
Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(tableSchema,
columnName);
@@ -674,7 +740,7 @@ public class HoodieIndexUtils {
indexType.equals(PARTITION_NAME_SECONDARY_INDEX) ? "secondary" :
"expression",
userIndexName, columnName, fieldSchema.getType()));
}
-
+
// For secondary index, apply stricter data type validation
if (indexType.equals(PARTITION_NAME_SECONDARY_INDEX)) {
if (!validateDataTypeForSecondaryIndex(sourceFields, tableSchema)) {
@@ -683,7 +749,7 @@ public class HoodieIndexUtils {
if (fieldSchema.getLogicalType() != null) {
actualType += " with logical type " + fieldSchema.getLogicalType();
}
-
+
throw new HoodieMetadataIndexException(String.format(
"Cannot create secondary index '%s': Column '%s' has unsupported
data type '%s'. "
+ "Secondary indexes only support: STRING, CHAR, INT, BIGINT/LONG,
SMALLINT, TINYINT, "
@@ -691,13 +757,13 @@ public class HoodieIndexUtils {
+ "and DATE types. Please choose a column with one of these
supported types.",
userIndexName, columnName, actualType));
}
-
+
// Check if record index is enabled for secondary index
boolean hasRecordIndex =
metaClient.getTableConfig().getMetadataPartitions().stream()
.anyMatch(partition ->
partition.equals(MetadataPartitionType.RECORD_INDEX.getPartitionPath()));
boolean recordIndexEnabled = Boolean.parseBoolean(
options.getOrDefault(RECORD_INDEX_ENABLE_PROP.key(),
RECORD_INDEX_ENABLE_PROP.defaultValue().toString()));
-
+
if (!hasRecordIndex && !recordIndexEnabled) {
throw new HoodieMetadataIndexException(String.format(
"Cannot create secondary index '%s': Record index is required for
secondary indexes but is not enabled. "
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index 5d6bb6048e5b..ebb119360367 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -18,19 +18,35 @@
package org.apache.hudi.table.action.commit;
+import org.apache.hudi.avro.AvroSchemaCache;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.RecordContext;
import org.apache.hudi.common.function.SerializableFunctionUnchecked;
-import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.util.List;
+
public abstract class BaseWriteHelper<T, I, K, O, R> extends
ParallelismHelper<I> {
protected BaseWriteHelper(SerializableFunctionUnchecked<I, Integer>
partitionNumberExtractor) {
@@ -85,9 +101,68 @@ public abstract class BaseWriteHelper<T, I, K, O, R>
extends ParallelismHelper<I
* @return Collection of HoodieRecord already be deduplicated
*/
public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int
parallelism) {
- HoodieRecordMerger recordMerger =
HoodieRecordUtils.mergerToPreCombineMode(table.getConfig().getRecordMerger());
- return deduplicateRecords(records, table.getIndex(), parallelism,
table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
+ HoodieReaderContext<T> readerContext =
+ (HoodieReaderContext<T>)
table.getContext().<T>getReaderContextFactoryForWrite(table.getMetaClient(),
table.getConfig().getRecordMerger().getRecordType(),
table.getConfig().getProps())
+ .getContext();
+ HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+ readerContext.initRecordMergerForIngestion(table.getConfig().getProps());
+ List<String> orderingFieldNames =
HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(),
table.getConfig().getProps(), table.getMetaClient());
+ Schema recordSchema;
+ if (StringUtils.nonEmpty(table.getConfig().getPartialUpdateSchema())) {
+ recordSchema = new
Schema.Parser().parse(table.getConfig().getPartialUpdateSchema());
+ } else {
+ recordSchema = new
Schema.Parser().parse(table.getConfig().getWriteSchema());
+ }
+ recordSchema = AvroSchemaCache.intern(recordSchema);
+ BufferedRecordMerger<T> bufferedRecordMerger =
BufferedRecordMergerFactory.create(
+ readerContext,
+ readerContext.getMergeMode(),
+ false,
+
readerContext.getRecordMerger().map(HoodieRecordUtils::mergerToPreCombineMode),
+ orderingFieldNames,
+ Option.ofNullable(table.getConfig().getPayloadClass()),
+ recordSchema,
+ table.getConfig().getProps(),
+ tableConfig.getPartialUpdateMode());
+ return deduplicateRecords(
+ records,
+ table.getIndex(),
+ parallelism,
+ table.getConfig().getSchema(),
+ table.getConfig().getProps(),
+ bufferedRecordMerger,
+ readerContext,
+ orderingFieldNames.toArray(new String[0]));
}
- public abstract I deduplicateRecords(I records, HoodieIndex<?, ?> index, int
parallelism, String schema, TypedProperties props, HoodieRecordMerger merger);
+ public abstract I deduplicateRecords(I records,
+ HoodieIndex<?, ?> index,
+ int parallelism,
+ String schema,
+ TypedProperties props,
+ BufferedRecordMerger<T> merger,
+ HoodieReaderContext<T> readerContext,
+ String[] orderingFieldNames);
+
+ protected static <T> HoodieRecord<T> reduceRecords(TypedProperties props,
BufferedRecordMerger<T> recordMerger, String[] orderingFieldNames,
+ HoodieRecord<T> previous,
HoodieRecord<T> next, Schema schema, RecordContext<T> recordContext) {
+ try {
+ // NOTE: The order of previous and next is uncertain within a batch in
"reduceByKey".
+ // If the return value is empty, it means the previous should be chosen.
+ BufferedRecord<T> newBufferedRecord =
BufferedRecord.forRecordWithContext(next, schema, recordContext, props,
orderingFieldNames);
+ // Construct old buffered record.
+ BufferedRecord<T> oldBufferedRecord =
BufferedRecord.forRecordWithContext(previous, schema, recordContext, props,
orderingFieldNames);
+ // Run merge.
+ Option<BufferedRecord<T>> merged =
recordMerger.deltaMerge(newBufferedRecord, oldBufferedRecord);
+ // NOTE: For merge mode based merging, it returns non-null.
+ // For mergers / payloads based merging, it may return null.
+ HoodieRecord<T> reducedRecord = merged.map(bufferedRecord ->
recordContext.constructHoodieRecord(bufferedRecord,
next.getPartitionPath())).orElse(previous);
+ boolean choosePrevious = merged.isEmpty();
+ HoodieKey reducedKey = choosePrevious ? previous.getKey() :
next.getKey();
+ HoodieOperation operation = choosePrevious ? previous.getOperation() :
next.getOperation();
+ return reducedRecord.newInstance(reducedKey, operation);
+ } catch (IOException e) {
+ throw new HoodieException(String.format("Error to merge two records, %s,
%s", previous, next), e);
+ }
+ }
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index 37bb5b64e3bf..adfedf72db06 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -23,17 +23,15 @@ import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.RecordContext;
import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
-import java.io.IOException;
-
public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T,
HoodieData<HoodieRecord<T>>,
HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
@@ -56,10 +54,17 @@ public class HoodieWriteHelper<T, R> extends
BaseWriteHelper<T, HoodieData<Hoodi
}
@Override
- public HoodieData<HoodieRecord<T>> deduplicateRecords(
- HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int
parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger
merger) {
+ public HoodieData<HoodieRecord<T>>
deduplicateRecords(HoodieData<HoodieRecord<T>> records,
+ HoodieIndex<?, ?>
index,
+ int parallelism,
+ String schemaStr,
+ TypedProperties props,
+
BufferedRecordMerger<T> recordMerger,
+ HoodieReaderContext<T>
readerContext,
+ String[]
orderingFieldNames) {
boolean isIndexingGlobal = index.isGlobal();
final SerializableSchema schema = new SerializableSchema(schemaStr);
+ RecordContext<T> recordContext = readerContext.getRecordContext();
return records.mapToPair(record -> {
HoodieKey hoodieKey = record.getKey();
// If index used is global, then records are expected to differ in their
partitionPath
@@ -68,17 +73,8 @@ public class HoodieWriteHelper<T, R> extends
BaseWriteHelper<T, HoodieData<Hoodi
// Here we have to make a copy of the incoming record, since it
might be holding
// an instance of [[InternalRow]] pointing into shared, mutable
buffer
return Pair.of(key, record.copy());
- }).reduceByKey((rec1, rec2) -> {
- HoodieRecord<T> reducedRecord;
- try {
- reducedRecord = merger.merge(rec1, schema.get(), rec2, schema.get(),
props).get().getLeft();
- } catch (IOException e) {
- throw new HoodieException(String.format("Error to merge two records,
%s, %s", rec1, rec2), e);
- }
- boolean choosePrev = rec1.getData().equals(reducedRecord.getData());
- HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
- HoodieOperation operation = choosePrev ? rec1.getOperation() :
rec2.getOperation();
- return reducedRecord.newInstance(reducedKey, operation);
- }, parallelism).map(Pair::getRight);
+ }).reduceByKey(
+ (previous, next) -> reduceRecords(props, recordMerger,
orderingFieldNames, previous, next, schema.get(), recordContext),
+ parallelism).map(Pair::getRight);
}
}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
index c437b9fbf1c0..9268f811a325 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
@@ -27,10 +27,12 @@ import
org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -39,7 +41,6 @@ import
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
-import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -50,6 +51,8 @@ import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.marker.MarkerType;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -93,6 +96,7 @@ import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.jetbrains.annotations.NotNull;
@@ -129,6 +133,7 @@ import static
org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NA
import static
org.apache.hudi.common.testutils.HoodieTestUtils.TIMELINE_FACTORY;
import static
org.apache.hudi.common.testutils.Transformations.randomSelectAsHoodieKeys;
import static
org.apache.hudi.common.testutils.Transformations.recordsToRecordKeySet;
+import static
org.apache.hudi.common.util.HoodieRecordUtils.getOrderingFieldNames;
import static
org.apache.hudi.config.HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING;
import static
org.apache.hudi.testutils.Assertions.assertNoDupesWithinPartition;
import static
org.apache.hudi.testutils.Assertions.assertNoDuplicatesInPartition;
@@ -544,8 +549,34 @@ public abstract class HoodieWriterClientTestHarness
extends HoodieCommonTestHarn
HoodieIndex index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(isGlobal);
int dedupParallelism = records.getNumPartitions() + additionalParallelism;
- HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd =
(HoodieData<HoodieRecord<RawTripTestPayload>>) HoodieWriteHelper.newInstance()
- .deduplicateRecords(records, index, dedupParallelism,
writeConfig.getSchema(), writeConfig.getProps(),
HoodiePreCombineAvroRecordMerger.INSTANCE);
+ BaseHoodieWriteClient writeClient = getHoodieWriteClient(writeConfig);
+ HoodieReaderContext readerContext = writeClient.getEngineContext()
+ .getReaderContextFactoryForWrite(metaClient,
HoodieRecord.HoodieRecordType.AVRO, writeConfig.getProps()).getContext();
+ List<String> orderingFieldNames = getOrderingFieldNames(
+ readerContext.getMergeMode(), writeClient.getConfig().getProps(),
metaClient);
+ RecordMergeMode recordMergeMode =
HoodieTableConfig.inferCorrectMergingBehavior(null,
writeConfig.getPayloadClass(), null,
+ String.join(",", orderingFieldNames),
metaClient.getTableConfig().getTableVersion()).getLeft();
+ BufferedRecordMerger<HoodieRecord> recordMerger =
BufferedRecordMergerFactory.create(
+ readerContext,
+ recordMergeMode,
+ false,
+ Option.ofNullable(writeClient.getConfig().getRecordMerger()),
+ orderingFieldNames,
+ Option.ofNullable(writeClient.getConfig().getPayloadClass()),
+ new Schema.Parser().parse(writeClient.getConfig().getSchema()),
+ writeClient.getConfig().getProps(),
+ metaClient.getTableConfig().getPartialUpdateMode());
+ HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd =
+ (HoodieData<HoodieRecord<RawTripTestPayload>>)
HoodieWriteHelper.newInstance()
+ .deduplicateRecords(
+ records,
+ index,
+ dedupParallelism,
+ writeConfig.getSchema(),
+ writeConfig.getProps(),
+ recordMerger,
+ readerContext,
+ orderingFieldNames.toArray(new String[0]));
assertEquals(expectedNumPartitions, dedupedRecsRdd.getNumPartitions());
List<HoodieRecord<RawTripTestPayload>> dedupedRecs =
dedupedRecsRdd.collectAsList();
assertEquals(isGlobal ? 1 : 2, dedupedRecs.size());
@@ -1187,24 +1218,22 @@ public abstract class HoodieWriterClientTestHarness
extends HoodieCommonTestHarn
*/
protected void testUpsertsInternal(Function3<Object, BaseHoodieWriteClient,
Object, String> writeFn, boolean populateMetaFields, boolean isPrepped,
SupportsUpgradeDowngrade
upgradeDowngrade) throws Exception {
-
metaClient.getStorage().deleteDirectory(new StoragePath(basePath));
-
- metaClient = HoodieTableMetaClient.newTableBuilder()
- .fromMetaClient(metaClient)
- .setTableVersion(6)
- .setPopulateMetaFields(populateMetaFields)
- .initTable(metaClient.getStorageConf().newInstance(),
metaClient.getBasePath());
-
HoodieWriteConfig.Builder cfgBuilder =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withRollbackUsingMarkers(true)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(true).withColumnStatsIndexForColumns("driver,rider")
.withMetadataIndexColumnStatsFileGroupCount(1).withEngineType(getEngineType()).build())
.withWriteTableVersion(6);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
- metaClient = HoodieTestUtils.createMetaClient(storageConf, new
StoragePath(basePath), HoodieTableVersion.SIX);
-
HoodieWriteConfig config = cfgBuilder.build();
+ metaClient = HoodieTableMetaClient.newTableBuilder()
+ .fromProperties(config.getProps())
+ .setTableVersion(6)
+ .setTableType(metaClient.getTableType())
+ .setPopulateMetaFields(populateMetaFields)
+ .initTable(metaClient.getStorageConf().newInstance(),
metaClient.getBasePath());
+
+ metaClient = HoodieTestUtils.createMetaClient(storageConf, new
StoragePath(basePath), HoodieTableVersion.SIX);
BaseHoodieWriteClient client = getHoodieWriteClient(config);
// Write 1 (only inserts)
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index 0b0f1d7586df..13a30b228fde 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -22,13 +22,12 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
@@ -36,7 +35,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.avro.Schema;
-import java.io.IOException;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
@@ -92,31 +90,22 @@ public class FlinkWriteHelper<T, R> extends
BaseWriteHelper<T, Iterator<HoodieRe
}
@Override
- public Iterator<HoodieRecord<T>>
deduplicateRecords(Iterator<HoodieRecord<T>> records, HoodieIndex<?, ?> index,
int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger
merger) {
+ public Iterator<HoodieRecord<T>>
deduplicateRecords(Iterator<HoodieRecord<T>> records,
+ HoodieIndex<?, ?> index,
+ int parallelism,
+ String schemaStr,
+ TypedProperties props,
+ BufferedRecordMerger<T>
recordMerger,
+ HoodieReaderContext<T>
readerContext,
+ String[]
orderingFieldNames) {
// If index used is global, then records are expected to differ in their
partitionPath
Map<Object, List<HoodieRecord<T>>> keyedRecords =
CollectionUtils.toStream(records)
.collect(Collectors.groupingBy(record ->
record.getKey().getRecordKey()));
// caution that the avro schema is not serializable
final Schema schema = new Schema.Parser().parse(schemaStr);
- return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1,
rec2) -> {
- HoodieRecord<T> reducedRecord;
- try {
- // Precombine do not need schema and do not return null
- reducedRecord = merger.merge(rec1, schema, rec2, schema,
props).get().getLeft();
- } catch (IOException e) {
- throw new HoodieException(String.format("Error to merge two records,
%s, %s", rec1, rec2), e);
- }
- // we cannot allow the user to change the key or partitionPath, since
that will affect
- // everything
- // so pick it from one of the records.
- boolean choosePrev = rec1.getData() == reducedRecord.getData();
- HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
- HoodieOperation operation = choosePrev ? rec1.getOperation() :
rec2.getOperation();
- HoodieRecord<T> hoodieRecord = reducedRecord.newInstance(reducedKey,
operation);
- // reuse the location from the first record.
- hoodieRecord.setCurrentLocation(rec1.getCurrentLocation());
- return hoodieRecord;
- }).orElse(null)).filter(Objects::nonNull).iterator();
+ return keyedRecords.values().stream().map(x ->
x.stream().reduce((previous, next) ->
+ reduceRecords(props, recordMerger, orderingFieldNames, previous, next,
schema, readerContext.getRecordContext())
+ ).orElse(null)).filter(Objects::nonNull).iterator();
}
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
index 1ab944b94feb..efed521190c9 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -22,17 +22,16 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
-import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -59,8 +58,14 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T,
List<HoodieRecord<T
}
@Override
- public List<HoodieRecord<T>> deduplicateRecords(
- List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism,
String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
+ public List<HoodieRecord<T>> deduplicateRecords(List<HoodieRecord<T>>
records,
+ HoodieIndex<?, ?> index,
+ int parallelism,
+ String schemaStr,
+ TypedProperties props,
+ BufferedRecordMerger<T>
recordMerger,
+ HoodieReaderContext<T>
readerContext,
+ String[] orderingFieldNames)
{
boolean isIndexingGlobal = index.isGlobal();
Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords =
records.stream().map(record -> {
HoodieKey hoodieKey = record.getKey();
@@ -70,17 +75,8 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T,
List<HoodieRecord<T
}).collect(Collectors.groupingBy(Pair::getLeft));
final Schema schema = new Schema.Parser().parse(schemaStr);
- return keyedRecords.values().stream().map(x ->
x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
- HoodieRecord<T> reducedRecord;
- try {
- reducedRecord = merger.merge(rec1, schema, rec2, schema,
props).get().getLeft();
- } catch (IOException e) {
- throw new HoodieException(String.format("Error to merge two records,
%s, %s", rec1, rec2), e);
- }
- // we cannot allow the user to change the key or partitionPath, since
that will affect
- // everything
- // so pick it from one of the records.
- return reducedRecord.newInstance(rec1.getKey(), rec1.getOperation());
- }).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
+ return keyedRecords.values().stream().map(x ->
x.stream().map(Pair::getRight).reduce((previous, next) ->
+ reduceRecords(props, recordMerger, orderingFieldNames, previous, next,
schema, readerContext.getRecordContext())
+ ).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index b2119ddf7ce2..3ff31990e7fa 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -18,12 +18,11 @@
package org.apache.hudi.table.action.commit;
-import org.apache.hudi.client.utils.SparkPartitionUtils;
-import org.apache.hudi.index.HoodieSparkIndexClient;
import org.apache.hudi.client.WriteStatus;
import
org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.client.utils.SparkPartitionUtils;
import org.apache.hudi.client.utils.SparkValidatorUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
@@ -47,8 +46,9 @@ import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.SparkLazyInsertIterable;
import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.index.HoodieSparkIndexClient;
import org.apache.hudi.io.CreateHandleFactory;
+import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieMergeHandleFactory;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.keygen.BaseKeyGenerator;
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
index a26df55c583b..7316c164dce4 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
@@ -70,7 +70,7 @@ public abstract class BaseSparkInternalRecordContext extends
RecordContext<Inter
}
@Override
- public HoodieRecord<InternalRow>
constructHoodieRecord(BufferedRecord<InternalRow> bufferedRecord) {
+ public HoodieRecord<InternalRow>
constructHoodieRecord(BufferedRecord<InternalRow> bufferedRecord, String
partitionPath) {
HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(),
partitionPath);
if (bufferedRecord.isDelete()) {
return new HoodieEmptyRecord<>(
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
index 9bdfd1be0ee8..178a4c00e55c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
@@ -21,27 +21,41 @@ package org.apache.hudi.avro;
import org.apache.hudi.common.engine.RecordContext;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.util.AvroJavaTypeConverter;
+import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.SpillableMapUtils;
+import org.apache.hudi.exception.HoodieException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
+import java.io.IOException;
import java.util.Map;
+import java.util.Properties;
+/**
+ * Record context for reading and transforming avro indexed records.
+ */
public class AvroRecordContext extends RecordContext<IndexedRecord> {
private final String payloadClass;
+ // This boolean indicates whether the caller requires payloads in the
HoodieRecord conversion.
+ // This is temporarily required as we migrate away from payloads.
+ private final boolean requiresPayloadRecords;
- public AvroRecordContext(HoodieTableConfig tableConfig) {
+ public AvroRecordContext(HoodieTableConfig tableConfig, String payloadClass,
boolean requiresPayloadRecords) {
super(tableConfig);
- this.payloadClass = tableConfig.getPayloadClass();
+ this.payloadClass = payloadClass;
this.typeConverter = new AvroJavaTypeConverter();
+ this.requiresPayloadRecords = requiresPayloadRecords;
}
public static Object getFieldValueFromIndexedRecord(
@@ -79,18 +93,39 @@ public class AvroRecordContext extends
RecordContext<IndexedRecord> {
}
@Override
- public HoodieRecord<IndexedRecord>
constructHoodieRecord(BufferedRecord<IndexedRecord> bufferedRecord) {
+ public HoodieRecord constructHoodieRecord(BufferedRecord<IndexedRecord>
bufferedRecord, String partitionPath) {
+ // HoodieKey is not required so do not generate it if partitionPath is null
+ HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(),
partitionPath);
+
if (bufferedRecord.isDelete()) {
- return SpillableMapUtils.generateEmptyPayload(
- bufferedRecord.getRecordKey(),
- partitionPath,
- bufferedRecord.getOrderingValue(),
- payloadClass);
+ if (payloadClass != null) {
+ return SpillableMapUtils.generateEmptyPayload(
+ bufferedRecord.getRecordKey(),
+ partitionPath,
+ bufferedRecord.getOrderingValue(),
+ payloadClass);
+ } else {
+ return new HoodieEmptyRecord<>(
+ hoodieKey,
+ HoodieRecord.HoodieRecordType.AVRO);
+ }
+ }
+ if (requiresPayloadRecords) {
+ HoodieRecordPayload payload =
HoodieRecordUtils.loadPayload(payloadClass, (GenericRecord)
bufferedRecord.getRecord(), bufferedRecord.getOrderingValue());
+ return new HoodieAvroRecord<>(hoodieKey, payload);
}
- HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(),
partitionPath);
return new HoodieAvroIndexedRecord(hoodieKey, bufferedRecord.getRecord());
}
+ @Override
+ public IndexedRecord extractDataFromRecord(HoodieRecord record, Schema
schema, Properties properties) {
+ try {
+ return record.toIndexedRecord(schema,
properties).map(HoodieAvroIndexedRecord::getData).orElse(null);
+ } catch (IOException e) {
+ throw new HoodieException("Failed to extract data from record: " +
record, e);
+ }
+ }
+
@Override
public IndexedRecord mergeWithEngineRecord(Schema schema,
Map<Integer, Object> updateValues,
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
index d0a57d113c5c..f376b6c1a7c3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
@@ -96,7 +96,34 @@ public class HoodieAvroReaderContext extends
HoodieReaderContext<IndexedRecord>
Option<InstantRange> instantRangeOpt,
Option<Predicate> filterOpt,
Map<StoragePath, HoodieAvroFileReader> reusableFileReaders) {
- super(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, new
AvroRecordContext(tableConfig));
+ this(storageConfiguration, tableConfig, instantRangeOpt, filterOpt,
reusableFileReaders, tableConfig.getPayloadClass(), false);
+ }
+
+ /**
+ * Constructs an instance of the reader context for writer workflows
+ *
+ * @param storageConfiguration the storage configuration to use for
reading files
+ * @param tableConfig the configuration of the Hudi table being
read
+ * @param payloadClassName the payload class for the writer
+ * @param requiresPayloadRecords indicates whether the caller expects
payloads as the data in any HoodieRecord returned by this context
+ */
+ public HoodieAvroReaderContext(
+ StorageConfiguration<?> storageConfiguration,
+ HoodieTableConfig tableConfig,
+ String payloadClassName,
+ boolean requiresPayloadRecords) {
+ this(storageConfiguration, tableConfig, Option.empty(), Option.empty(),
Collections.emptyMap(), payloadClassName, requiresPayloadRecords);
+ }
+
+ private HoodieAvroReaderContext(
+ StorageConfiguration<?> storageConfiguration,
+ HoodieTableConfig tableConfig,
+ Option<InstantRange> instantRangeOpt,
+ Option<Predicate> filterOpt,
+ Map<StoragePath, HoodieAvroFileReader> reusableFileReaders,
+ String payloadClassName,
+ boolean requiresPayloadRecords) {
+ super(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, new
AvroRecordContext(tableConfig, payloadClassName, requiresPayloadRecords));
this.reusableFileReaders = reusableFileReaders;
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/AvroReaderContextFactory.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/AvroReaderContextFactory.java
index d410e7432910..72a6db27be85 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/AvroReaderContextFactory.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/AvroReaderContextFactory.java
@@ -20,7 +20,6 @@ package org.apache.hudi.common.engine;
import org.apache.hudi.avro.HoodieAvroReaderContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.Option;
import org.apache.avro.generic.IndexedRecord;
@@ -29,13 +28,21 @@ import org.apache.avro.generic.IndexedRecord;
*/
public class AvroReaderContextFactory implements
ReaderContextFactory<IndexedRecord> {
private final HoodieTableMetaClient metaClient;
+ private final String payloadClassName;
+ private final boolean requiresPayloadRecords;
public AvroReaderContextFactory(HoodieTableMetaClient metaClient) {
+ this(metaClient, metaClient.getTableConfig().getPayloadClass(), false);
+ }
+
+ public AvroReaderContextFactory(HoodieTableMetaClient metaClient, String
payloadClassName, boolean requiresPayloadRecords) {
this.metaClient = metaClient;
+ this.payloadClassName = payloadClassName;
+ this.requiresPayloadRecords = requiresPayloadRecords;
}
@Override
public HoodieReaderContext<IndexedRecord> getContext() {
- return new HoodieAvroReaderContext(metaClient.getStorageConf(),
metaClient.getTableConfig(), Option.empty(), Option.empty());
+ return new HoodieAvroReaderContext(metaClient.getStorageConf(),
metaClient.getTableConfig(), payloadClassName, requiresPayloadRecords);
}
}
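
A minimal usage sketch (illustrative only): a write path can now ask the factory for payload-backed records; the payload class name below is an assumption for the example.

    import org.apache.avro.generic.IndexedRecord;
    import org.apache.hudi.common.engine.AvroReaderContextFactory;
    import org.apache.hudi.common.engine.HoodieReaderContext;
    import org.apache.hudi.common.table.HoodieTableMetaClient;

    class AvroReaderContextFactorySketch {
      // requiresPayloadRecords = true makes constructHoodieRecord(...) return
      // HoodieAvroRecords carrying HoodieRecordPayloads.
      static HoodieReaderContext<IndexedRecord> writerContext(HoodieTableMetaClient metaClient) {
        return new AvroReaderContextFactory(
            metaClient,
            "org.apache.hudi.common.model.DefaultHoodieRecordPayload", // assumed payload class
            true).getContext();
      }
    }
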
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index 0255bbf67123..5199c2e93e68 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -30,6 +30,7 @@ import
org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableSortingIterator;
@@ -135,16 +136,26 @@ public abstract class HoodieEngineContext {
public abstract <T> ReaderContextFactory<T>
getReaderContextFactory(HoodieTableMetaClient metaClient);
- public ReaderContextFactory<?>
getReaderContextFactoryDuringWrite(HoodieTableMetaClient metaClient,
HoodieRecord.HoodieRecordType recordType) {
+ /**
+ * Returns the reader context factory for write operations on the table.
+ *
+ * @param metaClient Table meta client
+ * @param recordType Record type
+ * @param properties Typed properties
+ */
+ public ReaderContextFactory<?>
getReaderContextFactoryForWrite(HoodieTableMetaClient metaClient,
HoodieRecord.HoodieRecordType recordType,
+
TypedProperties properties) {
if (recordType == HoodieRecord.HoodieRecordType.AVRO) {
- return new AvroReaderContextFactory(metaClient);
+ String payloadClass = ConfigUtils.getPayloadClass(properties);
+ return new AvroReaderContextFactory(metaClient, payloadClass, true);
}
return getDefaultContextFactory(metaClient);
}
/**
* Returns default reader context factory for the engine.
- * @param metaClient Table metadata client
+ *
+ * @param metaClient Table metadata client
*/
public abstract ReaderContextFactory<?>
getDefaultContextFactory(HoodieTableMetaClient metaClient);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 56e6b763e13a..05699907bbfe 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.serialization.CustomSerializer;
import org.apache.hudi.common.serialization.DefaultSerializer;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -240,12 +241,33 @@ public abstract class HoodieReaderContext<T> {
* @param properties the properties for the reader.
*/
public void initRecordMerger(TypedProperties properties) {
+ initRecordMerger(properties, false);
+ }
+
+ public void initRecordMergerForIngestion(TypedProperties properties) {
+ initRecordMerger(properties, true);
+ }
+
+ /**
+ * Initializes the record merger based on the table configuration and
properties.
+ * @param properties the properties for the reader.
+ * @param isIngestion indicates whether the context is used in the ingestion path.
+ */
+ private void initRecordMerger(TypedProperties properties, boolean
isIngestion) {
+ Option<String> providedPayloadClass =
HoodieRecordPayload.getPayloadClassNameIfPresent(properties);
RecordMergeMode recordMergeMode = tableConfig.getRecordMergeMode();
String mergeStrategyId = tableConfig.getRecordMergeStrategyId();
- if
(!tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+ HoodieTableVersion tableVersion = tableConfig.getTableVersion();
+ // If the provided payload class differs from the table's payload class,
we need to infer the correct merging behavior.
+ if (isIngestion && providedPayloadClass.map(className ->
!className.equals(tableConfig.getPayloadClass())).orElse(false)) {
+ Triple<RecordMergeMode, String, String> triple =
HoodieTableConfig.inferCorrectMergingBehavior(null, providedPayloadClass.get(),
null,
+ tableConfig.getPreCombineFieldsStr().orElse(null), tableVersion);
+ recordMergeMode = triple.getLeft();
+ mergeStrategyId = triple.getRight();
+ } else if (!tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
Triple<RecordMergeMode, String, String> triple =
HoodieTableConfig.inferCorrectMergingBehavior(
recordMergeMode, tableConfig.getPayloadClass(),
- mergeStrategyId, null, tableConfig.getTableVersion());
+ mergeStrategyId, tableConfig.getPreCombineFieldsStr().orElse(null),
tableVersion);
recordMergeMode = triple.getLeft();
mergeStrategyId = triple.getRight();
}
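
A minimal usage sketch (illustrative only): on the ingestion path the payload class supplied through write properties may differ from the table's payload class, and initRecordMergerForIngestion re-infers the merge mode and strategy from it. The property key is the one handled in HoodieRecordPayload; everything else here is assumed.

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.common.engine.HoodieReaderContext;

    class IngestionMergerInitSketch {
      static void init(HoodieReaderContext<?> readerContext, String writerPayloadClass) {
        TypedProperties props = new TypedProperties();
        // Writer-provided payload class, e.g. org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.
        props.setProperty("hoodie.datasource.write.payload.class", writerPayloadClass);
        readerContext.initRecordMergerForIngestion(props);
      }
    }
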
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/RecordContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/RecordContext.java
index 0e274c49c0d7..bcc10e46072f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/RecordContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/RecordContext.java
@@ -42,29 +42,31 @@ import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.function.BiFunction;
import static
org.apache.hudi.common.model.HoodieRecord.HOODIE_IS_DELETED_FIELD;
import static
org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
+/**
+ * Record context provides the APIs for record-related operations. A record context is associated with
+ * a corresponding {@link HoodieReaderContext} and is used for getting field values from a record,
+ * transforming a record, etc.
+ */
public abstract class RecordContext<T> implements Serializable {
private static final long serialVersionUID = 1L;
- private SerializableBiFunction<T, Schema, String> recordKeyExtractor;
+ private final SerializableBiFunction<T, Schema, String> recordKeyExtractor;
// for encoding and decoding schemas to the spillable map
private final LocalAvroSchemaCache localAvroSchemaCache =
LocalAvroSchemaCache.getInstance();
protected JavaTypeConverter typeConverter;
protected String partitionPath;
- public RecordContext(HoodieTableConfig tableConfig) {
+ protected RecordContext(HoodieTableConfig tableConfig) {
this.typeConverter = new DefaultJavaTypeConverter();
- updateRecordKeyExtractor(tableConfig, tableConfig.populateMetaFields());
- }
-
- public void updateRecordKeyExtractor(HoodieTableConfig tableConfig, boolean
shouldUseMetadataFields) {
- this.recordKeyExtractor = shouldUseMetadataFields ? metadataKeyExtractor()
: virtualKeyExtractor(tableConfig.getRecordKeyFields()
+ this.recordKeyExtractor = tableConfig.populateMetaFields() ?
metadataKeyExtractor() : virtualKeyExtractor(tableConfig.getRecordKeyFields()
.orElseThrow(() -> new IllegalArgumentException("No record keys
specified and meta fields are not populated")));
}
@@ -72,6 +74,10 @@ public abstract class RecordContext<T> implements
Serializable {
this.partitionPath = partitionPath;
}
+ public T extractDataFromRecord(HoodieRecord record, Schema schema,
Properties properties) {
+ return (T) record.getData();
+ }
+
/**
* Gets the schema encoded in the buffered record {@code BufferedRecord}.
*
@@ -98,13 +104,25 @@ public abstract class RecordContext<T> implements
Serializable {
return this.localAvroSchemaCache.getSchema((Integer)
versionId).orElse(null);
}
+ /**
+ * Constructs a new {@link HoodieRecord} based on the given buffered record
{@link BufferedRecord} and the provided partition path.
+ * Use this method when the partition path is not consistent for all usages
of the RecordContext instance.
+ *
+ * @param bufferedRecord The {@link BufferedRecord} object with
engine-specific row
+ * @param partitionPath The partition path of the record
+ * @return A new instance of {@link HoodieRecord}.
+ */
+ public abstract HoodieRecord<T> constructHoodieRecord(BufferedRecord<T>
bufferedRecord, String partitionPath);
+
/**
* Constructs a new {@link HoodieRecord} based on the given buffered record
{@link BufferedRecord}.
*
- * @param bufferedRecord The {@link BufferedRecord} object with
engine-specific row
+ * @param bufferedRecord The {@link BufferedRecord} object with
engine-specific row
* @return A new instance of {@link HoodieRecord}.
*/
- public abstract HoodieRecord<T> constructHoodieRecord(BufferedRecord<T>
bufferedRecord);
+ public HoodieRecord<T> constructHoodieRecord(BufferedRecord<T>
bufferedRecord) {
+ return constructHoodieRecord(bufferedRecord, partitionPath);
+ }
/**
* Constructs a new Engine based record based on a given schema, base record
and update values.
@@ -272,6 +290,28 @@ public abstract class RecordContext<T> implements
Serializable {
});
}
+ /**
+ * Gets the ordering value in a particular type.
+ *
+ * @param record The engine-specific record.
+ * @param schema The Avro schema of the record.
+ * @param orderingFieldNames names of the ordering fields
+ * @return The ordering value.
+ */
+ public Comparable getOrderingValue(T record,
+ Schema schema,
+ String[] orderingFieldNames) {
+ if (orderingFieldNames.length == 0) {
+ return OrderingValues.getDefault();
+ }
+
+ return OrderingValues.create(orderingFieldNames, field -> {
+ Object value = getValue(record, schema, field);
+ // API getDefaultOrderingValue is only used inside Comparables
constructor
+ return value != null ? convertValueToEngineType((Comparable) value) :
OrderingValues.getDefault();
+ });
+ }
+
/**
* Extracts the record position value from the record itself.
*
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
index 4bea348c3b1b..676d8f3ac8f8 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
@@ -19,11 +19,14 @@
package org.apache.hudi.common.model;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import java.io.IOException;
import java.io.Serializable;
import java.util.Properties;
@@ -98,4 +101,11 @@ public abstract class BaseAvroPayload implements
Serializable {
public byte[] getRecordBytes() {
return recordBytes;
}
+
+ public Option<IndexedRecord> getIndexedRecord(Schema schema, Properties
properties) throws IOException {
+ if (recordBytes.length == 0) {
+ return Option.empty();
+ }
+ return Option.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
+ }
}
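
A minimal usage sketch (illustrative only): getIndexedRecord only deserializes the stored Avro bytes, so a payload created without data (for example a delete) yields an empty Option. OverwriteWithLatestAvroPayload is used here purely as a concrete BaseAvroPayload subclass; the record and schema are assumed inputs.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.generic.IndexedRecord;
    import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
    import org.apache.hudi.common.util.Option;

    import java.io.IOException;
    import java.util.Properties;

    class GetIndexedRecordSketch {
      static Option<IndexedRecord> deserialize(GenericRecord record, Schema schema) throws IOException {
        OverwriteWithLatestAvroPayload payload = new OverwriteWithLatestAvroPayload(record, 0L);
        // Plain deserialization: no payload-specific insert semantics are applied.
        return payload.getIndexedRecord(schema, new Properties());
      }
    }
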
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index 1f8f1f2fa624..1b6bfa03153d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -244,7 +244,7 @@ public class HoodieAvroRecord<T extends
HoodieRecordPayload> extends HoodieRecor
@Override
public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema,
Properties props) throws IOException {
- Option<IndexedRecord> avroData = getData().getInsertValue(recordSchema,
props);
+ Option<IndexedRecord> avroData = getData().getIndexedRecord(recordSchema,
props);
if (avroData.isPresent()) {
HoodieAvroIndexedRecord record =
new HoodieAvroIndexedRecord(key, avroData.get(), operation,
getData().getMetadata());
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
index e9be41e3fe87..673f1a61a0cf 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
@@ -44,7 +44,7 @@ public class HoodieAvroRecordMerger implements
HoodieRecordMerger, OperationMode
@Override
public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws
IOException {
- return combineAndGetUpdateValue(older, newer, newSchema, props)
+ return combineAndGetUpdateValue(older, newer, oldSchema, newSchema, props)
.map(r -> Pair.of(new HoodieAvroIndexedRecord(r), r.getSchema()));
}
@@ -53,13 +53,13 @@ public class HoodieAvroRecordMerger implements
HoodieRecordMerger, OperationMode
return HoodieRecordType.AVRO;
}
- private Option<IndexedRecord> combineAndGetUpdateValue(HoodieRecord older,
HoodieRecord newer, Schema schema, Properties props) throws IOException {
- Option<IndexedRecord> previousAvroData = older.toIndexedRecord(schema,
props).map(HoodieAvroIndexedRecord::getData);
+ private Option<IndexedRecord> combineAndGetUpdateValue(HoodieRecord older,
HoodieRecord newer, Schema oldSchema, Schema newSchema, Properties props)
throws IOException {
+ Option<IndexedRecord> previousAvroData = older.toIndexedRecord(oldSchema,
props).map(HoodieAvroIndexedRecord::getData);
if (!previousAvroData.isPresent()) {
return Option.empty();
}
- return ((HoodieAvroRecord)
newer).getData().combineAndGetUpdateValue(previousAvroData.get(), schema,
props);
+ return ((HoodieAvroRecord)
newer).getData().combineAndGetUpdateValue(previousAvroData.get(), newSchema,
props);
}
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
index 3cc8bcf3da08..4cc2a493df17 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
@@ -112,6 +112,19 @@ public interface HoodieRecordPayload<T extends
HoodieRecordPayload> extends Seri
@PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED)
Option<IndexedRecord> getInsertValue(Schema schema) throws IOException;
+ /**
+ * Deserializes the HoodieRecordPayload into an {@link IndexedRecord}.
+ * Unlike {@link #getInsertValue(Schema, Properties)}, this method is meant
to solely perform deserialization.
+ *
+ * @param schema Schema to use for reading the record
+ * @param properties Properties for the current context
+ * @return the {@link IndexedRecord} if one is available, otherwise returns
an empty Option.
+ * @throws IOException thrown if there is an error during deserialization
+ */
+ default Option<IndexedRecord> getIndexedRecord(Schema schema, Properties
properties) throws IOException {
+ return getInsertValue(schema, properties);
+ }
+
/**
* Generates an avro record out of the given HoodieRecordPayload, to be
written out to storage. Called when writing a new value for the given
* HoodieKey, wherein there is no existing record in storage to be combined
against. (i.e insert) Return EMPTY to skip writing this record.
@@ -169,16 +182,18 @@ public interface HoodieRecordPayload<T extends
HoodieRecordPayload> extends Seri
}
static String getPayloadClassName(Properties props) {
- String payloadClassName;
+ return
getPayloadClassNameIfPresent(props).orElse(HoodieTableConfig.DEFAULT_PAYLOAD_CLASS_NAME);
+ }
+
+ static Option<String> getPayloadClassNameIfPresent(Properties props) {
+ String payloadClassName = null;
if (props.containsKey(PAYLOAD_CLASS_NAME.key())) {
payloadClassName = props.getProperty(PAYLOAD_CLASS_NAME.key());
} else if (props.containsKey("hoodie.datasource.write.payload.class")) {
payloadClassName =
props.getProperty("hoodie.datasource.write.payload.class");
- } else {
- return HoodieTableConfig.DEFAULT_PAYLOAD_CLASS_NAME;
}
// There could be tables written with payload class from com.uber.hoodie.
// Need to transparently change to org.apache.hudi.
- return payloadClassName.replace("com.uber.hoodie", "org.apache.hudi");
+ return Option.ofNullable(payloadClassName).map(className ->
className.replace("com.uber.hoodie", "org.apache.hudi"));
}
}
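
A minimal usage sketch (illustrative only) of the new Option-returning lookup; the property key and the legacy package rewrite are the ones handled above.

    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.common.util.Option;

    import java.util.Properties;

    class PayloadClassLookupSketch {
      static void demo() {
        Properties props = new Properties();
        // No payload class configured: empty Option instead of the default payload class.
        Option<String> none = HoodieRecordPayload.getPayloadClassNameIfPresent(props);

        props.setProperty("hoodie.datasource.write.payload.class",
            "com.uber.hoodie.OverwriteWithLatestAvroPayload");
        // Legacy com.uber.hoodie names are still rewritten to org.apache.hudi.
        Option<String> rewritten = HoodieRecordPayload.getPayloadClassNameIfPresent(props);
      }
    }
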
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
index 5f205431b24f..d8375e8df67d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
@@ -54,9 +54,10 @@ public class BufferedRecord<T> implements Serializable {
this.isDelete = isDelete;
}
- public static <T> BufferedRecord<T> forRecordWithContext(HoodieRecord<T>
record, Schema schema, RecordContext<T> recordContext, Properties props,
String[] orderingFields) {
+ public static <T> BufferedRecord<T> forRecordWithContext(HoodieRecord
record, Schema schema, RecordContext<T> recordContext, Properties props,
String[] orderingFields) {
HoodieKey hoodieKey = record.getKey();
- String recordKey = hoodieKey == null ?
recordContext.getRecordKey(record.getData(), schema) : hoodieKey.getRecordKey();
+ T data = recordContext.extractDataFromRecord(record, schema, props);
+ String recordKey = hoodieKey == null ? recordContext.getRecordKey(data,
schema) : hoodieKey.getRecordKey();
Integer schemaId = recordContext.encodeAvroSchema(schema);
boolean isDelete;
try {
@@ -64,7 +65,7 @@ public class BufferedRecord<T> implements Serializable {
} catch (IOException e) {
throw new HoodieException("Failed to get isDelete from record.", e);
}
- return new BufferedRecord<>(recordKey, record.getOrderingValue(schema,
props, orderingFields), record.getData(), schemaId, isDelete);
+ return new BufferedRecord<>(recordKey, record.getOrderingValue(schema,
props, orderingFields), data, schemaId, isDelete);
}
public static <T> BufferedRecord<T> forRecordWithContext(T record, Schema
schema, RecordContext<T> recordContext, List<String> orderingFieldNames,
boolean isDelete) {
@@ -74,6 +75,12 @@ public class BufferedRecord<T> implements Serializable {
return new BufferedRecord<>(recordKey, orderingValue, record, schemaId,
isDelete);
}
+ public static <T> BufferedRecord<T> forRecordWithContext(T record, Schema
schema, RecordContext<T> recordContext, String[] orderingFieldNames, String
recordKey, boolean isDelete) {
+ Integer schemaId = recordContext.encodeAvroSchema(schema);
+ Comparable orderingValue = recordContext.getOrderingValue(record, schema,
orderingFieldNames);
+ return new BufferedRecord<>(recordKey, orderingValue, record, schemaId,
isDelete);
+ }
+
public static <T> BufferedRecord<T> forDeleteRecord(DeleteRecord
deleteRecord, Comparable orderingValue) {
return new BufferedRecord<>(deleteRecord.getRecordKey(), orderingValue,
null, null, true);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
index 46feee0e67e6..0c949806ba16 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
@@ -57,6 +57,19 @@ public class BufferedRecordMergerFactory {
Schema readerSchema,
TypedProperties props,
PartialUpdateMode
partialUpdateMode) {
+ return create(readerContext, recordMergeMode, enablePartialMerging,
recordMerger,
+ orderingFieldNames, readerSchema, payloadClass.map(p -> Pair.of(p,
p)), props, partialUpdateMode);
+ }
+
+ public static <T> BufferedRecordMerger<T> create(HoodieReaderContext<T>
readerContext,
+ RecordMergeMode
recordMergeMode,
+ boolean
enablePartialMerging,
+ Option<HoodieRecordMerger>
recordMerger,
+ List<String>
orderingFieldNames,
+ Schema readerSchema,
+ Option<Pair<String,
String>> payloadClasses,
+ TypedProperties props,
+ PartialUpdateMode
partialUpdateMode) {
/**
* This part implements KEEP_VALUES partial update mode, which merges two
records that do not have all columns.
* Other Partial update modes, like IGNORE_DEFAULTS assume all columns
exists in the record,
@@ -64,7 +77,7 @@ public class BufferedRecordMergerFactory {
*/
if (enablePartialMerging) {
BufferedRecordMerger<T> deleteRecordMerger = create(
- readerContext, recordMergeMode, false, recordMerger,
orderingFieldNames, payloadClass, readerSchema, props, partialUpdateMode);
+ readerContext, recordMergeMode, false, recordMerger,
orderingFieldNames, readerSchema, payloadClasses, props, partialUpdateMode);
return new
PartialUpdateBufferedRecordMerger<>(readerContext.getRecordContext(),
recordMerger, deleteRecordMerger, orderingFieldNames, readerSchema, props);
}
@@ -80,9 +93,13 @@ public class BufferedRecordMergerFactory {
}
return new
EventTimePartiaRecordMerger<>(readerContext.getRecordContext(),
partialUpdateMode, props);
default:
- if (payloadClass.isPresent()) {
- return new CustomPayloadRecordMerger<>(
- readerContext.getRecordContext(), recordMerger,
orderingFieldNames, payloadClass.get(), readerSchema, props);
+ if (payloadClasses.isPresent()) {
+ if
(payloadClasses.get().getRight().equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload"))
{
+ return new
ExpressionPayloadRecordMerger<>(readerContext.getRecordContext(), recordMerger,
orderingFieldNames,
+ payloadClasses.get().getLeft(),
payloadClasses.get().getRight(), readerSchema, props);
+ } else {
+ return new
CustomPayloadRecordMerger<>(readerContext.getRecordContext(), recordMerger,
orderingFieldNames, payloadClasses.get().getLeft(), readerSchema, props);
+ }
} else {
return new CustomRecordMerger<>(readerContext.getRecordContext(),
recordMerger, orderingFieldNames, readerSchema, props);
}
@@ -389,13 +406,35 @@ public class BufferedRecordMergerFactory {
}
}
+ /**
+ * An implementation of {@link BufferedRecordMerger} which merges {@link BufferedRecord}s based on the ExpressionPayload.
+ * The delta merge expects both incoming records to be Expression Payloads, whereas the final merge expects the existing
+ * record payload to match the table's configured payload and the new record to be an Expression Payload.
+ */
+ private static class ExpressionPayloadRecordMerger<T> extends
CustomPayloadRecordMerger<T> {
+ private final String basePayloadClass;
+
+ public ExpressionPayloadRecordMerger(RecordContext<T> recordContext,
Option<HoodieRecordMerger> recordMerger, List<String> orderingFieldNames,
String basePayloadClass, String incomingPayloadClass,
+ Schema readerSchema, TypedProperties
props) {
+ super(recordContext, recordMerger, orderingFieldNames,
incomingPayloadClass, readerSchema, props);
+ this.basePayloadClass = basePayloadClass;
+ }
+
+ @Override
+ protected Pair<HoodieRecord, HoodieRecord>
getFinalMergeRecords(BufferedRecord<T> olderRecord, BufferedRecord<T>
newerRecord) {
+ HoodieRecord oldHoodieRecord = constructHoodieAvroRecord(recordContext,
olderRecord, basePayloadClass);
+ HoodieRecord newHoodieRecord = constructHoodieAvroRecord(recordContext,
newerRecord, payloadClass);
+ return Pair.of(oldHoodieRecord, newHoodieRecord);
+ }
+ }
+
/**
* An implementation of {@link BufferedRecordMerger} which merges {@link
BufferedRecord}s
* based on {@code CUSTOM} merge mode and a given record payload class.
*/
private static class CustomPayloadRecordMerger<T> extends
BaseCustomMerger<T> {
- private final List<String> orderingFieldNames;
- private final String payloadClass;
+ private final String[] orderingFieldNames;
+ protected final String payloadClass;
public CustomPayloadRecordMerger(
RecordContext<T> recordContext,
@@ -405,53 +444,73 @@ public class BufferedRecordMergerFactory {
Schema readerSchema,
TypedProperties props) {
super(recordContext, recordMerger, readerSchema, props);
- this.orderingFieldNames = orderingFieldNames;
+ this.orderingFieldNames = orderingFieldNames.toArray(new String[0]);
this.payloadClass = payloadClass;
}
@Override
public Option<BufferedRecord<T>>
deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T>
existingRecord) throws IOException {
- Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt =
getMergedRecord(existingRecord, newRecord);
- if (combinedRecordAndSchemaOpt.isPresent()) {
- T combinedRecordData = recordContext.convertAvroRecord((IndexedRecord)
combinedRecordAndSchemaOpt.get().getLeft().getData());
- // If pre-combine does not return existing record, update it
- if (combinedRecordData != existingRecord.getRecord()) {
- Pair<HoodieRecord, Schema> combinedRecordAndSchema =
combinedRecordAndSchemaOpt.get();
- return
Option.of(BufferedRecord.forRecordWithContext(combinedRecordData,
combinedRecordAndSchema.getRight(), recordContext, orderingFieldNames, false));
- }
+ Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema =
getMergedRecord(existingRecord, newRecord, false);
+ if (mergedRecordAndSchema.isEmpty()) {
+ // An empty Option indicates that the output represents a delete.
+ return Option.of(new BufferedRecord<>(newRecord.getRecordKey(),
OrderingValues.getDefault(), null, null, true));
+ }
+ HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
+ Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+ // Special handling for SENTINEL record in Expression Payload. This is
returned if the condition does not match.
+ if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
return Option.empty();
}
- // An empty Option indicates that the output represents a delete.
- return Option.of(new BufferedRecord<>(newRecord.getRecordKey(),
OrderingValues.getDefault(), null, null, true));
+ T combinedRecordData =
recordContext.convertAvroRecord(mergedRecord.toIndexedRecord(mergeResultSchema,
props).get().getData());
+ // If pre-combine does not return existing record, update it
+ if (combinedRecordData != existingRecord.getRecord()) {
+ // For pkless we need to use record key from existing record
+ return
Option.of(BufferedRecord.forRecordWithContext(combinedRecordData,
mergeResultSchema, recordContext, orderingFieldNames,
+ existingRecord.getRecordKey(),
mergedRecord.isDelete(mergeResultSchema, props)));
+ }
+ return Option.empty();
}
@Override
public MergeResult<T> mergeNonDeleteRecord(BufferedRecord<T> olderRecord,
BufferedRecord<T> newerRecord) throws IOException {
- Option<Pair<HoodieRecord, Schema>> mergedRecord =
- getMergedRecord(olderRecord, newerRecord);
- if (mergedRecord.isPresent()
- &&
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
+ Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema =
getMergedRecord(olderRecord, newerRecord, true);
+ if (mergedRecordAndSchema.isEmpty()) {
+ return new MergeResult<>(true, null);
+ }
+ HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
+ Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+ // Special handling for SENTINEL record in Expression Payload
+ if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
+ return new MergeResult<>(false, null);
+ }
+ if (!mergedRecord.isDelete(mergeResultSchema, props)) {
IndexedRecord indexedRecord;
- if (!mergedRecord.get().getRight().equals(readerSchema)) {
- indexedRecord = (IndexedRecord)
mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(),
null, readerSchema).getData();
+ if (!mergeResultSchema.equals(readerSchema)) {
+ indexedRecord = (IndexedRecord)
mergedRecord.rewriteRecordWithNewSchema(mergeResultSchema, null,
readerSchema).getData();
} else {
- indexedRecord = (IndexedRecord)
mergedRecord.get().getLeft().getData();
+ indexedRecord = (IndexedRecord) mergedRecord.getData();
}
return new MergeResult<>(false,
recordContext.convertAvroRecord(indexedRecord));
}
return new MergeResult<>(true, null);
}
- private Option<Pair<HoodieRecord, Schema>>
getMergedRecord(BufferedRecord<T> olderRecord, BufferedRecord<T> newerRecord)
throws IOException {
- HoodieRecord oldHoodieRecord = constructHoodieAvroRecord(recordContext,
olderRecord);
- HoodieRecord newHoodieRecord = constructHoodieAvroRecord(recordContext,
newerRecord);
- Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.merge(
- oldHoodieRecord, getSchemaForAvroPayloadMerge(oldHoodieRecord,
olderRecord),
- newHoodieRecord, getSchemaForAvroPayloadMerge(newHoodieRecord,
newerRecord), props);
- return mergedRecord;
+ protected Pair<HoodieRecord, HoodieRecord>
getDeltaMergeRecords(BufferedRecord<T> olderRecord, BufferedRecord<T>
newerRecord) {
+ HoodieRecord oldHoodieRecord = constructHoodieAvroRecord(recordContext,
olderRecord, payloadClass);
+ HoodieRecord newHoodieRecord = constructHoodieAvroRecord(recordContext,
newerRecord, payloadClass);
+ return Pair.of(oldHoodieRecord, newHoodieRecord);
+ }
+
+ protected Pair<HoodieRecord, HoodieRecord>
getFinalMergeRecords(BufferedRecord<T> olderRecord, BufferedRecord<T>
newerRecord) {
+ return getDeltaMergeRecords(olderRecord, newerRecord);
+ }
+
+ private Option<Pair<HoodieRecord, Schema>>
getMergedRecord(BufferedRecord<T> olderRecord, BufferedRecord<T> newerRecord,
boolean isFinalMerge) throws IOException {
+ Pair<HoodieRecord, HoodieRecord> records = isFinalMerge ?
getFinalMergeRecords(olderRecord, newerRecord) :
getDeltaMergeRecords(olderRecord, newerRecord);
+ return recordMerger.merge(records.getLeft(),
getSchemaForAvroPayloadMerge(olderRecord), records.getRight(),
getSchemaForAvroPayloadMerge(newerRecord), props);
}
- private HoodieRecord constructHoodieAvroRecord(RecordContext<T>
recordContext, BufferedRecord<T> bufferedRecord) {
+ protected HoodieRecord constructHoodieAvroRecord(RecordContext<T>
recordContext, BufferedRecord<T> bufferedRecord, String payloadClass) {
GenericRecord record = null;
if (!bufferedRecord.isDelete()) {
Schema recordSchema =
recordContext.getSchemaFromBufferRecord(bufferedRecord);
@@ -462,8 +521,8 @@ public class BufferedRecordMergerFactory {
HoodieRecordUtils.loadPayload(payloadClass, record,
bufferedRecord.getOrderingValue()), null);
}
- private Schema getSchemaForAvroPayloadMerge(HoodieRecord record,
BufferedRecord<T> bufferedRecord) throws IOException {
- if (record.isDelete(readerSchema, props)) {
+ private Schema getSchemaForAvroPayloadMerge(BufferedRecord<T>
bufferedRecord) {
+ if (bufferedRecord.getSchemaId() == null) {
return readerSchema;
}
return recordContext.getSchemaFromBufferRecord(bufferedRecord);
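
A minimal usage sketch (illustrative only): the new create overload takes a pair of payload classes, with the table's payload class on the left for the existing record and the incoming payload class on the right, so a MERGE INTO write can route through the ExpressionPayloadRecordMerger. The merge mode, ordering field and variable names below are assumptions for the example.

    import org.apache.avro.Schema;
    import org.apache.hudi.common.config.RecordMergeMode;
    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.common.engine.HoodieReaderContext;
    import org.apache.hudi.common.model.HoodieRecordMerger;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.read.BufferedRecordMerger;
    import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.common.util.collection.Pair;

    import java.util.Collections;

    class ExpressionPayloadMergerSketch {
      static <T> BufferedRecordMerger<T> build(HoodieReaderContext<T> readerContext,
                                               HoodieTableMetaClient metaClient,
                                               Option<HoodieRecordMerger> recordMerger,
                                               Schema readerSchema,
                                               TypedProperties props) {
        Option<Pair<String, String>> payloadClasses = Option.of(Pair.of(
            metaClient.getTableConfig().getPayloadClass(),                   // existing record payload
            "org.apache.spark.sql.hudi.command.payload.ExpressionPayload")); // incoming record payload
        return BufferedRecordMergerFactory.create(
            readerContext,
            RecordMergeMode.CUSTOM,
            false,                                  // no partial merging
            recordMerger,
            Collections.singletonList("ts"),        // assumed ordering field
            readerSchema,
            payloadClasses,
            props,
            metaClient.getTableConfig().getPartialUpdateMode());
      }
    }
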
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 2a57d8b87376..c756c6fc6ba2 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -20,7 +20,6 @@
package org.apache.hudi.common.table.read;
import org.apache.hudi.common.config.HoodieReaderConfig;
-import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.BaseFile;
@@ -35,6 +34,7 @@ import
org.apache.hudi.common.table.read.buffer.FileGroupRecordBufferLoader;
import org.apache.hudi.common.table.read.buffer.HoodieFileGroupRecordBuffer;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Either;
+import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -52,7 +52,6 @@ import org.apache.avro.Schema;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -119,9 +118,7 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
? new ParquetRowIndexBasedSchemaHandler<>(readerContext, dataSchema,
requestedSchema, internalSchemaOpt, tableConfig, props)
: new FileGroupReaderSchemaHandler<>(readerContext, dataSchema,
requestedSchema, internalSchemaOpt, tableConfig, props));
this.outputConverter =
readerContext.getSchemaHandler().getOutputConverter();
- this.orderingFieldNames = readerContext.getMergeMode() ==
RecordMergeMode.COMMIT_TIME_ORDERING
- ? Collections.emptyList()
- :
Option.ofNullable(ConfigUtils.getOrderingFields(props)).map(Arrays::asList).orElse(hoodieTableMetaClient.getTableConfig().getPreCombineFields());
+ this.orderingFieldNames =
HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(), props,
hoodieTableMetaClient);
this.readStats = new HoodieReadStats();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
index 6d3c3b9eadca..dd0eccb32e30 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
@@ -18,6 +18,8 @@
package org.apache.hudi.common.util;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
@@ -25,6 +27,7 @@ import
org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.OperationModeAwareness;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -32,6 +35,8 @@ import org.apache.avro.generic.GenericRecord;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -115,4 +120,12 @@ public class HoodieRecordUtils {
}
return null;
}
+
+ public static List<String> getOrderingFieldNames(RecordMergeMode mergeMode,
+ TypedProperties props,
+ HoodieTableMetaClient
metaClient) {
+ return mergeMode == RecordMergeMode.COMMIT_TIME_ORDERING
+ ? Collections.emptyList()
+ :
Option.ofNullable(ConfigUtils.getOrderingFields(props)).map(Arrays::asList).orElseGet(()
-> metaClient.getTableConfig().getPreCombineFields());
+ }
}
\ No newline at end of file
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
index e9cae5045a7a..37837bd070d2 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
@@ -337,7 +337,7 @@ public abstract class SchemaHandlerTestBase {
}
@Override
- public HoodieRecord<String>
constructHoodieRecord(BufferedRecord<String> bufferedRecord) {
+ public HoodieRecord<String>
constructHoodieRecord(BufferedRecord<String> bufferedRecord, String
partitionPath) {
return null;
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
index a7ba45b29c37..b5778e57062b 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
@@ -53,6 +53,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -190,6 +191,11 @@ public class BaseTestFileGroupRecordBuffer {
return Option.of(payloadRecord);
}
+ @Override
+ public Option<IndexedRecord> getIndexedRecord(Schema schema, Properties
properties) {
+ return Option.of(payloadRecord);
+ }
+
@Override
public Comparable<?> getOrderingValue() {
return null;
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBuffer.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBuffer.java
index 416a539f1201..1a772005321c 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBuffer.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBuffer.java
@@ -63,6 +63,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -195,7 +196,7 @@ class TestFileGroupRecordBuffer {
record.put("ts", System.currentTimeMillis());
record.put("op", "d");
record.put("_hoodie_is_deleted", false);
- when(recordContext.getOrderingValue(any(), any(), any())).thenReturn(1);
+ when(recordContext.getOrderingValue(any(), any(),
anyList())).thenReturn(1);
when(recordContext.convertOrderingValueToEngineType(any())).thenReturn(1);
BufferedRecord<GenericRecord> bufferedRecord =
BufferedRecord.forRecordWithContext(record, schema,
readerContext.getRecordContext(), Collections.singletonList("ts"), true);
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestReusableKeyBasedRecordBuffer.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestReusableKeyBasedRecordBuffer.java
index 8ece244bda31..d35d66ffa423 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestReusableKeyBasedRecordBuffer.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestReusableKeyBasedRecordBuffer.java
@@ -47,6 +47,7 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -76,7 +77,7 @@ class TestReusableKeyBasedRecordBuffer {
return new TestRecord(recordKey, 0);
});
when(mockReaderContext.getRecordContext().getRecordKey(any(),
any())).thenAnswer(invocation -> ((TestRecord)
invocation.getArgument(0)).getRecordKey());
- when(mockReaderContext.getRecordContext().getOrderingValue(any(), any(),
any())).thenAnswer(invocation -> {
+ when(mockReaderContext.getRecordContext().getOrderingValue(any(), any(),
anyList())).thenAnswer(invocation -> {
TestRecord record = invocation.getArgument(0);
if (record.getRecordKey().equals("1")) {
return 20; // simulate newer record in base file
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
index 0ef3da31cc1e..5ec30d04da7e 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
@@ -56,6 +56,7 @@ import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY
import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -194,7 +195,7 @@ class TestSortedKeyBasedFileGroupRecordBuffer extends
BaseTestFileGroupRecordBuf
return new TestRecord(recordKey, 0);
});
when(mockReaderContext.getRecordContext().getRecordKey(any(),
any())).thenAnswer(invocation -> ((TestRecord)
invocation.getArgument(0)).getRecordKey());
- when(mockReaderContext.getRecordContext().getOrderingValue(any(), any(),
any())).thenReturn(0);
+ when(mockReaderContext.getRecordContext().getOrderingValue(any(), any(),
anyList())).thenReturn(0);
when(mockReaderContext.toBinaryRow(any(), any())).thenAnswer(invocation ->
invocation.getArgument(1));
when(mockReaderContext.seal(any())).thenAnswer(invocation ->
invocation.getArgument(0));
HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index bb671a8f5ef4..18f84084b034 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.OrderingValues;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.HoodieInstantWriter;
@@ -85,7 +86,6 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
-import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.COMMIT_METADATA_SER_DE;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
@@ -402,7 +402,7 @@ public class HoodieTestDataGenerator implements
AutoCloseable {
private RawTripTestPayload generateRandomDeleteValue(HoodieKey key, String
instantTime) throws IOException {
GenericRecord rec = generateGenericRecord(key.getRecordKey(),
key.getPartitionPath(), "rider-" + instantTime, "driver-" + instantTime, 0,
true, false);
- return new RawTripTestPayload(Option.of(rec.toString()),
key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA, true, 0L);
+ return new RawTripTestPayload(Option.of(rec.toString()),
key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA, true, 0);
}
/**
@@ -914,7 +914,7 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
public HoodieRecord generateDeleteRecord(HoodieKey key) throws IOException {
RawTripTestPayload payload =
- new RawTripTestPayload(Option.empty(), key.getRecordKey(),
key.getPartitionPath(), null, true, DEFAULT_ORDERING_VALUE);
+ new RawTripTestPayload(Option.empty(), key.getRecordKey(),
key.getPartitionPath(), null, true, OrderingValues.getDefault());
return new HoodieAvroRecord(key, payload);
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
index 17b88c813683..9d081015cf1e 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java
@@ -88,7 +88,7 @@ public class RawTripTestPayload implements
HoodieRecordPayload<RawTripTestPayloa
}
public RawTripTestPayload(String jsonData, String rowKey, String
partitionPath, String schemaStr) throws IOException {
- this(Option.of(jsonData), rowKey, partitionPath, schemaStr, false, 0L);
+ this(Option.of(jsonData), rowKey, partitionPath, schemaStr, false, 0);
}
public RawTripTestPayload(String jsonData) throws IOException {
@@ -98,7 +98,7 @@ public class RawTripTestPayload implements
HoodieRecordPayload<RawTripTestPayloa
this.rowKey = jsonRecordMap.get("_row_key").toString();
this.partitionPath =
extractPartitionFromTimeField(jsonRecordMap.get("time").toString());
this.isDeleted = false;
- this.orderingVal = Integer.valueOf(jsonRecordMap.getOrDefault("number",
0L).toString());
+ this.orderingVal = Integer.valueOf(jsonRecordMap.getOrDefault("number",
0).toString());
}
public RawTripTestPayload(GenericRecord record, Comparable orderingVal) {
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java
index 466486158076..96a299c9e6ab 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java
@@ -18,16 +18,25 @@
package org.apache.hudi.common.util;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.exception.HoodieException;
import org.junit.jupiter.api.Test;
+import java.util.Collections;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
class TestHoodieRecordUtils {
@@ -52,4 +61,22 @@ class TestHoodieRecordUtils {
HoodieRecordPayload payload =
HoodieRecordUtils.loadPayload(payloadClassName, null, 0);
assertEquals(payload.getClass().getName(), payloadClassName);
}
+
+ @Test
+ void testGetOrderingFields() {
+ HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+ TypedProperties props = new TypedProperties();
+ // Assert empty ordering fields for commit time ordering
+
assertTrue(HoodieRecordUtils.getOrderingFieldNames(RecordMergeMode.COMMIT_TIME_ORDERING,
props, metaClient).isEmpty());
+
+ // Assert table config precombine fields are returned when props are not
set with event time merge mode
+ HoodieTableConfig tableConfig = new HoodieTableConfig();
+ tableConfig.setValue(HoodieTableConfig.PRECOMBINE_FIELDS, "tbl");
+ when(metaClient.getTableConfig()).thenReturn(tableConfig);
+ assertEquals(Collections.singletonList("tbl"),
HoodieRecordUtils.getOrderingFieldNames(RecordMergeMode.EVENT_TIME_ORDERING,
props, metaClient));
+
+ // Assert props value is returned for precombine field configuration when
it is set with event time merge mode
+ props.setProperty("hoodie.datasource.write.precombine.field", "props");
+ assertEquals(Collections.singletonList("props"),
HoodieRecordUtils.getOrderingFieldNames(RecordMergeMode.EVENT_TIME_ORDERING,
props, metaClient));
+ }
}
\ No newline at end of file
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index d989e2ec6338..8058eb19604e 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -20,11 +20,13 @@ package org.apache.hudi.sink;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.HoodieRecordUtils;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.MappingIterator;
@@ -46,6 +48,7 @@ import org.apache.hudi.table.action.commit.FlinkWriteHelper;
import org.apache.hudi.util.MutableIteratorWrapperIterator;
import org.apache.hudi.util.StreamerUtil;
+import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -70,6 +73,8 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import static
org.apache.hudi.common.util.HoodieRecordUtils.getOrderingFieldNames;
+
/**
* Sink function to write the data to the underneath filesystem.
*
@@ -117,7 +122,9 @@ public class StreamWriteFunction extends
AbstractStreamWriteFunction<HoodieFlink
protected transient WriteFunction writeFunction;
- private transient HoodieRecordMerger recordMerger;
+ private transient BufferedRecordMerger<RowData> recordMerger;
+ private transient HoodieReaderContext<RowData> readerContext;
+ private transient List<String> orderingFieldNames;
protected final RowType rowType;
@@ -226,7 +233,18 @@ public class StreamWriteFunction extends
AbstractStreamWriteFunction<HoodieFlink
}
private void initMergeClass() {
- recordMerger =
HoodieRecordUtils.mergerToPreCombineMode(writeClient.getConfig().getRecordMerger());
+ readerContext =
writeClient.getEngineContext().<RowData>getReaderContextFactory(metaClient).getContext();
+ orderingFieldNames = getOrderingFieldNames(readerContext.getMergeMode(),
writeClient.getConfig().getProps(), metaClient);
+ recordMerger = BufferedRecordMergerFactory.create(
+ readerContext,
+ writeClient.getConfig().getRecordMergeMode(),
+ false,
+ Option.ofNullable(writeClient.getConfig().getRecordMerger()),
+ orderingFieldNames,
+ Option.ofNullable(writeClient.getConfig().getPayloadClass()),
+ new Schema.Parser().parse(writeClient.getConfig().getSchema()),
+ writeClient.getConfig().getProps(),
+ metaClient.getTableConfig().getPartialUpdateMode());
LOG.info("init hoodie merge with class [{}]",
recordMerger.getClass().getName());
}
@@ -416,7 +434,8 @@ public class StreamWriteFunction extends
AbstractStreamWriteFunction<HoodieFlink
Iterator<HoodieRecord> recordItr = new MappingIterator<>(
rowItr, rowData -> recordConverter.convert(rowData,
rowDataBucket.getBucketInfo()));
- List<WriteStatus> statuses =
writeFunction.write(deduplicateRecordsIfNeeded(recordItr),
rowDataBucket.getBucketInfo(), instant);
+ List<WriteStatus> statuses = writeFunction.write(
+ deduplicateRecordsIfNeeded(recordItr), rowDataBucket.getBucketInfo(),
instant);
writeMetrics.endFileFlush();
writeMetrics.increaseNumOfFilesWritten();
return statuses;
@@ -425,7 +444,9 @@ public class StreamWriteFunction extends
AbstractStreamWriteFunction<HoodieFlink
protected Iterator<HoodieRecord>
deduplicateRecordsIfNeeded(Iterator<HoodieRecord> records) {
if (config.get(FlinkOptions.PRE_COMBINE)) {
return FlinkWriteHelper.newInstance().deduplicateRecords(
- records, null, -1, this.writeClient.getConfig().getSchema(),
this.writeClient.getConfig().getProps(), recordMerger);
+ records, null, -1, this.writeClient.getConfig().getSchema(),
+ this.writeClient.getConfig().getProps(),
+ recordMerger, readerContext, orderingFieldNames.toArray(new
String[0]));
} else {
return records;
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
index c293d8b62b26..3ea434d2d3aa 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
@@ -114,7 +114,7 @@ public class FlinkRecordContext extends
RecordContext<RowData> {
}
@Override
- public HoodieRecord<RowData> constructHoodieRecord(BufferedRecord<RowData>
bufferedRecord) {
+ public HoodieRecord<RowData> constructHoodieRecord(BufferedRecord<RowData>
bufferedRecord, String partitionPath) {
HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(),
partitionPath);
// delete record
if (bufferedRecord.isDelete()) {
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java
index 54488c136044..b31e62abef5f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java
@@ -71,7 +71,7 @@ public class HiveRecordContext extends
RecordContext<ArrayWritable> {
}
@Override
- public HoodieRecord<ArrayWritable>
constructHoodieRecord(BufferedRecord<ArrayWritable> bufferedRecord) {
+ public HoodieRecord<ArrayWritable>
constructHoodieRecord(BufferedRecord<ArrayWritable> bufferedRecord, String
partitionPath) {
HoodieKey key = new HoodieKey(bufferedRecord.getRecordKey(),
partitionPath);
if (bufferedRecord.isDelete()) {
return new HoodieEmptyRecord<>(
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
index 4b5865afff13..0b86ac6e3bf9 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
@@ -32,6 +32,7 @@ import org.apache.hudi.common.table.log.InstantRange.RangeType
import org.apache.hudi.common.table.read.HoodieFileGroupReader
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.common.util.collection.ClosableIterator
+import org.apache.hudi.expression.{Predicate => HPredicate}
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
import org.apache.hudi.metadata.HoodieTableMetadataUtil
@@ -166,7 +167,7 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
if (metaClient.isMetadataTable) {
val requestedSchema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
val instantRange = InstantRange.builder().rangeType(RangeType.EXACT_MATCH).explicitInstants(validInstants.value).build()
- val readerContext = new HoodieAvroReaderContext(storageConf, metaClient.getTableConfig, HOption.of(instantRange), HOption.empty())
+ val readerContext = new HoodieAvroReaderContext(storageConf, metaClient.getTableConfig, HOption.of(instantRange), HOption.empty().asInstanceOf[HOption[HPredicate]])
val fileGroupReader: HoodieFileGroupReader[IndexedRecord] = HoodieFileGroupReader.newBuilder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(metaClient)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index 3f61827f13f3..94f17bdc5f1d 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -270,6 +270,17 @@ class ExpressionPayload(@transient record: GenericRecord,
isDeletedRecord || isDeleteOnCondition
}
+ override def getIndexedRecord(schema: Schema, properties: Properties): HOption[IndexedRecord] = {
+ val recordSchema = getRecordSchema(properties)
+ val indexedRecord = bytesToAvro(recordBytes, recordSchema)
+
+ if (super.isDeleteRecord(indexedRecord)) {
+ HOption.empty[IndexedRecord]()
+ } else {
+ HOption.of(indexedRecord)
+ }
+ }
+
override def getInsertValue(schema: Schema, properties: Properties): HOption[IndexedRecord] = {
val recordSchema = getRecordSchema(properties)
val incomingRecord = ConvertibleRecord(bytesToAvro(recordBytes, recordSchema))
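
A hedged sketch of how the new ExpressionPayload hook is meant to be consumed on the merge path; payload, schema, props and the merge(...) call are illustrative placeholders, not code from this patch:

    // getIndexedRecord deserializes the payload bytes with the record schema and
    // returns an empty Option for delete records, mirroring the Scala override above.
    Option<IndexedRecord> indexed = payload.getIndexedRecord(schema, props);
    if (indexed.isPresent()) {
      merge(indexed.get());   // hypothetical consumer of the live record
    } else {
      // deletes surface as Option.empty and are skipped by the merger
    }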
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
index 271f8682d297..0cf6fc1c58e3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
@@ -815,7 +815,7 @@ class TestBufferedRecordMerger extends SparkClientFunctionalTestHarness {
}
@Override
- public HoodieRecord<InternalRow> constructHoodieRecord(BufferedRecord<InternalRow> bufferedRecord) {
+ public HoodieRecord<InternalRow> constructHoodieRecord(BufferedRecord<InternalRow> bufferedRecord, String partitionPath) {
HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
if (bufferedRecord.isDelete()) {
return new HoodieEmptyRecord<>(
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java
index 8941c05f4425..c0fc5846f4b2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java
@@ -50,6 +50,8 @@ import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.RawTripTestPayloadKeyGenerator;
+import org.apache.hudi.keygen.constant.KeyGeneratorType;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieSparkTable;
@@ -159,6 +161,8 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
.withLayoutConfig(HoodieLayoutConfig.newBuilder().fromProperties(indexBuilder.build().getProps())
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
.build();
+ KeyGeneratorType keyGeneratorType = HoodieSparkKeyGeneratorFactory.inferKeyGeneratorTypeFromWriteConfig(config.getProps());
+ config.setValue(HoodieWriteConfig.KEYGENERATOR_TYPE, keyGeneratorType.name());
writeClient = getHoodieWriteClient(config);
this.index = writeClient.getIndex();
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala
index 973055e35a0e..9ca649dab4cd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala
@@ -335,8 +335,9 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
spark.sql(
s"""
|merge into $tableName t0
- |using ( select 1 as id, 'a1' as name, 12.0 as price, 999 as ts
- |union select 3 as id, 'a3' as name, 25.0 as price, 1260 as ts) s0
+ |using ( select 1 as id, 1.0 as price, 100 as ts
+ |union select 1 as id, 12.0 as price, 999 as ts
+ |union select 3 as id, 25.0 as price, 1260 as ts) s0
|on t0.id = s0.id
|when matched then update set price = s0.price, _ts = s0.ts
|""".stripMargin)