nsivabalan commented on code in PR #13498: URL: https://github.com/apache/hudi/pull/13498#discussion_r2196323251
########## hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.PartialUpdateMode; +import org.apache.hudi.common.util.StringUtils; + +import org.apache.avro.Schema; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS; +import static org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER; + +public class PartialUpdateStrategy<T> { + private final HoodieReaderContext<T> readerContext; + private final PartialUpdateMode partialUpdateMode; + private Map<String, String> partialUpdateProperties; + + public PartialUpdateStrategy(HoodieReaderContext<T> readerContext, + PartialUpdateMode partialUpdateMode, + TypedProperties props) { + this.readerContext = readerContext; + this.partialUpdateMode = partialUpdateMode; + 
this.partialUpdateProperties = parsePartialUpdateProperties(props); + } + + /** + * Merge records based on partial update mode. + * Note that {@param newRecord} refers to the record with higher commit time if COMMIT_TIME_ORDERING mode is used, + * or higher event time if EVENT_TIME_ORDERING mode us used. + */ + BufferedRecord<T> reconcileFieldsWithOldRecord(BufferedRecord<T> newRecord, + BufferedRecord<T> oldRecord, + Schema newSchema, + Schema oldSchema, + boolean keepOldMetadataColumns) { + // Note that: When either newRecord or oldRecord is a delete record, + // skip partial update since delete records do not have meaningful columns. + if (partialUpdateMode == PartialUpdateMode.NONE + || null == oldRecord + || newRecord.isDelete() + || oldRecord.isDelete()) { + return newRecord; + } + + switch (partialUpdateMode) { + case KEEP_VALUES: + case FILL_DEFAULTS: + return newRecord; + case IGNORE_DEFAULTS: + return reconcileDefaultValues( + newRecord, oldRecord, newSchema, oldSchema, keepOldMetadataColumns); + case IGNORE_MARKERS: + return reconcileMarkerValues( + newRecord, oldRecord, newSchema, oldSchema); + default: + return newRecord; + } + } + + /** + * @param newRecord The newer record determined by the merge mode. + * @param oldRecord The older record determined by the merge mode. + * @param newSchema The schema of the newer record. + * @param oldSchema The schema of the older record. + * @param keepOldMetadataColumns Keep the metadata columns from the older record. + * @return + */ + BufferedRecord<T> reconcileDefaultValues(BufferedRecord<T> newRecord, + BufferedRecord<T> oldRecord, + Schema newSchema, + Schema oldSchema, + boolean keepOldMetadataColumns) { + List<Schema.Field> fields = newSchema.getFields(); + Map<Integer, Object> updateValues = new HashMap<>(); + T engineRecord; + // The default value only from the top-level data type is validated. That means, + // for nested columns, we do not check the leaf level data type defaults. 
+ for (Schema.Field field : fields) { + String fieldName = field.name(); + Object defaultValue = field.defaultVal(); + Object newValue = readerContext.getValue( + newRecord.getRecord(), newSchema, fieldName); + if (defaultValue == newValue + || (keepOldMetadataColumns && HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(fieldName))) { + updateValues.put(field.pos(), readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName)); + } + } + if (updateValues.isEmpty()) { + return newRecord; + } + engineRecord = readerContext.constructEngineRecord(newSchema, updateValues, newRecord); + return new BufferedRecord<>( + newRecord.getRecordKey(), + newRecord.getOrderingValue(), + engineRecord, + newRecord.getSchemaId(), + newRecord.isDelete()); + } + + BufferedRecord<T> reconcileMarkerValues(BufferedRecord<T> newRecord, + BufferedRecord<T> oldRecord, + Schema newSchema, + Schema oldSchema) { + List<Schema.Field> fields = newSchema.getFields(); + Map<Integer, Object> updateValues = new HashMap<>(); + T engineRecord; + String partialUpdateCustomMarker = partialUpdateProperties.get(PARTIAL_UPDATE_CUSTOM_MARKER); + for (Schema.Field field : fields) { + String fieldName = field.name(); + Object newValue = readerContext.getValue(newRecord.getRecord(), newSchema, fieldName); + if ((isStringTyped(field) || isBytesTyped(field)) + && (partialUpdateCustomMarker.equals(readerContext.getTypeHandler().castToString(newValue)))) { + updateValues.put(field.pos(), readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName)); + } + } + if (updateValues.isEmpty()) { + return newRecord; + } + engineRecord = readerContext.constructEngineRecord(newSchema, updateValues, newRecord); + return new BufferedRecord<>( + newRecord.getRecordKey(), + newRecord.getOrderingValue(), + engineRecord, + newRecord.getSchemaId(), + newRecord.isDelete()); + } + + static boolean isStringTyped(Schema.Field field) { + return hasTargetType(field.schema(), Schema.Type.STRING); + } + + static boolean 
isBytesTyped(Schema.Field field) { + return hasTargetType(field.schema(), Schema.Type.BYTES); + } + + static boolean hasTargetType(Schema schema, Schema.Type targetType) { + if (schema.getType() == targetType) { + return true; + } else if (schema.getType() == Schema.Type.UNION) { + // Stream is lazy, so this is efficient even with multiple types + return schema.getTypes().stream().anyMatch(s -> s.getType() == targetType); + } + return false; + } + + static Map<String, String> parsePartialUpdateProperties(TypedProperties props) { + Map<String, String> properties = new HashMap<>(); + String raw = props.getProperty(HoodieTableConfig.PARTIAL_UPDATE_PROPERTIES.key()); + if (StringUtils.isNullOrEmpty(raw)) { + return properties; + } + String[] entries = raw.split(","); Review Comment: don't we already have a util method for this. ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java: ########## @@ -45,27 +46,34 @@ * Factory to create a {@link BufferedRecordMerger}. */ public class BufferedRecordMergerFactory { + + private BufferedRecordMergerFactory() { + } + public static <T> BufferedRecordMerger<T> create(HoodieReaderContext<T> readerContext, RecordMergeMode recordMergeMode, boolean enablePartialMerging, Option<HoodieRecordMerger> recordMerger, Option<String> orderingFieldName, Option<String> payloadClass, Schema readerSchema, - TypedProperties props) { + TypedProperties props, + PartialUpdateMode partialUpdateMode) { if (enablePartialMerging) { BufferedRecordMerger<T> deleteRecordMerger = create( - readerContext, recordMergeMode, false, recordMerger, orderingFieldName, payloadClass, readerSchema, props); + readerContext, recordMergeMode, false, recordMerger, orderingFieldName, payloadClass, readerSchema, props, partialUpdateMode); Review Comment: btw, is your plan to change this to using KEEP_VALUES once you have the impl ready ? 
or are we not looking to fix SQL MIT in 1.1 ########## hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java: ########## @@ -1071,6 +1108,24 @@ public Set<String> getMetadataPartitions() { CONFIG_VALUES_DELIMITER)); } + public PartialUpdateMode getPartialUpdateMode() { + if (getTableVersion().greaterThanOrEquals(HoodieTableVersion.NINE)) { + String payloadClass = getPayloadClass(); + if (StringUtils.isNullOrEmpty(payloadClass)) { + return PartialUpdateMode.valueOf(getStringOrDefault(PARTIAL_UPDATE_MODE)); + } else if (payloadClass.equals(OverwriteNonDefaultsWithLatestAvroPayload.class.getName()) Review Comment: during upgrade, won't we be setting the `PARTIAL_UPDATE_MODE`. and so shouldn't the logic be as follows ``` if (tbl v > 9 ){ return PartialUpdateMode.valueOf(getStringOrDefault(PARTIAL_UPDATE_MODE)); } else { return PartialUpdateMode.NONE; } ``` or am I missing something. ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java: ########## @@ -45,27 +46,34 @@ * Factory to create a {@link BufferedRecordMerger}. 
*/ public class BufferedRecordMergerFactory { + + private BufferedRecordMergerFactory() { + } + public static <T> BufferedRecordMerger<T> create(HoodieReaderContext<T> readerContext, RecordMergeMode recordMergeMode, boolean enablePartialMerging, Option<HoodieRecordMerger> recordMerger, Option<String> orderingFieldName, Option<String> payloadClass, Schema readerSchema, - TypedProperties props) { + TypedProperties props, + PartialUpdateMode partialUpdateMode) { if (enablePartialMerging) { BufferedRecordMerger<T> deleteRecordMerger = create( - readerContext, recordMergeMode, false, recordMerger, orderingFieldName, payloadClass, readerSchema, props); + readerContext, recordMergeMode, false, recordMerger, orderingFieldName, payloadClass, readerSchema, props, partialUpdateMode); return new PartialUpdateBufferedRecordMerger<>(readerContext, recordMerger, deleteRecordMerger, readerSchema, props); } + switch (recordMergeMode) { case COMMIT_TIME_ORDERING: - return new CommitTimeBufferedRecordMerger<>(); + return new CommitTimeBufferedRecordMerger<>(readerContext, partialUpdateMode, props, readerSchema); Review Comment: yes, agree. then we can keep PartialUpdateStrategy out of CommitTimeBufferedRecordMerger, and only add it to CommitTimeBufferedRecordPartialUpdateMerger ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java: ########## @@ -98,12 +122,28 @@ public Pair<Boolean, T> finalMerge(BufferedRecord<T> olderRecord, BufferedRecord * based on {@code EVENT_TIME_ORDERING} merge mode. 
*/ private static class EventTimeBufferedRecordMerger<T> implements BufferedRecordMerger<T> { + private final PartialUpdateStrategy<T> partialUpdateStrategy; + private final Schema readerSchema; + + public EventTimeBufferedRecordMerger(HoodieReaderContext<T> readerContext, + PartialUpdateMode partialUpdateMode, + TypedProperties props, + Schema readerSchema) { + this.partialUpdateStrategy = new PartialUpdateStrategy<>(readerContext, partialUpdateMode, props); + this.readerSchema = readerSchema; + } + @Override public Option<BufferedRecord<T>> deltaMerge(BufferedRecord<T> newRecord, BufferedRecord<T> existingRecord) { if (existingRecord == null || shouldKeepNewerRecord(existingRecord, newRecord)) { + newRecord = partialUpdateStrategy.reconcileFieldsWithOldRecord( + newRecord, existingRecord, readerSchema, readerSchema, false); return Option.of(newRecord); + } else { + existingRecord = partialUpdateStrategy.reconcileFieldsWithOldRecord( Review Comment: lets be very cautious to not make any changes to existing full record merge logic ########## hudi-common/src/main/java/org/apache/hudi/common/engine/ReaderContextTypeHandler.java: ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.common.engine; + +/** + * Helper class that handle cell level operation. + */ +public class ReaderContextTypeHandler { Review Comment: +1 ########## hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java: ########## @@ -280,7 +286,8 @@ public ClosableIterator<T> getFileRecordIterator( public void initRecordMerger(TypedProperties properties) { RecordMergeMode recordMergeMode = tableConfig.getRecordMergeMode(); String mergeStrategyId = tableConfig.getRecordMergeStrategyId(); - if (!tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) { + if (tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.NINE) + || tableConfig.getTableVersion().lesserThan(HoodieTableVersion.EIGHT)) { Review Comment: not sure I get you Danny. previously, we were inferring just for tbl v 6. and w/ our payload to merge mode migration, we have to infer for tbl v 8 as well. anyways, I guess what lin has coded above is wrong. we need to do inference for tables w/ versions <= 8 ########## hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java: ########## @@ -312,6 +318,21 @@ public class HoodieTableConfig extends HoodieConfig { .sinceVersion("1.0.0") .withDocumentation("When set to true, the table can support reading and writing multiple base file formats."); + public static final ConfigProperty<PartialUpdateMode> PARTIAL_UPDATE_MODE = ConfigProperty + .key("hoodie.write.partial.update.mode") + .defaultValue(PartialUpdateMode.NONE) + .sinceVersion("1.1.0") + .withDocumentation("This property when set, will define how two versions of the record will be " + + "merged together where the later contains only partial set of values and not entire record."); + + public static final ConfigProperty<String> PARTIAL_UPDATE_PROPERTIES = ConfigProperty + .key("hoodie.write.partial.update.properties") + .noDefaultValue() Review Comment: why have "partial" in the naming. 
we are looking to leverage this even for full record merges right. for eg, in case of AWSDmsAvroPayload, we will be setting some custom delete marker configs. So, how about`hoodie.table.merge.properties` ? ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java: ########## @@ -89,6 +111,12 @@ public Option<DeleteRecord> deltaMerge(DeleteRecord deleteRecord, BufferedRecord @Override public Pair<Boolean, T> finalMerge(BufferedRecord<T> olderRecord, BufferedRecord<T> newerRecord) { + if (newerRecord == null) { + return Pair.of(olderRecord.isDelete(), olderRecord.getRecord()); + } + + newerRecord = partialUpdateStrategy.reconcileFieldsWithOldRecord( Review Comment: why not call it `partialUpdateStrategy.partialMerge()` this will be in line w/ other methods we have named like deltaMerge(), finalMerge() and now partialMerge() ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java: ########## @@ -98,12 +122,28 @@ public Pair<Boolean, T> finalMerge(BufferedRecord<T> olderRecord, BufferedRecord * based on {@code EVENT_TIME_ORDERING} merge mode. 
*/ private static class EventTimeBufferedRecordMerger<T> implements BufferedRecordMerger<T> { + private final PartialUpdateStrategy<T> partialUpdateStrategy; + private final Schema readerSchema; + + public EventTimeBufferedRecordMerger(HoodieReaderContext<T> readerContext, + PartialUpdateMode partialUpdateMode, + TypedProperties props, + Schema readerSchema) { + this.partialUpdateStrategy = new PartialUpdateStrategy<>(readerContext, partialUpdateMode, props); + this.readerSchema = readerSchema; + } + @Override public Option<BufferedRecord<T>> deltaMerge(BufferedRecord<T> newRecord, BufferedRecord<T> existingRecord) { if (existingRecord == null || shouldKeepNewerRecord(existingRecord, newRecord)) { + newRecord = partialUpdateStrategy.reconcileFieldsWithOldRecord( + newRecord, existingRecord, readerSchema, readerSchema, false); return Option.of(newRecord); + } else { + existingRecord = partialUpdateStrategy.reconcileFieldsWithOldRecord( Review Comment: lets add a comment here that we are swapping the old and new due to lower ordering value before calling partialUpdateStrategy.reconcileFieldsWithOldRecord() in this `else` block ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java: ########## @@ -77,8 +85,22 @@ public static <T> BufferedRecordMerger<T> create(HoodieReaderContext<T> readerCo * based on {@code COMMIT_TIME_ORDERING} merge mode. 
*/ private static class CommitTimeBufferedRecordMerger<T> implements BufferedRecordMerger<T> { + private final PartialUpdateStrategy<T> partialUpdateStrategy; + private final Schema readerSchema; + + public CommitTimeBufferedRecordMerger(HoodieReaderContext<T> readerContext, + PartialUpdateMode partialUpdateMode, + TypedProperties props, + Schema readerSchema) { + this.partialUpdateStrategy = new PartialUpdateStrategy<>(readerContext, partialUpdateMode, props); + this.readerSchema = readerSchema; + } + @Override - public Option<BufferedRecord<T>> deltaMerge(BufferedRecord<T> newRecord, BufferedRecord<T> existingRecord) { + public Option<BufferedRecord<T>> deltaMerge(BufferedRecord<T> newRecord, + BufferedRecord<T> existingRecord) { + newRecord = partialUpdateStrategy.reconcileFieldsWithOldRecord( Review Comment: lets not touch exiting record mergers as proposed above. we can add new ones for partial updates ########## hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java: ########## @@ -312,6 +318,21 @@ public class HoodieTableConfig extends HoodieConfig { .sinceVersion("1.0.0") .withDocumentation("When set to true, the table can support reading and writing multiple base file formats."); + public static final ConfigProperty<PartialUpdateMode> PARTIAL_UPDATE_MODE = ConfigProperty + .key("hoodie.write.partial.update.mode") Review Comment: yes, lets name this `hoodie.table.partial.update.mode` since its a table property ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.PartialUpdateMode; +import org.apache.hudi.common.util.StringUtils; + +import org.apache.avro.Schema; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS; +import static org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER; + +public class PartialUpdateStrategy<T> { Review Comment: java docs please ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.PartialUpdateMode; +import org.apache.hudi.common.util.StringUtils; + +import org.apache.avro.Schema; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS; +import static org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER; + +public class PartialUpdateStrategy<T> { + private final HoodieReaderContext<T> readerContext; + private final PartialUpdateMode partialUpdateMode; + private Map<String, String> partialUpdateProperties; + + public PartialUpdateStrategy(HoodieReaderContext<T> readerContext, + PartialUpdateMode partialUpdateMode, + TypedProperties props) { + this.readerContext = readerContext; + this.partialUpdateMode = partialUpdateMode; + this.partialUpdateProperties = parsePartialUpdateProperties(props); + } + + /** + * Merge records based on partial update mode. + * Note that {@param newRecord} refers to the record with higher commit time if COMMIT_TIME_ORDERING mode is used, + * or higher event time if EVENT_TIME_ORDERING mode us used. + */ + BufferedRecord<T> reconcileFieldsWithOldRecord(BufferedRecord<T> newRecord, + BufferedRecord<T> oldRecord, + Schema newSchema, + Schema oldSchema, + boolean keepOldMetadataColumns) { + // Note that: When either newRecord or oldRecord is a delete record, + // skip partial update since delete records do not have meaningful columns. 
+ if (partialUpdateMode == PartialUpdateMode.NONE + || null == oldRecord + || newRecord.isDelete() + || oldRecord.isDelete()) { + return newRecord; + } + + switch (partialUpdateMode) { + case KEEP_VALUES: + case FILL_DEFAULTS: + return newRecord; + case IGNORE_DEFAULTS: + return reconcileDefaultValues( + newRecord, oldRecord, newSchema, oldSchema, keepOldMetadataColumns); + case IGNORE_MARKERS: + return reconcileMarkerValues( + newRecord, oldRecord, newSchema, oldSchema); + default: + return newRecord; + } + } + + /** + * @param newRecord The newer record determined by the merge mode. + * @param oldRecord The older record determined by the merge mode. + * @param newSchema The schema of the newer record. + * @param oldSchema The schema of the older record. + * @param keepOldMetadataColumns Keep the metadata columns from the older record. + * @return + */ + BufferedRecord<T> reconcileDefaultValues(BufferedRecord<T> newRecord, + BufferedRecord<T> oldRecord, + Schema newSchema, + Schema oldSchema, + boolean keepOldMetadataColumns) { + List<Schema.Field> fields = newSchema.getFields(); Review Comment: We need to fix HoodieMergeHandle for COW to also use FG reader. thats the next work item either of Lin or Lokesh will be working on :) ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java: ########## @@ -45,27 +46,34 @@ * Factory to create a {@link BufferedRecordMerger}. 
*/ public class BufferedRecordMergerFactory { + + private BufferedRecordMergerFactory() { + } + public static <T> BufferedRecordMerger<T> create(HoodieReaderContext<T> readerContext, RecordMergeMode recordMergeMode, boolean enablePartialMerging, Option<HoodieRecordMerger> recordMerger, Option<String> orderingFieldName, Option<String> payloadClass, Schema readerSchema, - TypedProperties props) { + TypedProperties props, + PartialUpdateMode partialUpdateMode) { if (enablePartialMerging) { BufferedRecordMerger<T> deleteRecordMerger = create( - readerContext, recordMergeMode, false, recordMerger, orderingFieldName, payloadClass, readerSchema, props); + readerContext, recordMergeMode, false, recordMerger, orderingFieldName, payloadClass, readerSchema, props, partialUpdateMode); Review Comment: can we add java docs here as to how is this code block different from the partialUpdateMode used below in L 70, 72 etc. don't think its apparent ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.PartialUpdateMode; +import org.apache.hudi.common.util.StringUtils; + +import org.apache.avro.Schema; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS; +import static org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER; + +public class PartialUpdateStrategy<T> { + private final HoodieReaderContext<T> readerContext; + private final PartialUpdateMode partialUpdateMode; + private Map<String, String> partialUpdateProperties; + + public PartialUpdateStrategy(HoodieReaderContext<T> readerContext, + PartialUpdateMode partialUpdateMode, + TypedProperties props) { + this.readerContext = readerContext; + this.partialUpdateMode = partialUpdateMode; + this.partialUpdateProperties = parsePartialUpdateProperties(props); + } + + /** + * Merge records based on partial update mode. + * Note that {@param newRecord} refers to the record with higher commit time if COMMIT_TIME_ORDERING mode is used, + * or higher event time if EVENT_TIME_ORDERING mode us used. + */ + BufferedRecord<T> reconcileFieldsWithOldRecord(BufferedRecord<T> newRecord, + BufferedRecord<T> oldRecord, + Schema newSchema, + Schema oldSchema, + boolean keepOldMetadataColumns) { + // Note that: When either newRecord or oldRecord is a delete record, + // skip partial update since delete records do not have meaningful columns. 
+ if (partialUpdateMode == PartialUpdateMode.NONE + || null == oldRecord + || newRecord.isDelete() + || oldRecord.isDelete()) { + return newRecord; + } + + switch (partialUpdateMode) { + case KEEP_VALUES: + case FILL_DEFAULTS: + return newRecord; + case IGNORE_DEFAULTS: + return reconcileDefaultValues( + newRecord, oldRecord, newSchema, oldSchema, keepOldMetadataColumns); + case IGNORE_MARKERS: + return reconcileMarkerValues( + newRecord, oldRecord, newSchema, oldSchema); + default: + return newRecord; + } + } + + /** + * @param newRecord The newer record determined by the merge mode. + * @param oldRecord The older record determined by the merge mode. + * @param newSchema The schema of the newer record. + * @param oldSchema The schema of the older record. + * @param keepOldMetadataColumns Keep the metadata columns from the older record. + * @return + */ + BufferedRecord<T> reconcileDefaultValues(BufferedRecord<T> newRecord, + BufferedRecord<T> oldRecord, + Schema newSchema, + Schema oldSchema, + boolean keepOldMetadataColumns) { + List<Schema.Field> fields = newSchema.getFields(); + Map<Integer, Object> updateValues = new HashMap<>(); + T engineRecord; + // The default value only from the top-level data type is validated. That means, + // for nested columns, we do not check the leaf level data type defaults. 
+ for (Schema.Field field : fields) { + String fieldName = field.name(); + Object defaultValue = field.defaultVal(); + Object newValue = readerContext.getValue( + newRecord.getRecord(), newSchema, fieldName); + if (defaultValue == newValue + || (keepOldMetadataColumns && HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(fieldName))) { + updateValues.put(field.pos(), readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName)); + } + } + if (updateValues.isEmpty()) { + return newRecord; + } + engineRecord = readerContext.constructEngineRecord(newSchema, updateValues, newRecord); + return new BufferedRecord<>( + newRecord.getRecordKey(), + newRecord.getOrderingValue(), + engineRecord, + newRecord.getSchemaId(), + newRecord.isDelete()); + } + + BufferedRecord<T> reconcileMarkerValues(BufferedRecord<T> newRecord, + BufferedRecord<T> oldRecord, + Schema newSchema, + Schema oldSchema) { + List<Schema.Field> fields = newSchema.getFields(); + Map<Integer, Object> updateValues = new HashMap<>(); + T engineRecord; + String partialUpdateCustomMarker = partialUpdateProperties.get(PARTIAL_UPDATE_CUSTOM_MARKER); + for (Schema.Field field : fields) { + String fieldName = field.name(); + Object newValue = readerContext.getValue(newRecord.getRecord(), newSchema, fieldName); + if ((isStringTyped(field) || isBytesTyped(field)) + && (partialUpdateCustomMarker.equals(readerContext.getTypeHandler().castToString(newValue)))) { + updateValues.put(field.pos(), readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName)); + } + } + if (updateValues.isEmpty()) { + return newRecord; + } + engineRecord = readerContext.constructEngineRecord(newSchema, updateValues, newRecord); + return new BufferedRecord<>( + newRecord.getRecordKey(), + newRecord.getOrderingValue(), + engineRecord, + newRecord.getSchemaId(), + newRecord.isDelete()); + } + + static boolean isStringTyped(Schema.Field field) { + return hasTargetType(field.schema(), Schema.Type.STRING); + } + + static boolean 
isBytesTyped(Schema.Field field) { + return hasTargetType(field.schema(), Schema.Type.BYTES); + } + + static boolean hasTargetType(Schema schema, Schema.Type targetType) { + if (schema.getType() == targetType) { + return true; + } else if (schema.getType() == Schema.Type.UNION) { + // Stream is lazy, so this is efficient even with multiple types + return schema.getTypes().stream().anyMatch(s -> s.getType() == targetType); + } + return false; + } + + static Map<String, String> parsePartialUpdateProperties(TypedProperties props) { + Map<String, String> properties = new HashMap<>(); + String raw = props.getProperty(HoodieTableConfig.PARTIAL_UPDATE_PROPERTIES.key()); + if (StringUtils.isNullOrEmpty(raw)) { + return properties; + } + String[] entries = raw.split(","); Review Comment: https://github.com/apache/hudi/blob/aa3dcd83fe660daff4d061be7459ddf02d038696/hudi-hadoop-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java#L243 can we try to move this to a util class and re-use ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java: ########## @@ -116,12 +160,18 @@ public Pair<Boolean, T> finalMerge(BufferedRecord<T> olderRecord, BufferedRecord if (newerRecord.isCommitTimeOrderingDelete()) { return Pair.of(true, newerRecord.getRecord()); } + Comparable newOrderingValue = newerRecord.getOrderingValue(); Comparable oldOrderingValue = olderRecord.getOrderingValue(); if (!olderRecord.isCommitTimeOrderingDelete() && oldOrderingValue.compareTo(newOrderingValue) > 0) { + olderRecord = partialUpdateStrategy.reconcileFieldsWithOldRecord( + olderRecord, newerRecord, readerSchema, readerSchema, true); Review Comment: same comment as about about schema per record ########## hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java: ########## @@ -110,6 +112,23 @@ public HoodieRecord<InternalRow> constructHoodieRecord(BufferedRecord<InternalRo return new 
HoodieSparkRecord(hoodieKey, row, HoodieInternalRowUtils.getCachedSchema(schema), false); } + @Override + public InternalRow constructEngineRecord(Schema schema, + Map<Integer, Object> updateValues, + BufferedRecord<InternalRow> baseRecord) { + List<Schema.Field> fields = schema.getFields(); + Object[] values = new Object[fields.size()]; + for (Schema.Field field : fields) { + int pos = field.pos(); + if (updateValues.containsKey(pos)) { + values[pos] = updateValues.get(pos); + } else { + values[pos] = getValue(baseRecord.getRecord(), schema, field.name()); + } + } + return toBinaryRow(schema, new GenericInternalRow(values)); Review Comment: +1 ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java: ########## @@ -98,12 +126,28 @@ public Pair<Boolean, T> finalMerge(BufferedRecord<T> olderRecord, BufferedRecord * based on {@code EVENT_TIME_ORDERING} merge mode. */ private static class EventTimeBufferedRecordMerger<T> implements BufferedRecordMerger<T> { + private final PartialUpdateStrategy<T> partialUpdateStrategy; + private final Schema readerSchema; + + public EventTimeBufferedRecordMerger(HoodieReaderContext<T> readerContext, + PartialUpdateMode partialUpdateMode, + TypedProperties props, + Schema readerSchema) { + this.partialUpdateStrategy = new PartialUpdateStrategy<>(readerContext, partialUpdateMode, props); + this.readerSchema = readerSchema; + } + @Override public Option<BufferedRecord<T>> deltaMerge(BufferedRecord<T> newRecord, BufferedRecord<T> existingRecord) { if (existingRecord == null || shouldKeepNewerRecord(existingRecord, newRecord)) { + newRecord = partialUpdateStrategy.reconcileFieldsWithOldRecord( + newRecord, existingRecord, readerSchema, readerSchema, false); return Option.of(newRecord); + } else { + existingRecord = partialUpdateStrategy.reconcileFieldsWithOldRecord( + existingRecord, newRecord, readerSchema, readerSchema, true); Review Comment: how come we are passing in `readerSchema` for 
both old and new. in case of partial merges, (even before this patch), won't we fetch the schema at per record level ref: https://github.com/apache/hudi/blob/aa3dcd83fe660daff4d061be7459ddf02d038696/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java#L165 Can we ensure we don't cause any regression to any of the existing code ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.PartialUpdateMode; +import org.apache.hudi.common.util.StringUtils; + +import org.apache.avro.Schema; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS; +import static org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER; + +public class PartialUpdateStrategy<T> { + private final HoodieReaderContext<T> readerContext; + private final PartialUpdateMode partialUpdateMode; + private Map<String, String> partialUpdateProperties; + + public PartialUpdateStrategy(HoodieReaderContext<T> readerContext, + PartialUpdateMode partialUpdateMode, + TypedProperties props) { + this.readerContext = readerContext; + this.partialUpdateMode = partialUpdateMode; + this.partialUpdateProperties = parsePartialUpdateProperties(props); + } + + /** + * Merge records based on partial update mode. + * Note that {@param newRecord} refers to the record with higher commit time if COMMIT_TIME_ORDERING mode is used, + * or higher event time if EVENT_TIME_ORDERING mode is used. + */ + BufferedRecord<T> reconcileFieldsWithOldRecord(BufferedRecord<T> newRecord, + BufferedRecord<T> oldRecord, + Schema newSchema, + Schema oldSchema, + boolean keepOldMetadataColumns) { + // Note that: When either newRecord or oldRecord is a delete record, + // skip partial update since delete records do not have meaningful columns.
+ if (partialUpdateMode == PartialUpdateMode.NONE + || null == oldRecord + || newRecord.isDelete() + || oldRecord.isDelete()) { + return newRecord; + } + + switch (partialUpdateMode) { + case KEEP_VALUES: + case FILL_DEFAULTS: + return newRecord; Review Comment: I guess Lin is fixing KEEP_VALUES in a follow-up PR ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.PartialUpdateMode; +import org.apache.hudi.common.util.StringUtils; + +import org.apache.avro.Schema; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS; +import static org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER; + +public class PartialUpdateStrategy<T> { + private final HoodieReaderContext<T> readerContext; + private final PartialUpdateMode partialUpdateMode; + private Map<String, String> partialUpdateProperties; + + public PartialUpdateStrategy(HoodieReaderContext<T> readerContext, + PartialUpdateMode partialUpdateMode, + TypedProperties props) { + this.readerContext = readerContext; + this.partialUpdateMode = partialUpdateMode; + this.partialUpdateProperties = parsePartialUpdateProperties(props); + } + + /** + * Merge records based on partial update mode. + * Note that {@param newRecord} refers to the record with higher commit time if COMMIT_TIME_ORDERING mode is used, + * or higher event time if EVENT_TIME_ORDERING mode is used. + */ + BufferedRecord<T> reconcileFieldsWithOldRecord(BufferedRecord<T> newRecord, + BufferedRecord<T> oldRecord, + Schema newSchema, + Schema oldSchema, + boolean keepOldMetadataColumns) { + // Note that: When either newRecord or oldRecord is a delete record, + // skip partial update since delete records do not have meaningful columns.
+ if (partialUpdateMode == PartialUpdateMode.NONE + || null == oldRecord + || newRecord.isDelete() + || oldRecord.isDelete()) { + return newRecord; + } + + switch (partialUpdateMode) { + case KEEP_VALUES: + case FILL_DEFAULTS: + return newRecord; + case IGNORE_DEFAULTS: + return reconcileDefaultValues( + newRecord, oldRecord, newSchema, oldSchema, keepOldMetadataColumns); + case IGNORE_MARKERS: + return reconcileMarkerValues( + newRecord, oldRecord, newSchema, oldSchema); + default: + return newRecord; + } + } + + /** + * @param newRecord The newer record determined by the merge mode. + * @param oldRecord The older record determined by the merge mode. + * @param newSchema The schema of the newer record. + * @param oldSchema The schema of the older record. + * @param keepOldMetadataColumns Keep the metadata columns from the older record. + * @return + */ + BufferedRecord<T> reconcileDefaultValues(BufferedRecord<T> newRecord, + BufferedRecord<T> oldRecord, + Schema newSchema, + Schema oldSchema, + boolean keepOldMetadataColumns) { + List<Schema.Field> fields = newSchema.getFields(); + Map<Integer, Object> updateValues = new HashMap<>(); + T engineRecord; + // The default value only from the top-level data type is validated. That means, + // for nested columns, we do not check the leaf level data type defaults. 
+ for (Schema.Field field : fields) { + String fieldName = field.name(); + Object defaultValue = field.defaultVal(); + Object newValue = readerContext.getValue( + newRecord.getRecord(), newSchema, fieldName); + if (defaultValue == newValue + || (keepOldMetadataColumns && HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(fieldName))) { + updateValues.put(field.pos(), readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName)); + } + } + if (updateValues.isEmpty()) { + return newRecord; + } + engineRecord = readerContext.constructEngineRecord(newSchema, updateValues, newRecord); + return new BufferedRecord<>( + newRecord.getRecordKey(), + newRecord.getOrderingValue(), + engineRecord, + newRecord.getSchemaId(), + newRecord.isDelete()); + } + + BufferedRecord<T> reconcileMarkerValues(BufferedRecord<T> newRecord, + BufferedRecord<T> oldRecord, + Schema newSchema, + Schema oldSchema) { + List<Schema.Field> fields = newSchema.getFields(); + Map<Integer, Object> updateValues = new HashMap<>(); + T engineRecord; + String partialUpdateCustomMarker = partialUpdateProperties.get(PARTIAL_UPDATE_CUSTOM_MARKER); Review Comment: can we do this one time in the constructor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
