cajil opened a new issue #5180:
URL: https://github.com/apache/hudi/issues/5180
While trying to use the Hudi DeltaStreamer to read JSON data from a Kafka source, I couldn't find a suitable configuration to upsert sparse events from the source when the target schema is fixed. I am using the docker demo environment to test this.
For example, my initial insert/bootstrap data will be of the form:
{"pos":"00000000000000008084","partition_key": 1234,"after":{"id":
3,"name":"SILICONES","plcname":"silicones","quantity":8}}
From this, I ran the following command inside the adhoc-1 docker container:
spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4 \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field pos \
  --target-base-path /user/hive/warehouse/test_data_mor \
  --target-table test_data_mor \
  --props /var/demo/config/kafka-source.properties \
  --disable-compaction \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --payload-class org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload \
  --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
  --hoodie-conf hoodie.deltastreamer.transformer.sql="SELECT a.pos, a.context, a.after.id as id, a.after.name as name, a.after.plcname as plcname, a.after.quantity as quantity FROM <SRC> a"
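For reference, the reshaping I expect the SQL transform above to do is just a flatten of the nested "after" record into top-level columns. A minimal Python sketch of that step (illustration only, not Hudi code; `flatten_event` is a hypothetical helper matching my schemas):

```python
# Sketch: flatten a nested Kafka event into the flat target layout,
# pulling id/name/plcname/quantity out of the nested "after" record.
# Fields absent from a sparse update come through as None.

def flatten_event(event):
    after = event.get("after") or {}
    return {
        "pos": event.get("pos"),
        "partition_key": event.get("partition_key"),
        "id": after.get("id"),
        "name": after.get("name"),
        "plcname": after.get("plcname"),
        "quantity": after.get("quantity"),  # None when the update is sparse
    }

row = flatten_event({
    "pos": "00000000000000009950",
    "partition_key": 1234,
    "after": {"id": 3, "name": "update_2"},
})
print(row)
```

This also shows why sparse events are awkward: every column missing from "after" surfaces as a null in the transformed row.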
kafka-source.properties
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=partition_key
hoodie.datasource.write.precombine.field=pos
hoodie.datasource.write.operation=upsert
# Schema provider props (change to absolute path based on your installation)
hoodie.deltastreamer.schemaprovider.source.schema.file=/var/demo/config/src_schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=/var/demo/config/target_schema.avsc
# Kafka Source
hoodie.deltastreamer.source.kafka.topic=test_topic
#Kafka props
bootstrap.servers=kafkabroker:9092
auto.offset.reset=earliest
src_schema.avsc
{
  "name": "test_data",
  "type": "record",
  "fields": [
    { "name": "pos", "type": "string" },
    { "name": "partition_key", "type": "int" },
    {
      "name": "after",
      "type": {
        "name": "after",
        "type": "record",
        "fields": [
          { "name": "id", "type": "int" },
          { "name": "name", "type": "string" },
          { "name": "plcname", "type": "string" },
          { "name": "quantity", "type": "int" }
        ]
      }
    }
  ]
}
target_schema.avsc
{
  "name": "test_data",
  "type": "record",
  "fields": [
    { "name": "pos", "type": "string" },
    { "name": "partition_key", "type": "int" },
    { "name": "id", "type": "int" },
    { "name": "name", "type": "string" },
    { "name": "plcname", "type": "string" },
    { "name": "quantity", "type": "int" }
  ]
}
And the update data will look like:
{"pos":"00000000000000009949","partition_key":1234,"after":{"id":3,"name":"update_1","plcname":"updated_1"}}
{"pos":"00000000000000009950","partition_key":1234,"after":{"id":3,"name":"update_2"}}
Expected final result in the Hudi dataset:
{
  "pos": "00000000000000008084",
  "partition_key": 1234,
  "id": 3,
  "name": "update_2",
  "plcname": "updated_1",
  "quantity": 8
}
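In other words, the merge behaviour I'm after is "overwrite with latest, but only for fields the newer event actually carries". A small Python sketch of that semantics (this is NOT Hudi's OverwriteNonDefaultsWithLatestAvroPayload implementation, which compares against Avro schema defaults; it is only the behaviour I expect):

```python
# Sketch of "overwrite non-missing fields with latest" merge semantics.
# Sparse updates carry None for untouched fields, which fall back to
# the stored value. Illustration only, not Hudi's payload class.

def merge(stored, incoming):
    merged = dict(stored)
    for field, value in incoming.items():
        if value is not None:  # keep the stored value for untouched fields
            merged[field] = value
    return merged

base = {"pos": "00000000000000008084", "partition_key": 1234,
        "id": 3, "name": "SILICONES", "plcname": "silicones", "quantity": 8}
update_1 = {"pos": "00000000000000009949", "id": 3,
            "name": "update_1", "plcname": "updated_1", "quantity": None}
update_2 = {"pos": "00000000000000009950", "id": 3,
            "name": "update_2", "plcname": None, "quantity": None}

# Apply updates in pos (source-ordering-field) order.
result = merge(merge(base, update_1), update_2)
print(result)
```

Here "name" comes from the latest event, "plcname" survives from the first update, and "quantity" survives from the bootstrap insert.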
For this upsert case, I can't use the FilebasedSchemaProvider, since the update events are sparse and contain only the updated columns. I also tried other schema providers such as RowBasedSchemaProvider, but it errors out with
org.apache.spark.sql.avro.IncompatibleSchemaException: Unexpected type null.
Please advise on the right configs to use when doing sparse-event upserts from JsonKafkaSource with the Hudi DeltaStreamer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]