This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2f99e84 [HUDI-3521] Fixing kakfa key and value serializer value type
from class to string (#4919)
2f99e84 is described below
commit 2f99e8458ac3cd51000227fce90576e8e1e056be
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sun Feb 27 11:13:13 2022 -0500
[HUDI-3521] Fixing kakfa key and value serializer value type from class to
string (#4919)
---
.../main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java | 4 ++--
.../main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java | 4 ++--
.../org/apache/hudi/utilities/sources/debezium/DebeziumSource.java | 4 ++--
3 files changed, 6 insertions(+), 6 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 2e4caa0..84c6fd8 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -65,12 +65,12 @@ public class AvroKafkaSource extends AvroSource {
SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
super(props, sparkContext, sparkSession, schemaProvider);
- props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
+ props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP,
StringDeserializer.class.getName());
deserializerClassName =
props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
try {
- props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP,
Class.forName(deserializerClassName));
+ props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP,
Class.forName(deserializerClassName).getName());
if
(deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
if (schemaProvider == null) {
throw new HoodieIOException("SchemaProvider has to be set to use
KafkaAvroSchemaDeserializer");
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index e8bd577..d6152a1 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -52,8 +52,8 @@ public class JsonKafkaSource extends JsonSource {
SchemaProvider schemaProvider,
HoodieDeltaStreamerMetrics metrics) {
super(properties, sparkContext, sparkSession, schemaProvider);
this.metrics = metrics;
- properties.put("key.deserializer", StringDeserializer.class);
- properties.put("value.deserializer", StringDeserializer.class);
+ properties.put("key.deserializer", StringDeserializer.class.getName());
+ properties.put("value.deserializer", StringDeserializer.class.getName());
offsetGen = new KafkaOffsetGen(properties);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
index 7018419..d9be692 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
@@ -82,12 +82,12 @@ public abstract class DebeziumSource extends RowSource {
HoodieDeltaStreamerMetrics metrics) {
super(props, sparkContext, sparkSession, schemaProvider);
- props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
+ props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP,
StringDeserializer.class.getName());
deserializerClassName =
props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
try {
- props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP,
Class.forName(deserializerClassName));
+ props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP,
Class.forName(deserializerClassName).getName());
} catch (ClassNotFoundException e) {
String error = "Could not load custom avro kafka deserializer: " +
deserializerClassName;
LOG.error(error);