Nice to hear you're investigating the issue deeply.

Btw, if attaching the code is not easy, maybe you could share the logical/physical
plan of any batch: "detail" in the SQL tab shows the plan as a string.
Plans from consecutive batches would be very helpful, and the streaming query
status for those batches (especially the watermark) would be helpful too.
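
In case it helps, here is a minimal sketch of how the watermark per batch could
be pulled out of the progress reports. It assumes PySpark, where each progress
report is a plain dict; the helper names and the variable `query` (a running
StreamingQuery) are hypothetical and only for illustration.

```python
from typing import Any, Dict, List, Optional


def watermark_of(progress: Optional[Dict[str, Any]]) -> Optional[str]:
    """Return the watermark from one progress report, or None if absent."""
    if not progress:
        return None
    # "eventTime" is only present once event time / watermark is in play.
    return progress.get("eventTime", {}).get("watermark")


def watermarks_by_batch(reports: List[Dict[str, Any]]) -> Dict[int, Optional[str]]:
    """Map batchId -> watermark for a list of progress reports."""
    return {report["batchId"]: watermark_of(report) for report in reports}


# Assumed usage against a running query (hypothetical variable `query`):
#   query.explain(extended=True)   # prints the logical and physical plans
#   print(watermarks_by_batch(query.recentProgress))
```

The output for a few consecutive batches, together with the plans, should be
enough to see whether the watermark advances the way you expect.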


On Wed, Jun 5, 2019 at 11:57 PM Joe Ammann <j...@pyx.ch> wrote:

> Hi Jungtaek
>
> Thanks for your response!
>
> I actually have set watermarks on all the streams A/B/C with the respective
> event time column A/B/C_LAST_MOD. So I think this should not be the reason.
>
> Of course, the event time on the C stream (the "optional one") progresses
> much slower than on the other 2. I try to adjust for this by setting
>
>    spark.sql.streaming.multipleWatermarkPolicy=max
>
> and judging from the microbatch results, this also works. The global
> watermark seems to progress as expected with the event time from the A/B
> streams.
>
> I will try to put together an isolated test case to reproduce the issue;
> the whole code is embedded in a larger app and hence not easy to rip out.
>
> I did some more testing, and for now these are my observations:
>  - inner join followed by aggregation works as expected
>  - inner join with 1 left outer (and no aggregation) works as expected
>  - inner join with 2 left outer only produces results where both outer
>    joins have a match
>  - inner join with 1 left outer followed by aggregation only produces the
>    messages with a match
>
> Of course, all are stream-stream joins.
>
> CU, Joe
>
> On Wednesday, June 5, 2019 09:17 CEST, Jungtaek Lim <kabh...@gmail.com>
> wrote:
> > I would suspect that rows are never evicted from state in the second join.
> > To determine whether a row is NOT matched on the other side, Spark has to
> > check whether the row was ever matched before it is evicted. You need to
> > set a watermark on either B_LAST_MOD or C_LAST_MOD.
> >
> > If you already did so but didn't show it here, please paste all the code
> > (assuming you've already redacted it) to a gist or attach a zipped file of
> > the project.
> >
> > Btw, there's a known "correctness" issue with stream-stream left/right
> > outer joins. Please refer to SPARK-26154 [1] for details. That's not the
> > same case, but it's good to know about when you're dealing with
> > stream-stream joins.
> >
> > Thanks,
> > Jungtaek Lim (HeartSaVioR)
> >
> > 1. https://issues.apache.org/jira/browse/SPARK-26154
> >
> > On Tue, Jun 4, 2019 at 9:31 PM Joe Ammann <j...@pyx.ch> wrote:
> >
> > > Hi all
> > >
> > > sorry, tl;dr
> > >
> > > I'm on my first Python Spark structured streaming app, in the end
> > > joining messages from ~10 different Kafka topics. I've recently upgraded
> > > to Spark 2.4.3, which has resolved all my issues with the time handling
> > > (watermarks, join windows) I had before with Spark 2.3.2.
> > >
> > > My current problem happens during a leftOuter join, where messages from
> > > 3 topics are joined, the results are then aggregated with a groupBy and
> > > finally put onto a result Kafka topic. On the 3 input topics involved,
> > > all messages have ID and LAST_MOD fields. I use the ID for joining, and
> > > the LAST_MOD as event timestamp on all incoming streams. Since the fields
> > > on the incoming messages are all named the same (ID and LAST_MOD), I
> > > rename them on all incoming streams with
> > >
> > >      aDf = aStream.selectExpr("*", "ID as A_ID", "LAST_MOD as A_LAST_MOD") \
> > >                   .drop(*["ID", "LAST_MOD"])
> > >
> > > For those data frames, I then take the watermark with the A/B/C_LAST_MOD
> > > as event time, before joining. I know that the LAST_MOD timestamps are
> > > equal on the messages that I want to join together.
> > >
> > > The first join is an inner join, where a field on stream A links with the
> > > ID of stream B. So I have
> > >
> > >      (aDf
> > >         .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
> > >         .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
> > >         .agg(
> > >             collect_list(struct("*")).alias("RESULTS"),
> > >             count("A_ID").alias("NUM_RESULTS"),
> > >             # just add a timestamp to watermark on, they are all the
> > >             min("A_LAST_MOD").alias("RESULT_LAST_MOD")
> > >             )
> > >         .withWatermark("RESULT_LAST_MOD", "30 seconds")
> > >         )
> > >
> > > This works perfectly and generates (on my current data set) some 10'000
> > > records. This is the expected result.
> > >
> > > When I add the leftOuter join of the third topic as follows
> > >
> > >      (aDf
> > >         .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
> > >         # here the additional left join; C_FK is the field in stream B
> > >         .join(cDf, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"),
> > >               "leftOuter")
> > >         .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
> > >         .agg(
> > >             collect_list(struct("*")).alias("RESULTS"),
> > >             count("A_ID").alias("NUM_RESULTS"),
> > >             # just add a timestamp to watermark on, they are all the
> > >             min("A_LAST_MOD").alias("RESULT_LAST_MOD")
> > >             )
> > >         .withWatermark("RESULT_LAST_MOD", "30 seconds")
> > >         )
> > >
> > > then what I would expect is that I get the same number of output records
> > > (~10'000), and some of them have the additional fields from the C stream.
> > >
> > > But what happens is that my output is reduced to ~1'500 records, exactly
> > > those which have a successful join on records on topic C. The others are
> > > not shown on the output.
> > >
> > > I already tried
> > >
> > >    * make sure that the optional FK on topic B is never null, by using an
> > >      NVL2(C_FK, C_FK, 'FFFF')
> > >    * widen the time window join on the leftOuter to "B_LAST_MOD <
> > >      C_LAST_LAST_MOD - interval 5 seconds ..."
> > >    * use various combinations of joinWindows and watermarkLateThreshold
> > >
> > > The result is always the same: I'm "losing" the ~8'500 records for which
> > > the optional join FK is NULL on topic B.
> > >
> > > Did I totally misunderstand the concept of stream-stream left outer join?
> > > Or what could be wrong?
> > >
> > > --
> > > CU, Joe
> >
> > --
> > Name : Jungtaek Lim
> > Blog : http://medium.com/@heartsavior
> > Twitter : http://twitter.com/heartsavior
> > LinkedIn : http://www.linkedin.com/in/heartsavior

-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior
