[ https://issues.apache.org/jira/browse/BEAM-1396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853491#comment-15853491 ]

Kenneth Knowles edited comment on BEAM-1396 at 2/6/17 4:25 AM:
---------------------------------------------------------------

Glad you brought this to me without spending too much time on it. This code is 
actually meant to be used in batch mode by any runner; it exploits batch-style 
execution to be much more efficient than {{GABWViaActiveWindowSets}}.

For the landscape of GABW:

1. {{GroupAlsoByWindowViaWindowSetDoFn}} / 
{{GroupAlsoByWindowViaWindowSetNewDoFn}} are the "works no matter what" 
implementations.
2. {{GroupAlsoByWindowsViaOutputBufferDoFn}} is the "works if you don't need to 
deal with the watermark" implementation: it just moves the watermark from 0 to 
infinity. But then we made it weird and added an incidental requirement, that 
the input be sorted by timestamp, which we should remove.
3. Any runner-specific hackery that is harder to describe and not generally 
useful (feel free to write {{WorksOnlyForTheSparkRunnerGABW}} :-))

For this particular issue, the fix I will take is to remove the for loop over 
chunks of 1000 elements, which is the only reason sorting mattered. Essentially, 
this GABW implementation currently runs "like" a fully batch-centric version 
over 1000 elements at a time. The chunking was added to make batch act somewhat 
like streaming (multiple outputs per key in GBK) to catch bugs, etc. But now 
that we have a streaming direct runner, we should just focus this implementation 
on simplicity, correctness, performance, and usefulness to all the runners.
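To make the chunking argument concrete, here is a minimal, hypothetical Java model (not Beam code). {{Watermark}} stands in for the check in {{InMemoryTimerInternals}} that refuses to move the input watermark backwards, and {{processChunked}} assumes, for illustration, that each chunk advances the watermark to the timestamp of its first element. With out-of-order input split across more than one chunk, a later advance moves time backwards and throws; buffering the whole batch and then jumping straight to "infinity" ({{Long.MAX_VALUE}} here) does not depend on input order at all.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of the chunked batch GABW loop; names are illustrative, not Beam APIs. */
final class ChunkedGabwSketch {

  /** Stand-in for a timer clock that, like the check cited in the issue, refuses to move backwards. */
  static final class Watermark {
    private long current = Long.MIN_VALUE;

    void advanceTo(long newTime) {
      if (newTime < current) {
        throw new IllegalStateException(
            "Cannot move input watermark time backwards from " + current + " to " + newTime);
      }
      current = newTime;
    }

    long get() {
      return current;
    }
  }

  /** Chunked style (assumed): advance the watermark to each chunk's first timestamp. */
  static long processChunked(List<Long> timestamps, int chunkSize) {
    Watermark wm = new Watermark();
    for (int start = 0; start < timestamps.size(); start += chunkSize) {
      List<Long> chunk = timestamps.subList(start, Math.min(start + chunkSize, timestamps.size()));
      // ... buffer the elements of this chunk into their windows ...
      // This step is only safe if the input is sorted by timestamp.
      wm.advanceTo(chunk.get(0));
    }
    // End of input: move the watermark to "infinity" so all windows fire.
    wm.advanceTo(Long.MAX_VALUE);
    return wm.get();
  }

  /** Unchunked style: buffer everything in any order, then jump the watermark to "infinity" once. */
  static long processWholeBatch(List<Long> timestamps) {
    Watermark wm = new Watermark();
    // ... buffer all elements into their windows; order does not matter here ...
    wm.advanceTo(Long.MAX_VALUE);
    return wm.get();
  }

  public static void main(String[] args) {
    List<Long> outOfOrder = Arrays.asList(5L, 6L, 1L, 2L);
    System.out.println(processWholeBatch(outOfOrder) == Long.MAX_VALUE); // prints true
    try {
      processChunked(outOfOrder, 2); // chunks [5, 6] and [1, 2]: watermark would go 5 -> 1
    } catch (IllegalStateException e) {
      System.out.println("chunked: " + e.getMessage());
    }
  }
}
```

In this model, removing the chunk loop is what makes sorting unnecessary: with a single advance at the end, there is no intermediate watermark that later elements could precede.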


> GABWVOBDoFn expects grouped values to be ordered by their timestamp but there 
> is no such guarantee
> --------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-1396
>                 URL: https://issues.apache.org/jira/browse/BEAM-1396
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>            Reporter: Amit Sela
>            Assignee: Kenneth Knowles
>
> GABWVOBDoFn relies on the grouped values being ordered by their timestamp, but 
> nothing in the SDK guarantees this: 
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java#L86
> If such a chunk of timestamped values is processed out of order, I assume 
> we'd end up with an {{IllegalStateException}} thrown here:
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java#L191
> I suggest we go ahead and add sorting before processing the bundle in chunks. 
> This might prove expensive in extreme cases where a very large bundle with 
> very few keys is processed, but it seems that timestamp order is necessary.
> As for runners that provide an ordering guarantee, since GABW is optional I 
> don't see an issue here, though [~dhalp...@google.com] suggested we add a 
> "shouldSort" flag.
> Also, it's probably worth creating a test for this, though that would be 
> difficult since we would have to preset the order, which is the problem to 
> begin with :-)
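For comparison, the sort-before-chunking proposal with the suggested "shouldSort" flag might look roughly like the hypothetical Java sketch below. {{Timestamped}}, {{prepare}} and {{chunkStartsAreMonotonic}} are illustrative names, not Beam APIs, and the monotonicity check assumes, as in the issue, that each chunk advances the watermark to its first element's timestamp. Presetting an out-of-order input, as the {{main}} method does, is also one way such a test could be written.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of the sort-before-chunking proposal; not a Beam API. */
final class SortBeforeChunkingSketch {

  /** Minimal stand-in for a timestamped value (hypothetical). */
  static final class Timestamped<T> {
    final T value;
    final long timestamp;

    Timestamped(T value, long timestamp) {
      this.value = value;
      this.timestamp = timestamp;
    }
  }

  /**
   * Copies the grouped values and, when shouldSort is true, sorts them by timestamp.
   * A runner that already guarantees timestamp order could pass false to skip the
   * O(n log n) sort, which is the cost the issue worries about for huge single-key bundles.
   */
  static <T> List<Timestamped<T>> prepare(Iterable<Timestamped<T>> values, boolean shouldSort) {
    List<Timestamped<T>> copy = new ArrayList<>();
    for (Timestamped<T> v : values) {
      copy.add(v);
    }
    if (shouldSort) {
      copy.sort(Comparator.comparingLong((Timestamped<T> v) -> v.timestamp));
    }
    return copy;
  }

  /** True if no chunk starts earlier than the previous one, i.e. the watermark never moves back. */
  static <T> boolean chunkStartsAreMonotonic(List<Timestamped<T>> values, int chunkSize) {
    long last = Long.MIN_VALUE;
    for (int start = 0; start < values.size(); start += chunkSize) {
      long first = values.get(start).timestamp;
      if (first < last) {
        return false;
      }
      last = first;
    }
    return true;
  }

  public static void main(String[] args) {
    // Deliberately preset an out-of-order input.
    List<Timestamped<String>> in = new ArrayList<>();
    in.add(new Timestamped<>("late", 5L));
    in.add(new Timestamped<>("early", 1L));
    System.out.println(chunkStartsAreMonotonic(prepare(in, false), 1)); // prints false
    System.out.println(chunkStartsAreMonotonic(prepare(in, true), 1)); // prints true
  }
}
```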



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
