Hi devs,

While helping a user on the user mailing list, I started to suspect that chained stream-stream joins produce incorrect results, yet Structured Streaming doesn't prevent users from running them. The cause is essentially the same reason chained streaming aggregations are not supported in Structured Streaming: the global watermark.
Suppose we run the query below in append mode. Both joins are left outer joins, as the null-padded outputs further down imply. The conditions are passed through expr(), since a plain String argument to join() is treated as a column name rather than a join expression.

  import org.apache.spark.sql.functions.expr

  A.join(B, expr("A_ID = B_ID AND A_EVT_TIME = B_EVT_TIME"), "left_outer")
   .join(C, expr("B_ID = C_ID AND A_EVT_TIME = C_EVT_TIME"), "left_outer")

with the following DataFrames:

- DataFrame A (rows named A1, A2, A3)

    A_ID | A_EVT_TIME
    -----|-----------
    A1   | 1
    A2   | 2
    A3   | 3

- DataFrame B (rows named B2, B3)

    B_ID | B_EVT_TIME
    -----|-----------
    B2   | 2
    B3   | 3

- DataFrame C (rows named C4, C5)

    C_ID | C_EVT_TIME
    -----|-----------
    C4   | 4
    C5   | 5

Please note that I'm intentionally setting up two conditions (in the query and in the input data):

- The join conditions use equality of event times, so the time bound for eviction isn't expanded (an expanded bound might hide the issue).
- No rows match. (Having some matched pairs would still show the behavior, but this keeps the example simple.)

Let's say batch 0 reads all the rows and processes them.

> output
  - none

> watermark for batch 1 (next batch)
  - watermark(A) = 3
  - watermark(B) = 3
  - watermark(C) = 5
  - watermark(min) = 3
  - watermark(max) = 5

> states for version 2 (output of batch 0)
  - Left state for first join: A1, A2, A3
  - Right state for first join: B2, B3
  - Left state for second join: (none)
  - Right state for second join: C4, C5

After batch 0, an empty batch (batch 1) is triggered to handle eviction, because the watermark has advanced. The global watermark in batch 1 is 3, so the first join evicts all rows from its state and emits (A1, null), (A2, null), (A3, null). The second join then discards all of these rows, because their event times (1, 2, 3) are already at or behind the same watermark of 3. Correctness is broken.

Below is the commit which adds test code replicating the above case:
https://github.com/HeartSaVioR/spark/commit/33c8b3fbcf23d9eabaa0d4f548787cacd60bd791

I haven't investigated the inner join case yet, but given the nature of the issue, other stateful operators may be affected as well.
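To make the watermark arithmetic concrete, here is a minimal pure-Scala sketch (no Spark dependency) that replays batch 1 of the example. It is a toy model, not Spark's implementation: the Row case class and all helper names are hypothetical. It assumes a watermark delay of 0, Spark's default "min" policy for combining multiple watermarks (spark.sql.streaming.multipleWatermarkPolicy), and that both state eviction and the late-row check use "event time <= watermark".

```scala
// Hypothetical toy model, not Spark code: a row is an id plus an event time
// in the same abstract units the example uses.
final case class Row(id: String, eventTime: Long)

// Assumes a watermark delay of 0, so an input's watermark is simply the
// max event time it has observed so far.
def inputWatermark(rows: Seq[Row]): Long = rows.map(_.eventTime).max

// Assumed expiry/lateness check: event time at or below the watermark.
def behindWatermark(eventTime: Long, watermark: Long): Boolean =
  eventTime <= watermark

val dfA = Seq(Row("A1", 1), Row("A2", 2), Row("A3", 3))
val dfB = Seq(Row("B2", 2), Row("B3", 3))
val dfC = Seq(Row("C4", 4), Row("C5", 5))

// Watermarks computed at the end of batch 0, used by batch 1.
val perInputWatermark: Map[String, Long] = Map(
  "A" -> inputWatermark(dfA),
  "B" -> inputWatermark(dfB),
  "C" -> inputWatermark(dfC)
)

// One global watermark shared by every stateful operator ("min" policy).
val globalWatermark: Long = perInputWatermark.values.min

// Batch 1 (no-data batch): nothing matched in batch 0, so the first join
// evicts every expired A row and emits it null-padded as (A_ID, null).
val firstJoinOutput: Seq[(String, Option[String], Long)] =
  dfA.filter(r => behindWatermark(r.eventTime, globalWatermark))
    .map(r => (r.id, Option.empty[String], r.eventTime))

// The second join checks those rows against the SAME global watermark,
// so every one of them is treated as late input.
val droppedBySecondJoin =
  firstJoinOutput.filter { case (_, _, t) => behindWatermark(t, globalWatermark) }
val reachesSecondJoin =
  firstJoinOutput.filterNot { case (_, _, t) => behindWatermark(t, globalWatermark) }

println(s"per-input watermarks: $perInputWatermark, global: $globalWatermark")
println(s"first join emits: $firstJoinOutput")
println(s"dropped as late: ${droppedBySecondJoin.size}, surviving: ${reachesSecondJoin.size}")
```

With a single global watermark, any row the first join emits on eviction has, by definition, an event time at or below the watermark that the second join then applies, which is the root of the problem.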
If we agree that my investigation is correct, I would propose disallowing multiple stream-stream joins in a single query (as we already do for multiple streaming aggregations) as an interim mitigation. As a long-term solution, we may want to revisit SPARK-26655 [1], which addresses operator-wise watermarks.

Thanks,
Jungtaek Lim (HeartSaVioR)