This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 84990aebe80 Fix schema refresh for KafkaAvroSchemaDeserializer (#10118)
84990aebe80 is described below

commit 84990aebe80fcda8813283189956ce565227b32e
Author: Rajesh Mahindra <[email protected]>
AuthorDate: Mon Nov 20 11:17:45 2023 -0800

    Fix schema refresh for KafkaAvroSchemaDeserializer (#10118)
    
    Co-authored-by: rmahindra123 <[email protected]>
---
 .../hudi/utilities/sources/AvroKafkaSource.java    | 29 ++++++++++++++++------
 1 file changed, 22 insertions(+), 7 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index e9353bb2666..2bf92280faf 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
 import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
@@ -78,18 +79,25 @@ public class AvroKafkaSource extends KafkaSource<GenericRecord> {
 
     try {
       props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName).getName());
-      if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
-        if (schemaProvider == null) {
-          throw new HoodieReadFromSourceException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
-        }
-        props.put(KAFKA_VALUE_DESERIALIZER_SCHEMA.key(), schemaProvider.getSourceSchema().toString());
-      }
     } catch (ClassNotFoundException e) {
       String error = "Could not load custom avro kafka deserializer: " + deserializerClassName;
       LOG.error(error);
       throw new HoodieReadFromSourceException(error, e);
     }
-    this.offsetGen = new KafkaOffsetGen(props);
+
+    if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
+      configureSchemaDeserializer();
+    }
+    offsetGen = new KafkaOffsetGen(props);
+  }
+
+  @Override
+  protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
+    if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
+      configureSchemaDeserializer();
+      offsetGen = new KafkaOffsetGen(props);
+    }
+    return super.fetchNewData(lastCheckpointStr, sourceLimit);
   }
 
   @Override
@@ -121,4 +129,11 @@ public class AvroKafkaSource extends KafkaSource<GenericRecord> {
       return kafkaRDD.map(consumerRecord -> (GenericRecord) consumerRecord.value());
     }
   }
+
+  private void configureSchemaDeserializer() {
+    if (schemaProvider == null) {
+      throw new HoodieReadFromSourceException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
+    }
+    props.put(KAFKA_VALUE_DESERIALIZER_SCHEMA.key(), schemaProvider.getSourceSchema().toString());
+  }
 }

Reply via email to