Re: Bugs with joins and SQL in Structured Streaming

2024-03-11 Thread Andrzej Zera
Hi,

Do you think there is any chance of this issue getting resolved? Should I
create another bug report? As mentioned in my message, one is already open:
https://issues.apache.org/jira/browse/SPARK-45637 but it covers
only one of the problems.

Andrzej

On Tue, 27 Feb 2024 at 09:58, Andrzej Zera wrote:

> Hi,
>
> Yes, I tested all of them on Spark 3.5.
>
> Regards,
> Andrzej
>
>
> On Mon, 26 Feb 2024 at 23:24, Mich Talebzadeh wrote:
>
>> Hi,
>>
>> These are all on Spark 3.5, correct?
>>

Re: Bugs with joins and SQL in Structured Streaming

2024-02-27 Thread Andrzej Zera
Hi,

Yes, I tested all of them on Spark 3.5.

Regards,
Andrzej


On Mon, 26 Feb 2024 at 23:24, Mich Talebzadeh wrote:

> Hi,
>
> These are all on Spark 3.5, correct?


Bugs with joins and SQL in Structured Streaming

2024-02-26 Thread Andrzej Zera
Hey all,

I've been using Structured Streaming in production for almost a year now
and I want to share the bugs I've found during that time. I created a test
for each of the issues and put them all here:
https://github.com/andrzejzera/spark-bugs/tree/main/spark-3.5/src/test/scala

I split the issues into three groups: outer joins on event time, interval
joins and Spark SQL.

Issues related to outer joins:

   - When joining three or more input streams on event time, if two or more
   of the streams don't contain an event for a given join key (which is the
   event time), no row will be output even if the other streams do contain an
   event for that join key. Tests that check for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L86
   and
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L169
   - When joining a stream aggregated from raw events with a stream of
   already-aggregated events (aggregation done outside of Spark), no row will
   be output if that second stream doesn't contain a corresponding event.
   Test that checks for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L266
   - When joining two aggregated streams (aggregated in Spark), no result
   is produced (a minimal sketch of this pattern follows this list). Test
   that checks for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L341.
   I've already reported this one here:
   https://issues.apache.org/jira/browse/SPARK-45637 but it hasn't been
   handled yet.
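
To make the scenario concrete, here is a minimal PySpark sketch of the kind
of event-time outer join these tests exercise. The rate source, schema and
column names below are made up for illustration; the actual test cases are
in the repository linked above.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("outer-join-sketch").getOrCreate()

def windowed_counts(alias):
    # Stand-in source; the real tests use in-memory streams and production uses Kafka.
    events = (spark.readStream.format("rate").load()
              .withWatermark("timestamp", "1 minute"))
    return (events
            .groupBy(F.window("timestamp", "1 minute").alias("window"))
            .agg(F.count("*").alias(alias)))

a = windowed_counts("a_count")
b = windowed_counts("b_count")
c = windowed_counts("c_count")

# Full outer join on the event-time window. The expectation is that a row is
# produced even when some of the streams have no event for a given window.
joined = (a.join(b, ["window"], "full_outer")
           .join(c, ["window"], "full_outer"))

query = (joined.writeStream
         .outputMode("append")
         .format("console")
         .start())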

Issues related to interval joins:

   - When joining three streams (A, B, C) using an interval join on event
   time, such that B.eventTime is constrained relative to A.eventTime and
   C.eventTime is also constrained relative to A.eventTime, and then doing a
   window aggregation based on A's event time, the result is output only after
   the watermark crosses the window end + interval(A, B) + interval(A, C).
   However, I'd expect results to be output sooner, i.e. when the watermark
   crosses the window end + MAX(interval(A, B), interval(A, C)). If event B
   can happen up to 3 minutes after event A and event C up to 5 minutes after
   A, there is no point in delaying the output for 8 minutes (3+5) after the
   end of the window when we know that no more events can be matched later
   than 5 minutes after the window end (assuming the window end is based on
   A's event time). A rough sketch of this join shape follows the test link
   below. Test that checks for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/IntervalJoinTest.scala#L32
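
For reference, a rough PySpark sketch of the A/B/C interval-join shape
described above. Column names, intervals and the rate source are
illustrative stand-ins; the actual test is linked above.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("interval-join-sketch").getOrCreate()

def stream(prefix):
    # Stand-in source with an id column to join on and a watermarked event time.
    return (spark.readStream.format("rate").load()
            .selectExpr(f"value AS {prefix}_id", f"timestamp AS {prefix}_time")
            .withWatermark(f"{prefix}_time", "1 minute"))

a, b, c = stream("a"), stream("b"), stream("c")

# B is constrained relative to A, and C is also constrained relative to A.
ab = a.join(b, F.expr(
    "a_id = b_id AND b_time BETWEEN a_time AND a_time + INTERVAL 3 MINUTES"))
abc = ab.join(c, F.expr(
    "a_id = c_id AND c_time BETWEEN a_time AND a_time + INTERVAL 5 MINUTES"))

# Window aggregation keyed on A's event time; the question above is about
# when this window gets emitted relative to the watermark.
result = (abc
          .groupBy(F.window("a_time", "5 minutes"))
          .agg(F.count("*").alias("matches")))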

SQL issues:

   - A WITH clause (in contrast to a subquery) seems to create a static
   DataFrame that can't be used in streaming joins (a sketch of the two forms
   follows this list). Test that checks for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L31
   - Two subqueries, each aggregating data using the window() function,
   break the output schema. Test that checks for this:
   
https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L122
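
To illustrate the WITH-clause point, here is a minimal sketch of the two SQL
shapes I'm comparing, run against a streaming temp view. Table and column
names are made up; the actual tests are linked above.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sql-syntax-sketch").getOrCreate()

events = (spark.readStream.format("rate").load()
          .withWatermark("timestamp", "1 minute"))
events.createOrReplaceTempView("events")

# Subquery form.
subquery_df = spark.sql("""
    SELECT w, cnt
    FROM (SELECT window(timestamp, '1 minute') AS w, count(*) AS cnt
          FROM events
          GROUP BY window(timestamp, '1 minute')) agg
""")

# WITH (CTE) form - the one that appears to behave as a static DataFrame.
cte_df = spark.sql("""
    WITH agg AS (SELECT window(timestamp, '1 minute') AS w, count(*) AS cnt
                 FROM events
                 GROUP BY window(timestamp, '1 minute'))
    SELECT w, cnt FROM agg
""")

print(subquery_df.isStreaming, cte_df.isStreaming)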

I'm a beginner with Scala (I'm using Structured Streaming with PySpark) so I
won't be able to provide fixes. But I hope the test cases I provided can be
of some help.

Regards,
Andrzej


Re: [Structured Streaming] Avoid one microbatch delay with multiple stateful operations

2024-01-24 Thread Andrzej Zera
Hi,

I'm sorry but I got confused about the inner workings of the late events
watermark. You're completely right. Thanks for clarifying.

Regards,
Andrzej

On Thu, 11 Jan 2024 at 13:02, Jungtaek Lim wrote:

> Hi,
>
> The time window is closed and evicted once the "eviction watermark"
> passes the end of the window. The late events watermark only deals with
> discarding late events from the inputs. We did not introduce an additional
> delay in the work of multiple stateful operators. We just allowed more late
> events to be accepted.
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)


Re: [Structured Streaming] Keeping checkpointing cost under control

2024-01-10 Thread Andrzej Zera
Yes, I agree. But apart from maintaining this state internally (in memory,
or in memory plus disk as in the case of RocksDB), on every trigger it saves
some information about this state to the checkpoint location. I'm afraid we
can't do much about this checkpointing operation. I'll continue looking for
information on how I can decrease the number of LIST requests (ListBucket
operations) made in this process.

Thank you for your input so far!
Andrzej

On Wed, 10 Jan 2024 at 16:33, Mich Talebzadeh wrote:

> Hi,
>
> You may have a point on scenario 2.
>
> Caching Streaming DataFrames: In Spark Streaming, each batch of data is
> processed incrementally, and it may not fit the typical caching we
> discussed. Instead, Spark Streaming has its mechanisms to manage and
> optimize the processing of streaming data. Case in point for caching
> partial results, one often relies on maintaining state by using stateful
> operations (see below) on Structured Streaming DataFrames. In such
> scenarios, Spark maintains state internally based on the operations
> performed. For example, if you are doing a groupBy followed by an
> aggregation, Spark Streaming will manage the state of the keys and update
> them incrementally.
>
> Just to clarify, in the context of Spark Structured Streaming stateful
> operation refers to an operation that maintains and updates some form of
> state across batches of streaming data. Unlike stateless operations, which
> process each batch independently, stateful operations retain information
> from previous batches and use it to produce results for the current batch.
>
> So, bottom line, while one may not explicitly cache a streaming data
> frame, Spark internally optimizes the processing by maintaining the
> necessary state.
> HTH
>

[Structured Streaming] Avoid one microbatch delay with multiple stateful operations

2024-01-10 Thread Andrzej Zera
I'm struggling with the following issue in Spark >=3.4, related to multiple
stateful operations.

When spark.sql.streaming.statefulOperator.allowMultiple is enabled, Spark
keeps track of two types of watermarks: eventTimeWatermarkForEviction and
eventTimeWatermarkForLateEvents. Introducing them allowed chaining multiple
stateful operations but also introduced an additional delay for getting the
output out of the streaming query.
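
For context, here is a minimal sketch (illustrative only, not my actual job)
of a query with two chained stateful operations of this kind: a 1-minute
window aggregation re-aggregated into 5-minute windows, with a 30-second
trigger. The rate source stands in for the Kafka source.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("chained-agg-sketch").getOrCreate()
spark.conf.set("spark.sql.streaming.statefulOperator.allowMultiple", "true")

clicks = (spark.readStream.format("rate").load()  # stand-in for the Kafka source
          .withWatermark("timestamp", "0 seconds"))

per_minute = (clicks
              .groupBy(F.window("timestamp", "1 minute"))
              .agg(F.count("*").alias("clicks_1m")))

# Spark 3.4+ allows re-windowing on the window column produced above.
per_5_minutes = (per_minute
                 .groupBy(F.window("window", "5 minutes"))
                 .agg(F.sum("clicks_1m").alias("clicks_5m")))

query = (per_5_minutes.writeStream
         .outputMode("append")
         .trigger(processingTime="30 seconds")
         .format("console")
         .start())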

I'll show this with an example. Assume we have a stream of click events and
we aggregate it first by a 1-minute window and then by a 5-minute window. If
we have a trigger interval of 30s, then in most cases we'll get output 30s
later compared to single-stateful-operation queries. To see why, let's look
at the following examples:

Example 1. Single stateful operation (aggregation by 5-min window, assume
watermark is 0 seconds)

Wall clock (microbatch start) | Max event timestamp (at read from Kafka) | Global watermark | Output
14:10:00                      | 14:09:56                                 | 0                | -
14:10:30                      | 14:10:26                                 | 14:09:56         | -
14:11:00                      | 14:10:56                                 | 14:10:26         | window <14:05, 14:10)

Example 2. Multiple stateful operations (aggregation by 1-min window
followed by aggregation by 5-min window, assume watermark is 0 seconds)

Wall clock (microbatch start) | Max event timestamp (at read from Kafka) | Late events watermark | Eviction watermark | Output
14:10:00                      | 14:09:56                                 | 0                     | 0                  | -
14:10:30                      | 14:10:26                                 | 0                     | 14:09:56           | -
14:11:00                      | 14:10:56                                 | 14:09:56              | 14:10:26           | -
14:11:30                      | 14:11:26                                 | 14:10:26              | 14:10:56           | window <14:05, 14:10)

In Example 2, we need to wait until both watermarks cross the end of the
window to get the output for that window, which happens one iteration later
compared to Example 1.

Now, in use cases that require near-real-time processing, this one
iteration delay can be quite a significant difference.

Do we have any option to make streaming queries with multiple stateful
operations output data without waiting for this extra iteration? One of my
ideas was to force an empty microbatch to run and propagate the late events
watermark without any new data. While this works conceptually, I didn't
find a way to trigger an empty microbatch while being connected to Kafka,
which constantly receives new data, and while keeping a constant 30s trigger
interval.

Thanks,
Andrzej


Re: [Structured Streaming] Keeping checkpointing cost under control

2024-01-10 Thread Andrzej Zera
Hey,

Yes, that's how I understood it (scenario 1). However, I'm not sure if
scenario 2 is possible. I think caching a streaming DataFrame is supported
only inside foreachBatch (where it's actually no longer a streaming DF); a
minimal sketch of what I mean follows.
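
Something like this is what I have in mind (illustrative only; the rate
source and the noop sink are just stand-ins):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("foreachbatch-cache-sketch").getOrCreate()
streaming_df = spark.readStream.format("rate").load()

def process_batch(batch_df, batch_id):
    # Inside foreachBatch the DataFrame is a static one, so caching works as usual.
    batch_df.persist()
    try:
        batch_df.write.format("noop").mode("append").save()  # first use (stand-in sink)
        print(batch_id, batch_df.count())                    # second use of the cached batch
    finally:
        batch_df.unpersist()

query = streaming_df.writeStream.foreachBatch(process_batch).start()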

On Wed, 10 Jan 2024 at 15:01, Mich Talebzadeh wrote:

> Hi,
>
>  With regard to your point
>
> - Caching: Can you please explain what you mean by caching? I know that
> when you have batch and streaming sources in a streaming query, then you
> can try to cache batch ones to save on reads. But I'm not sure if it's what
> you mean, and I don't know how to apply what you suggest to streaming data.
>
> Let us visit this
>
> The purpose of caching in Structured Streaming is to store frequently
> accessed data in memory or on disk for faster retrieval, reducing repeated
> reads from sources.
>
> - Types:
>
>- Memory Caching: Stores data in memory for extremely fast access.
>- Disk Caching: Stores data on disk for larger datasets or persistence
>across triggers
>
>
> - Scenarios:
>
>    - Joining Streaming Data with Static Data: Cache static datasets
>    (e.g., reference tables) to avoid repeated reads for each micro-batch.
>    - Reusing Intermediate Results: Cache intermediate dataframes that are
>    expensive to compute and used multiple times within the query.
>    - Window Operations: Cache data within a window to avoid re-reading
>    for subsequent aggregations or calculations within that window.
>
> - Benefits:
>
>- Performance: Faster query execution by reducing I/O operations and
>computation overhead.
>- Cost Optimization: Reduced reads from external sources can lower
>costs, especially for cloud-based sources.
>- Scalability: Helps handle higher data volumes and throughput by
>minimizing expensive re-computations.
>
>
> Example code
>
> scenario 1
>
> static_data = spark.read.load("path/to/static/data")
> static_data.cache()
> streaming_data = spark.readStream.format("...").load()
> # Static data is cached for efficient joins
> joined_data = streaming_data.join(static_data, ...)
>
> scenario 2
>
> intermediate_df = streaming_data.groupBy(...).count()
> intermediate_df.cache()
> # Use cached intermediate_df for further transformations or actions
>
> HTH
>

Re: [Structured Streaming] Keeping checkpointing cost under control

2024-01-10 Thread Andrzej Zera
Thank you very much for your suggestions. Yes, my main concern is
checkpointing costs.

I went through your suggestions and here are my comments:

- Caching: Can you please explain what you mean by caching? I know that
when you have batch and streaming sources in a streaming query, then you
can try to cache batch ones to save on reads. But I'm not sure if it's what
you mean, and I don't know how to apply what you suggest to streaming data.

- Optimize Checkpointing Frequency: I'm already using changelog
checkpointing with RocksDB and have increased the trigger interval to the
maximum acceptable value.

- Minimize LIST Requests: That's where I can get the most savings. My LIST
requests account for ~70% of checkpointing costs. From what I see, LIST
requests are ~2.5x the number of PUT requests. Unfortunately, when I
changed the checkpoint location to DBFS, it didn't help with minimizing LIST
requests. They are roughly at the same level. From what I see, the S3
Optimized Committer is EMR-specific, so I can't use it in Databricks. The
fact that I don't see a difference between S3 and DBFS checkpoint locations
suggests that both must implement the same or a similar committer.

- Optimizing RocksDB: I still need to do this but I don't suspect it will
help much. From what I understand, these settings shouldn't have a
significant impact on the number of requests to S3.

Any other ideas on how to limit the number of LIST requests are appreciated.

On Sun, 7 Jan 2024 at 15:38, Mich Talebzadeh wrote:

> OK I assume that your main concern is checkpointing costs.
>
> - Caching: If your queries read the same data multiple times, caching the
> data might reduce the amount of data that needs to be checkpointed.
>
> - Optimize Checkpointing Frequency, i.e.
>
>- Consider Changelog Checkpointing with RocksDB.  This can
>potentially reduce checkpoint size and duration by only storing state
>changes, rather than the entire state.
>- Adjust Trigger Interval (if possible): While not ideal for your
>near-real time requirement, even a slight increase in the trigger interval
>(e.g., to 7-8 seconds) can reduce checkpoint frequency and costs.
>
> - Minimize LIST Requests:
>
>    - Enable the S3 Optimized Committer or, as you stated, consider DBFS
>
> You can also optimise RocksDB. Set your state backend to RocksDB, if not
> already. Here is what I use:
>
>   # Add RocksDB configurations here
>   spark.conf.set("spark.sql.streaming.stateStore.providerClass",
>       "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
>   spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
>       "true")
>   spark.conf.set("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB",
>       "64")  # Example configuration
>   spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.style",
>       "level")
>   spark.conf.set("spark.sql.streaming.stateStore.rocksdb.compaction.level.targetFileSizeBase",
>       "67108864")
>
> These configurations provide a starting point for tuning RocksDB.
> Depending on your specific use case and requirements, of course, your
> mileage varies.
>
> HTH
>

Re: [Structured Streaming] Keeping checkpointing cost under control

2024-01-07 Thread Andrzej Zera
Usually one or two topics per query. Each query has its own checkpoint
directory. Each topic has a few partitions.

Performance-wise I don't experience any bottlenecks in terms of
checkpointing. It's all about the number of requests (including a high
number of LIST requests) and the associated cost.

On Sat, 6 Jan 2024 at 13:30, Mich Talebzadeh wrote:

> How many topics and checkpoint directories are you dealing with?
>
> Does each topic have its own checkpoint on S3?
>
> All these checkpoints are sequential writes, so even an SSD would not
> really help.
>
> HTH
>


[Structured Streaming] Keeping checkpointing cost under control

2024-01-05 Thread Andrzej Zera
Hey,

I'm running a few Structured Streaming jobs (with Spark 3.5.0) that require
near-real-time accuracy, with trigger intervals on the order of 5-10
seconds. I usually run 3-6 streaming queries as part of the job and each
query includes at least one stateful operation (and usually two or more).
My checkpoint location is an S3 bucket and I use RocksDB as the state store.
Unfortunately, checkpointing costs are quite high. They are the main cost
item of the system, roughly 4-5 times the cost of compute.
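
For context, a rough sketch of the kind of setup each query uses
(illustrative only; the rate source, bucket path and window are stand-ins
for the real jobs):

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("checkpoint-cost-sketch").getOrCreate()
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

events = (spark.readStream.format("rate").load()  # stand-in for the Kafka source
          .withWatermark("timestamp", "1 minute"))

# At least one stateful operation per query (usually two or more).
counts = events.groupBy(F.window("timestamp", "1 minute")).count()

query = (counts.writeStream
         .outputMode("append")
         .option("checkpointLocation", "s3://example-bucket/checkpoints/query-1")  # hypothetical path
         .trigger(processingTime="10 seconds")
         .format("console")
         .start())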

To save on compute costs, the following things are usually recommended:

   - increase trigger interval (as mentioned, I don't have much space here)
   - decrease the number of shuffle partitions (I have 2x the number of
   workers)

I'm looking for some other recommendations that I can use to save on
checkpointing costs. I saw that most requests are LIST requests. Can we cut
them down somehow? I'm using Databricks. If I replace the S3 bucket with
DBFS, will it help in any way?

Thank you!
Andrzej


Re: [Structured Streaming] Joins after aggregation don't work in streaming

2023-10-27 Thread Andrzej Zera
Hi, thank you very much for the update!

Thanks,
Andrzej

On 2023/10/27 01:50:35 Jungtaek Lim wrote:

> Hi, we are aware of your ticket and plan to look into it. We can't say
> about ETA but just wanted to let you know that we are going to look into
> it. Thanks for reporting!
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>


[Structured Streaming] Joins after aggregation don't work in streaming

2023-10-26 Thread Andrzej Zera
Hey All,

I'm trying to reproduce the following streaming operation: "Time window
aggregation in separate streams followed by stream-stream join". According
to the documentation, this should be possible in Spark 3.5.0, but I had no
success despite several attempts.

Here is a documentation snippet I'm trying to reproduce:
https://github.com/apache/spark/blob/261b281e6e57be32eb28bf4e50bea24ed22a9f21/docs/structured-streaming-programming-guide.md?plain=1#L1939-L1995
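
For reference, a minimal PySpark sketch of the pattern from that snippet as
I understand it (the rate source and column names are stand-ins; my actual
job differs):

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("agg-then-join-sketch").getOrCreate()

clicks = (spark.readStream.format("rate").load()
          .withWatermark("timestamp", "10 minutes"))
impressions = (spark.readStream.format("rate").load()
               .withWatermark("timestamp", "10 minutes"))

# Time window aggregation in each stream separately.
clicks_window = clicks.groupBy(F.window("timestamp", "1 hour")).count()
impressions_window = impressions.groupBy(F.window("timestamp", "1 hour")).count()

# Stream-stream join on the window column produced by the aggregations.
joined = clicks_window.join(impressions_window, "window", "inner")

query = (joined.writeStream
         .outputMode("append")
         .format("console")
         .start())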

I created an issue with more details but no one has responded yet:
https://issues.apache.org/jira/browse/SPARK-45637

Thank you!
Andrzej