linliu-code commented on code in PR #13498:
URL: https://github.com/apache/hudi/pull/13498#discussion_r2190263755
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -331,6 +369,97 @@ protected Option<BufferedRecord<T>>
doProcessNextDataRecord(BufferedRecord<T> ne
}
}
+ /**
+ * Merge records based on partial update mode.
+ * Note that {@code newRecord} refers to the record with the higher commit time
+ * if COMMIT_TIME_ORDERING mode is used, or the higher event time if
+ * EVENT_TIME_ORDERING mode is used.
+ */
+ private BufferedRecord<T> updatePartiallyIfNeeded(BufferedRecord<T>
newRecord,
+ BufferedRecord<T>
oldRecord,
+ Schema newSchema,
+ Schema oldSchema,
+ PartialUpdateMode
partialUpdateMode) {
+ if (newRecord.isDelete()) {
+ return newRecord;
+ }
+
+ List<Schema.Field> fields = newSchema.getFields();
+ List<Object> values = new ArrayList<>();
+ T engineRecord;
+ switch (partialUpdateMode) {
+ case NONE:
+ case KEEP_VALUES:
+ case FILL_DEFAULTS:
+ return newRecord;
+
+ case IGNORE_DEFAULTS:
+ for (Schema.Field field : fields) {
+ String fieldName = field.name();
+ Object defaultValue = field.defaultVal();
+ Object newValue = readerContext.getValue(
+ newRecord.getRecord(), newSchema, fieldName);
+ if (defaultValue == newValue) {
+ values.add(readerContext.getValue(oldRecord.getRecord(),
oldSchema, fieldName));
+ } else {
+ values.add(readerContext.getValue(newRecord.getRecord(),
newSchema, fieldName));
+ }
+ }
+ engineRecord = readerContext.constructEngineRecord(newSchema, values);
+ return new BufferedRecord<>(
+ newRecord.getRecordKey(),
+ newRecord.getOrderingValue(),
+ engineRecord,
+ newRecord.getSchemaId(),
+ newRecord.isDelete());
+
+ case IGNORE_MARKERS:
+ String partialUpdateCustomMarker =
partialUpdateProperties.get(PARTIAL_UPDATE_CUSTOM_MARKER);
+ if (StringUtils.isNullOrEmpty(partialUpdateCustomMarker)) {
+ throw new IllegalStateException(
+ "For 'IGNORE_MARKERS' mode, custom marker '"
+ + PARTIAL_UPDATE_CUSTOM_MARKER + "' must be configured");
+ }
+ for (Schema.Field field : fields) {
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]