[ 
https://issues.apache.org/jira/browse/BEAM-11050?focusedWorklogId=508156&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-508156
 ]

ASF GitHub Bot logged work on BEAM-11050:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Nov/20 15:31
            Start Date: 05/Nov/20 15:31
    Worklog Time Spent: 10m 
      Work Description: echauchot commented on pull request #13061:
URL: https://github.com/apache/beam/pull/13061#issuecomment-722451553


   @lukecwik I don't see why this change is necessary, for 2 reasons:
   1. all the validates runner tests, including the multiple-window ones (e.g. sliding 
windows), already passed.
   2. when I wrote this code, I already took some safety measures against the 
modification of the (first only) accumulator during 
`combineFn.mergeAccumulators` by creating a new first accumulator for each 
merged window; see the initial code below:
   
   `   // merge the accumulators for each mergedWindow
       ...
       for (Map.Entry<W, List<Tuple2<AccumT, Instant>>> entry :
           mergedWindowToAccumulators.entrySet()) {
          ...
         // we need to create the first accumulator because
         // combineFn.mergeAccumulators can modify the first accumulator
         AccumT first = combineFn.createAccumulator();
         Iterable<AccumT> accumulatorsToMerge =
             Iterables.concat(
                 Collections.singleton(first),
                 accumsAndInstantsForMergedWindow.stream()
                     .map(x -> x._1())
                     .collect(Collectors.toList()));
                  ...
                 combineFn.mergeAccumulators(accumulatorsToMerge),
                ...
     }
    `
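
To make the point about the first accumulator concrete, here is a minimal, self-contained sketch of the hazard and of the "fresh first accumulator" guard described above. It is not the Beam or Spark runner code: the class `AccumulatorReuseSketch`, its helper methods, and the list-based accumulator are hypothetical stand-ins, and the only assumption taken from the comment is that the merge function may mutate and return the first accumulator it is given.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these names come from Beam.
final class AccumulatorReuseSketch {

  // Stand-in for combineFn.createAccumulator(): always returns a new, empty list.
  static List<String> createAccumulator() {
    return new ArrayList<>();
  }

  // Stand-in for combineFn.mergeAccumulators(): for efficiency it appends
  // everything into the FIRST accumulator and returns that same instance.
  static List<String> mergeAccumulators(Iterable<List<String>> accumulators) {
    List<String> first = null;
    for (List<String> accum : accumulators) {
      if (first == null) {
        first = accum;
      } else {
        first.addAll(accum);
      }
    }
    return first == null ? createAccumulator() : first;
  }

  // Unsafe: the caller's accumulator 'a' sits in first position and gets mutated.
  static List<String> mergeUnsafe(List<String> a, List<String> b) {
    return mergeAccumulators(Arrays.asList(a, b));
  }

  // Guarded: a fresh accumulator is put first, so only it can be mutated.
  static List<String> mergeWithFreshFirst(List<String> a, List<String> b) {
    return mergeAccumulators(Arrays.asList(createAccumulator(), a, b));
  }

  public static void main(String[] args) {
    List<String> a = new ArrayList<>(Arrays.asList("a"));
    List<String> b = new ArrayList<>(Arrays.asList("b"));
    List<String> c = new ArrayList<>(Arrays.asList("c"));

    // 'c' is reused in two merges; the first merge silently changes it.
    mergeUnsafe(c, a);
    System.out.println(mergeUnsafe(b, c)); // prints [b, c, a]

    // Same two merges with the guard: 'c2' is never modified.
    List<String> a2 = new ArrayList<>(Arrays.asList("a"));
    List<String> b2 = new ArrayList<>(Arrays.asList("b"));
    List<String> c2 = new ArrayList<>(Arrays.asList("c"));
    mergeWithFreshFirst(c2, a2);
    System.out.println(mergeWithFreshFirst(b2, c2)); // prints [b, c]
  }
}
```

The sketch only shows why the position of a shared accumulator in the merge call matters; whether every call site in the runner applies such a guard is exactly what is being discussed in this pull request.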


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 508156)
    Time Spent: 1h 40m  (was: 1.5h)

> AggregatorCombiner reuses mutable accumT across multiple merges leading to 
> incorrect results
> --------------------------------------------------------------------------------------------
>
>                 Key: BEAM-11050
>                 URL: https://issues.apache.org/jira/browse/BEAM-11050
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Luke Cwik
>            Assignee: Luke Cwik
>            Priority: P2
>             Fix For: 2.26.0
>
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Example failure:
>  
> [https://scans.gradle.com/s/lsf5y44b36pyc/tests/:runners:spark:validatesStructuredStreamingRunnerBatch/org.apache.beam.sdk.transforms.CombineTest$WindowingTests/testSlidingWindowsCombine#1]
> The test passes only occasionally; the outcome depends on the order of the 
> merge/reduce steps that Spark performs. A good run:
> {noformat}
> LCWIK merge accum1: [] accum2: [TimestampedValueInMultipleWindows{value=[c], 
> timestamp=1970-01-01T00:00:00.003Z, 
> windows=[[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> [1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z)], 
> pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}] rval: 
> [TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.005Z, 
> window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [] accum2: [TimestampedValueInMultipleWindows{value=[b], 
> timestamp=1970-01-01T00:00:00.002Z, 
> windows=[[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z)], 
> pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}] rval: 
> [TimestampedValueInSingleWindow{value=[b], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], 
> timestamp=1970-01-01T00:00:00.002Z, 
> window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [] accum2: [TimestampedValueInMultipleWindows{value=[a], 
> timestamp=1970-01-01T00:00:00.001Z, 
> windows=[[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> [1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z)], 
> pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}] rval: 
> [TimestampedValueInSingleWindow{value=[a], 
> timestamp=1970-01-01T00:00:00.001Z, 
> window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a], 
> timestamp=1970-01-01T00:00:00.002Z, 
> window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> pane=PaneInfo.NO_FIRING}]
> LCWIK reduce value: TimestampedValueInMultipleWindows{value=KV{null, b}, 
> timestamp=1970-01-01T00:00:00.002Z, 
> windows=[[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z)], 
> pane=PaneInfo{isFirst=true, timing=EARLY, index=0}} accum: [] result: 
> [TimestampedValueInSingleWindow{value=[b], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], 
> timestamp=1970-01-01T00:00:00.002Z, 
> window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> pane=PaneInfo.NO_FIRING}]
> LCWIK reduce value: TimestampedValueInMultipleWindows{value=KV{null, a}, 
> timestamp=1970-01-01T00:00:00.001Z, 
> windows=[[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> [1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z)], 
> pane=PaneInfo{isFirst=true, timing=EARLY, index=0}} accum: [] result: 
> [TimestampedValueInSingleWindow{value=[a], 
> timestamp=1970-01-01T00:00:00.001Z, 
> window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a], 
> timestamp=1970-01-01T00:00:00.002Z, 
> window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> pane=PaneInfo.NO_FIRING}]
> LCWIK reduce value: TimestampedValueInMultipleWindows{value=KV{null, c}, 
> timestamp=1970-01-01T00:00:00.003Z, 
> windows=[[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> [1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z)], 
> pane=PaneInfo{isFirst=true, timing=EARLY, index=0}} accum: [] result: 
> [TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.005Z, 
> window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [] accum2: [TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.005Z, 
> window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}] rval: [TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.005Z, 
> window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.005Z, 
> window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}] accum2: [TimestampedValueInSingleWindow{value=[b, 
> c], timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], 
> timestamp=1970-01-01T00:00:00.002Z, 
> window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> pane=PaneInfo.NO_FIRING}] rval: [TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.005Z, 
> window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], 
> timestamp=1970-01-01T00:00:00.002Z, 
> window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.005Z, 
> window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], 
> timestamp=1970-01-01T00:00:00.002Z, 
> window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> pane=PaneInfo.NO_FIRING}] accum2: [TimestampedValueInSingleWindow{value=[a], 
> timestamp=1970-01-01T00:00:00.001Z, 
> window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, b, c], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, b], 
> timestamp=1970-01-01T00:00:00.002Z, 
> window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> pane=PaneInfo.NO_FIRING}] rval: [TimestampedValueInSingleWindow{value=[a], 
> timestamp=1970-01-01T00:00:00.001Z, 
> window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.005Z, 
> window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, b, c], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, b], 
> timestamp=1970-01-01T00:00:00.002Z, 
> window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> pane=PaneInfo.NO_FIRING}]
> {noformat}
> Bad run:
> {noformat}
> LCWIK merge accum1: [] accum2: [TimestampedValueInMultipleWindows{value=[c], 
> timestamp=1970-01-01T00:00:00.003Z, 
> windows=[[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> [1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z)], 
> pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}] rval: 
> [TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.005Z, 
> window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [] accum2: [TimestampedValueInMultipleWindows{value=[b], 
> timestamp=1970-01-01T00:00:00.002Z, 
> windows=[[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z)], 
> pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}] rval: 
> [TimestampedValueInSingleWindow{value=[b], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], 
> timestamp=1970-01-01T00:00:00.002Z, 
> window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> pane=PaneInfo.NO_FIRING}]
> LCWIK reduce value: TimestampedValueInMultipleWindows{value=KV{null, c}, 
> timestamp=1970-01-01T00:00:00.003Z, 
> windows=[[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> [1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z)], 
> pane=PaneInfo{isFirst=true, timing=EARLY, index=0}} accum: [] result: 
> [TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.005Z, 
> window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}]
> LCWIK reduce value: TimestampedValueInMultipleWindows{value=KV{null, b}, 
> timestamp=1970-01-01T00:00:00.002Z, 
> windows=[[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z)], 
> pane=PaneInfo{isFirst=true, timing=EARLY, index=0}} accum: [] result: 
> [TimestampedValueInSingleWindow{value=[b], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], 
> timestamp=1970-01-01T00:00:00.002Z, 
> window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.005Z, 
> window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}] accum2: 
> [TimestampedValueInMultipleWindows{value=[a, c], 
> timestamp=1970-01-01T00:00:00.001Z, 
> windows=[[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> [1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z)], 
> pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}] rval: 
> [TimestampedValueInSingleWindow{value=[a, c], 
> timestamp=1970-01-01T00:00:00.001Z, 
> window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.005Z, 
> window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], 
> timestamp=1970-01-01T00:00:00.002Z, 
> window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> pane=PaneInfo.NO_FIRING}]
> LCWIK reduce value: TimestampedValueInMultipleWindows{value=KV{null, a}, 
> timestamp=1970-01-01T00:00:00.001Z, 
> windows=[[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> [1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z)], 
> pane=PaneInfo{isFirst=true, timing=EARLY, index=0}} accum: 
> [TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.005Z, 
> window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}] result: [TimestampedValueInSingleWindow{value=[a, 
> c], timestamp=1970-01-01T00:00:00.001Z, 
> window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.005Z, 
> window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], 
> timestamp=1970-01-01T00:00:00.002Z, 
> window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [] accum2: [TimestampedValueInSingleWindow{value=[a, c], 
> timestamp=1970-01-01T00:00:00.001Z, 
> window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.005Z, 
> window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], 
> timestamp=1970-01-01T00:00:00.002Z, 
> window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> pane=PaneInfo.NO_FIRING}] rval: [TimestampedValueInSingleWindow{value=[a, c], 
> timestamp=1970-01-01T00:00:00.001Z, 
> window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.005Z, 
> window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], 
> timestamp=1970-01-01T00:00:00.002Z, 
> window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [TimestampedValueInSingleWindow{value=[a, c], 
> timestamp=1970-01-01T00:00:00.001Z, 
> window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.005Z, 
> window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], 
> timestamp=1970-01-01T00:00:00.002Z, 
> window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> pane=PaneInfo.NO_FIRING}] accum2: [TimestampedValueInSingleWindow{value=[b, 
> a, c], timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, a, c], 
> timestamp=1970-01-01T00:00:00.002Z, 
> window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> pane=PaneInfo.NO_FIRING}] rval: [TimestampedValueInSingleWindow{value=[a, c], 
> timestamp=1970-01-01T00:00:00.001Z, 
> window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], 
> timestamp=1970-01-01T00:00:00.005Z, 
> window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, a, c], 
> timestamp=1970-01-01T00:00:00.003Z, 
> window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], 
> timestamp=1970-01-01T00:00:00.004Z, 
> window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), 
> pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, a, c], 
> timestamp=1970-01-01T00:00:00.002Z, 
> window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), 
> pane=PaneInfo.NO_FIRING}]
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
