Hi folks,
I am trying to understand the behavior of TimestampCombiner, and I have a
test like this:
import unittest

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to_per_window
from apache_beam.transforms import window
from apache_beam.transforms.window import TimestampCombiner
from apache_beam.utils.timestamp import Timestamp


class TimestampCombinerTest(unittest.TestCase):

    def test_combiner_latest(self):
        """Test TimestampCombiner with OUTPUT_AT_LATEST."""
        options = PipelineOptions()
        options.view_as(StandardOptions).streaming = True
        p = TestPipeline(options=options)

        main_stream = (
            p
            | 'main TestStream' >> TestStream()
                .add_elements([window.TimestampedValue(('k', 100), 0)])
                .add_elements([window.TimestampedValue(('k', 400), 9)])
                .advance_watermark_to_infinity()
            | 'main windowInto' >> beam.WindowInto(
                window.FixedWindows(10),
                timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST)
            | 'Combine' >> beam.CombinePerKey(sum))

        class RecordFn(beam.DoFn):
            # Emit each combined element together with its output timestamp.
            def process(self,
                        elm=beam.DoFn.ElementParam,
                        ts=beam.DoFn.TimestampParam):
                yield (elm, ts)

        records = main_stream | beam.ParDo(RecordFn())

        # Both inputs (timestamps 0 and 9) fall into the window [0, 10).
        expected_window_to_elements = {
            window.IntervalWindow(0, 10): [
                (('k', 500), Timestamp(9)),
            ],
        }

        assert_that(
            records,
            equal_to_per_window(expected_window_to_elements),
            use_global_window=False,
            label='assert per window')

        p.run()
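Both elements should end up in the same window. As far as I understand, FixedWindows assigns a timestamp to the window starting at `timestamp - (timestamp - offset) % size`. The sketch below is a hypothetical pure-Python model of that rule (the helper `fixed_window` is illustrative, not a Beam API), working in integer microseconds like Beam Python's Timestamp:

```python
# Hypothetical pure-Python model of fixed-window assignment (not Beam code),
# using integer microseconds like Beam Python's Timestamp.
MICROS_PER_SECOND = 1_000_000


def fixed_window(ts_micros: int, size_micros: int, offset_micros: int = 0):
    """Return the half-open [start, end) window that contains ts_micros."""
    start = ts_micros - (ts_micros - offset_micros) % size_micros
    return (start, start + size_micros)


size = 10 * MICROS_PER_SECOND  # FixedWindows(10)
for ts_seconds in (0, 9):
    print(ts_seconds, fixed_window(ts_seconds * MICROS_PER_SECOND, size))
```

Under this model both timestamps (0 s and 9 s) map to the window [0 s, 10 s), so CombinePerKey(sum) should emit a single ('k', 500) for that window.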
I expect the following results for the different TimestampCombiner
strategies:
LATEST: (('k', 500), Timestamp(9))
EARLIEST: (('k', 500), Timestamp(0))
END_OF_WINDOW: (('k', 500), Timestamp(10))
These expectations are partially confirmed by a Java-side test [1].
However, the Beam Python SDK produces this:
LATEST: (('k', 500), Timestamp(10))
EARLIEST: (('k', 500), Timestamp(10))
END_OF_WINDOW: (('k', 500), Timestamp(9.99999999))
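For comparison, here is a minimal pure-Python model of the three strategies, assuming the semantics described in the Beam programming guide: the earliest input timestamp, the latest input timestamp, or the end of the window. It also assumes Beam's convention that a window's end bound is exclusive, so "end of window" is modelled as the window's max timestamp, one microsecond (Beam Python's timestamp resolution) before 10 s. The names `Combiner` and `output_timestamp` are hypothetical and not part of Beam.

```python
# Hypothetical model of the three timestamp combiners for one key in one
# window; names are illustrative, not Beam APIs. Times are microseconds.
from enum import Enum


class Combiner(Enum):
    EARLIEST = "OUTPUT_AT_EARLIEST"
    LATEST = "OUTPUT_AT_LATEST"
    END_OF_WINDOW = "OUTPUT_AT_EOW"


def output_timestamp(input_micros, window_end_micros, combiner):
    """Output timestamp (in microseconds) for one key in one window."""
    if combiner is Combiner.EARLIEST:
        return min(input_micros)
    if combiner is Combiner.LATEST:
        return max(input_micros)
    # Assumption: the window end is exclusive, so the last timestamp inside
    # the window is one microsecond before it.
    return window_end_micros - 1


inputs = [0, 9_000_000]   # element timestamps 0 s and 9 s
window_end = 10_000_000   # FixedWindows(10): window [0 s, 10 s)
for c in Combiner:
    print(c.value, output_timestamp(inputs, window_end, c))
```

Under this model, EARLIEST and LATEST come out at 0 s and 9 s, and END_OF_WINDOW just below 10 s.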
What am I missing? What is the correct expected behavior, or does this look
like a bug?
[1]:
https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java#L390
Cheers,