nsivabalan commented on issue #6475:
URL: https://github.com/apache/hudi/issues/6475#issuecomment-1288003004

Not sure I understand your question. Here is what you can do.
   
You can read from Kinesis using Spark Structured Streaming (e.g. https://spark.apache.org/docs/latest/streaming-kinesis-integration.html) and then write to Hudi via the streaming sink.
   
Eg:
```
streamingInput
  .writeStream
  .format("org.apache.hudi")
  .options(hudiOptions)
  .option("checkpointLocation", basePath + "/checkpoint")
  .outputMode("append")  // streaming writes use outputMode, not mode
  .start(basePath)       // target Hudi table base path
  .awaitTermination(10000)
```
Ref link: https://hudi.apache.org/blog/2021/08/23/async-clustering/#spark-structured-streaming
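
For the Kinesis read side, note that Spark itself only ships the older DStream-based Kinesis receiver (the integration guide linked above); Structured Streaming needs a separate Kinesis source connector. Here is a minimal sketch, assuming a third-party connector that registers a `kinesis` source format (for example qubole/kinesis-sql); the stream name, endpoint, and starting position are placeholder values:

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kinesis-to-hudi").getOrCreate()

// Streaming read from Kinesis. The "kinesis" format and its option names
// come from the assumed third-party connector, not from Spark itself.
val streamingInput = spark.readStream
  .format("kinesis")
  .option("streamName", "impressions")                              // placeholder
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com") // placeholder
  .option("startingposition", "TRIM_HORIZON")
  .load()
```

Such connectors typically expose the record payload as a binary `data` column; `CAST(data AS STRING)` plus `from_json` gets you to the same place as the Kafka `value` column in the script below.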
   
Here is a simple script that I use locally to test the Spark Structured Streaming sink to Hudi, using Kafka as the source. You just need to replace the Kafka reader with a Kinesis read stream (see the sketch after the script).
   
```
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

// Kafka source; this is the part to swap out for a Kinesis read stream.
val dataStreamReader = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "impressions")
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 5000)
  .option("failOnDataLoss", false)

// Schema of the JSON payload carried in the Kafka message value.
val schema = new StructType()
  .add("impressiontime", LongType)
  .add("impressionid", StringType)
  .add("userid", StringType)
  .add("adid", StringType)

// Pull out Kafka metadata columns, derive a partition date, and parse the payload.
val df = dataStreamReader.load()
  .selectExpr(
    "topic as kafka_topic",
    "CAST(partition AS STRING) kafka_partition",
    "CAST(timestamp AS STRING) kafka_timestamp",
    "CAST(offset AS STRING) kafka_offset",
    "CAST(key AS STRING) kafka_key",
    "CAST(value AS STRING) kafka_value",
    "current_timestamp() current_time")
  .selectExpr(
    "kafka_topic",
    "concat(kafka_partition,'-',kafka_offset) kafka_partition_offset",
    "kafka_offset",
    "kafka_timestamp",
    "kafka_key",
    "kafka_value",
    "substr(current_time,1,10) partition_date")
  .select(
    col("kafka_topic"), col("kafka_partition_offset"), col("kafka_offset"),
    col("kafka_timestamp"), col("kafka_key"), col("kafka_value"),
    from_json(col("kafka_value"), schema).as("data"), col("partition_date"))
  .select(
    "kafka_topic", "kafka_partition_offset", "kafka_offset", "kafka_timestamp",
    "kafka_key", "kafka_value", "data.impressiontime", "data.impressionid",
    "data.userid", "data.adid", "partition_date")

// Hudi streaming sink with async clustering enabled.
val writer = df.writeStream
  .format("org.apache.hudi")
  .option(TABLE_TYPE.key, "COPY_ON_WRITE")
  .option(PRECOMBINE_FIELD.key, "impressiontime")
  .option(RECORDKEY_FIELD.key, "adid")
  .option(PARTITIONPATH_FIELD.key, "userid")
  .option(HIVE_SYNC_ENABLED.key, false)
  .option(HIVE_STYLE_PARTITIONING.key, true)
  .option(FAIL_ON_TIMELINE_ARCHIVING_ENABLE.key, false)
  .option(STREAMING_IGNORE_FAILED_BATCH.key, false)
  .option(STREAMING_RETRY_CNT.key, 0)
  .option("hoodie.table.name", "copy_on_write_table")
  .option("hoodie.cleaner.commits.retained", "6")
  .option("hoodie.keep.min.commits", "10")
  .option("hoodie.keep.max.commits", "15")
  .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "10485760")
  .option("hoodie.clustering.plan.strategy.small.file.limit", "200000000")
  .option("hoodie.clustering.plan.strategy.sort.columns", "kafka_partition_offset")
  .option("hoodie.clustering.async.max.commits", "2")
  .option("hoodie.datasource.clustering.async.enable", "true")
  .option("hoodie.clustering.async.enabled", "true")
  .option("hoodie.clustering.updates.strategy",
    "org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy")
  .option("checkpointLocation", "/tmp/hudi_streaming_kafka/checkpoint/")
  .outputMode(OutputMode.Append())

// Fire a micro-batch every 30 seconds.
writer.trigger(Trigger.ProcessingTime(30000)).start("/tmp/hudi_streaming_kafka/COPY_ON_WRITE")
```
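
To adapt this for Kinesis, only the reader changes; the JSON parsing and the Hudi writer stay the same. A hedged sketch of the swap, reusing the `schema` defined above and again assuming a connector whose payload lands in a binary `data` column alongside `partitionKey`/`sequenceNumber` metadata columns:

```
// Hypothetical replacement for the Kafka reader: the column names on the
// Kinesis side are assumptions about the third-party connector.
val kinesisDf = spark.readStream
  .format("kinesis")
  .option("streamName", "impressions")          // placeholder
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
  .option("startingposition", "TRIM_HORIZON")
  .load()
  .selectExpr(
    "partitionKey kinesis_partition_key",
    "sequenceNumber kinesis_sequence_number",
    "CAST(data AS STRING) kinesis_value",
    "substr(CAST(current_timestamp() AS STRING),1,10) partition_date")
  .select(
    col("kinesis_partition_key"), col("kinesis_sequence_number"),
    from_json(col("kinesis_value"), schema).as("data"), col("partition_date"))
  .select(
    "kinesis_partition_key", "kinesis_sequence_number", "data.impressiontime",
    "data.impressionid", "data.userid", "data.adid", "partition_date")
```

If you go this route, point `hoodie.clustering.plan.strategy.sort.columns` at `kinesis_sequence_number` (or drop it), since `kafka_partition_offset` no longer exists.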
This is just a sample script that I use locally, but it should help you write a similar one for a Kinesis stream.

Feel free to close the issue if this answers what you were looking for.
   
   
   
   

