Hello Gourav,

I’ll read this document.


Thanks.

> On 17 Feb 2022, at 14:05, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
> 
> Hi,
> 
> The following excellent documentation may help as well: 
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
> 
> 
> The book from Dr. Zaharia on Spark does a fantastic job of explaining the 
> fundamental thinking behind these concepts.
> 
> 
> Regards,
> Gourav Sengupta
> 
> 
> 
> On Wed, Feb 9, 2022 at 8:51 PM karan alang <karan.al...@gmail.com 
> <mailto:karan.al...@gmail.com>> wrote:
> Thanks, Mich .. will check it out
> 
> regds,
> Karan Alang
> 
> On Tue, Feb 8, 2022 at 3:06 PM Mich Talebzadeh <mich.talebza...@gmail.com 
> <mailto:mich.talebza...@gmail.com>> wrote:
> BTW you can check this LinkedIn article of mine on Processing Change Data 
> Capture with Spark Structured Streaming 
> <https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/?trackingId=pRPrKh1iQzq4EJ%2BuiAoWGQ%3D%3D>
> 
> It covers the concept of triggers, including trigger(once = True), i.e. 
> one-time batch, in Spark Structured Streaming.
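> 
The difference between an interval trigger and one-time batch can be sketched in plain Python (a toy illustration of the scheduling idea only, not the Spark API; `run_stream` and `pending` are made-up names):

```python
# Toy sketch (plain Python, no Spark) of two Structured Streaming trigger
# modes. "pending" stands in for data queued on the source (e.g. Kafka),
# as a list of lists of records.
def run_stream(pending, once=False):
    if once:
        # trigger(once=True): all available data is processed as ONE
        # batch, then the query stops -- the "one-time batch" mode
        return [(0, sum(len(b) for b in pending))]
    # trigger(processingTime='2 seconds'): one micro-batch per interval
    return [(batch_id, len(b)) for batch_id, b in enumerate(pending)]

print(run_stream([[1, 2], [3]]))             # interval mode: two batches
print(run_stream([[1, 2], [3]], once=True))  # one-time batch: one batch
```

With the interval trigger each queued chunk becomes its own (batchId, row-count) pair; with once=True everything available at start collapses into a single batch 0.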
> 
> HTH
> 
>    view my Linkedin profile 
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>  
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
> On Mon, 7 Feb 2022 at 23:06, karan alang <karan.al...@gmail.com 
> <mailto:karan.al...@gmail.com>> wrote:
> Thanks, Mich .. that worked fine!
> 
> 
> On Mon, Feb 7, 2022 at 1:55 PM Mich Talebzadeh <mich.talebza...@gmail.com 
> <mailto:mich.talebza...@gmail.com>> wrote:
> read below
> 
>             """
>                "foreach" performs custom write logic on each row, and
>                "foreachBatch" performs custom write logic on each
>                micro-batch through the SendToBigQuery function.
>                foreachBatch(SendToBigQuery) expects two parameters: first,
>                the micro-batch as a DataFrame or Dataset, and second, a
>                unique id for each batch --> batchId.
>                Using foreachBatch, we write each micro-batch to the storage
>                defined in our custom logic. In this case, we store the
>                output of our streaming application in a Google BigQuery
>                table.
>                Note that we are appending data, and column "rowkey" is
>                defined as a UUID so it can be used as the primary key.
>             """
>             # assumes: from pyspark.sql.functions import col
>             result = streamingDataFrame.select( \
>                      col("parsed_value.rowkey").alias("rowkey") \
>                    , col("parsed_value.ticker").alias("ticker") \
>                    , col("parsed_value.timeissued").alias("timeissued") \
>                    , col("parsed_value.price").alias("price")). \
>                      writeStream. \
>                      outputMode('append'). \
>                      option("truncate", "false"). \
>                      foreachBatch(SendToBigQuery). \
>                      trigger(processingTime='2 seconds'). \
>                      start()
> 
> Now you define your function SendToBigQuery():
> 
> def SendToBigQuery(df, batchId):
>     if len(df.take(1)) > 0:
>         df.printSchema()
>         print(f"""batchId is {batchId}""")
>         rows = df.count()
>         print(f""" Total records processed in this run = {rows}""")
>         ......
>     else:
>         print("DataFrame is empty")
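> 
To make the calling convention concrete without a Spark cluster, here is a plain-Python sketch of how the engine invokes the foreachBatch callback once per micro-batch, passing the DataFrame and the batchId (FakeBatch and send_to_sink are made-up illustration names, not Spark classes):

```python
# Plain-Python sketch (no Spark needed) of the foreachBatch calling
# convention: the engine calls the callback once per micro-batch with
# two arguments, the micro-batch and a monotonically increasing batch id.
# FakeBatch is a hypothetical stand-in mimicking the DataFrame methods used.
class FakeBatch:
    def __init__(self, rows):
        self._rows = rows
    def take(self, n):        # like DataFrame.take: first n rows
        return self._rows[:n]
    def count(self):          # like DataFrame.count: number of rows
        return len(self._rows)

def send_to_sink(df, batchId):
    # same guard as SendToBigQuery above: skip empty micro-batches
    if len(df.take(1)) > 0:
        return f"batchId {batchId}: wrote {df.count()} rows"
    return f"batchId {batchId}: DataFrame is empty"

# the engine would do roughly this, one call per trigger interval:
log = [send_to_sink(FakeBatch(rows), i)
       for i, rows in enumerate([[{"ticker": "MSFT", "price": 280.0}], []])]
print(log)
```

Inside the callback the micro-batch behaves like an ordinary (non-streaming) DataFrame, which is why plain batch operations such as take() and count() work on it.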
> 
> HTH
> 
> 
> 
> On Mon, 7 Feb 2022 at 21:06, karan alang <karan.al...@gmail.com 
> <mailto:karan.al...@gmail.com>> wrote:
> Hello All,
> 
> I'm using Structured Streaming to read data from Kafka, and need to do a 
> transformation on each individual row.
> 
> I'm trying to use 'foreach' (or foreachBatch) and running into issues.
> Basic question: how is the row passed to the function when foreach is used?
> 
> Also, when I use foreachBatch, it seems the batchId is available in the 
> called function? How do I access the individual rows?
> 
> Details are on Stack Overflow:
> https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working
> 
> What is the best approach for this use case?
> 
> tia!
