shahiidiqbal commented on issue #4839:
URL: https://github.com/apache/hudi/issues/4839#issuecomment-1044122188
@nsivabalan
We use Kafka Connect to capture changes (CDC) from MongoDB. We then use Spark Structured Streaming to read the topics from Kafka and write the data into our data lake through Hudi.
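The comment does not show the per-record "cleansing" step. Below is a rough sketch of what flattening one CDC message could look like. It assumes each Kafka value is a JSON string shaped like a MongoDB change-stream event, with `operationType`, `documentKey` and `fullDocument` fields. For update events to carry `fullDocument`, the source connector would need to be configured for it (for example with `change.stream.full.document=updateLookup` on the MongoDB Kafka source connector). The function name `flatten_change_event` and the `_op` column are hypothetical. The `_hoodie_is_deleted` flag follows Hudi's soft-delete column convention.

```python
import json


def flatten_change_event(raw_value):
    """Turn one Kafka message value (a JSON string) into a flat row dict.

    Hypothetical helper: assumes a MongoDB change-stream-style event.
    """
    event = json.loads(raw_value)
    op = event.get("operationType")
    # fullDocument is absent for deletes (and for updates without updateLookup)
    doc = event.get("fullDocument") or {}
    row = dict(doc)
    # documentKey carries the _id even when fullDocument is missing
    row["_id"] = event.get("documentKey", {}).get("_id", doc.get("_id"))
    row["_op"] = op
    # Hudi treats rows with _hoodie_is_deleted = true as deletes on upsert
    row["_hoodie_is_deleted"] = op == "delete"
    return row
```

Inside Spark, the same logic would more naturally be expressed with `from_json` and a schema rather than a Python UDF. This pure-Python version only illustrates the mapping.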
Here is the Spark streaming code that reads the Kafka topic and writes the data through Hudi:
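```python
from pyspark.sql import functions as F

# spark, kafkaOptions, tableHudiOptions, TOPIC_NAME, TABLE_NAME,
# HUDI_TABLE_BASE_PATH and CHECKPOINT_BASE_PATH are defined elsewhere.

def write_hudi_table(df, epoch_id):
    # we do some cleansing here
    df.write.format('org.apache.hudi') \
        .options(**tableHudiOptions) \
        .mode('append') \
        .save(f'{HUDI_TABLE_BASE_PATH}/{TABLE_NAME}/')

# Read the raw CDC messages from Kafka as strings
rawDf = spark.readStream \
    .format("kafka") \
    .options(**kafkaOptions) \
    .option("subscribe", TOPIC_NAME) \
    .load() \
    .select(F.col("value").cast("string"))

# finalDf is derived from rawDf (that transformation is not shown here)
query = finalDf.writeStream \
    .queryName(f"Writing Table {TABLE_NAME}") \
    .foreachBatch(write_hudi_table) \
    .option("checkpointLocation",
            f"{CHECKPOINT_BASE_PATH}/{TABLE_NAME}/checkpoint/") \
    .start()

query.awaitTermination()
```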
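The code above passes `kafkaOptions` and `tableHudiOptions` as dictionaries but does not show their contents. As a hedged illustration, they might look like the sketch below. The option keys are real Spark Kafka-source and Hudi datasource settings. Every value, including the topic, table name, broker address, record key `_id` and precombine field `ts`, is a hypothetical placeholder. The sketch also assumes a non-partitioned table.

```python
# Hypothetical placeholders: the original comment does not show these values.
TOPIC_NAME = "mongo.appdb.orders"
TABLE_NAME = "orders"

# Passed to spark.readStream.format("kafka").options(**kafkaOptions)
kafkaOptions = {
    "kafka.bootstrap.servers": "broker1:9092",  # hypothetical broker address
    "startingOffsets": "earliest",  # only applies when no checkpoint exists yet
    "failOnDataLoss": "false",
}

# Passed to df.write.format("org.apache.hudi").options(**tableHudiOptions)
tableHudiOptions = {
    "hoodie.table.name": TABLE_NAME,
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    # upsert lets later CDC events for the same key replace earlier ones
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "_id",
    # assumes each row carries a "ts" ordering column (hypothetical)
    "hoodie.datasource.write.precombine.field": "ts",
    # assumes a non-partitioned table
    "hoodie.datasource.write.keygenerator.class":
        "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
}
```

With `upsert` and a precombine field, Hudi keeps the record with the largest precombine value when several changes for the same key arrive. That is usually what a CDC pipeline wants.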