[ 
https://issues.apache.org/jira/browse/BEAM-6733?focusedWorklogId=203801&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-203801
 ]

ASF GitHub Bot logged work on BEAM-6733:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Feb/19 17:58
            Start Date: 25/Feb/19 17:58
    Worklog Time Spent: 10m 
      Work Description: mxm commented on pull request #7940: [BEAM-6733] Use 
Flink 1.6/1.7 prepareSnapshotPreBarrier to replace BufferedOutputManager
URL: https://github.com/apache/beam/pull/7940#discussion_r259946587
 
 

 ##########
 File path: 
runners/flink/1.6/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 ##########
 @@ -0,0 +1,956 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.ProcessFnRunner;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
+import org.apache.beam.runners.core.StatefulDoFnRunner;
+import org.apache.beam.runners.core.StepContext;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
+import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
+import org.apache.beam.runners.flink.translation.utils.FlinkClassloading;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.OutputTag;
+import org.joda.time.Instant;
+
+/**
+ * Flink operator for executing {@link DoFn DoFns}.
+ *
+ * @param <InputT> the input type of the {@link DoFn}
+ * @param <OutputT> the output type of the {@link DoFn}
+ */
+public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<WindowedValue<OutputT>>
+    implements OneInputStreamOperator<WindowedValue<InputT>, 
WindowedValue<OutputT>>,
+        TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, 
WindowedValue<OutputT>>,
+        Triggerable<Object, TimerData> {
+
  /** The wrapped Beam {@link DoFn}; may be replaced in open() via {@link #getDoFn()}. */
  protected DoFn<InputT, OutputT> doFn;

  /** Pipeline options in a form that can be shipped to the task managers. */
  protected final SerializablePipelineOptions serializedOptions;

  /** Tag of the main output. */
  protected final TupleTag<OutputT> mainOutputTag;
  /** Tags of any additional (side) outputs. */
  protected final List<TupleTag<?>> additionalOutputTags;

  /** Side inputs consumed by the DoFn. */
  protected final Collection<PCollectionView<?>> sideInputs;
  /** Maps union tags of the side-input stream to the corresponding views. */
  protected final Map<Integer, PCollectionView<?>> sideInputTagMapping;

  protected final WindowingStrategy<?, ?> windowingStrategy;

  /** Creates the output manager that forwards results to the Flink runtime. */
  protected final OutputManagerFactory<OutputT> outputManagerFactory;

  protected transient DoFnRunner<InputT, OutputT> doFnRunner;
  /** Runner that pushes back elements whose side inputs are not yet ready. */
  protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner;

  protected transient SideInputHandler sideInputHandler;

  protected transient SideInputReader sideInputReader;

  protected transient FlinkOutputManager<OutputT> outputManager;

  private transient DoFnInvoker<InputT, OutputT> doFnInvoker;

  // The three watermarks below are initialized to TIMESTAMP_MIN_VALUE in initializeState().
  protected transient long currentInputWatermark;

  protected transient long currentSideInputWatermark;

  protected transient long currentOutputWatermark;

  /** Beam state backed by Flink keyed state; only set when keyCoder != null. */
  protected transient FlinkStateInternals<?> keyedStateInternals;

  protected final String stepName;

  /** Coder for windowed input elements; used to (de)serialize pushed-back state. */
  private final Coder<WindowedValue<InputT>> windowedInputCoder;

  private final Coder<InputT> inputCoder;

  private final Map<TupleTag<?>, Coder<?>> outputCoders;

  /** Key coder; non-null for stateful ParDo / window operators. */
  protected final Coder<?> keyCoder;

  final KeySelector<WindowedValue<InputT>, ?> keySelector;

  private final TimerInternals.TimerDataCoder timerCoder;

  /** Max number of elements to include in a bundle. */
  private final long maxBundleSize;
  /** Max duration of a bundle. */
  private final long maxBundleTimeMills;

  private final DoFnSchemaInformation doFnSchemaInformation;

  protected transient InternalTimerService<TimerData> timerService;

  protected transient FlinkTimerInternals timerInternals;

  /** Watermark hold for pushed-back elements; Long.MAX_VALUE when nothing is buffered. */
  private transient long pushedBackWatermark;

  private transient PushedBackElementsHandler<WindowedValue<InputT>> pushedBackElementsHandler;

  /** Use an AtomicBoolean because we start/stop bundles by a timer thread (see below). */
  private transient AtomicBoolean bundleStarted;
  /** Number of processed elements in the current bundle. */
  private transient long elementCount;
  /** A timer that finishes the current bundle after a fixed amount of time. */
  private transient ScheduledFuture<?> checkFinishBundleTimer;
  /** Time that the last bundle was finished (to set the timer). */
  private transient long lastFinishBundleTime;
+
  /**
   * Creates a {@link DoFnOperator}.
   *
   * @param doFn the function to execute; may be recreated in {@link #open()} via {@link #getDoFn()}
   * @param stepName name of the transform step
   * @param inputWindowedCoder coder for windowed input elements (used for pushed-back state)
   * @param inputCoder coder for raw input elements
   * @param outputCoders coders for the main and additional outputs, keyed by tag
   * @param mainOutputTag tag of the main output
   * @param additionalOutputTags tags of additional (side) outputs
   * @param outputManagerFactory creates the manager forwarding output to the Flink runtime
   * @param windowingStrategy windowing strategy of the input
   * @param sideInputTagMapping maps union tags to side-input views
   * @param sideInputs side inputs consumed by the DoFn
   * @param options pipeline options; also queried for max bundle size/time
   * @param keyCoder key coder; non-null for stateful DoFns / window operators
   * @param keySelector key selector matching {@code keyCoder}; may be null
   * @param doFnSchemaInformation schema information for the DoFn
   */
  public DoFnOperator(
      DoFn<InputT, OutputT> doFn,
      String stepName,
      Coder<WindowedValue<InputT>> inputWindowedCoder,
      Coder<InputT> inputCoder,
      Map<TupleTag<?>, Coder<?>> outputCoders,
      TupleTag<OutputT> mainOutputTag,
      List<TupleTag<?>> additionalOutputTags,
      OutputManagerFactory<OutputT> outputManagerFactory,
      WindowingStrategy<?, ?> windowingStrategy,
      Map<Integer, PCollectionView<?>> sideInputTagMapping,
      Collection<PCollectionView<?>> sideInputs,
      PipelineOptions options,
      Coder<?> keyCoder,
      KeySelector<WindowedValue<InputT>, ?> keySelector,
      DoFnSchemaInformation doFnSchemaInformation) {
    this.doFn = doFn;
    this.stepName = stepName;
    this.windowedInputCoder = inputWindowedCoder;
    this.inputCoder = inputCoder;
    this.outputCoders = outputCoders;
    this.mainOutputTag = mainOutputTag;
    this.additionalOutputTags = additionalOutputTags;
    this.sideInputTagMapping = sideInputTagMapping;
    this.sideInputs = sideInputs;
    this.serializedOptions = new SerializablePipelineOptions(options);
    this.windowingStrategy = windowingStrategy;
    this.outputManagerFactory = outputManagerFactory;

    setChainingStrategy(ChainingStrategy.ALWAYS);

    this.keyCoder = keyCoder;
    this.keySelector = keySelector;

    this.timerCoder =
        TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());

    FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class);

    this.maxBundleSize = flinkOptions.getMaxBundleSize();
    this.maxBundleTimeMills = flinkOptions.getMaxBundleTimeMills();
    this.doFnSchemaInformation = doFnSchemaInformation;
  }
+
  /**
   * Returns the {@link DoFn} to execute. Overridable (e.g. in WindowDoFnOperator) because
   * subclasses may create the DoFn dynamically rather than supply it at construction time.
   */
  protected DoFn<InputT, OutputT> getDoFn() {
    return doFn;
  }
+
+  // allow overriding this, for example SplittableDoFnOperator will not create 
a
+  // stateful DoFn runner because ProcessFn, which is used for executing a 
Splittable DoFn
+  // doesn't play by the normal DoFn rules and WindowDoFnOperator uses 
LateDataDroppingDoFnRunner
+  protected DoFnRunner<InputT, OutputT> createWrappingDoFnRunner(
+      DoFnRunner<InputT, OutputT> wrappedRunner) {
+
+    if (keyCoder != null) {
+      StatefulDoFnRunner.CleanupTimer cleanupTimer =
+          new StatefulDoFnRunner.TimeInternalsCleanupTimer(timerInternals, 
windowingStrategy);
+
+      // we don't know the window type
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
+
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      StatefulDoFnRunner.StateCleaner<?> stateCleaner =
+          new StatefulDoFnRunner.StateInternalsStateCleaner<>(
+              doFn, keyedStateInternals, windowCoder);
+
+      return DoFnRunners.defaultStatefulDoFnRunner(
+          doFn, wrappedRunner, windowingStrategy, cleanupTimer, stateCleaner);
+
+    } else {
+      return doFnRunner;
+    }
+  }
+
  /** Registers Beam's FileSystems for the current options before Flink operator setup runs. */
  @Override
  public void setup(
      StreamTask<?, ?> containingTask,
      StreamConfig config,
      Output<StreamRecord<WindowedValue<OutputT>>> output) {

    // make sure that FileSystems is initialized correctly
    FlinkPipelineOptions options = serializedOptions.get().as(FlinkPipelineOptions.class);
    FileSystems.setDefaultPipelineOptions(options);

    super.setup(containingTask, config, output);
  }
+
  /**
   * Initializes (and, on restore, recovers) operator state: pushed-back element state, the
   * watermark fields, side-input handling, the output manager, and — for keyed operators —
   * Beam state internals and the timer service.
   */
  @Override
  public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);

    // State holding elements that could not be processed yet because a side input was not ready.
    ListStateDescriptor<WindowedValue<InputT>> pushedBackStateDescriptor =
        new ListStateDescriptor<>(
            "pushed-back-elements", new CoderTypeSerializer<>(windowedInputCoder));

    if (keySelector != null) {
      pushedBackElementsHandler =
          KeyedPushedBackElementsHandler.create(
              keySelector, getKeyedStateBackend(), pushedBackStateDescriptor);
    } else {
      ListState<WindowedValue<InputT>> listState =
          getOperatorStateBackend().getListState(pushedBackStateDescriptor);
      pushedBackElementsHandler = NonKeyedPushedBackElementsHandler.create(listState);
    }

    setCurrentInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
    setCurrentSideInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
    setCurrentOutputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());

    sideInputReader = NullSideInputReader.of(sideInputs);

    if (!sideInputs.isEmpty()) {

      FlinkBroadcastStateInternals sideInputStateInternals =
          new FlinkBroadcastStateInternals<>(
              getContainingTask().getIndexInSubtaskGroup(), getOperatorStateBackend());

      sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals);
      sideInputReader = sideInputHandler;

      // Restore the watermark hold as the minimum timestamp over any restored
      // pushed-back elements (MAX_VALUE when there are none).
      Stream<WindowedValue<InputT>> pushedBack = pushedBackElementsHandler.getElements();
      long min =
          pushedBack.map(v -> v.getTimestamp().getMillis()).reduce(Long.MAX_VALUE, Math::min);
      setPushedBackWatermark(min);
    } else {
      // No side inputs means nothing is ever pushed back, so no hold is needed.
      setPushedBackWatermark(Long.MAX_VALUE);
    }

    outputManager = outputManagerFactory.create(output);

    // StatefulPardo or WindowDoFn
    if (keyCoder != null) {
      keyedStateInternals =
          new FlinkStateInternals<>((KeyedStateBackend) getKeyedStateBackend(), keyCoder);

      if (timerService == null) {
        timerService =
            getInternalTimerService("beam-timer", new CoderTypeSerializer<>(timerCoder), this);
      }

      timerInternals = new FlinkTimerInternals();
    }
  }
+
  /**
   * Builds the DoFn runner chain and starts the bundle-timeout timer. Runs after
   * {@link #initializeState(StateInitializationContext)}, so state and timers are available.
   */
  @Override
  public void open() throws Exception {
    // WindowDoFnOperator needs state and timers to obtain its DoFn,
    // so we must wait until StateInternals and TimerInternals are ready.
    // This will be called after initializeState().
    this.doFn = getDoFn();
    doFnInvoker = DoFnInvokers.invokerFor(doFn);
    doFnInvoker.invokeSetup();

    FlinkPipelineOptions options = serializedOptions.get().as(FlinkPipelineOptions.class);
    doFnRunner =
        DoFnRunners.simpleRunner(
            options,
            doFn,
            sideInputReader,
            outputManager,
            mainOutputTag,
            additionalOutputTags,
            new FlinkStepContext(),
            inputCoder,
            outputCoders,
            windowingStrategy,
            doFnSchemaInformation);

    // Let subclasses wrap the runner (e.g. with stateful or late-data handling).
    doFnRunner = createWrappingDoFnRunner(doFnRunner);

    if (options.getEnableMetrics()) {
      doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext());
    }

    bundleStarted = new AtomicBoolean(false);
    elementCount = 0L;
    lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();

    // Schedule timer to check timeout of finish bundle.
    long bundleCheckPeriod = (maxBundleTimeMills + 1) / 2;
    checkFinishBundleTimer =
        getProcessingTimeService()
            .scheduleAtFixedRate(
                timestamp -> checkInvokeFinishBundleByTime(), bundleCheckPeriod, bundleCheckPeriod);

    // Splittable DoFns are driven by a dedicated ProcessFn runner; everything else uses
    // the pushback runner that buffers elements until side inputs are ready.
    if (doFn instanceof SplittableParDoViaKeyedWorkItems.ProcessFn) {
      pushbackDoFnRunner =
          new ProcessFnRunner<>((DoFnRunner) doFnRunner, sideInputs, sideInputHandler);
    } else {
      pushbackDoFnRunner =
          SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
    }
  }
+
+  @Override
+  public void dispose() throws Exception {
+    try {
+      checkFinishBundleTimer.cancel(true);
+      FlinkClassloading.deleteStaticCaches();
+      doFnInvoker.invokeTeardown();
+    } finally {
+      // This releases all task's resources. We need to call this last
+      // to ensure that state, timers, or output buffers can still be
+      // accessed during finishing the bundle.
+      super.dispose();
+    }
+  }
+
  /**
   * Drains the operator on shutdown: waits out processing-time timers, pushes a +Inf
   * watermark through, finishes the last bundle, and sanity-checks for leftover holds
   * and pushed-back data.
   */
  @Override
  public void close() throws Exception {
    try {
      // This is our last chance to block shutdown of this operator while
      // there are still remaining processing-time timers. Flink will ignore pending
      // processing-time timers when upstream operators have shut down and will also
      // shut down this operator with pending processing-time timers.
      while (this.numProcessingTimeTimers() > 0) {
        getContainingTask().getCheckpointLock().wait(100);
      }
      // NOTE(review): the loop above only exits once the count reaches zero, so this
      // check looks unreachable unless timers can be registered concurrently — confirm.
      if (this.numProcessingTimeTimers() > 0) {
        throw new RuntimeException(
            "There are still processing-time timers left, this indicates a bug");
      }

      // make sure we send a +Inf watermark downstream. It can happen that we receive +Inf
      // in processWatermark*() but have holds, so we have to re-evaluate here.
      processWatermark(new Watermark(Long.MAX_VALUE));
      invokeFinishBundle();
      if (currentOutputWatermark < Long.MAX_VALUE) {
        if (keyedStateInternals == null) {
          throw new RuntimeException("Current watermark is still " + currentOutputWatermark + ".");

        } else {
          throw new RuntimeException(
              "There are still watermark holds. Watermark held at "
                  + keyedStateInternals.watermarkHold().getMillis()
                  + ".");
        }
      }
    } finally {
      super.close();
    }

    // sanity check: these should have been flushed out by +Inf watermarks
    if (!sideInputs.isEmpty()) {

      List<WindowedValue<InputT>> pushedBackElements =
          pushedBackElementsHandler.getElements().collect(Collectors.toList());

      if (pushedBackElements.size() > 0) {
        String pushedBackString = Joiner.on(",").join(pushedBackElements);
        throw new RuntimeException(
            "Leftover pushed-back data: " + pushedBackString + ". This indicates a bug.");
      }
    }
  }
+
  /** Returns the watermark hold induced by elements pushed back while awaiting side inputs. */
  protected long getPushbackWatermarkHold() {
    return pushedBackWatermark;
  }

  /** Records the minimum timestamp over all currently pushed-back elements. */
  protected void setPushedBackWatermark(long watermark) {
    pushedBackWatermark = watermark;
  }
+
  /** One-input path: process an element inside a bundle (started lazily, finished by count). */
  @Override
  public final void processElement(StreamRecord<WindowedValue<InputT>> streamRecord) {
    checkInvokeStartBundle();
    doFnRunner.processElement(streamRecord.getValue());
    checkInvokeFinishBundleByCount();
  }
+
+  @Override
+  public final void processElement1(StreamRecord<WindowedValue<InputT>> 
streamRecord)
+      throws Exception {
+    checkInvokeStartBundle();
+    Iterable<WindowedValue<InputT>> justPushedBack =
+        
pushbackDoFnRunner.processElementInReadyWindows(streamRecord.getValue());
+
+    long min = pushedBackWatermark;
+    for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
+      min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
+      pushedBackElementsHandler.pushBack(pushedBackValue);
+    }
+    setPushedBackWatermark(min);
+
+    checkInvokeFinishBundleByCount();
+  }
+
  /**
   * Add the side input value. Here we are assuming that views have already been materialized and
   * are sent over the wire as {@link Iterable}. Subclasses may elect to perform materialization in
   * state and receive side input incrementally instead.
   *
   * @param streamRecord the tagged union value carrying the materialized side-input iterable
   */
  protected void addSideInputValue(StreamRecord<RawUnionValue> streamRecord) {
    @SuppressWarnings("unchecked")
    WindowedValue<Iterable<?>> value =
        (WindowedValue<Iterable<?>>) streamRecord.getValue().getValue();

    // The union tag identifies which side-input view this value belongs to.
    PCollectionView<?> sideInput = sideInputTagMapping.get(streamRecord.getValue().getUnionTag());
    sideInputHandler.addSideInputValue(sideInput, value);
  }
+
  /**
   * Side-input path: stores the arriving side-input value and retries all pushed-back
   * elements that may have become processable because of it.
   */
  @Override
  public final void processElement2(StreamRecord<RawUnionValue> streamRecord) throws Exception {
    // we finish the bundle because the newly arrived side-input might
    // make a view available that was previously not ready.
    // The PushbackSideInputRunner will only reset its cache of non-ready windows when
    // finishing a bundle.
    invokeFinishBundle();
    checkInvokeStartBundle();

    // add the side input, which may cause pushed back elements become eligible for processing
    addSideInputValue(streamRecord);

    // Retry every buffered element; whatever is still not ready gets buffered again.
    List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();

    Iterator<WindowedValue<InputT>> it = pushedBackElementsHandler.getElements().iterator();

    while (it.hasNext()) {
      WindowedValue<InputT> element = it.next();
      // we need to set the correct key in case the operator is
      // a (keyed) window operator
      setKeyContextElement1(new StreamRecord<>(element));

      Iterable<WindowedValue<InputT>> justPushedBack =
          pushbackDoFnRunner.processElementInReadyWindows(element);
      Iterables.addAll(newPushedBack, justPushedBack);
    }

    // Rebuild the buffer and the watermark hold from the remaining elements.
    pushedBackElementsHandler.clear();
    long min = Long.MAX_VALUE;
    for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
      min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
      pushedBackElementsHandler.pushBack(pushedBackValue);
    }
    setPushedBackWatermark(min);

    checkInvokeFinishBundleByCount();

    // maybe output a new watermark
    processWatermark1(new Watermark(currentInputWatermark));
  }
+
  /** One-input watermark handling delegates to the two-input main-input path. */
  @Override
  public void processWatermark(Watermark mark) throws Exception {
    processWatermark1(mark);
  }
+
  /**
   * Handles a main-input watermark: flushes pushed-back data when side inputs are exhausted,
   * then advances the output watermark as far as the pushback/state holds allow.
   */
  @Override
  public void processWatermark1(Watermark mark) throws Exception {

    checkInvokeStartBundle();

    // We do the check here because we are guaranteed to at least get the +Inf watermark on the
    // main input when the job finishes.
    if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
      // this means we will never see any more side input
      // we also do the check here because we might have received the side-input MAX watermark
      // before receiving any main-input data
      emitAllPushedBackData();
    }

    if (keyCoder == null) {
      // Non-keyed: the output watermark only has to respect the pushback hold.
      setCurrentInputWatermark(mark.getTimestamp());
      long potentialOutputWatermark = Math.min(getPushbackWatermarkHold(), currentInputWatermark);
      if (potentialOutputWatermark > currentOutputWatermark) {
        setCurrentOutputWatermark(potentialOutputWatermark);
        emitWatermark(currentOutputWatermark);
      }
    } else {
      // Keyed: additionally respect watermark holds in Beam state and advance the
      // Flink timer service.
      setCurrentInputWatermark(mark.getTimestamp());

      // hold back by the pushed back values waiting for side inputs
      long pushedBackInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp());

      timeServiceManager.advanceWatermark(
          new Watermark(toFlinkRuntimeWatermark(pushedBackInputWatermark)));

      Instant watermarkHold = keyedStateInternals.watermarkHold();

      long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold());

      long potentialOutputWatermark = Math.min(pushedBackInputWatermark, combinedWatermarkHold);

      if (potentialOutputWatermark > currentOutputWatermark) {
        setCurrentOutputWatermark(potentialOutputWatermark);
        emitWatermark(currentOutputWatermark);
      }
    }
  }
+
  /** Emits the watermark downstream, finishing the current bundle first if it is +Inf. */
  private void emitWatermark(long watermark) {
    // Must invoke finishBundle before emitting the +Inf watermark, otherwise elements
    // produced while finishing the bundle would arrive after the final watermark (late).
    if (watermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
      invokeFinishBundle();
    }
    output.emitWatermark(new Watermark(watermark));
  }
+
  /** Handles a side-input watermark; on +Inf, flushes all pushed-back data. */
  @Override
  public void processWatermark2(Watermark mark) throws Exception {
    checkInvokeStartBundle();

    setCurrentSideInputWatermark(mark.getTimestamp());
    if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
      // this means we will never see any more side input
      emitAllPushedBackData();

      // maybe output a new watermark
      processWatermark1(new Watermark(currentInputWatermark));
    }
  }
+
  /**
   * Converts a Beam watermark to a Flink watermark. This is only relevant when considering what
   * event-time timers to fire: in Beam, a watermark {@code T} says there will not be any elements
   * with a timestamp {@code < T} in the future. A Flink watermark {@code T} says there will not be
   * any elements with a timestamp {@code <= T} in the future. We correct this by subtracting {@code
   * 1} from a Beam watermark before passing to any relevant Flink runtime components.
   *
   * @param beamWatermark a Beam watermark (exclusive bound)
   * @return the equivalent Flink watermark (inclusive bound)
   */
  private static long toFlinkRuntimeWatermark(long beamWatermark) {
    return beamWatermark - 1;
  }
+
+  /**
+   * Emits all pushed-back data. This should be used once we know that there 
will not be any future
+   * side input, i.e. that there is no point in waiting.
+   */
+  private void emitAllPushedBackData() throws Exception {
+
+    Iterator<WindowedValue<InputT>> it = 
pushedBackElementsHandler.getElements().iterator();
+
+    while (it.hasNext()) {
+      WindowedValue<InputT> element = it.next();
+      // we need to set the correct key in case the operator is
+      // a (keyed) window operator
+      setKeyContextElement1(new StreamRecord<>(element));
+
+      doFnRunner.processElement(element);
+    }
+
+    pushedBackElementsHandler.clear();
+
+    setPushedBackWatermark(Long.MAX_VALUE);
+  }
+
  /**
   * Checks whether startBundle must be invoked; if so, elements that were buffered as part of
   * finishing a bundle in snapshot() need to be output first.
   *
   * <p>In order to avoid having {@link DoFnRunner#processElement(WindowedValue)} or {@link
   * DoFnRunner#onTimer(String, BoundedWindow, Instant, TimeDomain)} happen outside of StartBundle
   * and FinishBundle, this method needs to be called in each processElement, each processWatermark,
   * and onProcessingTime. It need not be called in onEventTime, because that is guaranteed by
   * processWatermark.
   */
  private void checkInvokeStartBundle() {
    // compareAndSet: bundles may also be finished from the bundle-timeout timer thread
    // (see bundleStarted), so the transition must be atomic.
    if (bundleStarted.compareAndSet(false, true)) {
      pushbackDoFnRunner.startBundle();
    }
  }
+
+  /** Check whether invoke finishBundle by elements count. Called in 
processElement. */
+  private void checkInvokeFinishBundleByCount() {
+    elementCount++;
+    if (elementCount >= maxBundleSize) {
+      invokeFinishBundle();
+    }
+  }
+
+  /** Check whether invoke finishBundle by timeout. */
+  private void checkInvokeFinishBundleByTime() {
+    long now = getProcessingTimeService().getCurrentProcessingTime();
+    if (now - lastFinishBundleTime >= maxBundleTimeMills) {
+      invokeFinishBundle();
+    }
+  }
+
  /** Finishes the current bundle, if one is in progress, and resets the bundle bookkeeping. */
  protected final void invokeFinishBundle() {
    // compareAndSet guards against concurrent finishes from the timeout timer thread.
    if (bundleStarted.compareAndSet(true, false)) {
      pushbackDoFnRunner.finishBundle();
      elementCount = 0L;
      lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
    }
  }
+
  @Override
  public void prepareSnapshotPreBarrier(long checkpointId) {
    // Finish the current bundle before the snapshot barrier is sent downstream.
    // This gives us a clean state before taking the actual snapshot.
    invokeFinishBundle();
  }
+
  /** Fires an event-time timer. */
  @Override
  public void onEventTime(InternalTimer<Object, TimerData> timer) throws Exception {
    // We don't have to call checkInvokeStartBundle() because it's already called in
    // processWatermark*().
    fireTimer(timer);
  }
+
  /** Fires a processing-time timer; a bundle may need to be started here. */
  @Override
  public void onProcessingTime(InternalTimer<Object, TimerData> timer) throws Exception {
    checkInvokeStartBundle();
    fireTimer(timer);
  }
+
  /**
   * Delivers a fired timer to the user's DoFn. Overridable in WindowDoFnOperator.
   *
   * @param timer the Flink internal timer whose namespace carries the Beam {@link TimerData}
   */
  public void fireTimer(InternalTimer<?, TimerData> timer) {
    TimerInternals.TimerData timerData = timer.getNamespace();
    StateNamespace namespace = timerData.getNamespace();
    // This is a user timer, so namespace must be WindowNamespace
    checkArgument(namespace instanceof WindowNamespace);
    BoundedWindow window = ((WindowNamespace) namespace).getWindow();
    // Clear the pending-timer bookkeeping before invoking the user function.
    timerInternals.cleanupPendingTimer(timer.getNamespace());
    pushbackDoFnRunner.onTimer(
        timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
  }
+
+  private void setCurrentInputWatermark(long currentInputWatermark) {
+    this.currentInputWatermark = currentInputWatermark;
+  }
+
+  private void setCurrentSideInputWatermark(long currentInputWatermark) {
+    this.currentSideInputWatermark = currentInputWatermark;
+  }
+
+  private void setCurrentOutputWatermark(long currentOutputWatermark) {
+    this.currentOutputWatermark = currentOutputWatermark;
+  }
+
  /** Factory for creating an {@link FlinkOutputManager} from a Flink {@link Output}. */
  interface OutputManagerFactory<OutputT> extends Serializable {
    /** Creates an output manager that forwards Beam output to the given Flink {@code output}. */
    FlinkOutputManager<OutputT> create(Output<StreamRecord<WindowedValue<OutputT>>> output);
  }
+
+  /** A {@link DoFnRunners.OutputManager} that forwards data to the Flink 
runtime. */
+  public static class FlinkOutputManager<OutputT> implements 
DoFnRunners.OutputManager {
 
 Review comment:
   The `OutputManager` could be simplified to remove any buffering.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 203801)
    Time Spent: 40m  (was: 0.5h)

> Use Flink 1.6/1.7 prepareSnapshotPreBarrier to replace BufferedOutputManager
> ----------------------------------------------------------------------------
>
>                 Key: BEAM-6733
>                 URL: https://issues.apache.org/jira/browse/BEAM-6733
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Minor
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Flink 1.6/1.7 provides a hook to execute an action before the snapshot 
> barrier is emitted by the operator. At the moment (<=1.5) the Flink Runner 
> has to buffer any elements which are emitted during a snapshot because the 
> barrier has already been emitted. This leads to a lot of code complexity.
> We can remove the buffering in favor of finishing the current bundle in 
> {{DoFnOperator}}'s {{prepareSnapshotPreBarrier}}. The 1.5/1.6/1.7 build setup 
> poses a challenge to do that in a way that does not lead to much code 
> duplication.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to