This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2d99668881f [HUDI-9499] Add customized SizeEstimator and Serializer
for Avro buffered record in FileGroup reader (#13408)
2d99668881f is described below
commit 2d99668881f3f83c224fc787367711ad73e34c27
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Jun 12 09:10:26 2025 +0800
[HUDI-9499] Add customized SizeEstimator and Serializer for Avro buffered
record in FileGroup reader (#13408)
---
.../org/apache/hudi/avro/AvroRecordSerializer.java | 53 +++++++++++++
.../apache/hudi/avro/AvroRecordSizeEstimator.java | 50 ++++++++++++
.../apache/hudi/avro/HoodieAvroReaderContext.java | 13 ++++
.../hudi/common/engine/HoodieReaderContext.java | 14 +++-
.../common/serialization/RecordSerializer.java | 46 +++++++++++
.../hudi/common/table/read/BufferedRecord.java | 21 ++++-
.../table/read/BufferedRecordSerializer.java | 82 ++++++++++++++++++++
.../common/table/read/FileGroupRecordBuffer.java | 4 +-
.../hudi/common/util/SerializationUtils.java | 2 +-
.../hudi/avro/TestAvroRecordSizeEstimator.java | 58 ++++++++++++++
.../TestBufferedRecordSerializer.java | 90 ++++++++++++++++++++++
.../table/read/TestFileGroupRecordBuffer.java | 4 +
12 files changed, 431 insertions(+), 6 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordSerializer.java
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordSerializer.java
new file mode 100644
index 00000000000..a7e8f20bba0
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordSerializer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.serialization.RecordSerializer;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/**
+ * An implementation of {@link RecordSerializer} for Avro {@link
IndexedRecord}.
+ */
+public class AvroRecordSerializer implements RecordSerializer<IndexedRecord> {
+ private final Function<Integer, Schema> schemaFunc;
+
+ public AvroRecordSerializer(Function<Integer, Schema> schemaFunc) {
+ this.schemaFunc = schemaFunc;
+ }
+
+ @Override
+ public byte[] serialize(IndexedRecord input) {
+ return HoodieAvroUtils.avroToBytes(input);
+ }
+
+ @Override
+ public IndexedRecord deserialize(byte[] bytes, int schemaId) {
+ try {
+ return HoodieAvroUtils.bytesToAvro(bytes, schemaFunc.apply(schemaId));
+ } catch (IOException e) {
+ throw new HoodieException("Failed to deserialize Avro record bytes.",e);
+ }
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordSizeEstimator.java
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordSizeEstimator.java
new file mode 100644
index 00000000000..d2e7920fc1a
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordSizeEstimator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.util.ObjectSizeCalculator;
+import org.apache.hudi.common.util.SizeEstimator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+/**
+ * An implementation of {@link SizeEstimator} for Avro {@link BufferedRecord},
which estimates the size of
+ * an Avro record excluding the internal {@link Schema}.
+ */
+public class AvroRecordSizeEstimator implements
SizeEstimator<BufferedRecord<IndexedRecord>> {
+ private final long sizeOfSchema;
+
+ public AvroRecordSizeEstimator(Schema recordSchema) {
+ sizeOfSchema = ObjectSizeCalculator.getObjectSize(recordSchema);
+ }
+
+ @Override
+ public long sizeEstimate(BufferedRecord<IndexedRecord> record) {
+ long sizeOfRecord = ObjectSizeCalculator.getObjectSize(record);
+ // generated records do not contain a Schema field, so there is no need to
subtract the size of the Schema.
+ if (record.getRecord() instanceof SpecificRecord) {
+ return sizeOfRecord;
+ }
+ // exclude the size of the Avro schema, as the schema is reused among records
+ return sizeOfRecord - sizeOfSchema + 8;
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
index d15f78e18d4..64678cc07aa 100644
---
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
@@ -29,11 +29,14 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.OverwriteWithLatestMerger;
+import org.apache.hudi.common.serialization.CustomSerializer;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordSerializer;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
@@ -171,6 +174,16 @@ public class HoodieAvroReaderContext extends
HoodieReaderContext<IndexedRecord>
return record;
}
+ @Override
+ public SizeEstimator<BufferedRecord<IndexedRecord>> getRecordSizeEstimator()
{
+ return new AvroRecordSizeEstimator(getSchemaHandler().getRequiredSchema());
+ }
+
+ @Override
+ public CustomSerializer<BufferedRecord<IndexedRecord>> getRecordSerializer()
{
+ return new BufferedRecordSerializer<>(new
AvroRecordSerializer(this::decodeAvroSchema));
+ }
+
@Override
public ClosableIterator<IndexedRecord>
mergeBootstrapReaders(ClosableIterator<IndexedRecord> skeletonFileIterator,
Schema
skeletonRequiredSchema,
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 86ba617b436..e226f26c339 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -23,12 +23,16 @@ import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.serialization.CustomSerializer;
+import org.apache.hudi.common.serialization.DefaultSerializer;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.LocalAvroSchemaCache;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableFilterIterator;
import org.apache.hudi.common.util.collection.Pair;
@@ -183,6 +187,14 @@ public abstract class HoodieReaderContext<T> {
return keyFilterOpt;
}
+ public SizeEstimator<BufferedRecord<T>> getRecordSizeEstimator() {
+ return new HoodieRecordSizeEstimator<>(schemaHandler.getRequiredSchema());
+ }
+
+ public CustomSerializer<BufferedRecord<T>> getRecordSerializer() {
+ return new DefaultSerializer<>();
+ }
+
/**
* Gets the record iterator based on the type of engine-specific record
representation from the
* file.
@@ -476,7 +488,7 @@ public abstract class HoodieReaderContext<T> {
* Decodes the avro schema with given version ID.
*/
@Nullable
- private Schema decodeAvroSchema(Object versionId) {
+ protected Schema decodeAvroSchema(Object versionId) {
return this.localAvroSchemaCache.getSchema((Integer)
versionId).orElse(null);
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/serialization/RecordSerializer.java
b/hudi-common/src/main/java/org/apache/hudi/common/serialization/RecordSerializer.java
new file mode 100644
index 00000000000..68ef36628fc
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/serialization/RecordSerializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.serialization;
+
+import java.io.Serializable;
+
+/**
+ * Serializer for engine-specific record, e.g., {@code InternalRow} in Spark,
+ * {@code RowData} in Flink and {@code IndexedRecord} for Avro.
+ */
+public interface RecordSerializer<T> extends Serializable {
+ /**
+ * Serialize the given record into bytes.
+ *
+ * @param record Engine-specific record.
+ *
+ * @return Serialized bytes for the record.
+ */
+ byte[] serialize(T record);
+
+ /**
+ * Deserialize the given bytes into engine-specific record with given schema.
+ *
+ * @param bytes Bytes for record.
+ * @param schemaId Encoded id for the record schema.
+ *
+ * @return Engine-specific record.
+ */
+ T deserialize(byte[] bytes, int schemaId);
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
index 3972fd4257e..21515392624 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
@@ -29,6 +29,7 @@ import org.apache.avro.Schema;
import java.io.IOException;
import java.io.Serializable;
+import java.util.Objects;
import java.util.Properties;
import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
@@ -45,7 +46,7 @@ public class BufferedRecord<T> implements Serializable {
private final Integer schemaId;
private final boolean isDelete;
- private BufferedRecord(String recordKey, Comparable orderingValue, T record,
Integer schemaId, boolean isDelete) {
+ public BufferedRecord(String recordKey, Comparable orderingValue, T record,
Integer schemaId, boolean isDelete) {
this.recordKey = recordKey;
this.orderingValue = orderingValue;
this.record = record;
@@ -107,4 +108,22 @@ public class BufferedRecord<T> implements Serializable {
}
return this;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BufferedRecord<?> that = (BufferedRecord<?>) o;
+ return isDelete == that.isDelete && Objects.equals(recordKey,
that.recordKey) && Objects.equals(orderingValue, that.orderingValue)
+ && Objects.equals(record, that.record) && Objects.equals(schemaId,
that.schemaId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(recordKey, orderingValue, record, schemaId, isDelete);
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordSerializer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordSerializer.java
new file mode 100644
index 00000000000..3d522e5a683
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordSerializer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.serialization.CustomSerializer;
+import org.apache.hudi.common.serialization.RecordSerializer;
+import org.apache.hudi.common.util.SerializationUtils;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * An implementation of {@link CustomSerializer} for {@link BufferedRecord}.
+ *
+ */
+public class BufferedRecordSerializer<T> implements
CustomSerializer<BufferedRecord<T>> {
+ public static final int KRYO_SERIALIZER_INITIAL_BUFFER_SIZE = 1048576;
+ private final Kryo kryo;
+ // Caching ByteArrayOutputStream to avoid recreating it for every operation
+ private final ByteArrayOutputStream baos;
+ private final RecordSerializer<T> recordSerializer;
+
+ public BufferedRecordSerializer(RecordSerializer<T> recordSerializer) {
+ SerializationUtils.KryoInstantiator kryoInstantiator = new
SerializationUtils.KryoInstantiator();
+ this.kryo = kryoInstantiator.newKryo();
+ this.baos = new ByteArrayOutputStream(KRYO_SERIALIZER_INITIAL_BUFFER_SIZE);
+ this.kryo.setRegistrationRequired(false);
+ this.recordSerializer = recordSerializer;
+ }
+
+ @Override
+ public byte[] serialize(BufferedRecord<T> record) throws IOException {
+ kryo.reset();
+ baos.reset();
+ try (Output output = new Output(baos)) {
+ output.writeString(record.getRecordKey());
+ output.writeInt(record.getSchemaId());
+ output.writeBoolean(record.isDelete());
+ kryo.writeClassAndObject(output, record.getOrderingValue());
+
+ byte[] avroBytes = recordSerializer.serialize(record.getRecord());
+ output.writeInt(avroBytes.length);
+ output.writeBytes(avroBytes);
+ }
+ return baos.toByteArray();
+ }
+
+ @Override
+ public BufferedRecord<T> deserialize(byte[] bytes) {
+ try (Input input = new Input(bytes)) {
+ String recordKey = input.readString();
+ int schemaId = input.readInt();
+ boolean isDelete = input.readBoolean();
+ Comparable orderingValue = (Comparable) kryo.readClassAndObject(input);
+
+ int recordLength = input.readInt();
+ byte[] recordBytes = input.readBytes(recordLength);
+ T record = recordSerializer.deserialize(recordBytes, schemaId);
+ return new BufferedRecord<>(recordKey, orderingValue, record, schemaId,
isDelete);
+ }
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
index 020e19cf0fa..b6b8df51681 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
@@ -31,13 +31,11 @@ import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
-import org.apache.hudi.common.serialization.DefaultSerializer;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.KeySpec;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.FileIOUtils;
-import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
@@ -126,7 +124,7 @@ public abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordB
try {
// Store merged records for all versions for this log file, set the
in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes,
spillableMapBasePath, new DefaultSizeEstimator<>(),
- new HoodieRecordSizeEstimator<>(readerSchema), diskMapType, new
DefaultSerializer<>(), isBitCaskDiskMapCompressionEnabled,
getClass().getSimpleName());
+ readerContext.getRecordSizeEstimator(), diskMapType,
readerContext.getRecordSerializer(), isBitCaskDiskMapCompressionEnabled,
getClass().getSimpleName());
} catch (IOException e) {
throw new HoodieIOException("IOException when creating
ExternalSpillableMap at " + spillableMapBasePath, e);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
index 1046ce646a0..05f31524a9d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
@@ -114,7 +114,7 @@ public class SerializationUtils {
* This class has a no-arg constructor, suitable for use with reflection
instantiation. For Details checkout
* com.twitter.chill.KryoBase.
*/
- private static class KryoInstantiator implements Serializable {
+ public static class KryoInstantiator implements Serializable {
public Kryo newKryo() {
Kryo kryo = new Kryo();
diff --git
a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroRecordSizeEstimator.java
b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroRecordSizeEstimator.java
new file mode 100644
index 00000000000..20e1849bdff
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroRecordSizeEstimator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.SizeEstimator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+public class TestAvroRecordSizeEstimator {
+ @Test
+ void testEstimatingRecord() throws IOException {
+ Schema schema = SchemaTestUtil.getSimpleSchema();
+ // testing Avro builtin IndexedRecord
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("name", "lily");
+ record.put("favorite_number", 100);
+ record.put("favorite_color", "red");
+ SizeEstimator<BufferedRecord<IndexedRecord>> estimator = new
AvroRecordSizeEstimator(schema);
+ BufferedRecord<IndexedRecord> bufferedRecord = new BufferedRecord<>("id",
100, record, 1, false);
+ long size = estimator.sizeEstimate(bufferedRecord);
+ // size can vary across OS / JVM versions
+ Assertions.assertTrue(size < 400 && size > 0);
+
+ // testing generated IndexedRecord
+ HoodieMetadataRecord metadataRecord = new
HoodieMetadataRecord("__all_partitions__", 1, new HashMap<>(), null, null,
null, null);
+ bufferedRecord = new BufferedRecord<>("__all_partitions__", 0,
metadataRecord, 1, false);
+ size = estimator.sizeEstimate(bufferedRecord);
+ // size can vary across OS / JVM versions
+ Assertions.assertTrue(size < 400 && size > 0);
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/serialization/TestBufferedRecordSerializer.java
b/hudi-common/src/test/java/org/apache/hudi/common/serialization/TestBufferedRecordSerializer.java
new file mode 100644
index 00000000000..1a96c4bf5ff
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/serialization/TestBufferedRecordSerializer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.serialization;
+
+import org.apache.hudi.avro.AvroRecordSerializer;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordSerializer;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+public class TestBufferedRecordSerializer {
+ @Test
+ void testAvroRecordSerAndDe() throws IOException {
+ Schema schema = SchemaTestUtil.getSimpleSchema();
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("name", "lily");
+ record.put("favorite_number", 100);
+ record.put("favorite_color", "red");
+ AvroRecordSerializer avroRecordSerializer = new
AvroRecordSerializer(integer -> schema);
+ byte[] avroBytes = avroRecordSerializer.serialize(record);
+ IndexedRecord result = avroRecordSerializer.deserialize(avroBytes, 1);
+ Assertions.assertEquals(record, result);
+
+ avroRecordSerializer = new AvroRecordSerializer(integer ->
HoodieMetadataRecord.SCHEMA$);
+ HoodieMetadataRecord metadataRecord = new
HoodieMetadataRecord("__all_partitions__", 1, new HashMap<>(), null, null,
null, null);
+ avroBytes = avroRecordSerializer.serialize(metadataRecord);
+ result = avroRecordSerializer.deserialize(avroBytes, 1);
+ for (int i = 0; i < metadataRecord.getSchema().getFields().size(); i++) {
+ Assertions.assertEquals(metadataRecord.get(i), result.get(i));
+ }
+ }
+
+ @Test
+ void testBufferedRecordSerAndDe() throws IOException {
+ Schema schema = SchemaTestUtil.getSimpleSchema();
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("name", "lily");
+ record.put("favorite_number", 100);
+ record.put("favorite_color", "red");
+ BufferedRecord<IndexedRecord> bufferedRecord = new BufferedRecord<>("id",
100, record, 1, false);
+
+ AvroRecordSerializer avroRecordSerializer = new
AvroRecordSerializer(integer -> schema);
+ BufferedRecordSerializer<IndexedRecord> bufferedRecordSerializer = new
BufferedRecordSerializer<>(avroRecordSerializer);
+
+ byte[] bytes = bufferedRecordSerializer.serialize(bufferedRecord);
+ BufferedRecord<IndexedRecord> result =
bufferedRecordSerializer.deserialize(bytes);
+ Assertions.assertEquals(bufferedRecord, result);
+
+ avroRecordSerializer = new AvroRecordSerializer(integer ->
HoodieMetadataRecord.SCHEMA$);
+ bufferedRecordSerializer = new
BufferedRecordSerializer<>(avroRecordSerializer);
+ HoodieMetadataRecord metadataRecord = new
HoodieMetadataRecord("__all_partitions__", 1, new HashMap<>(), null, null,
null, null);
+ bufferedRecord = new BufferedRecord<>("__all_partitions__", 0,
metadataRecord, 1, false);
+ bytes = bufferedRecordSerializer.serialize(bufferedRecord);
+ result = bufferedRecordSerializer.deserialize(bytes);
+
+ Assertions.assertEquals(bufferedRecord.getRecordKey(),
result.getRecordKey());
+ Assertions.assertEquals(bufferedRecord.getOrderingValue(),
result.getOrderingValue());
+ Assertions.assertEquals(bufferedRecord.getSchemaId(),
result.getSchemaId());
+ Assertions.assertEquals(bufferedRecord.isDelete(), result.isDelete());
+ for (int i = 0; i < metadataRecord.getSchema().getFields().size(); i++) {
+ Assertions.assertEquals(metadataRecord.get(i),
result.getRecord().get(i));
+ }
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
index 71f0b359999..57044c991c5 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
@@ -28,9 +28,11 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.serialization.DefaultSerializer;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -92,6 +94,8 @@ class TestFileGroupRecordBuffer {
when(readerContext.getSchemaHandler()).thenReturn(schemaHandler);
when(schemaHandler.getRequiredSchema()).thenReturn(schema);
when(readerContext.getRecordMerger()).thenReturn(Option.empty());
+ when(readerContext.getRecordSerializer()).thenReturn(new
DefaultSerializer<>());
+ when(readerContext.getRecordSizeEstimator()).thenReturn(new
DefaultSizeEstimator<>());
}
@Test