This is an automated email from the ASF dual-hosted git repository.

schang pushed a commit to branch ctty/deb-schema-provider
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit d607bf8629e90693a951cd5f4f0dc92c6c1ab75b
Author: Shawn Chang <[email protected]>
AuthorDate: Fri Dec 26 14:53:29 2025 -0800

    Allow non-registry schema provider in debezium source
---
 .../hudi/utilities/sources/debezium/DebeziumSource.java     | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
index ab0d09dd9b4b..64904055561e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
@@ -80,7 +80,7 @@ public abstract class DebeziumSource extends RowSource {
 
   private final KafkaOffsetGen offsetGen;
   private final HoodieIngestionMetrics metrics;
-  private final SchemaRegistryProvider schemaRegistryProvider;
+  private final SchemaProvider schemaProvider;
   private final String deserializerClassName;
 
   public DebeziumSource(TypedProperties props, JavaSparkContext sparkContext,
@@ -100,15 +100,14 @@ public abstract class DebeziumSource extends RowSource {
       throw new HoodieReadFromSourceException(error, e);
     }
 
-    // Currently, debezium source requires Confluent/Kafka schema-registry to 
fetch the latest schema.
-    if (schemaProvider == null || !(schemaProvider instanceof 
SchemaRegistryProvider)) {
-      schemaRegistryProvider = new SchemaRegistryProvider(props, sparkContext);
+    if (schemaProvider == null) {
+      this.schemaProvider = new SchemaRegistryProvider(props, sparkContext);
     } else {
-      schemaRegistryProvider = (SchemaRegistryProvider) schemaProvider;
+      this.schemaProvider = schemaProvider;
     }
 
     if 
(deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
-      KafkaSourceUtil.configureSchemaDeserializer(schemaRegistryProvider, 
props);
+      KafkaSourceUtil.configureSchemaDeserializer(this.schemaProvider, props);
     }
 
     offsetGen = new KafkaOffsetGen(props);
@@ -124,7 +123,7 @@ public abstract class DebeziumSource extends RowSource {
     LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + 
offsetGen.getTopicName());
 
     try {
-      String schemaStr = 
schemaRegistryProvider.fetchSchemaFromRegistry(getStringWithAltKeys(props, 
HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL));
+      String schemaStr = 
schemaProvider.getSourceHoodieSchema().getAvroSchema().toString();
       Dataset<Row> dataset = toDataset(offsetRanges, offsetGen, schemaStr);
       LOG.info("Spark schema of Kafka Payload for topic {}:\n{}", 
offsetGen.getTopicName(), dataset.schema().treeString());
       LOG.info("New checkpoint string: {}", 
CheckpointUtils.offsetsToStr(offsetRanges));

Reply via email to