nsivabalan commented on code in PR #13511: URL: https://github.com/apache/hudi/pull/13511#discussion_r2181166614
########## hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java: ########## @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.config.RecordMergeMode; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.model.DefaultHoodieRecordPayload; +import org.apache.hudi.common.model.DeleteRecord; +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.util.HoodieRecordUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import java.io.IOException; +import 
java.util.Objects; + +public class BufferedRecordMergerFactory { + public static <T> BufferedRecordMerger<T> create( + HoodieReaderContext<T> readerContext, + RecordMergeMode recordMergeMode, + boolean enablePartialMerging, + Option<HoodieRecordMerger> recordMerger, + Option<String> orderingFieldName, + Option<String> payloadClass, + Schema readerSchema, + TypedProperties props) { + if (enablePartialMerging) { + BufferedRecordMerger<T> deleteRecordMerger = create( + readerContext, recordMergeMode, false, recordMerger, orderingFieldName, payloadClass, readerSchema, props); + return new PartialUpdateMerger<>(readerContext, recordMerger, deleteRecordMerger, readerSchema, props); + } + switch (recordMergeMode) { + case COMMIT_TIME_ORDERING: + return new CommitTimeOrderingMerger<>(); + case EVENT_TIME_ORDERING: + return new EventTimeOrderingMerger<>(); + default: + if (payloadClass.isPresent()) { + return new CustomPayloadMerger<>(readerContext, recordMerger, orderingFieldName, payloadClass.get(), readerSchema, props); + } else { + return new CustomMerger<>(readerContext, recordMerger, readerSchema, props); + } + } + } + + private static class CommitTimeOrderingMerger<T> implements BufferedRecordMerger<T> { Review Comment: Again, this naming overlaps w/ the actual mergers we already have in our code base; let's be conscious when naming classes. Also, shouldn't we suffix/prefix the name with `Buffered`? ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMerger.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.model.DeleteRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; + +import java.io.IOException; + +/** + * {@link BufferedRecordMerger} defines how to merge two {@link BufferedRecord} or {@link DeleteRecord}. Review Comment: Let's be clear about what this interface is for: it is more of a merge delegator, not really the interface that dictates how merges happen — the actual merges are done via the merge mode or the record merger interface. ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java: ########## @@ -67,12 +68,6 @@ public BufferType getBufferType() { public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws IOException { Pair<ClosableIterator<T>, Schema> recordsIteratorSchemaPair = getRecordsIterator(dataBlock, keySpecOpt); - if (dataBlock.containsPartialUpdates()) { - // When a data block contains partial updates, subsequent record merging must always use Review Comment: We are making a minor change in how partial merging is detected. Previously, if we had 4 log files and only the 4th file contained partial updates, we merged the first 3 log files as full records and did a partial merge only while processing the 4th log file's records. But now, we process all log headers once up front and then enable partial merging for all log files. @yihua : is there any issue w/ this? Should we not do this at all, or is it just that it might result in some perf hit? 
########## hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMerger.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.model.DeleteRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; + +import java.io.IOException; + +/** + * {@link BufferedRecordMerger} defines how to merge two {@link BufferedRecord} or {@link DeleteRecord}. + */ +public interface BufferedRecordMerger<T> { + + /** + * Merging incoming record from log file with existing record in record buffer. + * + * @param newRecord Incoming record from log file + * @param existingRecord Existing record in record buffer + * + * @return The merged record. + */ + Option<BufferedRecord<T>> deltaMerge(BufferedRecord<T> newRecord, BufferedRecord<T> existingRecord) throws IOException; + + /** + * Merging incoming delete record from log file with existing record in record buffer. + * + * @param deleteRecord Incoming delete record from log file + * @param existingRecord Existing record in record buffer + * + * @return The merged record. 
+ */ + Option<DeleteRecord> deltaMerge(DeleteRecord deleteRecord, BufferedRecord<T> existingRecord); + + /** + * Merging newer record from log file with old record from base file. + * + * @param olderRecord Older record from base file + * @param newerRecord Newer record from log file + * + * @return The merged record. + */ + Pair<Boolean, T> finalMerge(BufferedRecord<T> olderRecord, BufferedRecord<T> newerRecord) throws IOException; Review Comment: Somehow I don't like the `finalMerge` name. Can we use ``` mergeLogRecords mergeBaseWithLogRecord ``` instead? ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMerger.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.model.DeleteRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; + +import java.io.IOException; + +/** + * {@link BufferedRecordMerger} defines how to merge two {@link BufferedRecord} or {@link DeleteRecord}. 
Review Comment: How about `BufferedRecordMergeHandler`? ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java: ########## @@ -67,12 +68,6 @@ public BufferType getBufferType() { public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws IOException { Pair<ClosableIterator<T>, Schema> recordsIteratorSchemaPair = getRecordsIterator(dataBlock, keySpecOpt); - if (dataBlock.containsPartialUpdates()) { - // When a data block contains partial updates, subsequent record merging must always use Review Comment: Can we start w/ an instance of BufferedRecordMerger initially, and later, whenever we come across a log w/ a partial-update header, re-instantiate the BufferedRecordMerger? That way we retain the old behavior with no additional perf hit. ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java: ########## @@ -67,12 +68,6 @@ public BufferType getBufferType() { public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws IOException { Pair<ClosableIterator<T>, Schema> recordsIteratorSchemaPair = getRecordsIterator(dataBlock, keySpecOpt); - if (dataBlock.containsPartialUpdates()) { - // When a data block contains partial updates, subsequent record merging must always use Review Comment: Oh, and we are processing the log headers an additional time in this case, i.e. with ``` boolean enablePartialMerging = ConfigUtils.isPartialMergingEnabled(storage, logFiles, readerContext.getSchemaHandler().getRequiredSchema()); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
