[ 
https://issues.apache.org/jira/browse/BEAM-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16319536#comment-16319536
 ] 

Kenneth Knowles edited comment on BEAM-3190 at 6/10/20, 9:04 PM:
-----------------------------------------------------------------

At the moment, SQL joins for unbounded inputs happen per pane. This means that 
in discarding mode we join deltas, and in accumulating mode we join full panes 
each time the trigger fires, which results in duplicates. This makes it hard to 
reason about the correctness of join results.
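
To make the difference concrete, here is a minimal sketch in plain Python (no Beam dependency) that simulates a per-pane inner join inside a single window. The function name `join_per_pane` and the sample data are hypothetical and chosen only for illustration. It assumes both inputs fire in lockstep, which real triggers do not guarantee, and it is not how Beam SQL is implemented.

```python
from itertools import product


def join_per_pane(left_panes, right_panes, accumulating):
    """Simulate an inner join that runs once per trigger firing.

    left_panes / right_panes: one list of (key, value) pairs per firing,
    holding only the elements that arrived since the previous firing.
    Assumption: both inputs fire in lockstep.
    """
    output = []
    left_seen, right_seen = [], []
    for left_delta, right_delta in zip(left_panes, right_panes):
        if accumulating:
            # Accumulating mode: each pane holds everything seen so far.
            left_seen += left_delta
            right_seen += right_delta
            left_pane, right_pane = left_seen, right_seen
        else:
            # Discarding mode: each pane holds only the new elements.
            left_pane, right_pane = left_delta, right_delta
        for (lk, lv), (rk, rv) in product(left_pane, right_pane):
            if lk == rk:
                output.append((lk, lv, rv))
    return output


# Two firings: the order for key "b" arrives one pane after its user row.
orders = [[("a", "o1")], [("b", "o2")]]
users = [[("a", "alice"), ("b", "bob")], []]

discarding = join_per_pane(orders, users, accumulating=False)
accumulating = join_per_pane(orders, users, accumulating=True)
```

With this data, the discarding run never emits the row for key "b" because its two sides land in different panes, while the accumulating run emits the row for key "a" on both firings.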

There are a few ways to extend the join implementation to mitigate this:
# accumulation. We accumulate inputs in a persistent state and join them at 
each trigger firing:
#* this enables us to join whole streams. At each trigger firing we iterate 
over buffered inputs and emit newly matching records:
#** we need to control state/buffer expiration, so that we don't accumulate 
PCollections forever:
#*** we need to configure timeouts that determine when to clear the state, 
probably separately per input PCollection;
#*** if we know there's a 1-1 or 1-many relation, then we can purge the state 
for one input after the first match;
#*** in case of multiple joins it is unclear how to configure this per-join. 
Currently multiple joins are converted into a tree of nested binary joins;
#** correct behavior for outer joins is unclear:
#*** we need to be able to control when we emit a result if there is no 
matching record in one of the inputs;
#*** and we need to be able to control what happens if such a matching record 
eventually does appear;
#* potentially Beam state cells can be used for this, but there are 
complications:
#** they are partitioned per key per window;
#** meaning it is unclear how to match records across windows;
#** if we have a single global window, then this becomes feasible:
#*** an example is [Nexmark 
Query3|https://github.com/apache/beam/blob/64ff21f35ee2946d38645fb0a51678628e49e62a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java],
 which does this using GlobalWindows with multiple firings. The join is 
implemented as a custom stateful ParDo;
#*** although this can be implemented as a custom ParDo for a specific use 
case, it is unclear how to correctly generalize this approach;
#* potentially another mechanism, not Beam state cells, can be used for 
buffering. This has not been investigated yet;
# retractions:
#* each stage of the pipeline will issue a retraction if its previous outputs 
are not valid anymore and need to be revoked:
#** for example GBKs and CoGBKs will retract previous results if new input 
arrives;
#** this will work automatically under the hood and always produce correct 
results, if it is implemented correctly and the correct accumulation mode is 
configured for the pipeline;
#** needs more design work;
# support only specific join modes for which we know the behavior:
#* for example, if we can ensure that joins are executed only once per window:
#** we can guarantee that complete window contents will be joined once;
#** there are known configurations with such properties which we can explicitly 
allow and reject everything else:
#*** DefaultTrigger with allowedLateness=0 in any accumulation mode;
#*** AfterWatermark.pastEndOfWindow() with allowedLateness=0 in any 
accumulation mode;
#** unclear how to enforce this for arbitrary windows and triggers;
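
The first option (accumulation) can be sketched roughly as follows, in plain Python with no Beam dependency. The class `BufferedJoin`, its `ttl` parameter, and the numeric timestamps are all hypothetical stand-ins: `ttl` plays the role of the state-expiration timeout, and a single value is used for both inputs although the list above suggests configuring it per input. The sketch covers inner joins only and leaves the outer-join questions open.

```python
from collections import defaultdict


class BufferedJoin:
    """Inner join that buffers both inputs per key and emits only new matches.

    Hypothetical sketch: `ttl` stands in for the state-expiration timeout,
    and timestamps are plain numbers supplied by the caller.
    """

    def __init__(self, ttl):
        self.ttl = ttl
        # side -> key -> list of (timestamp, value)
        self.buffers = {"left": defaultdict(list), "right": defaultdict(list)}

    def add(self, side, key, value, timestamp):
        """Buffer one element and return the join rows it newly completes."""
        other = "right" if side == "left" else "left"
        self.buffers[side][key].append((timestamp, value))
        matches = []
        for _, other_value in self.buffers[other][key]:
            if side == "left":
                matches.append((key, value, other_value))
            else:
                matches.append((key, other_value, value))
        return matches

    def expire(self, now):
        """Drop buffered elements older than `ttl` so state does not grow forever."""
        for per_key in self.buffers.values():
            for key in list(per_key):
                per_key[key] = [
                    (ts, v) for ts, v in per_key[key] if now - ts <= self.ttl
                ]
                if not per_key[key]:
                    del per_key[key]


join = BufferedJoin(ttl=10)
first = join.add("left", "a", "o1", timestamp=0)       # nothing to match yet
second = join.add("right", "a", "alice", timestamp=5)  # completes one row
join.expire(now=20)                                    # both entries are too old
third = join.add("left", "a", "o2", timestamp=21)      # "alice" already purged
```

Each row is emitted exactly once, when the later of its two sides arrives. The last call also shows the cost of expiration: a match is silently lost once the other side has been purged.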

The next step is to [implement the allowlist configurations approach|BEAM-3345] 
until we have retractions.
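
As a rough illustration of what such an allowlist check could look like, here is a sketch in plain Python. `WindowingConfig`, `is_join_supported`, and `validate_join_inputs` are hypothetical names, not Beam APIs. The trigger names and the allowedLateness=0 condition come from the list above, and the accumulation mode is deliberately not checked because that list allows any mode.

```python
from dataclasses import dataclass

# Trigger names taken from the list above; everything else is rejected.
ALLOWED_TRIGGERS = {"DefaultTrigger", "AfterWatermark.pastEndOfWindow()"}


@dataclass(frozen=True)
class WindowingConfig:
    """Hypothetical stand-in for the windowing strategy of a join input."""

    trigger: str
    allowed_lateness_seconds: int
    accumulation_mode: str  # "discarding" or "accumulating"; not checked


def is_join_supported(config):
    """Return True only for configurations on the allowlist."""
    return (
        config.trigger in ALLOWED_TRIGGERS
        and config.allowed_lateness_seconds == 0
    )


def validate_join_inputs(left, right):
    """Raise ValueError if either input is not on the allowlist."""
    for name, config in (("left", left), ("right", right)):
        if not is_join_supported(config):
            raise ValueError(
                f"unsupported windowing for {name} join input: {config}"
            )


ok = WindowingConfig("DefaultTrigger", 0, "discarding")
late = WindowingConfig("DefaultTrigger", 60, "accumulating")

validate_join_inputs(ok, ok)  # passes silently
```

Calling `validate_join_inputs(ok, late)` raises `ValueError`, since a non-zero allowed lateness can cause additional firings for the same window.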


> [SQL] Join Windowing Semantics
> ------------------------------
>
>                 Key: BEAM-3190
>                 URL: https://issues.apache.org/jira/browse/BEAM-3190
>             Project: Beam
>          Issue Type: Task
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Assignee: Anton Kedin
>            Priority: P2
>             Fix For: Not applicable
>
>
> Should join implementation reject incorrect windowing strategies?
> Concerns: discarding mode + joins + multiple trigger firings might lead to 
> incorrect results, like missing join/data.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
