Hello Gourav, I'll read this document.
Thanks.

> On 17 Feb 2022, at 14:05, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>
> Hi,
>
> The following excellent documentation may help as well:
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
>
> The book from Dr. Zaharia on Spark does a fantastic job of explaining the
> fundamental thinking behind these concepts.
>
> Regards,
> Gourav Sengupta
>
> On Wed, Feb 9, 2022 at 8:51 PM karan alang <karan.al...@gmail.com> wrote:
> Thanks, Mich .. will check it out
>
> regds,
> Karan Alang
>
> On Tue, Feb 8, 2022 at 3:06 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> BTW, you can check this LinkedIn article of mine on Processing Change Data
> Capture with Spark Structured Streaming:
> https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/?trackingId=pRPrKh1iQzq4EJ%2BuiAoWGQ%3D%3D
>
> It covers the concept of triggers, including trigger(once=True), or one-time
> batch, in Spark Structured Streaming.
>
> HTH
>
> View my LinkedIn profile:
> https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss,
> damage or destruction of data or any other property which may arise from
> relying on this email's technical content is explicitly disclaimed. The
> author will in no case be liable for any monetary damages arising from such
> loss, damage or destruction.
>
> On Mon, 7 Feb 2022 at 23:06, karan alang <karan.al...@gmail.com> wrote:
> Thanks, Mich .. that worked fine!
> On Mon, Feb 7, 2022 at 1:55 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> Read below.
>
>     """
>     "foreach" performs custom write logic on each row, and
>     "foreachBatch" performs custom write logic on each micro-batch through
>     the SendToBigQuery function.
>     foreachBatch(SendToBigQuery) expects 2 parameters: first, the
>     micro-batch as a DataFrame or Dataset, and second, a unique id for each
>     batch --> batchId.
>     Using foreachBatch, we write each micro-batch to the storage
>     defined in our custom logic. In this case, we store the output of our
>     streaming application in a Google BigQuery table.
>     Note that we are appending data, and column "rowkey" is defined
>     as a UUID so it can be used as the primary key.
>     """
>     result = streamingDataFrame.select( \
>                  col("parsed_value.rowkey").alias("rowkey") \
>                , col("parsed_value.ticker").alias("ticker") \
>                , col("parsed_value.timeissued").alias("timeissued") \
>                , col("parsed_value.price").alias("price")). \
>                writeStream. \
>                outputMode('append'). \
>                option("truncate", "false"). \
>                foreachBatch(SendToBigQuery). \
>                trigger(processingTime='2 seconds'). \
>                start()
>
> Now you define your function SendToBigQuery():
>
>     def SendToBigQuery(df, batchId):
>         if len(df.take(1)) > 0:
>             df.printSchema()
>             print(f"""batchId is {batchId}""")
>             rows = df.count()
>             print(f"""Total records processed in this run = {rows}""")
>             ......
>         else:
>             print("DataFrame is empty")
>
> HTH
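One point worth adding to the reply above: foreachBatch gives at-least-once guarantees, so after a failure the same batchId can be redelivered, and the sink function should use it to keep writes idempotent. A minimal sketch of that idea in plain Python, with no Spark dependency — the list of records stands in for the micro-batch DataFrame, and the in-memory set stands in for a durable record of already-written batch ids:

```python
# Sketch: deduplicating replayed micro-batches by batchId.
# In a real job the "seen" set would live in durable storage (for
# example a batch-id column in the BigQuery target table), not in
# driver memory, so it survives restarts.
_seen_batch_ids = set()

def write_batch_idempotently(rows, batch_id, sink):
    """rows: list of records standing in for the micro-batch DataFrame;
    sink: list standing in for the external table. Returns True if the
    batch was written, False if it was a replay and was skipped."""
    if batch_id in _seen_batch_ids:
        return False          # this batchId was already written; skip replay
    sink.extend(rows)         # the actual write to the external system
    _seen_batch_ids.add(batch_id)
    return True
```

Inside a real foreachBatch function the same check would run before `df.write...`, keyed on the batchId parameter Spark passes in.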
> On Mon, 7 Feb 2022 at 21:06, karan alang <karan.al...@gmail.com> wrote:
> Hello All,
>
> I'm using Structured Streaming to read data from Kafka, and need to do a
> transformation on each individual row.
>
> I'm trying to use 'foreach' (or foreachBatch), and running into issues.
> Basic question - how is the row passed to the function when foreach is used?
>
> Also, when I use foreachBatch, it seems the batchId is available in the
> function called? How do I access individual rows?
>
> Details are in the Stack Overflow question:
> https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working
>
> What is the best approach for this use case?
>
> tia!
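On the first question in the original post (how the row reaches the function): with foreach, Spark calls the supplied function once per record, passing a pyspark.sql.Row whose fields can be read by name. A minimal sketch below, assuming the ticker/price column names from the example further up the thread; the callback body is plain Python, so the Spark wiring is shown only as a comment:

```python
# Sketch of the per-row callback used with writeStream.foreach().
# Spark passes one Row per record; fields are read by column name,
# much like a mapping of column name -> value.
def process_row(row):
    line = f"{row['ticker']}: {row['price']}"
    print(line)   # side effect: send the record wherever it needs to go
    return line   # returned only so the sketch is easy to test standalone

# Wiring it up (needs a live streaming DataFrame, e.g. the Kafka source):
# query = streamingDataFrame.writeStream.foreach(process_row).start()
```

foreach can also take a class with open/process/close methods when per-partition connection setup is needed; the plain-function form above is the simplest case.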