nsivabalan commented on issue #5565:
URL: https://github.com/apache/hudi/issues/5565#issuecomment-1125512915

Sample test script that worked for me:
```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.streaming.Trigger
   
val dataStreamReader = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("subscribe", "impressions").
  option("startingOffsets", "earliest").
  option("maxOffsetsPerTrigger", 5000).
  option("failOnDataLoss", false)
   
// Schema of the incoming JSON payload; field names (including the
// "impresssiontime" spelling) must match the JSON in the topic
val schema = new StructType().
  add("impresssiontime", LongType).
  add("impressionid", StringType).
  add("userid", StringType).
  add("adid", StringType)
   
val df = dataStreamReader.load().
  selectExpr(
    "topic as kafka_topic",
    "CAST(partition AS STRING) kafka_partition",
    "CAST(timestamp AS STRING) kafka_timestamp",
    "CAST(offset AS STRING) kafka_offset",
    "CAST(key AS STRING) kafka_key",
    "CAST(value AS STRING) kafka_value",
    "current_timestamp() current_time").
  selectExpr(
    "kafka_topic",
    "concat(kafka_partition,'-',kafka_offset) kafka_partition_offset",
    "kafka_offset",
    "kafka_timestamp",
    "kafka_key",
    "kafka_value",
    "substr(current_time,1,10) partition_date").
  select(
    col("kafka_topic"),
    col("kafka_partition_offset"),
    col("kafka_offset"),
    col("kafka_timestamp"),
    col("kafka_key"),
    col("kafka_value"),
    from_json(col("kafka_value"), schema).as("data"),
    col("partition_date")).
  select(
    "kafka_topic",
    "kafka_partition_offset",
    "kafka_offset",
    "kafka_timestamp",
    "kafka_key",
    "kafka_value",
    "data.impresssiontime",
    "data.impressionid",
    "data.userid",
    "data.adid",
    "partition_date")
   
// writing to hudi
val writer = df.
  writeStream.
  format("org.apache.hudi").
  option(TABLE_TYPE.key, "COPY_ON_WRITE").
  option(PRECOMBINE_FIELD.key, "impresssiontime").
  option(RECORDKEY_FIELD.key, "adid").
  option(PARTITIONPATH_FIELD.key, "userid").
  option(HIVE_SYNC_ENABLED.key, false).
  option(HIVE_STYLE_PARTITIONING.key, true).
  option(FAIL_ON_TIMELINE_ARCHIVING_ENABLE.key, false).
  option(STREAMING_IGNORE_FAILED_BATCH.key, false).
  option(STREAMING_RETRY_CNT.key, 0).
  option("hoodie.table.name", "copy_on_write_table").
  option("hoodie.cleaner.commits.retained", "6").
  option("hoodie.keep.min.commits", "10").
  option("hoodie.keep.max.commits", "15").
  option("checkpointLocation", "/tmp/hudi_streaming_kafka/checkpoint/").
  outputMode(OutputMode.Append())
   
// Trigger.ProcessingTime replaces the ProcessingTime class removed in Spark 3
writer.trigger(Trigger.ProcessingTime(30000)).start("/tmp/hudi_streaming_kafka/COPY_ON_WRITE")
```

