Balaji Varadarajan created HUDI-1343:
----------------------------------------
Summary: Add standard schema postprocessor which would rewrite the
schema using spark-avro conversion
Key: HUDI-1343
URL: https://issues.apache.org/jira/browse/HUDI-1343
Project: Apache Hudi
Issue Type: Improvement
Components: DeltaStreamer
Reporter: Balaji Varadarajan
When a Transformer is used, the final schema used to convert Avro records
to bytes is auto-generated by Spark. It can differ, in ways that matter to
Avro, from the target schema used for writing (the target schema may come
from a Schema Registry).
For example:
Schema generated by spark-avro when converting a Row to Avro
{
  "type" : "record",
  "name" : "hoodie_source",
  "namespace" : "hoodie.source",
  "fields" : [ {
    "name" : "_ts_ms",
    "type" : [ "long", "null" ]
  }, {
    "name" : "_op",
    "type" : "string"
  }, {
    "name" : "inc_id",
    "type" : "int"
  }, {
    "name" : "year",
    "type" : [ "int", "null" ]
  }, {
    "name" : "violation_desc",
    "type" : [ "string", "null" ]
  }, {
    "name" : "violation_code",
    "type" : [ "string", "null" ]
  }, {
    "name" : "case_individual_id",
    "type" : [ "int", "null" ]
  }, {
    "name" : "flag",
    "type" : [ "string", "null" ]
  }, {
    "name" : "last_modified_ts",
    "type" : "long"
  } ]
}
is not compatible with the Avro Schema:
{
  "type" : "record",
  "name" : "formatted_debezium_payload",
  "fields" : [ {
    "name" : "_ts_ms",
    "type" : [ "null", "long" ],
    "default" : null
  }, {
    "name" : "_op",
    "type" : "string",
    "default" : null
  }, {
    "name" : "inc_id",
    "type" : "int",
    "default" : null
  }, {
    "name" : "year",
    "type" : [ "null", "int" ],
    "default" : null
  }, {
    "name" : "violation_desc",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "violation_code",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "case_individual_id",
    "type" : [ "null", "int" ],
    "default" : null
  }, {
    "name" : "flag",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "last_modified_ts",
    "type" : "long",
    "default" : null
  } ]
}
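Note that the union type order differs for individual fields:
"type" : [ "null", "string" ] vs "type" : [ "string", "null" ]
Avro's binary encoding stores the position of the chosen union branch, so the
order matters. As a result, and perhaps unexpectedly, Avro decoding fails when
bytes written with the first schema are read using the second schema.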
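To illustrate why the branch order matters, here is a minimal pure-Python sketch of how Avro's binary encoding writes an optional long: a zigzag varint holding the union branch index, followed by the value. The helper names (`zigzag_varint`, `encode_union_long`, `decode_union_long`) are hypothetical and exist only for this illustration; they are not part of Avro, Spark, or Hudi.

```python
def zigzag_varint(n: int) -> bytes:
    """Encode an integer as Avro does for int/long: zigzag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        b = z & 0x7F
        z >>= 7
        if z:
            out.append(b | 0x80)  # more bytes follow
        else:
            out.append(b)
            return bytes(out)


def read_zigzag_varint(data: bytes, pos: int):
    """Decode one zigzag varint starting at pos; return (value, next_pos)."""
    shift = 0
    z = 0
    while True:
        b = data[pos]
        pos += 1
        z |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (z >> 1) ^ -(z & 1), pos


def encode_union_long(value, branches):
    """Write the branch index, then the value (nothing is written for null)."""
    index = branches.index("null" if value is None else "long")
    payload = b"" if value is None else zigzag_varint(value)
    return zigzag_varint(index) + payload


def decode_union_long(data, branches, pos=0):
    """Read the branch index, then interpret what follows per the reader's order."""
    index, pos = read_zigzag_varint(data, pos)
    if branches[index] == "null":
        return None, pos
    return read_zigzag_varint(data, pos)


# Written with the spark-avro style order, read with the registry style order.
written = encode_union_long(5, ["long", "null"])
value, consumed = decode_union_long(written, ["null", "long"])
```

Here `written` is two bytes, but the reader using `[ "null", "long" ]` sees branch index 0, returns null, and consumes only one byte. The leftover byte would then be misread as the start of the next field, which is how decoding goes wrong for the whole record.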
One fix is to use the configured target schema when generating record bytes,
but this is hard to do without breaking the record payload constructor API
used by DeltaStreamer.
The other option is to apply a post-processor to the target schema to make it
consistent with the records generated by the Transformer.
This ticket takes the latter approach: create a standard schema
post-processor and add it by default when a Transformer is used.
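As a rough illustration of what such a post-processor would achieve, the Python sketch below rewrites a target schema (as a parsed JSON dict) so that nullable unions list "null" last, matching the order seen in the spark-avro generated schema above. This is not the Hudi implementation: the ticket summary suggests rewriting the schema through spark-avro conversion, whereas this sketch only imitates the visible effect on union order. The function names are hypothetical, and "default" entries are deliberately left untouched; a real implementation would need to decide how to handle them.

```python
import copy


def null_last(field_type):
    """Move "null" to the end of a union; leave non-union types unchanged."""
    if isinstance(field_type, list) and "null" in field_type:
        return [t for t in field_type if t != "null"] + ["null"]
    return field_type


def align_union_order(schema: dict) -> dict:
    """Return a copy of a record schema whose top-level unions are null-last."""
    aligned = copy.deepcopy(schema)  # do not mutate the caller's schema
    for field in aligned.get("fields", []):
        field["type"] = null_last(field["type"])
    return aligned


# A two-field excerpt of the target schema from this ticket.
target = {
    "type": "record",
    "name": "formatted_debezium_payload",
    "fields": [
        {"name": "_ts_ms", "type": ["null", "long"], "default": None},
        {"name": "_op", "type": "string", "default": None},
    ],
}
aligned = align_union_order(target)
```

After the call, the `_ts_ms` field in `aligned` has the type `["long", "null"]`, while `target` itself is unchanged. The sketch handles only top-level fields; nested records, arrays, and maps would need a recursive walk.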