the-other-tim-brown commented on code in PR #13688:
URL: https://github.com/apache/hudi/pull/13688#discussion_r2261319448
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java:
##########
@@ -126,11 +173,11 @@ public boolean equals(Object o) {
}
BufferedRecord<?> that = (BufferedRecord<?>) o;
return isDelete == that.isDelete && Objects.equals(recordKey,
that.recordKey) && Objects.equals(orderingValue, that.orderingValue)
- && Objects.equals(record, that.record) && Objects.equals(schemaId,
that.schemaId);
+ && Objects.equals(record, that.record) && Objects.equals(schemaId,
that.schemaId) && hoodieOperation == that.hoodieOperation;
Review Comment:
`hoodieOperation` may be null, so let's use `Objects.equals`.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##########
@@ -160,6 +161,8 @@ public String getFieldName() {
protected transient Comparable<?> orderingValue;
+ protected Boolean isDelete;
Review Comment:
Is it worth adding this to the serialized output of the record?
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -87,6 +89,8 @@ public abstract class HoodieReaderContext<T> {
private RecordMergeMode mergeMode;
protected RecordContext<T> recordContext;
private FileGroupReaderSchemaHandler<T> schemaHandler = null;
+ // the default iterator mode is engine-specific record mode
+ private IteratorMode iteratorMode = IteratorMode.ENGINE_RECORD;
Review Comment:
I don't think this makes a lot of sense as a member of the ReaderContext. Is
it possible to keep it confined to the FileGroupReader and RecordBuffer classes?
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java:
##########
@@ -121,8 +121,8 @@ public HoodieRecord<RowData>
constructHoodieRecord(BufferedRecord<RowData> buffe
return new HoodieEmptyRecord<>(hoodieKey, HoodieOperation.DELETE,
bufferedRecord.getOrderingValue(), HoodieRecord.HoodieRecordType.FLINK);
}
RowData rowData = bufferedRecord.getRecord();
- HoodieOperation operation =
HoodieOperation.fromValue(rowData.getRowKind().toByteValue());
- return new HoodieFlinkRecord(hoodieKey, operation,
bufferedRecord.getOrderingValue(), rowData);
+ // HoodieOperation operation =
HoodieOperation.fromValue(rowData.getRowKind().toByteValue()); ???
Review Comment:
Can this commented-out line be removed now?
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java:
##########
@@ -79,22 +79,15 @@ protected static GenericRecord createTestRecord(String
recordKey, int counter, l
return record;
}
- protected static List<BufferedRecord>
convertToBufferedRecordsList(List<IndexedRecord> indexedRecords,
-
HoodieReaderContext<IndexedRecord> readerContext,
-
TypedProperties props, String[] orderingFieldNames) {
- return indexedRecords.stream().map(rec -> {
- HoodieAvroIndexedRecord indexedRecord = new HoodieAvroIndexedRecord(new
HoodieKey(rec.get(0).toString(), ""), rec, null);
- return (BufferedRecord)
BufferedRecord.forRecordWithContext(indexedRecord,
readerContext.getSchemaHandler().getRequestedSchema(),
- readerContext.getRecordContext(), props, orderingFieldNames);
- }).collect(Collectors.toList());
+ protected static List<HoodieRecord>
convertToHoodieRecordsList(List<IndexedRecord> indexedRecords,
+
HoodieReaderContext<IndexedRecord> readerContext,
+
TypedProperties props, String[] orderingFieldNames) {
Review Comment:
It looks like the method signature can now be updated to remove these arguments.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -576,27 +579,34 @@ public Option<DeleteRecord> deltaMerge(DeleteRecord
deleteRecord, BufferedRecord
}
@Override
- public MergeResult<T> finalMerge(BufferedRecord<T> olderRecord,
BufferedRecord<T> newerRecord) throws IOException {
+ public BufferedRecord<T> finalMerge(BufferedRecord<T> olderRecord,
BufferedRecord<T> newerRecord) throws IOException {
if (olderRecord.isDelete() || newerRecord.isDelete()) {
if (shouldKeepNewerRecord(olderRecord, newerRecord)) {
// IMPORTANT:
// this is needed when the fallback HoodieAvroRecordMerger got used,
the merger would
// return Option.empty when the new payload data is empty(a delete)
and ignores its ordering value directly.
- return new MergeResult<>(newerRecord.isDelete(),
newerRecord.getRecord());
+ return newerRecord;
} else {
- return new MergeResult<>(olderRecord.isDelete(),
olderRecord.getRecord());
+ return olderRecord;
}
}
return mergeNonDeleteRecord(olderRecord, newerRecord);
}
- public abstract MergeResult<T> mergeNonDeleteRecord(BufferedRecord<T>
olderRecord, BufferedRecord<T> newerRecord) throws IOException;
+ public abstract BufferedRecord<T> mergeNonDeleteRecord(BufferedRecord<T>
olderRecord, BufferedRecord<T> newerRecord) throws IOException;
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
+ // Special handling for SENTINEL record in Expression Payload
+ private static BufferedRecord SENTINEL = new BufferedRecord(null, null,
null, null, false);
+
+ private static <T> BufferedRecord<T> getDeleteBufferedRecord(String
recordKey, RecordContext<T> recordContext) {
Review Comment:
It looks like `recordContext` is no longer used. Can it be removed
from the method arguments?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -339,12 +339,12 @@ public MergeResult<T> finalMerge(BufferedRecord<T>
olderRecord, BufferedRecord<T
&&
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
HoodieRecord hoodieRecord = mergedRecord.get().getLeft();
if (!mergedRecord.get().getRight().equals(readerSchema)) {
- T data = (T)
hoodieRecord.rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null,
readerSchema).getData();
- return new MergeResult<>(false, data);
+ hoodieRecord =
hoodieRecord.rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null,
readerSchema);
}
- return new MergeResult<>(false, (T) hoodieRecord.getData());
+ BufferedRecord<T> result =
BufferedRecord.forRecordWithContext(hoodieRecord, readerSchema, recordContext,
props, orderingFields, false);
+ return result;
Review Comment:
Nitpick: return the result directly instead of assigning it to a local variable.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordConverter.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.engine.RecordContext;
+
+import org.apache.avro.Schema;
+
+import java.util.List;
+
+/**
+ * The converter used to convert the engine-specific record into {@link
BufferedRecord}
+ * according to different {@link IteratorMode}s for the file group reader.
+ */
+public interface BufferedRecordConverter<T> {
+ BufferedRecord<T> convert(T record);
+
+ static <T> BufferedRecordConverter<T> createConverter(
+ IteratorMode iteratorMode, Schema readerSchema, RecordContext<T>
recordContext, List<String> orderingFieldNames) {
+ switch (iteratorMode) {
+ case ENGINE_RECORD:
+ return new BufferedRecordConverter<T>() {
+ private final BufferedRecord<T> reusedBufferedRecord = new
BufferedRecord<>();
Review Comment:
Can you add an inline comment explaining why it is safe to reuse this
object?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -576,27 +579,34 @@ public Option<DeleteRecord> deltaMerge(DeleteRecord
deleteRecord, BufferedRecord
}
@Override
- public MergeResult<T> finalMerge(BufferedRecord<T> olderRecord,
BufferedRecord<T> newerRecord) throws IOException {
+ public BufferedRecord<T> finalMerge(BufferedRecord<T> olderRecord,
BufferedRecord<T> newerRecord) throws IOException {
if (olderRecord.isDelete() || newerRecord.isDelete()) {
if (shouldKeepNewerRecord(olderRecord, newerRecord)) {
// IMPORTANT:
// this is needed when the fallback HoodieAvroRecordMerger got used,
the merger would
// return Option.empty when the new payload data is empty(a delete)
and ignores its ordering value directly.
- return new MergeResult<>(newerRecord.isDelete(),
newerRecord.getRecord());
+ return newerRecord;
} else {
- return new MergeResult<>(olderRecord.isDelete(),
olderRecord.getRecord());
+ return olderRecord;
}
}
return mergeNonDeleteRecord(olderRecord, newerRecord);
}
- public abstract MergeResult<T> mergeNonDeleteRecord(BufferedRecord<T>
olderRecord, BufferedRecord<T> newerRecord) throws IOException;
+ public abstract BufferedRecord<T> mergeNonDeleteRecord(BufferedRecord<T>
olderRecord, BufferedRecord<T> newerRecord) throws IOException;
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
+ // Special handling for SENTINEL record in Expression Payload
+ private static BufferedRecord SENTINEL = new BufferedRecord(null, null,
null, null, false);
Review Comment:
Let's make this `final` as well.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -390,19 +390,20 @@ public Option<BufferedRecord<T>>
deltaMergeNonDeleteRecord(BufferedRecord<T> new
}
@Override
- public MergeResult<T> mergeNonDeleteRecord(BufferedRecord<T> olderRecord,
BufferedRecord<T> newerRecord) throws IOException {
+ public BufferedRecord<T> mergeNonDeleteRecord(BufferedRecord<T>
olderRecord, BufferedRecord<T> newerRecord) throws IOException {
Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.merge(
recordContext.constructHoodieRecord(olderRecord),
recordContext.getSchemaFromBufferRecord(olderRecord),
recordContext.constructHoodieRecord(newerRecord),
recordContext.getSchemaFromBufferRecord(newerRecord), props);
if (mergedRecord.isPresent()
&&
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
HoodieRecord hoodieRecord = mergedRecord.get().getLeft();
if (!mergedRecord.get().getRight().equals(readerSchema)) {
- return new MergeResult<>(false, (T)
hoodieRecord.rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null,
readerSchema).getData());
+ hoodieRecord =
hoodieRecord.rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null,
readerSchema);
}
- return new MergeResult<>(false, (T) hoodieRecord.getData());
+ BufferedRecord<T> result =
BufferedRecord.forRecordWithContext(hoodieRecord, readerSchema, recordContext,
props, orderingFields, false);
+ return result;
Review Comment:
Similarly, the assignment here is redundant.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordSerializer.java:
##########
@@ -66,6 +67,8 @@ <T> byte[] serialize(BufferedRecord<T> record,
RecordSerializer<T> recordSeriali
output.writeVarInt(record.getSchemaId(), true);
}
output.writeBoolean(record.isDelete());
+ HoodieOperation operation = record.getHoodieOperation();
Review Comment:
Can you update `TestBufferedRecordSerializer` to include a case where the
operation is non-null?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -95,15 +102,15 @@ public CallbackProcessor(BaseFileUpdateCallback callback,
UpdateProcessor<T> del
}
@Override
- public T processUpdate(String recordKey, T previousRecord, T
currentRecord, boolean isDelete) {
- T result = delegate.processUpdate(recordKey, previousRecord,
currentRecord, isDelete);
+ public BufferedRecord<T> processUpdate(String recordKey, BufferedRecord<T>
previousRecord, BufferedRecord<T> currentRecord, boolean isDelete) {
+ BufferedRecord<T> result = delegate.processUpdate(recordKey,
previousRecord, currentRecord, isDelete);
if (isDelete) {
- callback.onDelete(recordKey, previousRecord);
- } else if (previousRecord != null && previousRecord != currentRecord) {
- callback.onUpdate(recordKey, previousRecord, currentRecord);
+ callback.onDelete(recordKey, previousRecord.getRecord());
+ } else if (previousRecord != null && previousRecord.getRecord() !=
currentRecord.getRecord()) {
Review Comment:
Can we also update `BaseFileUpdateCallback` to operate on `BufferedRecord`,
so that we can use the operation type?
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java:
##########
@@ -80,6 +80,15 @@ public HoodieHiveRecord(HoodieKey key, ArrayWritable data,
Schema schema, Object
isDeleted = data == null;
}
+ public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema,
ObjectInspectorCache objectInspectorCache, HoodieOperation hoodieOperation,
boolean isDelete) {
+ super(key, data, hoodieOperation, isDelete, Option.empty());
+ this.objectInspector = objectInspectorCache.getObjectInspector(schema);
+ this.objectInspectorCache = objectInspectorCache;
+ this.schema = schema;
+ this.copy = false;
+ isDeleted = data == null;
Review Comment:
Should `isDeleted` be set from the `isDelete` parameter instead of `data == null`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]