[ https://issues.apache.org/jira/browse/BEAM-7995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles updated BEAM-7995:
----------------------------------
    Labels: stale-assigned  (was: )

> IllegalStateException: TimestampCombiner moved element from to earlier time in Python
> -------------------------------------------------------------------------------------
>
>                 Key: BEAM-7995
>                 URL: https://issues.apache.org/jira/browse/BEAM-7995
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Hai Lu
>            Assignee: Hai Lu
>            Priority: P2
>              Labels: stale-assigned
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> I'm looking into a bug I found internally when using the Beam portable API
> (Python) on our own Samza runner.
>
> The pipeline looks something like this:
>
>     (p
>      | 'read' >> ReadFromKafka(cluster="tracking", topic="PageViewEvent")
>      | 'transform' >> beam.Map(lambda event: process_event(event))
>      | 'window' >> beam.WindowInto(FixedWindows(15))
>      | 'group' >> *beam.CombinePerKey(beam.combiners.CountCombineFn())*
>      ...
>
> The problem comes from the combiners, which cause the following exception on
> the Java side:
>
> Caused by: java.lang.IllegalStateException: TimestampCombiner moved element
> from 2019-08-15T03:34:*45.000*Z to earlier time 2019-08-15T03:34:*44.999*Z
> for window [2019-08-15T03:34:30.000Z..2019-08-15T03:34:*45.000*Z)
>     at org.apache.beam.runners.core.WatermarkHold.shift(WatermarkHold.java:117)
>     at org.apache.beam.runners.core.WatermarkHold.addElementHold(WatermarkHold.java:154)
>     at org.apache.beam.runners.core.WatermarkHold.addHolds(WatermarkHold.java:98)
>     at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:605)
>     at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
>     at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
>
> The exception happens here
> [https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java#L116]
> when we check that the shifted timestamp is not before the original timestamp.
>
>     if (shifted.isBefore(timestamp)) {
>       throw new IllegalStateException(
>           String.format(
>               "TimestampCombiner moved element from %s to earlier time %s for window %s",
>               BoundedWindow.formatTimestamp(timestamp),
>               BoundedWindow.formatTimestamp(shifted),
>               window));
>     }
>
> As you can see from the exception, "shifted" is "XXX 44.999" while
> "timestamp" is "XXX 45.000". The "44.999" comes from
> [TimestampCombiner.END_OF_WINDOW|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java#L116]:
>
>     @Override
>     public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
>       return intoWindow.maxTimestamp();
>     }
>
> where intoWindow.maxTimestamp() is:
>
>     /** Returns the largest timestamp that can be included in this window. */
>     @Override
>     public Instant maxTimestamp() {
>       *// end not inclusive*
>       return *end.minus(1)*;
>     }
>
> Hence, the "44.*999*".
>
> The "45.000" comes from the Python side, when the combiner outputs its results
> in the pre-GBK operation:
> [operations.py#PGBKCVOperation#output_key|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/operations.py#L889]
>
>     if windows is 0:
>       self.output(_globally_windowed_value.with_value((key, value)))
>     else:
>       self.output(WindowedValue((key, value), *windows[0].end*, windows))
>
> Here, when we generate the windowed value, the timestamp is set to the window's
> exclusive end (45.000) rather than to the last instant the window includes
> (44.999).
>
> Clearly the "end of window" definition is a bit inconsistent across Python
> and Java. I have yet to try this on other runners, so I am not sure whether
> this is only an issue for our Samza runner.
> I tend to think this is a bug but would like to confirm with you. If this has
> not been an issue for other runners, where might I have gone wrong?

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
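
The mismatch described in the issue can be reproduced without Beam. The following is only a minimal sketch under stated assumptions: timestamps are plain integers in milliseconds (offsets within the minute from the report), and `IntervalWindow` and `shift_end_of_window` are hypothetical stand-ins for the Java classes quoted above, not Beam APIs.

```python
# Simplified model of the END_OF_WINDOW shift and the check in
# WatermarkHold.shift. All names are hypothetical; timestamps are
# integers in milliseconds.


class IntervalWindow:
    """Half-open window [start, end), in milliseconds."""

    def __init__(self, start, end):
        self.start = start
        self.end = end

    def max_timestamp(self):
        # The end is exclusive, so the largest included timestamp is end - 1,
        # mirroring `end.minus(1)` in the quoted Java snippet.
        return self.end - 1


def shift_end_of_window(timestamp, window):
    """Shift a timestamp to the window's max timestamp, rejecting backward moves."""
    shifted = window.max_timestamp()
    if shifted < timestamp:
        raise ValueError(
            "TimestampCombiner moved element from %d to earlier time %d "
            "for window [%d..%d)"
            % (timestamp, shifted, window.start, window.end))
    return shifted


# The 15-second window from the report: [03:34:30.000, 03:34:45.000).
window = IntervalWindow(30_000, 45_000)

# Element stamped with the exclusive end, as with `windows[0].end`: rejected.
try:
    shift_end_of_window(window.end, window)
    rejected = False
except ValueError:
    rejected = True

# Element stamped with the last included instant: accepted unchanged.
accepted = shift_end_of_window(window.max_timestamp(), window)
```

In this model the element stamped with `window.end` fails the check, while the one stamped with `window.max_timestamp()` passes. Whether the Python SDK should emit the window's maximum timestamp instead of `windows[0].end` is an assumption here, and is the question the report leaves for the Beam maintainers.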