nsivabalan commented on code in PR #13498:
URL: https://github.com/apache/hudi/pull/13498#discussion_r2183607961
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java:
##########
@@ -164,6 +165,20 @@ public HoodieRecord<IndexedRecord>
constructHoodieRecord(BufferedRecord<IndexedR
return new HoodieAvroIndexedRecord(hoodieKey, bufferedRecord.getRecord());
}
+ @Override
+ public IndexedRecord constructEngineRecord(Schema schema, List<Object>
values) {
+ if (schema.getFields().size() != values.size()) {
+ throw new IllegalArgumentException("Schema field count and values size
must match.");
+ }
+ GenericData.Record record = new GenericData.Record(schema);
+ List<Schema.Field> fields = schema.getFields();
+
+ for (int i = 0; i < fields.size(); i++) {
+ record.put(fields.get(i).name(), values.get(i));
Review Comment:
we should be using the position directly instead of the field name while
updating the record here
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -841,6 +841,13 @@ public class HoodieWriteConfig extends HoodieConfig {
.sinceVersion("1.0.0")
.withDocumentation("Whether to enable incremental table service. So far
Clustering and Compaction support incremental processing.");
+ public static final ConfigProperty<Boolean>
EVENT_TIME_WATERMARK_METADATA_ENABLED = ConfigProperty
+ .key("hoodie.write.event.time.watermark.metadata.enabled")
Review Comment:
`hoodie.write.track.event.time.watermark`
##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroReaderContextTypeHandler.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.engine.ReaderContextTypeHandler;
+import org.apache.hudi.common.util.StringUtils;
+
+import java.nio.ByteBuffer;
+
+public class AvroReaderContextTypeHandler extends ReaderContextTypeHandler {
+ @Override
+ public String castToString(Object value) {
+ if (value == null) {
+ return null;
+ }
+ if (value instanceof CharSequence) {
Review Comment:
we don't need this condition, right? Anyway, we are falling back to
`value.toString` at L41, which is the default fallback.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import org.apache.hudi.common.config.EnumFieldDescription;
+
+public enum PartialUpdateMode {
+ @EnumFieldDescription(
+ "No partial update logic should be employed.")
+ NONE,
+
+ @EnumFieldDescription(
+ "For any column values missing in current record, pick value from
previous version of the record.")
+ KEEP_VALUES,
+
+ @EnumFieldDescription(
+ "For columns values missing in current record, pick the default value
from the schema.")
+ FILL_DEFAULTS,
+
+ @EnumFieldDescription(
+ "For columns having default values set in current record, pick the value
from previous version of the record.")
+ IGNORE_DEFAULTS,
+
+ @EnumFieldDescription(
+ "For columns having marker values in the current record, pick value from
previous version of the record."
+ + "Marker value can be defined using hoodie.write.partial.custom.marker,
which is a table property.")
Review Comment:
`Marker value can be defined using hoodie.write.partial.custom.marker
property, which will be part of table property
hoodie.write.partial.update.properties config value`
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -141,6 +151,30 @@ protected FileGroupRecordBuffer(HoodieReaderContext<T>
readerContext,
readerContext.getSchemaHandler().getCustomDeleteMarkerKeyValue().isPresent();
this.shouldCheckBuiltInDeleteMarker =
readerContext.getSchemaHandler().hasBuiltInDelete();
+ this.partialUpdateProperties = parsePartialUpdateProperties(props);
+ }
+
+ public static Map<String, String>
parsePartialUpdateProperties(TypedProperties props) {
+ Map<String, String> properties = new HashMap<>();
+ String raw =
props.getProperty(HoodieTableConfig.PARTIAL_UPDATE_PROPERTIES.key());
Review Comment:
let's pay due attention to variable naming.
`partialUpdateProperties`
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -849,16 +869,25 @@ public static Triple<RecordMergeMode, String, String>
inferCorrectMergingBehavio
return Triple.of(inferredRecordMergeMode, inferredPayloadClassName,
inferredRecordMergeStrategyId);
}
- public static RecordMergeMode inferRecordMergeModeFromPayloadClass(String
payloadClassName) {
+ public static RecordMergeMode inferRecordMergeModeFromPayloadClass(String
payloadClassName, HoodieTableVersion tableVersion) {
if (isNullOrEmpty(payloadClassName)) {
return null;
}
+ // TODO: make this only for version > 8 after upgrade table version.
Review Comment:
which patch are we waiting on to get past this?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##########
@@ -110,6 +112,14 @@ public HoodieRecord<InternalRow>
constructHoodieRecord(BufferedRecord<InternalRo
return new HoodieSparkRecord(hoodieKey, row,
HoodieInternalRowUtils.getCachedSchema(schema), false);
}
+ @Override
+ public InternalRow constructEngineRecord(Schema schema, List<Object> values)
{
+ if (schema.getFields().size() != values.size()) {
+ throw new IllegalArgumentException("Schema field count and values size
must match.");
+ }
+ return new GenericInternalRow(values.toArray());
Review Comment:
yes, ChatGPT says UnsafeRow is performant, but it's immutable. So the
recommended way is to go with GenericInternalRow for cases where we want to
modify some columns.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -841,6 +841,13 @@ public class HoodieWriteConfig extends HoodieConfig {
.sinceVersion("1.0.0")
.withDocumentation("Whether to enable incremental table service. So far
Clustering and Compaction support incremental processing.");
+ public static final ConfigProperty<Boolean>
EVENT_TIME_WATERMARK_METADATA_ENABLED = ConfigProperty
+ .key("hoodie.write.event.time.watermark.metadata.enabled")
+ .defaultValue(true)
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("Record event time in record metadata during
runtime.");
Review Comment:
`Records event time watermark metadata in commit metadata when enabled`
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -849,16 +869,25 @@ public static Triple<RecordMergeMode, String, String>
inferCorrectMergingBehavio
return Triple.of(inferredRecordMergeMode, inferredPayloadClassName,
inferredRecordMergeStrategyId);
}
- public static RecordMergeMode inferRecordMergeModeFromPayloadClass(String
payloadClassName) {
+ public static RecordMergeMode inferRecordMergeModeFromPayloadClass(String
payloadClassName, HoodieTableVersion tableVersion) {
if (isNullOrEmpty(payloadClassName)) {
return null;
}
+ // TODO: make this only for version > 8 after upgrade table version.
+ if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+ if (PartialUpdateAvroPayload.class.getName().equals(payloadClassName)
+ ||
PostgresDebeziumAvroPayload.class.getName().equals(payloadClassName)) {
+ return EVENT_TIME_ORDERING;
+ } else if
(OverwriteNonDefaultsWithLatestAvroPayload.class.getName().equals(payloadClassName)
+ || AWSDmsAvroPayload.class.getName().equals(payloadClassName)) {
+ return COMMIT_TIME_ORDERING;
+ }
+ }
Review Comment:
I would say let's separate version 8 from 6 to keep it less confusing.
```
if (table version == 8) {
....
} else if (table version == 6){
....
}
```
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -410,6 +398,15 @@ public Comparable getOrderingValue(T record,
*/
public abstract HoodieRecord<T> constructHoodieRecord(BufferedRecord<T>
bufferedRecord);
+ /**
+ * Constructs a new Engine based record based on a list of values from each
field.
Review Comment:
`Constructs a new Engine based record based on a list of values and given
schema.`
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -331,6 +369,97 @@ protected Option<BufferedRecord<T>>
doProcessNextDataRecord(BufferedRecord<T> ne
}
}
+ /**
+ * Merge records based on partial update mode.
+ * Note that {@param newRecord} refers to the record with higher commit time
+ * if COMMIT_TIME_ORDERING mode is used, or higher event time if
EVENT_TIME_ORDERING mode us used.
+ */
+ private BufferedRecord<T> updatePartiallyIfNeeded(BufferedRecord<T>
newRecord,
+ BufferedRecord<T>
oldRecord,
+ Schema newSchema,
+ Schema oldSchema,
+ PartialUpdateMode
partialUpdateMode) {
+ if (newRecord.isDelete()) {
+ return newRecord;
+ }
+
+ List<Schema.Field> fields = newSchema.getFields();
+ List<Object> values = new ArrayList<>();
+ T engineRecord;
+ switch (partialUpdateMode) {
+ case NONE:
+ case KEEP_VALUES:
+ case FILL_DEFAULTS:
+ return newRecord;
+
+ case IGNORE_DEFAULTS:
+ for (Schema.Field field : fields) {
+ String fieldName = field.name();
+ Object defaultValue = field.defaultVal();
+ Object newValue = readerContext.getValue(
+ newRecord.getRecord(), newSchema, fieldName);
+ if (defaultValue == newValue) {
+ values.add(readerContext.getValue(oldRecord.getRecord(),
oldSchema, fieldName));
+ } else {
+ values.add(readerContext.getValue(newRecord.getRecord(),
newSchema, fieldName));
+ }
+ }
+ engineRecord = readerContext.constructEngineRecord(newSchema, values);
+ return new BufferedRecord<>(
+ newRecord.getRecordKey(),
+ newRecord.getOrderingValue(),
+ engineRecord,
+ newRecord.getSchemaId(),
+ newRecord.isDelete());
+
+ case IGNORE_MARKERS:
+ String partialUpdateCustomMarker =
partialUpdateProperties.get(PARTIAL_UPDATE_CUSTOM_MARKER);
+ if (StringUtils.isNullOrEmpty(partialUpdateCustomMarker)) {
+ throw new IllegalStateException(
+ "For 'IGNORE_MARKERS' mode, custom marker '"
+ + PARTIAL_UPDATE_CUSTOM_MARKER + "' must be configured");
+ }
+ for (Schema.Field field : fields) {
Review Comment:
again, let's also add docs noting that we are only doing this for root-level fields.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -841,6 +841,13 @@ public class HoodieWriteConfig extends HoodieConfig {
.sinceVersion("1.0.0")
.withDocumentation("Whether to enable incremental table service. So far
Clustering and Compaction support incremental processing.");
+ public static final ConfigProperty<Boolean>
EVENT_TIME_WATERMARK_METADATA_ENABLED = ConfigProperty
+ .key("hoodie.write.event.time.watermark.metadata.enabled")
+ .defaultValue(true)
Review Comment:
lets mark default as false
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -280,15 +285,11 @@ public ClosableIterator<T> getFileRecordIterator(
public void initRecordMerger(TypedProperties properties) {
RecordMergeMode recordMergeMode = tableConfig.getRecordMergeMode();
String mergeStrategyId = tableConfig.getRecordMergeStrategyId();
- if
(!tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
- Triple<RecordMergeMode, String, String> triple =
HoodieTableConfig.inferCorrectMergingBehavior(
- recordMergeMode, tableConfig.getPayloadClass(),
- mergeStrategyId, null, tableConfig.getTableVersion());
- recordMergeMode = triple.getLeft();
- mergeStrategyId = triple.getRight();
- }
- this.mergeMode = recordMergeMode;
- this.recordMerger = getRecordMerger(recordMergeMode, mergeStrategyId,
+ Triple<RecordMergeMode, String, String> triple =
HoodieTableConfig.inferCorrectMergingBehavior(
Review Comment:
lets add a todo here.
we probably want something like
```
(!tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.NINE)) {
Triple<RecordMergeMode, String, String> triple =
HoodieTableConfig.inferCorrectMergingBehavior(
recordMergeMode, tableConfig.getPayloadClass(),
mergeStrategyId, null, tableConfig.getTableVersion());
recordMergeMode = triple.getLeft();
mergeStrategyId = triple.getRight();
}
```
essentially, for versions less than 9, we should call infer func.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -1049,6 +1078,20 @@ public Set<String> getMetadataPartitions() {
CONFIG_VALUES_DELIMITER));
}
+ public PartialUpdateMode getPartialUpdateMode() {
+ String payloadClass = getPayloadClass();
Review Comment:
don't we need to return `NONE` for version 8 and below?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -331,6 +369,97 @@ protected Option<BufferedRecord<T>>
doProcessNextDataRecord(BufferedRecord<T> ne
}
}
+ /**
+ * Merge records based on partial update mode.
+ * Note that {@param newRecord} refers to the record with higher commit time
+ * if COMMIT_TIME_ORDERING mode is used, or higher event time if
EVENT_TIME_ORDERING mode us used.
+ */
+ private BufferedRecord<T> updatePartiallyIfNeeded(BufferedRecord<T>
newRecord,
+ BufferedRecord<T>
oldRecord,
+ Schema newSchema,
+ Schema oldSchema,
+ PartialUpdateMode
partialUpdateMode) {
+ if (newRecord.isDelete()) {
+ return newRecord;
+ }
+
+ List<Schema.Field> fields = newSchema.getFields();
+ List<Object> values = new ArrayList<>();
+ T engineRecord;
+ switch (partialUpdateMode) {
+ case NONE:
+ case KEEP_VALUES:
+ case FILL_DEFAULTS:
+ return newRecord;
+
+ case IGNORE_DEFAULTS:
+ for (Schema.Field field : fields) {
+ String fieldName = field.name();
+ Object defaultValue = field.defaultVal();
+ Object newValue = readerContext.getValue(
+ newRecord.getRecord(), newSchema, fieldName);
+ if (defaultValue == newValue) {
+ values.add(readerContext.getValue(oldRecord.getRecord(),
oldSchema, fieldName));
+ } else {
+ values.add(readerContext.getValue(newRecord.getRecord(),
newSchema, fieldName));
+ }
+ }
+ engineRecord = readerContext.constructEngineRecord(newSchema, values);
+ return new BufferedRecord<>(
+ newRecord.getRecordKey(),
+ newRecord.getOrderingValue(),
+ engineRecord,
Review Comment:
can you get into the specifics of what we do in
PartialUpdateAvroPayload.mergeDisorderRecordsWithMetadata and ensure the new
logic accounts for that.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import org.apache.hudi.common.config.EnumFieldDescription;
+
+public enum PartialUpdateMode {
+ @EnumFieldDescription(
+ "No partial update logic should be employed.")
+ NONE,
+
+ @EnumFieldDescription(
+ "For any column values missing in current record, pick value from
previous version of the record.")
+ KEEP_VALUES,
+
+ @EnumFieldDescription(
+ "For columns values missing in current record, pick the default value
from the schema.")
Review Comment:
minor.
`For column values ...`
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -304,6 +310,20 @@ public class HoodieTableConfig extends HoodieConfig {
.sinceVersion("1.0.0")
.withDocumentation("When set to true, the table can support reading and
writing multiple base file formats.");
+ public static final ConfigProperty<PartialUpdateMode> PARTIAL_UPDATE_MODE =
ConfigProperty
+ .key("hoodie.write.partial.update.mode")
+ .defaultValue(PartialUpdateMode.NONE)
+ .sinceVersion("1.1.0")
+ .withDocumentation("This property when set, will define how two versions
of the record will be "
+ + "merged together where the later contains only partial set of
values and not entire record.");
+
+ public static final ConfigProperty<String> PARTIAL_UPDATE_PROPERTIES =
ConfigProperty
+ .key("hoodie.write.partial.update.properties")
+ .noDefaultValue()
+ .sinceVersion("1.1.0")
+ .withDocumentation("The value of this property is in the format of
'K1=V1,K2=V2,...,Ki=Vi,...'."
+ + "Each (Ki, Vi) pair presents a property used by partial update.");
Review Comment:
minor.
`presents a property used by partial update scenarios leveraging
hoodie.write.partial.update.mode`.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -331,6 +343,64 @@ protected Option<BufferedRecord<T>>
doProcessNextDataRecord(BufferedRecord<T> ne
}
}
+ /**
+ * Merge records based on partial update mode.
+ * Note that {@param newRecord} refers to the record with higher commit time
+ * if COMMIT_TIME_ORDERING mode is used, or higher event time if
EVENT_TIME_ORDERING mode us used.
+ */
+ private void updatePartiallyIfNeeded(BufferedRecord<T> newRecord,
+ BufferedRecord<T> oldRecord,
+ Schema newSchema,
+ Schema oldSchema,
+ PartialUpdateMode partialUpdateMode) {
+ if (newRecord.isDelete()) {
+ return;
+ }
+
+ List<Schema.Field> fields = newSchema.getFields();
+ switch (partialUpdateMode) {
+ case NONE:
+ case KEEP_VALUES:
+ case FILL_DEFAULTS:
+ case COLUMN_FAMILY:
+ // No-op for these modes
+ break;
+
+ case IGNORE_DEFAULTS:
+ for (Schema.Field field : fields) {
+ String fieldName = field.name();
+ Object defaultValue = field.defaultVal();
Review Comment:
I checked OverwriteNonDefaultsWithLatestAvroPayload; we only do this for
root-level fields.
So, let's call it out in the partial update mode documentation,
and update the RFC as well.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -331,6 +369,97 @@ protected Option<BufferedRecord<T>>
doProcessNextDataRecord(BufferedRecord<T> ne
}
}
+ /**
+ * Merge records based on partial update mode.
+ * Note that {@param newRecord} refers to the record with higher commit time
+ * if COMMIT_TIME_ORDERING mode is used, or higher event time if
EVENT_TIME_ORDERING mode us used.
+ */
+ private BufferedRecord<T> updatePartiallyIfNeeded(BufferedRecord<T>
newRecord,
+ BufferedRecord<T>
oldRecord,
+ Schema newSchema,
+ Schema oldSchema,
+ PartialUpdateMode
partialUpdateMode) {
+ if (newRecord.isDelete()) {
+ return newRecord;
+ }
+
+ List<Schema.Field> fields = newSchema.getFields();
+ List<Object> values = new ArrayList<>();
+ T engineRecord;
+ switch (partialUpdateMode) {
+ case NONE:
+ case KEEP_VALUES:
+ case FILL_DEFAULTS:
+ return newRecord;
+
+ case IGNORE_DEFAULTS:
+ for (Schema.Field field : fields) {
+ String fieldName = field.name();
+ Object defaultValue = field.defaultVal();
+ Object newValue = readerContext.getValue(
+ newRecord.getRecord(), newSchema, fieldName);
+ if (defaultValue == newValue) {
+ values.add(readerContext.getValue(oldRecord.getRecord(),
oldSchema, fieldName));
+ } else {
+ values.add(readerContext.getValue(newRecord.getRecord(),
newSchema, fieldName));
+ }
+ }
+ engineRecord = readerContext.constructEngineRecord(newSchema, values);
+ return new BufferedRecord<>(
+ newRecord.getRecordKey(),
+ newRecord.getOrderingValue(),
+ engineRecord,
+ newRecord.getSchemaId(),
+ newRecord.isDelete());
+
+ case IGNORE_MARKERS:
+ String partialUpdateCustomMarker =
partialUpdateProperties.get(PARTIAL_UPDATE_CUSTOM_MARKER);
+ if (StringUtils.isNullOrEmpty(partialUpdateCustomMarker)) {
+ throw new IllegalStateException(
+ "For 'IGNORE_MARKERS' mode, custom marker '"
+ + PARTIAL_UPDATE_CUSTOM_MARKER + "' must be configured");
+ }
+ for (Schema.Field field : fields) {
+ String fieldName = field.name();
+ Object newValue = readerContext.getValue(newRecord.getRecord(),
newSchema, fieldName);
+ if ((isStringTyped(field) || isBytesTyped(field))
+ &&
partialUpdateCustomMarker.equals(readerContext.getTypeHandler().castToString(newValue)))
{
Review Comment:
we have an optimization to check the length first before checking for exact equality
```
private boolean containsBytesToastedValues(IndexedRecord incomingRecord,
Schema.Field field) {
return ((field.schema().getType() == Schema.Type.BYTES
|| (field.schema().getType() == Schema.Type.UNION &&
field.schema().getTypes().stream().anyMatch(s -> s.getType() ==
Schema.Type.BYTES)))
// Check length first as an optimization
&& ((ByteBuffer) ((GenericData.Record)
incomingRecord).get(field.name())).array().length ==
DEBEZIUM_TOASTED_VALUE.length()
&& DEBEZIUM_TOASTED_VALUE.equals(fromUTF8Bytes(((ByteBuffer)
((GenericData.Record) incomingRecord).get(field.name())).array())));
}
```
can we do the same here
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala:
##########
@@ -647,7 +647,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
"hoodie.datasource.write.keygenerator.class" ->
"org.apache.hudi.keygen.NonpartitionedKeyGenerator",
"hoodie.datasource.write.partitionpath.field" -> "",
"hoodie.datasource.write.payload.class" ->
"org.apache.hudi.common.model.AWSDmsAvroPayload",
- DataSourceWriteOptions.RECORD_MERGE_MODE.key() ->
RecordMergeMode.CUSTOM.name(),
+ DataSourceWriteOptions.RECORD_MERGE_MODE.key() ->
RecordMergeMode.COMMIT_TIME_ORDERING.name(),
Review Comment:
why are we switching this in this patch?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -331,6 +369,97 @@ protected Option<BufferedRecord<T>>
doProcessNextDataRecord(BufferedRecord<T> ne
}
}
+ /**
+ * Merge records based on partial update mode.
+ * Note that {@param newRecord} refers to the record with higher commit time
+ * if COMMIT_TIME_ORDERING mode is used, or higher event time if
EVENT_TIME_ORDERING mode us used.
+ */
+ private BufferedRecord<T> updatePartiallyIfNeeded(BufferedRecord<T>
newRecord,
+ BufferedRecord<T>
oldRecord,
+ Schema newSchema,
+ Schema oldSchema,
+ PartialUpdateMode
partialUpdateMode) {
+ if (newRecord.isDelete()) {
+ return newRecord;
+ }
+
+ List<Schema.Field> fields = newSchema.getFields();
+ List<Object> values = new ArrayList<>();
+ T engineRecord;
+ switch (partialUpdateMode) {
+ case NONE:
+ case KEEP_VALUES:
+ case FILL_DEFAULTS:
+ return newRecord;
+
+ case IGNORE_DEFAULTS:
+ for (Schema.Field field : fields) {
+ String fieldName = field.name();
+ Object defaultValue = field.defaultVal();
+ Object newValue = readerContext.getValue(
+ newRecord.getRecord(), newSchema, fieldName);
+ if (defaultValue == newValue) {
+ values.add(readerContext.getValue(oldRecord.getRecord(),
oldSchema, fieldName));
+ } else {
+ values.add(readerContext.getValue(newRecord.getRecord(),
newSchema, fieldName));
+ }
+ }
+ engineRecord = readerContext.constructEngineRecord(newSchema, values);
+ return new BufferedRecord<>(
+ newRecord.getRecordKey(),
+ newRecord.getOrderingValue(),
+ engineRecord,
+ newRecord.getSchemaId(),
+ newRecord.isDelete());
+
+ case IGNORE_MARKERS:
+ String partialUpdateCustomMarker =
partialUpdateProperties.get(PARTIAL_UPDATE_CUSTOM_MARKER);
+ if (StringUtils.isNullOrEmpty(partialUpdateCustomMarker)) {
+ throw new IllegalStateException(
+ "For 'IGNORE_MARKERS' mode, custom marker '"
+ + PARTIAL_UPDATE_CUSTOM_MARKER + "' must be configured");
+ }
+ for (Schema.Field field : fields) {
+ String fieldName = field.name();
+ Object newValue = readerContext.getValue(newRecord.getRecord(),
newSchema, fieldName);
+ if ((isStringTyped(field) || isBytesTyped(field))
+ &&
partialUpdateCustomMarker.equals(readerContext.getTypeHandler().castToString(newValue)))
{
+ values.add(readerContext.getValue(oldRecord.getRecord(),
oldSchema, fieldName));
+ } else {
+ values.add(readerContext.getValue(newRecord.getRecord(),
newSchema, fieldName));
+ }
+ }
+ engineRecord = readerContext.constructEngineRecord(newSchema, values);
+ return new BufferedRecord<>(
+ newRecord.getRecordKey(),
+ newRecord.getOrderingValue(),
+ engineRecord,
+ newRecord.getSchemaId(),
+ newRecord.isDelete());
+
+ default:
+ return newRecord;
+ }
+ }
+
+ static boolean isStringTyped(Schema.Field field) {
+ return hasType(field.schema(), Schema.Type.STRING);
+ }
+
+ static boolean isBytesTyped(Schema.Field field) {
+ return hasType(field.schema(), Schema.Type.BYTES);
+ }
+
+ static boolean hasType(Schema schema, Schema.Type targetType) {
Review Comment:
minor
hasTargetType
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions.{OPERATION, PRECOMBINE_FIELD,
RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.common.model.{AWSDmsAvroPayload, EventTimeAvroPayload,
OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload,
PartialUpdateAvroPayload}
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+
+import org.apache.spark.sql.SaveMode
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+class TestPayloadDeprecationFlow extends SparkClientFunctionalTestHarness {
+ @ParameterizedTest
+ @MethodSource(Array("provideParams"))
+ def testMergerBuiltinPayload(tableType: String,
+ payloadClazz: String): Unit = {
+ val opts: Map[String, String] = Map(
+ HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz,
+ HoodieTableConfig.PARTIAL_UPDATE_PROPERTIES.key() ->
+ "hoodie.payload.delete.field=xp,hoodie.payload.delete.marker=d")
+ val columns = Seq("ts", "key", "rider", "driver", "fare", "Op")
+
+ // 1. Add an insert.
+ val data = Seq(
+ (10, "1", "rider-A", "driver-A", 19.10, "i"),
+ (10, "2", "rider-B", "driver-B", 27.70, "i"),
+ (10, "3", "rider-C", "driver-C", 33.90, "i"),
+ (10, "4", "rider-D", "driver-D", 34.15, "i"),
+ (10, "5", "rider-E", "driver-E", 17.85, "i"))
+ val inserts = spark.createDataFrame(data).toDF(columns: _*)
+ inserts.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "key").
+ option(PRECOMBINE_FIELD.key(), "ts").
+ option(TABLE_TYPE.key(), tableType).
+ option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+ options(opts).
+ mode(SaveMode.Overwrite).
+ save(basePath)
+ // 2. Add an update.
+ val updateData = Seq(
Review Comment:
With MOR, let's ensure we trigger compaction and validate the data post-compaction.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]