danny0405 commented on code in PR #13600:
URL: https://github.com/apache/hudi/pull/13600#discussion_r2251059110
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -331,7 +338,7 @@ public static HoodieIndex
createUserDefinedIndex(HoodieWriteConfig config) {
* @return {@link HoodieRecord}s that have the current location being set.
*/
private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
- HoodieData<Pair<String, String>> partitionLocations, HoodieWriteConfig
config, HoodieTable hoodieTable) {
+ HoodieData<Pair<String, String>> partitionLocations, HoodieWriteConfig
config, HoodieTable hoodieTable, ReaderContextFactory<R> readerContextFactory,
Schema dataSchema) {
Review Comment:
Did we intern the schema already?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -493,10 +513,38 @@ public static <R> HoodieData<HoodieRecord<R>>
mergeForPartitionUpdatesIfNeeded(
.filter(p -> p.getRight().isPresent())
.map(p -> Pair.of(p.getRight().get().getPartitionPath(),
p.getRight().get().getFileId()))
.distinct(updatedConfig.getGlobalIndexReconcileParallelism());
+ // define the buffered record merger.
+ ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>)
hoodieTable.getContext()
+ .<R>getReaderContextFactoryForWrite(hoodieTable.getMetaClient(),
config.getRecordMerger().getRecordType(), config.getProps());
+ HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
+ RecordContext<R> incomingRecordContext = readerContext.getRecordContext();
+ readerContext.initRecordMerger(config.getProps());
+ // Create a reader context for the existing records. In the case of
merge-into commands, the incoming records
+ // can be using an expression payload so here we rely on the table's
configured payload class if it is required.
+ ReaderContextFactory<R> readerContextFactoryForExistingRecords =
(ReaderContextFactory<R>) hoodieTable.getContext()
+ .<R>getReaderContextFactoryForWrite(hoodieTable.getMetaClient(),
config.getRecordMerger().getRecordType(),
hoodieTable.getMetaClient().getTableConfig().getProps());
+ RecordContext<R> existingRecordContext =
readerContextFactoryForExistingRecords.getContext().getRecordContext();
// merged existing records with current locations being set
- HoodieData<HoodieRecord<R>> existingRecords =
getExistingRecords(globalLocations, keyGeneratorWriteConfigOpt.getLeft(),
hoodieTable);
-
- final HoodieRecordMerger recordMerger = updatedConfig.getRecordMerger();
+ SerializableSchema writerSchema = new
SerializableSchema(hoodieTable.getConfig().getWriteSchema());
Review Comment:
Why do we need a `SerializableSchema` here? Isn't the Avro `Schema` already
serializable?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -493,10 +513,38 @@ public static <R> HoodieData<HoodieRecord<R>>
mergeForPartitionUpdatesIfNeeded(
.filter(p -> p.getRight().isPresent())
.map(p -> Pair.of(p.getRight().get().getPartitionPath(),
p.getRight().get().getFileId()))
.distinct(updatedConfig.getGlobalIndexReconcileParallelism());
+ // define the buffered record merger.
+ ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>)
hoodieTable.getContext()
+ .<R>getReaderContextFactoryForWrite(hoodieTable.getMetaClient(),
config.getRecordMerger().getRecordType(), config.getProps());
+ HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
+ RecordContext<R> incomingRecordContext = readerContext.getRecordContext();
+ readerContext.initRecordMerger(config.getProps());
+ // Create a reader context for the existing records. In the case of
merge-into commands, the incoming records
+ // can be using an expression payload so here we rely on the table's
configured payload class if it is required.
+ ReaderContextFactory<R> readerContextFactoryForExistingRecords =
(ReaderContextFactory<R>) hoodieTable.getContext()
+ .<R>getReaderContextFactoryForWrite(hoodieTable.getMetaClient(),
config.getRecordMerger().getRecordType(),
hoodieTable.getMetaClient().getTableConfig().getProps());
+ RecordContext<R> existingRecordContext =
readerContextFactoryForExistingRecords.getContext().getRecordContext();
// merged existing records with current locations being set
- HoodieData<HoodieRecord<R>> existingRecords =
getExistingRecords(globalLocations, keyGeneratorWriteConfigOpt.getLeft(),
hoodieTable);
-
- final HoodieRecordMerger recordMerger = updatedConfig.getRecordMerger();
+ SerializableSchema writerSchema = new
SerializableSchema(hoodieTable.getConfig().getWriteSchema());
+ SerializableSchema writerSchemaWithMetaFields = new
SerializableSchema(HoodieAvroUtils.addMetadataFields(writerSchema.get(),
updatedConfig.allowOperationMetadataField()));
+ // Read the existing records with the meta fields and current writer
schema as the output schema
+ HoodieData<HoodieRecord<R>> existingRecords =
+ getExistingRecords(globalLocations,
keyGeneratorWriteConfigOpt.getLeft(), hoodieTable,
readerContextFactoryForExistingRecords, writerSchemaWithMetaFields.get());
+ List<String> orderingFieldNames = getOrderingFieldNames(
+ readerContext.getMergeMode(), hoodieTable.getConfig().getProps(),
hoodieTable.getMetaClient());
+ RecordMergeMode recordMergeMode =
HoodieTableConfig.inferCorrectMergingBehavior(null, config.getPayloadClass(),
null,
+ String.join(",", orderingFieldNames),
hoodieTable.getMetaClient().getTableConfig().getTableVersion()).getLeft();
+ BufferedRecordMerger<R> recordMerger = BufferedRecordMergerFactory.create(
+ readerContext,
+ recordMergeMode,
+ false,
+ Option.ofNullable(updatedConfig.getRecordMerger()),
Review Comment:
Always use `HoodieReaderContext.getRecordMerger` here.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -85,9 +102,70 @@ public I combineOnCondition(
* @return Collection of HoodieRecord already be deduplicated
*/
public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int
parallelism) {
+ HoodieReaderContext<T> readerContext =
+ (HoodieReaderContext<T>)
table.getContext().<T>getReaderContextFactoryForWrite(table.getMetaClient(),
table.getConfig().getRecordMerger().getRecordType(),
table.getConfig().getProps())
+ .getContext();
+ HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+ readerContext.initRecordMerger(table.getConfig().getProps());
+ List<String> orderingFieldNames =
HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(),
table.getConfig().getProps(), table.getMetaClient());
HoodieRecordMerger recordMerger =
HoodieRecordUtils.mergerToPreCombineMode(table.getConfig().getRecordMerger());
- return deduplicateRecords(records, table.getIndex(), parallelism,
table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
+ RecordMergeMode recordMergeMode =
HoodieTableConfig.inferCorrectMergingBehavior(null,
table.getConfig().getPayloadClass(), null,
+ String.join(",", orderingFieldNames),
tableConfig.getTableVersion()).getLeft();
+ Schema recordSchema;
+ if (StringUtils.nonEmpty(table.getConfig().getPartialUpdateSchema())) {
+ recordSchema = new
Schema.Parser().parse(table.getConfig().getPartialUpdateSchema());
+ } else {
+ recordSchema = new
Schema.Parser().parse(table.getConfig().getWriteSchema());
+ }
+ BufferedRecordMerger<T> bufferedRecordMerger =
BufferedRecordMergerFactory.create(
+ readerContext,
+ recordMergeMode,
+ false,
+ Option.ofNullable(recordMerger),
+ orderingFieldNames,
+ Option.ofNullable(table.getConfig().getPayloadClass()),
+ recordSchema,
+ table.getConfig().getProps(),
+ tableConfig.getPartialUpdateMode());
+ return deduplicateRecords(
+ records,
+ table.getIndex(),
+ parallelism,
+ table.getConfig().getSchema(),
+ table.getConfig().getProps(),
+ bufferedRecordMerger,
+ readerContext,
+ orderingFieldNames.toArray(new String[0]));
}
- public abstract I deduplicateRecords(I records, HoodieIndex<?, ?> index, int
parallelism, String schema, TypedProperties props, HoodieRecordMerger merger);
+ public abstract I deduplicateRecords(I records,
+ HoodieIndex<?, ?> index,
+ int parallelism,
+ String schema,
+ TypedProperties props,
+ BufferedRecordMerger<T> merger,
+ HoodieReaderContext<T> readerContext,
+ String[] orderingFieldNames);
+
+ protected static <T> HoodieRecord<T> reduceRecords(TypedProperties props,
BufferedRecordMerger<T> recordMerger, String[] orderingFieldNames,
+ HoodieRecord<T> previous,
HoodieRecord<T> next, Schema schema, RecordContext<T> recordContext) {
+ try {
+ // NOTE: The order of previous and next is uncertain within a batch in
"reduceByKey".
+ // If the return value is empty, it means the previous should be chosen.
+ BufferedRecord<T> newBufferedRecord =
BufferedRecord.forRecordWithContext(next, schema, recordContext, props,
orderingFieldNames);
+ // Construct old buffered record.
+ BufferedRecord<T> oldBufferedRecord =
BufferedRecord.forRecordWithContext(previous, schema, recordContext, props,
orderingFieldNames);
+ // Run merge.
+ Option<BufferedRecord<T>> merged =
recordMerger.deltaMerge(newBufferedRecord, oldBufferedRecord);
+ // NOTE: For merge mode based merging, it returns non-null.
+ // For mergers / payloads based merging, it may return null.
+ HoodieRecord<T> reducedRecord =
merged.map(recordContext::constructHoodieRecord).orElse(previous);
+ boolean choosePrevious = merged.isEmpty();
+ HoodieKey reducedKey = choosePrevious ? previous.getKey() :
next.getKey();
+ HoodieOperation operation = choosePrevious ? previous.getOperation() :
next.getOperation();
+ return reducedRecord.newInstance(reducedKey, operation);
Review Comment:
The location info got lost. Some write handles, like the append handle, need this
info to decide whether the record is an update; we can use the location of the
previous record. In partial-update mode a new record is always generated,
so we would end up with a record that has no location set up.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -411,30 +416,40 @@ private static <R> Option<HoodieRecord<R>>
mergeIncomingWithExistingRecordWithEx
HoodieRecord<R> incoming,
HoodieRecord<R> existing,
Schema writeSchema,
- Schema existingSchema,
Schema writeSchemaWithMetaFields,
HoodieWriteConfig config,
- HoodieRecordMerger recordMerger,
- BaseKeyGenerator keyGenerator) throws IOException {
- Option<Pair<HoodieRecord, Schema>> mergeResult =
recordMerger.merge(existing, existingSchema,
- incoming, writeSchemaWithMetaFields, config.getProps());
- if (!mergeResult.isPresent()) {
+ BufferedRecordMerger<R> recordMerger,
+ BaseKeyGenerator keyGenerator,
+ RecordContext<R> incomingRecordContext,
+ RecordContext<R> existingRecordContext,
+ String[] orderingFieldNames) throws IOException {
+ BufferedRecord<R> incomingBufferedRecord =
BufferedRecord.forRecordWithContext(incoming, writeSchemaWithMetaFields,
incomingRecordContext, config.getProps(), orderingFieldNames);
+ BufferedRecord<R> existingBufferedRecord =
BufferedRecord.forRecordWithContext(existing, writeSchemaWithMetaFields,
existingRecordContext, config.getProps(), orderingFieldNames);
+ MergeResult<R> mergeResult =
recordMerger.finalMerge(existingBufferedRecord, incomingBufferedRecord);
+ if (mergeResult.isDelete()) {
//the record was deleted
return Option.empty();
}
- HoodieRecord<R> result = mergeResult.get().getLeft();
+ if (mergeResult.getMergedRecord() == null) {
+ // SENTINEL case: the record did not match and merge case and should not
be modified
+ return Option.of((HoodieRecord<R>) new
HoodieAvroIndexedRecord(HoodieRecord.SENTINEL));
+ }
+
+ BufferedRecord<R> resultingBufferedRecord =
BufferedRecord.forRecordWithContext(mergeResult.getMergedRecord(),
writeSchemaWithMetaFields,
+ existingRecordContext, orderingFieldNames, existing.getRecordKey(),
false);
+ HoodieRecord<R> result =
existingRecordContext.constructHoodieRecord(resultingBufferedRecord);
+
if (result.getData().equals(HoodieRecord.SENTINEL)) {
//the record did not match and merge case and should not be modified
return Option.of(result);
}
//record is inserted or updated
- String partitionPath = keyGenerator.getPartitionPath((GenericRecord)
result.getData());
+ String partitionPath =
keyGenerator.getPartitionPath(existingRecordContext.convertToAvroRecord(mergeResult.getMergedRecord(),
writeSchemaWithMetaFields));
Review Comment:
This is too costly just to fetch a partition path — can we use
`incoming.getPartitionPath` instead? The two records should belong to the same partition.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java:
##########
@@ -226,7 +233,18 @@ private void initRecordConverter() {
}
private void initMergeClass() {
- recordMerger =
HoodieRecordUtils.mergerToPreCombineMode(writeClient.getConfig().getRecordMerger());
+ readerContext =
writeClient.getEngineContext().<RowData>getReaderContextFactory(metaClient).getContext();
+ orderingFieldNames = getOrderingFieldNames(readerContext.getMergeMode(),
writeClient.getConfig().getProps(), metaClient);
+ recordMerger = BufferedRecordMergerFactory.create(
+ readerContext,
+ writeClient.getConfig().getRecordMergeMode(),
+ false,
+ Option.ofNullable(writeClient.getConfig().getRecordMerger()),
+ orderingFieldNames,
+ Option.ofNullable(writeClient.getConfig().getPayloadClass()),
+ new SerializableSchema(writeClient.getConfig().getSchema()).get(),
Review Comment:
Can we write a utility method in `HoodieAvroUtils` to parse a string as an Avro
schema, instead of using the `SerializableSchema` constructor?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -444,41 +459,46 @@ private static <R> Option<HoodieRecord<R>>
mergeIncomingWithExistingRecord(
HoodieRecord<R> incoming,
HoodieRecord<R> existing,
Schema writeSchema,
+ Schema writeSchemaWithMetaFields,
HoodieWriteConfig config,
- HoodieRecordMerger recordMerger,
- Option<BaseKeyGenerator> expressionPayloadKeygen) throws IOException {
- Schema existingSchema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(config.getSchema()),
config.allowOperationMetadataField());
- Schema writeSchemaWithMetaFields =
HoodieAvroUtils.addMetadataFields(writeSchema,
config.allowOperationMetadataField());
+ BufferedRecordMerger<R> recordMerger,
+ Option<BaseKeyGenerator> expressionPayloadKeygen,
+ RecordContext<R> incomingRecordContext,
+ RecordContext<R> existingRecordContext,
+ String[] orderingFieldNames) throws IOException {
if (expressionPayloadKeygen.isPresent()) {
return mergeIncomingWithExistingRecordWithExpressionPayload(incoming,
existing, writeSchema,
- existingSchema, writeSchemaWithMetaFields, config, recordMerger,
expressionPayloadKeygen.get());
+ writeSchemaWithMetaFields, config, recordMerger,
expressionPayloadKeygen.get(), incomingRecordContext, existingRecordContext,
orderingFieldNames);
} else {
// prepend the hoodie meta fields as the incoming record does not have
them
HoodieRecord incomingPrepended = incoming
.prependMetaFields(writeSchema, writeSchemaWithMetaFields, new
MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()),
config.getProps());
- // after prepend the meta fields, convert the record back to the
original payload
- HoodieRecord incomingWithMetaFields = incomingPrepended
- .wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields,
config.getProps(), Option.empty(), config.allowOperationMetadataField(),
Option.empty(), false, Option.empty());
- Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger
- .merge(existing, existingSchema, incomingWithMetaFields,
writeSchemaWithMetaFields, config.getProps());
- if (mergeResult.isPresent()) {
- // the merged record needs to be converted back to the original payload
- HoodieRecord<R> merged =
mergeResult.get().getLeft().wrapIntoHoodieRecordPayloadWithParams(
- writeSchemaWithMetaFields, config.getProps(), Option.empty(),
- config.allowOperationMetadataField(), Option.empty(), false,
Option.of(writeSchema));
- return Option.of(merged);
- } else {
+ BufferedRecord<R> incomingBufferedRecord =
BufferedRecord.forRecordWithContext(incomingPrepended,
writeSchemaWithMetaFields, incomingRecordContext, config.getProps(),
orderingFieldNames);
+ BufferedRecord<R> existingBufferedRecord =
BufferedRecord.forRecordWithContext(existing, writeSchemaWithMetaFields,
existingRecordContext, config.getProps(), orderingFieldNames);
+ MergeResult<R> mergeResult =
recordMerger.finalMerge(existingBufferedRecord, incomingBufferedRecord);
+
+ if (mergeResult.isDelete()) {
+ // the record was deleted
return Option.empty();
}
+ BufferedRecord<R> resultingBufferedRecord =
BufferedRecord.forRecordWithContext(mergeResult.getMergedRecord(),
writeSchemaWithMetaFields, existingRecordContext,
+ orderingFieldNames, existing.getRecordKey(),
mergeResult.getMergedRecord() == null);
+ HoodieRecord<R> result =
existingRecordContext.constructHoodieRecord(resultingBufferedRecord);
+ // the merged record needs to be converted back to the original payload
+ return
Option.of(result.wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields,
config.getProps(), Option.empty(),
Review Comment:
this is also very costly.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -85,9 +102,70 @@ public I combineOnCondition(
* @return Collection of HoodieRecord already be deduplicated
*/
public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int
parallelism) {
+ HoodieReaderContext<T> readerContext =
+ (HoodieReaderContext<T>)
table.getContext().<T>getReaderContextFactoryForWrite(table.getMetaClient(),
table.getConfig().getRecordMerger().getRecordType(),
table.getConfig().getProps())
+ .getContext();
+ HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+ readerContext.initRecordMerger(table.getConfig().getProps());
+ List<String> orderingFieldNames =
HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(),
table.getConfig().getProps(), table.getMetaClient());
HoodieRecordMerger recordMerger =
HoodieRecordUtils.mergerToPreCombineMode(table.getConfig().getRecordMerger());
- return deduplicateRecords(records, table.getIndex(), parallelism,
table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
+ RecordMergeMode recordMergeMode =
HoodieTableConfig.inferCorrectMergingBehavior(null,
table.getConfig().getPayloadClass(), null,
+ String.join(",", orderingFieldNames),
tableConfig.getTableVersion()).getLeft();
+ Schema recordSchema;
+ if (StringUtils.nonEmpty(table.getConfig().getPartialUpdateSchema())) {
+ recordSchema = new
Schema.Parser().parse(table.getConfig().getPartialUpdateSchema());
Review Comment:
Let's intern these schemas with `AvroSchemaCache`.
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/RecordContext.java:
##########
@@ -39,39 +41,54 @@
import javax.annotation.Nullable;
+import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.function.BiFunction;
import static
org.apache.hudi.common.model.HoodieRecord.HOODIE_IS_DELETED_FIELD;
import static
org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
+/**
+ * Record context provides the APIs for record related operations. Record
context is associated with
+ * a corresponding {@link HoodieReaderContext} and is used for getting field
values from a record,
+ * transforming a record etc.
+ */
public abstract class RecordContext<T> implements Serializable {
private static final long serialVersionUID = 1L;
- private SerializableBiFunction<T, Schema, String> recordKeyExtractor;
+ private final SerializableBiFunction<T, Schema, String> recordKeyExtractor;
// for encoding and decoding schemas to the spillable map
private final LocalAvroSchemaCache localAvroSchemaCache =
LocalAvroSchemaCache.getInstance();
protected JavaTypeConverter typeConverter;
- protected String partitionPath;
+ protected String partitionPath = null;
Review Comment:
Let's make the partition path non-nullable.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -394,30 +407,36 @@ public MergeResult<T>
mergeNonDeleteRecord(BufferedRecord<T> olderRecord, Buffer
* based on {@code CUSTOM} merge mode and a given record payload class.
*/
private static class CustomPayloadRecordMerger<T> extends
BaseCustomMerger<T> {
- private final List<String> orderingFieldNames;
- private final String payloadClass;
+ private final String[] orderingFieldNames;
+ private final String basePayloadClass;
+ private final String incomingPayloadClass;
public CustomPayloadRecordMerger(
RecordContext<T> recordContext,
Option<HoodieRecordMerger> recordMerger,
List<String> orderingFieldNames,
- String payloadClass,
+ String basePayloadClass,
+ String incomingPayloadClass,
Schema readerSchema,
TypedProperties props) {
super(recordContext, recordMerger, readerSchema, props);
- this.orderingFieldNames = orderingFieldNames;
- this.payloadClass = payloadClass;
+ this.orderingFieldNames = orderingFieldNames.toArray(new String[0]);
+ this.basePayloadClass = basePayloadClass;
+ this.incomingPayloadClass = incomingPayloadClass;
}
@Override
public Option<BufferedRecord<T>>
deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T>
existingRecord) throws IOException {
- Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt =
getMergedRecord(existingRecord, newRecord);
- if (combinedRecordAndSchemaOpt.isPresent()) {
- T combinedRecordData = recordContext.convertAvroRecord((IndexedRecord)
combinedRecordAndSchemaOpt.get().getLeft().getData());
+ Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt =
getMergedRecord(existingRecord, newRecord, false);
+ if (combinedRecordAndSchemaOpt.map(combinedRecordAndSchema ->
combinedRecordAndSchema.getRight() != null).orElse(false)) {
Review Comment:
Why can the schema be null? Can we fix the merger to return it correctly?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -427,31 +446,35 @@ public Option<BufferedRecord<T>>
deltaMergeNonDeleteRecord(BufferedRecord<T> new
@Override
public MergeResult<T> mergeNonDeleteRecord(BufferedRecord<T> olderRecord,
BufferedRecord<T> newerRecord) throws IOException {
- Option<Pair<HoodieRecord, Schema>> mergedRecord =
- getMergedRecord(olderRecord, newerRecord);
- if (mergedRecord.isPresent()
- &&
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
+ Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema =
getMergedRecord(olderRecord, newerRecord, true);
+ if (mergedRecordAndSchema.isEmpty()) {
+ return new MergeResult<>(true, null);
+ }
+ HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
+ Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+ // Special handling for SENTINEL record in Expression Payload
+ if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
Review Comment:
+1
##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java:
##########
@@ -79,15 +90,27 @@ public String getMetaFieldValue(IndexedRecord record, int
pos) {
}
@Override
- public HoodieRecord<IndexedRecord>
constructHoodieRecord(BufferedRecord<IndexedRecord> bufferedRecord) {
+ public HoodieRecord constructHoodieRecord(BufferedRecord<IndexedRecord>
bufferedRecord) {
+ // HoodieKey is not required so do not generate it if partitionPath is null
+ HoodieKey hoodieKey = partitionPath == null ? null : new
HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
Review Comment:
Can we always set up the partition path for the record context? A null key for
the Hoodie record will throw an NPE from `HoodieRecord.getRecordKey`.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -394,30 +407,36 @@ public MergeResult<T>
mergeNonDeleteRecord(BufferedRecord<T> olderRecord, Buffer
* based on {@code CUSTOM} merge mode and a given record payload class.
*/
private static class CustomPayloadRecordMerger<T> extends
BaseCustomMerger<T> {
- private final List<String> orderingFieldNames;
- private final String payloadClass;
+ private final String[] orderingFieldNames;
+ private final String basePayloadClass;
+ private final String incomingPayloadClass;
Review Comment:
Curious why the incoming and base records have two different payload classes?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]