jonvex commented on code in PR #13242:
URL: https://github.com/apache/hudi/pull/13242#discussion_r2070653590
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java:
##########
@@ -39,31 +40,46 @@
* @param <T> The type of the engine specific row.
*/
public class BufferedRecord<T> implements Serializable {
+ // the key of the record
private final String recordKey;
+ // the ordering value of the record to be used for even time based ordering
Review Comment:
Typo: "even" should be "event".
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -262,30 +265,48 @@ private BiFunction<T, Schema, String>
virtualKeyExtractor(String[] recordKeyFiel
/**
* Gets the ordering value in particular type.
*
- * @param record An option of record.
+ * @param record An engine specific record.
* @param schema The Avro schema of the record.
- * @param orderingFieldName name of the ordering field
+ * @param orderingFieldName name of the ordering field, if any
* @return The ordering value.
*/
- public Comparable getOrderingValue(T record,
- Schema schema,
- Option<String> orderingFieldName) {
+ public Comparable getOrderingValue(T record, Schema schema, Option<String>
orderingFieldName) {
if (orderingFieldName.isEmpty()) {
return DEFAULT_ORDERING_VALUE;
}
Object value = getValue(record, schema, orderingFieldName.get());
- Comparable finalOrderingVal = value != null ?
convertValueToEngineType((Comparable) value) : DEFAULT_ORDERING_VALUE;
- return finalOrderingVal;
+ return value != null ? convertValueToEngineType((Comparable) value) :
DEFAULT_ORDERING_VALUE;
}
/**
* Constructs a new {@link HoodieRecord} based on the given buffered record
{@link BufferedRecord}.
+ * Safe to assume that the buffered record is not a delete.
*
* @param bufferedRecord The {@link BufferedRecord} object with
engine-specific row
* @return A new instance of {@link HoodieRecord}.
*/
- public abstract HoodieRecord<T> constructHoodieRecord(BufferedRecord<T>
bufferedRecord);
+ protected abstract HoodieRecord<T>
constructHoodieDataRecord(BufferedRecord<T> bufferedRecord);
+
+ /**
+ * Constructs a new {@link HoodieRecord} based on the given buffered record
{@link BufferedRecord}.
+ *
+ * @param bufferedRecord The {@link BufferedRecord} object with
engine-specific row
+ * @return A new instance of {@link HoodieRecord}.
+ */
+ public HoodieRecord<T> constructHoodieRecord(BufferedRecord<T>
bufferedRecord) {
+ if (bufferedRecord.isDelete()) {
+ return new HoodieEmptyRecord<>(
+ new HoodieKey(bufferedRecord.getRecordKey(), null),
+ HoodieOperation.DELETE,
+ bufferedRecord.getOrderingValue(),
+ getRecordType());
+ } else {
+ return constructHoodieDataRecord(bufferedRecord);
+ }
+ }
+
+ protected abstract HoodieRecord.HoodieRecordType getRecordType();
Review Comment:
It is very obvious what this does, but please add Javadoc.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/EngineBasedMerger.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.avro.AvroSchemaCache;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.HoodieRecordUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
+
+public class EngineBasedMerger<T> {
+ private final HoodieReaderContext<T> readerContext;
+ private final RecordMergeMode recordMergeMode;
+ private final Option<HoodieRecordMerger> recordMerger;
+ private final Option<String> payloadClass;
+ private final Schema readerSchema;
+ private final TypedProperties props;
+
+ public EngineBasedMerger(HoodieReaderContext<T> readerContext,
RecordMergeMode recordMergeMode, HoodieTableConfig tableConfig, TypedProperties
props) {
+ this.readerContext = readerContext;
+ this.readerSchema =
AvroSchemaCache.intern(readerContext.getSchemaHandler().getRequiredSchema());
+ this.recordMergeMode = recordMergeMode;
+ this.recordMerger = readerContext.getRecordMerger();
+ if (recordMerger.isPresent() &&
recordMerger.get().getMergingStrategy().equals(PAYLOAD_BASED_MERGE_STRATEGY_UUID))
{
+ this.payloadClass = Option.of(tableConfig.getPayloadClass());
+ } else {
+ this.payloadClass = Option.empty();
+ }
+ this.props = props;
+ }
+
+ BufferedRecord<T> merge(Option<BufferedRecord<T>> olderOption,
Review Comment:
Is all of this copied from the record buffer, or did you make any logic changes
here?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -148,27 +148,28 @@ public HoodieFileGroupReader(HoodieReaderContext<T>
readerContext, HoodieStorage
/**
* Initialize correct record buffer
*/
- private static FileGroupRecordBuffer getRecordBuffer(HoodieReaderContext
readerContext,
- HoodieTableMetaClient
hoodieTableMetaClient,
- RecordMergeMode
recordMergeMode,
- TypedProperties props,
- Option<HoodieBaseFile>
baseFileOption,
- boolean hasNoLogFiles,
- boolean isSkipMerge,
- boolean
shouldUseRecordPosition,
- HoodieReadStats
readStats) {
+ private static <T> FileGroupRecordBuffer<T>
getRecordBuffer(HoodieReaderContext<T> readerContext,
+
HoodieTableMetaClient hoodieTableMetaClient,
+ RecordMergeMode
recordMergeMode,
+ TypedProperties
props,
+
Option<HoodieBaseFile> baseFileOption,
+ boolean
hasNoLogFiles,
+ boolean
isSkipMerge,
+ boolean
shouldUseRecordPosition,
+ HoodieReadStats
readStats) {
+ EngineBasedMerger<T> merger = new EngineBasedMerger<>(readerContext,
recordMergeMode, hoodieTableMetaClient.getTableConfig(), props);
if (hasNoLogFiles) {
return null;
} else if (isSkipMerge) {
return new UnmergedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, recordMergeMode,
Option.empty(), Option.empty(), props, readStats);
+ readerContext, hoodieTableMetaClient, recordMergeMode,
Option.empty(), Option.empty(), props, readStats, merger);
Review Comment:
Do we still need to pass in the merge mode here?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/EngineBasedMerger.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.avro.AvroSchemaCache;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.HoodieRecordUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
+
+public class EngineBasedMerger<T> {
Review Comment:
Please add Javadoc for this class.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]