This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e421ed86d8 [HUDI-9322] HoodieRecord#getAvroBytes returns ByteArrayOutputStream to reduce bytes copy (#13186)
4e421ed86d8 is described below

commit 4e421ed86d86d414d66284681171bf80c236d97e
Author: Shuo Cheng <[email protected]>
AuthorDate: Mon Apr 21 17:09:34 2025 +0800

    [HUDI-9322] HoodieRecord#getAvroBytes returns ByteArrayOutputStream to reduce bytes copy (#13186)
---
 .../hudi/client/utils/LegacyArchivedMetaEntryReader.java    |  2 +-
 .../org/apache/hudi/client/model/HoodieFlinkRecord.java     | 13 ++++++-------
 .../org/apache/hudi/common/model/HoodieSparkRecord.java     |  3 ++-
 .../src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java | 12 ++++++------
 .../apache/hudi/common/model/HoodieAvroIndexedRecord.java   |  5 +++--
 .../java/org/apache/hudi/common/model/HoodieAvroRecord.java | 10 +++++++---
 .../org/apache/hudi/common/model/HoodieEmptyRecord.java     |  5 +++--
 .../common/model/HoodieRecordCompatibilityInterface.java    |  3 ++-
 .../hudi/common/table/log/block/HoodieAvroDataBlock.java    |  6 +++---
 .../table/timeline/versioning/v1/ArchivedTimelineV1.java    |  2 +-
 .../main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java  |  3 ++-
 .../src/test/java/org/apache/hudi/TestDataSourceUtils.java  |  2 +-
 12 files changed, 37 insertions(+), 29 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
index d0fbce93233..c75561b61d0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
@@ -100,7 +100,7 @@ public class LegacyArchivedMetaEntryReader {
       Object actionData = record.get(key);
       if (actionData != null) {
         if (actionData instanceof IndexedRecord) {
-          return HoodieAvroUtils.indexedRecordToBytes((IndexedRecord) 
actionData);
+          return HoodieAvroUtils.avroToBytes((IndexedRecord) actionData);
         } else {
           // should be json bytes.
           try {
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
index 7f56f0b0cb2..7bd0ebf1296 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
@@ -42,6 +42,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.utils.JoinedRowData;
 
+import java.io.ByteArrayOutputStream;
 import java.util.Map;
 import java.util.Properties;
 
@@ -88,9 +89,7 @@ public class HoodieFlinkRecord extends HoodieRecord<RowData> {
       if (isNullOrEmpty(orderingField)) {
         this.orderingValue = DEFAULT_ORDERING_VALUE;
       } else {
-        boolean utcTimezone = 
Boolean.parseBoolean(props.getProperty("read.utc-timezone", "true"));
-        RowDataAvroQueryContexts.FieldQueryContext context = 
RowDataAvroQueryContexts.fromAvroSchema(recordSchema, 
utcTimezone).getFieldQueryContext(orderingField);
-        this.orderingValue = (Comparable<?>) context.getValAsJava(this.data);
+        this.orderingValue = (Comparable<?>) 
getColumnValueAsJava(recordSchema, orderingField, props);
       }
     }
     return this.orderingValue;
@@ -127,11 +126,11 @@ public class HoodieFlinkRecord extends 
HoodieRecord<RowData> {
   }
 
   @Override
-  public Object getColumnValueAsJava(Schema recordSchema, String columns, 
Properties props) {
+  public Object getColumnValueAsJava(Schema recordSchema, String column, 
Properties props) {
     boolean utcTimezone = Boolean.parseBoolean(props.getProperty(
         HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(), 
HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString()));
     RowDataQueryContext rowDataQueryContext = 
RowDataAvroQueryContexts.fromAvroSchema(recordSchema, utcTimezone);
-    return 
rowDataQueryContext.getFieldQueryContext(columns).getValAsJava(data);
+    return rowDataQueryContext.getFieldQueryContext(column).getValAsJava(data);
   }
 
   @Override
@@ -213,11 +212,11 @@ public class HoodieFlinkRecord extends 
HoodieRecord<RowData> {
   }
 
   @Override
-  public byte[] getAvroBytes(Schema recordSchema, Properties props) {
+  public ByteArrayOutputStream getAvroBytes(Schema recordSchema, Properties 
props) {
     boolean utcTimezone = Boolean.parseBoolean(props.getProperty(
         HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(), 
HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString()));
     RowDataQueryContext rowDataQueryContext = 
RowDataAvroQueryContexts.fromAvroSchema(recordSchema, utcTimezone);
     IndexedRecord indexedRecord = (IndexedRecord) 
rowDataQueryContext.getRowDataToAvroConverter().convert(recordSchema, 
getData());
-    return HoodieAvroUtils.avroToBytes(indexedRecord);
+    return HoodieAvroUtils.avroToBytesStream(indexedRecord);
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index 76cbeec3e8c..e6b336f9870 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -47,6 +47,7 @@ import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
@@ -309,7 +310,7 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> {
   }
 
   @Override
-  public byte[] getAvroBytes(Schema recordSchema, Properties props) throws 
IOException {
+  public ByteArrayOutputStream getAvroBytes(Schema recordSchema, Properties 
props) throws IOException {
     throw new UnsupportedOperationException();
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 41a3f779d7b..ca91a51b892 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -171,31 +171,31 @@ public class HoodieAvroUtils {
    * TODO serialize other type of record.
    */
   public static Option<byte[]> recordToBytes(HoodieRecord record, Schema 
schema) throws IOException {
-    return 
Option.of(HoodieAvroUtils.indexedRecordToBytes(record.toIndexedRecord(schema, 
new Properties()).get().getData()));
+    return 
Option.of(HoodieAvroUtils.indexedRecordToBytesStream(record.toIndexedRecord(schema,
 new Properties()).get().getData()).toByteArray());
   }
 
   /**
    * Convert a given avro record to bytes.
    */
   public static byte[] avroToBytes(IndexedRecord record) {
-    return indexedRecordToBytes(record);
+    return indexedRecordToBytesStream(record).toByteArray();
   }
 
   /**
    * Convert a given avro record to bytes.
    */
-  public static byte[] avroToBytes(GenericRecord record) {
-    return indexedRecordToBytes(record);
+  public static ByteArrayOutputStream avroToBytesStream(IndexedRecord record) {
+    return indexedRecordToBytesStream(record);
   }
 
-  public static <T extends IndexedRecord> byte[] indexedRecordToBytes(T 
record) {
+  public static <T extends IndexedRecord> ByteArrayOutputStream 
indexedRecordToBytesStream(T record) {
     GenericDatumWriter<T> writer = new 
GenericDatumWriter<>(record.getSchema(), ConvertingGenericData.INSTANCE);
     try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
       BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, 
BINARY_ENCODER.get());
       BINARY_ENCODER.set(encoder);
       writer.write(record, encoder);
       encoder.flush();
-      return out.toByteArray();
+      return out;
     } catch (IOException e) {
       throw new HoodieIOException("Cannot convert GenericRecord to bytes", e);
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index c4d8640e686..f4c53543103 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -34,6 +34,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
@@ -220,8 +221,8 @@ public class HoodieAvroIndexedRecord extends 
HoodieRecord<IndexedRecord> {
   }
 
   @Override
-  public byte[] getAvroBytes(Schema recordSchema, Properties props) {
-    return HoodieAvroUtils.avroToBytes(data);
+  public ByteArrayOutputStream getAvroBytes(Schema recordSchema, Properties 
props) {
+    return HoodieAvroUtils.avroToBytesStream(data);
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index 71c263240a9..e4782ff5b2c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -34,6 +34,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
@@ -227,12 +228,15 @@ public class HoodieAvroRecord<T extends 
HoodieRecordPayload> extends HoodieRecor
   }
 
   @Override
-  public byte[] getAvroBytes(Schema recordSchema, Properties props) throws 
IOException {
+  public ByteArrayOutputStream getAvroBytes(Schema recordSchema, Properties 
props) throws IOException {
     if (data instanceof BaseAvroPayload) {
-      return ((BaseAvroPayload) getData()).getRecordBytes();
+      byte[] data = ((BaseAvroPayload) getData()).getRecordBytes();
+      ByteArrayOutputStream baos = new ByteArrayOutputStream(data.length);
+      baos.write(data);
+      return baos;
     } else {
       Option<IndexedRecord> avroData = getData().getInsertValue(recordSchema, 
props);
-      return avroData.map(HoodieAvroUtils::avroToBytes).orElse(new byte[0]);
+      return avroData.map(HoodieAvroUtils::avroToBytesStream).orElse(new 
ByteArrayOutputStream(0));
     }
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
index a4e1e44b758..f482b21e56e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
@@ -27,6 +27,7 @@ import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import org.apache.avro.Schema;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
@@ -155,8 +156,8 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
   }
 
   @Override
-  public byte[] getAvroBytes(Schema recordSchema, Properties props) {
-    return new byte[0];
+  public ByteArrayOutputStream getAvroBytes(Schema recordSchema, Properties 
props) {
+    return new ByteArrayOutputStream(0);
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordCompatibilityInterface.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordCompatibilityInterface.java
index 1687fe20406..990940b30e3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordCompatibilityInterface.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordCompatibilityInterface.java
@@ -24,6 +24,7 @@ import org.apache.hudi.keygen.BaseKeyGenerator;
 
 import org.apache.avro.Schema;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Properties;
 
@@ -53,5 +54,5 @@ public interface HoodieRecordCompatibilityInterface {
 
   Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, 
Properties props) throws IOException;
 
-  byte[] getAvroBytes(Schema recordSchema, Properties props) throws 
IOException;
+  ByteArrayOutputStream getAvroBytes(Schema recordSchema, Properties props) 
throws IOException;
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index 48e13894101..0e0fcb5804e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -117,11 +117,11 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
         try {
           // Encode the record into bytes
           // Spark Record not support write avro log
-          byte[] data = s.getAvroBytes(schema, props);
+          ByteArrayOutputStream data = s.getAvroBytes(schema, props);
           // Write the record size
-          output.writeInt(data.length);
+          output.writeInt(data.size());
           // Write the content
-          output.write(data);
+          data.writeTo(output);
         } catch (IOException e) {
           throw new HoodieIOException("IOException converting 
HoodieAvroDataBlock to bytes", e);
         }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java
index bf4a03cdcfb..0d4d4d62414 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java
@@ -282,7 +282,7 @@ public class ArchivedTimelineV1 extends BaseTimelineV1 
implements HoodieArchived
         if (actionData != null) {
           this.readCommits.computeIfAbsent(instantTime, k -> new HashMap<>());
           if (action.equals(HoodieTimeline.COMPACTION_ACTION)) {
-            readCommits.get(instantTime).put(hoodieInstant.getState(), 
HoodieAvroUtils.indexedRecordToBytes((IndexedRecord) actionData));
+            readCommits.get(instantTime).put(hoodieInstant.getState(), 
HoodieAvroUtils.avroToBytes((IndexedRecord) actionData));
           } else {
             readCommits.get(instantTime).put(hoodieInstant.getState(), 
actionData.toString().getBytes(StandardCharsets.UTF_8));
           }
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
index d17b8eca6c1..34e23edc146 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;
@@ -218,7 +219,7 @@ public class HoodieHiveRecord extends 
HoodieRecord<ArrayWritable> {
   }
 
   @Override
-  public byte[] getAvroBytes(Schema recordSchema, Properties props) throws 
IOException {
+  public ByteArrayOutputStream getAvroBytes(Schema recordSchema, Properties 
props) throws IOException {
     throw new UnsupportedOperationException("Not supported for 
HoodieHiveRecord");
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index f8a3d3497f7..3001cde81a6 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -291,7 +291,7 @@ public class TestDataSourceUtils {
             .findFirst().get();
 
     IndexedRecord record = 
hoodieMetadataPayload.getData().getInsertValue(null).get();
-    byte[] recordToBytes = HoodieAvroUtils.indexedRecordToBytes(record);
+    byte[] recordToBytes = HoodieAvroUtils.avroToBytes(record);
     GenericRecord genericRecord = HoodieAvroUtils.bytesToAvro(recordToBytes, 
record.getSchema());
 
     HoodieMetadataPayload genericRecordHoodieMetadataPayload = new 
HoodieMetadataPayload(Option.of(genericRecord));

Reply via email to