+Mikhail Gryzykhin <mig...@google.com> +Rui Wang <ruw...@google.com> +Reza
Rokni <r...@google.com> who have all done some investigations here.


On Fri, Nov 22, 2019 at 11:48 AM Jan Lukavský <je...@seznam.cz> wrote:

>
> On 11/22/19 7:54 PM, Reuven Lax wrote:
>
>
>
> On Fri, Nov 22, 2019 at 10:19 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Reuven,
>>
>> I didn't investigate that particular one, but looking into it now, it
>> looks like it (same as the "classic" join library) is built around CoGBK.
>> Is that correct? If yes, then it essentially means that it:
>>
>>  - works only for cases where both sides have the same windowfn (that is
>> a limitation of the Flatten that precedes CoGBK)
>>
> Correct. Did you want to join different windows? If so what are the
> semantics? If the lhs has FixedWindows and the rhs has SessionWindows, what
> do you want the join semantics to be? The only thing I could imagine would
> be for the user to provide some function telling the join how to map the
> windows together, but that could be pretty complicated.
>
> I don't want to go too far into details, but generally both lhs and rhs
> can be put onto a timeline, and then a full join can be defined as each
> pair of (lhs, first preceding rhs) and (rhs, first preceding lhs). The end
> of a window then semantically just clears the joined value (setting it to
> null, so at the end of the window there will be a pair (lhs, null) or
> (null, rhs) in the case of a full outer join). This way any combination of
> windows is possible, because all a window does is "scope" the validity of
> the respective values (lhs, rhs).
>

I think it is very valid to hope to do a join in the sense of a relational
join where it is row-to-row. In this case, Beam's concept of windowing may
or may not make sense. It is just a tool for the job. It is just a grouping
key that provides a time when state can be deleted. So I would say your use
case is more global window to global window join. That is what I think of
as a true stream-to-stream join anyhow. You probably don't want to wait
forever for output. So you'll need to use some knob other than Beam windows
or triggers.
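
To make the timeline-based pairing quoted above concrete, here is a minimal
sketch in plain Python (not Beam code). The function name, the tuple layout
and the use of None as a "window ended, value cleared" marker are all
assumptions made for illustration only.

```python
from typing import List, Optional, Tuple

# (timestamp, side, value); side is "L" or "R". A value of None models the
# end of a window, which clears that side's currently valid value.
Update = Tuple[int, str, Optional[str]]


def timeline_full_join(
    updates: List[Update],
) -> List[Tuple[int, Optional[str], Optional[str]]]:
    """Pair every update with the latest preceding value of the other side."""
    current = {"L": None, "R": None}
    joined = []
    # sorted() is stable, so updates with equal timestamps keep input order.
    for ts, side, value in sorted(updates, key=lambda u: u[0]):
        current[side] = value
        joined.append((ts, current["L"], current["R"]))
    return joined


updates = [(1, "L", "a"), (2, "R", "x"), (3, "L", "b"), (4, "R", None)]
result = timeline_full_join(updates)
# result == [(1, "a", None), (2, "a", "x"), (3, "b", "x"), (4, "b", None)]
```

The last tuple is the (lhs, null) pair produced when the rhs window ends.
Note that this sketch holds all state forever, which is exactly the part
that needs some other knob in a real pipeline.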

Reza has prototyped a join like you describe here:
https://github.com/apache/beam/pull/9032

If your join condition explicitly includes the event time distance between
elements, then it could "just work". If that isn't really part of your join
condition, then you will have to see this restriction as a "knob" that you
tweak on your results.

>>  - when using the global window, there has to be a trigger, and (afaik)
>> there is no trigger that would guarantee firing after each data element
>> (for early panes) (because triggers are there to express the cost-latency
>> tradeoff, not semantics)
>>
>
> Can you explain the use case where this matters? If you do trigger
> elementCountAtLeast(1) on the join, then the consumer will simply see a
> continuous stream of outputs. I'm not sure I understand why the consumer
> cares that some of those outputs were in a pane that really held 3 outputs
> instead of 1.
>
> What I'm trying to solve is basically this:
>
>  - lhs is an event stream
>
>  - rhs is a stream of "state updates"
>
> The purpose of the join is "take each event, pair it with the currently
> valid state, and produce output and possibly modified state". I cannot
> process two events at a time, because the first event can modify the state
> and the subsequent event should see this. It is not a "simple" stateful
> ParDo either, because the state can be modified externally (not going into
> too much detail here, but e.g. by writing into a Kafka topic).
>
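
For reference, the "event vs. state" join described in the quoted text could
be sketched roughly as follows, in plain Python rather than Beam. The record
layout, the function name and the handle callback are hypothetical; the only
point is that records are processed strictly one at a time in timestamp
order, so each event sees both external state updates and changes made by
earlier events.

```python
from typing import Any, Callable, List, Tuple

# (timestamp, kind, payload); kind is "event" or "state".
Record = Tuple[int, str, Any]


def event_state_join(
    records: List[Record],
    handle: Callable[[Any, Any], Tuple[Any, Any]],
) -> List[Tuple[int, Any]]:
    """Pair each event with the currently valid state, one record at a time."""
    state = None
    outputs = []
    for ts, kind, payload in sorted(records, key=lambda r: r[0]):
        if kind == "state":
            # External update (e.g. read from another stream) replaces state.
            state = payload
        else:
            # handle returns (output, new_state); the next event sees new_state.
            output, state = handle(payload, state)
            outputs.append((ts, output))
    return outputs


def count_events(event, state):
    # Hypothetical handler: emit the event with the state it saw, then bump it.
    return (event, state), (state or 0) + 1


records = [(1, "event", "e1"), (2, "event", "e2"), (3, "state", 10), (4, "event", "e3")]
outputs = event_state_join(records, count_events)
# outputs == [(1, ("e1", None)), (2, ("e2", 1)), (4, ("e3", 10))]
```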
Reuven's explanation is missing some detail. If the CoGBK is in discarding
mode, then it will miss join results. If the CoGBK is in accumulating mode,
it will duplicate join results. This is a known problem and the general
solution is retractions.

Basically, CoGBK-based joins just don't work with triggers until we have
retractions.
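
A small simulation of the problem, again in plain Python and not using the
Beam API: it models one key of an inner join where a trigger fires after
every element and each pane emits the cartesian product of the lhs and rhs
values it contains. The function name and input layout are made up for the
illustration.

```python
from itertools import product
from typing import List, Tuple


def fire_panes(arrivals: List[Tuple[str, str]], accumulating: bool) -> List[Tuple[str, str]]:
    """Emit lhs x rhs for the pane contents after every arriving element."""
    lhs: List[str] = []
    rhs: List[str] = []
    outputs: List[Tuple[str, str]] = []
    for side, value in arrivals:
        (lhs if side == "L" else rhs).append(value)
        outputs.extend(product(lhs, rhs))
        if not accumulating:
            # Discarding mode: pane contents are dropped after each firing.
            lhs, rhs = [], []
    return outputs


arrivals = [("L", "a"), ("R", "x"), ("R", "y")]
discarding = fire_panes(arrivals, accumulating=False)
accumulating = fire_panes(arrivals, accumulating=True)
# discarding   == []  (both (a, x) and (a, y) are missed)
# accumulating == [("a", "x"), ("a", "x"), ("a", "y")]  ((a, x) is duplicated)
```

With retractions, the second accumulating pane could retract its earlier
(a, x) before re-emitting it, which is what would make the output correct.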



>> Moreover, I'd like to define the join semantics so that when there are
>> available elements from both sides, the fired pane should be ON_TIME, not
>> EARLY. That essentially means that the fully general case would not be
>> built around (Co)GBK, but stateful ParDo. There are specific options where
>> this fully general case "degrades" into forms that can be efficiently
>> expressed using (Co)GBK, that is true.
>>
>
> BTW building this around stateful DoFn might be a better fit. The main
> reason I didn't is because we would need a good distributed MapState
> (something discussed fairly recently on the list), and that is not yet
> built. Once we had that, I might be inclined to rewrite this join on
> stateful DoFn.
>
> Yes, the sorted state helps for the streaming case. But I'd be careful
> about that for the batch case, where this might lead to high pressure on
> the state (and InMemoryStateInternals might OOME, for instance).
>
>
> However can you explain what you are expecting from the pane? An EARLY
> pane simply means that we are producing output before the end of the
> window. If you are in the global window triggering every element, then
> every output is EARLY. It might seem weird if you are interpreting EARLY as
> "outputting data that isn't ready," however that's not what EARLY is
> defined to be. Any change to the pane semantics would be a major breaking
> change to very fundamental semantics.
>
> I wonder if you are really objecting to the name EARLY and ON_TIME? Maybe
> we would've been better off tagging it BEFORE_WINDOW_END instead of EARLY,
> to make it clear what is meant?
>
> Essentially I don't object to anything here. I'm missing a solution to the
> "event vs. state" join described above. I was thinking about how to make
> these types of problems more user friendly, and it essentially leads to
> creating somewhat more generic join semantics, where end-of-window is
> converted into "value-delete" events and the join is then done on the
> "previous" or "valid" value (yes, this relates to the validity windows
> mentioned at Beam Summit Europe). It actually turns out that with some work
> we could define quite "naturally" a join on two streams with the global
> window and no trigger. It would even function with the lowest latency
> possible (but yes, with the highest expense; it is actually the
> introduction of (same!) windows that enables certain optimizations). It
> then correctly defines semantics for different windows, although the result
> would be (probably unexpectedly) windowed using the global window. But that
> doesn't seem to be a breaking change, because it is currently not possible
> (any such pipeline will not be validated).
>
> Maybe for reference, the unwindowed join would be what is described here
> [1]
>
> [1]
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KStream-KTableJoin
>
>
>
>> Jan
>> On 11/22/19 6:47 PM, Reuven Lax wrote:
>>
>> Have you seen the Join library that is part of schemas? I'm curious
>> whether this fits your needs, or there's something lacking there.
>>
>> On Fri, Nov 22, 2019 at 12:31 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi,
>>>
>>> based on roadmap [1], we would like to define and implement a full set
>>> of (unified) stream-stream joins. That would include:
>>>
>>>   - joins (left, right, full outer) on global window with "immediate
>>> trigger"
>>>
>>>   - joins with different windowing functions on left and right side
>>>
>>> The approach would be to define these operations in a natural way, so
>>> that the definition is aligned with how current joins work (same
>>> windows, cartesian product of values with same keys, output timestamp
>>> projected to the end of window, etc.). Because this should be a generic
>>> approach, this effort should probably be part of the join library, so
>>> that it can then be reused by other components, too (e.g. SQL).
>>>
>>> The question is - is (or was) there any effort that we can build upon?
>>> Or should this be designed from scratch?
>>>
>>> Jan
>>>
>>> [1] https://beam.apache.org/roadmap/euphoria/
>>>
>>>
