nsivabalan commented on code in PR #17604:
URL: https://github.com/apache/hudi/pull/17604#discussion_r2692971729
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -63,28 +63,18 @@ public static <T> BufferedRecordMerger<T> create(HoodieReaderContext<T> readerCo
                                                     Option<Pair<String, String>> payloadClasses,
                                                     TypedProperties props,
                                                     Option<PartialUpdateMode> partialUpdateModeOpt) {
-    /**
-     * This part implements KEEP_VALUES partial update mode, which merges two records that do not have all columns.
-     * Other Partial update modes, like IGNORE_DEFAULTS assume all columns exists in the record,
-     * but some columns contain specific values that should be replaced by that from older version of the record.
-     */
-    if (enablePartialMerging) {
-      BufferedRecordMerger<T> deleteRecordMerger = create(
-          readerContext, recordMergeMode, false, recordMerger, readerSchema, payloadClasses, props, Option.empty());
-      return new PartialUpdateBufferedRecordMerger<>(readerContext.getRecordContext(), recordMerger, deleteRecordMerger, readerSchema, props);
-    }
     switch (recordMergeMode) {
       case COMMIT_TIME_ORDERING:
         if (partialUpdateModeOpt.isEmpty()) {
           return new CommitTimeRecordMerger<>();
         }
-        return new CommitTimePartialRecordMerger<>(readerContext.getRecordContext(), partialUpdateModeOpt.get(), props);
+        return new CommitTimePartialRecordMerger<>(readerContext.getRecordContext(), partialUpdateModeOpt.get(), readerSchema, props);
       case EVENT_TIME_ORDERING:
         if (partialUpdateModeOpt.isEmpty()) {
           return new EventTimeRecordMerger<>(readerContext.getRecordContext());
         }
-        return new EventTimePartialRecordMerger<>(readerContext.getRecordContext(), partialUpdateModeOpt.get(), props);
+        return new EventTimePartialRecordMerger<>(readerContext.getRecordContext(), partialUpdateModeOpt.get(), readerSchema, props);
Review Comment:
Hey @yihua: it looks like partial encoding for MergeInto is supported only
when one of the built-in merge modes (commit time or event time ordering) is
used, and not for custom mergers. Was that intentional, or do we plan to add
support down the line?
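
To make the question concrete, here is a minimal, self-contained sketch of the selection logic as it reads from the hunk above. The enums and the returned merger names are simplified stand-ins, and the `default` branch is an assumption about what happens for custom mergers (that code is not shown in this diff); it is not the actual Hudi implementation.

```java
import java.util.Optional;

public class MergerSelectionSketch {
  // Simplified stand-ins for the merge mode and partial update mode enums.
  public enum MergeMode { COMMIT_TIME_ORDERING, EVENT_TIME_ORDERING, CUSTOM }

  public enum PartialMode { KEEP_VALUES, IGNORE_DEFAULTS }

  // Returns the name of the merger that would be chosen for the given inputs.
  public static String select(MergeMode mode, Optional<PartialMode> partialMode) {
    switch (mode) {
      case COMMIT_TIME_ORDERING:
        return partialMode.isPresent() ? "CommitTimePartialRecordMerger" : "CommitTimeRecordMerger";
      case EVENT_TIME_ORDERING:
        return partialMode.isPresent() ? "EventTimePartialRecordMerger" : "EventTimeRecordMerger";
      default:
        // Assumption: the custom path never consults partialMode, which is
        // the gap the review comment asks about.
        return "CustomRecordMerger";
    }
  }

  public static void main(String[] args) {
    System.out.println(select(MergeMode.EVENT_TIME_ORDERING, Optional.of(PartialMode.KEEP_VALUES)));
    System.out.println(select(MergeMode.CUSTOM, Optional.of(PartialMode.KEEP_VALUES)));
  }
}
```

In this sketch a custom merge mode yields the same merger whether or not a partial update mode is set, so a MergeInto with partial encoding would silently lose the partial handling on that path.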
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialMergerWithKeepValues.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.engine.RecordContext;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Class to assist with merging two versions of the record that may contain partial updates using
+ * {@link org.apache.hudi.common.table.PartialUpdateMode#KEEP_VALUES} mode.
+ */
+public class KeepValuesPartialMergingUtils<T> {
+  static KeepValuesPartialMergingUtils INSTANCE = new KeepValuesPartialMergingUtils();
+  private static final Map<HoodieSchema, Map<String, Integer>>
+      FIELD_NAME_TO_ID_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Pair<Pair<HoodieSchema, HoodieSchema>, HoodieSchema>, HoodieSchema>
+      MERGED_SCHEMA_CACHE = new ConcurrentHashMap<>();
+
+  /**
+   * Merges records which can contain partial updates.
+   *
+   * @param lowOrderRecord  record with lower commit time or lower ordering value
+   * @param lowOrderSchema  The schema of the older record
+   * @param highOrderRecord record with higher commit time or higher ordering value
+   * @param highOrderSchema The schema of highOrderRecord
+   * @param newSchema       The schema of the new incoming record
+   * @return The merged record and schema.
+   */
+  Pair<BufferedRecord<T>, HoodieSchema> mergePartialRecords(BufferedRecord<T> lowOrderRecord,
+                                                            HoodieSchema lowOrderSchema,
+                                                            BufferedRecord<T> highOrderRecord,
+                                                            HoodieSchema highOrderSchema,
+                                                            HoodieSchema newSchema,
+                                                            RecordContext<T> recordContext) {
+    // The merged schema contains fields that only appear in either older and/or newer record.
+    HoodieSchema mergedSchema =
+        getCachedMergedSchema(lowOrderSchema, highOrderSchema, newSchema);
+    boolean isNewerPartial = isPartial(highOrderSchema, mergedSchema);
+    if (!isNewerPartial) {
+      return Pair.of(highOrderRecord, highOrderSchema);
+    }
+    Set<String> fieldNamesInNewRecord =
+        getCachedFieldNameToIdMapping(highOrderSchema).keySet();
+    // Collect field values.
+    List<HoodieSchemaField> fields = mergedSchema.getFields();
+    Object[] fieldVals = new Object[fields.size()];
+    int idx = 0;
+    List<HoodieSchemaField> mergedSchemaFields = mergedSchema.getFields();
+    for (HoodieSchemaField mergedSchemaField : mergedSchemaFields) {
Review Comment:
Can we create a follow-up issue for nested fields? We should add that
support. The payloads we migrated only supported top-level fields, so we
kept parity with them, but that should not block us from adding nested-field
support for new users.
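
For the follow-up issue, a small sketch of what top-level-only KEEP_VALUES merging means in practice. Records are modeled as plain `Map<String, Object>` values purely for illustration; the class and method names are hypothetical and unrelated to the classes in this PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KeepValuesTopLevelSketch {
  // Top-level merge: a field present in the newer record replaces the older
  // value wholesale; a field missing from the newer record keeps the older value.
  public static Map<String, Object> mergeTopLevel(Map<String, Object> older,
                                                  Map<String, Object> newer) {
    Map<String, Object> merged = new LinkedHashMap<>(older);
    merged.putAll(newer);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Object> oldAddress = new LinkedHashMap<>();
    oldAddress.put("city", "Austin");
    oldAddress.put("zip", "78701");
    Map<String, Object> older = new LinkedHashMap<>();
    older.put("id", 1);
    older.put("name", "alice");
    older.put("address", oldAddress);

    // The partial update carries only part of the nested struct.
    Map<String, Object> newAddress = new LinkedHashMap<>();
    newAddress.put("city", "Dallas");
    Map<String, Object> newer = new LinkedHashMap<>();
    newer.put("id", 1);
    newer.put("address", newAddress);

    // "name" survives from the older record, but "address.zip" is dropped
    // because the nested struct is replaced as a single top-level value.
    System.out.println(mergeTopLevel(older, newer));
  }
}
```

Nested-field support would instead recurse into `address` and keep `zip` from the older record, which is the behavior the follow-up issue would need to specify.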
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -365,6 +365,12 @@ public static final String getDefaultPayloadClassName() {
.sinceVersion("1.1.0")
.withDocumentation("This property when set, will define how two versions of the record will be merged together when records are partially formed");
+ public static final ConfigProperty<String> MERGE_PROPERTIES = ConfigProperty
Review Comment:
We have since changed the logic around this: all merge properties now share
a common prefix. For merge props, you can refer to
https://github.com/apache/hudi/blob/066887ae5b88989b6869c57a3c56137319bbd029/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java#L902
This property may have existed in the first version of this PR when it was
raised, but we later changed course.
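
As a rough illustration of the prefix-based approach, the sketch below collects every table property that starts with a shared prefix instead of reading one dedicated config key. The prefix string, class name, and method name are hypothetical placeholders; the actual constant lives at the linked line in `HoodieTableConfig`, and whether the real code strips the prefix is an assumption here.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class MergePropertyPrefixSketch {
  // Hypothetical prefix, used only for this example.
  static final String MERGE_PROPERTY_PREFIX = "example.merge.property.";

  // Collects all properties under the prefix and strips the prefix from each key.
  public static Map<String, String> extractMergeProperties(Properties tableProps) {
    Map<String, String> mergeProps = new TreeMap<>();
    for (String key : tableProps.stringPropertyNames()) {
      if (key.startsWith(MERGE_PROPERTY_PREFIX)) {
        mergeProps.put(key.substring(MERGE_PROPERTY_PREFIX.length()),
            tableProps.getProperty(key));
      }
    }
    return mergeProps;
  }

  public static void main(String[] args) {
    Properties tableProps = new Properties();
    tableProps.setProperty("example.merge.property.delete.key", "op");
    tableProps.setProperty("example.merge.property.delete.marker", "d");
    tableProps.setProperty("example.table.name", "trips");
    // Only the two prefixed entries are returned; the table name is ignored.
    System.out.println(extractMergeProperties(tableProps));
  }
}
```

With this shape, a separate `MERGE_PROPERTIES` config key is unnecessary because each merge property is stored under the shared prefix.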