shahiidiqbal commented on issue #4839:
URL: https://github.com/apache/hudi/issues/4839#issuecomment-1044122188


   @nsivabalan 
   We use Kafka Connect to capture changes (CDC) from MongoDB. We then use 
Spark Structured Streaming to read the Kafka topics and write the data into 
our data lake through Hudi. 
   
   Here is the Spark streaming code that reads the Kafka topic and writes the 
data through Hudi:
   
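   from pyspark.sql import functions as F  # needed for F.col below

   def write_hudi_table(df, epoch_id):
           # we do some cleansing here, then write the micro-batch to Hudi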
           df.write.format('org.apache.hudi') \
                   .options(**tableHudiOptions) \
                   .mode('append') \
                   .save(f'{HUDI_TABLE_BASE_PATH}/{TABLE_NAME}/')
   
   rawDf = spark.readStream \
                   .format("kafka") \
                   .options(**kafkaOptions) \
                   .option("subscribe", TOPIC_NAME) \
                   .load() \
                   .select(F.col("value").cast("string"))
   
   # finalDf is presumably derived from rawDf; that step is not shown here.
   query = finalDf.writeStream \
                   .queryName(f"Writing Table {TABLE_NAME}") \
                   .foreachBatch(write_hudi_table) \
                   .option("checkpointLocation", f"{CHECKPOINT_BASE_PATH}/{TABLE_NAME}/checkpoint/") \
                   .start()
   
   query.awaitTermination()
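
The snippet above passes two dictionaries, kafkaOptions and tableHudiOptions, that are not shown. As a rough sketch only, they might look something like the following. The broker address, table name, record key, and precombine field are all hypothetical placeholders, not values from our actual job. The option keys themselves are standard Spark Kafka source and Hudi datasource write options.

```python
# Sketch of the two option dictionaries used by the streaming job.
# All concrete values (broker, table name, field names) are hypothetical.

def build_kafka_options(bootstrap_servers):
    """Options for Spark's Kafka source, passed via .options(**kafkaOptions)."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        # Read the topic from the beginning on the first run (assumption).
        "startingOffsets": "earliest",
    }


def build_hudi_options(table_name, record_key, precombine_field):
    """Options for the Hudi writer, passed via .options(**tableHudiOptions)."""
    return {
        "hoodie.table.name": table_name,
        # Field that uniquely identifies a record.
        "hoodie.datasource.write.recordkey.field": record_key,
        # Field used to pick the latest version when keys collide.
        "hoodie.datasource.write.precombine.field": precombine_field,
        # Upsert so that CDC updates overwrite earlier versions of a record.
        "hoodie.datasource.write.operation": "upsert",
    }


kafkaOptions = build_kafka_options("localhost:9092")
tableHudiOptions = build_hudi_options("my_table", "_id", "ts_ms")
```

With upsert, the precombine field decides which record wins when several changes for the same key arrive in one micro-batch, so it should be a value that increases with each change.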
   

