This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b2057b356ca [HUDI-8378] Fix Avro schema deserializer failing with schema evolution (#12111)
b2057b356ca is described below

commit b2057b356caf4e54c43ef6f34b5db8b5546ea39a
Author: vamsikarnika <[email protected]>
AuthorDate: Mon Feb 24 23:30:15 2025 +0530

    [HUDI-8378] Fix Avro schema deserializer failing with schema evolution (#12111)
    
    Co-authored-by: Vamsi <[email protected]>
---
 .../hudi/utilities/sources/AvroKafkaSource.java    | 13 +----
 .../utilities/sources/debezium/DebeziumSource.java |  6 ++
 .../utilities/sources/helpers/KafkaSourceUtil.java | 50 ++++++++++++++++
 .../utilities/sources/TestAvroKafkaSource.java     | 35 ++++++++++++
 .../sources/helpers/TestKafkaSourceUtil.java       | 66 ++++++++++++++++++++++
 .../schema/evolved-test-with-default-value.avsc    | 42 ++++++++++++++
 6 files changed, 202 insertions(+), 10 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index f5a076cafe8..7e17b0ff0b2 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -28,6 +28,7 @@ import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
+import org.apache.hudi.utilities.sources.helpers.KafkaSourceUtil;
 import org.apache.hudi.utilities.streamer.DefaultStreamContext;
 import org.apache.hudi.utilities.streamer.StreamContext;
 
@@ -48,7 +49,6 @@ import static org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREF
 import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS;
-import static org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_VALUE_DESERIALIZER_SCHEMA;
 
 /**
  * Reads avro serialized Kafka data, based on the confluent schema-registry.
@@ -92,7 +92,7 @@ public class AvroKafkaSource extends KafkaSource<JavaRDD<GenericRecord>> {
     }
 
     if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
-      configureSchemaDeserializer();
+      KafkaSourceUtil.configureSchemaDeserializer(schemaProvider, props);
     }
     offsetGen = new KafkaOffsetGen(props);
   }
@@ -100,7 +100,7 @@ public class AvroKafkaSource extends KafkaSource<JavaRDD<GenericRecord>> {
   @Override
   protected InputBatch<JavaRDD<GenericRecord>> readFromCheckpoint(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
     if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
-      configureSchemaDeserializer();
+      KafkaSourceUtil.configureSchemaDeserializer(schemaProvider, props);
       offsetGen = new KafkaOffsetGen(props);
     }
     return super.readFromCheckpoint(lastCheckpoint, sourceLimit);
@@ -135,11 +135,4 @@ public class AvroKafkaSource extends KafkaSource<JavaRDD<GenericRecord>> {
       return kafkaRDD.map(consumerRecord -> (GenericRecord) consumerRecord.value());
     }
   }
-
-  private void configureSchemaDeserializer() {
-    if (schemaProvider == null) {
-      throw new HoodieReadFromSourceException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
-    }
-    props.put(KAFKA_VALUE_DESERIALIZER_SCHEMA.key(), schemaProvider.getSourceSchema().toString());
-  }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
index acb6e925681..d161acce0dc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
 import org.apache.hudi.utilities.config.KafkaSourceConfig;
+import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
 import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -34,6 +35,7 @@ import org.apache.hudi.utilities.sources.RowSource;
 import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
+import org.apache.hudi.utilities.sources.helpers.KafkaSourceUtil;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
@@ -105,6 +107,10 @@ public abstract class DebeziumSource extends RowSource {
       schemaRegistryProvider = (SchemaRegistryProvider) schemaProvider;
     }
 
+    if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
+      KafkaSourceUtil.configureSchemaDeserializer(schemaRegistryProvider, props);
+    }
+
     offsetGen = new KafkaOffsetGen(props);
     this.metrics = metrics;
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaSourceUtil.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaSourceUtil.java
new file mode 100644
index 00000000000..bbd5379668e
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaSourceUtil.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.hash.HashID;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import com.google.crypto.tink.subtle.Base64;
+
+import java.util.Objects;
+
+import static org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_VALUE_DESERIALIZER_SCHEMA;
+
+public class KafkaSourceUtil {
+
+  public static final String NATIVE_KAFKA_CONSUMER_GROUP_ID = "group.id";
+  public static final int GROUP_ID_MAX_BYTES_LENGTH = 255;
+
+  public static void configureSchemaDeserializer(SchemaProvider schemaProvider, TypedProperties props) {
+    if (schemaProvider == null || Objects.isNull(schemaProvider.getSourceSchema())) {
+      throw new HoodieReadFromSourceException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
+    }
+    props.put(KAFKA_VALUE_DESERIALIZER_SCHEMA.key(), schemaProvider.getSourceSchema().toString());
+    // assign consumer group id based on the schema, since if there's a change in the schema we ensure KafkaRDDIterator doesn't use cached Kafka Consumer
+    String groupId = props.getString(NATIVE_KAFKA_CONSUMER_GROUP_ID, "");
+    String schemaHash = Base64.encode(HashID.hash(schemaProvider.getSourceSchema().toString(), HashID.Size.BITS_128));
+    String updatedConsumerGroup = groupId.isEmpty() ? schemaHash
+        : StringUtils.concatenateWithThreshold(String.format("%s_", groupId), schemaHash, GROUP_ID_MAX_BYTES_LENGTH);
+    props.put(NATIVE_KAFKA_CONSUMER_GROUP_ID, updatedConsumerGroup);
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
index baa1b82aa97..0f5e8f9a36f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
@@ -22,10 +22,13 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.hash.HashID;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.config.HoodieStreamerConfig;
 import org.apache.hudi.utilities.config.KafkaSourceConfig;
+import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
 import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
@@ -33,6 +36,7 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 
+import com.google.crypto.tink.subtle.Base64;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -63,9 +67,13 @@ import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SO
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static org.apache.hudi.utilities.sources.helpers.KafkaSourceUtil.GROUP_ID_MAX_BYTES_LENGTH;
+import static org.apache.hudi.utilities.sources.helpers.KafkaSourceUtil.NATIVE_KAFKA_CONSUMER_GROUP_ID;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
 public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
@@ -274,4 +282,31 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
             .getBatch().get();
     assertEquals(numMessages, nullKafkaKeyDataset.toDF().filter("_hoodie_kafka_source_key is null").count());
   }
+
+  @Test
+  void testConfigureSchemaDeserializer() throws IOException {
+    final String topic = TEST_TOPIC_PREFIX + "testAvroSchemaDeserializer";
+    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+
+    props.put("hoodie.streamer.source.kafka.value.deserializer.class",
+        KafkaAvroSchemaDeserializer.class.getName());
+    assertThrows(HoodieReadFromSourceException.class, () -> new AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics));
+
+    String schemaFilePath = TestAvroKafkaSource.class.getClassLoader().getResource("schema/simple-test-with-default-value.avsc").getPath();
+    props.put("hoodie.streamer.schemaprovider.source.schema.file", schemaFilePath);
+    SchemaProvider schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
+        UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
+    AvroKafkaSource avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+    assertTrue(avroKafkaSource.props.containsKey(NATIVE_KAFKA_CONSUMER_GROUP_ID));
+    String groupId = avroKafkaSource.props.getString(NATIVE_KAFKA_CONSUMER_GROUP_ID, "");
+    assertTrue(groupId.length() <= GROUP_ID_MAX_BYTES_LENGTH);
+
+    schemaFilePath = TestAvroKafkaSource.class.getClassLoader().getResource("schema/evolved-test-with-default-value.avsc").getPath();
+    props.put("hoodie.streamer.schemaprovider.source.schema.file", schemaFilePath);
+    avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+    String newGroupId = avroKafkaSource.props.getString(NATIVE_KAFKA_CONSUMER_GROUP_ID, "");
+    assertNotEquals(groupId, newGroupId);
+    String schemaHash = Base64.encode(HashID.hash(schemaProvider.getSourceSchema().toString(), HashID.Size.BITS_128));
+    assertEquals(StringUtils.concatenateWithThreshold(String.format("%s_", groupId), schemaHash, GROUP_ID_MAX_BYTES_LENGTH), newGroupId);
+  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaSourceUtil.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaSourceUtil.java
new file mode 100644
index 00000000000..7a1d5172bf8
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaSourceUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.hash.HashID;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import com.google.crypto.tink.subtle.Base64;
+import org.apache.avro.Schema;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_VALUE_DESERIALIZER_SCHEMA;
+import static org.apache.hudi.utilities.sources.helpers.KafkaSourceUtil.GROUP_ID_MAX_BYTES_LENGTH;
+import static org.apache.hudi.utilities.sources.helpers.KafkaSourceUtil.NATIVE_KAFKA_CONSUMER_GROUP_ID;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class TestKafkaSourceUtil {
+
+  @Mock
+  SchemaProvider schemaProvider;
+
+  @Test
+  void testConfigureSchemaDeserializer() {
+    TypedProperties props = new TypedProperties();
+    // should throw exception when schema provider is null.
+    assertThrows(HoodieReadFromSourceException.class, () -> KafkaSourceUtil.configureSchemaDeserializer(schemaProvider, props));
+
+    String avroSchemaJson =
+        "{\"type\":\"record\",\"name\":\"Person\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},"
+            + "{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null},"
+            + "{\"name\":\"isEmployed\",\"type\":\"boolean\"}]}";
+    Schema schema = new Schema.Parser().parse(avroSchemaJson);
+    when(schemaProvider.getSourceSchema()).thenReturn(schema);
+    KafkaSourceUtil.configureSchemaDeserializer(schemaProvider, props);
+    assertTrue(props.containsKey(NATIVE_KAFKA_CONSUMER_GROUP_ID));
+    assertTrue(props.getString(NATIVE_KAFKA_CONSUMER_GROUP_ID, "").length() <= GROUP_ID_MAX_BYTES_LENGTH);
+    assertEquals(props.getString(KAFKA_VALUE_DESERIALIZER_SCHEMA.key()), avroSchemaJson);
+    String schemaHash = Base64.encode(HashID.hash(avroSchemaJson, HashID.Size.BITS_128));
+    assertEquals(props.getString(NATIVE_KAFKA_CONSUMER_GROUP_ID, ""), schemaHash);
+  }
+}
diff --git a/hudi-utilities/src/test/resources/schema/evolved-test-with-default-value.avsc b/hudi-utilities/src/test/resources/schema/evolved-test-with-default-value.avsc
new file mode 100644
index 00000000000..a8490649c75
--- /dev/null
+++ b/hudi-utilities/src/test/resources/schema/evolved-test-with-default-value.avsc
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "namespace": "example.avro",
+  "type": "record",
+  "name": "User",
+  "fields": [
+    {"name": "name", "type": "string"},
+    {"name": "favorite_number",  "type": "int"},
+    {
+      "name": "age",
+      "type": "int",
+      "default": 30
+    },
+    {"name": "favorite_color", "type": "string"},
+    {
+      "name": "email",
+      "type": ["null", "string"],
+      "default": null
+    },
+    {
+      "name": "phone",
+      "type": ["null", "string"],
+      "default": null
+    }
+  ]
+}
