samserpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1542967885

   @the-other-tim-brown There's a good chance that this is caused by the 
**input Kafka topic's events** and how they're serialized/deserialized (i.e., 
the way the Debezium connector is currently shaping and publishing the 
change-log messages to Kafka).
   
   I used the `kafka-avro-console-consumer` that ships with Confluent's 
Schema Registry, and here's what the change-log events for my dummy/test 
table look like:
   
   ```json
   {
     "before": null,
     "after": {
       "<topic_prefix>.<schema_name>.samser_customers.Value": {
         "id": 1,
         "name": "Bob",
         "age": 40,
         "created_at": {
           "long": 1683661733071814
         },
         "event_ts": {
           "long": 1681984800000
         }
       }
     },
     "source": {
       "version": "2.1.2.Final",
       "connector": "postgresql",
       "name": "pg_dev8",
       "ts_ms": 1683734195621,
       "snapshot": {
         "string": "first_in_data_collection"
       },
       "db": "<db_name>",
       "sequence": {
         "string": "[null,\"1213462492184\"]"
       },
       "schema": "public",
       "table": "samser_customers",
       "txId": {
         "long": 806227
       },
       "lsn": {
         "long": 1213462492184
       },
       "xmin": null
     },
     "op": "r",
     "ts_ms": {
       "long": 1683734196050
     },
     "transaction": null
   }
   ```
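For reference, the `{"long": ...}` and `{"string": ...}` wrappers in the output above match Avro's JSON encoding of unions. A null branch is written as plain `null`, and any other branch is written as a single-key object whose key names the branch type. The Python sketch below is not Hudi or Confluent code, and its function names are made up for illustration. It strips that wrapper from primitive-typed union fields, using a few fields of the `source` block above. It is only a heuristic: a proper decoder would consult the schema rather than guess from the shape of the JSON.

```python
import json

# Avro primitive type names. In Avro's JSON encoding, a non-null value of a
# union such as ["null", "long"] is written as {"long": <value>}; null stays null.
AVRO_PRIMITIVES = {"boolean", "int", "long", "float", "double", "bytes", "string"}


def unwrap_primitive_union(value):
    """Return the bare value if `value` looks like an Avro-JSON primitive union wrapper.

    Heuristic only: it inspects the JSON shape instead of consulting the schema.
    """
    if isinstance(value, dict) and len(value) == 1:
        branch, inner = next(iter(value.items()))
        if branch in AVRO_PRIMITIVES:
            return inner
    return value


def unwrap_fields(record):
    """Apply unwrap_primitive_union to each top-level field of a record."""
    return {name: unwrap_primitive_union(v) for name, v in record.items()}


# A few fields of the `source` block from the event above.
source = json.loads("""
{
  "ts_ms": 1683734195621,
  "snapshot": {"string": "first_in_data_collection"},
  "txId": {"long": 806227},
  "lsn": {"long": 1213462492184},
  "xmin": null
}
""")

print(unwrap_fields(source))
# -> {'ts_ms': 1683734195621, 'snapshot': 'first_in_data_collection', 'txId': 806227, 'lsn': 1213462492184, 'xmin': None}
```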
   
   And here's the corresponding schema registered in the Schema Registry:
   
   ```json
   {
     "type": "record",
     "name": "Envelope",
     "namespace": "<topic_prefix>.<schema_name>.samser_customers",
     "fields": [
       {
         "name": "before",
         "type": [
           "null",
           {
             "type": "record",
             "name": "Value",
             "fields": [
               {
                 "name": "id",
                 "type": {
                   "type": "int",
                   "connect.default": 0
                 },
                 "default": 0
               },
               {
                 "name": "name",
                 "type": "string"
               },
               {
                 "name": "age",
                 "type": "int"
               },
               {
                 "name": "created_at",
                 "type": [
                   "null",
                   {
                     "type": "long",
                     "connect.version": 1,
                     "connect.name": "io.debezium.time.MicroTimestamp"
                   }
                 ],
                 "default": null
               },
               {
                 "name": "event_ts",
                 "type": [
                   "null",
                   "long"
                 ],
                 "default": null
               }
             ],
             "connect.name": "<topic_prefix>.<schema_name>.samser_customers.Value"
           }
         ],
         "default": null
       },
       {
         "name": "after",
         "type": [
           "null",
           "Value"
         ],
         "default": null
       },
       {
         "name": "source",
         "type": {
           "type": "record",
           "name": "Source",
           "namespace": "io.debezium.connector.postgresql",
           "fields": [
             {
               "name": "version",
               "type": "string"
             },
             {
               "name": "connector",
               "type": "string"
             },
             {
               "name": "name",
               "type": "string"
             },
             {
               "name": "ts_ms",
               "type": "long"
             },
             {
               "name": "snapshot",
               "type": [
                 {
                   "type": "string",
                   "connect.version": 1,
                   "connect.parameters": {
                     "allowed": "true,last,false,incremental"
                   },
                   "connect.default": "false",
                   "connect.name": "io.debezium.data.Enum"
                 },
                 "null"
               ],
               "default": "false"
             },
             {
               "name": "db",
               "type": "string"
             },
             {
               "name": "sequence",
               "type": [
                 "null",
                 "string"
               ],
               "default": null
             },
             {
               "name": "schema",
               "type": "string"
             },
             {
               "name": "table",
               "type": "string"
             },
             {
               "name": "txId",
               "type": [
                 "null",
                 "long"
               ],
               "default": null
             },
             {
               "name": "lsn",
               "type": [
                 "null",
                 "long"
               ],
               "default": null
             },
             {
               "name": "xmin",
               "type": [
                 "null",
                 "long"
               ],
               "default": null
             }
           ],
           "connect.name": "io.debezium.connector.postgresql.Source"
         }
       },
       {
         "name": "op",
         "type": "string"
       },
       {
         "name": "ts_ms",
         "type": [
           "null",
           "long"
         ],
         "default": null
       },
       {
         "name": "transaction",
         "type": [
           "null",
           {
             "type": "record",
             "name": "block",
             "namespace": "event",
             "fields": [
               {
                 "name": "id",
                 "type": "string"
               },
               {
                 "name": "total_order",
                 "type": "long"
               },
               {
                 "name": "data_collection_order",
                 "type": "long"
               }
             ],
             "connect.version": 1,
             "connect.name": "event.block"
           }
         ],
         "default": null
       }
     ],
     "connect.version": 1,
     "connect.name": "<topic_prefix>.<schema_name>.samser_customers.Envelope"
   }
   ```
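On the naming side, the key that shows up under `after` matches the full name of the `Value` record in this schema. Per the Avro specification, a named type declared without its own `namespace` inherits the namespace of the nearest enclosing named type. So `Value`, declared inside `Envelope`, becomes `<topic_prefix>.<schema_name>.samser_customers.Value`, and its `connect.name` agrees. Below is a minimal Python sketch of that resolution over a trimmed copy of the schema. The helper names are hypothetical, and it only handles inline record definitions.

```python
import json

# Trimmed copy of the Envelope schema above (field lists emptied where not needed).
ENVELOPE = json.loads("""
{
  "type": "record",
  "name": "Envelope",
  "namespace": "<topic_prefix>.<schema_name>.samser_customers",
  "fields": [
    {"name": "before", "type": ["null", {"type": "record", "name": "Value", "fields": []}]},
    {"name": "after", "type": ["null", "Value"]},
    {"name": "source",
     "type": {"type": "record", "name": "Source",
              "namespace": "io.debezium.connector.postgresql", "fields": []}}
  ]
}
""")


def full_name(schema, enclosing_namespace):
    """Full name of a named type: a dotted name is already full; otherwise use the
    type's own namespace, falling back to the enclosing one."""
    name = schema["name"]
    if "." in name:
        return name
    namespace = schema.get("namespace", enclosing_namespace)
    return f"{namespace}.{name}" if namespace else name


def record_full_names(schema, enclosing_namespace=None, found=None):
    """Map each inline record's short name to its full name (depth-first walk)."""
    if found is None:
        found = {}
    if isinstance(schema, list):  # a union: visit every branch
        for branch in schema:
            record_full_names(branch, enclosing_namespace, found)
    elif isinstance(schema, dict) and schema.get("type") == "record":
        fname = full_name(schema, enclosing_namespace)
        found[schema["name"]] = fname
        # Types defined inside this record inherit its namespace.
        namespace = fname.rpartition(".")[0] or None
        for field in schema.get("fields", []):
            record_full_names(field["type"], namespace, found)
    return found


names = record_full_names(ENVELOPE)
print(names["Value"])   # -> <topic_prefix>.<schema_name>.samser_customers.Value
print(names["Source"])  # -> io.debezium.connector.postgresql.Source
```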
   
   I **think** the extra level of nesting introduced by the 
`<topic_prefix>.<schema_name>.samser_customers.Value` key **might** be 
causing this issue?! Here's that part again:
   
   ```json
   {
     "before": null,
     "after": {
       "<topic_prefix>.<schema_name>.samser_customers.Value": {
         "id": 1,
         "name": "Bob",
         "age": 40,
         "created_at": {
           "long": 1683661733071814
         },
         "event_ts": {
           "long": 1681984800000
         }
       }
     },
     "source": {
       ...
     },
     ...
   }
   ```
   
   However, according to [this line](https://github.com/apache/hudi/blob/release-0.11.1/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/PostgresDebeziumSource.java#LL44C57-L44C156) 
in `PostgresDebeziumSource`, which references the [Debezium 
docs](https://debezium.io/documentation/reference/1.4/connectors/postgresql.html#postgresql-create-events), 
Hudi expects the `before` and `after` fields not to be nested under anything 
else. IIUC, Hudi expects something like the following for the `before` and 
`after` fields:
   
   ```json
   {
     ...
     "after": {
       "id": 1,
       "name": "Bob",
       "age": 40,
       "created_at": {
         "long": 1683661733071814
       },
       "event_ts": {
         "long": 1681984800000
       }
     },
     ...
   }
   ```
   
   But we're giving it:
   
   ```json
   {
     ...
     "after": {
       "pg_dev8.public.samser_customers.Value": {
         "id": 1,
         "name": "Bob",
         "age": 40,
         "created_at": {
           "long": 1683661733071814
         },
         "event_ts": {
           "long": 1681984800000
         }
       }
     },
     ...
   }
   ```
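To make the difference between the two shapes concrete, here is a small Python sketch that turns the shape we're giving Hudi into the shape it expects. It lifts the record out from under its single full-name key. The helper `unwrap_row_image` is hypothetical, not part of Hudi. It only touches `before`/`after` and leaves the `{"long": ...}` wrappers on the inner fields alone, matching the expected example above.

```python
import json

VALUE_FULL_NAME = "pg_dev8.public.samser_customers.Value"  # key seen under "after" above


def unwrap_row_image(event, field, record_full_name):
    """Return a copy of `event` whose `field` ("before"/"after") holds the bare record.

    Avro's JSON encoding writes a non-null ["null", Value] union as
    {"<full name of Value>": {...}}; a null image stays null.
    """
    image = event.get(field)
    if isinstance(image, dict) and set(image) == {record_full_name}:
        event = dict(event)  # shallow copy so the input is not mutated
        event[field] = image[record_full_name]
    return event


given = json.loads("""
{
  "before": null,
  "after": {
    "pg_dev8.public.samser_customers.Value": {
      "id": 1, "name": "Bob", "age": 40,
      "created_at": {"long": 1683661733071814},
      "event_ts": {"long": 1681984800000}
    }
  },
  "op": "r"
}
""")

flat = unwrap_row_image(given, "after", VALUE_FULL_NAME)
flat = unwrap_row_image(flat, "before", VALUE_FULL_NAME)
print(json.dumps(flat["after"]))
# -> {"id": 1, "name": "Bob", "age": 40, "created_at": {"long": 1683661733071814}, "event_ts": {"long": 1681984800000}}
```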
   
   So when `after.*` is executed in the following snippet:
   
   
https://github.com/apache/hudi/blob/622d27a099f5dec96f992fd423b666083da4b24a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/PostgresDebeziumSource.java#L55-L66
   
   Instead of getting all of `after`'s fields as FLAT columns (e.g., `id`, 
`name`, and so on), it gets **one OBJECT** named 
`<topic_prefix>.<schema_name>.samser_customers.Value` with those fields 
nested under it.
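Here is a toy Python model of the star expansion described above. It assumes that `after.*` yields one column per top-level field of the `after` struct. It mimics Spark SQL's behaviour in spirit only: the schemas are simplified, hypothetical dicts, not Spark `StructType`s, and `star_columns` is an illustrative helper.

```python
# Simplified, hypothetical schemas: a struct is a dict of field name -> type,
# where a type is either a type-name string or another dict (a nested struct).
FLAT_AFTER = {
    "id": "int",
    "name": "string",
    "age": "int",
    "created_at": "long",
    "event_ts": "long",
}
WRAPPED_AFTER = {"pg_dev8.public.samser_customers.Value": FLAT_AFTER}


def star_columns(row_schema, struct_col):
    """Column names a "<struct_col>.*" selection would produce in this toy model."""
    struct = row_schema[struct_col]
    if not isinstance(struct, dict):
        # Only structs can be star-expanded (Spark also rejects non-struct columns).
        raise TypeError(f"{struct_col!r} is not a struct")
    return list(struct)


print(star_columns({"op": "string", "after": FLAT_AFTER}, "after"))
# -> ['id', 'name', 'age', 'created_at', 'event_ts']
print(star_columns({"op": "string", "after": WRAPPED_AFTER}, "after"))
# -> ['pg_dev8.public.samser_customers.Value']
```

With the flat schema the expansion yields `id`, `name`, and so on. With the wrapped schema it yields a single column named after the record.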
   
   Do you think this is the issue, or am I mistaken?! Also, IF this is the case, 
I wonder what configuration I should apply to my **Debezium connector** so 
that it doesn't create this extra nested layer under the `after` and/or 
`before` fields. cc @sydneyhoran


-- 
This is an automated message from the Apache Git Service.