nsivabalan commented on a change in pull request #3172:
URL: https://github.com/apache/hudi/pull/3172#discussion_r660903610



##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -381,7 +381,6 @@ object DataSourceWriteOptions {
   val DEFAULT_ASYNC_COMPACT_ENABLE_OPT_VAL = "true"
 
   // Avro Kafka Source configs
-  val KAFKA_AVRO_VALUE_DESERIALIZER = "hoodie.deltastreamer.source.kafka.value.deserializer.class"
-  // Schema to be used in custom kakfa deserializer
-  val KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA = "hoodie.deltastreamer.source.kafka.value.deserializer.schema"
+  val KAFKA_AVRO_VALUE_DESERIALIZER_OPT_KEY = "hoodie.deltastreamer.source.kafka.value.deserializer.class"
+  val DEFAULT_KAFKA_AVRO_VALUE_DESERIALIZER_OPT_VAL = "org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer"

Review comment:
       I would prefer to make the new deserializer the default in the next release. By then, I assume you would at least have tested it and be running it in prod, so we know it works smoothly end to end.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
##########
@@ -60,7 +66,7 @@ public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Spa
     super(props, sparkContext, sparkSession, schemaProvider);
 
     props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
-    String deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER(), "");
+    String deserializerClassName = props.getString(KAFKA_AVRO_VALUE_DESERIALIZER_OPT_KEY(), DEFAULT_KAFKA_AVRO_VALUE_DESERIALIZER_OPT_VAL());

Review comment:
       We need to be careful at line 71: since we have updated the default value, we may never enter the if condition at line 71. Can you please fix that?
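   To illustrate the concern, here is a minimal sketch, assuming the `if` at line 71 tests whether the resolved class name is empty (the condition itself is not shown in this hunk). It uses plain `java.util.Properties` in place of Hudi's `TypedProperties`; the class and method names are hypothetical, and the `containsKey` check is only one possible fix, not the change made in this PR.

```java
import java.util.Properties;

class DeserializerBranchSketch {
  // Config key and default value taken from the diff above.
  static final String VALUE_DESERIALIZER_KEY =
      "hoodie.deltastreamer.source.kafka.value.deserializer.class";
  static final String DEFAULT_VALUE_DESERIALIZER =
      "org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer";

  // Pattern the review warns about: the lookup already substitutes a
  // non-empty default, so the "nothing configured" branch is dead code
  // unless a user explicitly sets the key to an empty string.
  static String branchBuggy(Properties props) {
    String name = props.getProperty(VALUE_DESERIALIZER_KEY, DEFAULT_VALUE_DESERIALIZER);
    if (name.isEmpty()) {
      return "not-configured";
    }
    return "configured:" + name;
  }

  // One possible fix (an assumption, not the PR's actual change): decide the
  // branch by whether the key was set at all, before any default is applied.
  static String branchFixed(Properties props) {
    if (!props.containsKey(VALUE_DESERIALIZER_KEY)) {
      return "not-configured";
    }
    return "configured:" + props.getProperty(VALUE_DESERIALIZER_KEY);
  }
}
```

   With an empty `Properties`, `branchBuggy` returns `configured:org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer` while `branchFixed` returns `not-configured`, so only the second keeps the unconfigured path reachable.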

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
##########
@@ -52,6 +54,10 @@
   // these are native kafka's config. do not change the config names.
   private static final String NATIVE_KAFKA_KEY_DESERIALIZER_PROP = "key.deserializer";
   private static final String NATIVE_KAFKA_VALUE_DESERIALIZER_PROP = "value.deserializer";
+  // These are settings used to pass things to KafkaAvroDeserializer
+  public static final String KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA = "hoodie.deltastreamer.source.kafka.value.deserializer.schema";
+  public static final String KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX = "hoodie.deltastreamer.source.kafka.value.deserializer.";

Review comment:
       Can we move this ahead of line 58 and use it to build the other constants?
   For example:
   ```
   public static final String KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA = KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX + "schema";
   ```
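   A sketch of the suggested layout, using the constant names from the diff. Java requires the prefix to be declared before any static initializer that uses it by simple name (javac otherwise reports an illegal forward reference), which is why it has to move ahead of line 58. The `KAFKA_AVRO_VALUE_DESERIALIZER_CLASS` name and the `subProperties` helper are hypothetical: they only illustrate that the existing `.class` key shares the same prefix and that the prefix could also be used to collect settings for the deserializer. They are not Hudi's API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class DeserializerConfigKeys {
  // Declared first so the constants below can be built from it.
  static final String KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX =
      "hoodie.deltastreamer.source.kafka.value.deserializer.";
  static final String KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA =
      KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX + "schema";
  // Hypothetical name: the ".class" key from DataSourceOptions shares the prefix.
  static final String KAFKA_AVRO_VALUE_DESERIALIZER_CLASS =
      KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX + "class";

  // Hypothetical helper: returns every property under the prefix, with the
  // prefix stripped from each key (e.g. "...deserializer.schema" -> "schema").
  static Map<String, String> subProperties(Properties props) {
    Map<String, String> out = new HashMap<>();
    for (String key : props.stringPropertyNames()) {
      if (key.startsWith(KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX)) {
        String suffix = key.substring(KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX.length());
        out.put(suffix, props.getProperty(key));
      }
    }
    return out;
  }
}
```

   Deriving every key from one prefix keeps the names from drifting apart if the prefix is ever renamed.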
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
