nsivabalan commented on code in PR #17604: URL: https://github.com/apache/hudi/pull/17604#discussion_r2729557354
########## hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialMergerWithKeepValues.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.engine.RecordContext; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Class to assist with merging two versions of the record that may contain partial updates using + * {@link org.apache.hudi.common.table.PartialUpdateMode#KEEP_VALUES} mode. + */ Review Comment: we should call out the expectations from this class. w/ a simple example as well. 
Also, let's call out the expectations for nested fields. ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java: ########## @@ -101,6 +101,7 @@ protected FileGroupRecordBuffer(HoodieReaderContext<T> readerContext, this.readerSchema = HoodieSchemaCache.intern(readerContext.getSchemaHandler().getRequiredSchema()); this.recordMergeMode = recordMergeMode; this.partialUpdateModeOpt = partialUpdateModeOpt; + this.enablePartialMerging = partialUpdateModeOpt.isPresent(); Review Comment: Let's be cautious about expanding the terminology now. Previously, `enablePartialMerging` mainly referred to partial encoding from MOR, but now, with this patch, it also refers to the partial update mode (table property). ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateHandler.java: ########## @@ -82,17 +85,43 @@ BufferedRecord<T> partialMerge(BufferedRecord<T> highOrderRecord, } switch (partialUpdateMode) { + case KEEP_VALUES: + return reconcileBasedOnKeepValues(highOrderRecord, lowOrderRecord, highOrderSchema, lowOrderSchema, newSchema); case IGNORE_DEFAULTS: return reconcileDefaultValues( highOrderRecord, lowOrderRecord, highOrderSchema, lowOrderSchema, newSchema); case FILL_UNAVAILABLE: return reconcileMarkerValues( highOrderRecord, lowOrderRecord, highOrderSchema, lowOrderSchema, newSchema); default: - return highOrderRecord; + throw new HoodieIOException("Unsupported PartialUpdateMode " + partialUpdateMode + " detected"); } } + /** + * Reconcile two versions of the record based on KEEP_VALUES. + * i.e for values missing from new record, we pick from older record, if not, value from new record is picked for each column. 
+ * @param highOrderRecord record with higher commit time or higher ordering value + * @param lowOrderRecord record with lower commit time or lower ordering value + * @param highOrderSchema The schema of highOrderRecord + * @param lowOrderSchema The schema of the older record + * @param newSchema The schema of the new incoming record + * @return the merged record of type {@link BufferedRecord} + */ + BufferedRecord<T> reconcileBasedOnKeepValues(BufferedRecord<T> highOrderRecord, Review Comment: private ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialMergerWithKeepValues.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.engine.RecordContext; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Class to assist with merging two versions of the record that may contain partial updates using + * {@link org.apache.hudi.common.table.PartialUpdateMode#KEEP_VALUES} mode. + */ +public class PartialMergerWithKeepValues<T> implements Serializable { + private final Map<HoodieSchema, Set<String>> + fieldNameCache = new HashMap<>(); + private final Map<Pair<Pair<HoodieSchema, HoodieSchema>, HoodieSchema>, HoodieSchema> + mergedSchemaCache = new HashMap<>(); + + /** + * Merges records which can contain partial updates. + * + * @param lowOrderRecord record with lower commit time or lower ordering value + * @param lowOrderSchema The schema of the older record + * @param highOrderRecord record with higher commit time or higher ordering value + * @param highOrderSchema The schema of highOrderRecord + * @param newSchema The schema of the new incoming record + * @return The merged record and schema. + */ + Pair<BufferedRecord<T>, HoodieSchema> mergePartialRecords(BufferedRecord<T> lowOrderRecord, + HoodieSchema lowOrderSchema, + BufferedRecord<T> highOrderRecord, + HoodieSchema highOrderSchema, + HoodieSchema newSchema, + RecordContext<T> recordContext) { + // The merged schema contains fields that only appear in either older and/or newer record. 
+ HoodieSchema mergedSchema = + getCachedMergedSchema(lowOrderSchema, highOrderSchema, newSchema); + boolean isNewerPartial = isPartial(highOrderSchema, mergedSchema); + if (!isNewerPartial) { + return Pair.of(highOrderRecord, highOrderSchema); + } + Set<String> fieldNamesInNewRecord = + getCachedFieldNames(highOrderSchema); + // Collect field values. + List<HoodieSchemaField> fields = mergedSchema.getFields(); + Object[] fieldVals = new Object[fields.size()]; + int idx = 0; + List<HoodieSchemaField> mergedSchemaFields = mergedSchema.getFields(); + for (HoodieSchemaField mergedSchemaField : mergedSchemaFields) { + String fieldName = mergedSchemaField.name(); + if (fieldNamesInNewRecord.contains(fieldName)) { // field present in newer record. + fieldVals[idx++] = recordContext.getValue(highOrderRecord.getRecord(), highOrderSchema, fieldName); + } else { // if not present in newer record pick from old record + fieldVals[idx++] = recordContext.getValue(lowOrderRecord.getRecord(), lowOrderSchema, fieldName); + } + } + // Build merged record. + T engineRecord = recordContext.constructEngineRecord(mergedSchema, fieldVals); + BufferedRecord<T> mergedRecord = new BufferedRecord<>( + highOrderRecord.getRecordKey(), + highOrderRecord.getOrderingValue(), + engineRecord, + recordContext.encodeSchema(mergedSchema), + highOrderRecord.getHoodieOperation()); + return Pair.of(mergedRecord, mergedSchema); + } + + /** + * @param hoodieSchema Hoodie schema. + * @return The set of field names. + */ + Set<String> getCachedFieldNames(HoodieSchema hoodieSchema) { + return fieldNameCache.computeIfAbsent(hoodieSchema, schema -> { + Set<String> fieldNames = new HashSet<>(); + for (HoodieSchemaField field : schema.getFields()) { + fieldNames.add(field.name()); + } + return fieldNames; + }); + } + + /** + * Merges the two schemas so the merged schema contains all the fields from the two schemas, + * with the same ordering of fields based on the provided reader schema. 
+ * + * @param oldSchema Old schema. + * @param newSchema New schema. + * @param readerSchema Reader schema containing all the fields to read. + * @return The merged Avro schema. + */ + HoodieSchema getCachedMergedSchema(HoodieSchema oldSchema, Review Comment: private ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java: ########## @@ -115,6 +115,9 @@ public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecO // When a data block contains partial updates, subsequent record merging must always use // partial merging. enablePartialMerging = true; + if (partialUpdateModeOpt.isEmpty()) { + this.partialUpdateModeOpt = Option.of(PartialUpdateMode.KEEP_VALUES); Review Comment: Can you confirm this behavior is retained? File group: the base file and the first 2 log files have the full schema, and the 3rd log file contains a partially encoded schema (MOR partial encoding). When we are merging different log files, we should enable `enablePartialMerging` and `partialUpdateModeOpt` only when the 3rd log file is processed and not before that. 
and hence these lines ``` this.bufferedRecordMerger = BufferedRecordMergerFactory.create( readerContext, recordMergeMode, enablePartialMerging, recordMerger, readerSchema, payloadClasses, props, partialUpdateModeOpt); ``` in FileGroupRecordBuffer should reflect that accordingly ########## hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java: ########## @@ -97,16 +104,41 @@ public HoodieRecord<ArrayWritable> constructHoodieRecord(BufferedRecord<ArrayWri @Override public ArrayWritable constructEngineRecord(HoodieSchema recordSchema, Object[] fieldValues) { - return new ArrayWritable(Writable.class, (Writable[]) fieldValues); + Schema avroSchema = recordSchema.toAvroSchema(); + List<Schema.Field> fields = avroSchema.getFields(); + if (fields.size() != fieldValues.length) { + throw new IllegalArgumentException("Mismatch between schema fields and values"); + } + + Writable[] writables = new Writable[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + Schema fieldSchema = resolveUnion(fields.get(i).schema()); + writables[i] = convertToWritable(fieldSchema, fieldValues[i]); + } + return new ArrayWritable(Writable.class, writables); } @Override public ArrayWritable mergeWithEngineRecord(HoodieSchema schema, Map<Integer, Object> updateValues, BufferedRecord<ArrayWritable> baseRecord) { Writable[] engineRecord = baseRecord.getRecord().get(); + Schema avroSchema = schema.toAvroSchema(); + List<Schema.Field> fields = avroSchema.getFields(); for (Map.Entry<Integer, Object> value : updateValues.entrySet()) { - engineRecord[value.getKey()] = (Writable) value.getValue(); + int pos = value.getKey(); + Object updateValue = value.getValue(); + + // If value is already a Writable, use it directly + if (updateValue instanceof Writable) { Review Comment: do we have test coverage for this ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialMergerWithKeepValues.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.engine.RecordContext; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Class to assist with merging two versions of the record that may contain partial updates using + * {@link org.apache.hudi.common.table.PartialUpdateMode#KEEP_VALUES} mode. + */ Review Comment: lets file a follow up ticket. Looks like in this patch, we are mainly focusing on MOR partial encoding. But I am thinking we should expand the functionality to COW as well. for eg, in spark data source writes, users should be able to supply only a subset of columns, and able to partial update the table. we can take it as a follow up patch. 
########## hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java: ########## @@ -148,7 +148,17 @@ public HoodieRecord<RowData> constructHoodieRecord(BufferedRecord<RowData> buffe @Override public RowData constructEngineRecord(HoodieSchema recordSchema, Object[] fieldValues) { - return GenericRowData.of(fieldValues); + List<HoodieSchemaField> fields = recordSchema.getFields(); Review Comment: Yes, I also feel this is too heavy since we are doing this for every record. We have to be cautious about adding any per-record validation in general. ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/KeyBasedFileGroupRecordBuffer.java: ########## @@ -77,6 +77,9 @@ public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecO // When a data block contains partial updates, subsequent record merging must always use // partial merging. enablePartialMerging = true; + if (partialUpdateModeOpt.isEmpty()) { + this.partialUpdateModeOpt = Option.of(PartialUpdateMode.KEEP_VALUES); Review Comment: Since we are standardizing on partialUpdateModeOpt, why do we need `enablePartialMerging` anymore? ########## hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java: ########## @@ -22,10 +22,14 @@ import org.apache.hudi.common.config.EnumFieldDescription; public enum PartialUpdateMode { + @EnumFieldDescription( + "For any column values missing in current record, pick value from previous version of the record.") Review Comment: We should call out the nested fields as well. ########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala: ########## @@ -680,6 +689,59 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase { ) } + test("Test MERGE INTO with KEEP_VALUES partial update mode on MOR table with Avro log format") { Review Comment: If we have covered the different combinations either with TestPartialUpdateHandler or 
TestBufferedRecordMerger or elsewhere, we don't need to do end-to-end functional tests. It's worth having 1 or 2 end-to-end functional tests, but not for all combinations. For example, with the event-time-ordering-based merge mode: - MOR, partial encoding and no partial update mode set in table config. - MOR, partial encoding + partial update mode set in table config. These 2 should suffice for end-to-end functional tests, but let's ensure all combinations are tested at lower levels. ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java: ########## @@ -63,28 +63,18 @@ public static <T> BufferedRecordMerger<T> create(HoodieReaderContext<T> readerCo Option<Pair<String, String>> payloadClasses, TypedProperties props, Option<PartialUpdateMode> partialUpdateModeOpt) { - /** - * This part implements KEEP_VALUES partial update mode, which merges two records that do not have all columns. - * Other Partial update modes, like IGNORE_DEFAULTS assume all columns exists in the record, - * but some columns contain specific values that should be replaced by that from older version of the record. 
- */ - if (enablePartialMerging) { - BufferedRecordMerger<T> deleteRecordMerger = create( - readerContext, recordMergeMode, false, recordMerger, readerSchema, payloadClasses, props, Option.empty()); - return new PartialUpdateBufferedRecordMerger<>(readerContext.getRecordContext(), recordMerger, deleteRecordMerger, readerSchema, props); - } switch (recordMergeMode) { case COMMIT_TIME_ORDERING: if (partialUpdateModeOpt.isEmpty()) { return new CommitTimeRecordMerger<>(); } - return new CommitTimePartialRecordMerger<>(readerContext.getRecordContext(), partialUpdateModeOpt.get(), props); + return new CommitTimePartialRecordMerger<>(readerContext.getRecordContext(), partialUpdateModeOpt.get(), readerSchema, props); case EVENT_TIME_ORDERING: if (partialUpdateModeOpt.isEmpty()) { return new EventTimeRecordMerger<>(readerContext.getRecordContext()); } - return new EventTimePartialRecordMerger<>(readerContext.getRecordContext(), partialUpdateModeOpt.get(), props); + return new EventTimePartialRecordMerger<>(readerContext.getRecordContext(), partialUpdateModeOpt.get(), readerSchema, props); Review Comment: @yihua : ping. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
