This is an automated email from the ASF dual-hosted git repository. schang pushed a commit to branch ctty/deb-schema-provider in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d607bf8629e90693a951cd5f4f0dc92c6c1ab75b
Author: Shawn Chang <[email protected]>
AuthorDate: Fri Dec 26 14:53:29 2025 -0800

    Allow non-registry schema provider in debezium source
---
 .../hudi/utilities/sources/debezium/DebeziumSource.java | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
index ab0d09dd9b4b..64904055561e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
@@ -80,7 +80,7 @@ public abstract class DebeziumSource extends RowSource {
 
   private final KafkaOffsetGen offsetGen;
   private final HoodieIngestionMetrics metrics;
-  private final SchemaRegistryProvider schemaRegistryProvider;
+  private final SchemaProvider schemaProvider;
   private final String deserializerClassName;
 
   public DebeziumSource(TypedProperties props, JavaSparkContext sparkContext,
@@ -100,15 +100,14 @@ public abstract class DebeziumSource extends RowSource {
       throw new HoodieReadFromSourceException(error, e);
     }
 
-    // Currently, debezium source requires Confluent/Kafka schema-registry to fetch the latest schema.
-    if (schemaProvider == null || !(schemaProvider instanceof SchemaRegistryProvider)) {
-      schemaRegistryProvider = new SchemaRegistryProvider(props, sparkContext);
+    if (schemaProvider == null) {
+      this.schemaProvider = new SchemaRegistryProvider(props, sparkContext);
     } else {
-      schemaRegistryProvider = (SchemaRegistryProvider) schemaProvider;
+      this.schemaProvider = schemaProvider;
     }
 
     if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
-      KafkaSourceUtil.configureSchemaDeserializer(schemaRegistryProvider, props);
+      KafkaSourceUtil.configureSchemaDeserializer(this.schemaProvider, props);
     }
 
     offsetGen = new KafkaOffsetGen(props);
@@ -124,7 +123,7 @@ public abstract class DebeziumSource extends RowSource {
     LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
 
     try {
-      String schemaStr = schemaRegistryProvider.fetchSchemaFromRegistry(getStringWithAltKeys(props, HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL));
+      String schemaStr = schemaProvider.getSourceHoodieSchema().getAvroSchema().toString();
       Dataset<Row> dataset = toDataset(offsetRanges, offsetGen, schemaStr);
       LOG.info("Spark schema of Kafka Payload for topic {}:\n{}", offsetGen.getTopicName(), dataset.schema().treeString());
       LOG.info("New checkpoint string: {}", CheckpointUtils.offsetsToStr(offsetRanges));
