cshuo commented on code in PR #13688:
URL: https://github.com/apache/hudi/pull/13688#discussion_r2261743610
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java:
##########
@@ -126,11 +173,11 @@ public boolean equals(Object o) {
}
BufferedRecord<?> that = (BufferedRecord<?>) o;
    return isDelete == that.isDelete && Objects.equals(recordKey, that.recordKey) && Objects.equals(orderingValue, that.orderingValue)
-        && Objects.equals(record, that.record) && Objects.equals(schemaId, that.schemaId);
+        && Objects.equals(record, that.record) && Objects.equals(schemaId, that.schemaId) && hoodieOperation == that.hoodieOperation;
Review Comment:
`hoodieOperation` is an `Enum` object, so comparing it with `==` is safe here. (Actually, the `equals()` code is auto-generated.)
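A small self-contained sketch (using a stand-in enum, since `HoodieOperation` is not reproduced here) of why `==` is safe for enum fields in `equals()`:

```java
import java.util.Objects;

public class EnumEqualityDemo {
  // Stand-in enum for illustration: each enum constant is a singleton per
  // class loader, so `==` and Objects.equals() always agree for enums,
  // and `==` is additionally null-safe (no NullPointerException possible).
  enum Op { INSERT, UPDATE, DELETE }

  public static void main(String[] args) {
    Op a = Op.valueOf("DELETE"); // resolves to the same constant instance
    Op b = Op.DELETE;
    System.out.println(a == b);               // true
    System.out.println(Objects.equals(a, b)); // true, consistent with ==
    Op none = null;
    System.out.println(none == Op.DELETE);    // false, and no NPE
  }
}
```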
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordConverter.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.engine.RecordContext;
+
+import org.apache.avro.Schema;
+
+import java.util.List;
+
+/**
+ * The converter used to convert the engine-specific record into {@link BufferedRecord}
+ * according to different {@link IteratorMode}s for the file group reader.
+ */
+public interface BufferedRecordConverter<T> {
+ BufferedRecord<T> convert(T record);
+
+  static <T> BufferedRecordConverter<T> createConverter(
+      IteratorMode iteratorMode, Schema readerSchema, RecordContext<T> recordContext, List<String> orderingFieldNames) {
+    switch (iteratorMode) {
+      case ENGINE_RECORD:
+        return new BufferedRecordConverter<T>() {
+          private final BufferedRecord<T> reusedBufferedRecord = new BufferedRecord<>();
Review Comment:
ok
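The reuse pattern in that branch (one mutable `BufferedRecord` refilled on every `convert` call) can be sketched generically; the names below are invented for illustration, not the actual Hudi types:

```java
import java.util.function.Function;

// Sketch of the object-reuse pattern: the converter keeps one mutable holder
// and refills it on each call instead of allocating a new object per record.
// This is only safe when each result is fully consumed before the next call.
public class ReuseDemo {
  static final class Holder<T> {
    T record;
    String key;
  }

  static final class ReusingConverter<T> {
    private final Holder<T> reused = new Holder<>();
    private final Function<T, String> keyExtractor;

    ReusingConverter(Function<T, String> keyExtractor) {
      this.keyExtractor = keyExtractor;
    }

    Holder<T> convert(T record) {
      reused.record = record;
      reused.key = keyExtractor.apply(record);
      return reused; // same instance every call
    }
  }

  public static void main(String[] args) {
    ReusingConverter<String> c = new ReusingConverter<>(s -> s.split(":")[0]);
    Holder<String> first = c.convert("k1:v1");
    Holder<String> second = c.convert("k2:v2");
    System.out.println(first == second); // true: the holder is reused
    System.out.println(second.key);      // k2
  }
}
```

The trade-off is the usual one for hot-path iterators: no per-record allocation, at the cost of results that must not be retained across calls.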
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -95,15 +102,15 @@ public CallbackProcessor(BaseFileUpdateCallback callback, UpdateProcessor<T> del
}
@Override
-  public T processUpdate(String recordKey, T previousRecord, T currentRecord, boolean isDelete) {
-    T result = delegate.processUpdate(recordKey, previousRecord, currentRecord, isDelete);
+  public BufferedRecord<T> processUpdate(String recordKey, BufferedRecord<T> previousRecord, BufferedRecord<T> currentRecord, boolean isDelete) {
+    BufferedRecord<T> result = delegate.processUpdate(recordKey, previousRecord, currentRecord, isDelete);
     if (isDelete) {
-      callback.onDelete(recordKey, previousRecord);
-    } else if (previousRecord != null && previousRecord != currentRecord) {
-      callback.onUpdate(recordKey, previousRecord, currentRecord);
+      callback.onDelete(recordKey, previousRecord.getRecord());
+    } else if (previousRecord != null && previousRecord.getRecord() != currentRecord.getRecord()) {
Review Comment:
It seems `BaseFileUpdateCallback` does not need the operation type from `BufferedRecord`?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java:
##########
@@ -66,12 +72,26 @@ public Pair<HoodieFileGroupRecordBuffer<T>, List<String>> getRecordBuffer(Hoodie
           readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), partialUpdateMode, props, orderingFieldNames, updateProcessor);
     }
}
-    Iterator<BufferedRecord> recordIterator = inputSplit.getRecordIterator();
-
+    RecordContext<T> recordContext = readerContext.getRecordContext();
+    Schema recordSchema = readerContext.getSchemaHandler().getTableSchema();
+    Iterator<HoodieRecord> recordIterator = inputSplit.getRecordIterator();
+    String[] orderingFieldsArray = orderingFieldNames.toArray(new String[0]);
     while (recordIterator.hasNext()) {
-      BufferedRecord bufferedRecord = recordIterator.next();
+      HoodieRecord hoodieRecord = recordIterator.next();
+      T data = recordContext.extractDataFromRecord(hoodieRecord, recordSchema, props);
       try {
-        recordBuffer.processNextDataRecord(bufferedRecord, bufferedRecord.getRecordKey());
+        if (data == null) {
+          DeleteRecord deleteRecord = DeleteRecord.create(hoodieRecord.getKey(), hoodieRecord.getOrderingValue(recordSchema, props, orderingFieldsArray));
+          recordBuffer.processNextDeletedRecord(deleteRecord, deleteRecord.getRecordKey());
+        } else {
+          // HoodieRecord#isDelete does not check if a record is a DELETE marked by a custom delete marker,
Review Comment:
If the record is a delete with a custom payload, shouldn't the `data` extracted from the payload be null? I think the corner case you mentioned exists, but it should be fixed in `RecordContext#extractDataFromRecord` instead. Maybe we should create another JIRA to track the potential problem.
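A hypothetical sketch of centralizing that check in the extraction step, as the reviewer suggests (all names here are invented; the real `RecordContext#extractDataFromRecord` signature is not reproduced in this thread):

```java
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical extraction helper: map both payload-based deletes (data is
// already null) and custom-delete-marker records to null, so every caller
// can rely on a single convention that `null` means "this record is a delete".
public class ExtractionSketch<T> {
  private final BiPredicate<T, Map<String, String>> customDeleteMarker;

  public ExtractionSketch(BiPredicate<T, Map<String, String>> customDeleteMarker) {
    this.customDeleteMarker = customDeleteMarker;
  }

  public T extractData(T data, Map<String, String> props) {
    if (data == null || customDeleteMarker.test(data, props)) {
      return null; // normalize every kind of delete to null
    }
    return data;
  }
}
```

With this shape, call sites like the loop above need only the `data == null` branch and never have to know which delete mechanism applied.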
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -87,6 +89,8 @@ public abstract class HoodieReaderContext<T> {
private RecordMergeMode mergeMode;
protected RecordContext<T> recordContext;
private FileGroupReaderSchemaHandler<T> schemaHandler = null;
+ // the default iterator mode is engine-specific record mode
+ private IteratorMode iteratorMode = IteratorMode.ENGINE_RECORD;
Review Comment:
It would be a long call chain to deliver the `IteratorMode` to the `RecordBuffer`. I think it's OK to put it in the `ReaderContext`, just like `RecordMergeMode`.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordSerializer.java:
##########
@@ -66,6 +67,8 @@ <T> byte[] serialize(BufferedRecord<T> record, RecordSerializer<T> recordSeriali
output.writeVarInt(record.getSchemaId(), true);
}
output.writeBoolean(record.isDelete());
+ HoodieOperation operation = record.getHoodieOperation();
Review Comment:
ok
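The null-handling this write path needs (the operation field can be null) can be sketched with plain `java.io` streams rather than the Kryo output used in the PR; the names below are illustrative, not the actual serializer:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch: encode a possibly-null enum as a single byte, -1 for null and the
// ordinal otherwise, so deserialization can restore null exactly.
public class OpCodec {
  enum Op { INSERT, UPDATE, DELETE }

  static void write(DataOutputStream out, Op op) throws IOException {
    out.writeByte(op == null ? -1 : op.ordinal());
  }

  static Op read(DataInputStream in) throws IOException {
    byte b = in.readByte();
    return b < 0 ? null : Op.values()[b];
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    write(out, Op.DELETE);
    write(out, null); // null round-trips as the sentinel byte -1
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    System.out.println(read(in)); // DELETE
    System.out.println(read(in)); // null
  }
}
```

Encoding by ordinal keeps the payload to one byte but does couple the wire format to enum declaration order, which matters if constants are ever reordered.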
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]