Sam-Serpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1545048150

   I can reproduce this with a much simpler schema and corresponding Kafka 
key-value messages as well. Let's say we have this schema in our Confluent 
Schema Registry (SR):
   
   ```json
   {
     "type": "record",
     "name": "Envelope",
     "fields": [
       {
         "name": "before",
         "default": null,
         "type": [
           "null",
           {
             "name": "Value",
             "type": "record",
             "fields": [
               {
                 "name": "id",
                 "type": "int"
               },
               {
                 "name": "fst_name",
                 "type": "string"
               }
             ]
           }
         ]
       },
       {
         "name": "after",
         "default": null,
         "type": [
           "null",
           "Value"
         ]
       },
       {
         "name": "op",
         "type": "string"
       }
     ]
   }
   ```
   
   Then when we try to publish a message in the following format:
   
   ```json
   {
     "after": {
       "id": 10,
       "fst_name": "Bob"
     },  
     "before": null,
     "op": "c" 
   }
   ```
   
   The `kafka-avro-console-producer` fails with this exception:
   
   ```
    Caused by: org.apache.avro.AvroTypeException: Unknown union branch id
            at org.apache.avro.io.JsonDecoder.readIndex(JsonDecoder.java:434)
            at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282)
            at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
            at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
            at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
            at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
            at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
            at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
            at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
            at io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils.toObject(AvroSchemaUtils.java:214)
            at io.confluent.kafka.formatter.AvroMessageReader.readFrom(AvroMessageReader.java:124)
            ... 3 more
   ```
   
   Changing the input message to the following format (simply wrapping `id` and `fst_name` inside a `Value` object) leads to successful serialization and publishing to Kafka:
   
   ```json
    {
      "after": {
        "Value": {
          "id": 10,
          "fst_name": "Bob"
        }
      },
      "before": null,
      "op": "c"
    }
   ```
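
This matches the Avro specification's JSON encoding of unions: a union value other than `null` must be encoded as a JSON object with a single name/value pair whose name is the branch's type name (here `Value`). As an illustration of that wrapping step, here is a plain-Python sketch (my own helper, not producer or Debezium code; field and branch names follow the schema above):

```python
import json

def wrap_union_branches(event, fields=("before", "after"), branch="Value"):
    """Wrap each non-null union field of a change event in a single-key
    object named after its union branch type, per Avro's JSON encoding.
    (Illustrative helper only; `Value` matches the schema above.)"""
    out = dict(event)
    for f in fields:
        if out.get(f) is not None:
            out[f] = {branch: out[f]}
    return out

plain = {"after": {"id": 10, "fst_name": "Bob"}, "before": None, "op": "c"}
print(json.dumps(wrap_union_branches(plain)))
# {"after": {"Value": {"id": 10, "fst_name": "Bob"}}, "before": null, "op": "c"}
```

This is why the second payload shape deserializes while the first one trips `JsonDecoder.readIndex` with "Unknown union branch id": the decoder sees the record's first field name (`id`) where it expects a union branch name.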
   
   This is pretty much what `Debezium` is currently doing, IIUC. Downstream, however, Hudi expects the `before` and `after` fields to look like this:
   
   ```json
    {
     "after": {
       "id": 10,
       "fst_name": "Bob"
     },  
     "before": null,
     "op": "c" 
   }
   ```
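
Bridging the two shapes amounts to the inverse transformation: stripping the single-key branch wrapper from `before`/`after`. A hedged sketch (again my own helper, not a Hudi or Debezium API):

```python
def unwrap_union_branches(event, fields=("before", "after"), branch="Value"):
    """Strip the Avro JSON union wrapper {"Value": {...}} from the given
    fields, leaving nulls and already-flat values untouched.
    (Illustrative helper only.)"""
    out = dict(event)
    for f in fields:
        v = out.get(f)
        if isinstance(v, dict) and set(v) == {branch}:
            out[f] = v[branch]
    return out

wrapped = {"after": {"Value": {"id": 10, "fst_name": "Bob"}}, "before": None, "op": "c"}
print(unwrap_union_branches(wrapped))
# {'after': {'id': 10, 'fst_name': 'Bob'}, 'before': None, 'op': 'c'}
```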
   
   The questions are:
   
   1. How should one define an Avro schema that allows nullable named record types, so the non-`Value`-wrapped format shown above works as-is?
   2. How can I get Debezium to produce that format instead of what it's currently doing, which is what I've reproduced above?
   
   Regarding #2, I know others have managed to get Debezium's Avro serialization working without that extra `Value` object :disappointed:


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]