nsivabalan commented on issue #5565:
URL: https://github.com/apache/hudi/issues/5565#issuecomment-1125512915
Here is a sample test script that worked for me:
```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json,to_json,struct}
import org.apache.spark.sql.avro.to_avro
import org.apache.spark.sql.types.{IntegerType, StringType, LongType,
StructType}
import java.time.LocalDateTime
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.ProcessingTime;
// Streaming reader for the Kafka topic "impressions" on localhost:9092.
// It starts from the earliest offsets, sets maxOffsetsPerTrigger to 5000 and
// failOnDataLoss to false. The whole chain is wrapped in parentheses so it can
// still be pasted line by line into the shell.
val dataStreamReader = (
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "impressions")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 5000)
    .option("failOnDataLoss", false)
)
// Schema used to parse the JSON payload carried in the Kafka message value.
// Fields are appended in the listed order to an initially empty StructType.
// NOTE(review): "impresssiontime" is spelled with three s's; it is kept as-is
// because it presumably has to match the field name in the incoming JSON - confirm.
val schema = Seq(
  "impresssiontime" -> LongType,
  "impressionid" -> StringType,
  "userid" -> StringType,
  "adid" -> StringType
).foldLeft(new StructType()) { case (struct, (fieldName, fieldType)) => struct.add(fieldName, fieldType) }
// Builds the streaming DataFrame written to Hudi, in four projection steps:
//   1. rename/cast the raw Kafka columns and stamp each row with current_timestamp();
//   2. derive kafka_partition_offset ("<partition>-<offset>") and partition_date
//      (the first 10 characters of current_time);
//   3. parse kafka_value as JSON using `schema` into a struct column named "data";
//   4. flatten the parsed fields next to the Kafka metadata columns.
val df = dataStreamReader.load().
  selectExpr(
    "topic as kafka_topic",
    "CAST(partition AS STRING) kafka_partition",
    "cast(timestamp as String) kafka_timestamp",
    "CAST(offset AS STRING) kafka_offset",
    "CAST(key AS STRING) kafka_key",
    "CAST(value AS STRING) kafka_value",
    "current_timestamp() current_time").
  selectExpr(
    "kafka_topic",
    "concat(kafka_partition,'-',kafka_offset) kafka_partition_offset",
    "kafka_offset",
    "kafka_timestamp",
    "kafka_key",
    "kafka_value",
    // Must stay on one line: a plain Scala string literal cannot span lines
    // (the posted script had this literal broken in two by line wrapping).
    "substr(current_time,1,10) partition_date").
  select(
    col("kafka_topic"),
    col("kafka_partition_offset"),
    col("kafka_offset"),
    col("kafka_timestamp"),
    col("kafka_key"),
    col("kafka_value"),
    from_json(col("kafka_value"), schema).as("data"),
    col("partition_date")).
  select(
    "kafka_topic",
    "kafka_partition_offset",
    "kafka_offset",
    "kafka_timestamp",
    "kafka_key",
    "kafka_value",
    "data.impresssiontime",
    "data.impressionid",
    "data.userid",
    "data.adid",
    "partition_date")
// writing to hudi
// Configures a streaming write of `df` to a COPY_ON_WRITE Hudi table named
// "copy_on_write_table": records are keyed by adid, partitioned by userid
// (hive-style partition paths) and pre-combined on impresssiontime. Hive sync is
// disabled, failed batches are not ignored and the streaming retry count is 0.
val writer = df.
  writeStream.format("org.apache.hudi").
  option(TABLE_TYPE.key, "COPY_ON_WRITE").
  option(PRECOMBINE_FIELD.key, "impresssiontime").
  option(RECORDKEY_FIELD.key, "adid").
  option(PARTITIONPATH_FIELD.key, "userid").
  option(HIVE_SYNC_ENABLED.key, false).
  option(HIVE_STYLE_PARTITIONING.key, true).
  option(FAIL_ON_TIMELINE_ARCHIVING_ENABLE.key, false).
  option(STREAMING_IGNORE_FAILED_BATCH.key, false).
  option(STREAMING_RETRY_CNT.key, 0).
  option("hoodie.table.name", "copy_on_write_table").
  option("hoodie.cleaner.commits.retained", "6").
  option("hoodie.keep.min.commits", "10").
  option("hoodie.keep.max.commits", "15").
  option("checkpointLocation", "/tmp/hudi_streaming_kafka/checkpoint/").
  outputMode(OutputMode.Append())

// Start the query with a 30000 ms processing-time trigger. The Trigger.ProcessingTime
// factory replaces the deprecated `new ProcessingTime(...)` constructor; it is written
// fully qualified so this statement needs no additional import.
writer.
  trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime(30000)).
  start("/tmp/hudi_streaming_kafka/COPY_ON_WRITE")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]