This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 358ecc77a74 [HUDI-8126] Support proto messages for spark kryo serializer excluding DynamicMessages (#12052)
358ecc77a74 is described below

commit 358ecc77a74c2fb19f9cdebedc25afe4e8c9e4d9
Author: Vinish Reddy <[email protected]>
AuthorDate: Fri Feb 21 12:52:47 2025 +0530

    [HUDI-8126] Support proto messages for spark kryo serializer excluding DynamicMessages (#12052)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 hudi-client/hudi-spark-client/pom.xml                       |  6 ++++++
 .../scala/org/apache/spark/HoodieSparkKryoRegistrar.scala   | 11 ++++++++---
 .../org/apache/hudi/utilities/sources/ProtoKafkaSource.java |  7 +++++++
 .../main/java/org/apache/hudi/utilities/sources/Source.java |  8 ++++++--
 .../apache/hudi/utilities/sources/TestProtoKafkaSource.java | 13 +++++--------
 packaging/hudi-integ-test-bundle/pom.xml                    |  1 +
 packaging/hudi-spark-bundle/pom.xml                         |  1 +
 packaging/hudi-utilities-bundle/pom.xml                     |  1 +
 packaging/hudi-utilities-slim-bundle/pom.xml                |  1 +
 pom.xml                                                     |  8 ++++++++
 10 files changed, 44 insertions(+), 13 deletions(-)
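
For context on how the registrar change below is exercised: the new default serializer only takes effect when Spark is configured to use Kryo with Hudi's registrator. A minimal sketch of that configuration, using standard Spark settings (class name hypothetical, shown here only for illustration):

    import org.apache.spark.SparkConf;

    public class KryoConfSketch {
      public static void main(String[] args) {
        // With HoodieSparkKryoRegistrar active, generated protobuf Message
        // subclasses are handled by chill-protobuf's ProtobufSerializer.
        SparkConf conf = new SparkConf()
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
      }
    }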

diff --git a/hudi-client/hudi-spark-client/pom.xml b/hudi-client/hudi-spark-client/pom.xml
index d1d67b22956..af292ccf2d9 100644
--- a/hudi-client/hudi-spark-client/pom.xml
+++ b/hudi-client/hudi-spark-client/pom.xml
@@ -93,6 +93,12 @@
       <artifactId>parquet-avro</artifactId>
     </dependency>
 
+    <!-- Used for adding kryo serializers for protobuf -->
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>chill-protobuf</artifactId>
+    </dependency>
+
     <!-- Hoodie - Test -->
     <dependency>
       <groupId>org.apache.hudi</groupId>
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
index d7ad4838d5d..46c898b13db 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
@@ -22,12 +22,15 @@ import org.apache.hudi.client.model.HoodieInternalRow
 import org.apache.hudi.common.model.{HoodieKey, HoodieSparkRecord}
 import org.apache.hudi.common.util.HoodieCommonKryoRegistrar
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.io.HoodieKeyLookupResult
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
+import org.apache.hudi.table.{HoodieSparkCopyOnWriteTable, HoodieSparkMergeOnReadTable}
+
+import com.esotericsoftware.kryo.{Kryo, Serializer}
 import com.esotericsoftware.kryo.io.{Input, Output}
 import com.esotericsoftware.kryo.serializers.JavaSerializer
-import com.esotericsoftware.kryo.{Kryo, Serializer}
-import org.apache.hudi.io.HoodieKeyLookupResult
-import org.apache.hudi.table.{HoodieSparkCopyOnWriteTable, HoodieSparkMergeOnReadTable}
+import com.google.protobuf.Message
+import com.twitter.chill.protobuf.ProtobufSerializer
 import org.apache.spark.serializer.KryoRegistrator
 
 /**
@@ -70,6 +73,8 @@ class HoodieSparkKryoRegistrar extends HoodieCommonKryoRegistrar with KryoRegist
     //       We cannot remove this entry; otherwise the ordering is changed.
     //       So we replace it with [[HadoopStorageConfiguration]] for Spark.
     kryo.register(classOf[HadoopStorageConfiguration], new JavaSerializer())
+    // NOTE: Protobuf objects are not serializable by default using kryo, need to register them explicitly.
+    kryo.addDefaultSerializer(classOf[Message], new ProtobufSerializer())
   }
 
   /**
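
The registration above relies on Kryo's default-serializer fallback: any class assignable to com.google.protobuf.Message with no more specific registration is routed through chill-protobuf's ProtobufSerializer, which (as I understand it) round-trips messages via their serialized byte form. A self-contained sketch of the same call outside Hudi (class name hypothetical):

    import com.esotericsoftware.kryo.Kryo;
    import com.google.protobuf.Message;
    import com.twitter.chill.protobuf.ProtobufSerializer;

    public class ProtoKryoSketch {
      public static void main(String[] args) {
        Kryo kryo = new Kryo();
        // Fallback serializer for every Message subclass that has no
        // registration of its own.
        kryo.addDefaultSerializer(Message.class, new ProtobufSerializer());
      }
    }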
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
index a56c991bebd..a0df75eb665 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
@@ -101,6 +101,13 @@ public class ProtoKafkaSource extends KafkaSource<JavaRDD<Message>> {
     }
   }
 
+  @Override
+  protected boolean allowSourcePersist() {
+    // Persisting proto messages where protobuf class is unknown, is expensive because of the overhead.
+    // Eg: Persisting DynamicMessage using kryo requires attaching descriptor info for each message.
+    return persistRdd && deserializerName.equals(ByteArrayDeserializer.class.getName());
+  }
+
   private static class ProtoDeserializer implements Serializable {
     private final String className;
     private transient Class protoClass;
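
A rough illustration of the overhead the override above avoids, using the protobuf well-known type Timestamp in place of a real payload class: a concrete generated message can be restored from its bytes alone, while a DynamicMessage cannot be parsed without its Descriptor, which a persisted form would have to carry per message. A runnable sketch (class name hypothetical):

    import com.google.protobuf.DynamicMessage;
    import com.google.protobuf.InvalidProtocolBufferException;
    import com.google.protobuf.Timestamp;

    public class DynamicMessageCostSketch {
      public static void main(String[] args) throws InvalidProtocolBufferException {
        byte[] bytes = Timestamp.newBuilder().setSeconds(42).build().toByteArray();
        // Concrete generated class: the bytes alone are enough to restore it.
        Timestamp restored = Timestamp.parseFrom(bytes);
        // DynamicMessage: parsing additionally requires the Descriptor, so a
        // kryo-persisted DynamicMessage must attach descriptor info per message.
        DynamicMessage dynamic = DynamicMessage.parseFrom(Timestamp.getDescriptor(), bytes);
      }
    }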
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index f85a8e9ae22..f9f5d6bfd66 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -71,7 +71,7 @@ public abstract class Source<T> implements SourceCommitCallback, Serializable {
 
   private final SourceType sourceType;
   private final StorageLevel storageLevel;
-  private final boolean persistRdd;
+  protected final boolean persistRdd;
   private Either<Dataset<Row>, JavaRDD<?>> cachedSourceRdd = null;
 
   protected Source(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
@@ -196,7 +196,7 @@ public abstract class Source<T> implements SourceCommitCallback, Serializable {
 
   private synchronized void persist(T data) {
     boolean isSparkRdd = data.getClass().isAssignableFrom(Dataset.class) || data.getClass().isAssignableFrom(JavaRDD.class);
-    if (persistRdd && isSparkRdd) {
+    if (allowSourcePersist() && isSparkRdd) {
       if (data.getClass().isAssignableFrom(Dataset.class)) {
         Dataset<Row> df = (Dataset<Row>) data;
         cachedSourceRdd = Either.left(df);
@@ -209,6 +209,10 @@ public abstract class Source<T> implements SourceCommitCallback, Serializable {
     }
   }
 
+  protected boolean allowSourcePersist() {
+    return persistRdd;
+  }
+
   @Override
   public void releaseResources() {
     if (cachedSourceRdd != null && cachedSourceRdd.isLeft()) {
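
The base-class change above is a small template-method hook: persist() now consults an overridable predicate instead of reading persistRdd directly, so a subclass can veto caching without duplicating the persist logic. A condensed, hypothetical rendering of that shape (not the actual Hudi class):

    // Hypothetical condensation of the hook pattern introduced above.
    abstract class CachingSource<T> {
      protected final boolean persistRdd;

      protected CachingSource(boolean persistRdd) {
        this.persistRdd = persistRdd;
      }

      // Default: honor the configured flag; subclasses may narrow this.
      protected boolean allowSourcePersist() {
        return persistRdd;
      }

      final void maybePersist(T data) {
        if (allowSourcePersist()) {
          // cache 'data' at the configured storage level
        }
      }
    }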
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
index 41d85854092..a5cabddf9a3 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
@@ -21,7 +21,6 @@ package org.apache.hudi.utilities.sources;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieErrorTableConfig;
-import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.utilities.config.KafkaSourceConfig;
 import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
 import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider;
@@ -58,7 +57,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -119,7 +117,6 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource {
     props.put("hoodie.streamer.schemaprovider.registry.url", MOCK_REGISTRY_URL);
     props.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
     props.setProperty(HoodieErrorTableConfig.ERROR_TABLE_PERSIST_SOURCE_RDD.key(), String.valueOf(persistSourceRdd));
-    props.setProperty(HoodieWriteConfig.TAGGED_RECORD_STORAGE_LEVEL_VALUE.key(), "MEMORY_ONLY");
     // class name is not required so we'll remove it
     props.remove(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key());
     SchemaProvider schemaProvider = new SchemaRegistryProvider(props, jsc());
@@ -130,17 +127,17 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource {
     JavaRDD<Message> messagesRead = protoKafkaSource.fetchNext(Option.empty(), 1000).getBatch().get();
     assertEquals(messages.stream().map(this::protoToJson).collect(Collectors.toSet()),
         new HashSet<>(messagesRead.map(message -> PRINTER.print(message)).collect()));
-    verifyRddsArePersisted(protoKafkaSource, messagesRead.rdd().toDebugString(), persistSourceRdd);
   }
 
-  @Test
-  public void testProtoKafkaSourceWithFlattenWrappedPrimitives() {
-
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testProtoKafkaSourceWithFlattenWrappedPrimitives(boolean persistSourceRdd) {
     // topic setup.
-    final String topic = TEST_TOPIC_PREFIX + "testProtoKafkaSourceFlatten";
+    final String topic = TEST_TOPIC_PREFIX + "test_proto_kafka_source_flatten_persist_source_rdd_" + persistSourceRdd;
     testUtils.createTopic(topic, 2);
     TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
     props.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
+    props.setProperty(HoodieErrorTableConfig.ERROR_TABLE_PERSIST_SOURCE_RDD.key(), Boolean.toString(persistSourceRdd));
     SchemaProvider schemaProvider = new ProtoClassBasedSchemaProvider(props, jsc());
     Source protoKafkaSource = new ProtoKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(protoKafkaSource);
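
The test change above swaps a single-case @Test for a JUnit 5 parameterized run over both persistence modes. The same pattern in isolation (test class and assertion hypothetical):

    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.ValueSource;
    import static org.junit.jupiter.api.Assertions.assertNotNull;

    class PersistFlagSketchTest {
      @ParameterizedTest
      @ValueSource(booleans = {true, false})
      void runsOncePerValue(boolean persistSourceRdd) {
        // JUnit invokes this method twice, once per boolean value.
        assertNotNull(Boolean.valueOf(persistSourceRdd));
      }
    }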
diff --git a/packaging/hudi-integ-test-bundle/pom.xml b/packaging/hudi-integ-test-bundle/pom.xml
index 3485bb5952e..7d51561767b 100644
--- a/packaging/hudi-integ-test-bundle/pom.xml
+++ b/packaging/hudi-integ-test-bundle/pom.xml
@@ -118,6 +118,7 @@
                   <include>com.beust:jcommander</include>
                   <include>com.twitter:bijection-avro_${scala.binary.version}</include>
                   <include>com.twitter:bijection-core_${scala.binary.version}</include>
+                  <include>com.twitter:chill-protobuf</include>
                   <include>org.apache.parquet:parquet-avro</include>
                   <include>com.twitter:parquet-avro</include>
                   <include>com.twitter.common:objectsize</include>
diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml
index d6bdb508e2d..708883c58ea 100644
--- a/packaging/hudi-spark-bundle/pom.xml
+++ b/packaging/hudi-spark-bundle/pom.xml
@@ -101,6 +101,7 @@
                   <include>org.apache.parquet:parquet-avro</include>
                   <include>com.twitter:bijection-avro_${scala.binary.version}</include>
                   <include>com.twitter:bijection-core_${scala.binary.version}</include>
+                  <include>com.twitter:chill-protobuf</include>
 
                   <include>io.dropwizard.metrics:metrics-core</include>
                   <include>io.dropwizard.metrics:metrics-graphite</include>
diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml
index 6f071f131c1..9d56415f376 100644
--- a/packaging/hudi-utilities-bundle/pom.xml
+++ b/packaging/hudi-utilities-bundle/pom.xml
@@ -126,6 +126,7 @@
                   <include>com.google.protobuf:protobuf-java</include>
                   <include>com.twitter:bijection-avro_${scala.binary.version}</include>
                   <include>com.twitter:bijection-core_${scala.binary.version}</include>
+                  <include>com.twitter:chill-protobuf</include>
                   <include>io.confluent:kafka-avro-serializer</include>
                   <include>io.confluent:kafka-schema-serializer</include>
                   <include>io.confluent:common-config</include>
diff --git a/packaging/hudi-utilities-slim-bundle/pom.xml b/packaging/hudi-utilities-slim-bundle/pom.xml
index bc094ec47d3..eadae968c20 100644
--- a/packaging/hudi-utilities-slim-bundle/pom.xml
+++ b/packaging/hudi-utilities-slim-bundle/pom.xml
@@ -114,6 +114,7 @@
                   <include>com.google.protobuf:protobuf-java</include>
                   <include>com.twitter:bijection-avro_${scala.binary.version}</include>
                   <include>com.twitter:bijection-core_${scala.binary.version}</include>
+                  <include>com.twitter:chill-protobuf</include>
                   <include>io.confluent:kafka-avro-serializer</include>
                   <include>io.confluent:kafka-schema-serializer</include>
                   <include>io.confluent:common-config</include>
diff --git a/pom.xml b/pom.xml
index 33bb14cd7e0..f1e87dfa888 100644
--- a/pom.xml
+++ b/pom.xml
@@ -208,6 +208,7 @@
     <presto.bundle.bootstrap.shade.prefix>org.apache.hudi.</presto.bundle.bootstrap.shade.prefix>
     <trino.bundle.bootstrap.scope>compile</trino.bundle.bootstrap.scope>
     <trino.bundle.bootstrap.shade.prefix>org.apache.hudi.</trino.bundle.bootstrap.shade.prefix>
+    <twitter.chill.version>0.10.0</twitter.chill.version>
     <shadeSources>true</shadeSources>
     <zk-curator.version>2.7.1</zk-curator.version>
     <disruptor.version>3.4.2</disruptor.version>
@@ -1754,6 +1755,13 @@
         <scope>test</scope>
       </dependency>
 
+      <!-- Used for adding kryo serializers for protobuf -->
+      <dependency>
+        <groupId>com.twitter</groupId>
+        <artifactId>chill-protobuf</artifactId>
+        <version>${twitter.chill.version}</version>
+      </dependency>
+
       <!-- Other Utils -->
       <dependency>
         <groupId>org.apache.flink</groupId>
