This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f99e84  [HUDI-3521] Fixing Kafka key and value serializer value type 
from class to string (#4919)
2f99e84 is described below

commit 2f99e8458ac3cd51000227fce90576e8e1e056be
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sun Feb 27 11:13:13 2022 -0500

    [HUDI-3521] Fixing Kafka key and value serializer value type from class to 
string (#4919)
---
 .../main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java  | 4 ++--
 .../main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java  | 4 ++--
 .../org/apache/hudi/utilities/sources/debezium/DebeziumSource.java    | 4 ++--
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 2e4caa0..84c6fd8 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -65,12 +65,12 @@ public class AvroKafkaSource extends AvroSource {
       SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
     super(props, sparkContext, sparkSession, schemaProvider);
 
-    props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
+    props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, 
StringDeserializer.class.getName());
     deserializerClassName = 
props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
             
DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
 
     try {
-      props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, 
Class.forName(deserializerClassName));
+      props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, 
Class.forName(deserializerClassName).getName());
       if 
(deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
         if (schemaProvider == null) {
           throw new HoodieIOException("SchemaProvider has to be set to use 
KafkaAvroSchemaDeserializer");
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index e8bd577..d6152a1 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -52,8 +52,8 @@ public class JsonKafkaSource extends JsonSource {
                          SchemaProvider schemaProvider, 
HoodieDeltaStreamerMetrics metrics) {
     super(properties, sparkContext, sparkSession, schemaProvider);
     this.metrics = metrics;
-    properties.put("key.deserializer", StringDeserializer.class);
-    properties.put("value.deserializer", StringDeserializer.class);
+    properties.put("key.deserializer", StringDeserializer.class.getName());
+    properties.put("value.deserializer", StringDeserializer.class.getName());
     offsetGen = new KafkaOffsetGen(properties);
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
index 7018419..d9be692 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
@@ -82,12 +82,12 @@ public abstract class DebeziumSource extends RowSource {
                         HoodieDeltaStreamerMetrics metrics) {
     super(props, sparkContext, sparkSession, schemaProvider);
 
-    props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
+    props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, 
StringDeserializer.class.getName());
     deserializerClassName = 
props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
         
DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
 
     try {
-      props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, 
Class.forName(deserializerClassName));
+      props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, 
Class.forName(deserializerClassName).getName());
     } catch (ClassNotFoundException e) {
       String error = "Could not load custom avro kafka deserializer: " + 
deserializerClassName;
       LOG.error(error);

Reply via email to