This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 84990aebe80 Fix schema refresh for KafkaAvroSchemaDeserializer (#10118)
84990aebe80 is described below
commit 84990aebe80fcda8813283189956ce565227b32e
Author: Rajesh Mahindra <[email protected]>
AuthorDate: Mon Nov 20 11:17:45 2023 -0800
Fix schema refresh for KafkaAvroSchemaDeserializer (#10118)
Co-authored-by: rmahindra123 <[email protected]>
---
.../hudi/utilities/sources/AvroKafkaSource.java | 29 ++++++++++++++++------
1 file changed, 22 insertions(+), 7 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index e9353bb2666..2bf92280faf 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
@@ -78,18 +79,25 @@ public class AvroKafkaSource extends KafkaSource<GenericRecord> {
try {
      props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName).getName());
-      if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
- if (schemaProvider == null) {
-          throw new HoodieReadFromSourceException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
- }
-        props.put(KAFKA_VALUE_DESERIALIZER_SCHEMA.key(), schemaProvider.getSourceSchema().toString());
- }
} catch (ClassNotFoundException e) {
      String error = "Could not load custom avro kafka deserializer: " + deserializerClassName;
LOG.error(error);
throw new HoodieReadFromSourceException(error, e);
}
- this.offsetGen = new KafkaOffsetGen(props);
+
+    if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
+ configureSchemaDeserializer();
+ }
+ offsetGen = new KafkaOffsetGen(props);
+ }
+
+ @Override
+  protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
+    if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
+ configureSchemaDeserializer();
+ offsetGen = new KafkaOffsetGen(props);
+ }
+ return super.fetchNewData(lastCheckpointStr, sourceLimit);
}
@Override
@@ -121,4 +129,11 @@ public class AvroKafkaSource extends KafkaSource<GenericRecord> {
      return kafkaRDD.map(consumerRecord -> (GenericRecord) consumerRecord.value());
}
}
+
+ private void configureSchemaDeserializer() {
+ if (schemaProvider == null) {
+      throw new HoodieReadFromSourceException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
+ }
+    props.put(KAFKA_VALUE_DESERIALIZER_SCHEMA.key(), schemaProvider.getSourceSchema().toString());
+ }
}