codope commented on code in PR #9894:
URL: https://github.com/apache/hudi/pull/9894#discussion_r1372317061
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -1276,5 +1291,35 @@ public HoodieTableMetaClient initTable(Configuration
configuration, String baseP
throws IOException {
return HoodieTableMetaClient.initTableAndGetMetaClient(configuration, basePath, build());
}
+
+ private void validateMergeConfigs() {
+ boolean payloadClassNameSet = null != payloadClassName;
+ boolean payloadTypeSet = null != payloadType;
+ boolean recordMergerStrategySet = null != recordMergerStrategy;
+ boolean recordMergeModeSet = null != recordMergeMode;
+
+ checkArgument(recordMergeModeSet,
+ "Record merge mode " + HoodieTableConfig.RECORD_MERGE_MODE.key() + " should be set");
Review Comment:
Is it a mandatory config? How will it affect users upgrading to a new version?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java:
##########
@@ -119,21 +124,36 @@ protected Option<T> doProcessNextDataRecord(T record,
Map<String, Object> metadata,
Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) throws IOException {
if (existingRecordMetadataPair != null) {
- // Merge and store the combined record
- // Note that the incoming `record` is from an older commit, so it should be put as
- // the `older` in the merge API
- HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(
- readerContext.constructHoodieRecord(Option.of(record), metadata, readerSchema),
- readerSchema,
- readerContext.constructHoodieRecord(
- existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), readerSchema),
- readerSchema,
- payloadProps).get().getLeft();
- // If pre-combine returns existing record, no need to update it
- if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().get()) {
- return Option.of(combinedRecord.getData());
+ switch (recordMergeMode) {
+ case OVERWRITE_WITH_LATEST:
+ return Option.empty();
+ case EVENT_TIME_ORDERING:
+ Comparable incomingOrderingValue = readerContext.getOrderingValue(
+ Option.of(record), metadata, readerSchema, payloadProps);
+ Comparable existingOrderingValue = readerContext.getOrderingValue(
+ existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), readerSchema, payloadProps);
+ if (incomingOrderingValue.compareTo(existingOrderingValue) > 0) {
+ return Option.of(record);
+ }
+ return Option.empty();
Review Comment:
Why empty? Should it not be from `existingRecordMetadataPair`?
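For context on the question above, here is a minimal, self-contained sketch of one possible reading of the new branch. It assumes (this is not confirmed in the hunk) that an empty return means "keep the existing record, nothing to update", mirroring the removed comment "If pre-combine returns existing record, no need to update it". `EventTimeMergeSketch`, `Rec`, `MergeMode`, and `processOlderRecord` are hypothetical names for illustration only and are not Hudi APIs; `java.util.Optional` stands in for Hudi's `Option`.

```java
import java.util.Optional;

public class EventTimeMergeSketch {

  // Hypothetical stand-in for the two merge modes visible in the diff.
  enum MergeMode { OVERWRITE_WITH_LATEST, EVENT_TIME_ORDERING }

  // Hypothetical record: a payload plus its ordering (event-time) value.
  static final class Rec {
    final String value;
    final long orderingValue;

    Rec(String value, long orderingValue) {
      this.value = value;
      this.orderingValue = orderingValue;
    }
  }

  /**
   * Returns the record that should replace the buffered one, or empty when
   * the buffered (existing) record should be kept unchanged.
   * Assumption: `incoming` comes from an older commit than `existing`.
   */
  static Optional<Rec> processOlderRecord(MergeMode mode, Rec incoming, Rec existing) {
    if (existing == null) {
      // Assumption: with nothing buffered yet, the incoming record is stored.
      return Optional.of(incoming);
    }
    switch (mode) {
      case OVERWRITE_WITH_LATEST:
        // The buffered record is from a later commit, so it always wins.
        return Optional.empty();
      case EVENT_TIME_ORDERING:
        // Replace only when the incoming ordering value is strictly greater.
        return incoming.orderingValue > existing.orderingValue
            ? Optional.of(incoming)
            : Optional.empty();
      default:
        throw new IllegalArgumentException("Unsupported merge mode: " + mode);
    }
  }

  public static void main(String[] args) {
    Rec existing = new Rec("existing", 5L);
    Rec olderButLaterEventTime = new Rec("incoming", 9L);
    Optional<Rec> result =
        processOlderRecord(MergeMode.EVENT_TIME_ORDERING, olderButLaterEventTime, existing);
    System.out.println(result.map(r -> r.value).orElse("keep existing"));
  }
}
```

Under this reading, the caller has to treat an empty result as "no change". If the caller instead treats empty as "delete" or "no record", then returning the record from `existingRecordMetadataPair` would be the safer choice, which is what the question is probing.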