I bet, as with the previous one, this is due to over-eager combiner lifting.
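
For reference, the timestamps expected further down the thread can be written out in plain Python. This is only a sketch of the intended semantics, not Beam code: the function name and the string strategy labels are made up for illustration, and it assumes the microsecond granularity implied by the 9.999999 value quoted below.

```python
# Pure-Python sketch of the expected output timestamps; it does not use Beam.
# output_timestamp_micros and the strategy strings are hypothetical names.
MICROS_PER_SECOND = 1_000_000


def output_timestamp_micros(strategy, element_timestamps, window_end):
    """Return the expected output timestamp in microseconds.

    element_timestamps and window_end are given in whole seconds.
    """
    if strategy == "EARLIEST":
        return min(element_timestamps) * MICROS_PER_SECOND
    if strategy == "LATEST":
        return max(element_timestamps) * MICROS_PER_SECOND
    if strategy == "END_OF_WINDOW":
        # The window is [start, end), so its largest timestamp is one
        # microsecond before the end (assumed Python-side granularity).
        return window_end * MICROS_PER_SECOND - 1
    raise ValueError("unknown strategy: %s" % strategy)


# Elements at t=0 and t=9 in the fixed window [0, 10), as in the test below.
for strategy in ("EARLIEST", "LATEST", "END_OF_WINDOW"):
    micros = output_timestamp_micros(strategy, [0, 9], 10)
    print(strategy, micros / MICROS_PER_SECOND)
```

Anything other than 0, 9 and 9.999999 for the three strategies would point at the pipeline (for example the lifted combiner) rather than at the test's expectations.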

On Tue, Nov 12, 2019 at 4:17 PM Ruoyun Huang <[email protected]> wrote:
>
> Filed a tracking JIRA issue: https://issues.apache.org/jira/browse/BEAM-8645
>
> On Tue, Nov 12, 2019 at 9:48 AM Ruoyun Huang <[email protected]> wrote:
>>
>> Thanks for confirming.
>>
>> Since this is unexpected behavior, I will check JIRA to see whether it is
>> already tracked and, if not, file a new issue.
>>
>> On Mon, Nov 11, 2019 at 6:11 PM Robert Bradshaw <[email protected]> wrote:
>>>
>>> The END_OF_WINDOW is indeed 9.999999 (or, in Java, 9.999000), but the
>>> results for LATEST and EARLIEST should be 9 and 0 respectively.
>>>
>>> On Mon, Nov 11, 2019 at 5:34 PM Ruoyun Huang <[email protected]> wrote:
>>> >
>>> > Hi, Folks,
>>> >
>>> >     I am trying to understand the behavior of TimestampCombiner. I have a 
>>> > test like this:
>>> >
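>>> > import unittest
>>> >
>>> > import apache_beam as beam
>>> > from apache_beam.options.pipeline_options import PipelineOptions
>>> > from apache_beam.options.pipeline_options import StandardOptions
>>> > from apache_beam.testing.test_pipeline import TestPipeline
>>> > from apache_beam.testing.test_stream import TestStream
>>> > from apache_beam.testing.util import assert_that
>>> > from apache_beam.testing.util import equal_to_per_window
>>> > from apache_beam.transforms import window
>>> > from apache_beam.transforms.window import TimestampCombiner
>>> > from apache_beam.utils.timestamp import Timestamp
>>> >
>>> > class TimestampCombinerTest(unittest.TestCase):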
>>> >
>>> >   def test_combiner_latest(self):
>>> >     """Test TimestampCombiner with LATEST."""
>>> >     options = PipelineOptions()
>>> >     options.view_as(StandardOptions).streaming = True
>>> >     p = TestPipeline(options=options)
>>> >
>>> >     main_stream = (p
>>> >                    | 'main TestStream' >> TestStream()
>>> >                    .add_elements([window.TimestampedValue(('k', 100), 0)])
>>> >                    .add_elements([window.TimestampedValue(('k', 400), 9)])
>>> >                    .advance_watermark_to_infinity()
>>> >                    | 'main windowInto' >> beam.WindowInto(
>>> >                       window.FixedWindows(10),
>>> >                       timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST)
>>> >                    | 'Combine' >> beam.CombinePerKey(sum))
>>> >
>>> >     class RecordFn(beam.DoFn):
>>> >       def process(self,
>>> >                   elm=beam.DoFn.ElementParam,
>>> >                   ts=beam.DoFn.TimestampParam):
>>> >         yield (elm, ts)
>>> >
>>> >     records = (main_stream | beam.ParDo(RecordFn()))
>>> >
>>> >     expected_window_to_elements = {
>>> >         window.IntervalWindow(0, 10): [
>>> >             (('k', 500),  Timestamp(9)),
>>> >         ],
>>> >     }
>>> >
>>> >     assert_that(
>>> >         records,
>>> >         equal_to_per_window(expected_window_to_elements),
>>> >         use_global_window=False,
>>> >         label='assert per window')
>>> >
>>> >     p.run()
>>> >
>>> >
>>> > I expect the results to be as follows (depending on the TimestampCombiner
>>> > strategy):
>>> > LATEST:    (('k', 500), Timestamp(9)),
>>> > EARLIEST:    (('k', 500), Timestamp(0)),
>>> > END_OF_WINDOW: (('k', 500), Timestamp(10)),
>>> >
>>> > The above outcome is partially confirmed by a Java-side test [1].
>>> >
>>> >
>>> > However, in Beam Python the outcome is:
>>> > LATEST:    (('k', 500), Timestamp(10)),
>>> > EARLIEST:    (('k', 500), Timestamp(10)),
>>> > END_OF_WINDOW: (('k', 500), Timestamp(9.99999999)),
>>> >
>>> > What did I miss? What is the correct expected behavior, or does this
>>> > look like a bug?
>>> >
>>> > [1]: https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java#L390
>>> >
>>> > Cheers,
>>> >
>>
>>
>>
>> --
>> ================
>> Ruoyun  Huang
>>
>
>
> --
> ================
> Ruoyun  Huang
>
