Sam-Serpoosh commented on issue #8519: URL: https://github.com/apache/hudi/issues/8519#issuecomment-1544940333
@jpechane IIUC, this seems to be related to `Debezium` and how it serializes the CDC events before publishing them to Kafka. As detailed in [this comment](https://github.com/apache/hudi/issues/8519#issuecomment-1542967885) and [this one](https://github.com/apache/hudi/issues/8519#issuecomment-1544320455), there is an **extra/nested** object/record field named `Value` under `after` or `before`, and I'm not sure why that's the case.

The type of the `before` and `after` fields is a **union type** that looks like this:

```json
{
  "name": "before",
  "type": [
    "null",
    {
      "type": "record",
      "name": "Value",
      "fields": [
        {
          "name": "id",
          "type": {
            "type": "int",
            "connect.default": 0
          },
          "default": 0
        },
        {
          "name": "name",
          "type": "string"
        },
        {
          "name": "age",
          "type": "int"
        },
        {
          "name": "created_at",
          "type": [
            "null",
            {
              "type": "long",
              "connect.version": 1,
              "connect.name": "io.debezium.time.MicroTimestamp"
            }
          ],
          "default": null
        },
        {
          "name": "event_ts",
          "type": ["null", "long"],
          "default": null
        }
      ],
      "connect.name": "<topic_prefix>.<schema_name>.samser_customers.Value"
    }
  ],
  "default": null
},
{
  "name": "after",
  "type": ["null", "Value"],
  "default": null
},
...
}
```

However, when I consume/deserialize events using Confluent's `kafka-avro-console-consumer`, I see that the `before` field has an **OBJECT/RECORD** field named `Value` under it, and the fields (e.g. `id` and `name`) are nested under that instead of sitting directly under `before`. According to the Avro schema above, **Value** is just the TYPE of the `before` field. But for some reason it comes out as a **field**, so we end up with `before.Value.id` (or `after.Value.id`) instead of `before.id` (or `after.id`).

Any thoughts on why this is happening? We don't see this behavior for the `source` field (whose type is **also** a **record**); that field behaves correctly.
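One point worth checking against the Avro specification: in Avro's JSON encoding (the form a console consumer prints), a non-null union value is written as a single-entry JSON object keyed by the chosen branch's type name (for a record, its full name), while a field whose type is a plain, non-union record is written directly. The minimal sketch below implements only that rule (records, `["null", X]` unions, primitives) against a simplified, hypothetical copy of the schema above: the namespace and `connect.*` properties are dropped, and `Envelope`, `Source` and the helpers `to_avro_json`/`unwrap_unions` are illustrative names, not Debezium, Confluent or Avro APIs. It also assumes Debezium's `source` is a required (non-union) struct in the envelope, which the elided schema does not show.

```python
import json
from typing import Any

# Hypothetical, simplified copy of the `before`/`after` record type
# (namespace and connect.* properties omitted for brevity).
VALUE_SCHEMA = {
    "type": "record",
    "name": "Value",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
        {"name": "created_at", "type": ["null", "long"]},
        {"name": "event_ts", "type": ["null", "long"]},
    ],
}

# Hypothetical envelope: `before`/`after` are nullable unions, while
# `source` is ASSUMED to be a plain (non-union) record.
ENVELOPE_SCHEMA = {
    "type": "record",
    "name": "Envelope",
    "fields": [
        {"name": "before", "type": ["null", VALUE_SCHEMA]},
        {"name": "after", "type": ["null", VALUE_SCHEMA]},
        {"name": "source", "type": {
            "type": "record", "name": "Source",
            "fields": [{"name": "table", "type": "string"}]}},
    ],
}


def branch_name(schema: Any) -> str:
    """Key used for a union branch: full name for records, else the type name."""
    if isinstance(schema, str):
        return schema
    if schema["type"] == "record":
        ns = schema.get("namespace")
        return f"{ns}.{schema['name']}" if ns else schema["name"]
    return schema["type"]


def to_avro_json(schema: Any, value: Any) -> Any:
    """Encode `value` following Avro's JSON-encoding rules (small subset)."""
    if isinstance(schema, list):  # union
        if value is None:
            return None  # the null branch is plain JSON null
        non_null = [s for s in schema if s != "null"]
        if len(non_null) != 1:
            raise NotImplementedError("sketch only handles ['null', X] unions")
        branch = non_null[0]
        # Non-null union values get wrapped: {"<type name>": <encoded value>}
        return {branch_name(branch): to_avro_json(branch, value)}
    if isinstance(schema, dict) and schema["type"] == "record":
        return {f["name"]: to_avro_json(f["type"], value[f["name"]])
                for f in schema["fields"]}
    return value  # primitives are written as-is


def unwrap_unions(schema: Any, encoded: Any) -> Any:
    """Strip the union wrappers again, e.g. {"Value": {...}} -> {...}."""
    if isinstance(schema, list):
        if encoded is None:
            return None
        [(name, inner)] = encoded.items()
        branch = next(s for s in schema if branch_name(s) == name)
        return unwrap_unions(branch, inner)
    if isinstance(schema, dict) and schema["type"] == "record":
        return {f["name"]: unwrap_unions(f["type"], encoded[f["name"]])
                for f in schema["fields"]}
    return encoded


row = {"id": 1, "name": "sam", "age": 30,
       "created_at": None, "event_ts": 1683849600000000}
event = {"before": None, "after": row,
         "source": {"table": "samser_customers"}}
encoded = to_avro_json(ENVELOPE_SCHEMA, event)
# `after` is wrapped under "Value"; `source` (non-union) is not.
print(json.dumps(encoded))
```

Under these assumptions, `after` prints as `{"Value": {...}}` and the nullable `event_ts` as `{"long": ...}`, while `source` stays flat, which would match the `before.Value.id` shape described above. If that is the cause, the underlying Avro data may be fine and only the JSON rendering adds the wrapper; this sketch does not confirm what Debezium or the Confluent converter do internally.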
In case it's needed, here's my Debezium connector configuration:

```
schema.include.list: public
key.converter: io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url: http://<REGISTRY_URL>:8081
value.converter: io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url: http://<REGISTRY_URL>:8081
table.include.list: public.samser_customers
topic.creation.enable: true
topic.creation.default.replication.factor: 1
topic.creation.default.partitions: 1
topic.creation.default.cleanup.policy: compact
topic.creation.default.compression.type: lz4
decimal.handling.mode: double
tombstones.on.delete: false
```

Thank you very much in advance; I appreciate your help here.
