bvaradar commented on issue #2149:
URL: https://github.com/apache/hudi/issues/2149#issuecomment-704601712
@ashishmgofficial : Yes, you are correct. You could create a custom
SchemaProvider that extends an existing one, for example the Confluent Schema
Registry based provider (SchemaRegistryProvider). Please see below for an
example implementation.
```
// Package names below match recent Hudi releases and may differ in older versions.
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaBuilder.FieldAssembler;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import org.apache.spark.api.java.JavaSparkContext;

public class DebeziumSchemaRegistryProvider extends SchemaRegistryProvider {

  public DebeziumSchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
    super(props, jssc);
  }

  /**
   * The Debezium target schema is a nested structure with many metadata fields.
   * This flattens the structure and keeps only the necessary metadata.
   *
   * @return the flattened target schema
   */
  @Override
  public Schema getTargetSchema() {
    Schema registrySchema = super.getTargetSchema();

    Field dataField = registrySchema.getField("after");
    Field tsField = registrySchema.getField("ts_ms");
    Field opField = registrySchema.getField("op");

    // Initialize with metadata columns
    FieldAssembler<Schema> payloadFieldAssembler = SchemaBuilder.builder()
        .record("formatted_debezium_payload")
        .fields()
        .name("_" + tsField.name()).type(tsField.schema()).withDefault(null)
        .name("_" + opField.name()).type(opField.schema()).withDefault(null);

    // Add data columns to schema.
    // The "after" field is a union of the data schema and the null schema, so we
    // need to extract only the data schema portion.
    dataField.schema()
        .getTypes()
        .get(dataField.schema().getIndexNamed(registrySchema.getNamespace() + ".Value"))
        .getFields()
        .forEach(field ->
            payloadFieldAssembler.name(field.name()).type(field.schema()).withDefault(null));

    return payloadFieldAssembler.endRecord();
  }
}
```
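To make the shape of the flattening concrete, here is a minimal, dependency-free sketch that applies the same idea to a Debezium-style envelope modelled as nested maps rather than Avro schemas. The class name `DebeziumFlattenSketch` and its `flatten` method are hypothetical and exist only for illustration; they are not part of Hudi, Debezium, or Avro.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration only; not part of Hudi, Debezium, or Avro. */
final class DebeziumFlattenSketch {

    /**
     * Flattens a Debezium-style envelope, modelled here as nested maps:
     * "ts_ms" and "op" become "_ts_ms" and "_op", followed by every column
     * found under "after". Other envelope entries (e.g. "before") are dropped.
     */
    @SuppressWarnings("unchecked")
    static Map<String, Object> flatten(Map<String, Object> envelope) {
        Map<String, Object> flat = new LinkedHashMap<>();
        // Metadata columns first, prefixed with "_" as in getTargetSchema().
        flat.put("_ts_ms", envelope.get("ts_ms"));
        flat.put("_op", envelope.get("op"));
        // "after" can be null (for example on delete events); in that case
        // only the metadata columns remain.
        Object after = envelope.get("after");
        if (after instanceof Map) {
            flat.putAll((Map<String, Object>) after);
        }
        return flat;
    }
}
```

Calling `DebeziumFlattenSketch.flatten(envelope)` on an envelope whose "after" entry holds the columns `id` and `name` yields a map with the keys `_ts_ms`, `_op`, `id`, `name`, in that order, which mirrors the field order the schema provider builds.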