nsivabalan commented on code in PR #13588:
URL: https://github.com/apache/hudi/pull/13588#discussion_r2221176430
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -370,27 +379,38 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecord(
       HoodieRecord<R> existing,
       Schema writeSchema,
       HoodieWriteConfig config,
-      HoodieRecordMerger recordMerger,
-      Option<BaseKeyGenerator> expressionPayloadKeygen) throws IOException {
+      BufferedRecordMerger<R> recordMerger,
+      Option<BaseKeyGenerator> expressionPayloadKeygen,
+      HoodieReaderContext<R> readerContext,
+      Option<String> orderingFieldNameOpt) throws IOException {
     Schema existingSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
     Schema writeSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(writeSchema, config.allowOperationMetadataField());
     if (expressionPayloadKeygen.isPresent()) {
-      return mergeIncomingWithExistingRecordWithExpressionPayload(incoming, existing, writeSchema,
-          existingSchema, writeSchemaWithMetaFields, config, recordMerger, expressionPayloadKeygen.get());
+      return mergeIncomingWithExistingRecordWithExpressionPayload(
+          incoming, existing, writeSchema, existingSchema, writeSchemaWithMetaFields,
+          config, recordMerger, expressionPayloadKeygen.get(), readerContext, orderingFieldNameOpt);
     } else {
       // prepend the hoodie meta fields as the incoming record does not have them
-      HoodieRecord incomingPrepended = incoming
+      HoodieRecord<R> incomingPrepended = incoming
           .prependMetaFields(writeSchema, writeSchemaWithMetaFields, new MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()), config.getProps());
       // after prepend the meta fields, convert the record back to the original payload
-      HoodieRecord incomingWithMetaFields = incomingPrepended
+      HoodieRecord<R> incomingWithMetaFields = incomingPrepended
           .wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields, config.getProps(), Option.empty(), config.allowOperationMetadataField(), Option.empty(), false, Option.empty());
-      Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger
-          .merge(existing, existingSchema, incomingWithMetaFields, writeSchemaWithMetaFields, config.getProps());
+      Option<BufferedRecord<R>> mergeResult = merge(
+          incomingWithMetaFields, existing, writeSchemaWithMetaFields, existingSchema,
+          readerContext, orderingFieldNameOpt, recordMerger);
       if (mergeResult.isPresent()) {
         // the merged record needs to be converted back to the original payload
-        HoodieRecord<R> merged = mergeResult.get().getLeft().wrapIntoHoodieRecordPayloadWithParams(
-            writeSchemaWithMetaFields, config.getProps(), Option.empty(),
-            config.allowOperationMetadataField(), Option.empty(), false, Option.of(writeSchema));
+        HoodieRecord<R> merged =
+            readerContext.constructHoodieRecord(mergeResult.get())
+                .wrapIntoHoodieRecordPayloadWithParams(
Review Comment:
Yes. We might need to revisit this entire class: wherever we are constructing a HoodieRecord, we might need to fix it, because it is using the payload-based way of record generation, but we are looking to move away from payloads. So, in all these places, we might need to call
```
HoodieRecord<T> constructHoodieRecord(BufferedRecord<T> bufferedRecord)
```
from the ReaderContext to construct the HoodieRecord from the BufferedRecord.
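To illustrate the direction suggested above, here is a minimal, self-contained sketch of the pattern. The classes below (`BufferedRecord`, `HoodieRecord`, `ReaderContext`, `StringReaderContext`) are simplified stand-ins, not the real Hudi types: the point is only that the engine-specific reader context owns record construction, so call sites never go through a payload class.

```java
// Simplified stand-in for Hudi's BufferedRecord: a merged row plus its key.
class BufferedRecord<T> {
  final String recordKey;
  final T data;

  BufferedRecord(String recordKey, T data) {
    this.recordKey = recordKey;
    this.data = data;
  }
}

// Simplified stand-in for HoodieRecord; the real class carries far more state.
class HoodieRecord<T> {
  final String key;
  final T data;

  HoodieRecord(String key, T data) {
    this.key = key;
    this.data = data;
  }
}

// The reader context owns engine-specific record construction, so callers
// build a HoodieRecord from a BufferedRecord without touching payloads.
abstract class ReaderContext<T> {
  abstract HoodieRecord<T> constructHoodieRecord(BufferedRecord<T> bufferedRecord);
}

// Hypothetical engine-specific context for plain String rows.
class StringReaderContext extends ReaderContext<String> {
  @Override
  HoodieRecord<String> constructHoodieRecord(BufferedRecord<String> bufferedRecord) {
    return new HoodieRecord<>(bufferedRecord.recordKey, bufferedRecord.data);
  }
}

public class ConstructRecordSketch {
  public static void main(String[] args) {
    ReaderContext<String> ctx = new StringReaderContext();
    BufferedRecord<String> merged = new BufferedRecord<>("key-1", "merged-data");
    // Record construction is delegated to the context, not a payload class.
    HoodieRecord<String> record = ctx.constructHoodieRecord(merged);
    System.out.println(record.key + ":" + record.data);
  }
}
```

In the real code the merge result (`Option<BufferedRecord<R>>`) would be fed through `readerContext.constructHoodieRecord(...)` in the same way, keeping payload handling confined to the reader context.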
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]