This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4e421ed86d8 [HUDI-9322] HoodieRecord#getAvroBytes returns ByteArrayOutputStream to reduce bytes copy (#13186)
4e421ed86d8 is described below
commit 4e421ed86d86d414d66284681171bf80c236d97e
Author: Shuo Cheng <[email protected]>
AuthorDate: Mon Apr 21 17:09:34 2025 +0800
[HUDI-9322] HoodieRecord#getAvroBytes returns ByteArrayOutputStream to reduce bytes copy (#13186)
---
.../hudi/client/utils/LegacyArchivedMetaEntryReader.java | 2 +-
.../org/apache/hudi/client/model/HoodieFlinkRecord.java | 13 ++++++-------
.../org/apache/hudi/common/model/HoodieSparkRecord.java | 3 ++-
.../src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java | 12 ++++++------
.../apache/hudi/common/model/HoodieAvroIndexedRecord.java | 5 +++--
.../java/org/apache/hudi/common/model/HoodieAvroRecord.java | 10 +++++++---
.../org/apache/hudi/common/model/HoodieEmptyRecord.java | 5 +++--
.../common/model/HoodieRecordCompatibilityInterface.java | 3 ++-
.../hudi/common/table/log/block/HoodieAvroDataBlock.java | 6 +++---
.../table/timeline/versioning/v1/ArchivedTimelineV1.java | 2 +-
.../main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java | 3 ++-
.../src/test/java/org/apache/hudi/TestDataSourceUtils.java | 2 +-
12 files changed, 37 insertions(+), 29 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
index d0fbce93233..c75561b61d0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
@@ -100,7 +100,7 @@ public class LegacyArchivedMetaEntryReader {
Object actionData = record.get(key);
if (actionData != null) {
if (actionData instanceof IndexedRecord) {
- return HoodieAvroUtils.indexedRecordToBytes((IndexedRecord) actionData);
+ return HoodieAvroUtils.avroToBytes((IndexedRecord) actionData);
} else {
// should be json bytes.
try {
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
index 7f56f0b0cb2..7bd0ebf1296 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
@@ -42,6 +42,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.utils.JoinedRowData;
+import java.io.ByteArrayOutputStream;
import java.util.Map;
import java.util.Properties;
@@ -88,9 +89,7 @@ public class HoodieFlinkRecord extends HoodieRecord<RowData> {
if (isNullOrEmpty(orderingField)) {
this.orderingValue = DEFAULT_ORDERING_VALUE;
} else {
- boolean utcTimezone = Boolean.parseBoolean(props.getProperty("read.utc-timezone", "true"));
- RowDataAvroQueryContexts.FieldQueryContext context = RowDataAvroQueryContexts.fromAvroSchema(recordSchema, utcTimezone).getFieldQueryContext(orderingField);
- this.orderingValue = (Comparable<?>) context.getValAsJava(this.data);
+ this.orderingValue = (Comparable<?>) getColumnValueAsJava(recordSchema, orderingField, props);
}
}
return this.orderingValue;
@@ -127,11 +126,11 @@ public class HoodieFlinkRecord extends HoodieRecord<RowData> {
}
@Override
- public Object getColumnValueAsJava(Schema recordSchema, String columns, Properties props) {
+ public Object getColumnValueAsJava(Schema recordSchema, String column, Properties props) {
boolean utcTimezone = Boolean.parseBoolean(props.getProperty(
HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(),
HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString()));
RowDataQueryContext rowDataQueryContext =
RowDataAvroQueryContexts.fromAvroSchema(recordSchema, utcTimezone);
- return rowDataQueryContext.getFieldQueryContext(columns).getValAsJava(data);
+ return rowDataQueryContext.getFieldQueryContext(column).getValAsJava(data);
}
@Override
@@ -213,11 +212,11 @@ public class HoodieFlinkRecord extends HoodieRecord<RowData> {
}
@Override
- public byte[] getAvroBytes(Schema recordSchema, Properties props) {
+ public ByteArrayOutputStream getAvroBytes(Schema recordSchema, Properties props) {
boolean utcTimezone = Boolean.parseBoolean(props.getProperty(
HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(),
HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString()));
RowDataQueryContext rowDataQueryContext =
RowDataAvroQueryContexts.fromAvroSchema(recordSchema, utcTimezone);
IndexedRecord indexedRecord = (IndexedRecord)
rowDataQueryContext.getRowDataToAvroConverter().convert(recordSchema,
getData());
- return HoodieAvroUtils.avroToBytes(indexedRecord);
+ return HoodieAvroUtils.avroToBytesStream(indexedRecord);
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index 76cbeec3e8c..e6b336f9870 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -47,6 +47,7 @@ import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
@@ -309,7 +310,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
}
@Override
- public byte[] getAvroBytes(Schema recordSchema, Properties props) throws IOException {
+ public ByteArrayOutputStream getAvroBytes(Schema recordSchema, Properties props) throws IOException {
throw new UnsupportedOperationException();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 41a3f779d7b..ca91a51b892 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -171,31 +171,31 @@ public class HoodieAvroUtils {
* TODO serialize other type of record.
*/
public static Option<byte[]> recordToBytes(HoodieRecord record, Schema schema) throws IOException {
- return Option.of(HoodieAvroUtils.indexedRecordToBytes(record.toIndexedRecord(schema, new Properties()).get().getData()));
+ return Option.of(HoodieAvroUtils.indexedRecordToBytesStream(record.toIndexedRecord(schema, new Properties()).get().getData()).toByteArray());
}
/**
* Convert a given avro record to bytes.
*/
public static byte[] avroToBytes(IndexedRecord record) {
- return indexedRecordToBytes(record);
+ return indexedRecordToBytesStream(record).toByteArray();
}
/**
* Convert a given avro record to bytes.
*/
- public static byte[] avroToBytes(GenericRecord record) {
- return indexedRecordToBytes(record);
+ public static ByteArrayOutputStream avroToBytesStream(IndexedRecord record) {
+ return indexedRecordToBytesStream(record);
}
- public static <T extends IndexedRecord> byte[] indexedRecordToBytes(T record) {
+ public static <T extends IndexedRecord> ByteArrayOutputStream indexedRecordToBytesStream(T record) {
GenericDatumWriter<T> writer = new GenericDatumWriter<>(record.getSchema(), ConvertingGenericData.INSTANCE);
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out,
BINARY_ENCODER.get());
BINARY_ENCODER.set(encoder);
writer.write(record, encoder);
encoder.flush();
- return out.toByteArray();
+ return out;
} catch (IOException e) {
throw new HoodieIOException("Cannot convert GenericRecord to bytes", e);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index c4d8640e686..f4c53543103 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -34,6 +34,7 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
@@ -220,8 +221,8 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
}
@Override
- public byte[] getAvroBytes(Schema recordSchema, Properties props) {
- return HoodieAvroUtils.avroToBytes(data);
+ public ByteArrayOutputStream getAvroBytes(Schema recordSchema, Properties props) {
+ return HoodieAvroUtils.avroToBytesStream(data);
}
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index 71c263240a9..e4782ff5b2c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -34,6 +34,7 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
@@ -227,12 +228,15 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
}
@Override
- public byte[] getAvroBytes(Schema recordSchema, Properties props) throws IOException {
+ public ByteArrayOutputStream getAvroBytes(Schema recordSchema, Properties props) throws IOException {
if (data instanceof BaseAvroPayload) {
- return ((BaseAvroPayload) getData()).getRecordBytes();
+ byte[] data = ((BaseAvroPayload) getData()).getRecordBytes();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(data.length);
+ baos.write(data);
+ return baos;
} else {
Option<IndexedRecord> avroData = getData().getInsertValue(recordSchema, props);
- return avroData.map(HoodieAvroUtils::avroToBytes).orElse(new byte[0]);
+ return avroData.map(HoodieAvroUtils::avroToBytesStream).orElse(new ByteArrayOutputStream(0));
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
index a4e1e44b758..f482b21e56e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
@@ -27,6 +27,7 @@ import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.Schema;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
@@ -155,8 +156,8 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
}
@Override
- public byte[] getAvroBytes(Schema recordSchema, Properties props) {
- return new byte[0];
+ public ByteArrayOutputStream getAvroBytes(Schema recordSchema, Properties props) {
+ return new ByteArrayOutputStream(0);
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordCompatibilityInterface.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordCompatibilityInterface.java
index 1687fe20406..990940b30e3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordCompatibilityInterface.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordCompatibilityInterface.java
@@ -24,6 +24,7 @@ import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.avro.Schema;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;
@@ -53,5 +54,5 @@ public interface HoodieRecordCompatibilityInterface {
Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema,
Properties props) throws IOException;
- byte[] getAvroBytes(Schema recordSchema, Properties props) throws IOException;
+ ByteArrayOutputStream getAvroBytes(Schema recordSchema, Properties props) throws IOException;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index 48e13894101..0e0fcb5804e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -117,11 +117,11 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
try {
// Encode the record into bytes
// Spark Record not support write avro log
- byte[] data = s.getAvroBytes(schema, props);
+ ByteArrayOutputStream data = s.getAvroBytes(schema, props);
// Write the record size
- output.writeInt(data.length);
+ output.writeInt(data.size());
// Write the content
- output.write(data);
+ data.writeTo(output);
} catch (IOException e) {
throw new HoodieIOException("IOException converting HoodieAvroDataBlock to bytes", e);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java
index bf4a03cdcfb..0d4d4d62414 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java
@@ -282,7 +282,7 @@ public class ArchivedTimelineV1 extends BaseTimelineV1 implements HoodieArchived
if (actionData != null) {
this.readCommits.computeIfAbsent(instantTime, k -> new HashMap<>());
if (action.equals(HoodieTimeline.COMPACTION_ACTION)) {
- readCommits.get(instantTime).put(hoodieInstant.getState(), HoodieAvroUtils.indexedRecordToBytes((IndexedRecord) actionData));
+ readCommits.get(instantTime).put(hoodieInstant.getState(), HoodieAvroUtils.avroToBytes((IndexedRecord) actionData));
} else {
readCommits.get(instantTime).put(hoodieInstant.getState(), actionData.toString().getBytes(StandardCharsets.UTF_8));
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
index d17b8eca6c1..34e23edc146 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
@@ -218,7 +219,7 @@ public class HoodieHiveRecord extends HoodieRecord<ArrayWritable> {
}
@Override
- public byte[] getAvroBytes(Schema recordSchema, Properties props) throws IOException {
+ public ByteArrayOutputStream getAvroBytes(Schema recordSchema, Properties props) throws IOException {
throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index f8a3d3497f7..3001cde81a6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -291,7 +291,7 @@ public class TestDataSourceUtils {
.findFirst().get();
IndexedRecord record =
hoodieMetadataPayload.getData().getInsertValue(null).get();
- byte[] recordToBytes = HoodieAvroUtils.indexedRecordToBytes(record);
+ byte[] recordToBytes = HoodieAvroUtils.avroToBytes(record);
GenericRecord genericRecord = HoodieAvroUtils.bytesToAvro(recordToBytes,
record.getSchema());
HoodieMetadataPayload genericRecordHoodieMetadataPayload = new HoodieMetadataPayload(Option.of(genericRecord));