This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b2057b356ca [HUDI-8378] Fix Avro schema deserializer failing with
schema evolution (#12111)
b2057b356ca is described below
commit b2057b356caf4e54c43ef6f34b5db8b5546ea39a
Author: vamsikarnika <[email protected]>
AuthorDate: Mon Feb 24 23:30:15 2025 +0530
[HUDI-8378] Fix Avro schema deserializer failing with schema evolution
(#12111)
Co-authored-by: Vamsi <[email protected]>
---
.../hudi/utilities/sources/AvroKafkaSource.java | 13 +----
.../utilities/sources/debezium/DebeziumSource.java | 6 ++
.../utilities/sources/helpers/KafkaSourceUtil.java | 50 ++++++++++++++++
.../utilities/sources/TestAvroKafkaSource.java | 35 ++++++++++++
.../sources/helpers/TestKafkaSourceUtil.java | 66 ++++++++++++++++++++++
.../schema/evolved-test-with-default-value.avsc | 42 ++++++++++++++
6 files changed, 202 insertions(+), 10 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index f5a076cafe8..7e17b0ff0b2 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -28,6 +28,7 @@ import
org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
+import org.apache.hudi.utilities.sources.helpers.KafkaSourceUtil;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.StreamContext;
@@ -48,7 +49,6 @@ import static
org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREF
import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static
org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS;
-import static
org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_VALUE_DESERIALIZER_SCHEMA;
/**
* Reads avro serialized Kafka data, based on the confluent schema-registry.
@@ -92,7 +92,7 @@ public class AvroKafkaSource extends
KafkaSource<JavaRDD<GenericRecord>> {
}
if
(deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
- configureSchemaDeserializer();
+ KafkaSourceUtil.configureSchemaDeserializer(schemaProvider, props);
}
offsetGen = new KafkaOffsetGen(props);
}
@@ -100,7 +100,7 @@ public class AvroKafkaSource extends
KafkaSource<JavaRDD<GenericRecord>> {
@Override
protected InputBatch<JavaRDD<GenericRecord>>
readFromCheckpoint(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
if
(deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
- configureSchemaDeserializer();
+ KafkaSourceUtil.configureSchemaDeserializer(schemaProvider, props);
offsetGen = new KafkaOffsetGen(props);
}
return super.readFromCheckpoint(lastCheckpoint, sourceLimit);
@@ -135,11 +135,4 @@ public class AvroKafkaSource extends
KafkaSource<JavaRDD<GenericRecord>> {
return kafkaRDD.map(consumerRecord -> (GenericRecord)
consumerRecord.value());
}
}
-
- private void configureSchemaDeserializer() {
- if (schemaProvider == null) {
- throw new HoodieReadFromSourceException("SchemaProvider has to be set to
use KafkaAvroSchemaDeserializer");
- }
- props.put(KAFKA_VALUE_DESERIALIZER_SCHEMA.key(),
schemaProvider.getSourceSchema().toString());
- }
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
index acb6e925681..d161acce0dc 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
+import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -34,6 +35,7 @@ import org.apache.hudi.utilities.sources.RowSource;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
+import org.apache.hudi.utilities.sources.helpers.KafkaSourceUtil;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
@@ -105,6 +107,10 @@ public abstract class DebeziumSource extends RowSource {
schemaRegistryProvider = (SchemaRegistryProvider) schemaProvider;
}
+ if
(deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
+ KafkaSourceUtil.configureSchemaDeserializer(schemaRegistryProvider,
props);
+ }
+
offsetGen = new KafkaOffsetGen(props);
this.metrics = metrics;
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaSourceUtil.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaSourceUtil.java
new file mode 100644
index 00000000000..bbd5379668e
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaSourceUtil.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.hash.HashID;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import com.google.crypto.tink.subtle.Base64;
+
+import java.util.Objects;
+
+import static
org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_VALUE_DESERIALIZER_SCHEMA;
+
+public class KafkaSourceUtil {
+
+ public static final String NATIVE_KAFKA_CONSUMER_GROUP_ID = "group.id";
+ public static final int GROUP_ID_MAX_BYTES_LENGTH = 255;
+
+ public static void configureSchemaDeserializer(SchemaProvider
schemaProvider, TypedProperties props) {
+ if (schemaProvider == null ||
Objects.isNull(schemaProvider.getSourceSchema())) {
+ throw new HoodieReadFromSourceException("SchemaProvider has to be set to
use KafkaAvroSchemaDeserializer");
+ }
+ props.put(KAFKA_VALUE_DESERIALIZER_SCHEMA.key(),
schemaProvider.getSourceSchema().toString());
+ // derive the consumer group id from the schema hash, so that a schema change
also changes the group id and KafkaRDDIterator does not reuse a cached Kafka consumer
+ String groupId = props.getString(NATIVE_KAFKA_CONSUMER_GROUP_ID, "");
+ String schemaHash =
Base64.encode(HashID.hash(schemaProvider.getSourceSchema().toString(),
HashID.Size.BITS_128));
+ String updatedConsumerGroup = groupId.isEmpty() ? schemaHash
+ : StringUtils.concatenateWithThreshold(String.format("%s_", groupId),
schemaHash, GROUP_ID_MAX_BYTES_LENGTH);
+ props.put(NATIVE_KAFKA_CONSUMER_GROUP_ID, updatedConsumerGroup);
+ }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
index baa1b82aa97..0f5e8f9a36f 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
@@ -22,10 +22,13 @@ import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.hash.HashID;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
+import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
@@ -33,6 +36,7 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import com.google.crypto.tink.subtle.Base64;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -63,9 +67,13 @@ import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SO
import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
import static
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static
org.apache.hudi.utilities.sources.helpers.KafkaSourceUtil.GROUP_ID_MAX_BYTES_LENGTH;
+import static
org.apache.hudi.utilities.sources.helpers.KafkaSourceUtil.NATIVE_KAFKA_CONSUMER_GROUP_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
@@ -274,4 +282,31 @@ public class TestAvroKafkaSource extends
SparkClientFunctionalTestHarness {
.getBatch().get();
assertEquals(numMessages,
nullKafkaKeyDataset.toDF().filter("_hoodie_kafka_source_key is null").count());
}
+
+ @Test
+ void testConfigureSchemaDeserializer() throws IOException {
+ final String topic = TEST_TOPIC_PREFIX + "testAvroSchemaDeserializer";
+ TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+
+ props.put("hoodie.streamer.source.kafka.value.deserializer.class",
+ KafkaAvroSchemaDeserializer.class.getName());
+ assertThrows(HoodieReadFromSourceException.class, () -> new
AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics));
+
+ String schemaFilePath =
TestAvroKafkaSource.class.getClassLoader().getResource("schema/simple-test-with-default-value.avsc").getPath();
+ props.put("hoodie.streamer.schemaprovider.source.schema.file",
schemaFilePath);
+ SchemaProvider schemaProvider =
UtilHelpers.wrapSchemaProviderWithPostProcessor(
+
UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(),
props, jsc()), props, jsc(), new ArrayList<>());
+ AvroKafkaSource avroKafkaSource = new AvroKafkaSource(props, jsc(),
spark(), schemaProvider, metrics);
+
assertTrue(avroKafkaSource.props.containsKey(NATIVE_KAFKA_CONSUMER_GROUP_ID));
+ String groupId =
avroKafkaSource.props.getString(NATIVE_KAFKA_CONSUMER_GROUP_ID, "");
+ assertTrue(groupId.length() <= GROUP_ID_MAX_BYTES_LENGTH);
+
+ schemaFilePath =
TestAvroKafkaSource.class.getClassLoader().getResource("schema/evolved-test-with-default-value.avsc").getPath();
+ props.put("hoodie.streamer.schemaprovider.source.schema.file",
schemaFilePath);
+ avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(),
schemaProvider, metrics);
+ String newGroupId =
avroKafkaSource.props.getString(NATIVE_KAFKA_CONSUMER_GROUP_ID, "");
+ assertNotEquals(groupId, newGroupId);
+ String schemaHash =
Base64.encode(HashID.hash(schemaProvider.getSourceSchema().toString(),
HashID.Size.BITS_128));
+ assertEquals(StringUtils.concatenateWithThreshold(String.format("%s_",
groupId), schemaHash, GROUP_ID_MAX_BYTES_LENGTH), newGroupId);
+ }
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaSourceUtil.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaSourceUtil.java
new file mode 100644
index 00000000000..7a1d5172bf8
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaSourceUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.hash.HashID;
+import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import com.google.crypto.tink.subtle.Base64;
+import org.apache.avro.Schema;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static
org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_VALUE_DESERIALIZER_SCHEMA;
+import static
org.apache.hudi.utilities.sources.helpers.KafkaSourceUtil.GROUP_ID_MAX_BYTES_LENGTH;
+import static
org.apache.hudi.utilities.sources.helpers.KafkaSourceUtil.NATIVE_KAFKA_CONSUMER_GROUP_ID;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class TestKafkaSourceUtil {
+
+ @Mock
+ SchemaProvider schemaProvider;
+
+ @Test
+ void testConfigureSchemaDeserializer() {
+ TypedProperties props = new TypedProperties();
+ // should throw exception when the schema provider (an unstubbed mock here) has a null source schema.
+ assertThrows(HoodieReadFromSourceException.class, () ->
KafkaSourceUtil.configureSchemaDeserializer(schemaProvider, props));
+
+ String avroSchemaJson =
+
"{\"type\":\"record\",\"name\":\"Person\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},"
+ +
"{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null},"
+ + "{\"name\":\"isEmployed\",\"type\":\"boolean\"}]}";
+ Schema schema = new Schema.Parser().parse(avroSchemaJson);
+ when(schemaProvider.getSourceSchema()).thenReturn(schema);
+ KafkaSourceUtil.configureSchemaDeserializer(schemaProvider, props);
+ assertTrue(props.containsKey(NATIVE_KAFKA_CONSUMER_GROUP_ID));
+ assertTrue(props.getString(NATIVE_KAFKA_CONSUMER_GROUP_ID, "").length() <=
GROUP_ID_MAX_BYTES_LENGTH);
+ assertEquals(props.getString(KAFKA_VALUE_DESERIALIZER_SCHEMA.key()),
avroSchemaJson);
+ String schemaHash = Base64.encode(HashID.hash(avroSchemaJson,
HashID.Size.BITS_128));
+ assertEquals(props.getString(NATIVE_KAFKA_CONSUMER_GROUP_ID, ""),
schemaHash);
+ }
+}
diff --git
a/hudi-utilities/src/test/resources/schema/evolved-test-with-default-value.avsc
b/hudi-utilities/src/test/resources/schema/evolved-test-with-default-value.avsc
new file mode 100644
index 00000000000..a8490649c75
--- /dev/null
+++
b/hudi-utilities/src/test/resources/schema/evolved-test-with-default-value.avsc
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_number", "type": "int"},
+ {
+ "name": "age",
+ "type": "int",
+ "default": 30
+ },
+ {"name": "favorite_color", "type": "string"},
+ {
+ "name": "email",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "phone",
+ "type": ["null", "string"],
+ "default": null
+ }
+ ]
+}