[ https://issues.apache.org/jira/browse/BEAM-1396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853298#comment-15853298 ]
Daniel Halperin edited comment on BEAM-1396 at 2/5/17 5:54 PM:
---------------------------------------------------------------
To add a little more flavor: I'm not sure what the right thing to do is with unused or little-used code in {{runners-core}}. These classes are meant to be illustrative and/or useful, showing runners how key functionality can be implemented. So, among many possible courses of action:
1. More clearly document the expectations in {{GroupAlsoByWindowsViaOutputBufferDoFn}}.
2. Possibly add another version ({{GroupAlsoByWindowsUsingSortedInputDoFn}}, or maybe ViaSortedOutputBuffer?), or a flag that indicates whether the input is already sorted.
3. .....
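A minimal sketch of option 2 above, combined with the sorting proposed in the issue below: sort a key's values by timestamp before splitting them into chunks, with a flag that lets a runner skip the sort when its input is already ordered. Everything here is hypothetical. `SortingGrouper`, `TimestampedValue`, `toChunks`, and the `shouldSort` parameter are illustrative names rather than Beam APIs, and timestamps are plain epoch milliseconds instead of Joda `Instant`s.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/**
 * Hypothetical sketch: sort grouped values by timestamp before they are fed,
 * chunk by chunk, to logic that requires non-decreasing timestamps.
 * SortingGrouper, TimestampedValue and shouldSort are illustrative names,
 * not Beam APIs.
 */
public class SortingGrouper {

  /** Minimal stand-in for a value paired with an event-time timestamp (millis). */
  static final class TimestampedValue<V> {
    final V value;
    final long timestampMillis;

    TimestampedValue(V value, long timestampMillis) {
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  /**
   * Splits the values for one key into chunks of at most chunkSize elements.
   * The input list is copied, never mutated. If shouldSort is true, the copy
   * is sorted by timestamp first; a runner that already guarantees ordering
   * can pass false and skip the O(n log n) cost.
   */
  static <V> List<List<TimestampedValue<V>>> toChunks(
      List<TimestampedValue<V>> values, int chunkSize, boolean shouldSort) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    List<TimestampedValue<V>> ordered = new ArrayList<>(values);
    if (shouldSort) {
      // List.sort is stable, so values with equal timestamps keep their input order.
      ordered.sort(Comparator.comparingLong(tv -> tv.timestampMillis));
    }
    List<List<TimestampedValue<V>>> chunks = new ArrayList<>();
    for (int start = 0; start < ordered.size(); start += chunkSize) {
      int end = Math.min(start + chunkSize, ordered.size());
      chunks.add(new ArrayList<>(ordered.subList(start, end)));
    }
    return chunks;
  }

  public static void main(String[] args) {
    List<TimestampedValue<String>> values = new ArrayList<>();
    values.add(new TimestampedValue<>("c", 30L));
    values.add(new TimestampedValue<>("a", 10L));
    values.add(new TimestampedValue<>("b", 20L));

    // Print each chunk as "value@timestamp" pairs.
    for (List<TimestampedValue<String>> chunk : toChunks(values, 2, true)) {
      StringBuilder sb = new StringBuilder();
      for (TimestampedValue<String> tv : chunk) {
        sb.append(tv.value).append('@').append(tv.timestampMillis).append(' ');
      }
      System.out.println(sb.toString().trim());
    }
  }
}
```

The sort runs once per key, so its cost grows with the number of values per key; that is the "very large bundle with very few keys" case the issue flags as potentially expensive, and the flag exists so that runners with ordered input do not pay it.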
> GABWVOBDoFn expects grouped values to be ordered by their timestamp but there
> is no such guarantee
> --------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-1396
>                 URL: https://issues.apache.org/jira/browse/BEAM-1396
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>            Reporter: Amit Sela
>            Assignee: Kenneth Knowles
>
> GABWVOBDoFn relies on the grouped values being ordered by their timestamp, but
> nothing in the SDK guarantees this:
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java#L86
> If such a chunk of timestamped values is processed out of order, I assume
> we'd end up with an {{IllegalStateException}} thrown here:
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java#L191
> I suggest we go ahead and add sorting before processing the bundle in chunks.
> This might prove expensive in extreme cases where a very large bundle with
> very few keys is processed, but timestamp order seems to be necessary.
> As for runners that provide an ordering guarantee, since GABW is optional I don't
> see an issue here, though [~dhalp...@google.com] suggested we add a
> "shouldSort" flag.
> Also, it's probably worth creating a test for this, though that would prove
> difficult since we would have to preset the order, which is the problem to
> begin with :-)

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
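The test idea at the end of the issue can be sketched without a runner by presetting an out-of-order list. The clock below is a hypothetical stand-in that, in line with the issue's assumption about `InMemoryTimerInternals`, throws an `IllegalStateException` when asked to move time backwards; it is not the actual Beam code. `OutOfOrderDemo`, `MonotonicClock`, and `advanceAll` are illustrative names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/**
 * Hypothetical sketch of the failure mode described in the issue: a clock that
 * refuses to move backwards, fed timestamps in iteration order.
 * OutOfOrderDemo, MonotonicClock and advanceAll are illustrative, not Beam APIs.
 */
public class OutOfOrderDemo {

  /** Stand-in for a watermark holder that only accepts non-decreasing times. */
  static final class MonotonicClock {
    private long currentMillis = Long.MIN_VALUE;

    void advanceTo(long newMillis) {
      if (newMillis < currentMillis) {
        throw new IllegalStateException(
            "Cannot move time backwards from " + currentMillis + " to " + newMillis);
      }
      currentMillis = newMillis;
    }

    long current() {
      return currentMillis;
    }
  }

  /** Advances a fresh clock through the timestamps and returns the final time. */
  static long advanceAll(List<Long> timestamps) {
    MonotonicClock clock = new MonotonicClock();
    for (long ts : timestamps) {
      clock.advanceTo(ts);
    }
    return clock.current();
  }

  public static void main(String[] args) {
    // Preset an out-of-order input, as the issue says a test would have to.
    List<Long> unsorted = Arrays.asList(20L, 10L, 30L);
    try {
      advanceAll(unsorted);
      System.out.println("unsorted: no exception");
    } catch (IllegalStateException e) {
      System.out.println("unsorted: " + e.getMessage());
    }

    // Sorting first (the fix proposed in the issue) lets the same input succeed.
    List<Long> sorted = new ArrayList<>(unsorted);
    sorted.sort(Comparator.naturalOrder());
    System.out.println("sorted: reached " + advanceAll(sorted));
  }
}
```

Because the test controls the input order directly, it sidesteps the chicken-and-egg problem the issue mentions: it does not need a runner that happens to deliver values out of order.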