nsivabalan commented on issue #6475: URL: https://github.com/apache/hudi/issues/6475#issuecomment-1288003004
not sure I understand your question. Here is what you can do: read from Kinesis using Spark Structured Streaming (eg: https://spark.apache.org/docs/latest/streaming-kinesis-integration.html) and then write to Hudi via the streaming sink. Eg:

```scala
streamingInput
  .writeStream
  .format("org.apache.hudi")
  .options(hudiOptions)
  .option("checkpointLocation", basePath + "/checkpoint")
  .outputMode("append") // streaming writes take outputMode, not mode(Append)
  .start()
  .awaitTermination(10000)
```

Ref link: https://hudi.apache.org/blog/2021/08/23/async-clustering/#spark-structured-streaming

Here is a simple script that I use locally to test the Spark Structured Streaming sink to Hudi, with Kafka as the source. You just need to replace the read stream with a Kinesis one (see the sketch after the script).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// Kafka source: this is the part to swap out for a Kinesis read stream
val dataStreamReader = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("subscribe", "impressions").
  option("startingOffsets", "earliest").
  option("maxOffsetsPerTrigger", 5000).
  option("failOnDataLoss", false)

// Schema of the JSON payload carried in the Kafka value
val schema = new StructType().
  add("impresssiontime", LongType).
  add("impressionid", StringType).
  add("userid", StringType).
  add("adid", StringType)

// Pull out the Kafka metadata columns, then parse the JSON payload
val df = dataStreamReader.load().
  selectExpr(
    "topic as kafka_topic",
    "CAST(partition AS STRING) kafka_partition",
    "cast(timestamp as String) kafka_timestamp",
    "CAST(offset AS STRING) kafka_offset",
    "CAST(key AS STRING) kafka_key",
    "CAST(value AS STRING) kafka_value",
    "current_timestamp() current_time").
  selectExpr(
    "kafka_topic",
    "concat(kafka_partition,'-',kafka_offset) kafka_partition_offset",
    "kafka_offset",
    "kafka_timestamp",
    "kafka_key",
    "kafka_value",
    "substr(current_time,1,10) partition_date").
  select(col("kafka_topic"), col("kafka_partition_offset"), col("kafka_offset"),
    col("kafka_timestamp"), col("kafka_key"), col("kafka_value"),
    from_json(col("kafka_value"), schema).as("data"), col("partition_date")).
  select("kafka_topic", "kafka_partition_offset", "kafka_offset", "kafka_timestamp",
    "kafka_key", "kafka_value", "data.impresssiontime", "data.impressionid",
    "data.userid", "data.adid", "partition_date")

// Hudi streaming sink (COW table) with async clustering enabled
val writer = df.
  writeStream.format("org.apache.hudi").
  option(TABLE_TYPE.key, "COPY_ON_WRITE").
  option(PRECOMBINE_FIELD.key, "impresssiontime").
  option(RECORDKEY_FIELD.key, "adid").
  option(PARTITIONPATH_FIELD.key, "userid").
  option(HIVE_SYNC_ENABLED.key, false).
  option(HIVE_STYLE_PARTITIONING.key, true).
  option(FAIL_ON_TIMELINE_ARCHIVING_ENABLE.key, false).
  option(STREAMING_IGNORE_FAILED_BATCH.key, false).
  option(STREAMING_RETRY_CNT.key, 0).
  option("hoodie.table.name", "copy_on_write_table").
  option("hoodie.cleaner.commits.retained", "6").
  option("hoodie.keep.min.commits", "10").
  option("hoodie.keep.max.commits", "15").
  option("hoodie.clustering.plan.strategy.target.file.max.bytes", "10485760").
  option("hoodie.clustering.plan.strategy.small.file.limit", "200000000").
  option("hoodie.clustering.plan.strategy.sort.columns", "kafka_partition_offset").
  option("hoodie.clustering.async.max.commits", "2").
  option("hoodie.datasource.clustering.async.enable", "true").
  option("hoodie.clustering.async.enabled", "true").
  option("hoodie.clustering.updates.strategy", "org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy").
  option("checkpointLocation", "/tmp/hudi_streaming_kafka/checkpoint/").
  outputMode(OutputMode.Append())

// Trigger.ProcessingTime replaces the deprecated ProcessingTime class
writer.trigger(Trigger.ProcessingTime(30000)).start("/tmp/hudi_streaming_kafka/COPY_ON_WRITE")
```
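One caveat on the read side: the Spark page linked above documents the DStream-based Kinesis receiver, so for Structured Streaming you would need a separate Kinesis source, e.g. the third-party qubole/kinesis-sql connector (https://github.com/qubole/kinesis-sql) or the variant bundled with EMR. Below is a minimal, untested sketch assuming that connector is on the classpath; the stream name and endpoint are placeholders, and the option names should be double-checked against the connector's docs.

```scala
// A minimal sketch of the Kinesis read side, assuming the third-party
// qubole/kinesis-sql connector is available; stream name, endpoint and
// option names below are placeholders, not tested config.
import org.apache.spark.sql.functions.{col, from_json}

val kinesisStream = spark.
  readStream.
  format("kinesis").
  option("streamName", "impressions").
  option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com").
  option("startingposition", "TRIM_HORIZON").
  load()

// The connector delivers the record payload as a binary `data` column;
// cast it to a string and parse it with the same `schema` as above.
val parsed = kinesisStream.
  selectExpr("CAST(data AS STRING) AS json_value").
  select(from_json(col("json_value"), schema).as("data")).
  select("data.*")
```

From there, the resulting `parsed` dataframe plugs into the same `writeStream` sink shown in the script above.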
option("hoodie.datasource.clustering.async.enable","true"). option("hoodie.clustering.async.enabled","true"). option("hoodie.clustering.updates.strategy","org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy"). option("checkpointLocation", "/tmp/hudi_streaming_kafka/checkpoint/"). outputMode(OutputMode.Append()); writer.trigger(new ProcessingTime(30000)).start("/tmp/hudi_streaming_kafka/COPY_ON_WRITE"); ``` This is just a sample script that I use locally. but should help you write a similar one for kinesis stream. Feel free to close the issue if you get what you were looking for. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
