nsivabalan commented on code in PR #13600:
URL: https://github.com/apache/hudi/pull/13600#discussion_r2250848943
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -444,41 +459,46 @@ private static <R> Option<HoodieRecord<R>>
mergeIncomingWithExistingRecord(
HoodieRecord<R> incoming,
HoodieRecord<R> existing,
Schema writeSchema,
+ Schema writeSchemaWithMetaFields,
HoodieWriteConfig config,
- HoodieRecordMerger recordMerger,
- Option<BaseKeyGenerator> expressionPayloadKeygen) throws IOException {
- Schema existingSchema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(config.getSchema()),
config.allowOperationMetadataField());
- Schema writeSchemaWithMetaFields =
HoodieAvroUtils.addMetadataFields(writeSchema,
config.allowOperationMetadataField());
+ BufferedRecordMerger<R> recordMerger,
+ Option<BaseKeyGenerator> expressionPayloadKeygen,
+ RecordContext<R> incomingRecordContext,
+ RecordContext<R> existingRecordContext,
+ String[] orderingFieldNames) throws IOException {
if (expressionPayloadKeygen.isPresent()) {
return mergeIncomingWithExistingRecordWithExpressionPayload(incoming,
existing, writeSchema,
- existingSchema, writeSchemaWithMetaFields, config, recordMerger,
expressionPayloadKeygen.get());
+ writeSchemaWithMetaFields, config, recordMerger,
expressionPayloadKeygen.get(), incomingRecordContext, existingRecordContext,
orderingFieldNames);
} else {
// prepend the hoodie meta fields as the incoming record does not have
them
HoodieRecord incomingPrepended = incoming
.prependMetaFields(writeSchema, writeSchemaWithMetaFields, new
MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()),
config.getProps());
- // after prepend the meta fields, convert the record back to the
original payload
- HoodieRecord incomingWithMetaFields = incomingPrepended
- .wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields,
config.getProps(), Option.empty(), config.allowOperationMetadataField(),
Option.empty(), false, Option.empty());
- Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger
- .merge(existing, existingSchema, incomingWithMetaFields,
writeSchemaWithMetaFields, config.getProps());
- if (mergeResult.isPresent()) {
- // the merged record needs to be converted back to the original payload
- HoodieRecord<R> merged =
mergeResult.get().getLeft().wrapIntoHoodieRecordPayloadWithParams(
- writeSchemaWithMetaFields, config.getProps(), Option.empty(),
- config.allowOperationMetadataField(), Option.empty(), false,
Option.of(writeSchema));
- return Option.of(merged);
- } else {
+ BufferedRecord<R> incomingBufferedRecord =
BufferedRecord.forRecordWithContext(incomingPrepended,
writeSchemaWithMetaFields, incomingRecordContext, config.getProps(),
orderingFieldNames);
+ BufferedRecord<R> existingBufferedRecord =
BufferedRecord.forRecordWithContext(existing, writeSchemaWithMetaFields,
existingRecordContext, config.getProps(), orderingFieldNames);
+ MergeResult<R> mergeResult =
recordMerger.finalMerge(existingBufferedRecord, incomingBufferedRecord);
+
+ if (mergeResult.isDelete()) {
+ // the record was deleted
return Option.empty();
}
+ BufferedRecord<R> resultingBufferedRecord =
BufferedRecord.forRecordWithContext(mergeResult.getMergedRecord(),
writeSchemaWithMetaFields, existingRecordContext,
+ orderingFieldNames, existing.getRecordKey(),
mergeResult.getMergedRecord() == null);
+ HoodieRecord<R> result =
existingRecordContext.constructHoodieRecord(resultingBufferedRecord);
Review Comment:
In the case of an expression payload, suppose we end up choosing the base file record
when it is merged with the new incoming record.
Won't we miss out on the unevaluated expression, which is present only in the new
incoming record and not in the existing record on storage?
##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java:
##########
@@ -115,4 +120,12 @@ public static String
getCurrentLocationInstant(HoodieRecord<?> record) {
}
return null;
}
+
+ public static List<String> getOrderingFieldNames(RecordMergeMode mergeMode,
Review Comment:
Have we added unit tests for this?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -427,31 +446,35 @@ public Option<BufferedRecord<T>>
deltaMergeNonDeleteRecord(BufferedRecord<T> new
@Override
public MergeResult<T> mergeNonDeleteRecord(BufferedRecord<T> olderRecord,
BufferedRecord<T> newerRecord) throws IOException {
- Option<Pair<HoodieRecord, Schema>> mergedRecord =
- getMergedRecord(olderRecord, newerRecord);
- if (mergedRecord.isPresent()
- &&
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
+ Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema =
getMergedRecord(olderRecord, newerRecord, true);
+ if (mergedRecordAndSchema.isEmpty()) {
+ return new MergeResult<>(true, null);
+ }
+ HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
+ Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+ // Special handling for SENTINEL record in Expression Payload
+ if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
Review Comment:
This does not need to be fixed in this patch,
but is it worth moving SENTINEL into HoodieRecord only?
##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java:
##########
@@ -79,15 +90,27 @@ public String getMetaFieldValue(IndexedRecord record, int
pos) {
}
@Override
- public HoodieRecord<IndexedRecord>
constructHoodieRecord(BufferedRecord<IndexedRecord> bufferedRecord) {
+ public HoodieRecord constructHoodieRecord(BufferedRecord<IndexedRecord>
bufferedRecord) {
+ // HoodieKey is not required so do not generate it if partitionPath is null
+ HoodieKey hoodieKey = partitionPath == null ? null : new
HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
+
if (bufferedRecord.isDelete()) {
- return SpillableMapUtils.generateEmptyPayload(
- bufferedRecord.getRecordKey(),
- partitionPath,
- bufferedRecord.getOrderingValue(),
- payloadClass);
+ if (payloadClass != null) {
+ return SpillableMapUtils.generateEmptyPayload(
+ bufferedRecord.getRecordKey(),
+ partitionPath,
+ bufferedRecord.getOrderingValue(),
+ payloadClass);
+ } else {
+ return new HoodieEmptyRecord<>(
+ hoodieKey,
+ HoodieRecord.HoodieRecordType.AVRO);
+ }
+ }
+ if (requiresPayloadRecords) {
Review Comment:
Can you add Javadocs to call out which write flows will hit this?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -85,9 +102,77 @@ public I combineOnCondition(
* @return Collection of HoodieRecord already be deduplicated
*/
public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int
parallelism) {
+ HoodieReaderContext<T> readerContext =
+ (HoodieReaderContext<T>)
table.getContext().<T>getReaderContextFactoryDuringWrite(table.getMetaClient(),
table.getConfig().getRecordMerger().getRecordType(),
table.getConfig().getProps())
+ .getContext();
+
readerContext.getRecordContext().updateRecordKeyExtractor(table.getMetaClient().getTableConfig(),
false);
+ HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+ readerContext.initRecordMerger(table.getConfig().getProps());
+ List<String> orderingFieldNames =
HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(),
table.getConfig().getProps(), table.getMetaClient());
HoodieRecordMerger recordMerger =
HoodieRecordUtils.mergerToPreCombineMode(table.getConfig().getRecordMerger());
- return deduplicateRecords(records, table.getIndex(), parallelism,
table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
+ RecordMergeMode recordMergeMode =
HoodieTableConfig.inferCorrectMergingBehavior(null,
table.getConfig().getPayloadClass(), null,
Review Comment:
gotcha
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -493,10 +513,36 @@ public static <R> HoodieData<HoodieRecord<R>>
mergeForPartitionUpdatesIfNeeded(
.filter(p -> p.getRight().isPresent())
.map(p -> Pair.of(p.getRight().get().getPartitionPath(),
p.getRight().get().getFileId()))
.distinct(updatedConfig.getGlobalIndexReconcileParallelism());
+ // define the buffered record merger.
+ ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>)
hoodieTable.getContext()
+ .<R>getReaderContextFactoryDuringWrite(hoodieTable.getMetaClient(),
config.getRecordMerger().getRecordType(), config.getProps());
+ HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
+ RecordContext<R> incomingRecordContext = readerContext.getRecordContext();
+
incomingRecordContext.updateRecordKeyExtractor(hoodieTable.getMetaClient().getTableConfig(),
false);
+ readerContext.initRecordMerger(config.getProps());
+ // Create a reader context for the existing records. In the case of
merge-into commands, the incoming records
+ // can be using an expression payload so here we rely on the table's
configured payload class if it is required.
+ ReaderContextFactory<R> readerContextFactoryForExistingRecords =
(ReaderContextFactory<R>) hoodieTable.getContext()
+ .<R>getReaderContextFactoryDuringWrite(hoodieTable.getMetaClient(),
config.getRecordMerger().getRecordType(),
hoodieTable.getMetaClient().getTableConfig().getProps());
+ RecordContext<R> existingRecordContext =
readerContextFactoryForExistingRecords.getContext().getRecordContext();
// merged existing records with current locations being set
- HoodieData<HoodieRecord<R>> existingRecords =
getExistingRecords(globalLocations, keyGeneratorWriteConfigOpt.getLeft(),
hoodieTable);
-
- final HoodieRecordMerger recordMerger = updatedConfig.getRecordMerger();
+ HoodieData<HoodieRecord<R>> existingRecords =
+ getExistingRecords(globalLocations,
keyGeneratorWriteConfigOpt.getLeft(), hoodieTable,
readerContextFactoryForExistingRecords);
+ List<String> orderingFieldNames = getOrderingFieldNames(
+ readerContext.getMergeMode(), hoodieTable.getConfig().getProps(),
hoodieTable.getMetaClient());
+ RecordMergeMode recordMergeMode =
HoodieTableConfig.inferCorrectMergingBehavior(null, config.getPayloadClass(),
null,
+ String.join(",", orderingFieldNames),
hoodieTable.getMetaClient().getTableConfig().getTableVersion()).getLeft();
+ BufferedRecordMerger<R> recordMerger = BufferedRecordMergerFactory.create(
+ readerContext,
+ recordMergeMode,
+ false,
Review Comment:
Has this been addressed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]