This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f0fcbf6eaf3 [HUDI-9318] Refactor the log records presentation in FileGroupRecordBuffer (#13225)
f0fcbf6eaf3 is described below
commit f0fcbf6eaf39dfe79e2b27ff7d626b0a8c06bce0
Author: Shuo Cheng <[email protected]>
AuthorDate: Mon Apr 28 13:50:29 2025 +0800
[HUDI-9318] Refactor the log records presentation in FileGroupRecordBuffer (#13225)
---
.../hudi/client/model/AbstractHoodieRowData.java | 2 +-
.../hudi/client/model/HoodieFlinkRecord.java | 27 +--
.../model/HoodieRowDataWithUpdatedMetaField.java | 57 +++++
.../apache/hudi/util/RowDataAvroQueryContexts.java | 11 +
.../hudi/common/model/HoodieSparkRecord.java | 20 +-
.../hudi/BaseSparkInternalRowReaderContext.java | 23 +-
.../apache/spark/sql/HoodieInternalRowUtils.scala | 14 ++
.../apache/hudi/avro/HoodieAvroReaderContext.java | 20 +-
.../hudi/common/engine/HoodieReaderContext.java | 92 ++------
.../hudi/common/model/HoodieAvroIndexedRecord.java | 2 +-
.../apache/hudi/common/model/HoodieAvroRecord.java | 2 +-
.../hudi/common/model/HoodieEmptyRecord.java | 2 +-
.../org/apache/hudi/common/model/HoodieRecord.java | 25 ++-
.../table/log/HoodieMergedLogRecordReader.java | 8 +-
.../hudi/common/table/read/BufferedRecord.java | 110 +++++++++
.../common/table/read/FileGroupRecordBuffer.java | 247 +++++++++------------
.../table/read/HoodieFileGroupRecordBuffer.java | 15 +-
.../table/read/KeyBasedFileGroupRecordBuffer.java | 61 ++---
.../read/PositionBasedFileGroupRecordBuffer.java | 75 +++----
.../table/read/UnmergedFileGroupRecordBuffer.java | 22 +-
.../table/read/TestFileGroupRecordBuffer.java | 35 ++-
.../table/read/TestHoodieFileGroupReaderBase.java | 39 ++--
.../table/format/FlinkRowDataReaderContext.java | 55 +++--
.../hudi/hadoop/HiveHoodieReaderContext.java | 20 +-
.../org/apache/hudi/hadoop/HoodieHiveRecord.java | 2 +-
.../TestPositionBasedFileGroupRecordBuffer.java | 8 +-
.../read/TestHoodieFileGroupReaderOnSpark.scala | 15 +-
27 files changed, 526 insertions(+), 483 deletions(-)
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/AbstractHoodieRowData.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/AbstractHoodieRowData.java
index 7f3217f339b..5c92ebb1500 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/AbstractHoodieRowData.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/AbstractHoodieRowData.java
@@ -161,7 +161,7 @@ public abstract class AbstractHoodieRowData implements RowData {
     return row.getMap(rebaseOrdinal(ordinal));
   }
-  private String getMetaColumnVal(int ordinal) {
+  protected String getMetaColumnVal(int ordinal) {
     return this.metaColumns[ordinal];
   }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
index 376606f0e21..c3d9ef4364d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
@@ -27,7 +27,6 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.MetadataValues;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.util.RowDataAvroQueryContexts;
@@ -53,7 +52,6 @@ import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 * Flink Engine-specific Implementations of `HoodieRecord`, which is expected to hold {@code RowData} as payload.
 */
public class HoodieFlinkRecord extends HoodieRecord<RowData> {
-  private Comparable<?> orderingValue;
  public HoodieFlinkRecord(RowData rowData) {
    super(null, rowData);
@@ -96,16 +94,13 @@ public class HoodieFlinkRecord extends HoodieRecord<RowData> {
  }
  @Override
-  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
-    if (this.orderingValue == null) {
-      String orderingField = ConfigUtils.getOrderingField(props);
-      if (isNullOrEmpty(orderingField)) {
-        this.orderingValue = DEFAULT_ORDERING_VALUE;
-      } else {
-        this.orderingValue = (Comparable<?>) getColumnValueAsJava(recordSchema, orderingField, props, false);
-      }
+  protected Comparable<?> doGetOrderingValue(Schema recordSchema, Properties props) {
+    String orderingField = ConfigUtils.getOrderingField(props);
+    if (isNullOrEmpty(orderingField)) {
+      return DEFAULT_ORDERING_VALUE;
+    } else {
+      return (Comparable<?>) getColumnValueAsJava(recordSchema, orderingField, props, false);
    }
-    return this.orderingValue;
  }
  @Override
@@ -170,11 +165,11 @@ public class HoodieFlinkRecord extends HoodieRecord<RowData> {
  @Override
  public HoodieRecord updateMetaField(Schema recordSchema, int ordinal, String value) {
-    ValidationUtils.checkArgument(recordSchema.getField(RECORD_KEY_METADATA_FIELD) != null,
-        "The record is expected to contain metadata fields.");
-    GenericRowData rowData = (GenericRowData) getData();
-    rowData.setField(ordinal, StringData.fromString(value));
-    return this;
+    String[] metaVals = new String[HoodieRecord.HOODIE_META_COLUMNS.size()];
+    metaVals[ordinal] = value;
+    boolean withOperation = recordSchema.getField(OPERATION_METADATA_FIELD) != null;
+    RowData rowData = new HoodieRowDataWithUpdatedMetaField(metaVals, ordinal, getData(), withOperation);
+    return new HoodieFlinkRecord(getKey(), getOperation(), orderingValue, rowData);
  }
@Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowDataWithUpdatedMetaField.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowDataWithUpdatedMetaField.java
new file mode 100644
index 00000000000..9d66201dc82
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowDataWithUpdatedMetaField.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.model;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+
+/**
+ * RowData implementation for Hoodie Row. It wraps a {@link RowData} and keeps meta columns locally,
+ * but the meta columns array only contains one updated meta field, e.g., updated `FILENAME_METADATA_FIELD`
+ * for base file writing during compaction.
+ */
+public class HoodieRowDataWithUpdatedMetaField extends HoodieRowDataWithMetaFields {
+ private final int updatedMetaOrdinal;
+
+ public HoodieRowDataWithUpdatedMetaField(
+ String[] metaVals,
+ int updatedMetaOrdinal,
+ RowData row,
+ boolean withOperation) {
+    super(metaVals[0], metaVals[1], metaVals[2], metaVals[3], metaVals[4], row, withOperation);
+ this.updatedMetaOrdinal = updatedMetaOrdinal;
+ }
+
+ @Override
+ public boolean isNullAt(int ordinal) {
+ if (updatedMetaOrdinal == ordinal) {
+ return null == getMetaColumnVal(ordinal);
+ } else {
+ return row.isNullAt(rebaseOrdinal(ordinal));
+ }
+ }
+
+ @Override
+ public StringData getString(int ordinal) {
+ if (updatedMetaOrdinal == ordinal) {
+ return StringData.fromString(getMetaColumnVal(ordinal));
+ }
+ return row.getString(rebaseOrdinal(ordinal));
+ }
+}
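Note: the wrapper above overrides exactly one meta column and delegates all other reads to the wrapped row, so compaction can rewrite a single meta field without copying the record. A minimal usage sketch (the ordinal, file name, and `originalRow` below are illustrative, not from this commit):

    // Override only the ordinal-4 meta column (FILENAME_METADATA_FIELD in the
    // standard meta-column layout); every other read falls through to originalRow.
    String[] metaVals = new String[HoodieRecord.HOODIE_META_COLUMNS.size()];
    metaVals[4] = "compacted-base-file";
    RowData wrapped = new HoodieRowDataWithUpdatedMetaField(metaVals, 4, originalRow, false);
    StringData fileName = wrapped.getString(4); // served from metaVals, not originalRow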
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java
index bf7875cb892..055d41f1d61 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java
@@ -20,6 +20,7 @@ package org.apache.hudi.util;
import org.apache.avro.Schema;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -39,6 +40,9 @@ import java.util.function.Function;
public class RowDataAvroQueryContexts {
   private static final Map<Schema, RowDataQueryContext> QUERY_CONTEXT_MAP = new ConcurrentHashMap<>();
+  // The BinaryRowWriter inside RowDataSerializer is reused, and it's not thread-safe.
+  private static final ThreadLocal<Map<Schema, RowDataSerializer>> ROWDATA_SERIALIZER_CACHE = ThreadLocal.withInitial(HashMap::new);
+
public static RowDataQueryContext fromAvroSchema(Schema avroSchema) {
return fromAvroSchema(avroSchema, true);
}
@@ -62,6 +66,13 @@ public class RowDataAvroQueryContexts {
});
}
+  public static RowDataSerializer getRowDataSerializer(Schema avroSchema) {
+    return ROWDATA_SERIALIZER_CACHE.get().computeIfAbsent(avroSchema, schema -> {
+      RowType rowType = (RowType) fromAvroSchema(schema).getRowType().getLogicalType();
+      return new RowDataSerializer(rowType);
+    });
+  }
+
public static class RowDataQueryContext {
private final DataType rowType;
private final Map<String, FieldQueryContext> contextMap;
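Note: the ThreadLocal above exists because each RowDataSerializer reuses an internal BinaryRowWriter, so an instance must never be shared across threads. The same pattern in isolation (a sketch, not code from this commit):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    // One cache per thread: values (e.g. a RowDataSerializer) are built lazily
    // and are only ever touched by the thread that owns them.
    final class PerThreadCache<K, V> {
      private final ThreadLocal<Map<K, V>> cache = ThreadLocal.withInitial(HashMap::new);

      V get(K key, Function<K, V> factory) {
        return cache.get().computeIfAbsent(key, factory);
      }
    }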
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index e6b336f9870..d94888fd83b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -324,20 +324,18 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
  }
  @Override
-  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
+  protected Comparable<?> doGetOrderingValue(Schema recordSchema, Properties props) {
    StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
    String orderingField = ConfigUtils.getOrderingField(props);
-    if (isNullOrEmpty(orderingField)) {
-      return DEFAULT_ORDERING_VALUE;
-    }
-    scala.Option<NestedFieldPath> cachedNestedFieldPath =
-        HoodieInternalRowUtils.getCachedPosList(structType, orderingField);
-    if (cachedNestedFieldPath.isDefined()) {
-      NestedFieldPath nestedFieldPath = cachedNestedFieldPath.get();
-      return (Comparable<?>) HoodieUnsafeRowUtils.getNestedInternalRowValue(data, nestedFieldPath);
-    } else {
-      return DEFAULT_ORDERING_VALUE;
+    if (!isNullOrEmpty(orderingField)) {
+      scala.Option<NestedFieldPath> cachedNestedFieldPath =
+          HoodieInternalRowUtils.getCachedPosList(structType, orderingField);
+      if (cachedNestedFieldPath.isDefined()) {
+        NestedFieldPath nestedFieldPath = cachedNestedFieldPath.get();
+        return (Comparable<?>) HoodieUnsafeRowUtils.getNestedInternalRowValue(data, nestedFieldPath);
+      }
    }
+    return DEFAULT_ORDERING_VALUE;
  }
  /**
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index 8df5bdf57b7..92478ee19fc 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -36,6 +37,7 @@ import org.apache.avro.Schema;
 import org.apache.spark.sql.HoodieInternalRowUtils;
 import org.apache.spark.sql.HoodieUnsafeRowUtils;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
@@ -93,17 +95,15 @@ public abstract class BaseSparkInternalRowReaderContext extends HoodieReaderCont
  }
  @Override
-  public HoodieRecord<InternalRow> constructHoodieRecord(Option<InternalRow> rowOption,
-                                                         Map<String, Object> metadataMap) {
-    if (!rowOption.isPresent()) {
+  public HoodieRecord<InternalRow> constructHoodieRecord(BufferedRecord<InternalRow> bufferedRecord) {
+    if (bufferedRecord.isDelete()) {
      return new HoodieEmptyRecord<>(
-          new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY),
-              (String) metadataMap.get(INTERNAL_META_PARTITION_PATH)),
+          new HoodieKey(bufferedRecord.getRecordKey(), null),
          HoodieRecord.HoodieRecordType.SPARK);
    }
-    Schema schema = getSchemaFromMetadata(metadataMap);
-    InternalRow row = rowOption.get();
+    Schema schema = getSchemaFromBufferRecord(bufferedRecord);
+    InternalRow row = bufferedRecord.getRecord();
    return new HoodieSparkRecord(row, HoodieInternalRowUtils.getCachedSchema(schema));
  }
@@ -112,6 +112,15 @@ public abstract class BaseSparkInternalRowReaderContext extends HoodieReaderCont
    return internalRow.copy();
  }
+  @Override
+  public InternalRow toBinaryRow(Schema schema, InternalRow internalRow) {
+    if (internalRow instanceof UnsafeRow) {
+      return internalRow;
+    }
+    final UnsafeProjection unsafeProjection = HoodieInternalRowUtils.getCachedUnsafeProjection(schema);
+    return unsafeProjection.apply(internalRow);
+  }
+
  private Object getFieldValueFromInternalRow(InternalRow row, Schema recordSchema, String fieldName) {
    StructType structType = getCachedSchema(recordSchema);
    scala.Option<HoodieUnsafeRowUtils.NestedFieldPath> cachedNestedFieldPath =
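Note: toBinaryRow normalizes a row to Spark's binary UnsafeRow form once before it is buffered; rows that are already binary pass through untouched. A hedged call-site sketch (`ctx`, `schema`, and `row` are assumed to be in scope):

    // No-op for UnsafeRow inputs; otherwise a cached UnsafeProjection copies the
    // row into a compact, self-contained binary buffer safe to keep in the spillable map.
    InternalRow buffered = ctx.toBinaryRow(schema, row);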
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
index 2a0c2568cdb..519519fea37 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
@@ -60,6 +60,12 @@ object HoodieInternalRowUtils {
      new mutable.HashMap[(StructType, StructType), UnsafeProjection]
    })
+  private val identicalUnsafeProjectionThreadLocal: ThreadLocal[mutable.HashMap[Schema, UnsafeProjection]] =
+    ThreadLocal.withInitial(new Supplier[mutable.HashMap[Schema, UnsafeProjection]] {
+      override def get(): mutable.HashMap[Schema, UnsafeProjection] =
+        new mutable.HashMap[Schema, UnsafeProjection]
+    })
+
  private val schemaMap = new ConcurrentHashMap[Schema, StructType]
  private val orderPosListMap = new ConcurrentHashMap[(StructType, String), Option[NestedFieldPath]]
@@ -75,6 +81,14 @@ object HoodieInternalRowUtils {
      .getOrElseUpdate((from, to), generateUnsafeProjection(from, to))
  }
+  /**
+   * Provides cached instance of [[UnsafeProjection]] to project Java object based [[InternalRow]] to [[UnsafeRow]].
+   */
+  def getCachedUnsafeProjection(schema: Schema): UnsafeProjection = {
+    identicalUnsafeProjectionThreadLocal.get()
+      .getOrElseUpdate(schema, UnsafeProjection.create(getCachedSchema(schema)))
+  }
+
  /**
   * Provides cached instance of [[UnsafeRowWriter]] transforming provided [[InternalRow]]s from
   * one [[StructType]] and into another [[StructType]]
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
index 13831b62c96..613a21159de 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.OverwriteWithLatestMerger;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
@@ -160,17 +161,15 @@ public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord>
  }
  @Override
-  public HoodieRecord constructHoodieRecord(
-      Option<IndexedRecord> recordOpt,
-      Map<String, Object> metadataMap) {
-    if (!recordOpt.isPresent()) {
+  public HoodieRecord<IndexedRecord> constructHoodieRecord(BufferedRecord<IndexedRecord> bufferedRecord) {
+    if (bufferedRecord.isDelete()) {
      return SpillableMapUtils.generateEmptyPayload(
-          (String) metadataMap.get(INTERNAL_META_RECORD_KEY),
-          (String) metadataMap.get(INTERNAL_META_PARTITION_PATH),
-          (Comparable<?>) metadataMap.get(INTERNAL_META_ORDERING_FIELD),
+          bufferedRecord.getRecordKey(),
+          null,
+          bufferedRecord.getOrderingValue(),
          payloadClass);
    }
-    return new HoodieAvroIndexedRecord(recordOpt.get());
+    return new HoodieAvroIndexedRecord(bufferedRecord.getRecord());
  }
  @Override
@@ -178,6 +177,11 @@ public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord>
    return record;
  }
+  @Override
+  public IndexedRecord toBinaryRow(Schema avroSchema, IndexedRecord record) {
+    return record;
+  }
+
  @Override
  public ClosableIterator<IndexedRecord> mergeBootstrapReaders(ClosableIterator<IndexedRecord> skeletonFileIterator,
                                                               Schema skeletonRequiredSchema,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 88d6ca971d3..62249b384cd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
+import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.LocalAvroSchemaCache;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -39,7 +40,6 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.function.UnaryOperator;
@@ -151,15 +151,6 @@ public abstract class HoodieReaderContext<T> {
    return storageConfiguration;
  }
-  // These internal key names are only used in memory for record metadata and merging,
-  // and should not be persisted to storage.
-  public static final String INTERNAL_META_RECORD_KEY = "_0";
-  public static final String INTERNAL_META_PARTITION_PATH = "_1";
-  public static final String INTERNAL_META_ORDERING_FIELD = "_2";
-  public static final String INTERNAL_META_OPERATION = "_3";
-  public static final String INTERNAL_META_INSTANT_TIME = "_4";
-  public static final String INTERNAL_META_SCHEMA_ID = "_5";
-
  /**
   * Gets the record iterator based on the type of engine-specific record representation from the
   * file.
@@ -251,39 +242,30 @@ public abstract class HoodieReaderContext<T> {
  /**
   * Gets the ordering value in particular type.
   *
-   * @param recordOption An option of record.
-   * @param metadataMap A map containing the record metadata.
-   * @param schema The Avro schema of the record.
+   * @param record The record.
+   * @param schema The Avro schema of the record.
   * @param orderingFieldName name of the ordering field
   * @return The ordering value.
   */
-  public Comparable getOrderingValue(Option<T> recordOption,
-                                     Map<String, Object> metadataMap,
+  public Comparable getOrderingValue(T record,
                                     Schema schema,
                                     Option<String> orderingFieldName) {
-    if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) {
-      return (Comparable) metadataMap.get(INTERNAL_META_ORDERING_FIELD);
-    }
-
-    if (!recordOption.isPresent() || orderingFieldName.isEmpty()) {
+    if (orderingFieldName.isEmpty()) {
      return DEFAULT_ORDERING_VALUE;
    }
-    Object value = getValue(recordOption.get(), schema, orderingFieldName.get());
+    Object value = getValue(record, schema, orderingFieldName.get());
    Comparable finalOrderingVal = value != null ? convertValueToEngineType((Comparable) value) : DEFAULT_ORDERING_VALUE;
-    metadataMap.put(INTERNAL_META_ORDERING_FIELD, finalOrderingVal);
    return finalOrderingVal;
  }
  /**
-   * Constructs a new {@link HoodieRecord} based on the record of engine-specific type and metadata for merging.
+   * Constructs a new {@link HoodieRecord} based on the given buffered record {@link BufferedRecord}.
   *
-   * @param recordOption An option of the record in engine-specific type if exists.
-   * @param metadataMap The record metadata.
+   * @param bufferedRecord The {@link BufferedRecord} object with engine-specific row
   * @return A new instance of {@link HoodieRecord}.
   */
-  public abstract HoodieRecord<T> constructHoodieRecord(Option<T> recordOption,
-                                                        Map<String, Object> metadataMap);
+  public abstract HoodieRecord<T> constructHoodieRecord(BufferedRecord<T> bufferedRecord);
  /**
   * Seals the engine-specific record to make sure the data referenced in memory do not change.
@@ -294,58 +276,24 @@ public abstract class HoodieReaderContext<T> {
  public abstract T seal(T record);
  /**
-   * Generates metadata map based on the information.
+   * Converts engine specific row into binary format.
   *
-   * @param recordKey Record key in String.
-   * @param partitionPath Partition path in String.
-   * @param orderingVal Ordering value in String.
-   * @return A mapping containing the metadata.
-   */
-  public Map<String, Object> generateMetadataForRecord(
-      String recordKey, String partitionPath, Comparable orderingVal) {
-    Map<String, Object> meta = new HashMap<>();
-    meta.put(INTERNAL_META_RECORD_KEY, recordKey);
-    meta.put(INTERNAL_META_PARTITION_PATH, partitionPath);
-    meta.put(INTERNAL_META_ORDERING_FIELD, orderingVal);
-    return meta;
-  }
-
-  /**
-   * Generates metadata of the record. Only fetches record key that is necessary for merging.
+   * @param avroSchema The avro schema of the row
+   * @param record The engine row
   *
-   * @param record The record.
-   * @param schema The Avro schema of the record.
-   * @return A mapping containing the metadata.
+   * @return row in binary format
   */
-  public Map<String, Object> generateMetadataForRecord(T record, Schema schema) {
-    Map<String, Object> meta = new HashMap<>();
-    meta.put(INTERNAL_META_RECORD_KEY, getRecordKey(record, schema));
-    meta.put(INTERNAL_META_SCHEMA_ID, encodeAvroSchema(schema));
-    return meta;
-  }
+  public abstract T toBinaryRow(Schema avroSchema, T record);
  /**
-   * Gets the schema encoded in the metadata map
+   * Gets the schema encoded in the buffered record {@code BufferedRecord}.
   *
-   * @param infoMap The record metadata
-   * @return the avro schema if it is encoded in the metadata map, else null
-   */
-  public Schema getSchemaFromMetadata(Map<String, Object> infoMap) {
-    return decodeAvroSchema(infoMap.get(INTERNAL_META_SCHEMA_ID));
-  }
-
-  /**
-   * Updates the schema and reset the ordering value in existing metadata mapping of a record.
+   * @param record {@link BufferedRecord} object with engine-specific type
   *
-   * @param meta Metadata in a mapping.
-   * @param schema New schema to set.
-   * @return The input metadata mapping.
+   * @return The avro schema if it is encoded in the buffered record, else null
   */
-  public Map<String, Object> updateSchemaAndResetOrderingValInMetadata(Map<String, Object> meta,
-                                                                       Schema schema) {
-    meta.remove(INTERNAL_META_ORDERING_FIELD);
-    meta.put(INTERNAL_META_SCHEMA_ID, encodeAvroSchema(schema));
-    return meta;
+  public Schema getSchemaFromBufferRecord(BufferedRecord<T> record) {
+    return decodeAvroSchema(record.getSchemaId());
  }
  /**
@@ -417,7 +365,7 @@ public abstract class HoodieReaderContext<T> {
  /**
   * Encodes the given avro schema for efficient serialization.
   */
-  private Integer encodeAvroSchema(Schema schema) {
+  public Integer encodeAvroSchema(Schema schema) {
    return this.localAvroSchemaCache.cacheSchema(schema);
  }
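Note: the net effect of this hunk is that the stringly-keyed metadata map ("_0" through "_5") is gone; the record key, ordering value, schema id, and delete flag now travel on a typed BufferedRecord, and getOrderingValue computes from the row instead of memoizing into the map. A sketch of the new flow (names from this diff; wiring assumed):

    // The ordering value is resolved straight from the engine row...
    Comparable orderingValue = readerContext.getOrderingValue(row, schema, orderingFieldName);
    // ...and carried on the BufferedRecord, so later merge steps read it back
    // via getOrderingValue() without re-parsing the row or consulting a map.
    BufferedRecord<T> buffered =
        BufferedRecord.forRecordWithContext(row, schema, readerContext, orderingFieldName, false);
    HoodieRecord<T> hoodieRecord = readerContext.constructHoodieRecord(buffered);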
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index f4c53543103..8797b428d0b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -203,7 +203,7 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
  }
  @Override
-  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
+  public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties props) {
String orderingField = ConfigUtils.getOrderingField(props);
if (isNullOrEmpty(orderingField)) {
return DEFAULT_ORDERING_VALUE;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index e4782ff5b2c..a3b9e5e521f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -96,7 +96,7 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
  }
  @Override
-  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
+  public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties props) {
return this.getData().getOrderingValue();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
index f482b21e56e..2d5dda75d1a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
@@ -58,7 +58,7 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
  }
  @Override
-  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
+  public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties props) {
return orderingVal;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 97aff9db70a..601b62450b3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -158,6 +158,8 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
   */
  protected Option<Map<String, String>> metaData;
+  protected transient Comparable<?> orderingValue;
+
  public HoodieRecord(HoodieKey key, T data) {
    this(key, data, null, Option.empty());
  }
@@ -211,7 +213,28 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
    return operation;
  }
-  public abstract Comparable<?> getOrderingValue(Schema recordSchema, Properties props);
+  /**
+   * Gets the ordering value for the record from the cached variable, or extracts it from the record if not cached.
+   *
+   * @param recordSchema Avro schema for the record
+   * @param props        Properties containing the necessary configurations
+   * @return The ordering value for the record
+   */
+  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
+    if (orderingValue == null) {
+      orderingValue = doGetOrderingValue(recordSchema, props);
+    }
+    return orderingValue;
+  }
+
+  /**
+   * Extracts the ordering value from the record.
+   *
+   * @param recordSchema Avro schema for the record
+   * @param props        Properties containing the necessary configurations
+   * @return The ordering value for the record
+   */
+  protected abstract Comparable<?> doGetOrderingValue(Schema recordSchema, Properties props);
  public T getData() {
    if (data == null) {
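Note: getOrderingValue is now a template method: the base class caches the value in the transient field while each engine implements doGetOrderingValue once. The shape of the pattern in isolation (hypothetical classes, not from this commit):

    // Compute-once caching in the base class; subclasses only supply extraction.
    abstract class OrderingValueHolder {
      private transient Comparable<?> orderingValue;

      public Comparable<?> getOrderingValue() {
        if (orderingValue == null) {
          orderingValue = doGetOrderingValue(); // implementations return a non-null
        }                                       // default value instead of null
        return orderingValue;
      }

      protected abstract Comparable<?> doGetOrderingValue();
    }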
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index 91ef3681120..11d32dbc5f1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -22,11 +22,11 @@ package org.apache.hudi.common.table.log;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.read.FileGroupRecordBuffer;
+import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
@@ -51,7 +51,7 @@ import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
 * @param <T> type of engine-specific record representation.
 */
public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
-    implements Iterable<Pair<Option<T>, Map<String, Object>>>, Closeable {
+    implements Iterable<BufferedRecord<T>>, Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(HoodieMergedLogRecordReader.class);
  // A timer for calculating elapsed time in millis
  public final HoodieTimer timer = HoodieTimer.create();
@@ -166,11 +166,11 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
  }
  @Override
-  public Iterator<Pair<Option<T>, Map<String, Object>>> iterator() {
+  public Iterator<BufferedRecord<T>> iterator() {
    return recordBuffer.getLogRecordIterator();
  }
-  public Map<Serializable, Pair<Option<T>, Map<String, Object>>> getRecords() {
+  public Map<Serializable, BufferedRecord<T>> getRecords() {
    return recordBuffer.getLogRecords();
  }
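Note: the merged log-record reader now hands out BufferedRecord<T> directly instead of Pair<Option<T>, Map<String, Object>>. A hedged consumption sketch (`reader` and `process` are placeholders):

    // Deletes are still materialized in the buffer: the key and ordering value
    // are set but the record itself is null, so consumers filter on isDelete().
    for (BufferedRecord<T> rec : reader) {
      if (!rec.isDelete()) {
        process(rec.getRecordKey(), rec.getRecord());
      }
    }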
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
new file mode 100644
index 00000000000..3972fd4257e
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
+
+/**
+ * Buffered Record used by file group reader.
+ *
+ * @param <T> The type of the engine specific row.
+ */
+public class BufferedRecord<T> implements Serializable {
+ private final String recordKey;
+ private final Comparable orderingValue;
+ private T record;
+ private final Integer schemaId;
+ private final boolean isDelete;
+
+  private BufferedRecord(String recordKey, Comparable orderingValue, T record, Integer schemaId, boolean isDelete) {
+    this.recordKey = recordKey;
+    this.orderingValue = orderingValue;
+    this.record = record;
+    this.schemaId = schemaId;
+    this.isDelete = isDelete;
+  }
+
+  public static <T> BufferedRecord<T> forRecordWithContext(HoodieRecord<T> record, Schema schema, HoodieReaderContext<T> readerContext, Properties props) {
+    HoodieKey hoodieKey = record.getKey();
+    String recordKey = hoodieKey == null ? readerContext.getRecordKey(record.getData(), schema) : hoodieKey.getRecordKey();
+    Integer schemaId = readerContext.encodeAvroSchema(schema);
+    boolean isDelete;
+    try {
+      isDelete = record.isDelete(schema, props);
+    } catch (IOException e) {
+      throw new HoodieException("Failed to get isDelete from record.", e);
+    }
+    return new BufferedRecord<>(recordKey, record.getOrderingValue(schema, props), record.getData(), schemaId, isDelete);
+  }
+
+  public static <T> BufferedRecord<T> forRecordWithContext(T record, Schema schema, HoodieReaderContext<T> readerContext, Option<String> orderingFieldName, boolean isDelete) {
+    String recordKey = readerContext.getRecordKey(record, schema);
+    Integer schemaId = readerContext.encodeAvroSchema(schema);
+    Comparable orderingValue = readerContext.getOrderingValue(record, schema, orderingFieldName);
+    return new BufferedRecord<>(recordKey, orderingValue, record, schemaId, isDelete);
+  }
+
+  public static <T> BufferedRecord<T> forDeleteRecord(DeleteRecord deleteRecord, Comparable orderingValue) {
+    return new BufferedRecord<>(deleteRecord.getRecordKey(), orderingValue, null, null, true);
+ }
+
+ public String getRecordKey() {
+ return recordKey;
+ }
+
+ public Comparable getOrderingValue() {
+ return orderingValue;
+ }
+
+ public T getRecord() {
+ return record;
+ }
+
+ public Integer getSchemaId() {
+ return schemaId;
+ }
+
+ public boolean isDelete() {
+ return isDelete;
+ }
+
+ public boolean isCommitTimeOrderingDelete() {
+ return isDelete && getOrderingValue().equals(DEFAULT_ORDERING_VALUE);
+ }
+
+ public BufferedRecord<T> toBinary(HoodieReaderContext<T> readerContext) {
+ if (record != null) {
+      record = readerContext.seal(readerContext.toBinaryRow(readerContext.getSchemaFromBufferRecord(this), record));
+ }
+ return this;
+ }
+}
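Note: BufferedRecord interns the record schema to an Integer id through the reader context's local schema cache and models deletes as a null row. The three factory paths side by side (a sketch; variables assumed in scope):

    // 1. Log-block path: key, ordering value, and delete flag come off the HoodieRecord.
    BufferedRecord<T> a = BufferedRecord.forRecordWithContext(hoodieRecord, schema, ctx, props);
    // 2. Raw engine-row path: the caller states whether the row is a delete.
    BufferedRecord<T> b = BufferedRecord.forRecordWithContext(row, schema, ctx, orderingFieldName, false);
    // 3. DeleteRecord path: no row, no schema id, isDelete is always true.
    BufferedRecord<T> c = BufferedRecord.forDeleteRecord(deleteRecord, orderingVal);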
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
index 3414fc254da..d19ed4a56e3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
@@ -60,7 +60,6 @@ import org.apache.avro.generic.IndexedRecord;
import java.io.IOException;
import java.io.Serializable;
-import java.util.Collections;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
@@ -71,8 +70,6 @@ import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_
 import static org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
 import static org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
 import static org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
-import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_PARTITION_PATH;
-import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
 import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
 import static org.apache.hudi.common.model.HoodieRecord.HOODIE_IS_DELETED_FIELD;
 import static org.apache.hudi.common.model.HoodieRecord.OPERATION_METADATA_FIELD;
@@ -89,12 +86,12 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB
  protected final Option<HoodieRecordMerger> recordMerger;
  protected final Option<String> payloadClass;
  protected final TypedProperties props;
-  protected final ExternalSpillableMap<Serializable, Pair<Option<T>, Map<String, Object>>> records;
+  protected final ExternalSpillableMap<Serializable, BufferedRecord<T>> records;
protected final HoodieReadStats readStats;
protected final boolean shouldCheckCustomDeleteMarker;
protected final boolean shouldCheckBuiltInDeleteMarker;
protected ClosableIterator<T> baseFileIterator;
- protected Iterator<Pair<Option<T>, Map<String, Object>>> logRecordIterator;
+ protected Iterator<BufferedRecord<T>> logRecordIterator;
protected T nextRecord;
protected boolean enablePartialMerging = false;
protected InternalSchema internalSchema;
@@ -207,7 +204,7 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB
  }
  @Override
-  public Map<Serializable, Pair<Option<T>, Map<String, Object>>> getLogRecords() {
+  public Map<Serializable, BufferedRecord<T>> getLogRecords() {
    return records;
  }
@@ -221,7 +218,7 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB
  }
  @Override
-  public Iterator<Pair<Option<T>, Map<String, Object>>> getLogRecordIterator() {
+  public Iterator<BufferedRecord<T>> getLogRecordIterator() {
    return records.values().iterator();
  }
@@ -234,28 +231,22 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB
   * Merge two log data records if needed.
   *
   * @param newRecord The new incoming record
-   * @param metadata The metadata
-   * @param existingRecordMetadataPair The existing record metadata pair
-   *
-   * @return The pair of the record that needs to be updated with and its metadata,
-   *         returns empty to skip the update.
+   * @param existingRecord The existing record
+   * @return the {@link BufferedRecord} that needs to be updated, returns empty to skip the update.
   */
-  protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T newRecord,
-                                                                                 Map<String, Object> metadata,
-                                                                                 Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair)
+  protected Option<BufferedRecord<T>> doProcessNextDataRecord(BufferedRecord<T> newRecord, BufferedRecord<T> existingRecord)
      throws IOException {
    totalLogRecords++;
-    if (existingRecordMetadataPair != null) {
+    if (existingRecord != null) {
      if (enablePartialMerging) {
        // TODO(HUDI-7843): decouple the merging logic from the merger
        // and use the record merge mode to control how to merge partial updates
        // Merge and store the combined record
        Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = recordMerger.get().partialMerge(
-            readerContext.constructHoodieRecord(
-                existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
-            readerContext.getSchemaFromMetadata(existingRecordMetadataPair.getRight()),
-            readerContext.constructHoodieRecord(Option.of(newRecord), metadata),
-            readerContext.getSchemaFromMetadata(metadata),
+            readerContext.constructHoodieRecord(existingRecord),
+            readerContext.getSchemaFromBufferRecord(existingRecord),
+            readerContext.constructHoodieRecord(newRecord),
+            readerContext.getSchemaFromBufferRecord(newRecord),
            readerSchema,
            props);
        if (!combinedRecordAndSchemaOpt.isPresent()) {
@@ -265,56 +256,49 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB
        HoodieRecord<T> combinedRecord = combinedRecordAndSchema.getLeft();
        // If pre-combine returns existing record, no need to update it
-        if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().orElse(null)) {
-          return Option.of(Pair.of(
-              Option.ofNullable(combinedRecord.getData()),
-              readerContext.updateSchemaAndResetOrderingValInMetadata(metadata, combinedRecordAndSchema.getRight())));
+        if (combinedRecord.getData() != existingRecord.getRecord()) {
+          return Option.of(BufferedRecord.forRecordWithContext(combinedRecord, combinedRecordAndSchema.getRight(), readerContext, props));
        }
        return Option.empty();
      } else {
        switch (recordMergeMode) {
          case COMMIT_TIME_ORDERING:
-            return Option.of(Pair.of(Option.ofNullable(newRecord), metadata));
+            return Option.of(newRecord);
          case EVENT_TIME_ORDERING:
-            if (shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), Option.ofNullable(newRecord), metadata)) {
-              return Option.of(Pair.of(Option.of(newRecord), metadata));
+            if (shouldKeepNewerRecord(existingRecord, newRecord)) {
+              return Option.of(newRecord);
            }
            return Option.empty();
          case CUSTOM:
          default:
-            // Merge and store the combined record
-            if (payloadClass.isPresent()) {
-              if (existingRecordMetadataPair.getLeft().isEmpty()
-                  && shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), Option.ofNullable(newRecord), metadata)) {
+            if (existingRecord.isDelete() || newRecord.isDelete()) {
+              if (shouldKeepNewerRecord(existingRecord, newRecord)) {
                // IMPORTANT:
                // this is needed when the fallback HoodieAvroRecordMerger got used, the merger would
                // return Option.empty when the old payload data is empty(a delete) and ignores its ordering value directly.
-                return Option.of(Pair.of(Option.of(newRecord), metadata));
+                return Option.of(newRecord);
+              } else {
+                return Option.empty();
              }
-              Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt =
-                  getMergedRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), Option.of(newRecord), metadata);
+            }
+            // Merge and store the combined record
+            if (payloadClass.isPresent()) {
+              Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = getMergedRecord(existingRecord, newRecord);
              if (combinedRecordAndSchemaOpt.isPresent()) {
                T combinedRecordData = readerContext.convertAvroRecord((IndexedRecord) combinedRecordAndSchemaOpt.get().getLeft().getData());
                // If pre-combine does not return existing record, update it
-                if (combinedRecordData != existingRecordMetadataPair.getLeft().orElse(null)) {
-                  return Option.of(Pair.of(Option.ofNullable(combinedRecordData), metadata));
+                if (combinedRecordData != existingRecord.getRecord()) {
+                  Pair<HoodieRecord, Schema> combinedRecordAndSchema = combinedRecordAndSchemaOpt.get();
+                  return Option.of(BufferedRecord.forRecordWithContext(combinedRecordData, combinedRecordAndSchema.getRight(), readerContext, orderingFieldName, false));
                }
              }
              return Option.empty();
            } else {
-              if (existingRecordMetadataPair.getLeft().isEmpty()
-                  && shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), Option.ofNullable(newRecord), metadata)) {
-                // IMPORTANT:
-                // this is needed when the fallback HoodieAvroRecordMerger got used, the merger would
-                // return Option.empty when the old payload data is empty(a delete) and ignores its ordering value directly.
-                return Option.of(Pair.of(Option.of(newRecord), metadata));
-              }
              Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = recordMerger.get().merge(
-                  readerContext.constructHoodieRecord(
-                      existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
-                  readerContext.getSchemaFromMetadata(existingRecordMetadataPair.getRight()),
-                  readerContext.constructHoodieRecord(Option.of(newRecord), metadata),
-                  readerContext.getSchemaFromMetadata(metadata),
+                  readerContext.constructHoodieRecord(existingRecord),
+                  readerContext.getSchemaFromBufferRecord(existingRecord),
+                  readerContext.constructHoodieRecord(newRecord),
+                  readerContext.getSchemaFromBufferRecord(newRecord),
                  props);
              if (!combinedRecordAndSchemaOpt.isPresent()) {
@@ -325,8 +309,8 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB
              HoodieRecord<T> combinedRecord = combinedRecordAndSchema.getLeft();
              // If pre-combine returns existing record, no need to update it
-              if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().orElse(null)) {
-                return Option.of(Pair.of(Option.ofNullable(combinedRecord.getData()), metadata));
+              if (combinedRecord.getData() != existingRecord.getRecord()) {
+                return Option.of(BufferedRecord.forRecordWithContext(combinedRecord, combinedRecordAndSchema.getRight(), readerContext, props));
              }
              return Option.empty();
            }
@@ -337,7 +321,7 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB
    // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
    //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
    //       it since these records will be put into records(Map).
-    return Option.of(Pair.of(Option.ofNullable(newRecord), metadata));
+    return Option.of(newRecord);
  }
@@ -345,26 +329,23 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB
   * Merge a delete record with another record (data, or delete).
   *
   * @param deleteRecord The delete record
-   * @param existingRecordMetadataPair The existing record metadata pair
+   * @param existingRecord The existing {@link BufferedRecord}
   *
   * @return The option of new delete record that needs to be updated with.
   */
-  protected Option<DeleteRecord> doProcessNextDeletedRecord(DeleteRecord deleteRecord,
-                                                            Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) {
+  protected Option<DeleteRecord> doProcessNextDeletedRecord(DeleteRecord deleteRecord, BufferedRecord<T> existingRecord) {
    totalLogRecords++;
-    if (existingRecordMetadataPair != null) {
+    if (existingRecord != null) {
      switch (recordMergeMode) {
        case COMMIT_TIME_ORDERING:
          return Option.of(deleteRecord);
        case EVENT_TIME_ORDERING:
        case CUSTOM:
        default:
-          Comparable existingOrderingVal = readerContext.getOrderingValue(
-              existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), readerSchema,
-              orderingFieldName);
-          if (isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(), existingOrderingVal)) {
+          if (existingRecord.isCommitTimeOrderingDelete()) {
            return Option.empty();
          }
+          Comparable existingOrderingVal = existingRecord.getOrderingValue();
          Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
          // Checks the ordering value does not equal to 0
          // because we use 0 as the default value which means natural order
@@ -432,68 +413,60 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB
  /**
   * Merge two records using the configured record merger.
   *
-   * @param older
-   * @param olderInfoMap
-   * @param newer
-   * @param newerInfoMap
-   * @return
+   * @param olderRecord the older {@link BufferedRecord}
+   * @param newerRecord the newer {@link BufferedRecord}
+   * @return a value pair: the left is the boolean flag `isDelete`, and the right is the engine-specific row.
   * @throws IOException
   */
-  protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
-                            Option<T> newer, Map<String, Object> newerInfoMap) throws IOException {
-    if (!older.isPresent()) {
-      return isDeleteRecord(newer, readerContext.getSchemaFromMetadata(newerInfoMap)) ? Option.empty() : newer;
-    }
-
+  protected Pair<Boolean, T> merge(BufferedRecord<T> olderRecord, BufferedRecord<T> newerRecord) throws IOException {
    if (enablePartialMerging) {
      // TODO(HUDI-7843): decouple the merging logic from the merger
      // and use the record merge mode to control how to merge partial updates
      Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().partialMerge(
-          readerContext.constructHoodieRecord(older, olderInfoMap), readerContext.getSchemaFromMetadata(olderInfoMap),
-          readerContext.constructHoodieRecord(newer, newerInfoMap), readerContext.getSchemaFromMetadata(newerInfoMap),
+          readerContext.constructHoodieRecord(olderRecord), readerContext.getSchemaFromBufferRecord(olderRecord),
+          readerContext.constructHoodieRecord(newerRecord), readerContext.getSchemaFromBufferRecord(newerRecord),
          readerSchema, props);
      if (mergedRecord.isPresent()
          && !mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
+        HoodieRecord hoodieRecord = mergedRecord.get().getLeft();
        if (!mergedRecord.get().getRight().equals(readerSchema)) {
-          return Option.ofNullable((T) mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, readerSchema).getData());
+          T data = (T) hoodieRecord.rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, readerSchema).getData();
+          return Pair.of(false, data);
        }
-        return Option.ofNullable((T) mergedRecord.get().getLeft().getData());
+        return Pair.of(false, (T) hoodieRecord.getData());
      }
-      return Option.empty();
+      return Pair.of(true, null);
    } else {
      switch (recordMergeMode) {
        case COMMIT_TIME_ORDERING:
-          return isDeleteRecord(newer, readerContext.getSchemaFromMetadata(newerInfoMap)) ? Option.empty() : newer;
+          return Pair.of(newerRecord.isDelete(), newerRecord.getRecord());
        case EVENT_TIME_ORDERING:
-          Comparable newOrderingValue = readerContext.getOrderingValue(
-              newer, newerInfoMap, readerSchema, orderingFieldName);
-          if (isDeleteRecordWithNaturalOrder(newer, newOrderingValue)) {
-            return Option.empty();
+          if (newerRecord.isCommitTimeOrderingDelete()) {
+            return Pair.of(true, newerRecord.getRecord());
          }
-          Comparable oldOrderingValue = readerContext.getOrderingValue(
-              older, olderInfoMap, readerSchema, orderingFieldName);
-          if (!isDeleteRecordWithNaturalOrder(older, oldOrderingValue)
+          Comparable newOrderingValue = newerRecord.getOrderingValue();
+          Comparable oldOrderingValue = olderRecord.getOrderingValue();
+          if (!olderRecord.isCommitTimeOrderingDelete()
              && oldOrderingValue.compareTo(newOrderingValue) > 0) {
-            return isDeleteRecord(older, readerContext.getSchemaFromMetadata(olderInfoMap)) ? Option.empty() : older;
+            return Pair.of(olderRecord.isDelete(), olderRecord.getRecord());
          }
-          return isDeleteRecord(newer, readerContext.getSchemaFromMetadata(newerInfoMap)) ? Option.empty() : newer;
+          return Pair.of(newerRecord.isDelete(), newerRecord.getRecord());
        case CUSTOM:
        default:
          if (payloadClass.isPresent()) {
-            if (older.isEmpty() || newer.isEmpty()) {
-              if (shouldKeepNewerRecord(older, olderInfoMap, newer, newerInfoMap)) {
+            if (olderRecord.isDelete() || newerRecord.isDelete()) {
+              if (shouldKeepNewerRecord(olderRecord, newerRecord)) {
                // IMPORTANT:
                // this is needed when the fallback HoodieAvroRecordMerger got used, the merger would
                // return Option.empty when the new payload data is empty(a delete) and ignores its ordering value directly.
-                return newer;
+                return Pair.of(newerRecord.isDelete(), newerRecord.getRecord());
              } else {
-                return older;
+                return Pair.of(olderRecord.isDelete(), olderRecord.getRecord());
              }
            }
-
            Option<Pair<HoodieRecord, Schema>> mergedRecord =
-                getMergedRecord(older, olderInfoMap, newer, newerInfoMap);
+                getMergedRecord(olderRecord, newerRecord);
            if (mergedRecord.isPresent()
                && !mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
              IndexedRecord indexedRecord;
@@ -502,32 +475,32 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB
            } else {
              indexedRecord = (IndexedRecord) mergedRecord.get().getLeft().getData();
            }
-            return Option.ofNullable(readerContext.convertAvroRecord(indexedRecord));
+            return Pair.of(false, readerContext.convertAvroRecord(indexedRecord));
          }
-          return Option.empty();
+          return Pair.of(true, null);
        } else {
-          if (older.isEmpty() || newer.isEmpty()) {
-            if (shouldKeepNewerRecord(older, olderInfoMap, newer, newerInfoMap)) {
+          if (olderRecord.isDelete() || newerRecord.isDelete()) {
+            if (shouldKeepNewerRecord(olderRecord, newerRecord)) {
              // IMPORTANT:
              // this is needed when the fallback HoodieAvroRecordMerger got used, the merger would
              // return Option.empty when the new payload data is empty(a delete) and ignores its ordering value directly.
-              return newer;
+              return Pair.of(newerRecord.isDelete(), newerRecord.getRecord());
            } else {
-              return older;
+              return Pair.of(olderRecord.isDelete(), olderRecord.getRecord());
            }
          }
          Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().merge(
-              readerContext.constructHoodieRecord(older, olderInfoMap), readerContext.getSchemaFromMetadata(olderInfoMap),
-              readerContext.constructHoodieRecord(newer, newerInfoMap), readerContext.getSchemaFromMetadata(newerInfoMap), props);
+              readerContext.constructHoodieRecord(olderRecord), readerContext.getSchemaFromBufferRecord(olderRecord),
+              readerContext.constructHoodieRecord(newerRecord), readerContext.getSchemaFromBufferRecord(newerRecord), props);
          if (mergedRecord.isPresent()
              && !mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
+            HoodieRecord hoodieRecord = mergedRecord.get().getLeft();
            if (!mergedRecord.get().getRight().equals(readerSchema)) {
-              return Option.ofNullable((T) mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, readerSchema).getData());
+              return Pair.of(false, (T) hoodieRecord.rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, readerSchema).getData());
            }
-            return Option.ofNullable((T) mergedRecord.get().getLeft().getData());
+            return Pair.of(false, (T) hoodieRecord.getData());
          }
-
-          return Option.empty();
+          return Pair.of(true, null);
        }
      }
    }
@@ -536,24 +509,22 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB
  /**
   * Decides whether to keep the incoming record with ordering value comparison.
   */
-  private boolean shouldKeepNewerRecord(Option<T> oldVal, Map<String, Object> oldMetadata, Option<T> newVal, Map<String, Object> newMetadata) {
-    Comparable newOrderingVal = readerContext.getOrderingValue(newVal, newMetadata, readerSchema, orderingFieldName);
-    if (isDeleteRecordWithNaturalOrder(newVal, newOrderingVal)) {
+  private boolean shouldKeepNewerRecord(BufferedRecord<T> oldRecord, BufferedRecord<T> newRecord) {
+    if (newRecord.isCommitTimeOrderingDelete()) {
      // handle records coming from DELETE statements(the orderingVal is constant 0)
      return true;
    }
-    Comparable oldOrderingVal = readerContext.getOrderingValue(oldVal, oldMetadata, readerSchema, orderingFieldName);
-    return newOrderingVal.compareTo(oldOrderingVal) >= 0;
+    return newRecord.getOrderingValue().compareTo(oldRecord.getOrderingValue()) >= 0;
  }
-  private Option<Pair<HoodieRecord, Schema>> getMergedRecord(Option<T> older, Map<String, Object> olderInfoMap, Option<T> newer, Map<String, Object> newerInfoMap) throws IOException {
+  private Option<Pair<HoodieRecord, Schema>> getMergedRecord(BufferedRecord<T> olderRecord, BufferedRecord<T> newerRecord) throws IOException {
    ValidationUtils.checkArgument(!Objects.equals(payloadClass, OverwriteWithLatestAvroPayload.class.getCanonicalName())
        && !Objects.equals(payloadClass, DefaultHoodieRecordPayload.class.getCanonicalName()));
-    HoodieRecord oldHoodieRecord = constructHoodieAvroRecord(readerContext, older, olderInfoMap);
-    HoodieRecord newHoodieRecord = constructHoodieAvroRecord(readerContext, newer, newerInfoMap);
+    HoodieRecord oldHoodieRecord = constructHoodieAvroRecord(readerContext, olderRecord);
+    HoodieRecord newHoodieRecord = constructHoodieAvroRecord(readerContext, newerRecord);
    Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().merge(
-        oldHoodieRecord, getSchemaForAvroPayloadMerge(oldHoodieRecord, olderInfoMap),
-        newHoodieRecord, getSchemaForAvroPayloadMerge(newHoodieRecord, newerInfoMap), props);
+        oldHoodieRecord, getSchemaForAvroPayloadMerge(oldHoodieRecord, olderRecord),
+        newHoodieRecord, getSchemaForAvroPayloadMerge(newHoodieRecord, newerRecord), props);
    return mergedRecord;
  }
@@ -561,38 +532,34 @@ public abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordB
   * Constructs a new {@link HoodieAvroRecord} for payload based merging
   *
   * @param readerContext reader context
-   * @param recordOption An option of the record in engine-specific type if exists.
-   * @param metadataMap The record metadata.
+   * @param bufferedRecord buffered record
   * @return A new instance of {@link HoodieRecord}.
   */
-  private HoodieRecord constructHoodieAvroRecord(HoodieReaderContext<T> readerContext, Option<T> recordOption, Map<String, Object> metadataMap) {
-    Schema recordSchema = readerSchema;
+  private HoodieRecord constructHoodieAvroRecord(HoodieReaderContext<T> readerContext, BufferedRecord<T> bufferedRecord) {
    GenericRecord record = null;
-    if (recordOption.isPresent()) {
-      recordSchema = readerContext.getSchemaFromMetadata(metadataMap);
-      record = readerContext.convertToAvroRecord(recordOption.get(), recordSchema);
+    if (!bufferedRecord.isDelete()) {
+      Schema recordSchema = readerContext.getSchemaFromBufferRecord(bufferedRecord);
+      record = readerContext.convertToAvroRecord(bufferedRecord.getRecord(), recordSchema);
    }
-    HoodieKey hoodieKey = new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY), (String) metadataMap.get(INTERNAL_META_PARTITION_PATH));
+    HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), null);
    return new HoodieAvroRecord<>(hoodieKey,
-        HoodieRecordUtils.loadPayload(payloadClass.get(), record, readerContext.getOrderingValue(recordOption, metadataMap,
-            recordSchema, orderingFieldName)), null);
+        HoodieRecordUtils.loadPayload(payloadClass.get(), record, bufferedRecord.getOrderingValue()), null);
}
-  private Schema getSchemaForAvroPayloadMerge(HoodieRecord record, Map<String, Object> infoMap) throws IOException {
+  private Schema getSchemaForAvroPayloadMerge(HoodieRecord record, BufferedRecord<T> bufferedRecord) throws IOException {
    if (record.isDelete(readerSchema, props)) {
      return readerSchema;
    }
-    return readerContext.getSchemaFromMetadata(infoMap);
+    return readerContext.getSchemaFromBufferRecord(bufferedRecord);
}
-  protected boolean hasNextBaseRecord(T baseRecord, Pair<Option<T>, Map<String, Object>> logRecordInfo) throws IOException {
-    Map<String, Object> metadata = readerContext.generateMetadataForRecord(baseRecord, readerSchema);
-
+  protected boolean hasNextBaseRecord(T baseRecord, BufferedRecord<T> logRecordInfo) throws IOException {
    if (logRecordInfo != null) {
-      Option<T> resultRecord = merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(), logRecordInfo.getRight());
-      if (resultRecord.isPresent()) {
+      BufferedRecord<T> bufferedRecord = BufferedRecord.forRecordWithContext(baseRecord, readerSchema, readerContext, orderingFieldName, false);
+      Pair<Boolean, T> isDeleteAndRecord = merge(bufferedRecord, logRecordInfo);
+      if (!isDeleteAndRecord.getLeft()) {
        // Updates
-        nextRecord = readerContext.seal(resultRecord.get());
+        nextRecord = readerContext.seal(isDeleteAndRecord.getRight());
readStats.incrementNumUpdates();
return true;
} else {
@@ -608,18 +575,15 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB
return true;
}
-  protected boolean hasNextLogRecord() throws IOException {
+  protected boolean hasNextLogRecord() {
    if (logRecordIterator == null) {
      logRecordIterator = records.values().iterator();
    }
    while (logRecordIterator.hasNext()) {
-      Pair<Option<T>, Map<String, Object>> nextRecordInfo = logRecordIterator.next();
-      Option<T> resultRecord;
-      resultRecord = merge(Option.empty(), Collections.emptyMap(), nextRecordInfo.getLeft(), nextRecordInfo.getRight());
-      if (resultRecord.isPresent()) {
-        nextRecord = readerContext.seal(resultRecord.get());
+      BufferedRecord<T> nextRecordInfo = logRecordIterator.next();
+      if (!nextRecordInfo.isDelete()) {
+        nextRecord = nextRecordInfo.getRecord();
readStats.incrementNumInserts();
return true;
} else {
@@ -655,11 +619,6 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB
        : readerContext.convertValueToEngineType(deleteRecord.getOrderingValue());
}
-  private boolean isDeleteRecordWithNaturalOrder(Option<T> rowOption, Comparable orderingValue) {
-    return rowOption.isEmpty() && orderingValue.equals(DEFAULT_ORDERING_VALUE);
- }
-
private boolean isDeleteRecord(Option<T> record, Schema schema) {
if (record.isEmpty()) {
return true;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
index a7b9423be2e..cc6930c7a39 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.Pair;
import java.io.IOException;
import java.io.Serializable;
@@ -59,11 +58,11 @@ public interface HoodieFileGroupRecordBuffer<T> {
/**
   * Process the next record in a log data block.
*
- * @param record
- * @param metadata
+ * @param record Buffered record
+ * @param index Record key or position
* @throws Exception
*/
-  void processNextDataRecord(T record, Map<String, Object> metadata, Serializable index) throws IOException;
+  void processNextDataRecord(BufferedRecord<T> record, Serializable index) throws IOException;
/**
   * Process a log delete block, and store the resulting records into the buffer.
@@ -75,10 +74,8 @@ public interface HoodieFileGroupRecordBuffer<T> {
/**
* Process next delete record.
- *
- * @param deleteRecord
*/
- void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable index);
+ void processNextDeletedRecord(DeleteRecord record, Serializable index);
/**
* Check if a record exists in the buffered records.
@@ -98,12 +95,12 @@ public interface HoodieFileGroupRecordBuffer<T> {
/**
* @return An iterator on the log records.
*/
- Iterator<Pair<Option<T>, Map<String, Object>>> getLogRecordIterator();
+ Iterator<BufferedRecord<T>> getLogRecordIterator();
/**
* @return The underlying data stored in the buffer.
*/
- Map<Serializable, Pair<Option<T>, Map<String, Object>>> getLogRecords();
+ Map<Serializable, BufferedRecord<T>> getLogRecords();
/**
* Link the base file iterator for consequential merge.
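The BufferedRecord<T> type referenced throughout this interface is the new
container added by this commit (hudi-common/.../table/read/BufferedRecord.java,
not shown in full here). A rough, inferred sketch of its shape based on the
call sites in this patch -- field layout and constructor are assumptions, and
the committed class is authoritative:

    // Inferred sketch only; see BufferedRecord.java in this commit.
    public class BufferedRecord<T> implements java.io.Serializable {
      private final String recordKey;     // may be null for keyless position-based deletes
      private final Comparable orderingValue;
      private final T record;             // engine-specific row; null for delete records
      private final Integer schemaId;     // assumption: compact schema handle resolved via the reader context
      private final boolean isDelete;

      public BufferedRecord(String recordKey, Comparable orderingValue, T record, Integer schemaId, boolean isDelete) {
        this.recordKey = recordKey;
        this.orderingValue = orderingValue;
        this.record = record;
        this.schemaId = schemaId;
        this.isDelete = isDelete;
      }

      public String getRecordKey() { return recordKey; }
      public Comparable getOrderingValue() { return orderingValue; }
      public T getRecord() { return record; }
      public boolean isDelete() { return isDelete; }
    }

The factories used in this patch -- forRecordWithContext(...), forDeleteRecord(...)
and toBinary(readerContext) -- cover data records, delete records, and
spillable-map storage respectively.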
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
index d19519a3f86..7a0e1aacd22 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.DeleteRecord;
-import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.KeySpec;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -40,9 +39,6 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
-import java.util.Map;
-
-import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
/**
 * A buffer that is used to store log records by {@link org.apache.hudi.common.table.log.HoodieMergedLogRecordReader}
@@ -82,31 +78,20 @@ public class KeyBasedFileGroupRecordBuffer<T> extends FileGroupRecordBuffer<T> {
    try (ClosableIterator<T> recordIterator = recordsIteratorSchemaPair.getLeft()) {
      while (recordIterator.hasNext()) {
        T nextRecord = recordIterator.next();
-        Map<String, Object> metadata = readerContext.generateMetadataForRecord(
-            nextRecord, schema);
-        String recordKey = (String) metadata.get(HoodieReaderContext.INTERNAL_META_RECORD_KEY);
-
-        if (isBuiltInDeleteRecord(nextRecord) || isCustomDeleteRecord(nextRecord)) {
-          processDeleteRecord(nextRecord, metadata);
-        } else {
-          processNextDataRecord(nextRecord, metadata, recordKey);
-        }
+        boolean isDelete = isBuiltInDeleteRecord(nextRecord) || isCustomDeleteRecord(nextRecord);
+        BufferedRecord<T> bufferedRecord = BufferedRecord.forRecordWithContext(nextRecord, schema, readerContext, orderingFieldName, isDelete);
+        processNextDataRecord(bufferedRecord, bufferedRecord.getRecordKey());
}
}
}
@Override
-  public void processNextDataRecord(T record, Map<String, Object> metadata, Serializable recordKey) throws IOException {
-    Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = records.get(recordKey);
-    Option<Pair<Option<T>, Map<String, Object>>> mergedRecordAndMetadata =
-        doProcessNextDataRecord(record, metadata, existingRecordMetadataPair);
-
-    if (mergedRecordAndMetadata.isPresent()) {
-      records.put(recordKey, Pair.of(
-          mergedRecordAndMetadata.get().getLeft().isPresent()
-              ? Option.ofNullable(readerContext.seal(mergedRecordAndMetadata.get().getLeft().get()))
-              : Option.empty(),
-          mergedRecordAndMetadata.get().getRight()));
+  public void processNextDataRecord(BufferedRecord<T> record, Serializable recordKey) throws IOException {
+    BufferedRecord<T> existingRecord = records.get(recordKey);
+    Option<BufferedRecord<T>> bufferRecord = doProcessNextDataRecord(record, existingRecord);
+
+    if (bufferRecord.isPresent()) {
+      records.put(recordKey, bufferRecord.get().toBinary(readerContext));
}
}
@@ -115,36 +100,20 @@ public class KeyBasedFileGroupRecordBuffer<T> extends FileGroupRecordBuffer<T> {
    Iterator<DeleteRecord> it = Arrays.stream(deleteBlock.getRecordsToDelete()).iterator();
    while (it.hasNext()) {
      DeleteRecord record = it.next();
-      String recordKey = record.getRecordKey();
-      processNextDeletedRecord(record, recordKey);
+      processNextDeletedRecord(record, record.getRecordKey());
}
}
@Override
  public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable recordKey) {
-    Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = records.get(recordKey);
-    Option<DeleteRecord> recordOpt = doProcessNextDeletedRecord(deleteRecord, existingRecordMetadataPair);
+    BufferedRecord<T> existingRecord = records.get(recordKey);
+    Option<DeleteRecord> recordOpt = doProcessNextDeletedRecord(deleteRecord, existingRecord);
    if (recordOpt.isPresent()) {
-      records.put(recordKey, Pair.of(Option.empty(), readerContext.generateMetadataForRecord(
-          (String) recordKey, recordOpt.get().getPartitionPath(),
-          getOrderingValue(readerContext, recordOpt.get()))));
+      Comparable orderingValue = getOrderingValue(readerContext, recordOpt.get());
+      records.put(recordKey, BufferedRecord.forDeleteRecord(deleteRecord, orderingValue));
}
}
- protected void processDeleteRecord(T record, Map<String, Object> metadata) {
- DeleteRecord deleteRecord = DeleteRecord.create(
- new HoodieKey(
- (String) metadata.get(INTERNAL_META_RECORD_KEY),
-            // The partition path of the delete record is set to null because it is not
-            // used, and the delete record is never surfaced from the file group reader
- null),
- readerContext.getOrderingValue(
- Option.of(record), metadata, readerSchema, orderingFieldName));
- processNextDeletedRecord(
- deleteRecord,
- (String) metadata.get(INTERNAL_META_RECORD_KEY));
- }
-
@Override
public boolean containsLogRecord(String recordKey) {
return records.containsKey(recordKey);
@@ -152,7 +121,7 @@ public class KeyBasedFileGroupRecordBuffer<T> extends FileGroupRecordBuffer<T> {
protected boolean hasNextBaseRecord(T baseRecord) throws IOException {
String recordKey = readerContext.getRecordKey(baseRecord, readerSchema);
-    Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(recordKey);
+ BufferedRecord<T> logRecordInfo = records.remove(recordKey);
return hasNextBaseRecord(baseRecord, logRecordInfo);
}
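Taken together, the key-based path now builds exactly one BufferedRecord per
log record and funnels both data and delete records through a single insertion
point. A condensed recap of the flow (assembled from the hunks above, not a
verbatim excerpt):

    // inside processDataBlock: wrap, then buffer by record key
    boolean isDelete = isBuiltInDeleteRecord(nextRecord) || isCustomDeleteRecord(nextRecord);
    BufferedRecord<T> buffered = BufferedRecord.forRecordWithContext(nextRecord, schema, readerContext, orderingFieldName, isDelete);
    // the buffer merges against any existing entry for the key and stores a
    // binary (spillable-safe) copy via toBinary(readerContext)
    processNextDataRecord(buffered, buffered.getRecordKey());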
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
index 6b2f4d2c656..40de6a172cf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.DeleteRecord;
-import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.KeySpec;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -45,16 +44,12 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.function.Function;
-import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
-
/**
 * A buffer that is used to store log records by {@link org.apache.hudi.common.table.log.HoodieMergedLogRecordReader}
 * by calling the {@link #processDataBlock} and {@link #processDeleteBlock} methods into record position based map.
@@ -136,12 +131,9 @@ public class PositionBasedFileGroupRecordBuffer<T> extends KeyBasedFileGroupReco
        long recordPosition = recordPositions.get(recordIndex++);
        T evolvedNextRecord = schemaTransformerWithEvolvedSchema.getLeft().apply(nextRecord);
-        Map<String, Object> metadata = readerContext.generateMetadataForRecord(evolvedNextRecord, schema);
-        if (isBuiltInDeleteRecord(evolvedNextRecord) || isCustomDeleteRecord(evolvedNextRecord)) {
-          processDeleteRecord(evolvedNextRecord, metadata, recordPosition);
-        } else {
-          processNextDataRecord(evolvedNextRecord, metadata, recordPosition);
-        }
+        boolean isDelete = isBuiltInDeleteRecord(evolvedNextRecord) || isCustomDeleteRecord(evolvedNextRecord);
+        BufferedRecord<T> bufferedRecord = BufferedRecord.forRecordWithContext(evolvedNextRecord, schema, readerContext, orderingFieldName, isDelete);
+        processNextDataRecord(bufferedRecord, recordPosition);
}
}
}
@@ -151,11 +143,11 @@ public class PositionBasedFileGroupRecordBuffer<T> extends KeyBasedFileGroupReco
      //need to make a copy of the keys to avoid concurrent modification exception
ArrayList<Serializable> positions = new ArrayList<>(records.keySet());
for (Serializable position : positions) {
- Pair<Option<T>, Map<String, Object>> entry = records.get(position);
- Object recordKey = entry.getRight().get(INTERNAL_META_RECORD_KEY);
- if (entry.getLeft().isPresent() || recordKey != null) {
+ BufferedRecord<T> entry = records.get(position);
+ String recordKey = entry.getRecordKey();
+ if (!entry.isDelete() || recordKey != null) {
- records.put((String) recordKey, entry);
+ records.put(recordKey, entry);
records.remove(position);
} else {
        //if it's a delete record and the key is null, then we need to still use positions
@@ -196,9 +188,8 @@ public class PositionBasedFileGroupRecordBuffer<T> extends KeyBasedFileGroupReco
          // this delete-vector could be kept in the records cache (see the check in #fallbackToKeyBasedBuffer),
          // and these keys would be deleted no matter whether there are following-up inserts/updates.
          DeleteRecord deleteRecord = deleteRecords[commitTimeBasedRecordIndex++];
-          records.put(recordPosition,
-              Pair.of(Option.empty(), readerContext.generateMetadataForRecord(
-                  deleteRecord.getRecordKey(), "", deleteRecord.getOrderingValue())));
+          BufferedRecord<T> record = BufferedRecord.forDeleteRecord(deleteRecord, deleteRecord.getOrderingValue());
+          records.put(recordPosition, record);
}
return;
case EVENT_TIME_ORDERING:
@@ -214,34 +205,21 @@ public class PositionBasedFileGroupRecordBuffer<T> extends KeyBasedFileGroupReco
}
}
-  protected void processDeleteRecord(T record, Map<String, Object> metadata, long recordPosition) {
-    DeleteRecord deleteRecord = DeleteRecord.create(
-        new HoodieKey(
-            // The partition path of the delete record is set to null because it is not
-            // used, and the delete record is never surfaced from the file group reader
-            (String) metadata.get(INTERNAL_META_RECORD_KEY), null),
-        readerContext.getOrderingValue(
-            Option.of(record), metadata, readerSchema, orderingFieldName));
- processNextDeletedRecord(deleteRecord, recordPosition);
- }
-
@Override
  public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable recordPosition) {
-    Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = records.get(recordPosition);
-    Option<DeleteRecord> recordOpt = doProcessNextDeletedRecord(deleteRecord, existingRecordMetadataPair);
+    BufferedRecord<T> existingRecord = records.get(recordPosition);
+    Option<DeleteRecord> recordOpt = doProcessNextDeletedRecord(deleteRecord, existingRecord);
    if (recordOpt.isPresent()) {
-      String recordKey = recordOpt.get().getRecordKey();
-      records.put(recordPosition, Pair.of(Option.empty(), readerContext.generateMetadataForRecord(
-          recordKey, recordOpt.get().getPartitionPath(),
-          getOrderingValue(readerContext, recordOpt.get()))));
+      Comparable orderingValue = getOrderingValue(readerContext, recordOpt.get());
+      records.put(recordPosition, BufferedRecord.forDeleteRecord(recordOpt.get(), orderingValue));
}
}
@Override
public boolean containsLogRecord(String recordKey) {
return records.values().stream()
-        .filter(r -> r.getLeft().isPresent())
-        .map(r -> readerContext.getRecordKey(r.getKey().get(), readerSchema)).anyMatch(recordKey::equals);
+        .filter(r -> !r.isDelete())
+        .map(r -> readerContext.getRecordKey(r.getRecord(), readerSchema)).anyMatch(recordKey::equals);
}
@Override
@@ -252,27 +230,26 @@ public class PositionBasedFileGroupRecordBuffer<T> extends KeyBasedFileGroupReco
    nextRecordPosition = readerContext.extractRecordPosition(baseRecord, readerSchema,
        ROW_INDEX_TEMPORARY_COLUMN_NAME, nextRecordPosition);
-    Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(nextRecordPosition++);
-
-    Map<String, Object> metadata = readerContext.generateMetadataForRecord(
-        baseRecord, readerSchema);
+    BufferedRecord<T> logRecordInfo = records.remove(nextRecordPosition++);
-    final Option<T> resultRecord;
+    final Pair<Boolean, T> isDeleteAndRecord;
+    T resultRecord = null;
    if (logRecordInfo != null) {
-      resultRecord = merge(
-          Option.of(baseRecord), metadata, logRecordInfo.getLeft(), logRecordInfo.getRight());
-      if (resultRecord.isPresent()) {
+      BufferedRecord<T> bufferedRecord = BufferedRecord.forRecordWithContext(baseRecord, readerSchema, readerContext, orderingFieldName, false);
+      isDeleteAndRecord = merge(bufferedRecord, logRecordInfo);
+      if (!isDeleteAndRecord.getLeft()) {
+        resultRecord = isDeleteAndRecord.getRight();
        readStats.incrementNumUpdates();
      } else {
        readStats.incrementNumDeletes();
      }
    } else {
-      resultRecord = merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), metadata);
+      resultRecord = baseRecord;
      readStats.incrementNumInserts();
    }
-    if (resultRecord.isPresent()) {
-      nextRecord = readerContext.seal(resultRecord.get());
+    if (resultRecord != null) {
+      nextRecord = readerContext.seal(resultRecord);
return true;
}
return false;
@@ -283,7 +260,7 @@ public class PositionBasedFileGroupRecordBuffer<T> extends KeyBasedFileGroupReco
    //see if there is a delete block with record positions
    nextRecordPosition = readerContext.extractRecordPosition(baseRecord, readerSchema,
        ROW_INDEX_TEMPORARY_COLUMN_NAME, nextRecordPosition);
-    Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(nextRecordPosition++);
+    BufferedRecord<T> logRecordInfo = records.remove(nextRecordPosition++);
    if (logRecordInfo != null) {
      //we have a delete that could not be converted. Since it is the newest version, the record is deleted
//remove a key based record if it exists
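One subtlety preserved by this refactor: fallbackToKeyBasedBuffer (earlier in
this file) re-keys buffered entries from positions to record keys, except for
delete records that carry no key, which must stay position-indexed. A condensed
sketch of that re-keying (assembled from the hunk above, not a verbatim
excerpt):

    // move entries to a record-key index unless they are keyless deletes
    for (Serializable position : new ArrayList<>(records.keySet())) {
      BufferedRecord<T> entry = records.get(position);
      if (!entry.isDelete() || entry.getRecordKey() != null) {
        records.put(entry.getRecordKey(), entry);
        records.remove(position);
      } // keyless delete records remain position-indexed
    }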
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
index 7874b994c2a..91e5e06cbfd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
@@ -39,7 +39,6 @@ import org.apache.avro.Schema;
import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
-import java.util.Map;
public class UnmergedFileGroupRecordBuffer<T> extends FileGroupRecordBuffer<T> {
// Used to order the records in the record map.
@@ -69,14 +68,14 @@ public class UnmergedFileGroupRecordBuffer<T> extends FileGroupRecordBuffer<T> {
// Output records based on the index to preserve the order.
if (!records.isEmpty()) {
-      Pair<Option<T>, Map<String, Object>> nextRecordInfo = records.remove(getIndex++);
+      BufferedRecord<T> nextRecordInfo = records.remove(getIndex++);
      if (nextRecordInfo == null) {
        throw new HoodieException("Row index should be continuous!");
      }
-      if (nextRecordInfo.getLeft().isPresent()) {
-        nextRecord = nextRecordInfo.getKey().get();
+      if (!nextRecordInfo.isDelete()) {
+        nextRecord = nextRecordInfo.getRecord();
      } else {
        throw new IllegalStateException("No deletes should exist in unmerged reading mode");
}
@@ -87,7 +86,7 @@ public class UnmergedFileGroupRecordBuffer<T> extends FileGroupRecordBuffer<T> {
}
@Override
-  public Iterator<Pair<Option<T>, Map<String, Object>>> getLogRecordIterator() {
+ public Iterator<BufferedRecord<T>> getLogRecordIterator() {
return records.values().iterator();
}
@@ -109,16 +108,15 @@ public class UnmergedFileGroupRecordBuffer<T> extends FileGroupRecordBuffer<T> {
    try (ClosableIterator<T> recordIterator = recordsIteratorSchemaPair.getLeft()) {
      while (recordIterator.hasNext()) {
        T nextRecord = recordIterator.next();
-        Map<String, Object> metadata = readerContext.generateMetadataForRecord(
-            nextRecord, schema);
-        processNextDataRecord(nextRecord, metadata, putIndex++);
+        BufferedRecord<T> bufferedRecord = BufferedRecord.forRecordWithContext(nextRecord, schema, readerContext, orderingFieldName, false);
+        processNextDataRecord(bufferedRecord, putIndex++);
}
}
}
@Override
-  public void processNextDataRecord(T record, Map<String, Object> metadata, Serializable index) {
-    records.put(index, Pair.of(Option.ofNullable(readerContext.seal(record)), metadata));
+  public void processNextDataRecord(BufferedRecord<T> record, Serializable index) {
+    records.put(index, record.toBinary(readerContext));
}
@Override
@@ -129,9 +127,7 @@ public class UnmergedFileGroupRecordBuffer<T> extends FileGroupRecordBuffer<T> {
@Override
  public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable index) {
    // never used for now
-    records.put(index, Pair.of(Option.empty(), readerContext.generateMetadataForRecord(
-        deleteRecord.getRecordKey(), deleteRecord.getPartitionPath(),
-        getOrderingValue(readerContext, deleteRecord))));
+    records.put(index, BufferedRecord.forDeleteRecord(deleteRecord, getOrderingValue(readerContext, deleteRecord)));
}
@Override
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
index db3846b459b..daac251d187 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
@@ -44,21 +44,20 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
+import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_PARTITION_PATH;
-import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
 import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
 import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
 import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
 import static org.apache.hudi.common.table.read.FileGroupRecordBuffer.getOrderingValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -315,7 +314,7 @@ class TestFileGroupRecordBuffer {
}
@Test
- void testProcessCustomDeleteRecord() {
+ void testProcessCustomDeleteRecord() throws IOException {
String customDeleteKey = "op";
String customDeleteValue = "d";
when(schemaHandler.getCustomDeleteMarkerKeyValue())
@@ -337,17 +336,16 @@ class TestFileGroupRecordBuffer {
record.put("ts", System.currentTimeMillis());
record.put("op", "d");
record.put("_hoodie_is_deleted", false);
+ when(readerContext.getOrderingValue(any(), any(), any())).thenReturn(1);
+ when(readerContext.convertValueToEngineType(any())).thenReturn(1);
+    BufferedRecord<GenericRecord> bufferedRecord = BufferedRecord.forRecordWithContext(record, schema, readerContext, Option.of("ts"), true);
-    Map<String, Object> metadata = new HashMap<>();
-    metadata.put(INTERNAL_META_RECORD_KEY, "12345");
-    metadata.put(INTERNAL_META_PARTITION_PATH, "partition1");
-    when(readerContext.getOrderingValue(any(), any(), any(), any())).thenReturn(1);
-    when(readerContext.generateMetadataForRecord(any(), any(), any())).thenReturn(metadata);
-    keyBasedBuffer.processDeleteRecord(record, metadata);
-    Map<Serializable, Pair<Option<GenericRecord>, Map<String, Object>>> records =
-        keyBasedBuffer.getLogRecords();
+    keyBasedBuffer.processNextDataRecord(bufferedRecord, "12345");
+    Map<Serializable, BufferedRecord<GenericRecord>> records = keyBasedBuffer.getLogRecords();
    assertEquals(1, records.size());
-    assertEquals(Pair.of(Option.empty(), metadata), records.get("12345"));
+    BufferedRecord<GenericRecord> deleteRecord = records.get("12345");
+    assertNull(deleteRecord.getRecordKey(), "The record key metadata field is missing");
+    assertEquals(1, deleteRecord.getOrderingValue());
    // CASE 2: With _hoodie_is_deleted set to true.
GenericRecord anotherRecord = new GenericData.Record(schema);
@@ -355,14 +353,13 @@ class TestFileGroupRecordBuffer {
anotherRecord.put("ts", System.currentTimeMillis());
anotherRecord.put("op", "i");
anotherRecord.put("_hoodie_is_deleted", true);
+    bufferedRecord = BufferedRecord.forRecordWithContext(anotherRecord, schema, readerContext, Option.of("ts"), true);
-    Map<String, Object> anotherMetadata = new HashMap<>();
-    anotherMetadata.put(INTERNAL_META_RECORD_KEY, "54321");
-    anotherMetadata.put(INTERNAL_META_PARTITION_PATH, "partition2");
-    when(readerContext.generateMetadataForRecord(any(), any(), any())).thenReturn(anotherMetadata);
-    keyBasedBuffer.processDeleteRecord(anotherRecord, anotherMetadata);
+    keyBasedBuffer.processNextDataRecord(bufferedRecord, "54321");
    records = keyBasedBuffer.getLogRecords();
    assertEquals(2, records.size());
-    assertEquals(Pair.of(Option.empty(), anotherMetadata), records.get("54321"));
+    deleteRecord = records.get("54321");
+    assertNull(deleteRecord.getRecordKey(), "The record key metadata field is missing");
+    assertEquals(1, deleteRecord.getOrderingValue());
}
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 10f401b20b9..c5a3ba02f69 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -70,9 +70,6 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_ORDERING_FIELD;
-import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_PARTITION_PATH;
-import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
 import static org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
@@ -193,27 +190,19 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
    List<T> records = readRecordsFromFileGroup(getStorageConf(), getBasePath(), metaClient, fileSlices,
        avroSchema, RecordMergeMode.COMMIT_TIME_ORDERING, false);
    HoodieReaderContext<T> readerContext = getHoodieReaderContext(getBasePath(), avroSchema, getStorageConf(), metaClient);
-    Comparable orderingFieldValue = "100";
    for (Boolean isCompressionEnabled : new boolean[] {true, false}) {
-      try (ExternalSpillableMap<Serializable, Pair<Option<T>, Map<String, Object>>> spillableMap =
+      try (ExternalSpillableMap<Serializable, BufferedRecord<T>> spillableMap =
          new ExternalSpillableMap<>(16L, baseMapPath, new DefaultSizeEstimator(),
              new HoodieRecordSizeEstimator(avroSchema), diskMapType, new DefaultSerializer<>(), isCompressionEnabled, getClass().getSimpleName())) {
        Long position = 0L;
        for (T record : records) {
          String recordKey = readerContext.getRecordKey(record, avroSchema);
          //test key based
-          spillableMap.put(recordKey,
-              Pair.of(
-                  Option.ofNullable(readerContext.seal(record)),
-                  readerContext.generateMetadataForRecord(record, avroSchema)));
+          BufferedRecord<T> bufferedRecord = BufferedRecord.forRecordWithContext(record, avroSchema, readerContext, Option.of("timestamp"), false);
+          spillableMap.put(recordKey, bufferedRecord.toBinary(readerContext));
          //test position based
-          spillableMap.put(position++,
-              Pair.of(
-                  Option.ofNullable(readerContext.seal(record)),
-                  readerContext.generateMetadataForRecord(
-                      recordKey, dataGen.getPartitionPaths()[0],
-                      readerContext.convertValueToEngineType(orderingFieldValue))));
+          spillableMap.put(position++, bufferedRecord.toBinary(readerContext));
}
assertEquals(records.size() * 2, spillableMap.size());
@@ -221,20 +210,18 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
        position = 0L;
        for (T record : records) {
          String recordKey = readerContext.getRecordKey(record, avroSchema);
-          Pair<Option<T>, Map<String, Object>> keyBased = spillableMap.get(recordKey);
+          BufferedRecord<T> keyBased = spillableMap.get(recordKey);
          assertNotNull(keyBased);
-          Pair<Option<T>, Map<String, Object>> positionBased = spillableMap.get(position++);
+          BufferedRecord<T> positionBased = spillableMap.get(position++);
          assertNotNull(positionBased);
-          assertRecordsEqual(avroSchema, record, keyBased.getLeft().get());
-          assertRecordsEqual(avroSchema, record, positionBased.getLeft().get());
-          assertEquals(keyBased.getRight().get(INTERNAL_META_RECORD_KEY), recordKey);
-          assertEquals(positionBased.getRight().get(INTERNAL_META_RECORD_KEY), recordKey);
-          assertEquals(avroSchema, readerContext.getSchemaFromMetadata(keyBased.getRight()));
-          assertEquals(dataGen.getPartitionPaths()[0], positionBased.getRight().get(INTERNAL_META_PARTITION_PATH));
-          assertEquals(readerContext.convertValueToEngineType(orderingFieldValue),
-              positionBased.getRight().get(INTERNAL_META_ORDERING_FIELD));
+          assertRecordsEqual(avroSchema, record, keyBased.getRecord());
+          assertRecordsEqual(avroSchema, record, positionBased.getRecord());
+          assertEquals(keyBased.getRecordKey(), recordKey);
+          assertEquals(positionBased.getRecordKey(), recordKey);
+          assertEquals(avroSchema, readerContext.getSchemaFromBufferRecord(keyBased));
+          // the generated field value is hardcoded as 0 for the ordering field "timestamp", see HoodieTestDataGenerator#generateRandomValue
+          assertEquals(readerContext.convertValueToEngineType(0L), positionBased.getOrderingValue());
}
-
}
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index ad4f9aaba4b..ac89ffd0184 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -31,12 +31,12 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.source.ExpressionPredicates.Predicate;
import org.apache.hudi.storage.HoodieStorage;
@@ -51,6 +51,7 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.DataType;
@@ -75,7 +76,6 @@ import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIEL
public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> {
private final List<Predicate> predicates;
private final Supplier<InternalSchemaManager> internalSchemaManager;
- private RowDataSerializer rowDataSerializer;
private final boolean utcTimezone;
public FlinkRowDataReaderContext(
@@ -148,46 +148,45 @@ public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> {
}
  @Override
-  public HoodieRecord<RowData> constructHoodieRecord(Option<RowData> recordOption, Map<String, Object> metadataMap) {
-    HoodieKey hoodieKey = new HoodieKey(
-        (String) metadataMap.get(INTERNAL_META_RECORD_KEY),
-        (String) metadataMap.get(INTERNAL_META_PARTITION_PATH));
-    RowData rowData = recordOption.get();
+  public HoodieRecord<RowData> constructHoodieRecord(BufferedRecord<RowData> bufferedRecord) {
+    HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), null);
    // delete record
-    if (recordOption.isEmpty()) {
-      Comparable orderingValue;
-      if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) {
-        orderingValue = (Comparable) metadataMap.get(INTERNAL_META_ORDERING_FIELD);
-      } else {
-        throw new HoodieException("There should be ordering value in metadataMap.");
-      }
-      return new HoodieEmptyRecord<>(hoodieKey, HoodieOperation.DELETE, orderingValue, HoodieRecord.HoodieRecordType.FLINK);
+    if (bufferedRecord.isDelete()) {
+      return new HoodieEmptyRecord<>(hoodieKey, HoodieOperation.DELETE, bufferedRecord.getOrderingValue(), HoodieRecord.HoodieRecordType.FLINK);
    }
-    return new HoodieFlinkRecord(hoodieKey, rowData);
+    RowData rowData = bufferedRecord.getRecord();
+    HoodieOperation operation = HoodieOperation.fromValue(rowData.getRowKind().toByteValue());
+    return new HoodieFlinkRecord(hoodieKey, operation, bufferedRecord.getOrderingValue(), rowData);
}
@Override
-  public Comparable getOrderingValue(Option<RowData> recordOption, Map<String, Object> metadataMap, Schema schema, Option<String> orderingFieldName) {
-    if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) {
-      return (Comparable) metadataMap.get(INTERNAL_META_ORDERING_FIELD);
-    }
-    if (!recordOption.isPresent() || orderingFieldName.isEmpty()) {
+  public Comparable getOrderingValue(
+      RowData record,
+      Schema schema,
+      Option<String> orderingFieldName) {
+    if (orderingFieldName.isEmpty()) {
      return DEFAULT_ORDERING_VALUE;
    }
    RowDataAvroQueryContexts.FieldQueryContext context = RowDataAvroQueryContexts.fromAvroSchema(schema, utcTimezone).getFieldQueryContext(orderingFieldName.get());
-    Comparable finalOrderingVal = (Comparable) context.getValAsJava(recordOption.get(), false);
-    metadataMap.put(INTERNAL_META_ORDERING_FIELD, finalOrderingVal);
+    Comparable finalOrderingVal = (Comparable) context.getValAsJava(record, false);
return finalOrderingVal;
}
@Override
public RowData seal(RowData rowData) {
- if (rowDataSerializer == null) {
-      RowType requiredRowType = (RowType) RowDataAvroQueryContexts.fromAvroSchema(getSchemaHandler().getRequiredSchema()).getRowType().getLogicalType();
- rowDataSerializer = new RowDataSerializer(requiredRowType);
+ if (rowData instanceof BinaryRowData) {
+ return ((BinaryRowData) rowData).copy();
+ }
+ return rowData;
+ }
+
+ @Override
+ public RowData toBinaryRow(Schema avroSchema, RowData record) {
+ if (record instanceof BinaryRowData) {
+ return record;
}
- // copy is unnecessary if there is no caching in subsequent processing.
- return rowDataSerializer.copy(rowData);
+    RowDataSerializer rowDataSerializer = RowDataAvroQueryContexts.getRowDataSerializer(avroSchema);
+ return rowDataSerializer.toBinaryRow(record);
}
@Override
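The Flink context now separates two concerns that the old per-context
RowDataSerializer field conflated: seal copies only BinaryRowData (the RowData
implementation that reuses backing memory segments), and toBinaryRow
materializes any other RowData through a serializer resolved from the Avro
schema. A minimal standalone sketch of the pattern, with the serializer passed
in explicitly as an assumption (the patch resolves it via
RowDataAvroQueryContexts.getRowDataSerializer):

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.binary.BinaryRowData;
    import org.apache.flink.table.runtime.typeutils.RowDataSerializer;

    public class BinaryRowSketch {
      static RowData toBinaryRow(RowDataSerializer serializer, RowData record) {
        if (record instanceof BinaryRowData) {
          return record; // already binary; safe to buffer as-is
        }
        return serializer.toBinaryRow(record); // materialize a compact binary row
      }

      static RowData seal(RowData rowData) {
        // only binary rows share backing segments and need a defensive copy
        return rowData instanceof BinaryRowData ? ((BinaryRowData) rowData).copy() : rowData;
      }
    }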
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index b37c7d910dd..8abf5134ad6 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -222,13 +223,15 @@ public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable>
}
@Override
-  public HoodieRecord<ArrayWritable> constructHoodieRecord(Option<ArrayWritable> recordOption, Map<String, Object> metadataMap) {
-    if (!recordOption.isPresent()) {
-      return new HoodieEmptyRecord<>(new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY), (String) metadataMap.get(INTERNAL_META_PARTITION_PATH)), HoodieRecord.HoodieRecordType.HIVE);
+  public HoodieRecord<ArrayWritable> constructHoodieRecord(BufferedRecord<ArrayWritable> bufferedRecord) {
+    if (bufferedRecord.isDelete()) {
+      return new HoodieEmptyRecord<>(
+          new HoodieKey(bufferedRecord.getRecordKey(), null),
+          HoodieRecord.HoodieRecordType.HIVE);
    }
-    Schema schema = getSchemaFromMetadata(metadataMap);
-    ArrayWritable writable = recordOption.get();
-    return new HoodieHiveRecord(new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY), (String) metadataMap.get(INTERNAL_META_PARTITION_PATH)), writable, schema, objectInspectorCache);
+    Schema schema = getSchemaFromBufferRecord(bufferedRecord);
+    ArrayWritable writable = bufferedRecord.getRecord();
+    return new HoodieHiveRecord(new HoodieKey(bufferedRecord.getRecordKey(), null), writable, schema, objectInspectorCache);
}
@Override
@@ -236,6 +239,11 @@ public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable>
    return new ArrayWritable(Writable.class, Arrays.copyOf(record.get(), record.get().length));
}
+ @Override
+ public ArrayWritable toBinaryRow(Schema schema, ArrayWritable record) {
+ return record;
+ }
+
@Override
  public ClosableIterator<ArrayWritable> mergeBootstrapReaders(ClosableIterator<ArrayWritable> skeletonFileIterator, Schema skeletonRequiredSchema,
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
index 34e23edc146..c4ef58d689c 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
@@ -102,7 +102,7 @@ public class HoodieHiveRecord extends HoodieRecord<ArrayWritable> {
}
@Override
-  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
+  public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties props) {
String orderingField = ConfigUtils.getOrderingField(props);
if (isNullOrEmpty(orderingField)) {
return DEFAULT_ORDERING_VALUE;
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
index 9b987937276..e0a59b563c8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
@@ -62,8 +62,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_ORDERING_FIELD;
-import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.createMetaClient;
@@ -204,9 +202,9 @@ public class TestPositionBasedFileGroupRecordBuffer extends TestHoodieFileGroupR
    if (sameBaseInstantTime) {
      // If the log block's base instant time of record positions matches the base file
      // to merge, the log records are stored based on the position
-      assertNotNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY),
+      assertNotNull(buffer.getLogRecords().get(0L).getRecordKey(),
          "the record key is set up for fallback handling");
-      assertNotNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_ORDERING_FIELD),
+      assertNotNull(buffer.getLogRecords().get(0L).getOrderingValue(),
          "the ordering value is set up for fallback handling");
} else {
      // If the log block's base instant time of record positions does not match the
@@ -222,7 +220,7 @@ public class TestPositionBasedFileGroupRecordBuffer extends TestHoodieFileGroupR
    HoodieDeleteBlock deleteBlock = getDeleteBlockWithPositions(baseFileInstantTime);
    buffer.processDeleteBlock(deleteBlock);
    assertEquals(50, buffer.getLogRecords().size());
-    assertNotNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY));
+ assertNotNull(buffer.getLogRecords().get(0L).getRecordKey());
}
@Test
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 59f7375565c..b65aa9b70f2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -137,16 +137,6 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
+ "{\"name\": \"col2\", \"type\": \"long\" },"
+ "{ \"name\": \"col3\", \"type\": [\"null\", \"string\"],
\"default\": null}]}")
val row = InternalRow("item", 1000L, "blue")
-    val metadataMap = Map(HoodieReaderContext.INTERNAL_META_ORDERING_FIELD -> 100L)
-    assertEquals(100L, sparkReaderContext.getOrderingValue(
-      HOption.empty(), metadataMap.asJava.asInstanceOf[java.util.Map[String, Object]],
-      avroSchema, HOption.of(orderingFieldName)))
-    assertEquals(DEFAULT_ORDERING_VALUE, sparkReaderContext.getOrderingValue(
-      HOption.empty(), Map().asJava.asInstanceOf[java.util.Map[String, Object]],
-      avroSchema, HOption.of(orderingFieldName)))
-    assertEquals(DEFAULT_ORDERING_VALUE, sparkReaderContext.getOrderingValue(
-      HOption.of(row), Map().asJava.asInstanceOf[java.util.Map[String, Object]],
-      avroSchema, HOption.empty()))
    testGetOrderingValue(sparkReaderContext, row, avroSchema, orderingFieldName, 1000L)
    testGetOrderingValue(
      sparkReaderContext, row, avroSchema, "col3", UTF8String.fromString("blue"))
@@ -264,11 +254,8 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
avroSchema: Schema,
orderingColumn: String,
                                   expectedOrderingValue: Comparable[_]): Unit = {
- val metadataMap = new util.HashMap[String, Object]()
assertEquals(expectedOrderingValue, sparkReaderContext.getOrderingValue(
- HOption.of(row), metadataMap, avroSchema, HOption.of(orderingColumn)))
- assertEquals(expectedOrderingValue,
- metadataMap.get(HoodieReaderContext.INTERNAL_META_ORDERING_FIELD))
+ row, avroSchema, HOption.of(orderingColumn)))
}
}
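With the metadata map gone, getOrderingValue is a pure function of the record,
the schema, and the ordering field, as the updated test exercises. The call
shape after this change (Java, with illustrative values assumed from the test
row above):

    // no metadata-map side channel; a missing ordering field falls back to DEFAULT_ORDERING_VALUE
    Comparable orderingValue = readerContext.getOrderingValue(row, avroSchema, Option.of("col2"));  // 1000L
    Comparable defaultValue = readerContext.getOrderingValue(row, avroSchema, Option.empty());      // DEFAULT_ORDERING_VALUE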