Sam-Serpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1540961546

   @the-other-tim-brown I've been trying to get a vanilla DeltaStreamer <> 
Kafka job running to consume the Kafka topic populated by my 
**Debezium <> Postgres (PG) Connector**.
   
   I upgraded my [Apicurio Schema 
Registry](https://www.apicur.io/registry/docs/apicurio-registry/2.3.x/getting-started/assembly-intro-to-the-registry.html)
 to a later version, `2.4.2.Final`, which supports **Confluent Compatible APIs**, 
since that's what Hudi's `SchemaRegistryProvider` expects, as can be seen 
[here](https://github.com/apache/hudi/blob/release-0.11.1/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java#L41-L45)
 and 
[here](https://github.com/apache/hudi/blob/release-0.11.1/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java#L100).
   
   I also verified that the Debezium PG Connector does the following:
   
   - Avro-serializes the change-log events
   - Registers the inferred Avro schema in the Apicurio registry
   - Publishes the serialized events to Kafka
   
   However, when I try to ingest these Kafka events using `DeltaStreamer`, I 
see the following error:
   
   ```
   Exception in thread "main" org.apache.avro.SchemaParseException: 
"io.debezium.connector.postgresql.Source" is not a defined name. The type of 
the "source" field must be a defined name or a {"type": ...} expression.
           at org.apache.avro.Schema.parse(Schema.java:1265)
           at org.apache.avro.Schema$Parser.parse(Schema.java:1032)
           at org.apache.avro.Schema$Parser.parse(Schema.java:1020)
           at 
org.apache.hudi.utilities.schema.SchemaRegistryProvider.getSchema(SchemaRegistryProvider.java:100)
           at 
org.apache.hudi.utilities.schema.SchemaRegistryProvider.getSourceSchema(SchemaRegistryProvider.java:107)
           at 
org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor.getSourceSchema(SchemaProviderWithPostProcessor.java:42)
           at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.registerAvroSchemas(DeltaSync.java:868)
           at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.<init>(DeltaSync.java:235)
           at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:650)
           at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:142)
           at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:115)
           at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:553)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
           at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
           at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
           at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
           at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
           at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
           at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
           at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   ```
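   This parse failure is expected behavior for Avro: a schema may reference a named type by a bare string only if that type was defined earlier in the same schema document. A minimal sketch of that check (the function and the simplified walk are mine, not Avro's actual parser; it glosses over namespacing subtleties):
   
   ```python
   import json
   
   # Avro primitive type names; anything else appearing as a bare string
   # must be a named type defined earlier in the same schema document.
   PRIMITIVES = {"null", "boolean", "int", "long", "float", "double", "bytes", "string"}
   
   def find_undefined_refs(schema, defined=None, undefined=None):
       """Walk a parsed Avro schema and collect bare type references
       that were never defined inline."""
       if defined is None:
           defined, undefined = set(), []
       if isinstance(schema, str):
           if schema not in PRIMITIVES and schema not in defined:
               undefined.append(schema)
       elif isinstance(schema, list):  # union
           for branch in schema:
               find_undefined_refs(branch, defined, undefined)
       elif isinstance(schema, dict):
           t = schema.get("type")
           if t in ("record", "enum", "fixed"):
               # Named type: record both the short and fully qualified name.
               defined.add(schema["name"])
               if "namespace" in schema:
                   defined.add(schema["namespace"] + "." + schema["name"])
           if t == "record":
               for field in schema.get("fields", []):
                   find_undefined_refs(field["type"], defined, undefined)
           elif t == "array":
               find_undefined_refs(schema["items"], defined, undefined)
           elif t == "map":
               find_undefined_refs(schema["values"], defined, undefined)
           elif t not in ("enum", "fixed"):
               find_undefined_refs(t, defined, undefined)
       return undefined
   
   envelope = json.loads("""
   {
     "type": "record",
     "name": "Envelope",
     "namespace": "my_topic",
     "fields": [
       {"name": "source", "type": "io.debezium.connector.postgresql.Source"},
       {"name": "op", "type": "string"}
     ]
   }
   """)
   print(find_undefined_refs(envelope))
   # ['io.debezium.connector.postgresql.Source']
   ```
   
   Run against a schema shaped like the one below, this flags exactly the `source` reference that Avro's parser rejects.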
   
   I believe this error makes sense. When I inspected the **inferred Avro schema** 
stored in the Apicurio registry, I noticed the following bit there:
   
   ```json
   {
     "type": "record",
     "name": "Envelope",
     "namespace": "<kafka_topic_name>",
     "fields": [
       ...,
       {
         "name": "source",
         "type": "io.debezium.connector.postgresql.Source"
       },
       {
         "name": "op",
         "type": "string"
       },
       ...
     ],
     ...
   }
   ```
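   For comparison, a self-contained schema would define the named type inline at its first use, something like the following (the `Source` field list here is hypothetical and abbreviated, just to show the shape):
   
   ```json
   {
     "name": "source",
     "type": {
       "type": "record",
       "name": "Source",
       "namespace": "io.debezium.connector.postgresql",
       "fields": [
         {"name": "version", "type": "string"},
         ...
       ]
     }
   }
   ```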
   
   As you can see, the `source` field's type is a bare named-type reference, **not** 
the `{ "type": ... }` structure expected by the Avro schema parser invoked on [this 
line](https://github.com/apache/hudi/blob/release-0.11.1/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java#L100).
 Does this mean `Apicurio`'s **Avro-Serializer** does not register a fully 
self-contained schema, i.e. it leaves referenced types un-inlined? For reference, 
the relevant bits of my Debezium Connector's configuration are:
   
   ```yaml
   ...
   key.converter.schemas.enable: true
   key.converter: io.apicurio.registry.utils.converter.AvroConverter
   key.converter.apicurio.registry.url: 
http://<registry_URL>:8080/apis/registry/v2
   key.converter.apicurio.registry.auto-register: true
   key.converter.apicurio.registry.find-latest: true
   value.converter.schemas.enable: true
   value.converter: io.apicurio.registry.utils.converter.AvroConverter
   value.converter.apicurio.registry.url: 
http://<registry_URL>:8080/apis/registry/v2
   value.converter.apicurio.registry.auto-register: true
   value.converter.apicurio.registry.find-latest: true
   ...
   ```
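   To see exactly what DeltaStreamer will fetch, the stored schema can be pulled through the same Confluent-compatible endpoint that `SchemaRegistryProvider` hits. A rough sketch (the `ccompat` base path, the subject name, and the helper functions are my assumptions about an Apicurio 2.4 setup, not something from this issue):
   
   ```python
   import json
   from urllib.request import urlopen
   
   # Hypothetical registry base; Apicurio 2.x serves the
   # Confluent-compatible API under a path like /apis/ccompat/v6.
   REGISTRY = "http://<registry_URL>:8080/apis/ccompat/v6"
   
   def latest_schema_url(subject):
       """Build the Confluent-compat URL for a subject's latest version."""
       return f"{REGISTRY}/subjects/{subject}/versions/latest"
   
   def extract_schema(payload):
       """The compat API returns the Avro schema as a JSON string under
       the 'schema' key; decode it back into a dict for inspection."""
       return json.loads(json.loads(payload)["schema"])
   
   def fetch_latest(subject):
       # Network call; only works against a live registry.
       with urlopen(latest_schema_url(subject)) as resp:
           return extract_schema(resp.read())
   ```
   
   Inspecting `fetch_latest("<kafka_topic_name>-value")` should show whether the registered `Envelope` carries the `Source` record inline or only the bare reference.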
   
   I wonder if `io.apicurio.registry.utils.converter.AvroConverter` is unable 
to inline the schema of the `source` field, so it registers the bare reference 
`"type": "io.debezium.connector.postgresql.Source"` instead, which then fails 
downstream. Has anyone using `io.confluent.connect.avro.AvroConverter` seen this 
issue as well, or is the `source` field's type inlined accurately as a full `{ 
"type": ... }` definition in that case? cc @sydneyhoran

