Re: StructuredStreaming - foreach/foreachBatch

2022-02-21 Thread karan alang
Thanks, Gourav - will check out the book.

regds,
Karan Alang



Re: StructuredStreaming - foreach/foreachBatch

2022-02-21 Thread Danilo Sousa
Hello Gourav,

I'll read this document.


Thanks.




Re: StructuredStreaming - foreach/foreachBatch

2022-02-17 Thread Gourav Sengupta
Hi,

The following excellent documentation may help as well:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch

The book from Dr. Zaharia on Spark does a fantastic job of explaining the
fundamental thinking behind these concepts.


Regards,
Gourav Sengupta





Re: StructuredStreaming - foreach/foreachBatch

2022-02-09 Thread karan alang
Thanks, Mich .. will check it out

regds,
Karan Alang



Re: StructuredStreaming - foreach/foreachBatch

2022-02-08 Thread Mich Talebzadeh
BTW you can check this LinkedIn article of mine on Processing Change Data
Capture with Spark Structured Streaming.

It covers the concept of triggers, including trigger(once=True), i.e. a
one-time batch, in Spark Structured Streaming.
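
As a rough illustration (this snippet is not from the article itself), a
one-time batch trigger in PySpark looks like this; streamingDataFrame and
SendToBigQuery are the names used elsewhere in this thread:

# Process everything available in a single micro-batch, then stop.
# Newer Spark releases also offer trigger(availableNow=True) with
# similar batch-style semantics.
result = streamingDataFrame.writeStream. \
         outputMode('append'). \
         foreachBatch(SendToBigQuery). \
         trigger(once=True). \
         start()
result.awaitTermination()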


HTH








Re: StructuredStreaming - foreach/foreachBatch

2022-02-07 Thread karan alang
Thanks, Mich .. that worked fine!




Re: StructuredStreaming - foreach/foreachBatch

2022-02-07 Thread Mich Talebzadeh
read below

"""
   "foreach" performs custom write logic on each row and
"foreachBatch" performs custom write logic on each micro-batch through
SendToBigQuery function
*foreachBatch(SendToBigQuery) expects 2 parameters, first:
micro-batch as DataFrame or Dataset and second: unique id for each batch
--> batchId*
   Using foreachBatch, we write each micro batch to storage
defined in our custom logic. In this case, we store the output of our
streaming application to Google BigQuery table.
   Note that we are appending data and column "rowkey" is
defined as UUID so it can be used as the primary key
"""
from pyspark.sql.functions import col   # needed for the column references below

result = streamingDataFrame.select( \
             col("parsed_value.rowkey").alias("rowkey") \
           , col("parsed_value.ticker").alias("ticker") \
           , col("parsed_value.timeissued").alias("timeissued") \
           , col("parsed_value.price").alias("price")). \
         writeStream. \
         outputMode('append'). \
         option("truncate", "false"). \
         foreachBatch(SendToBigQuery). \
         trigger(processingTime='2 seconds'). \
         start()
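
For context, a minimal sketch of how a streamingDataFrame carrying a
parsed_value struct like the one selected above could be built from a Kafka
source; the broker address, topic name and schema here are illustrative
assumptions, not taken from the original code:

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# Hypothetical schema matching the four fields referenced above
schema = StructType([
    StructField("rowkey", StringType(), True),
    StructField("ticker", StringType(), True),
    StructField("timeissued", StringType(), True),
    StructField("price", FloatType(), True)])

# 'spark' is assumed to be an existing SparkSession
streamingDataFrame = spark.readStream. \
         format("kafka"). \
         option("kafka.bootstrap.servers", "localhost:9092"). \
         option("subscribe", "ticker-topic"). \
         load(). \
         select(from_json(col("value").cast("string"),
                schema).alias("parsed_value"))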

now you define your function SendToBigQuery():


def SendToBigQuery(df, batchId):
    if len(df.take(1)) > 0:   # cheaper emptiness check than df.count()
        df.printSchema()
        print(f"""batchId is {batchId}""")
        rows = df.count()
        print(f""" Total records processed in this run = {rows}""")
        # .. custom write logic to the BigQuery table goes here
    else:
        print("DataFrame is empty")

HTH






On Mon, 7 Feb 2022 at 21:06, karan alang  wrote:

> Hello All,
>
> I'm using Structured Streaming to read data from Kafka, and need to do a
> transformation on each individual row.
>
> I'm trying to use 'foreach' (or foreachBatch), and running into issues.
> Basic question - how is the row passed to the function when foreach is
> used?
>
> Also, when I use foreachBatch, it seems the batchId is available in the
> called function? How do I access individual rows?
>
> Details are on Stack Overflow:
>
> https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working
>
> What is the best approach for this use-case?
>
> tia!
>