yihua commented on code in PR #12111:
URL: https://github.com/apache/hudi/pull/12111#discussion_r1963991912


##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java:
##########
@@ -274,4 +279,28 @@ public void testAppendKafkaOffsetsSourceFormatAdapter() 
throws IOException {
             .getBatch().get();
     assertEquals(numMessages, 
nullKafkaKeyDataset.toDF().filter("_hoodie_kafka_source_key is null").count());
   }
+
+  @Test
+  void testConfigureSchemaDeserializer() throws IOException {
+    final String topic = TEST_TOPIC_PREFIX + "testAvroSchemaDeserializer";
+    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+
+    props.put("hoodie.deltastreamer.source.kafka.value.deserializer.class", 
KafkaAvroSchemaDeserializer.class.getName());

Review Comment:
   ```suggestion
       props.put("hoodie.streamer.source.kafka.value.deserializer.class", 
KafkaAvroSchemaDeserializer.class.getName());
   ```



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java:
##########
@@ -274,4 +279,28 @@ public void testAppendKafkaOffsetsSourceFormatAdapter() 
throws IOException {
             .getBatch().get();
     assertEquals(numMessages, 
nullKafkaKeyDataset.toDF().filter("_hoodie_kafka_source_key is null").count());
   }
+
+  @Test
+  void testConfigureSchemaDeserializer() throws IOException {
+    final String topic = TEST_TOPIC_PREFIX + "testAvroSchemaDeserializer";
+    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+
+    props.put("hoodie.deltastreamer.source.kafka.value.deserializer.class", 
KafkaAvroSchemaDeserializer.class.getName());
+    assertThrows(HoodieReadFromSourceException.class, () -> new 
AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics));
+
+    String schemaFilePath = 
TestAvroKafkaSource.class.getClassLoader().getResource("schema/simple-test-with-default-value.avsc").getPath();
+    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", 
schemaFilePath);
+    SchemaProvider schemaProvider = 
UtilHelpers.wrapSchemaProviderWithPostProcessor(
+        
UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), 
props, jsc()), props, jsc(), new ArrayList<>());
+    AvroKafkaSource avroKafkaSource = new AvroKafkaSource(props, jsc(), 
spark(), schemaProvider, metrics);
+    
assertTrue(avroKafkaSource.props.containsKey(NATIVE_KAFKA_CONSUMER_GROUP_ID));
+    String groupId = 
avroKafkaSource.props.getString(NATIVE_KAFKA_CONSUMER_GROUP_ID, "");
+    assertTrue(groupId.length() <= GROUP_ID_MAX_BYTES_LENGTH);
+
+    schemaFilePath = 
TestAvroKafkaSource.class.getClassLoader().getResource("schema/evolved-test-with-default-value.avsc").getPath();
+    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", 
schemaFilePath);
+    avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), 
schemaProvider, metrics);
+    String newGroupId = 
avroKafkaSource.props.getString(NATIVE_KAFKA_CONSUMER_GROUP_ID, "");
+    assertNotEquals(groupId, newGroupId);

Review Comment:
   Let's also validate that the schema hash is in the group ID.  Also, is it 
possible to assert the whole config value in addition to doing the `contains` check?



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java:
##########
@@ -274,4 +279,28 @@ public void testAppendKafkaOffsetsSourceFormatAdapter() 
throws IOException {
             .getBatch().get();
     assertEquals(numMessages, 
nullKafkaKeyDataset.toDF().filter("_hoodie_kafka_source_key is null").count());
   }
+
+  @Test
+  void testConfigureSchemaDeserializer() throws IOException {
+    final String topic = TEST_TOPIC_PREFIX + "testAvroSchemaDeserializer";
+    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+
+    props.put("hoodie.deltastreamer.source.kafka.value.deserializer.class", 
KafkaAvroSchemaDeserializer.class.getName());
+    assertThrows(HoodieReadFromSourceException.class, () -> new 
AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics));
+
+    String schemaFilePath = 
TestAvroKafkaSource.class.getClassLoader().getResource("schema/simple-test-with-default-value.avsc").getPath();
+    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", 
schemaFilePath);

Review Comment:
   ```suggestion
       props.put("hoodie.streamer.schemaprovider.source.schema.file", 
schemaFilePath);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to