cajil opened a new issue #5180:
URL: https://github.com/apache/hudi/issues/5180


   While trying to use the Hudi DeltaStreamer to read JSON data from a Kafka source, I couldn't find a suitable configuration that upserts sparse events from the source when the target schema is fixed. I am using the Docker demo environment to test this.
   
   For example, my initial insert/bootstrap data will be of the form:
   {"pos":"00000000000000008084","partition_key": 1234,"after":{"id": 
3,"name":"SILICONES","plcname":"silicones","quantity":8}}
   
   From this, I ran the following command inside the adhoc-1 Docker container:

   spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4 \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
     --table-type MERGE_ON_READ \
     --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
     --source-ordering-field pos \
     --target-base-path /user/hive/warehouse/test_data_mor \
     --target-table test_data_mor \
     --props /var/demo/config/kafka-source.properties \
     --disable-compaction \
     --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
     --payload-class org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload \
     --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
     --hoodie-conf hoodie.deltastreamer.transformer.sql="SELECT a.pos, a.partition_key, a.after.id AS id, a.after.name AS name, a.after.plcname AS plcname, a.after.quantity AS quantity FROM <SRC> a"
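   The transformer query flattens the nested "after" record into top-level columns matching the target schema. As a hedged sketch (plain Python, not the actual Spark SQL transformer), assuming the query selects exactly the target schema's columns:

   ```python
   import json

   # Sketch of the flattening the SqlQueryBasedTransformer query performs:
   # lift the fields of the nested "after" record to the top level.
   def flatten(event: dict) -> dict:
       after = event.get("after", {})
       return {"pos": event["pos"], "partition_key": event["partition_key"],
               "id": after.get("id"), "name": after.get("name"),
               "plcname": after.get("plcname"), "quantity": after.get("quantity")}

   raw = json.loads('{"pos":"00000000000000008084","partition_key":1234,'
                    '"after":{"id":3,"name":"SILICONES","plcname":"silicones","quantity":8}}')
   print(flatten(raw))
   ```

   For a sparse event, the same flattening yields nulls for the absent columns, which is where the trouble below starts.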
   
   kafka-source.properties
   hoodie.datasource.write.recordkey.field=id
   hoodie.datasource.write.partitionpath.field=partition_key
   hoodie.datasource.write.precombine.field=pos
   hoodie.datasource.write.operation=upsert
   # Schema provider props (change to absolute path based on your installation)
   
hoodie.deltastreamer.schemaprovider.source.schema.file=/var/demo/config/src_schema.avsc
   
hoodie.deltastreamer.schemaprovider.target.schema.file=/var/demo/config/target_schema.avsc
   # Kafka Source
   hoodie.deltastreamer.source.kafka.topic=test_topic
   # Kafka props
   bootstrap.servers=kafkabroker:9092
   auto.offset.reset=earliest
   
   src_schema.avsc
   {
     "name": "test_data",
     "type": "record",
     "fields": [
       {
         "name": "pos",
         "type": "string"
       },
       {
         "name": "partition_key",
         "type": "int"
       },
       {
         "name": "after",
         "type": {
           "name": "after",
           "type": "record",
           "fields": [
             {
               "name": "id",
               "type": "int"
             },
             {
               "name": "name",
               "type": "string"
             },
             {
               "name": "plcname",
               "type": "string"
             },
             {
               "name": "quantity",
               "type": "int"
             }
           ]
         }
       }
     ]
   }
   
   target_schema.avsc
   {
     "name": "test_data",
     "type": "record",
     "fields": [
       {
         "name": "pos",
         "type": "string"
       },
       {
         "name": "partition_key",
         "type": "int"
       },
       {
         "name": "id",
         "type": "int"
       },
       {
         "name": "name",
         "type": "string"
       },
       {
         "name": "plcname",
         "type": "string"
       },
       {
         "name": "quantity",
         "type": "int"
       }
     ]
   }
   
   
   
   And the update data will look like:
   
{"pos":"00000000000000009949","partition_key":1234,"after":{"id":3,"name":"update_1","plcname":"updated_1"}}
   
{"pos":"00000000000000009950","partition_key":1234,"after":{"id":3,"name":"update_2"}}
   
   Expected final result in the Hudi dataset:
   {
     "pos": "00000000000000008084",
     "partition_key": 1234,
     "id": 3,
     "name": "update_2",
     "plcname": "updated_1",
     "quantity": 8
   }
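   To make that expectation concrete, here is a minimal, hedged sketch (plain Python, not Hudi internals) of the field-level merge behavior I'm hoping OverwriteNonDefaultsWithLatestAvroPayload provides: fields absent (or null) in a later event fall back to the stored record's values.

   ```python
   # Sketch: per-field overlay where only the fields present and non-null in
   # the newer event overwrite the stored record.
   def merge_sparse(stored: dict, update: dict) -> dict:
       merged = dict(stored)
       for key, value in update.items():
           if value is not None:
               merged[key] = value  # newer non-null value wins
       return merged

   stored = {"pos": "00000000000000008084", "partition_key": 1234,
             "id": 3, "name": "SILICONES", "plcname": "silicones", "quantity": 8}
   update_1 = {"pos": "00000000000000009949", "partition_key": 1234,
               "id": 3, "name": "update_1", "plcname": "updated_1"}
   update_2 = {"pos": "00000000000000009950", "partition_key": 1234,
               "id": 3, "name": "update_2"}

   final = merge_sparse(merge_sparse(stored, update_1), update_2)
   # "quantity" survives from the bootstrap record, "plcname" from update_1,
   # and "name" comes from update_2, the latest event by the ordering field.
   print(final)
   ```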
   
   For this upsert case, I can't use the FilebasedSchemaProvider, since the update events are sparse and contain only selected column updates. I also tried other schema providers such as RowBasedSchemaProvider, but it errors out with org.apache.spark.sql.avro.IncompatibleSchemaException: Unexpected type null.
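   To illustrate the sparseness problem, a small sketch checking an update event against the required fields of the fixed source schema (the field set is copied from src_schema.avsc above; none of those fields is nullable):

   ```python
   # The fixed schema's "after" record has four required (non-nullable) fields,
   # so a sparse update event is necessarily missing some of them.
   REQUIRED_AFTER_FIELDS = {"id", "name", "plcname", "quantity"}  # from src_schema.avsc

   def missing_fields(event: dict) -> set:
       """Return the required 'after' fields absent from a JSON event."""
       return REQUIRED_AFTER_FIELDS - set(event.get("after", {}))

   sparse = {"pos": "00000000000000009950", "partition_key": 1234,
             "after": {"id": 3, "name": "update_2"}}
   print(sorted(missing_fields(sparse)))  # the fields this update does not carry
   ```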
   
   Please advise on the right configs to use when doing sparse-event upserts from JsonKafkaSource with the Hudi DeltaStreamer.
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

