This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d99668881f [HUDI-9499] Add customized SizeEstimator and Serializer 
for Avro buffered record in FileGroup reader (#13408)
2d99668881f is described below

commit 2d99668881f3f83c224fc787367711ad73e34c27
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Jun 12 09:10:26 2025 +0800

    [HUDI-9499] Add customized SizeEstimator and Serializer for Avro buffered 
record in FileGroup reader (#13408)
---
 .../org/apache/hudi/avro/AvroRecordSerializer.java | 53 +++++++++++++
 .../apache/hudi/avro/AvroRecordSizeEstimator.java  | 50 ++++++++++++
 .../apache/hudi/avro/HoodieAvroReaderContext.java  | 13 ++++
 .../hudi/common/engine/HoodieReaderContext.java    | 14 +++-
 .../common/serialization/RecordSerializer.java     | 46 +++++++++++
 .../hudi/common/table/read/BufferedRecord.java     | 21 ++++-
 .../table/read/BufferedRecordSerializer.java       | 82 ++++++++++++++++++++
 .../common/table/read/FileGroupRecordBuffer.java   |  4 +-
 .../hudi/common/util/SerializationUtils.java       |  2 +-
 .../hudi/avro/TestAvroRecordSizeEstimator.java     | 58 ++++++++++++++
 .../TestBufferedRecordSerializer.java              | 90 ++++++++++++++++++++++
 .../table/read/TestFileGroupRecordBuffer.java      |  4 +
 12 files changed, 431 insertions(+), 6 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordSerializer.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordSerializer.java
new file mode 100644
index 00000000000..a7e8f20bba0
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordSerializer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.serialization.RecordSerializer;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/**
+ * An implementation of {@link RecordSerializer} for Avro {@link 
IndexedRecord}.
+ */
+public class AvroRecordSerializer implements RecordSerializer<IndexedRecord> {
+  private final Function<Integer, Schema> schemaFunc;
+
+  public AvroRecordSerializer(Function<Integer, Schema> schemaFunc) {
+    this.schemaFunc = schemaFunc;
+  }
+
+  @Override
+  public byte[] serialize(IndexedRecord input) {
+    return HoodieAvroUtils.avroToBytes(input);
+  }
+
+  @Override
+  public IndexedRecord deserialize(byte[] bytes, int schemaId) {
+    try {
+      return HoodieAvroUtils.bytesToAvro(bytes, schemaFunc.apply(schemaId));
+    } catch (IOException e) {
+      throw new HoodieException("Failed to deserialize Avro record bytes.",e);
+    }
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordSizeEstimator.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordSizeEstimator.java
new file mode 100644
index 00000000000..d2e7920fc1a
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordSizeEstimator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.util.ObjectSizeCalculator;
+import org.apache.hudi.common.util.SizeEstimator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+/**
+ * An implementation of {@link SizeEstimator} for Avro {@link BufferedRecord}, 
which estimates the size of
+ * Avro record excluding the internal {@link Schema}.
+ */
+public class AvroRecordSizeEstimator implements 
SizeEstimator<BufferedRecord<IndexedRecord>> {
+  private final long sizeOfSchema;
+
+  public AvroRecordSizeEstimator(Schema recordSchema) {
+    sizeOfSchema = ObjectSizeCalculator.getObjectSize(recordSchema);
+  }
+
+  @Override
+  public long sizeEstimate(BufferedRecord<IndexedRecord> record) {
+    long sizeOfRecord = ObjectSizeCalculator.getObjectSize(record);
+    // generated record do not contain Schema field, so do not need minus size 
of Schema.
+    if (record.getRecord() instanceof SpecificRecord) {
+      return sizeOfRecord;
+    }
+    // do not contain size of Avro schema as the schema is reused among records
+    return sizeOfRecord - sizeOfSchema + 8;
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
index d15f78e18d4..64678cc07aa 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
@@ -29,11 +29,14 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.OverwriteWithLatestMerger;
+import org.apache.hudi.common.serialization.CustomSerializer;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordSerializer;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SizeEstimator;
 import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
@@ -171,6 +174,16 @@ public class HoodieAvroReaderContext extends 
HoodieReaderContext<IndexedRecord>
     return record;
   }
 
  @Override
  public SizeEstimator<BufferedRecord<IndexedRecord>> getRecordSizeEstimator() {
    // Avro-aware estimator: avoids re-counting the shared required schema for every record.
    return new AvroRecordSizeEstimator(getSchemaHandler().getRequiredSchema());
  }
+
  @Override
  public CustomSerializer<BufferedRecord<IndexedRecord>> getRecordSerializer() {
    // Payload bytes are plain Avro; schema ids are resolved back via decodeAvroSchema.
    return new BufferedRecordSerializer<>(new AvroRecordSerializer(this::decodeAvroSchema));
  }
+
   @Override
   public ClosableIterator<IndexedRecord> 
mergeBootstrapReaders(ClosableIterator<IndexedRecord> skeletonFileIterator,
                                                                Schema 
skeletonRequiredSchema,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 86ba617b436..e226f26c339 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -23,12 +23,16 @@ import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.serialization.CustomSerializer;
+import org.apache.hudi.common.serialization.DefaultSerializer;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.LocalAvroSchemaCache;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SizeEstimator;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableFilterIterator;
 import org.apache.hudi.common.util.collection.Pair;
@@ -183,6 +187,14 @@ public abstract class HoodieReaderContext<T> {
     return keyFilterOpt;
   }
 
  /**
   * Size estimator used when buffering records in the spillable map.
   * Engine-specific subclasses may override with a cheaper/more accurate estimator.
   */
  public SizeEstimator<BufferedRecord<T>> getRecordSizeEstimator() {
    return new HoodieRecordSizeEstimator<>(schemaHandler.getRequiredSchema());
  }
+
  /**
   * Serializer used when buffered records spill to disk.
   * Engine-specific subclasses may override with a format-aware serializer.
   */
  public CustomSerializer<BufferedRecord<T>> getRecordSerializer() {
    return new DefaultSerializer<>();
  }
+
   /**
    * Gets the record iterator based on the type of engine-specific record 
representation from the
    * file.
@@ -476,7 +488,7 @@ public abstract class HoodieReaderContext<T> {
    * Decodes the avro schema with given version ID.
    */
   @Nullable
-  private Schema decodeAvroSchema(Object versionId) {
+  protected Schema decodeAvroSchema(Object versionId) {
     return this.localAvroSchemaCache.getSchema((Integer) 
versionId).orElse(null);
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/serialization/RecordSerializer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/serialization/RecordSerializer.java
new file mode 100644
index 00000000000..68ef36628fc
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/serialization/RecordSerializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.serialization;
+
+import java.io.Serializable;
+
/**
 * Converts engine-native records (e.g. {@code InternalRow} in Spark, {@code RowData}
 * in Flink, {@code IndexedRecord} for Avro) to and from a byte representation.
 */
public interface RecordSerializer<T> extends Serializable {

  /**
   * Encodes a record as bytes.
   *
   * @param record the engine-specific record to encode
   * @return the serialized form of {@code record}
   */
  byte[] serialize(T record);

  /**
   * Decodes bytes back into an engine-specific record.
   *
   * @param bytes    the serialized record
   * @param schemaId encoded identifier of the schema the record was written with
   * @return the reconstructed engine-specific record
   */
  T deserialize(byte[] bytes, int schemaId);
}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
index 3972fd4257e..21515392624 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
@@ -29,6 +29,7 @@ import org.apache.avro.Schema;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Objects;
 import java.util.Properties;
 
 import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
@@ -45,7 +46,7 @@ public class BufferedRecord<T> implements Serializable {
   private final Integer schemaId;
   private final boolean isDelete;
 
-  private BufferedRecord(String recordKey, Comparable orderingValue, T record, 
Integer schemaId, boolean isDelete) {
+  public BufferedRecord(String recordKey, Comparable orderingValue, T record, 
Integer schemaId, boolean isDelete) {
     this.recordKey = recordKey;
     this.orderingValue = orderingValue;
     this.record = record;
@@ -107,4 +108,22 @@ public class BufferedRecord<T> implements Serializable {
     }
     return this;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    BufferedRecord<?> that = (BufferedRecord<?>) o;
+    return isDelete == that.isDelete && Objects.equals(recordKey, 
that.recordKey) && Objects.equals(orderingValue, that.orderingValue)
+        && Objects.equals(record, that.record) && Objects.equals(schemaId, 
that.schemaId);
+  }
+
  @Override
  public int hashCode() {
    // Uses the same fields as equals() so that equal records hash equally.
    return Objects.hash(recordKey, orderingValue, record, schemaId, isDelete);
  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordSerializer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordSerializer.java
new file mode 100644
index 00000000000..3d522e5a683
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordSerializer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.serialization.CustomSerializer;
+import org.apache.hudi.common.serialization.RecordSerializer;
+import org.apache.hudi.common.util.SerializationUtils;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * An implementation of {@link CustomSerializer} for {@link BufferedRecord}.
+ *
+ */
+public class BufferedRecordSerializer<T> implements 
CustomSerializer<BufferedRecord<T>> {
+  public static final int KRYO_SERIALIZER_INITIAL_BUFFER_SIZE = 1048576;
+  private final Kryo kryo;
+  // Caching ByteArrayOutputStream to avoid recreating it for every operation
+  private final ByteArrayOutputStream baos;
+  private final RecordSerializer<T> recordSerializer;
+
+  public BufferedRecordSerializer(RecordSerializer<T> recordSerializer) {
+    SerializationUtils.KryoInstantiator kryoInstantiator = new 
SerializationUtils.KryoInstantiator();
+    this.kryo = kryoInstantiator.newKryo();
+    this.baos = new ByteArrayOutputStream(KRYO_SERIALIZER_INITIAL_BUFFER_SIZE);
+    this.kryo.setRegistrationRequired(false);
+    this.recordSerializer = recordSerializer;
+  }
+
+  @Override
+  public byte[] serialize(BufferedRecord<T> record) throws IOException {
+    kryo.reset();
+    baos.reset();
+    try (Output output = new Output(baos)) {
+      output.writeString(record.getRecordKey());
+      output.writeInt(record.getSchemaId());
+      output.writeBoolean(record.isDelete());
+      kryo.writeClassAndObject(output, record.getOrderingValue());
+
+      byte[] avroBytes = recordSerializer.serialize(record.getRecord());
+      output.writeInt(avroBytes.length);
+      output.writeBytes(avroBytes);
+    }
+    return baos.toByteArray();
+  }
+
+  @Override
+  public BufferedRecord<T> deserialize(byte[] bytes) {
+    try (Input input = new Input(bytes)) {
+      String recordKey = input.readString();
+      int schemaId = input.readInt();
+      boolean isDelete = input.readBoolean();
+      Comparable orderingValue = (Comparable) kryo.readClassAndObject(input);
+
+      int recordLength = input.readInt();
+      byte[] recordBytes = input.readBytes(recordLength);
+      T record = recordSerializer.deserialize(recordBytes, schemaId);
+      return new BufferedRecord<>(recordKey, orderingValue, record, schemaId, 
isDelete);
+    }
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
index 020e19cf0fa..b6b8df51681 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
@@ -31,13 +31,11 @@ import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
-import org.apache.hudi.common.serialization.DefaultSerializer;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.KeySpec;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.FileIOUtils;
-import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.InternalSchemaCache;
 import org.apache.hudi.common.util.Option;
@@ -126,7 +124,7 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
     try {
       // Store merged records for all versions for this log file, set the 
in-memory footprint to maxInMemoryMapSize
       this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, 
spillableMapBasePath, new DefaultSizeEstimator<>(),
-          new HoodieRecordSizeEstimator<>(readerSchema), diskMapType, new 
DefaultSerializer<>(), isBitCaskDiskMapCompressionEnabled, 
getClass().getSimpleName());
+          readerContext.getRecordSizeEstimator(), diskMapType, 
readerContext.getRecordSerializer(), isBitCaskDiskMapCompressionEnabled, 
getClass().getSimpleName());
     } catch (IOException e) {
       throw new HoodieIOException("IOException when creating 
ExternalSpillableMap at " + spillableMapBasePath, e);
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
index 1046ce646a0..05f31524a9d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
@@ -114,7 +114,7 @@ public class SerializationUtils {
    * This class has a no-arg constructor, suitable for use with reflection 
instantiation. For Details checkout
    * com.twitter.chill.KryoBase.
    */
-  private static class KryoInstantiator implements Serializable {
+  public static class KryoInstantiator implements Serializable {
 
     public Kryo newKryo() {
       Kryo kryo = new Kryo();
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroRecordSizeEstimator.java
 
b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroRecordSizeEstimator.java
new file mode 100644
index 00000000000..20e1849bdff
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroRecordSizeEstimator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.SizeEstimator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+public class TestAvroRecordSizeEstimator {
+  @Test
+  void testEstimatingRecord() throws IOException {
+    Schema schema = SchemaTestUtil.getSimpleSchema();
+    // testing Avro builtin IndexedRecord
+    GenericRecord record = new GenericData.Record(schema);
+    record.put("name", "lily");
+    record.put("favorite_number", 100);
+    record.put("favorite_color", "red");
+    SizeEstimator<BufferedRecord<IndexedRecord>> estimator = new 
AvroRecordSizeEstimator(schema);
+    BufferedRecord<IndexedRecord> bufferedRecord = new BufferedRecord<>("id", 
100, record, 1, false);
+    long size = estimator.sizeEstimate(bufferedRecord);
+    // size can be various for different OS / JVM version
+    Assertions.assertTrue(size < 400 && size > 0);
+
+    // testing generated IndexedRecord
+    HoodieMetadataRecord metadataRecord = new 
HoodieMetadataRecord("__all_partitions__", 1, new HashMap<>(), null, null, 
null, null);
+    bufferedRecord = new BufferedRecord<>("__all_partitions__", 0, 
metadataRecord, 1, false);
+    size = estimator.sizeEstimate(bufferedRecord);
+    // size can be various for different OS / JVM version
+    Assertions.assertTrue(size < 400 && size > 0);
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/serialization/TestBufferedRecordSerializer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/serialization/TestBufferedRecordSerializer.java
new file mode 100644
index 00000000000..1a96c4bf5ff
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/serialization/TestBufferedRecordSerializer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.serialization;
+
+import org.apache.hudi.avro.AvroRecordSerializer;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordSerializer;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+public class TestBufferedRecordSerializer {
+  @Test
+  void testAvroRecordSerAndDe() throws IOException {
+    Schema schema = SchemaTestUtil.getSimpleSchema();
+    GenericRecord record = new GenericData.Record(schema);
+    record.put("name", "lily");
+    record.put("favorite_number", 100);
+    record.put("favorite_color", "red");
+    AvroRecordSerializer avroRecordSerializer = new 
AvroRecordSerializer(integer -> schema);
+    byte[] avroBytes = avroRecordSerializer.serialize(record);
+    IndexedRecord result = avroRecordSerializer.deserialize(avroBytes, 1);
+    Assertions.assertEquals(record, result);
+
+    avroRecordSerializer = new AvroRecordSerializer(integer -> 
HoodieMetadataRecord.SCHEMA$);
+    HoodieMetadataRecord metadataRecord = new 
HoodieMetadataRecord("__all_partitions__", 1, new HashMap<>(), null, null, 
null, null);
+    avroBytes = avroRecordSerializer.serialize(metadataRecord);
+    result = avroRecordSerializer.deserialize(avroBytes, 1);
+    for (int i = 0; i < metadataRecord.getSchema().getFields().size(); i++) {
+      Assertions.assertEquals(metadataRecord.get(i), result.get(i));
+    }
+  }
+
+  @Test
+  void testBufferedRecordSerAndDe() throws IOException {
+    Schema schema = SchemaTestUtil.getSimpleSchema();
+    GenericRecord record = new GenericData.Record(schema);
+    record.put("name", "lily");
+    record.put("favorite_number", 100);
+    record.put("favorite_color", "red");
+    BufferedRecord<IndexedRecord> bufferedRecord = new BufferedRecord<>("id", 
100, record, 1, false);
+
+    AvroRecordSerializer avroRecordSerializer = new 
AvroRecordSerializer(integer -> schema);
+    BufferedRecordSerializer<IndexedRecord> bufferedRecordSerializer = new 
BufferedRecordSerializer<>(avroRecordSerializer);
+
+    byte[] bytes = bufferedRecordSerializer.serialize(bufferedRecord);
+    BufferedRecord<IndexedRecord> result = 
bufferedRecordSerializer.deserialize(bytes);
+    Assertions.assertEquals(bufferedRecord, result);
+
+    avroRecordSerializer = new AvroRecordSerializer(integer -> 
HoodieMetadataRecord.SCHEMA$);
+    bufferedRecordSerializer = new 
BufferedRecordSerializer<>(avroRecordSerializer);
+    HoodieMetadataRecord metadataRecord = new 
HoodieMetadataRecord("__all_partitions__", 1, new HashMap<>(), null, null, 
null, null);
+    bufferedRecord = new BufferedRecord<>("__all_partitions__", 0, 
metadataRecord, 1, false);
+    bytes = bufferedRecordSerializer.serialize(bufferedRecord);
+    result = bufferedRecordSerializer.deserialize(bytes);
+
+    Assertions.assertEquals(bufferedRecord.getRecordKey(), 
result.getRecordKey());
+    Assertions.assertEquals(bufferedRecord.getOrderingValue(), 
result.getOrderingValue());
+    Assertions.assertEquals(bufferedRecord.getSchemaId(), 
result.getSchemaId());
+    Assertions.assertEquals(bufferedRecord.isDelete(), result.isDelete());
+    for (int i = 0; i < metadataRecord.getSchema().getFields().size(); i++) {
+      Assertions.assertEquals(metadataRecord.get(i), 
result.getRecord().get(i));
+    }
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
index 71f0b359999..57044c991c5 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
@@ -28,9 +28,11 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.serialization.DefaultSerializer;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -92,6 +94,8 @@ class TestFileGroupRecordBuffer {
     when(readerContext.getSchemaHandler()).thenReturn(schemaHandler);
     when(schemaHandler.getRequiredSchema()).thenReturn(schema);
     when(readerContext.getRecordMerger()).thenReturn(Option.empty());
+    when(readerContext.getRecordSerializer()).thenReturn(new 
DefaultSerializer<>());
+    when(readerContext.getRecordSizeEstimator()).thenReturn(new 
DefaultSizeEstimator<>());
   }
 
   @Test

Reply via email to