[
https://issues.apache.org/jira/browse/BEAM-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16319536#comment-16319536
]
Kenneth Knowles edited comment on BEAM-3190 at 6/10/20, 9:04 PM:
-----------------------------------------------------------------
At the moment, SQL joins for unbounded inputs happen per pane. This means that
in discarding mode we join deltas, and in accumulating mode we join the full
pane contents each time the trigger fires, which produces duplicates. As a
result, it is hard to reason about the correctness of join results.
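To make the per-pane problem concrete, here is a minimal plain-Java model with no Beam dependency. It replays three trigger firings for a single key in a single window and joins each pane. PerPaneJoinDemo, perPaneJoin and the "key=value" element encoding are hypothetical names chosen for illustration. They are not Beam SQL internals.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of per-pane joins for one key in one window (no Beam dependency).
// Elements are encoded as hypothetical "key=value" strings purely for illustration.
final class PerPaneJoinDemo {

    static String key(String element) {
        return element.substring(0, element.indexOf('='));
    }

    static String value(String element) {
        return element.substring(element.indexOf('=') + 1);
    }

    // Inner-joins the contents of one left pane with one right pane on the key.
    static List<String> joinPanes(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>();
        for (String l : left) {
            for (String r : right) {
                if (key(l).equals(key(r))) {
                    out.add(key(l) + ":" + value(l) + "+" + value(r));
                }
            }
        }
        return out;
    }

    // Replays a sequence of trigger firings. leftArrivals.get(i) and rightArrivals.get(i)
    // hold the elements that arrived on each input just before firing i.
    // Discarding mode: each pane holds only the elements that arrived since the last firing.
    // Accumulating mode: each pane holds every element seen so far in the window.
    static List<String> perPaneJoin(List<List<String>> leftArrivals,
                                    List<List<String>> rightArrivals,
                                    boolean accumulating) {
        List<String> emitted = new ArrayList<>();
        List<String> leftSoFar = new ArrayList<>();
        List<String> rightSoFar = new ArrayList<>();
        for (int i = 0; i < leftArrivals.size(); i++) {
            leftSoFar.addAll(leftArrivals.get(i));
            rightSoFar.addAll(rightArrivals.get(i));
            List<String> leftPane = accumulating ? leftSoFar : leftArrivals.get(i);
            List<String> rightPane = accumulating ? rightSoFar : rightArrivals.get(i);
            emitted.addAll(joinPanes(leftPane, rightPane));
        }
        return emitted;
    }
}
```

Suppose the left arrivals are [k=a], [], [k=b] and the right arrivals are [], [k=x], []. The complete join is then k:a+x and k:b+x.

- In discarding mode, the model emits nothing, because each match spans two different panes.
- In accumulating mode, it emits k:a+x, k:a+x, k:b+x, so the first match is repeated.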
There are a few ways to extend the join implementation to mitigate these problems:
# accumulation: we accumulate inputs in persistent state and join them at each
trigger firing:
#* this enables us to join whole streams. At each trigger firing, we iterate
over the buffered inputs and emit newly matching records:
#** we need to control state/buffer expiration so that we don't accumulate
PCollections forever:
#*** we need to configure timeouts that control when the state is cleared,
probably separately for each input PCollection;
#*** if we know there's a 1-1 or 1-many relation, then we can purge the state
for one input after the first match;
#*** in the case of multiple joins, it is unclear how to configure this per
join, because multiple joins are currently converted into a tree of nested
binary joins;
#** the correct behavior for outer joins is unclear:
#*** we need to be able to control when we emit a result if there is no
matching record in one of the inputs;
#*** and we need to be able to control what happens if such a matching record
eventually does appear;
#* potentially, Beam state cells could be used for this, but there are
complications:
#** they are partitioned per key per window;
#** meaning it is unclear how to match records across windows;
#** if we have a single global window, then this becomes feasible:
#*** an example is [Nexmark Query3|https://github.com/apache/beam/blob/64ff21f35ee2946d38645fb0a51678628e49e62a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java],
which does this using GlobalWindows with multiple firings, with the join
implemented as a custom stateful ParDo;
#*** although this can be implemented as a custom ParDo for a specific use
case, it is unclear how to generalize this approach correctly;
#* potentially, a mechanism other than Beam state cells could be used for
buffering. This has not been investigated yet;
# retractions:
#* each stage of the pipeline will issue a retraction if its previous outputs
are no longer valid and need to be revoked:
#** for example, GBKs and CoGBKs will retract previous results if new input
arrives;
#** this will work automatically under the hood and always produce correct
results, provided it is implemented correctly and the correct accumulation
mode is configured for the pipeline;
#** this needs more design work;
# support only specific join modes for which we know the behavior:
#* for example, if we can ensure that joins are executed only once per window:
#** we can guarantee that the complete contents of each window will be joined
exactly once;
#** there are known configurations with this property, which we can explicitly
allow while rejecting everything else:
#*** DefaultTrigger with allowedLateness=0 in any accumulation mode;
#*** AfterWatermark.pastEndOfWindow() with allowedLateness=0 in any
accumulation mode;
#** it is unclear how to enforce this for arbitrary windows and triggers;
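The first option above (accumulation) can be sketched in plain Java under a few stated assumptions:

- The sketch uses symmetric buffering. Each input's elements are kept per key, and each new element is joined against the other input's buffer.
- Buffered entries are dropped after a per-input timeout.
- BufferedStreamJoin, its methods, and the timestamp-based expiration are hypothetical.
- The HashMaps stand in for persistent state, such as a stateful ParDo in the global window.
- Only inner joins are modeled. The outer-join questions listed above are left open.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the "accumulation" option (no Beam dependency). The two HashMaps
// stand in for persistent per-key state; only inner joins are modeled.
final class BufferedStreamJoin {

    // A buffered element and the timestamp used to decide when it expires.
    private static final class Buffered {
        final String value;
        final long timestamp;

        Buffered(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, List<Buffered>> leftState = new HashMap<>();
    private final Map<String, List<Buffered>> rightState = new HashMap<>();
    // Hypothetical per-input timeouts controlling when buffered state is cleared.
    private final long leftTimeout;
    private final long rightTimeout;

    BufferedStreamJoin(long leftTimeout, long rightTimeout) {
        this.leftTimeout = leftTimeout;
        this.rightTimeout = rightTimeout;
    }

    // Joins a new left element against the buffered right side, then buffers it.
    // Each matching pair is therefore emitted once, when its later element arrives.
    List<String> onLeft(String key, String value, long timestamp) {
        List<String> out = new ArrayList<>();
        for (Buffered r : rightState.getOrDefault(key, List.of())) {
            out.add(key + ":" + value + "+" + r.value);
        }
        leftState.computeIfAbsent(key, k -> new ArrayList<>()).add(new Buffered(value, timestamp));
        return out;
    }

    // Symmetric to onLeft.
    List<String> onRight(String key, String value, long timestamp) {
        List<String> out = new ArrayList<>();
        for (Buffered l : leftState.getOrDefault(key, List.of())) {
            out.add(key + ":" + l.value + "+" + value);
        }
        rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(new Buffered(value, timestamp));
        return out;
    }

    // Drops entries older than each input's timeout so state does not grow forever.
    void expire(long now) {
        expire(leftState, now - leftTimeout);
        expire(rightState, now - rightTimeout);
    }

    private static void expire(Map<String, List<Buffered>> state, long cutoff) {
        Iterator<List<Buffered>> it = state.values().iterator();
        while (it.hasNext()) {
            List<Buffered> entries = it.next();
            entries.removeIf(b -> b.timestamp < cutoff);
            if (entries.isEmpty()) {
                it.remove();
            }
        }
    }

    // Total number of buffered elements across both inputs.
    int bufferedCount() {
        int n = 0;
        for (List<Buffered> entries : leftState.values()) n += entries.size();
        for (List<Buffered> entries : rightState.values()) n += entries.size();
        return n;
    }
}
```

In this design, buffering both sides removes the duplicates and the missed matches of per-pane joins, at the cost of unbounded state unless entries expire. Once one side's entries have expired, a later matching record is silently missed. For an outer join, that is exactly the point at which the implementation would have to decide whether to emit an unmatched row.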
The next step is to [implement the allowlist configurations approach|BEAM-3345]
until we have retractions.
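The allowlist option (the third approach, chosen as the next step) amounts to a validation check that accepts only the two listed configurations and rejects everything else.

This is a minimal plain-Java sketch, not the BEAM-3345 code. JoinWindowingAllowlist and TriggerKind are hypothetical stand-ins for Beam types. A real check would read the trigger and allowed lateness from the input's windowing strategy.

```java
import java.time.Duration;

// Hypothetical model of the allowlist check; TriggerKind is an illustrative stand-in,
// not a Beam type.
final class JoinWindowingAllowlist {

    enum TriggerKind { DEFAULT_TRIGGER, AFTER_WATERMARK_PAST_END_OF_WINDOW, OTHER }

    // Accumulation mode is deliberately not an input: when a window fires exactly once,
    // discarding and accumulating modes produce the same single pane.
    static boolean isAllowed(TriggerKind trigger, Duration allowedLateness) {
        boolean firesAtEndOfWindow =
                trigger == TriggerKind.DEFAULT_TRIGGER
                        || trigger == TriggerKind.AFTER_WATERMARK_PAST_END_OF_WINDOW;
        // Non-zero lateness permits late firings, i.e. more than one pane per window.
        return firesAtEndOfWindow && allowedLateness.isZero();
    }

    // Rejects every configuration that is not explicitly allowed.
    static void validate(TriggerKind trigger, Duration allowedLateness) {
        if (!isAllowed(trigger, allowedLateness)) {
            throw new IllegalArgumentException(
                    "Unsupported windowing for SQL join: trigger=" + trigger
                            + ", allowedLateness=" + allowedLateness);
        }
    }
}
```

Because the check is an allowlist, any trigger it does not recognize (TriggerKind.OTHER here) is rejected by default. This matches the stated intent of allowing only known-safe configurations until retractions exist.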
was (Author: kedin):
At the moment, SQL joins for unbounded inputs happen per pane. This means that
in discarding mode we join deltas, and in accumulating mode we join the full
pane contents each time the trigger fires, which produces duplicates. As a
result, it is hard to reason about the correctness of join results.
There are a few ways to extend the join implementation to mitigate this:
# accumulation: we accumulate inputs in persistent state and join them at each
trigger firing:
#* this enables us to join whole streams. At each trigger firing, we iterate
over the buffered inputs and emit newly matching records:
#** we need to control state/buffer expiration so that we don't accumulate
PCollections forever:
#*** we need to configure timeouts that control when the state is cleared,
probably separately for each input PCollection;
#*** if we know there's a 1-1 or 1-many relation, then we can purge the state
for one input after the first match;
#*** in the case of multiple joins, it is unclear how to configure this per
join, because multiple joins are currently converted into a tree of nested
binary joins;
#** the correct behavior for outer joins is unclear:
#*** we need to be able to control when we emit a result if there is no
matching record in one of the inputs;
#*** and we need to be able to control what happens if such a matching record
eventually does appear;
#* potentially, Beam state cells could be used for this, but there are
complications:
#** they are partitioned per key per window;
#** meaning it is unclear how to match records across windows;
#** if we have a single global window, then this becomes feasible:
#*** an example is [Nexmark Query3|https://github.com/apache/beam/blob/64ff21f35ee2946d38645fb0a51678628e49e62a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java],
which does this using GlobalWindows with multiple firings, with the join
implemented as a custom stateful ParDo;
#*** although this can be implemented as a custom ParDo for a specific use
case, it is unclear how to generalize this approach correctly;
#* potentially, a mechanism other than Beam state cells could be used for
buffering. This has not been investigated yet;
# retractions:
#* each stage of the pipeline will issue a retraction if its previous outputs
are no longer valid and need to be revoked:
#** for example, GBKs and CoGBKs will retract previous results if new input
arrives;
#** this will work automatically under the hood and always produce correct
results, provided it is implemented correctly and the correct accumulation
mode is configured for the pipeline;
#** this needs more design work;
# support only specific join modes for which we know the behavior:
#* for example, if we can ensure that joins are executed only once per window:
#** we can guarantee that the complete contents of each window will be joined
exactly once;
#** there are known configurations with this property, which we can explicitly
whitelist while rejecting everything else:
#*** DefaultTrigger with allowedLateness=0 in any accumulation mode;
#*** AfterWatermark.pastEndOfWindow() with allowedLateness=0 in any
accumulation mode;
#** it is unclear how to enforce this for arbitrary windows and triggers;
The next step is to [implement the whitelisted configurations approach|BEAM-3345]
until we have retractions.
> [SQL] Join Windowing Semantics
> ------------------------------
>
> Key: BEAM-3190
> URL: https://issues.apache.org/jira/browse/BEAM-3190
> Project: Beam
> Issue Type: Task
> Components: dsl-sql
> Reporter: Anton Kedin
> Assignee: Anton Kedin
> Priority: P2
> Fix For: Not applicable
>
>
> Should the join implementation reject incorrect windowing strategies?
> Concern: discarding mode + joins + multiple trigger firings might lead to
> incorrect results, such as missing joins/data.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)