ashishmgofficial commented on issue #2149:
URL: https://github.com/apache/hudi/issues/2149#issuecomment-706075094


   Avro Payload : 
   
   ```
   package org.apache.hudi.common.model;
   
   import org.apache.hudi.common.util.Option;
   import org.apache.avro.generic.GenericRecord;
   
   public class DebeziumAvroPayload extends OverwriteWithLatestAvroPayload {
   
     // Field is prefixed with a underscore by transformer to indicate metadata 
field
     public static final String OP_FIELD = "_op";
     public static final String DELETE_OP = "d";
   
     public DebeziumAvroPayload(GenericRecord record, Comparable orderingVal) {
       super(record, orderingVal);
     }
   
     public DebeziumAvroPayload(Option<GenericRecord> record) {
       this(record.isPresent() ? record.get() : null, 0); // natural order
     }
   
     @Override
     protected boolean isDeleteRecord(GenericRecord genericRecord) {
       return (genericRecord.get(OP_FIELD) != null && 
genericRecord.get(OP_FIELD).toString().equalsIgnoreCase(
           DELETE_OP));
     }
   }
   ```
   
   SchemaProvider : 
   
   ```
   package org.apache.hudi.utilities.schema;
   
   import org.apache.hudi.common.config.TypedProperties;
   import org.apache.avro.Schema;
   import org.apache.avro.Schema.Field;
   import org.apache.avro.SchemaBuilder;
   import org.apache.avro.SchemaBuilder.FieldAssembler;
   import org.apache.spark.api.java.JavaSparkContext;
   
   public class DebeziumSchemaRegistryProvider extends SchemaRegistryProvider { 
 
     public DebeziumSchemaRegistryProvider(TypedProperties props,
         JavaSparkContext jssc) {
       super(props, jssc);
     }
   
     /**
      * Debezium target schema is a nested structure with many metadata fields. 
This will
      * flatten the schema structure and only require necessary metadata 
information
      * @return
      */
     @Override
     public Schema getTargetSchema() {
       Schema registrySchema = super.getTargetSchema();
   
       Field dataField = registrySchema.getField("after");
       Field tsField = registrySchema.getField("ts_ms");
       Field opField = registrySchema.getField("op");
   
       // Initialize with metadata columns
       FieldAssembler<Schema> payloadFieldAssembler = SchemaBuilder.builder()
           .record("formatted_debezium_payload")
           .fields()
           .name("_" + tsField.name()).type(tsField.schema()).withDefault(null)
           .name("_" + opField.name()).type(tsField.schema()).withDefault(null);
   
       // Add data columns to schema
       dataField.schema()
           .getTypes()
           // "after" field is a union with data schema and null schema, so we 
need to extract only the data schema portion
           .get(dataField.schema().getIndexNamed(registrySchema.getNamespace() 
+ ".Value"))
           .getFields()
           .forEach(field -> {
             
payloadFieldAssembler.name(field.name()).type(field.schema()).withDefault(null);
           });
   
       return payloadFieldAssembler.endRecord();
     }
   }
   ```
   
   Transformer : 
   
   ```
   package org.apache.hudi.utilities.transform;
   
   import org.apache.hudi.common.config.TypedProperties;
   import org.apache.spark.api.java.JavaSparkContext;
   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.SparkSession;
   
   public class DebeziumTransformer implements Transformer {
   
     @Override
     public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, 
Dataset<Row> rowDataset,
         TypedProperties properties) {
   
       Dataset<Row> insertedOrUpdatedData = rowDataset
           .select("op", "ts_ms", "after.*")
           .withColumnRenamed("op", "_op")
           .withColumnRenamed("ts_ms", "_ts_ms")
           .filter(rowDataset.col("op").notEqual("d"));
   
       Dataset<Row> deletedData = rowDataset
           .select("op", "ts_ms", "before.*")
           .withColumnRenamed("op", "_op")
           .withColumnRenamed("ts_ms", "_ts_ms")
           .filter(rowDataset.col("op").equalTo("d"));
   
       Dataset<Row> transformedData = insertedOrUpdatedData.union(deletedData);
       System.out.println(transformedData.showString(20, 50, false));
       return transformedData;
     }
   }
   ```
   
   I have added these three classes to hudi-utilities and hudi-common and build 
the jar.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to