ashishmgofficial commented on issue #2149:
URL: https://github.com/apache/hudi/issues/2149#issuecomment-706075094
Avro Payload :
```
package org.apache.hudi.common.model;
import org.apache.hudi.common.util.Option;
import org.apache.avro.generic.GenericRecord;
public class DebeziumAvroPayload extends OverwriteWithLatestAvroPayload {
// Field is prefixed with a underscore by transformer to indicate metadata
field
public static final String OP_FIELD = "_op";
public static final String DELETE_OP = "d";
public DebeziumAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
}
public DebeziumAvroPayload(Option<GenericRecord> record) {
this(record.isPresent() ? record.get() : null, 0); // natural order
}
@Override
protected boolean isDeleteRecord(GenericRecord genericRecord) {
return (genericRecord.get(OP_FIELD) != null &&
genericRecord.get(OP_FIELD).toString().equalsIgnoreCase(
DELETE_OP));
}
}
```
SchemaProvider :
```
package org.apache.hudi.utilities.schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaBuilder.FieldAssembler;
import org.apache.spark.api.java.JavaSparkContext;
public class DebeziumSchemaRegistryProvider extends SchemaRegistryProvider {
public DebeziumSchemaRegistryProvider(TypedProperties props,
JavaSparkContext jssc) {
super(props, jssc);
}
/**
* Debezium target schema is a nested structure with many metadata fields.
This will
* flatten the schema structure and only require necessary metadata
information
* @return
*/
@Override
public Schema getTargetSchema() {
Schema registrySchema = super.getTargetSchema();
Field dataField = registrySchema.getField("after");
Field tsField = registrySchema.getField("ts_ms");
Field opField = registrySchema.getField("op");
// Initialize with metadata columns
FieldAssembler<Schema> payloadFieldAssembler = SchemaBuilder.builder()
.record("formatted_debezium_payload")
.fields()
.name("_" + tsField.name()).type(tsField.schema()).withDefault(null)
.name("_" + opField.name()).type(tsField.schema()).withDefault(null);
// Add data columns to schema
dataField.schema()
.getTypes()
// "after" field is a union with data schema and null schema, so we
need to extract only the data schema portion
.get(dataField.schema().getIndexNamed(registrySchema.getNamespace()
+ ".Value"))
.getFields()
.forEach(field -> {
payloadFieldAssembler.name(field.name()).type(field.schema()).withDefault(null);
});
return payloadFieldAssembler.endRecord();
}
}
```
Transformer :
```
package org.apache.hudi.utilities.transform;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class DebeziumTransformer implements Transformer {

  /**
   * Flattens Debezium change events: inserts/updates take their columns from
   * the "after" struct, deletes from the "before" struct. The op and ts_ms
   * metadata columns are renamed with a leading underscore so the payload
   * class can recognize them as metadata.
   *
   * @return the flattened dataset (inserts/updates unioned with deletes)
   */
  @Override
  public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession,
                            Dataset<Row> rowDataset, TypedProperties properties) {
    Dataset<Row> insertedOrUpdatedData = rowDataset
        .select("op", "ts_ms", "after.*")
        .withColumnRenamed("op", "_op")
        .withColumnRenamed("ts_ms", "_ts_ms")
        .filter(rowDataset.col("op").notEqual("d"));

    Dataset<Row> deletedData = rowDataset
        .select("op", "ts_ms", "before.*")
        .withColumnRenamed("op", "_op")
        .withColumnRenamed("ts_ms", "_ts_ms")
        .filter(rowDataset.col("op").equalTo("d"));

    // NOTE(review): union() matches columns by position; this assumes the
    // Debezium "before" and "after" structs share the same field order.
    Dataset<Row> transformedData = insertedOrUpdatedData.union(deletedData);

    // FIX: use the public show(numRows, truncate, vertical) API instead of the
    // Spark-internal showString(), which is private[sql] and not meant for
    // external callers; show() prints to stdout itself.
    transformedData.show(20, 50, false);
    return transformedData;
  }
}
```
I have added these three classes to hudi-utilities and hudi-common and built
the jar.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]