Sam-Serpoosh commented on issue #8519: URL: https://github.com/apache/hudi/issues/8519#issuecomment-1540961546
@the-other-tim-brown I've been trying to get a vanilla DeltaStreamer <> Kafka job running in order to consume the Kafka topic populated by my **Debezium <> Postgres (PG) Connector**. I upgraded my [Apicurio Schema Registry](https://www.apicur.io/registry/docs/apicurio-registry/2.3.x/getting-started/assembly-intro-to-the-registry.html) to a later version, `2.4.2.Final`, which supports **Confluent-compatible APIs**, since that's what Hudi's `SchemaRegistryProvider` expects, as can be seen [here](https://github.com/apache/hudi/blob/release-0.11.1/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java#L41-L45) and [here](https://github.com/apache/hudi/blob/release-0.11.1/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java#L100). I also verified that the Debezium PG Connector does the following:

- Avro-serializes change-log events
- Registers the inferred Avro schema in the Apicurio registry
- Publishes the serialized events to Kafka

However, when I try to ingest these Kafka events using `DeltaStreamer`, I noticed the following ERROR:

```
Exception in thread "main" org.apache.avro.SchemaParseException: "io.debezium.connector.postgresql.Source" is not a defined name. The type of the "source" field must be a defined name or a {"type": ...} expression.
    at org.apache.avro.Schema.parse(Schema.java:1265)
    at org.apache.avro.Schema$Parser.parse(Schema.java:1032)
    at org.apache.avro.Schema$Parser.parse(Schema.java:1020)
    at org.apache.hudi.utilities.schema.SchemaRegistryProvider.getSchema(SchemaRegistryProvider.java:100)
    at org.apache.hudi.utilities.schema.SchemaRegistryProvider.getSourceSchema(SchemaRegistryProvider.java:107)
    at org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor.getSourceSchema(SchemaProviderWithPostProcessor.java:42)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.registerAvroSchemas(DeltaSync.java:868)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.<init>(DeltaSync.java:235)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:650)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:142)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:115)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:553)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```

I believe
this makes sense. When I inspected the **inferred Avro schema** stored in the Apicurio registry, I noticed the following bit there:

```json
{
  "type": "record",
  "name": "Envelope",
  "namespace": "<kafka_topic_name>",
  "fields": [
    ...,
    {
      "name": "source",
      "type": "io.debezium.connector.postgresql.Source"
    },
    {
      "name": "op",
      "type": "string"
    },
    ...
  ],
  ...
}
```

As you can see, `source`'s type is **not** in the `{ "type": ... }` structure expected by the Avro schema parser in [this line](https://github.com/apache/hudi/blob/release-0.11.1/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java#L100). Does this mean `Apicurio`'s **Avro serializer** is not able to properly and thoroughly serialize AND infer the Avro schema? For reference, the relevant bits of my Debezium Connector's configuration are:

```yaml
...
key.converter.schemas.enable: true
key.converter: io.apicurio.registry.utils.converter.AvroConverter
key.converter.apicurio.registry.url: http://<registry_URL>:8080/apis/registry/v2
key.converter.apicurio.registry.auto-register: true
key.converter.apicurio.registry.find-latest: true
value.converter.schemas.enable: true
value.converter: io.apicurio.registry.utils.converter.AvroConverter
value.converter.apicurio.registry.url: http://<registry_URL>:8080/apis/registry/v2
value.converter.apicurio.registry.auto-register: true
value.converter.apicurio.registry.find-latest: true
...
```

I wonder if `io.apicurio.registry.utils.converter.AvroConverter` is **not** able to infer the proper type for the `source` field, so it ends up emitting `"type": "io.debezium.connector.postgresql.Source"` instead, which leads to failure downstream. I'm curious whether someone who is using `io.confluent.connect.avro.AvroConverter` instead has also seen this issue, or whether in their case the `source` field's type is inferred accurately, in the `{ "type": ... }` structure. cc @sydneyhoran
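For what it's worth, the parser's complaint can be reproduced outside Hudi: in an Avro schema document, a named type (`record`, `enum`, `fixed`) may be referenced by its full name only if its definition appears earlier in that same document, which is exactly what the registered `Envelope` above lacks. The sketch below is a hypothetical helper (not Avro's actual parser) that walks a JSON-decoded schema in declaration order and reports references that were never defined; the schemas it checks mirror the broken shape above and a self-contained variant:

```python
import json

# Avro primitive type names never need a prior definition.
PRIMITIVES = {"null", "boolean", "int", "long", "float", "double", "bytes", "string"}

def _full_name(name, namespace):
    # Names containing a dot are already fully qualified.
    return name if "." in name or not namespace else f"{namespace}.{name}"

def find_undefined_refs(schema, defined=None, namespace=None):
    """Walk a (JSON-decoded) Avro schema in declaration order and return
    named-type references not defined earlier -- the condition Avro's
    parser rejects with SchemaParseException."""
    if defined is None:
        defined = set()
    missing = []
    if isinstance(schema, str):  # bare name: primitive or named-type reference
        if schema not in PRIMITIVES and _full_name(schema, namespace) not in defined:
            missing.append(_full_name(schema, namespace))
    elif isinstance(schema, list):  # union: check every branch
        for branch in schema:
            missing += find_undefined_refs(branch, defined, namespace)
    elif isinstance(schema, dict):
        t = schema.get("type")
        if t in ("record", "enum", "fixed"):
            ns = schema.get("namespace", namespace)
            defined.add(_full_name(schema["name"], ns))  # definition seen here
            for field in schema.get("fields", []):
                missing += find_undefined_refs(field["type"], defined, ns)
        elif t == "array":
            missing += find_undefined_refs(schema["items"], defined, namespace)
        elif t == "map":
            missing += find_undefined_refs(schema["values"], defined, namespace)
        else:
            missing += find_undefined_refs(t, defined, namespace)
    return missing

# Shaped like the registry's Envelope: "source" references a named type
# that is never defined in this document.
broken = json.loads("""
{"type": "record", "name": "Envelope", "namespace": "my_topic",
 "fields": [
   {"name": "source", "type": "io.debezium.connector.postgresql.Source"},
   {"name": "op", "type": "string"}]}
""")
print(find_undefined_refs(broken))
# -> ['io.debezium.connector.postgresql.Source']

# The same schema with Source defined inline before use is self-contained.
fixed = json.loads("""
{"type": "record", "name": "Envelope", "namespace": "my_topic",
 "fields": [
   {"name": "source",
    "type": {"type": "record", "name": "Source",
             "namespace": "io.debezium.connector.postgresql",
             "fields": [{"name": "version", "type": "string"}]}},
   {"name": "op", "type": "string"}]}
""")
print(find_undefined_refs(fixed))
# -> []
```

So a registry entry where the `Source` record is inlined (dereferenced) at its first use would parse fine; one that only carries the name would not, regardless of which consumer fetches it.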
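It may also help to fetch exactly what Hudi will see before pointing fingers at the converter, since `SchemaRegistryProvider` just parses the `schema` field of the JSON returned by the configured URL. A diagnostic sketch, assuming Apicurio 2.x's Confluent-compatibility endpoint (`/apis/ccompat/v6/...`) and a subject named `<kafka_topic_name>-value` (both assumptions; adjust to your deployment):

```shell
# Hypothetical registry host and subject -- substitute your own values.
REGISTRY=http://<registry_URL>:8080
SUBJECT=<kafka_topic_name>-value

# Whatever this prints is the exact string Schema.Parser will parse, so
# check whether "source" carries an inline definition or only a name.
curl -s "$REGISTRY/apis/ccompat/v6/subjects/$SUBJECT/versions/latest" \
  | python3 -c 'import json, sys; print(json.load(sys.stdin)["schema"])'
```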
