Hi devs,

While helping a user on the user mailing list, I started to suspect that
chained stream-stream joins work incorrectly, yet Structured Streaming
doesn't prevent them. The reason is similar to why chained streaming
aggregations are not supported in Structured Streaming: the global watermark.

Suppose we're running the query below in append mode,

A.join(B, "A_ID = B_ID AND A_EVT_TIME = B_EVT_TIME")
 .join(C, "B_ID = C_ID AND A_EVT_TIME = C_EVT_TIME")

with the dataframes below:

- Dataframe A

A_ID | A_EVT_TIME
-----|-----------
A1   | 1
A2   | 2
A3   | 3

(Let's call these rows A1, A2, A3)

- Dataframe B

B_ID | B_EVT_TIME
-----|-----------
B2   | 2
B3   | 3

(Let's call these rows B2, B3)

- Dataframe C

C_ID | C_EVT_TIME
-----|-----------
C4   | 4
C5   | 5

(Let's call these rows C4, C5)

Please note that I'm intentionally imposing two conditions (on the query and
the input data):

- the join condition uses equality of event times, so that the time bound
of eviction is not expanded (an expanded bound might hide the issue)
- no rows match (having some matched pairs would still show the behavior,
but this keeps the example simple)

Let's say batch 0 reads all the rows and processes them.

> output

- none

> watermark for batch 1 (next batch)

- watermark(A) = 3
- watermark(B) = 3
- watermark(C) = 5
- watermark(min) = 3
- watermark(max) = 5
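
To make the arithmetic above concrete, here is a minimal Python sketch (a toy
model, not Spark code). It assumes a watermark delay of 0, so each input's
watermark is simply its maximum event time; the names event_times, DELAY,
and per_input are only for illustration.

```python
# Toy model, not Spark code: event times per input, taken from the tables above.
event_times = {
    "A": [1, 2, 3],
    "B": [2, 3],
    "C": [4, 5],
}

# Assumption: the watermark delay is 0, so each input's watermark
# is its maximum observed event time.
DELAY = 0
per_input = {name: max(times) - DELAY for name, times in event_times.items()}

# Candidate global watermarks: the minimum or the maximum across inputs.
watermark_min = min(per_input.values())
watermark_max = max(per_input.values())

print(per_input)       # {'A': 3, 'B': 3, 'C': 5}
print(watermark_min)   # 3
print(watermark_max)   # 5
```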

> states for version 2 (output of batch 0)

- Left State for first join: A1, A2, A3
- Right State for first join: B2, B3
- Left State for second join: (none)
- Right State for second join: C4, C5

After batch 0, an empty batch (batch 1) will be triggered to handle state
eviction, because the watermark has advanced.

Here the global watermark is 3 in batch 1, so the first join evicts all rows
from its state and emits the unmatched left rows (A1, null), (A2, null),
(A3, null). The second join then discards all of these outputs from the first
join, because their event times are already behind the watermark.
Correctness is broken.
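
The behavior in batch 1 can be mimicked with a small Python sketch. This is a
toy model under stated assumptions, not Spark's implementation: the class
LeftOuterJoin and the function drop_late are hypothetical, and I assume a
row counts as "behind the watermark" when its event time <= the watermark,
for both state eviction and late-input filtering.

```python
from dataclasses import dataclass, field


@dataclass
class LeftOuterJoin:
    """Toy left outer join holding only unmatched left rows in state."""
    left_state: list = field(default_factory=list)

    def evict(self, watermark):
        # Assumption: a row is evicted once its event time <= watermark,
        # and an unmatched left row is emitted null-padded on eviction.
        evicted = [r for r in self.left_state if r[1] <= watermark]
        self.left_state = [r for r in self.left_state if r[1] > watermark]
        return [(row_id, evt_time, None) for row_id, evt_time in evicted]


def drop_late(rows, watermark):
    # Assumption: input rows with event time <= watermark are treated as late.
    kept = [r for r in rows if r[1] > watermark]
    dropped = [r for r in rows if r[1] <= watermark]
    return kept, dropped


global_watermark = 3  # min(watermark(A), watermark(B), watermark(C))

# No row of A matched any row of B, so all of A is unmatched left state.
first_join = LeftOuterJoin(left_state=[("A1", 1), ("A2", 2), ("A3", 3)])
first_output = first_join.evict(global_watermark)

# The second join receives these rows in the same batch, under the same
# global watermark, and treats every one of them as late input.
kept, dropped = drop_late(first_output, global_watermark)

print(first_output)  # [('A1', 1, None), ('A2', 2, None), ('A3', 3, None)]
print(kept)          # []
print(len(dropped))  # 3
```

The point of the sketch is that the same watermark value is used both to
decide when the first join emits its outer results and to decide what the
second join considers late, so the emitted rows can never pass.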

The commit below adds test code that reproduces the case above:
https://github.com/HeartSaVioR/spark/commit/33c8b3fbcf23d9eabaa0d4f548787cacd60bd791

I have yet to investigate the inner join case, but given the nature of the
issue, the same problem may well apply to all stateful operators.

If we agree that my investigation is correct, I would propose preventing
multiple stream-stream joins (as we already prevent multiple streaming
aggregations) as an interim mitigation. As a long-term solution, we may want
to revisit SPARK-26655 [1], which addresses operator-wise watermarks.

Thanks,
Jungtaek Lim (HeartSaVioR)
