the-other-tim-brown commented on code in PR #13600:
URL: https://github.com/apache/hudi/pull/13600#discussion_r2234127450
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -85,9 +104,117 @@ public I combineOnCondition(
* @return Collection of HoodieRecord already be deduplicated
*/
public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int
parallelism) {
- HoodieRecordMerger recordMerger =
HoodieRecordUtils.mergerToPreCombineMode(table.getConfig().getRecordMerger());
- return deduplicateRecords(records, table.getIndex(), parallelism,
table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
+ HoodieReaderContext<T> readerContext =
+ (HoodieReaderContext<T>)
table.getContext().<T>getReaderContextFactoryDuringWrite(table.getMetaClient(),
table.getConfig().getRecordMerger().getRecordType())
+ .getContext();
+ List<String> orderingFieldNames = getOrderingFieldName(readerContext,
table.getConfig().getProps(), table.getMetaClient());
+ BufferedRecordMerger<T> recordMerger = BufferedRecordMergerFactory.create(
+ readerContext,
+ table.getConfig().getRecordMergeMode(),
+ false,
+ Option.ofNullable(table.getConfig().getRecordMerger()),
+ orderingFieldNames,
+ Option.ofNullable(table.getConfig().getPayloadClass()),
+ new SerializableSchema(table.getConfig().getSchema()).get(),
+ table.getConfig().getProps(),
+ table.getMetaClient().getTableConfig().getPartialUpdateMode());
+ // Due to new records we cant use meta fields for record key extraction
+
readerContext.getRecordContext().updateRecordKeyExtractor(table.getMetaClient().getTableConfig(),
false);
+ return deduplicateRecords(
+ records,
+ table.getIndex(),
+ parallelism,
+ table.getConfig().getSchema(),
+ table.getConfig().getProps(),
+ recordMerger,
+ readerContext,
+ orderingFieldNames);
+ }
+
+ public abstract I deduplicateRecords(I records,
+ HoodieIndex<?, ?> index,
+ int parallelism,
+ String schema,
+ TypedProperties props,
+ BufferedRecordMerger<T> merger,
+ HoodieReaderContext<T> readerContext,
+ List<String> orderingFieldNames);
+
+ public static List<String> getOrderingFieldName(HoodieReaderContext
readerContext,
+ TypedProperties props,
+ HoodieTableMetaClient
metaClient) {
+ return readerContext.getMergeMode() == RecordMergeMode.COMMIT_TIME_ORDERING
+ ? Collections.emptyList()
+ :
Option.ofNullable(ConfigUtils.getOrderingFields(props)).map(Arrays::asList).orElse(metaClient.getTableConfig().getPreCombineFields());
+ }
+
+ /**
+ * Check if the value of column "_hoodie_is_deleted" is true.
+ */
+ public static <T> boolean isBuiltInDeleteRecord(T record,
Review Comment:
These methods for determining a delete are very similar to those in the
FileGroupRecordBuffer. Let's move them into a common place so the logic doesn't
drift between the two.
It may be possible to fold this into the BufferedRecord creation logic, or to
add it as a method on the RecordContext.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -444,27 +461,57 @@ private static <R> Option<HoodieRecord<R>>
mergeIncomingWithExistingRecord(
HoodieRecord<R> existing,
Schema writeSchema,
HoodieWriteConfig config,
- HoodieRecordMerger recordMerger,
- Option<BaseKeyGenerator> expressionPayloadKeygen) throws IOException {
+ BufferedRecordMerger<R> recordMerger,
+ Option<BaseKeyGenerator> expressionPayloadKeygen,
+ RecordContext<R> recordContext,
+ List<String> orderingFieldNames,
+ boolean hasBuiltInDelete,
+ Option<Pair<String, String>> customDeleteMarkerKeyValue,
+ int hoodieOperationPos,
Review Comment:
Let's create a POJO that captures the delete-field configuration so it is
easier to extract and pass between methods.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java:
##########
@@ -56,10 +65,23 @@ protected HoodieData<HoodieRecord<T>>
tag(HoodieData<HoodieRecord<T>> dedupedRec
}
@Override
- public HoodieData<HoodieRecord<T>> deduplicateRecords(
- HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int
parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger
merger) {
+ public HoodieData<HoodieRecord<T>>
deduplicateRecords(HoodieData<HoodieRecord<T>> records,
+ HoodieIndex<?, ?>
index,
+ int parallelism,
+ String schemaStr,
+ TypedProperties props,
+
BufferedRecordMerger<T> recordMerger,
+ HoodieReaderContext<T>
readerContext,
+ List<String>
orderingFieldNames) {
boolean isIndexingGlobal = index.isGlobal();
final SerializableSchema schema = new SerializableSchema(schemaStr);
+ RecordContext recordContext = readerContext.getRecordContext();
Review Comment:
Add the type parameter to this declaration since `RecordContext` is a parameterized class; using the raw type loses type safety.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java:
##########
@@ -98,11 +98,15 @@ public FileGroupReaderSchemaHandler(HoodieReaderContext<T>
readerContext,
this.customDeleteMarkerKeyValue = deleteConfigs.getLeft();
this.hasBuiltInDelete = deleteConfigs.getRight();
this.requiredSchema = AvroSchemaCache.intern(prepareRequiredSchema());
- this.hoodieOperationPos =
Option.ofNullable(requiredSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD)).map(Schema.Field::pos).orElse(-1);
+ this.hoodieOperationPos = getHoodieOperationPos(requiredSchema);
this.internalSchema = pruneInternalSchema(requiredSchema,
internalSchemaOpt);
this.internalSchemaOpt = getInternalSchemaOpt(internalSchemaOpt);
}
+ public static Integer getHoodieOperationPos(Schema schema) {
Review Comment:
Return the primitive `int` instead of the boxed `Integer`, since the value is never null?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -492,10 +542,34 @@ public static <R> HoodieData<HoodieRecord<R>>
mergeForPartitionUpdatesIfNeeded(
.filter(p -> p.getRight().isPresent())
.map(p -> Pair.of(p.getRight().get().getPartitionPath(),
p.getRight().get().getFileId()))
.distinct(updatedConfig.getGlobalIndexReconcileParallelism());
+ // define the buffered record merger.
+ ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>)
hoodieTable.getContext()
+ .<R>getReaderContextFactoryDuringWrite(hoodieTable.getMetaClient(),
config.getRecordMerger().getRecordType());
+ HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
// merged existing records with current locations being set
- HoodieData<HoodieRecord<R>> existingRecords =
getExistingRecords(globalLocations, keyGeneratorWriteConfigOpt.getLeft(),
hoodieTable);
-
- final HoodieRecordMerger recordMerger = updatedConfig.getRecordMerger();
+ HoodieData<HoodieRecord<R>> existingRecords =
+ getExistingRecords(globalLocations,
keyGeneratorWriteConfigOpt.getLeft(), hoodieTable, readerContextFactory);
+ List<String> orderingFieldNames = getOrderingFieldName(
+ readerContext, hoodieTable.getConfig().getProps(),
hoodieTable.getMetaClient());
+ BufferedRecordMerger<R> recordMerger = BufferedRecordMergerFactory.create(
+ readerContext,
+ hoodieTable.getConfig().getRecordMergeMode(),
+ false,
+ Option.ofNullable(hoodieTable.getConfig().getRecordMerger()),
+ orderingFieldNames,
+ Option.ofNullable(hoodieTable.getConfig().getPayloadClass()),
+ new SerializableSchema(hoodieTable.getConfig().getSchema()).get(),
+ hoodieTable.getConfig().getProps(),
+ hoodieTable.getMetaClient().getTableConfig().getPartialUpdateMode());
+ RecordContext recordContext = readerContext.getRecordContext();
Review Comment:
Add the type parameter here as well, please.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -492,10 +542,34 @@ public static <R> HoodieData<HoodieRecord<R>>
mergeForPartitionUpdatesIfNeeded(
.filter(p -> p.getRight().isPresent())
.map(p -> Pair.of(p.getRight().get().getPartitionPath(),
p.getRight().get().getFileId()))
.distinct(updatedConfig.getGlobalIndexReconcileParallelism());
+ // define the buffered record merger.
+ ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>)
hoodieTable.getContext()
+ .<R>getReaderContextFactoryDuringWrite(hoodieTable.getMetaClient(),
config.getRecordMerger().getRecordType());
+ HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
// merged existing records with current locations being set
- HoodieData<HoodieRecord<R>> existingRecords =
getExistingRecords(globalLocations, keyGeneratorWriteConfigOpt.getLeft(),
hoodieTable);
-
- final HoodieRecordMerger recordMerger = updatedConfig.getRecordMerger();
+ HoodieData<HoodieRecord<R>> existingRecords =
+ getExistingRecords(globalLocations,
keyGeneratorWriteConfigOpt.getLeft(), hoodieTable, readerContextFactory);
+ List<String> orderingFieldNames = getOrderingFieldName(
+ readerContext, hoodieTable.getConfig().getProps(),
hoodieTable.getMetaClient());
+ BufferedRecordMerger<R> recordMerger = BufferedRecordMergerFactory.create(
+ readerContext,
+ hoodieTable.getConfig().getRecordMergeMode(),
+ false,
+ Option.ofNullable(hoodieTable.getConfig().getRecordMerger()),
+ orderingFieldNames,
+ Option.ofNullable(hoodieTable.getConfig().getPayloadClass()),
+ new SerializableSchema(hoodieTable.getConfig().getSchema()).get(),
+ hoodieTable.getConfig().getProps(),
+ hoodieTable.getMetaClient().getTableConfig().getPartialUpdateMode());
+ RecordContext recordContext = readerContext.getRecordContext();
+ // Due to new records we cant use meta fields for record key extraction
+
recordContext.updateRecordKeyExtractor(hoodieTable.getMetaClient().getTableConfig(),
false);
+ Schema writerSchema = new
Schema.Parser().parse(hoodieTable.getConfig().getSchema());
Review Comment:
Can we avoid parsing this schema per pair of records?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -85,9 +104,117 @@ public I combineOnCondition(
* @return Collection of HoodieRecord already be deduplicated
*/
public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int
parallelism) {
- HoodieRecordMerger recordMerger =
HoodieRecordUtils.mergerToPreCombineMode(table.getConfig().getRecordMerger());
- return deduplicateRecords(records, table.getIndex(), parallelism,
table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
+ HoodieReaderContext<T> readerContext =
+ (HoodieReaderContext<T>)
table.getContext().<T>getReaderContextFactoryDuringWrite(table.getMetaClient(),
table.getConfig().getRecordMerger().getRecordType())
+ .getContext();
+ List<String> orderingFieldNames = getOrderingFieldName(readerContext,
table.getConfig().getProps(), table.getMetaClient());
+ BufferedRecordMerger<T> recordMerger = BufferedRecordMergerFactory.create(
+ readerContext,
+ table.getConfig().getRecordMergeMode(),
+ false,
+ Option.ofNullable(table.getConfig().getRecordMerger()),
+ orderingFieldNames,
+ Option.ofNullable(table.getConfig().getPayloadClass()),
+ new SerializableSchema(table.getConfig().getSchema()).get(),
+ table.getConfig().getProps(),
+ table.getMetaClient().getTableConfig().getPartialUpdateMode());
+ // Due to new records we cant use meta fields for record key extraction
+
readerContext.getRecordContext().updateRecordKeyExtractor(table.getMetaClient().getTableConfig(),
false);
+ return deduplicateRecords(
+ records,
+ table.getIndex(),
+ parallelism,
+ table.getConfig().getSchema(),
+ table.getConfig().getProps(),
+ recordMerger,
+ readerContext,
+ orderingFieldNames);
+ }
+
+ public abstract I deduplicateRecords(I records,
+ HoodieIndex<?, ?> index,
+ int parallelism,
+ String schema,
+ TypedProperties props,
+ BufferedRecordMerger<T> merger,
+ HoodieReaderContext<T> readerContext,
+ List<String> orderingFieldNames);
+
+ public static List<String> getOrderingFieldName(HoodieReaderContext
readerContext,
+ TypedProperties props,
+ HoodieTableMetaClient
metaClient) {
+ return readerContext.getMergeMode() == RecordMergeMode.COMMIT_TIME_ORDERING
+ ? Collections.emptyList()
+ :
Option.ofNullable(ConfigUtils.getOrderingFields(props)).map(Arrays::asList).orElse(metaClient.getTableConfig().getPreCombineFields());
+ }
+
+ /**
+ * Check if the value of column "_hoodie_is_deleted" is true.
+ */
+ public static <T> boolean isBuiltInDeleteRecord(T record,
+ RecordContext<T>
recordContext,
+ Schema schema,
Option<Pair<String, String>> customDeleteMarkerKeyValue) {
+ if (!customDeleteMarkerKeyValue.isPresent()) {
+ return false;
+ }
+ Object columnValue = recordContext.getValue(record, schema,
HOODIE_IS_DELETED_FIELD);
+ return columnValue != null &&
recordContext.getTypeConverter().castToBoolean(columnValue);
}
- public abstract I deduplicateRecords(I records, HoodieIndex<?, ?> index, int
parallelism, String schema, TypedProperties props, HoodieRecordMerger merger);
+ /**
+ * Check if a record is a DELETE marked by the '_hoodie_operation' field.
+ */
+ public static <T> boolean isDeleteHoodieOperation(T record,
+ RecordContext<T>
recordContext,
+ int hoodieOperationPos) {
+ if (hoodieOperationPos < 0) {
+ return false;
+ }
+ String hoodieOperation = recordContext.getMetaFieldValue(record,
hoodieOperationPos);
+ return hoodieOperation != null &&
HoodieOperation.isDeleteRecord(hoodieOperation);
+ }
+
+ /**
+ * Check if a record is a DELETE marked by a custom delete marker.
+ */
+ public static <T> boolean isCustomDeleteRecord(T record,
+ RecordContext<T>
recordContext,
+ Schema schema,
+ boolean hasBuiltInDelete,
+ Option<Pair<String, String>>
customDeleteMarkerKeyValue) {
+ if (!hasBuiltInDelete || customDeleteMarkerKeyValue.isEmpty()) {
+ return false;
+ }
+ Pair<String, String> markerKeyValue = customDeleteMarkerKeyValue.get();
+ Object deleteMarkerValue =
+ recordContext.getValue(record, schema, markerKeyValue.getLeft());
+ return deleteMarkerValue != null
+ && markerKeyValue.getRight().equals(deleteMarkerValue.toString());
+ }
+
+ public static <T> Option<BufferedRecord<T>> merge(HoodieRecord<T> newRecord,
+ HoodieRecord<T> oldRecord,
+ Schema newSchema,
+ Schema oldSchema,
+ RecordContext<T>
recordContext,
+ List<String>
orderingFieldNames,
+ BufferedRecordMerger<T>
recordMerger,
+ boolean hasBuiltInDelete,
+ Option<Pair<String,
String>> customDeleteMarkerKeyValue,
+ int hoodieOperationPos)
throws IOException {
+ // Construct new buffered record.
+ boolean isDelete1 = isBuiltInDeleteRecord(newRecord.getData(),
recordContext, newSchema, customDeleteMarkerKeyValue)
+ || isCustomDeleteRecord(newRecord.getData(), recordContext, newSchema,
hasBuiltInDelete, customDeleteMarkerKeyValue)
+ || isDeleteHoodieOperation(newRecord.getData(), recordContext,
hoodieOperationPos);
+ BufferedRecord<T> bufferedRec1 = BufferedRecord.forRecordWithContext(
+ newRecord.getData(), newSchema, recordContext, orderingFieldNames,
isDelete1);
Review Comment:
You can pass in the `HoodieRecord` instead of the underlying data. That lets
us get the key from the `HoodieRecord` rather than inspecting the object,
which avoids some per-record overhead.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -444,27 +461,57 @@ private static <R> Option<HoodieRecord<R>>
mergeIncomingWithExistingRecord(
HoodieRecord<R> existing,
Schema writeSchema,
HoodieWriteConfig config,
- HoodieRecordMerger recordMerger,
- Option<BaseKeyGenerator> expressionPayloadKeygen) throws IOException {
+ BufferedRecordMerger<R> recordMerger,
+ Option<BaseKeyGenerator> expressionPayloadKeygen,
+ RecordContext<R> recordContext,
+ List<String> orderingFieldNames,
+ boolean hasBuiltInDelete,
+ Option<Pair<String, String>> customDeleteMarkerKeyValue,
+ int hoodieOperationPos,
+ KeyGenerator keyGenerator) throws IOException {
Schema existingSchema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(config.getSchema()),
config.allowOperationMetadataField());
Schema writeSchemaWithMetaFields =
HoodieAvroUtils.addMetadataFields(writeSchema,
config.allowOperationMetadataField());
if (expressionPayloadKeygen.isPresent()) {
- return mergeIncomingWithExistingRecordWithExpressionPayload(incoming,
existing, writeSchema,
- existingSchema, writeSchemaWithMetaFields, config, recordMerger,
expressionPayloadKeygen.get());
+ HoodieRecord newRecord = incoming;
+ HoodieRecord oldRecord = existing;
+ if (recordContext instanceof AvroRecordContext) {
+ // We need to convert HoodieAvroRecord to HoodieAvroIndexedRecord in
order to use the reader context
+ newRecord = incoming.toIndexedRecord(writeSchema,
config.getProps()).get();
+ oldRecord = existing.toIndexedRecord(writeSchema,
config.getProps()).get();
+ }
+ return mergeIncomingWithExistingRecordWithExpressionPayload(
+ newRecord, oldRecord, writeSchema, existingSchema,
writeSchemaWithMetaFields,
+ config, recordMerger, expressionPayloadKeygen.get(), recordContext,
orderingFieldNames,
+ hasBuiltInDelete, customDeleteMarkerKeyValue, hoodieOperationPos);
} else {
// prepend the hoodie meta fields as the incoming record does not have
them
- HoodieRecord incomingPrepended = incoming
+ HoodieRecord<R> incomingPrepended = incoming
.prependMetaFields(writeSchema, writeSchemaWithMetaFields, new
MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()),
config.getProps());
// after prepend the meta fields, convert the record back to the
original payload
- HoodieRecord incomingWithMetaFields = incomingPrepended
+ HoodieRecord<R> incomingWithMetaFields = incomingPrepended
.wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields,
config.getProps(), Option.empty(), config.allowOperationMetadataField(),
Option.empty(), false, Option.empty());
- Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger
- .merge(existing, existingSchema, incomingWithMetaFields,
writeSchemaWithMetaFields, config.getProps());
+ HoodieRecord newRecord = incomingWithMetaFields;
+ HoodieRecord oldRecord = existing;
+ if (recordContext instanceof AvroRecordContext) {
+ // We need to convert HoodieAvroRecord to HoodieAvroIndexedRecord in
order to use the reader context
+ newRecord = incoming.toIndexedRecord(writeSchema,
config.getProps()).get();
+ oldRecord = existing.toIndexedRecord(writeSchema,
config.getProps()).get();
+ // For Avro, we need to use avro key generator factory to create key
generator
+ keyGenerator =
HoodieAvroKeyGeneratorFactory.createKeyGenerator(config.getProps());
+ }
+ Option<BufferedRecord<R>> mergeResult = merge(
+ newRecord, oldRecord, writeSchemaWithMetaFields, existingSchema,
+ recordContext, orderingFieldNames, recordMerger,
+ hasBuiltInDelete, customDeleteMarkerKeyValue, hoodieOperationPos);
if (mergeResult.isPresent()) {
// the merged record needs to be converted back to the original payload
- HoodieRecord<R> merged =
mergeResult.get().getLeft().wrapIntoHoodieRecordPayloadWithParams(
- writeSchemaWithMetaFields, config.getProps(), Option.empty(),
- config.allowOperationMetadataField(), Option.empty(), false,
Option.of(writeSchema));
+ TypedProperties recordCreationProps =
TypedProperties.copy(config.getProps());
Review Comment:
Why does this require a copy of the props now?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -85,9 +104,117 @@ public I combineOnCondition(
* @return Collection of HoodieRecord already be deduplicated
*/
public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int
parallelism) {
- HoodieRecordMerger recordMerger =
HoodieRecordUtils.mergerToPreCombineMode(table.getConfig().getRecordMerger());
- return deduplicateRecords(records, table.getIndex(), parallelism,
table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
+ HoodieReaderContext<T> readerContext =
+ (HoodieReaderContext<T>)
table.getContext().<T>getReaderContextFactoryDuringWrite(table.getMetaClient(),
table.getConfig().getRecordMerger().getRecordType())
+ .getContext();
+ List<String> orderingFieldNames = getOrderingFieldName(readerContext,
table.getConfig().getProps(), table.getMetaClient());
+ BufferedRecordMerger<T> recordMerger = BufferedRecordMergerFactory.create(
+ readerContext,
+ table.getConfig().getRecordMergeMode(),
+ false,
+ Option.ofNullable(table.getConfig().getRecordMerger()),
+ orderingFieldNames,
+ Option.ofNullable(table.getConfig().getPayloadClass()),
+ new SerializableSchema(table.getConfig().getSchema()).get(),
+ table.getConfig().getProps(),
+ table.getMetaClient().getTableConfig().getPartialUpdateMode());
+ // Due to new records we cant use meta fields for record key extraction
+
readerContext.getRecordContext().updateRecordKeyExtractor(table.getMetaClient().getTableConfig(),
false);
+ return deduplicateRecords(
+ records,
+ table.getIndex(),
+ parallelism,
+ table.getConfig().getSchema(),
+ table.getConfig().getProps(),
+ recordMerger,
+ readerContext,
+ orderingFieldNames);
+ }
+
+ public abstract I deduplicateRecords(I records,
+ HoodieIndex<?, ?> index,
+ int parallelism,
+ String schema,
+ TypedProperties props,
+ BufferedRecordMerger<T> merger,
+ HoodieReaderContext<T> readerContext,
+ List<String> orderingFieldNames);
+
+ public static List<String> getOrderingFieldName(HoodieReaderContext
readerContext,
Review Comment:
Let's rename this to `getOrderingFieldNames` for consistency with the other
changes that were made.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -492,10 +542,34 @@ public static <R> HoodieData<HoodieRecord<R>>
mergeForPartitionUpdatesIfNeeded(
.filter(p -> p.getRight().isPresent())
.map(p -> Pair.of(p.getRight().get().getPartitionPath(),
p.getRight().get().getFileId()))
.distinct(updatedConfig.getGlobalIndexReconcileParallelism());
+ // define the buffered record merger.
+ ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>)
hoodieTable.getContext()
+ .<R>getReaderContextFactoryDuringWrite(hoodieTable.getMetaClient(),
config.getRecordMerger().getRecordType());
+ HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
// merged existing records with current locations being set
- HoodieData<HoodieRecord<R>> existingRecords =
getExistingRecords(globalLocations, keyGeneratorWriteConfigOpt.getLeft(),
hoodieTable);
-
- final HoodieRecordMerger recordMerger = updatedConfig.getRecordMerger();
+ HoodieData<HoodieRecord<R>> existingRecords =
+ getExistingRecords(globalLocations,
keyGeneratorWriteConfigOpt.getLeft(), hoodieTable, readerContextFactory);
+ List<String> orderingFieldNames = getOrderingFieldName(
+ readerContext, hoodieTable.getConfig().getProps(),
hoodieTable.getMetaClient());
+ BufferedRecordMerger<R> recordMerger = BufferedRecordMergerFactory.create(
+ readerContext,
+ hoodieTable.getConfig().getRecordMergeMode(),
+ false,
+ Option.ofNullable(hoodieTable.getConfig().getRecordMerger()),
+ orderingFieldNames,
+ Option.ofNullable(hoodieTable.getConfig().getPayloadClass()),
+ new SerializableSchema(hoodieTable.getConfig().getSchema()).get(),
+ hoodieTable.getConfig().getProps(),
+ hoodieTable.getMetaClient().getTableConfig().getPartialUpdateMode());
+ RecordContext recordContext = readerContext.getRecordContext();
+ // Due to new records we cant use meta fields for record key extraction
+
recordContext.updateRecordKeyExtractor(hoodieTable.getMetaClient().getTableConfig(),
false);
Review Comment:
Can we add a `getRecordContext` overload that takes a boolean indicating
whether the key lookup needs to be done with the key generator?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]