Hi, Folks,

I am trying to understand the behavior of TimestampCombiner, and I have
written a test like this:


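    import unittest

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import StandardOptions
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.test_stream import TestStream
    from apache_beam.testing.util import assert_that
    from apache_beam.testing.util import equal_to_per_window
    from apache_beam.transforms import window
    from apache_beam.transforms.window import TimestampCombiner
    from apache_beam.utils.timestamp import Timestamp


    class TimestampCombinerTest(unittest.TestCase):

      def test_combiner_latest(self):
        """Test TimestampCombiner with LATEST."""
        options = PipelineOptions()
        options.view_as(StandardOptions).streaming = True
        p = TestPipeline(options=options)

        # Two values for key 'k' at t=0 and t=9, both in window [0, 10).
        main_stream = (p
                       | 'main TestStream' >> TestStream()
                       .add_elements([window.TimestampedValue(('k', 100), 0)])
                       .add_elements([window.TimestampedValue(('k', 400), 9)])
                       .advance_watermark_to_infinity()
                       | 'main windowInto' >> beam.WindowInto(
                           window.FixedWindows(10),
                           timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST)
                       | 'Combine' >> beam.CombinePerKey(sum))

        class RecordFn(beam.DoFn):
          # Pair each combined element with the timestamp it was emitted at.
          def process(self,
                      elm=beam.DoFn.ElementParam,
                      ts=beam.DoFn.TimestampParam):
            yield (elm, ts)

        records = (main_stream | beam.ParDo(RecordFn()))

        expected_window_to_elements = {
            window.IntervalWindow(0, 10): [
                (('k', 500), Timestamp(9)),
            ],
        }

        assert_that(
            records,
            equal_to_per_window(expected_window_to_elements),
            use_global_window=False,
            label='assert per window')

        p.run()


    if __name__ == '__main__':
      unittest.main()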


I expect the following results for the different TimestampCombiner
strategies:

LATEST:        (('k', 500), Timestamp(9))
EARLIEST:      (('k', 500), Timestamp(0))
END_OF_WINDOW: (('k', 500), Timestamp(10))

These expectations are partially confirmed by a test on the Java side [1].
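To make the expectation concrete, here is a plain-Python model (not Beam code, only a simplification of the model for a single window firing) of the output timestamp each strategy should assign to the combined ('k', 500) in window [0, 10). The function `output_timestamp` and the shorthand strategy labels are hypothetical stand-ins for OUTPUT_AT_LATEST, OUTPUT_AT_EARLIEST and OUTPUT_AT_EOW. One caveat built into it: Beam's end-of-window timestamp is the window's max_timestamp(), one microsecond before the exclusive end, so END_OF_WINDOW would land at 9.999999 rather than 10.

```python
# Hypothetical, simplified model of TimestampCombiner semantics for one
# window firing. This is NOT Beam's implementation, just an illustration.
# Timestamps are integer microseconds (Beam Python's precision).

MICROS = 1_000_000


def output_timestamp(strategy, input_ts_micros, window_end_micros):
    """Return the output timestamp (in micros) of a grouped/combined result.

    strategy: shorthand label 'EARLIEST', 'LATEST' or 'END_OF_WINDOW'.
    input_ts_micros: timestamps of the inputs that went into the result.
    window_end_micros: exclusive end of the window, in micros.
    """
    if strategy == 'EARLIEST':
        return min(input_ts_micros)
    if strategy == 'LATEST':
        return max(input_ts_micros)
    if strategy == 'END_OF_WINDOW':
        # Window max timestamp: one microsecond before the exclusive end.
        return window_end_micros - 1
    raise ValueError('unknown strategy: %r' % strategy)


inputs = [0 * MICROS, 9 * MICROS]  # ('k', 100) at t=0 and ('k', 400) at t=9
end = 10 * MICROS                  # IntervalWindow(0, 10)

for s in ('LATEST', 'EARLIEST', 'END_OF_WINDOW'):
    print(s, output_timestamp(s, inputs, end) / MICROS)
```

Run as a script, this prints 9.0, 0.0 and 9.999999 seconds for LATEST, EARLIEST and END_OF_WINDOW respectively.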

However, with the Beam Python SDK I get this instead:

LATEST:        (('k', 500), Timestamp(10))
EARLIEST:      (('k', 500), Timestamp(10))
END_OF_WINDOW: (('k', 500), Timestamp(9.99999999))

What am I missing? What is the correct expected behavior, or does this look
like a bug?

[1]:
https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java#L390

Cheers,
