This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit 3eecb381f3fb412df6844cfc13b12bf265253926 Author: Aljoscha Krettek <[email protected]> AuthorDate: Fri Jan 5 14:17:49 2018 +0100 [BEAM-2140] Block DoFnOperator.close() if we have pending timers It can happen that the input operation finishes while we still have pending processing-time timers, for example from processing a Splittable DoFn. This change makes sure that we block as long as we have pending timers. This change also makes sure that we forward a +Inf watermark in close(). We have to do this because it can happen that we get a +Inf watermark on input while we still have active watermark holds (which will get resolved when all pending timers are gone). With this change we make sure to send a +Inf watermark downstream once everything is resolved. --- .../wrappers/streaming/DoFnOperator.java | 43 +++++++++++++++++----- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index dd2f9c4..37f56f5 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -371,18 +371,41 @@ public class DoFnOperator<InputT, OutputT> @Override public void close() throws Exception { - super.close(); - - // sanity check: these should have been flushed out by +Inf watermarks - if (!sideInputs.isEmpty() && nonKeyedStateInternals != null) { - BagState<WindowedValue<InputT>> pushedBack = - nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag); + try { - Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read(); - if (pushedBackContents != null && !Iterables.isEmpty(pushedBackContents)) { - String pushedBackString = Joiner.on(",").join(pushedBackContents); + 
// This is our last chance to block shutdown of this operator while + // there are still remaining processing-time timers. Flink will ignore pending + // processing-time timers when upstream operators have shut down and will also + // shut down this operator with pending processing-time timers. + while (this.numProcessingTimeTimers() > 0) { + getContainingTask().getCheckpointLock().wait(100); + } + if (this.numProcessingTimeTimers() > 0) { throw new RuntimeException( - "Leftover pushed-back data: " + pushedBackString + ". This indicates a bug."); + "There are still processing-time timers left, this indicates a bug"); + } + + // make sure we send a +Inf watermark downstream. It can happen that we receive +Inf + // in processWatermark*() but have holds, so we have to re-evaluate here. + processWatermark(new Watermark(Long.MAX_VALUE)); + if (currentOutputWatermark < Long.MAX_VALUE) { + throw new RuntimeException("There are still watermark holds. Watermark held at " + + keyedStateInternals.watermarkHold().getMillis() + "."); + } + } finally { + super.close(); + + // sanity check: these should have been flushed out by +Inf watermarks + if (!sideInputs.isEmpty() && nonKeyedStateInternals != null) { + BagState<WindowedValue<InputT>> pushedBack = + nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag); + + Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read(); + if (pushedBackContents != null && !Iterables.isEmpty(pushedBackContents)) { + String pushedBackString = Joiner.on(",").join(pushedBackContents); + throw new RuntimeException( + "Leftover pushed-back data: " + pushedBackString + ". This indicates a bug."); + } } } } -- To stop receiving notification emails like this one, please contact [email protected].
