This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 358ecc77a74 [HUDI-8126] Support proto messages for spark kryo serializer excluding DynamicMessages (#12052)
358ecc77a74 is described below
commit 358ecc77a74c2fb19f9cdebedc25afe4e8c9e4d9
Author: Vinish Reddy <[email protected]>
AuthorDate: Fri Feb 21 12:52:47 2025 +0530
[HUDI-8126] Support proto messages for spark kryo serializer excluding DynamicMessages (#12052)
Co-authored-by: Y Ethan Guo <[email protected]>
---
hudi-client/hudi-spark-client/pom.xml | 6 ++++++
.../scala/org/apache/spark/HoodieSparkKryoRegistrar.scala | 11 ++++++++---
.../org/apache/hudi/utilities/sources/ProtoKafkaSource.java | 7 +++++++
.../main/java/org/apache/hudi/utilities/sources/Source.java | 8 ++++++--
.../apache/hudi/utilities/sources/TestProtoKafkaSource.java | 13 +++++--------
packaging/hudi-integ-test-bundle/pom.xml | 1 +
packaging/hudi-spark-bundle/pom.xml | 1 +
packaging/hudi-utilities-bundle/pom.xml | 1 +
packaging/hudi-utilities-slim-bundle/pom.xml | 1 +
pom.xml | 8 ++++++++
10 files changed, 44 insertions(+), 13 deletions(-)
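For orientation before the diff: the registrar change below only takes effect when a Spark job runs with Kryo serialization and points Spark at Hudi's registrator. A minimal sketch of that wiring in Scala (the app name is illustrative; the two config keys are standard Spark settings):

    import org.apache.spark.sql.SparkSession

    // Spark instantiates HoodieSparkKryoRegistrar on each executor and calls
    // registerClasses(), which performs the registration shown in the diff below.
    val spark = SparkSession.builder()
      .appName("hudi-proto-example")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
      .getOrCreate()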
diff --git a/hudi-client/hudi-spark-client/pom.xml b/hudi-client/hudi-spark-client/pom.xml
index d1d67b22956..af292ccf2d9 100644
--- a/hudi-client/hudi-spark-client/pom.xml
+++ b/hudi-client/hudi-spark-client/pom.xml
@@ -93,6 +93,12 @@
<artifactId>parquet-avro</artifactId>
</dependency>
+ <!-- Used for adding kryo serializers for protobuf -->
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>chill-protobuf</artifactId>
+ </dependency>
+
<!-- Hoodie - Test -->
<dependency>
<groupId>org.apache.hudi</groupId>
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
index d7ad4838d5d..46c898b13db 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
@@ -22,12 +22,15 @@ import org.apache.hudi.client.model.HoodieInternalRow
import org.apache.hudi.common.model.{HoodieKey, HoodieSparkRecord}
import org.apache.hudi.common.util.HoodieCommonKryoRegistrar
import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.io.HoodieKeyLookupResult
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
+import org.apache.hudi.table.{HoodieSparkCopyOnWriteTable, HoodieSparkMergeOnReadTable}
+
+import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.serializers.JavaSerializer
-import com.esotericsoftware.kryo.{Kryo, Serializer}
-import org.apache.hudi.io.HoodieKeyLookupResult
-import org.apache.hudi.table.{HoodieSparkCopyOnWriteTable, HoodieSparkMergeOnReadTable}
+import com.google.protobuf.Message
+import com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.spark.serializer.KryoRegistrator
/**
@@ -70,6 +73,8 @@ class HoodieSparkKryoRegistrar extends HoodieCommonKryoRegistrar with KryoRegist
// We cannot remove this entry; otherwise the ordering is changed.
// So we replace it with [[HadoopStorageConfiguration]] for Spark.
kryo.register(classOf[HadoopStorageConfiguration], new JavaSerializer())
+ // NOTE: Protobuf objects are not serializable by default using kryo, so they need to be registered explicitly.
+ kryo.addDefaultSerializer(classOf[Message], new ProtobufSerializer())
}
/**
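A standalone sketch of what the new registration line does, assuming chill-protobuf is on the classpath: addDefaultSerializer makes Kryo fall back to ProtobufSerializer for any subclass of Message that has no explicit registration. Chill's serializer writes toByteArray() and reads back via the concrete class's static parseFrom(byte[]), a method that generated message classes have but DynamicMessage does not (its schema lives in a runtime Descriptor), which is why DynamicMessages are called out as excluded in the commit title.

    import com.esotericsoftware.kryo.Kryo
    import com.google.protobuf.Message
    import com.twitter.chill.protobuf.ProtobufSerializer

    val kryo = new Kryo()
    // Fallback for every Message subclass without its own registration;
    // generated proto classes round-trip via toByteArray/parseFrom.
    kryo.addDefaultSerializer(classOf[Message], new ProtobufSerializer())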
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
index a56c991bebd..a0df75eb665 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
@@ -101,6 +101,13 @@ public class ProtoKafkaSource extends KafkaSource<JavaRDD<Message>> {
}
}
+ @Override
+ protected boolean allowSourcePersist() {
+ // Persisting proto messages where the protobuf class is unknown is expensive because of the overhead.
+ // E.g., persisting DynamicMessage using kryo requires attaching descriptor info for each message.
+ return persistRdd && deserializerName.equals(ByteArrayDeserializer.class.getName());
+ }
+
private static class ProtoDeserializer implements Serializable {
private final String className;
private transient Class protoClass;
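To make the comment in allowSourcePersist concrete, here is a hedged sketch (not part of the commit) of round-tripping a DynamicMessage: the wire bytes alone cannot rebuild the message, so a generic serializer would have to attach descriptor info to every record.

    import com.google.protobuf.{Descriptors, DynamicMessage}

    // parseFrom for DynamicMessage needs the Descriptor back at read time;
    // generated proto classes embed this knowledge, DynamicMessage does not.
    def roundTrip(msg: DynamicMessage): DynamicMessage = {
      val bytes: Array[Byte] = msg.toByteArray
      val desc: Descriptors.Descriptor = msg.getDescriptorForType
      DynamicMessage.parseFrom(desc, bytes)
    }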
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index f85a8e9ae22..f9f5d6bfd66 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -71,7 +71,7 @@ public abstract class Source<T> implements SourceCommitCallback, Serializable {
private final SourceType sourceType;
private final StorageLevel storageLevel;
- private final boolean persistRdd;
+ protected final boolean persistRdd;
private Either<Dataset<Row>, JavaRDD<?>> cachedSourceRdd = null;
protected Source(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
@@ -196,7 +196,7 @@ public abstract class Source<T> implements SourceCommitCallback, Serializable {
private synchronized void persist(T data) {
boolean isSparkRdd = data.getClass().isAssignableFrom(Dataset.class) || data.getClass().isAssignableFrom(JavaRDD.class);
- if (persistRdd && isSparkRdd) {
+ if (allowSourcePersist() && isSparkRdd) {
if (data.getClass().isAssignableFrom(Dataset.class)) {
Dataset<Row> df = (Dataset<Row>) data;
cachedSourceRdd = Either.left(df);
@@ -209,6 +209,10 @@ public abstract class Source<T> implements SourceCommitCallback, Serializable {
}
}
+ protected boolean allowSourcePersist() {
+ return persistRdd;
+ }
+
@Override
public void releaseResources() {
if (cachedSourceRdd != null && cachedSourceRdd.isLeft()) {
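The two hunks above turn the persist decision into a protected template-method hook: the base class keeps honoring the persistRdd flag, while subclasses such as ProtoKafkaSource can veto caching. A simplified Scala sketch of the pattern (CachingSource and ProtoLikeSource are illustrative names, not Hudi classes):

    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    // Base-class default: honor the user-facing persist flag.
    abstract class CachingSource(protected val persistRdd: Boolean) {
      protected def allowSourcePersist: Boolean = persistRdd
    }

    // Mirrors ProtoKafkaSource's override: only persist when records are
    // concrete generated proto classes (the byte-array deserializer path).
    class ProtoLikeSource(persistRdd: Boolean, deserializerName: String)
        extends CachingSource(persistRdd) {
      override protected def allowSourcePersist: Boolean =
        persistRdd && deserializerName == classOf[ByteArrayDeserializer].getName
    }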
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
index 41d85854092..a5cabddf9a3 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
@@ -21,7 +21,6 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieErrorTableConfig;
-import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider;
@@ -58,7 +57,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -119,7 +117,6 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource {
props.put("hoodie.streamer.schemaprovider.registry.url", MOCK_REGISTRY_URL);
props.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
props.setProperty(HoodieErrorTableConfig.ERROR_TABLE_PERSIST_SOURCE_RDD.key(), String.valueOf(persistSourceRdd));
- props.setProperty(HoodieWriteConfig.TAGGED_RECORD_STORAGE_LEVEL_VALUE.key(), "MEMORY_ONLY");
// class name is not required so we'll remove it
props.remove(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key());
SchemaProvider schemaProvider = new SchemaRegistryProvider(props, jsc());
@@ -130,17 +127,17 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource {
JavaRDD<Message> messagesRead = protoKafkaSource.fetchNext(Option.empty(), 1000).getBatch().get();
assertEquals(messages.stream().map(this::protoToJson).collect(Collectors.toSet()), new HashSet<>(messagesRead.map(message -> PRINTER.print(message)).collect()));
- verifyRddsArePersisted(protoKafkaSource, messagesRead.rdd().toDebugString(), persistSourceRdd);
}
- @Test
- public void testProtoKafkaSourceWithFlattenWrappedPrimitives() {
-
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testProtoKafkaSourceWithFlattenWrappedPrimitives(boolean persistSourceRdd) {
// topic setup.
- final String topic = TEST_TOPIC_PREFIX + "testProtoKafkaSourceFlatten";
+ final String topic = TEST_TOPIC_PREFIX + "test_proto_kafka_source_flatten_persist_source_rdd_" + persistSourceRdd;
testUtils.createTopic(topic, 2);
TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
props.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
+ props.setProperty(HoodieErrorTableConfig.ERROR_TABLE_PERSIST_SOURCE_RDD.key(), Boolean.toString(persistSourceRdd));
SchemaProvider schemaProvider = new ProtoClassBasedSchemaProvider(props, jsc());
Source protoKafkaSource = new ProtoKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(protoKafkaSource);
diff --git a/packaging/hudi-integ-test-bundle/pom.xml b/packaging/hudi-integ-test-bundle/pom.xml
index 3485bb5952e..7d51561767b 100644
--- a/packaging/hudi-integ-test-bundle/pom.xml
+++ b/packaging/hudi-integ-test-bundle/pom.xml
@@ -118,6 +118,7 @@
<include>com.beust:jcommander</include>
<include>com.twitter:bijection-avro_${scala.binary.version}</include>
<include>com.twitter:bijection-core_${scala.binary.version}</include>
+ <include>com.twitter:chill-protobuf</include>
<include>org.apache.parquet:parquet-avro</include>
<include>com.twitter:parquet-avro</include>
<include>com.twitter.common:objectsize</include>
diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml
index d6bdb508e2d..708883c58ea 100644
--- a/packaging/hudi-spark-bundle/pom.xml
+++ b/packaging/hudi-spark-bundle/pom.xml
@@ -101,6 +101,7 @@
<include>org.apache.parquet:parquet-avro</include>
<include>com.twitter:bijection-avro_${scala.binary.version}</include>
<include>com.twitter:bijection-core_${scala.binary.version}</include>
+ <include>com.twitter:chill-protobuf</include>
<include>io.dropwizard.metrics:metrics-core</include>
<include>io.dropwizard.metrics:metrics-graphite</include>
diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml
index 6f071f131c1..9d56415f376 100644
--- a/packaging/hudi-utilities-bundle/pom.xml
+++ b/packaging/hudi-utilities-bundle/pom.xml
@@ -126,6 +126,7 @@
<include>com.google.protobuf:protobuf-java</include>
<include>com.twitter:bijection-avro_${scala.binary.version}</include>
<include>com.twitter:bijection-core_${scala.binary.version}</include>
+ <include>com.twitter:chill-protobuf</include>
<include>io.confluent:kafka-avro-serializer</include>
<include>io.confluent:kafka-schema-serializer</include>
<include>io.confluent:common-config</include>
diff --git a/packaging/hudi-utilities-slim-bundle/pom.xml b/packaging/hudi-utilities-slim-bundle/pom.xml
index bc094ec47d3..eadae968c20 100644
--- a/packaging/hudi-utilities-slim-bundle/pom.xml
+++ b/packaging/hudi-utilities-slim-bundle/pom.xml
@@ -114,6 +114,7 @@
<include>com.google.protobuf:protobuf-java</include>
<include>com.twitter:bijection-avro_${scala.binary.version}</include>
<include>com.twitter:bijection-core_${scala.binary.version}</include>
+ <include>com.twitter:chill-protobuf</include>
<include>io.confluent:kafka-avro-serializer</include>
<include>io.confluent:kafka-schema-serializer</include>
<include>io.confluent:common-config</include>
diff --git a/pom.xml b/pom.xml
index 33bb14cd7e0..f1e87dfa888 100644
--- a/pom.xml
+++ b/pom.xml
@@ -208,6 +208,7 @@
<presto.bundle.bootstrap.shade.prefix>org.apache.hudi.</presto.bundle.bootstrap.shade.prefix>
<trino.bundle.bootstrap.scope>compile</trino.bundle.bootstrap.scope>
<trino.bundle.bootstrap.shade.prefix>org.apache.hudi.</trino.bundle.bootstrap.shade.prefix>
+ <twitter.chill.version>0.10.0</twitter.chill.version>
<shadeSources>true</shadeSources>
<zk-curator.version>2.7.1</zk-curator.version>
<disruptor.version>3.4.2</disruptor.version>
@@ -1754,6 +1755,13 @@
<scope>test</scope>
</dependency>
+ <!-- Used for adding kryo serializers for protobuf -->
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>chill-protobuf</artifactId>
+ <version>${twitter.chill.version}</version>
+ </dependency>
+
<!-- Other Utils -->
<dependency>
<groupId>org.apache.flink</groupId>