nsivabalan commented on code in PR #13498:
URL: https://github.com/apache/hudi/pull/13498#discussion_r2181089525
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java:
##########
@@ -164,6 +165,20 @@ public HoodieRecord<IndexedRecord> constructHoodieRecord(BufferedRecord<IndexedR
     return new HoodieAvroIndexedRecord(hoodieKey, bufferedRecord.getRecord());
   }
+  @Override
+  public IndexedRecord constructEngineRecord(Schema schema, List<Object> values) {
Review Comment:
Do we have UTs for this?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -841,6 +841,13 @@ public class HoodieWriteConfig extends HoodieConfig {
       .sinceVersion("1.0.0")
       .withDocumentation("Whether to enable incremental table service. So far Clustering and Compaction support incremental processing.");
+  public static final ConfigProperty<Boolean> EVENT_TIME_WATERMARK_METADATA_ENABLED = ConfigProperty
+      .key("hoodie.write.event.time.watermark.metadata.enabled")
+      .defaultValue(true)
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Record event time in record metadata during runtime.");
Review Comment:
More documentation, please.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##########
@@ -110,6 +112,14 @@ public HoodieRecord<InternalRow> constructHoodieRecord(BufferedRecord<InternalRo
     return new HoodieSparkRecord(hoodieKey, row, HoodieInternalRowUtils.getCachedSchema(schema), false);
   }
+  @Override
+  public InternalRow constructEngineRecord(Schema schema, List<Object> values) {
+    if (schema.getFields().size() != values.size()) {
+      throw new IllegalArgumentException("Schema field count and values size must match.");
+    }
+    return new GenericInternalRow(values.toArray());
Review Comment:
We use UnsafeRow in most places.
https://github.com/apache/hudi/blob/e3f93425cd3b5d5fc07d1b39f6396ab403f9b76d/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java#L383
@yihua : can you take a look please
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -304,6 +310,20 @@ public class HoodieTableConfig extends HoodieConfig {
       .sinceVersion("1.0.0")
       .withDocumentation("When set to true, the table can support reading and writing multiple base file formats.");
+  public static final ConfigProperty<PartialUpdateMode> PARTIAL_UPDATE_MODE = ConfigProperty
+      .key("hoodie.write.partial.update.mode")
+      .defaultValue(PartialUpdateMode.NONE)
+      .sinceVersion("1.1.0")
+      .withDocumentation("This property when set, will define how two versions of the record will be "
+          + "merged together where the later contains only partial set of values and not entire record.");
+
+  public static final ConfigProperty<String> PARTIAL_UPDATE_PROPERTIES = ConfigProperty
+      .key("hoodie.write.partial.update.properties")
Review Comment:
Can you update the PR description to call this out?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -263,15 +297,20 @@ protected Option<BufferedRecord<T>> doProcessNextDataRecord(BufferedRecord<T> ne
       // If pre-combine returns existing record, no need to update it
       if (combinedRecord.getData() != existingRecord.getRecord()) {
-        return Option.of(BufferedRecord.forRecordWithContext(combinedRecord, combinedRecordAndSchema.getRight(), readerContext, props));
+        return Option.of(BufferedRecord.forRecordWithContext(
+            combinedRecord, combinedRecordAndSchema.getRight(), readerContext, props));
       }
       return Option.empty();
     } else {
       switch (recordMergeMode) {
         case COMMIT_TIME_ORDERING:
+          newRecord = updatePartiallyIfNeeded(
+              newRecord, existingRecord, readerSchema, readerSchema, partialUpdateMode);
           return Option.of(newRecord);
         case EVENT_TIME_ORDERING:
           if (shouldKeepNewerRecord(existingRecord, newRecord)) {
+            newRecord = updatePartiallyIfNeeded(
+                newRecord, existingRecord, readerSchema, readerSchema, partialUpdateMode);
Review Comment:
Even if we choose the older record based on the ordering field, don't we still need to merge it with the new incoming record in case of partial updates?
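To make the concern concrete, here is a hedged sketch (plain maps standing in for records, not Hudi types): even when the existing record wins on ordering value, the incoming record may carry values for fields the winner never set, so a merge in the opposite direction can still be needed for partial updates.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Hudi code: under EVENT_TIME_ORDERING the record
// with the higher ordering value wins, but with partial updates the losing
// record may still contribute values for fields the winner left unset.
public class PartialMergeSketch {

  // Fill null fields of the ordering winner from the ordering loser.
  static Map<String, Object> mergePartial(Map<String, Object> winner, Map<String, Object> loser) {
    Map<String, Object> out = new HashMap<>(winner);
    for (Map.Entry<String, Object> e : loser.entrySet()) {
      if (out.get(e.getKey()) == null) {
        out.put(e.getKey(), e.getValue());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, Object> existing = new HashMap<>();
    existing.put("id", 1);
    existing.put("name", null);    // never set by the winning write
    existing.put("ts", 5L);
    Map<String, Object> incoming = new HashMap<>();
    incoming.put("id", 1);
    incoming.put("name", "alice"); // loses on ordering, but carries the value
    incoming.put("ts", 3L);
    System.out.println(mergePartial(existing, incoming));
  }
}
```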
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -849,16 +869,25 @@ public static Triple<RecordMergeMode, String, String> inferCorrectMergingBehavio
     return Triple.of(inferredRecordMergeMode, inferredPayloadClassName, inferredRecordMergeStrategyId);
   }
-  public static RecordMergeMode inferRecordMergeModeFromPayloadClass(String payloadClassName) {
+  public static RecordMergeMode inferRecordMergeModeFromPayloadClass(String payloadClassName, HoodieTableVersion tableVersion) {
     if (isNullOrEmpty(payloadClassName)) {
       return null;
     }
+    // TODO: make this only for version > 8 after upgrade table version.
+    if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
Review Comment:
I would say this is unreachable code. inferRecordMergeModeFromPayloadClass should never be invoked for table version 8 with either event-time ordering or commit-time ordering:
```
// Inferring record merge mode
if (isNullOrEmpty(payloadClassName) && isNullOrEmpty(recordMergeStrategyId)) {
  // If nothing is set on record merge mode, payload class, or record merge strategy ID,
  // use the default merge mode determined by whether the ordering field name is set.
  inferredRecordMergeMode = recordMergeMode != null
      ? recordMergeMode
      : (isNullOrEmpty(orderingFieldName) ? COMMIT_TIME_ORDERING : EVENT_TIME_ORDERING);
} else {
  // Infer the merge mode from either the payload class or record merge strategy ID
  RecordMergeMode modeBasedOnPayload = inferRecordMergeModeFromPayloadClass(payloadClassName);
  .
  .
```
We need to account for the case where, after an upgrade, the table config might contain both the payload property and the merge mode property. So, for table version 9 and above, if both exist, we should only consider the merge mode.
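The suggested precedence could be sketched like this (a hedged sketch with illustrative names and a toy payload-to-mode mapping, not the actual HoodieTableConfig API): for table version 9 and above, an explicitly stored merge mode wins even when a payload class is also present.

```java
// Illustrative only: the real Hudi inference covers more payload classes and
// uses HoodieTableVersion rather than a bare int.
public class MergeModeInferenceSketch {
  enum RecordMergeMode { COMMIT_TIME_ORDERING, EVENT_TIME_ORDERING, CUSTOM }

  // For table version 9+, an explicitly set merge mode takes priority over
  // payload-class-based inference; otherwise fall back to inference.
  static RecordMergeMode resolve(RecordMergeMode explicitMode, String payloadClassName, int tableVersion) {
    if (tableVersion >= 9 && explicitMode != null) {
      return explicitMode;
    }
    return inferFromPayload(payloadClassName, explicitMode);
  }

  static RecordMergeMode inferFromPayload(String payloadClassName, RecordMergeMode fallback) {
    if (payloadClassName == null || payloadClassName.isEmpty()) {
      return fallback;
    }
    // Toy mapping for the sketch; not the full inference table.
    return payloadClassName.contains("OverwriteWithLatest")
        ? RecordMergeMode.COMMIT_TIME_ORDERING
        : RecordMergeMode.CUSTOM;
  }

  public static void main(String[] args) {
    System.out.println(resolve(RecordMergeMode.EVENT_TIME_ORDERING,
        "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload", 9));
  }
}
```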
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -331,6 +369,98 @@ protected Option<BufferedRecord<T>> doProcessNextDataRecord(BufferedRecord<T> ne
     }
   }
+  /**
+   * Merge records based on partial update mode.
+   * Note that {@param newRecord} refers to the record with higher commit time
+   * if COMMIT_TIME_ORDERING mode is used, or higher event time if EVENT_TIME_ORDERING mode us used.
+   */
+  private BufferedRecord<T> updatePartiallyIfNeeded(BufferedRecord<T> newRecord,
+                                                    BufferedRecord<T> oldRecord,
+                                                    Schema newSchema,
+                                                    Schema oldSchema,
+                                                    PartialUpdateMode partialUpdateMode) {
+    if (newRecord.isDelete()) {
+      return newRecord;
+    }
+
+    List<Schema.Field> fields = newSchema.getFields();
+    List<Object> values = new ArrayList<>();
+    T engineRecord;
+    switch (partialUpdateMode) {
+      case NONE:
+      case KEEP_VALUES:
+      case FILL_DEFAULTS:
+      case COLUMN_FAMILY:
+        return newRecord;
+
+      case IGNORE_DEFAULTS:
+        for (Schema.Field field : fields) {
+          String fieldName = field.name();
+          Object defaultValue = field.defaultVal();
+          Object newValue = readerContext.getValue(newRecord.getRecord(), newSchema, fieldName);
+          if (defaultValue == newValue) {
+            values.add(readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName));
+          } else {
+            values.add(readerContext.getValue(newRecord.getRecord(), newSchema, fieldName));
+          }
+        }
+        engineRecord = readerContext.constructEngineRecord(newSchema, values);
+        return new BufferedRecord<>(
Review Comment:
Can we introduce private methods for each of the switch cases if it's more than 1 or 2 lines?
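The refactor being asked for might look like this hedged sketch (a toy string merge with illustrative names, not the Hudi method): each non-trivial case body becomes a small private method, keeping the switch itself to one line per case.

```java
// Illustrative sketch of per-case private helpers; the real method operates on
// BufferedRecord/Schema, not strings.
public class SwitchRefactorSketch {
  enum Mode { NONE, IGNORE_DEFAULTS, IGNORE_MARKERS }

  static String merge(Mode mode, String oldVal, String newVal) {
    switch (mode) {
      case IGNORE_DEFAULTS:
        return mergeIgnoringDefaults(oldVal, newVal);
      case IGNORE_MARKERS:
        return mergeIgnoringMarkers(oldVal, newVal);
      default:
        return newVal;
    }
  }

  // Treat empty string as the "schema default"; keep the old value then.
  private static String mergeIgnoringDefaults(String oldVal, String newVal) {
    return newVal.isEmpty() ? oldVal : newVal;
  }

  // Treat a fixed marker as "no update for this field".
  private static String mergeIgnoringMarkers(String oldVal, String newVal) {
    return "__MARKER__".equals(newVal) ? oldVal : newVal;
  }

  public static void main(String[] args) {
    System.out.println(merge(Mode.IGNORE_MARKERS, "kept", "__MARKER__"));
  }
}
```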
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -331,6 +369,98 @@ protected Option<BufferedRecord<T>> doProcessNextDataRecord(BufferedRecord<T> ne
     }
   }
+  /**
+   * Merge records based on partial update mode.
+   * Note that {@param newRecord} refers to the record with higher commit time
+   * if COMMIT_TIME_ORDERING mode is used, or higher event time if EVENT_TIME_ORDERING mode us used.
+   */
+  private BufferedRecord<T> updatePartiallyIfNeeded(BufferedRecord<T> newRecord,
+                                                    BufferedRecord<T> oldRecord,
+                                                    Schema newSchema,
+                                                    Schema oldSchema,
+                                                    PartialUpdateMode partialUpdateMode) {
+    if (newRecord.isDelete()) {
+      return newRecord;
+    }
+
+    List<Schema.Field> fields = newSchema.getFields();
+    List<Object> values = new ArrayList<>();
+    T engineRecord;
+    switch (partialUpdateMode) {
+      case NONE:
+      case KEEP_VALUES:
+      case FILL_DEFAULTS:
+      case COLUMN_FAMILY:
+        return newRecord;
+
+      case IGNORE_DEFAULTS:
+        for (Schema.Field field : fields) {
+          String fieldName = field.name();
+          Object defaultValue = field.defaultVal();
+          Object newValue = readerContext.getValue(newRecord.getRecord(), newSchema, fieldName);
+          if (defaultValue == newValue) {
+            values.add(readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName));
+          } else {
+            values.add(readerContext.getValue(newRecord.getRecord(), newSchema, fieldName));
+          }
+        }
+        engineRecord = readerContext.constructEngineRecord(newSchema, values);
+        return new BufferedRecord<>(
+            newRecord.getRecordKey(),
+            newRecord.getOrderingValue(),
+            engineRecord,
+            newRecord.getSchemaId(),
+            newRecord.isDelete());
+
+      case IGNORE_MARKERS:
+        String partialUpdateCustomMarker = partialUpdateProperties.get(PARTIAL_UPDATE_CUSTOM_MARKER);
+        if (StringUtils.isNullOrEmpty(partialUpdateCustomMarker)) {
+          throw new IllegalStateException(
+              "For 'IGNORE_MARKERS' mode, custom marker '" + PARTIAL_UPDATE_CUSTOM_MARKER + "' must be configured");
+        }
+        for (Schema.Field field : fields) {
+          String fieldName = field.name();
+          Object newValue = readerContext.getValue(newRecord.getRecord(), newSchema, fieldName);
+          if ((isStringTyped(field) || isBytesTyped(field))
Review Comment:
Are we accounting for nested fields as well?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -849,16 +869,25 @@ public static Triple<RecordMergeMode, String, String> inferCorrectMergingBehavio
     return Triple.of(inferredRecordMergeMode, inferredPayloadClassName, inferredRecordMergeStrategyId);
   }
-  public static RecordMergeMode inferRecordMergeModeFromPayloadClass(String payloadClassName) {
+  public static RecordMergeMode inferRecordMergeModeFromPayloadClass(String payloadClassName, HoodieTableVersion tableVersion) {
     if (isNullOrEmpty(payloadClassName)) {
       return null;
     }
+    // TODO: make this only for version > 8 after upgrade table version.
+    if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
Review Comment:
Also, isn't the version NINE instead of EIGHT?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -75,10 +80,12 @@
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
 public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T> {
+  private static final String PARTIAL_UPDATE_CUSTOM_MARKER = "hoodie.write.partial.update.custom.marker";
Review Comment:
We should define these as ConfigProperty instances. For now, add it to HoodieWriteConfig.
@yihua @danny0405: is it worth adding a new config class named HoodieRecordMergeConfig?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -141,6 +151,30 @@ protected FileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
         readerContext.getSchemaHandler().getCustomDeleteMarkerKeyValue().isPresent();
     this.shouldCheckBuiltInDeleteMarker = readerContext.getSchemaHandler().hasBuiltInDelete();
+    this.partialUpdateProperties = parsePartialUpdateProperties(props);
+  }
+
+  public static Map<String, String> parsePartialUpdateProperties(TypedProperties props) {
Review Comment:
Why public static? Why not private?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -331,6 +369,98 @@ protected Option<BufferedRecord<T>> doProcessNextDataRecord(BufferedRecord<T> ne
     }
   }
+  /**
+   * Merge records based on partial update mode.
+   * Note that {@param newRecord} refers to the record with higher commit time
+   * if COMMIT_TIME_ORDERING mode is used, or higher event time if EVENT_TIME_ORDERING mode us used.
+   */
+  private BufferedRecord<T> updatePartiallyIfNeeded(BufferedRecord<T> newRecord,
+                                                    BufferedRecord<T> oldRecord,
+                                                    Schema newSchema,
+                                                    Schema oldSchema,
+                                                    PartialUpdateMode partialUpdateMode) {
+    if (newRecord.isDelete()) {
+      return newRecord;
+    }
+
+    List<Schema.Field> fields = newSchema.getFields();
+    List<Object> values = new ArrayList<>();
+    T engineRecord;
+    switch (partialUpdateMode) {
+      case NONE:
+      case KEEP_VALUES:
+      case FILL_DEFAULTS:
+      case COLUMN_FAMILY:
+        return newRecord;
+
+      case IGNORE_DEFAULTS:
+        for (Schema.Field field : fields) {
+          String fieldName = field.name();
+          Object defaultValue = field.defaultVal();
+          Object newValue = readerContext.getValue(newRecord.getRecord(), newSchema, fieldName);
+          if (defaultValue == newValue) {
+            values.add(readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName));
+          } else {
+            values.add(readerContext.getValue(newRecord.getRecord(), newSchema, fieldName));
+          }
+        }
+        engineRecord = readerContext.constructEngineRecord(newSchema, values);
+        return new BufferedRecord<>(
+            newRecord.getRecordKey(),
+            newRecord.getOrderingValue(),
+            engineRecord,
+            newRecord.getSchemaId(),
+            newRecord.isDelete());
+
+      case IGNORE_MARKERS:
+        String partialUpdateCustomMarker = partialUpdateProperties.get(PARTIAL_UPDATE_CUSTOM_MARKER);
+        if (StringUtils.isNullOrEmpty(partialUpdateCustomMarker)) {
Review Comment:
Why can't we do this validation once in the constructor, instead of per record?
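A hedged sketch of the suggestion (an illustrative class, not the actual FileGroupRecordBuffer): validate the marker once at construction, so the per-record hot path only does the comparison.

```java
// Illustrative only: stands in for hoisting the IGNORE_MARKERS validation out
// of updatePartiallyIfNeeded and into the buffer's constructor.
public class BufferSketch {
  private final String customMarker; // validated once, reused per record

  BufferSketch(String partialUpdateMode, String customMarker) {
    if ("IGNORE_MARKERS".equals(partialUpdateMode)
        && (customMarker == null || customMarker.isEmpty())) {
      throw new IllegalStateException("IGNORE_MARKERS mode requires a custom marker to be configured");
    }
    this.customMarker = customMarker;
  }

  // Per-record hot path: no validation, just the comparison.
  boolean isMarker(Object value) {
    return customMarker != null && customMarker.equals(String.valueOf(value));
  }

  public static void main(String[] args) {
    BufferSketch buffer = new BufferSketch("IGNORE_MARKERS", "__x__");
    System.out.println(buffer.isMarker("__x__"));
  }
}
```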
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -331,6 +369,98 @@ protected Option<BufferedRecord<T>> doProcessNextDataRecord(BufferedRecord<T> ne
     }
   }
+  /**
+   * Merge records based on partial update mode.
+   * Note that {@param newRecord} refers to the record with higher commit time
+   * if COMMIT_TIME_ORDERING mode is used, or higher event time if EVENT_TIME_ORDERING mode us used.
+   */
+  private BufferedRecord<T> updatePartiallyIfNeeded(BufferedRecord<T> newRecord,
+                                                    BufferedRecord<T> oldRecord,
+                                                    Schema newSchema,
+                                                    Schema oldSchema,
+                                                    PartialUpdateMode partialUpdateMode) {
+    if (newRecord.isDelete()) {
+      return newRecord;
+    }
+
+    List<Schema.Field> fields = newSchema.getFields();
+    List<Object> values = new ArrayList<>();
+    T engineRecord;
+    switch (partialUpdateMode) {
+      case NONE:
+      case KEEP_VALUES:
+      case FILL_DEFAULTS:
+      case COLUMN_FAMILY:
+        return newRecord;
+
+      case IGNORE_DEFAULTS:
+        for (Schema.Field field : fields) {
+          String fieldName = field.name();
+          Object defaultValue = field.defaultVal();
+          Object newValue = readerContext.getValue(newRecord.getRecord(), newSchema, fieldName);
+          if (defaultValue == newValue) {
+            values.add(readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName));
+          } else {
+            values.add(readerContext.getValue(newRecord.getRecord(), newSchema, fieldName));
+          }
+        }
+        engineRecord = readerContext.constructEngineRecord(newSchema, values);
+        return new BufferedRecord<>(
+            newRecord.getRecordKey(),
+            newRecord.getOrderingValue(),
+            engineRecord,
+            newRecord.getSchemaId(),
+            newRecord.isDelete());
+
+      case IGNORE_MARKERS:
+        String partialUpdateCustomMarker = partialUpdateProperties.get(PARTIAL_UPDATE_CUSTOM_MARKER);
+        if (StringUtils.isNullOrEmpty(partialUpdateCustomMarker)) {
+          throw new IllegalStateException(
+              "For 'IGNORE_MARKERS' mode, custom marker '" + PARTIAL_UPDATE_CUSTOM_MARKER + "' must be configured");
+        }
+        for (Schema.Field field : fields) {
+          String fieldName = field.name();
+          Object newValue = readerContext.getValue(newRecord.getRecord(), newSchema, fieldName);
+          if ((isStringTyped(field) || isBytesTyped(field))
+              && partialUpdateCustomMarker.equals(readerContext.getTypeHandler().castToString(newValue))) {
+            values.add(readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName));
+          } else {
+            values.add(readerContext.getValue(newRecord.getRecord(), newSchema, fieldName));
+          }
+        }
Review Comment:
If there are no markers, can we return the incoming record as-is, without constructing a new record altogether?
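One way to sketch that short-circuit (plain maps and hypothetical names, not the Hudi types): track whether any marker was actually replaced, and if none was, hand back the incoming record itself instead of rebuilding it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of the suggested optimization for IGNORE_MARKERS.
public class MarkerShortCircuitSketch {

  static Map<String, Object> mergeMarkers(Map<String, Object> oldRec,
                                          Map<String, Object> newRec,
                                          String marker) {
    boolean anyReplaced = false;
    Map<String, Object> out = new LinkedHashMap<>();
    for (Map.Entry<String, Object> e : newRec.entrySet()) {
      if (marker.equals(e.getValue())) {
        // marker hit: take the field from the previous record version
        out.put(e.getKey(), oldRec.get(e.getKey()));
        anyReplaced = true;
      } else {
        out.put(e.getKey(), e.getValue());
      }
    }
    // no marker hit anywhere: skip the rebuild, return the incoming record
    return anyReplaced ? out : newRec;
  }

  public static void main(String[] args) {
    Map<String, Object> oldRec = new LinkedHashMap<>();
    oldRec.put("b", "2");
    Map<String, Object> newRec = new LinkedHashMap<>();
    newRec.put("a", "1");
    newRec.put("b", "__M__");
    System.out.println(mergeMarkers(oldRec, newRec, "__M__"));
  }
}
```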
##########
hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import org.apache.hudi.common.config.EnumFieldDescription;
+
+public enum PartialUpdateMode {
+ @EnumFieldDescription(
+ "No partial update logic should be employed.")
+ NONE,
+
+  @EnumFieldDescription(
+      "For any column values missing in current record, pick value from previous version of the record.")
+  KEEP_VALUES,
+
+  @EnumFieldDescription(
+      "For columns values missing in current record, pick the default value from the schema.")
+  FILL_DEFAULTS,
+
+  @EnumFieldDescription(
+      "For columns having default values set in current record, pick the value from previous version of the record.")
+  IGNORE_DEFAULTS,
+
+  @EnumFieldDescription(
+      "For columns having marker values in the current record, pick value from previous version of the record."
+      + "Marker value can be defined using hoodie.write.partial.custom.marker, which is a table property.")
Review Comment:
We should also call out how to set these props. It's a writer property that one needs to set when creating the table for the first time; during upgrades, it will automatically get added to the table config.
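For the docs, a hedged illustration of how a writer might set these at table-creation time (the keys come from this PR's diff; the marker value "__HUDI_NO_UPDATE__" is a made-up placeholder, not a Hudi default):

```java
import java.util.Properties;

// Illustrative only: writer-side properties set when the table is first
// created; per the review discussion, they land in the table config on upgrade.
public class PartialUpdatePropsExample {
  public static Properties writerProps() {
    Properties p = new Properties();
    p.setProperty("hoodie.write.partial.update.mode", "IGNORE_MARKERS");
    p.setProperty("hoodie.write.partial.update.custom.marker", "__HUDI_NO_UPDATE__");
    return p;
  }

  public static void main(String[] args) {
    writerProps().forEach((k, v) -> System.out.println(k + "=" + v));
  }
}
```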
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -331,6 +369,98 @@ protected Option<BufferedRecord<T>> doProcessNextDataRecord(BufferedRecord<T> ne
     }
   }
+  /**
+   * Merge records based on partial update mode.
+   * Note that {@param newRecord} refers to the record with higher commit time
+   * if COMMIT_TIME_ORDERING mode is used, or higher event time if EVENT_TIME_ORDERING mode us used.
+   */
+  private BufferedRecord<T> updatePartiallyIfNeeded(BufferedRecord<T> newRecord,
+                                                    BufferedRecord<T> oldRecord,
+                                                    Schema newSchema,
+                                                    Schema oldSchema,
+                                                    PartialUpdateMode partialUpdateMode) {
+    if (newRecord.isDelete()) {
+      return newRecord;
+    }
+
+    List<Schema.Field> fields = newSchema.getFields();
+    List<Object> values = new ArrayList<>();
+    T engineRecord;
+    switch (partialUpdateMode) {
+      case NONE:
+      case KEEP_VALUES:
+      case FILL_DEFAULTS:
+      case COLUMN_FAMILY:
+        return newRecord;
+
+      case IGNORE_DEFAULTS:
+        for (Schema.Field field : fields) {
+          String fieldName = field.name();
+          Object defaultValue = field.defaultVal();
+          Object newValue = readerContext.getValue(newRecord.getRecord(), newSchema, fieldName);
+          if (defaultValue == newValue) {
+            values.add(readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName));
+          } else {
+            values.add(readerContext.getValue(newRecord.getRecord(), newSchema, fieldName));
+          }
+        }
+        engineRecord = readerContext.constructEngineRecord(newSchema, values);
+        return new BufferedRecord<>(
+            newRecord.getRecordKey(),
+            newRecord.getOrderingValue(),
+            engineRecord,
+            newRecord.getSchemaId(),
+            newRecord.isDelete());
+
+      case IGNORE_MARKERS:
+        String partialUpdateCustomMarker = partialUpdateProperties.get(PARTIAL_UPDATE_CUSTOM_MARKER);
+        if (StringUtils.isNullOrEmpty(partialUpdateCustomMarker)) {
+          throw new IllegalStateException(
+              "For 'IGNORE_MARKERS' mode, custom marker '" + PARTIAL_UPDATE_CUSTOM_MARKER + "' must be configured");
+        }
+        for (Schema.Field field : fields) {
+          String fieldName = field.name();
+          Object newValue = readerContext.getValue(newRecord.getRecord(), newSchema, fieldName);
+          if ((isStringTyped(field) || isBytesTyped(field))
+              && partialUpdateCustomMarker.equals(readerContext.getTypeHandler().castToString(newValue))) {
+            values.add(readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName));
+          } else {
+            values.add(readerContext.getValue(newRecord.getRecord(), newSchema, fieldName));
+          }
+        }
+        engineRecord = readerContext.constructEngineRecord(newSchema, values);
+        return new BufferedRecord<>(
+            newRecord.getRecordKey(),
+            newRecord.getOrderingValue(),
+            engineRecord,
+            newRecord.getSchemaId(),
+            newRecord.isDelete());
+
+      default:
+        return newRecord;
+    }
+  }
+
+  static boolean isStringTyped(Schema.Field field) {
Review Comment:
Why not private for these?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##########
@@ -110,6 +112,14 @@ public HoodieRecord<InternalRow> constructHoodieRecord(BufferedRecord<InternalRo
     return new HoodieSparkRecord(hoodieKey, row, HoodieInternalRowUtils.getCachedSchema(schema), false);
   }
+  @Override
+  public InternalRow constructEngineRecord(Schema schema, List<Object> values) {
+    if (schema.getFields().size() != values.size()) {
+      throw new IllegalArgumentException("Schema field count and values size must match.");
+    }
+    return new GenericInternalRow(values.toArray());
Review Comment:
@linliu-code : can you sync up w/ ethan and get an alignment here
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -446,6 +576,8 @@ protected Pair<Boolean, T> merge(BufferedRecord<T> olderRecord, BufferedRecord<T
     } else {
       switch (recordMergeMode) {
         case COMMIT_TIME_ORDERING:
+          newerRecord = updatePartiallyIfNeeded(
Review Comment:
Can we name this more generically, `mergeRecords` or something? And internally, if the partial update mode is NONE, let's return right away (first line within the method).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]