Balaji Varadarajan created HUDI-1343:
----------------------------------------

             Summary: Add standard schema postprocessor which would rewrite the 
schema using spark-avro conversion
                 Key: HUDI-1343
                 URL: https://issues.apache.org/jira/browse/HUDI-1343
             Project: Apache Hudi
          Issue Type: Improvement
          Components: DeltaStreamer
            Reporter: Balaji Varadarajan


When a Transformer is used, the final schema used to convert Avro records to bytes is auto-generated by Spark. This schema can differ, in ways that matter to Avro, from the target schema used for writing (e.g. when the target schema comes from a Schema Registry).
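For context, this is roughly how that writer-side schema gets derived from the transformed rows, sketched here with a hand-built StructType. The AvroConversionUtils helper and the record name/namespace are assumptions based on Hudi's spark module, not something defined by this ticket:

import org.apache.avro.Schema;
import org.apache.hudi.AvroConversionUtils;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class SparkAvroSchemaDerivation {
  public static void main(String[] args) {
    // Stand-in for the Transformer output schema: one nullable and one non-nullable column.
    StructType rowSchema = new StructType()
        .add("_ts_ms", DataTypes.LongType, true)
        .add("inc_id", DataTypes.IntegerType, false);

    // spark-avro conversion places the concrete type first in nullable unions,
    // e.g. _ts_ms becomes ["long", "null"] rather than ["null", "long"].
    Schema sparkGenerated = AvroConversionUtils.convertStructTypeToAvroSchema(
        rowSchema, "hoodie_source", "hoodie.source");
    System.out.println(sparkGenerated.toString(true));
  }
}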

 

For example, the schema generated by spark-avro when converting a Row to Avro:

{
  "type" : "record",
  "name" : "hoodie_source",
  "namespace" : "hoodie.source",
  "fields" : [ {
    "name" : "_ts_ms",
    "type" : [ "long", "null" ]
  }, {
    "name" : "_op",
    "type" : "string"
  }, {
    "name" : "inc_id",
    "type" : "int"
  }, {
    "name" : "year",
    "type" : [ "int", "null" ]
  }, {
    "name" : "violation_desc",
    "type" : [ "string", "null" ]
  }, {
    "name" : "violation_code",
    "type" : [ "string", "null" ]
  }, {
    "name" : "case_individual_id",
    "type" : [ "int", "null" ]
  }, {
    "name" : "flag",
    "type" : [ "string", "null" ]
  }, {
    "name" : "last_modified_ts",
    "type" : "long"
  } ]
}

 

is not compatible with the target Avro schema:

 

{
  "type" : "record",
  "name" : "formatted_debezium_payload",
  "fields" : [ {
    "name" : "_ts_ms",
    "type" : [ "null", "long" ],
    "default" : null
  }, {
    "name" : "_op",
    "type" : "string",
    "default" : null
  }, {
    "name" : "inc_id",
    "type" : "int",
    "default" : null
  }, {
    "name" : "year",
    "type" : [ "null", "int" ],
    "default" : null
  }, {
    "name" : "violation_desc",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "violation_code",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "case_individual_id",
    "type" : [ "null", "int" ],
    "default" : null
  }, {
    "name" : "flag",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "last_modified_ts",
    "type" : "long",
    "default" : null
  } ]
}

 

Note that the union type order differs for individual fields:

"type" : [ "null", "string" ] vs. "type" : [ "string", "null" ]

Perhaps unexpectedly, Avro decoding fails when bytes written with the first schema are read using the second schema.
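To make that concrete, here is a minimal, self-contained reproduction (trimmed to two fields as stand-ins for the schemas above). The bytes are decoded with only the target schema, so Avro cannot apply schema resolution and interprets the union branch index against the wrong ordering:

import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class UnionOrderMismatchDemo {
  public static void main(String[] args) throws Exception {
    // Writer schema as spark-avro generates it: nullable types come out as ["long", "null"].
    Schema writerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"hoodie_source\",\"namespace\":\"hoodie.source\",\"fields\":["
        + "{\"name\":\"_ts_ms\",\"type\":[\"long\",\"null\"]},"
        + "{\"name\":\"inc_id\",\"type\":\"int\"}]}");

    // Target schema as it might come from the registry: ["null", "long"] with a null default.
    Schema targetSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"formatted_debezium_payload\",\"fields\":["
        + "{\"name\":\"_ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null},"
        + "{\"name\":\"inc_id\",\"type\":\"int\"}]}");

    GenericRecord record = new GenericData.Record(writerSchema);
    record.put("_ts_ms", 1601000000000L);
    record.put("inc_id", 42);

    // Serialize with the spark-avro (writer) schema.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
    encoder.flush();

    // Decode with only the target schema. Without the writer schema Avro cannot do
    // schema resolution: the union branch index written against ["long","null"] is
    // interpreted against ["null","long"], so _ts_ms decodes as null and the remaining
    // bytes are misread; this either throws or yields corrupted values.
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord decoded = new GenericDatumReader<GenericRecord>(targetSchema).read(null, decoder);
    System.out.println(decoded);
  }
}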

 

One way to fix this is to use the configured target schema when generating record bytes, but that is hard to do without breaking the record payload constructor API used by DeltaStreamer.

The other option is to apply a post-processor to the target schema so that it becomes consistent with the schema of Transformer-generated records.

 

This ticket is to take the latter approach: create a standard schema post-processor and add it by default whenever a Transformer is used.
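A rough sketch of what such a post-processor could look like is below. The SchemaPostProcessor hook signature and the AvroConversionUtils helpers are assumptions based on Hudi's utilities and spark modules (names and packages may differ in the actual change); the idea is simply to round-trip the target schema through Spark's Catalyst types so that it matches what spark-avro generates for the transformed rows.

import org.apache.avro.Schema;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.types.StructType;

/**
 * Hypothetical post-processor: rewrites the target schema by converting it to a Spark
 * StructType and back to Avro, so the schema used for writing matches the one spark-avro
 * generates for Transformer output (same union ordering, record name and namespace).
 */
public class SparkAvroSchemaPostProcessor extends SchemaPostProcessor {

  public SparkAvroSchemaPostProcessor(TypedProperties props, JavaSparkContext jssc) {
    super(props, jssc);
  }

  @Override
  public Schema processSchema(Schema schema) {
    // Avro -> Catalyst -> Avro round trip normalizes the schema to spark-avro conventions.
    StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
    return AvroConversionUtils.convertStructTypeToAvroSchema(
        structType, "hoodie_source", "hoodie.source");
  }
}

DeltaStreamer could then wrap the configured SchemaProvider with this post-processor automatically whenever a Transformer is configured, so the target schema stays consistent without any extra user configuration.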



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
