[ https://issues.apache.org/jira/browse/BEAM-3905?focusedWorklogId=110474&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110474 ]

ASF GitHub Bot logged work on BEAM-3905:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Jun/18 11:59
            Start Date: 10/Jun/18 11:59
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #5578: [BEAM-3905] Update Flink Runner to Flink 1.5.0
URL: https://github.com/apache/beam/pull/5578
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/runners/flink/build.gradle b/runners/flink/build.gradle
index 7dbe5acb774..ebd607c0e21 100644
--- a/runners/flink/build.gradle
+++ b/runners/flink/build.gradle
@@ -45,7 +45,7 @@ configurations {
   validatesRunner
 }
 
-def flink_version = "1.4.0"
+def flink_version = "1.5.0"
 
 dependencies {
   compile library.java.guava
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 2384c7bcb82..53ba4f12f04 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -31,7 +31,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <flink.version>1.4.0</flink.version>
+    <flink.version>1.5.0</flink.version>
   </properties>
 
   <profiles>
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index ff991dce7ed..bf80f7ce8b6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -68,6 +68,7 @@
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
@@ -327,9 +328,12 @@ public void flatMap(T t, Collector<T> collector) {
                     .returns(workItemTypeInfo)
                     .name("ToKeyedWorkItem");
 
+    WorkItemKeySelector<K, V> keySelector = new WorkItemKeySelector<>(
+        inputElementCoder.getKeyCoder());
+
     KeyedStream<WindowedValue<SingletonKeyedWorkItem<K, V>>, ByteBuffer>
             keyedWorkItemStream =
-            workItemStream.keyBy(new WorkItemKeySelector<>(inputElementCoder.getKeyCoder()));
+            workItemStream.keyBy(keySelector);
 
     SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn =
             SystemReduceFn.buffering(inputElementCoder.getValueCoder());
@@ -358,7 +362,8 @@ public void flatMap(T t, Collector<T> collector) {
                     new HashMap<>(), /* side-input mapping */
                     Collections.emptyList(), /* side inputs */
                     context.getPipelineOptions(),
-                    inputElementCoder.getKeyCoder());
+                    inputElementCoder.getKeyCoder(),
+                    (KeySelector) keySelector /* key selector */);
 
    SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<V>>>> outputDataStream =
             keyedWorkItemStream
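
For readers unfamiliar with Flink's KeySelector contract, here is a minimal
sketch (not part of this PR; FirstCharKeySelector is a made-up name). Like the
WorkItemKeySelector above, a selector must be deterministic and serializable,
since Flink uses it both to partition the stream and, after this change, to
re-key restored pushed-back elements:

import org.apache.flink.api.java.functions.KeySelector;

// Keys strings by their first character; '\0' stands in for the empty string.
class FirstCharKeySelector implements KeySelector<String, Character> {
  @Override
  public Character getKey(String value) {
    return value.isEmpty() ? '\0' : value.charAt(0);
  }
}
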
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 563263af5e6..b958c1a016f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -415,6 +415,7 @@ public RawUnionValue map(T o) throws Exception {
           Map<TupleTag<?>, Integer> tagsToIds,
           Coder<WindowedValue<InputT>> inputCoder,
           Coder keyCoder,
+          KeySelector<WindowedValue<InputT>, ?> keySelector,
           Map<Integer, PCollectionView<?>> transformedSideInputs);
     }
 
@@ -464,6 +465,7 @@ public RawUnionValue map(T o) throws Exception {
      DataStream<WindowedValue<InputT>> inputDataStream = context.getInputDataStream(input);
 
       Coder keyCoder = null;
+      KeySelector keySelector = null;
       boolean stateful = false;
       DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
       if (signature.stateDeclarations().size() > 0
@@ -471,7 +473,8 @@ public RawUnionValue map(T o) throws Exception {
        // Based on the fact that the signature is stateful, DoFnSignatures ensures
         // that it is also keyed
         keyCoder = ((KvCoder) input.getCoder()).getKeyCoder();
-        inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder));
+        keySelector = new KvToByteBufferKeySelector(keyCoder);
+        inputDataStream = inputDataStream.keyBy(keySelector);
         stateful = true;
       } else if (doFn instanceof SplittableParDoViaKeyedWorkItems.ProcessFn) {
         // we know that it is keyed on String
@@ -498,6 +501,7 @@ public RawUnionValue map(T o) throws Exception {
                 tagsToIds,
                 inputCoder,
                 keyCoder,
+                keySelector,
                 new HashMap<>() /* side-input mapping */);
 
         outputStream = inputDataStream
@@ -521,6 +525,7 @@ public RawUnionValue map(T o) throws Exception {
                 tagsToIds,
                 inputCoder,
                 keyCoder,
+                keySelector,
                 transformedSideInputs.f0);
 
         if (stateful) {
@@ -627,6 +632,7 @@ public void translateNode(
               tagsToIds,
               inputCoder,
               keyCoder,
+              keySelector,
               transformedSideInputs) ->
               new DoFnOperator<>(
                   doFn1,
@@ -640,7 +646,8 @@ public void translateNode(
                   transformedSideInputs,
                   sideInputs1,
                   context1.getPipelineOptions(),
-                  keyCoder));
+                  keyCoder,
+                  keySelector));
     }
   }
 
@@ -676,6 +683,7 @@ public void translateNode(
               tagsToIds,
               inputCoder,
               keyCoder,
+              keySelector,
               transformedSideInputs) ->
               new SplittableDoFnOperator<>(
                   doFn,
@@ -689,7 +697,8 @@ public void translateNode(
                   transformedSideInputs,
                   sideInputs,
                   context1.getPipelineOptions(),
-                  keyCoder));
+                  keyCoder,
+                  keySelector));
     }
   }
 
@@ -803,6 +812,9 @@ public void translateNode(
               .returns(workItemTypeInfo)
               .name("ToKeyedWorkItem");
 
+      WorkItemKeySelector keySelector = new WorkItemKeySelector<>(
+          inputKvCoder.getKeyCoder());
+
       KeyedStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>, ByteBuffer>
           keyedWorkItemStream =
              workItemStream.keyBy(new WorkItemKeySelector<>(inputKvCoder.getKeyCoder()));
@@ -830,7 +842,8 @@ public void translateNode(
               new HashMap<>(), /* side-input mapping */
               Collections.emptyList(), /* side inputs */
               context.getPipelineOptions(),
-              inputKvCoder.getKeyCoder());
+              inputKvCoder.getKeyCoder(),
+              keySelector);
 
      // our operator expects WindowedValue<KeyedWorkItem> while our input stream
      // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
@@ -883,9 +896,11 @@ public void translateNode(
               .returns(workItemTypeInfo)
               .name("ToKeyedWorkItem");
 
+      WorkItemKeySelector keySelector = new WorkItemKeySelector<>(
+          inputKvCoder.getKeyCoder());
       KeyedStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>, ByteBuffer>
           keyedWorkItemStream =
-              workItemStream.keyBy(new WorkItemKeySelector<>(inputKvCoder.getKeyCoder()));
+              workItemStream.keyBy(keySelector);
 
       GlobalCombineFn<? super InputT, ?, OutputT> combineFn;
       try {
@@ -918,7 +933,8 @@ public void translateNode(
               new HashMap<>(), /* side-input mapping */
               Collections.emptyList(), /* side inputs */
               context.getPipelineOptions(),
-              inputKvCoder.getKeyCoder());
+              inputKvCoder.getKeyCoder(),
+              keySelector);
 
      // our operator expects WindowedValue<KeyedWorkItem> while our input stream
      // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
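
A hedged sketch of how such a selector is used with keyBy(...) on the Flink
1.5 DataStream API, mirroring the pattern in the hunks above where the
selector is extracted into a variable so the same instance can also be handed
to the operator constructor (reuses the hypothetical FirstCharKeySelector
sketched earlier):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> words = env.fromElements("apple", "avocado", "banana");
    // Keep a reference to the selector instead of constructing it inline,
    // so a downstream operator can receive the same instance.
    FirstCharKeySelector selector = new FirstCharKeySelector();
    KeyedStream<String, Character> keyed = words.keyBy(selector);
    keyed.print();
    env.execute("keyBy example");
  }
}
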
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 5d9e6b93ede..7479b5defd1 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -20,11 +20,8 @@
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -32,9 +29,12 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
@@ -63,7 +63,6 @@
 import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkKeyGroupStateInternals;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkSplitStateInternals;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -81,19 +80,17 @@
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
-import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.HeapInternalTimerService;
 import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.Triggerable;
@@ -114,7 +111,7 @@
     extends AbstractStreamOperator<WindowedValue<OutputT>>
    implements OneInputStreamOperator<WindowedValue<InputT>, WindowedValue<OutputT>>,
      TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, WindowedValue<OutputT>>,
-    KeyGroupCheckpointedOperator, Triggerable<Object, TimerData> {
+      Triggerable<Object, TimerData> {
 
   protected DoFn<InputT, OutputT> doFn;
 
@@ -147,8 +144,6 @@
 
   protected transient long currentOutputWatermark;
 
-  private transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
-
   protected transient FlinkStateInternals<?> keyedStateInternals;
 
   private final String stepName;
@@ -157,19 +152,23 @@
 
   private final Coder<?> keyCoder;
 
+  private final KeySelector<WindowedValue<InputT>, ?> keySelector;
+
   private final TimerInternals.TimerDataCoder timerCoder;
 
   private final long maxBundleSize;
 
   private final long maxBundleTimeMills;
 
-  protected transient HeapInternalTimerService<?, TimerInternals.TimerData> timerService;
+  protected transient InternalTimerService<TimerData> timerService;
 
   protected transient FlinkTimerInternals timerInternals;
 
   private transient StateInternals nonKeyedStateInternals;
 
-  private transient Optional<Long> pushedBackWatermark;
+  private transient long pushedBackWatermark;
+
+  private transient PushedBackElementsHandler<WindowedValue<InputT>> pushedBackElementsHandler;
 
   // bundle control
   private transient boolean bundleStarted = false;
@@ -188,7 +187,8 @@ public DoFnOperator(
       Map<Integer, PCollectionView<?>> sideInputTagMapping,
       Collection<PCollectionView<?>> sideInputs,
       PipelineOptions options,
-      Coder<?> keyCoder) {
+      Coder<?> keyCoder,
+      KeySelector<WindowedValue<InputT>, ?> keySelector) {
     this.doFn = doFn;
     this.stepName = stepName;
     this.inputCoder = inputCoder;
@@ -203,6 +203,7 @@ public DoFnOperator(
     setChainingStrategy(ChainingStrategy.ALWAYS);
 
     this.keyCoder = keyCoder;
+    this.keySelector = keySelector;
 
    this.timerCoder =
        TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
@@ -266,9 +267,25 @@ public void setup(
     super.setup(containingTask, config, output);
   }
 
+
   @Override
-  public void open() throws Exception {
-    super.open();
+  public void initializeState(StateInitializationContext context) throws Exception {
+    super.initializeState(context);
+
+    ListStateDescriptor<WindowedValue<InputT>> pushedBackStateDescriptor =
+        new ListStateDescriptor<>(
+            "pushed-back-elements", new CoderTypeSerializer<>(inputCoder));
+
+    if (keySelector != null) {
+      pushedBackElementsHandler = KeyedPushedBackElementsHandler.create(
+          keySelector,
+          getKeyedStateBackend(),
+          pushedBackStateDescriptor);
+    } else {
+      ListState<WindowedValue<InputT>> listState = getOperatorStateBackend()
+          .getListState(pushedBackStateDescriptor);
+      pushedBackElementsHandler = NonKeyedPushedBackElementsHandler.create(listState);
+    }
 
    setCurrentInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
    setCurrentSideInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
@@ -291,8 +308,6 @@ public void open() throws Exception {
 
     if (!sideInputs.isEmpty()) {
 
-      pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
-
       FlinkBroadcastStateInternals sideInputStateInternals =
           new FlinkBroadcastStateInternals<>(
              getContainingTask().getIndexInSubtaskGroup(), getOperatorStateBackend());
@@ -300,7 +315,13 @@ public void open() throws Exception {
      sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals);
       sideInputReader = sideInputHandler;
 
-      pushedBackWatermark = Optional.absent();
+      Stream<WindowedValue<InputT>> pushedBack = pushedBackElementsHandler.getElements();
+      long min = pushedBack
+          .map(v -> v.getTimestamp().getMillis())
+          .reduce(Long.MAX_VALUE, Math::min);
+      setPushedBackWatermark(min);
+    } else {
+      setPushedBackWatermark(Long.MAX_VALUE);
     }
 
    outputManager = outputManagerFactory.create(output, nonKeyedStateInternals);
@@ -311,8 +332,8 @@ public void open() throws Exception {
           keyCoder);
 
       if (timerService == null) {
-        timerService = (HeapInternalTimerService<?, TimerInternals.TimerData>)
-            getInternalTimerService("beam-timer", new CoderTypeSerializer<>(timerCoder), this);
+        timerService = getInternalTimerService(
+            "beam-timer", new CoderTypeSerializer<>(timerCoder), this);
       }
 
       timerInternals = new FlinkTimerInternals();
@@ -396,8 +417,13 @@ public void close() throws Exception {
       // in processWatermark*() but have holds, so we have to re-evaluate here.
       processWatermark(new Watermark(Long.MAX_VALUE));
       if (currentOutputWatermark < Long.MAX_VALUE) {
-        throw new RuntimeException("There are still watermark holds. Watermark held at "
-            + keyedStateInternals.watermarkHold().getMillis() + ".");
+        if (keyedStateInternals == null) {
+          throw new RuntimeException("Current watermark is still " + currentOutputWatermark + ".");
+
+        } else {
+          throw new RuntimeException("There are still watermark holds. Watermark held at "
+              + keyedStateInternals.watermarkHold().getMillis() + ".");
+        }
       }
     } finally {
       super.close();
@@ -405,12 +431,13 @@ public void close() throws Exception {
 
     // sanity check: these should have been flushed out by +Inf watermarks
     if (!sideInputs.isEmpty() && nonKeyedStateInternals != null) {
-      BagState<WindowedValue<InputT>> pushedBack =
-          nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag);
 
-      Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
-      if (!Iterables.isEmpty(pushedBackContents)) {
-        String pushedBackString = Joiner.on(",").join(pushedBackContents);
+      List<WindowedValue<InputT>> pushedBackElements = pushedBackElementsHandler
+          .getElements()
+          .collect(Collectors.toList());
+
+      if (pushedBackElements.size() > 0) {
+        String pushedBackString = Joiner.on(",").join(pushedBackElements);
        throw new RuntimeException(
            "Leftover pushed-back data: " + pushedBackString + ". This indicates a bug.");
       }
@@ -418,33 +445,7 @@ public void close() throws Exception {
   }
 
   private long getPushbackWatermarkHold() {
-    // if we don't have side inputs we never hold the watermark
-    if (sideInputs.isEmpty()) {
-      return Long.MAX_VALUE;
-    }
-
-    try {
-      checkInitPushedBackWatermark();
-      return pushedBackWatermark.get();
-    } catch (Exception e) {
-      throw new RuntimeException("Error retrieving pushed back watermark state.", e);
-    }
-  }
-
-  private void checkInitPushedBackWatermark() {
-    // init and restore from pushedBack state.
-    // Not done in initializeState, because OperatorState is not ready.
-    if (!pushedBackWatermark.isPresent()) {
-
-      BagState<WindowedValue<InputT>> pushedBack =
-          nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag);
-
-      long min = Long.MAX_VALUE;
-      for (WindowedValue<InputT> value : pushedBack.read()) {
-        min = Math.min(min, value.getTimestamp().getMillis());
-      }
-      setPushedBackWatermark(min);
-    }
+    return pushedBackWatermark;
   }
 
   @Override
@@ -456,7 +457,7 @@ public final void processElement(
   }
 
   private void setPushedBackWatermark(long watermark) {
-    pushedBackWatermark = Optional.fromNullable(watermark);
+    pushedBackWatermark = watermark;
   }
 
   @Override
@@ -466,17 +467,13 @@ public final void processElement1(
    Iterable<WindowedValue<InputT>> justPushedBack =
        pushbackDoFnRunner.processElementInReadyWindows(streamRecord.getValue());
 
-    BagState<WindowedValue<InputT>> pushedBack =
-        nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag);
-
-    checkInitPushedBackWatermark();
-
-    long min = pushedBackWatermark.get();
+    long min = pushedBackWatermark;
     for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
       min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
-      pushedBack.add(pushedBackValue);
+      pushedBackElementsHandler.pushBack(pushedBackValue);
     }
     setPushedBackWatermark(min);
+
     checkInvokeFinishBundleByCount();
   }
 
@@ -497,28 +494,26 @@ public final void processElement2(
    PCollectionView<?> sideInput = sideInputTagMapping.get(streamRecord.getValue().getUnionTag());
     sideInputHandler.addSideInputValue(sideInput, value);
 
-    BagState<WindowedValue<InputT>> pushedBack =
-        nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag);
-
     List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
 
-    Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
-    for (WindowedValue<InputT> elem : pushedBackContents) {
+    Iterator<WindowedValue<InputT>> it = pushedBackElementsHandler.getElements().iterator();
 
+    while (it.hasNext()) {
+      WindowedValue<InputT> element = it.next();
       // we need to set the correct key in case the operator is
       // a (keyed) window operator
-      setKeyContextElement1(new StreamRecord<>(elem));
+      setKeyContextElement1(new StreamRecord<>(element));
 
       Iterable<WindowedValue<InputT>> justPushedBack =
-          pushbackDoFnRunner.processElementInReadyWindows(elem);
+          pushbackDoFnRunner.processElementInReadyWindows(element);
       Iterables.addAll(newPushedBack, justPushedBack);
     }
 
-    pushedBack.clear();
+    pushedBackElementsHandler.clear();
     long min = Long.MAX_VALUE;
     for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
       min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
-      pushedBack.add(pushedBackValue);
+      pushedBackElementsHandler.pushBack(pushedBackValue);
     }
     setPushedBackWatermark(min);
 
@@ -561,7 +556,8 @@ public void processWatermark1(Watermark mark) throws Exception {
       // hold back by the pushed back values waiting for side inputs
      long pushedBackInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp());
 
-      timerService.advanceWatermark(toFlinkRuntimeWatermark(pushedBackInputWatermark));
+      timeServiceManager.advanceWatermark(
+          new Watermark(toFlinkRuntimeWatermark(pushedBackInputWatermark)));
 
       Instant watermarkHold = keyedStateInternals.watermarkHold();
 
@@ -616,20 +612,18 @@ private static long toFlinkRuntimeWatermark(long beamWatermark) {
    */
   private void emitAllPushedBackData() throws Exception {
 
-    BagState<WindowedValue<InputT>> pushedBack =
-        nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag);
-
-    Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
-    for (WindowedValue<InputT> elem : pushedBackContents) {
+    Iterator<WindowedValue<InputT>> it = pushedBackElementsHandler.getElements().iterator();
 
+    while (it.hasNext()) {
+      WindowedValue<InputT> element = it.next();
       // we need to set the correct key in case the operator is
       // a (keyed) window operator
-      setKeyContextElement1(new StreamRecord<>(elem));
+      setKeyContextElement1(new StreamRecord<>(element));
 
-      doFnRunner.processElement(elem);
+      doFnRunner.processElement(element);
     }
 
-    pushedBack.clear();
+    pushedBackElementsHandler.clear();
 
     setPushedBackWatermark(Long.MAX_VALUE);
 
@@ -692,102 +686,7 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
     invokeFinishBundle();
     outputManager.closeBuffer();
 
-    // copy from AbstractStreamOperator
-    if (getKeyedStateBackend() != null) {
-      KeyedStateCheckpointOutputStream out;
-
-      try {
-        out = context.getRawKeyedOperatorStateOutput();
-      } catch (Exception exception) {
-        throw new Exception("Could not open raw keyed operator state stream for "
-            + getOperatorName() + '.', exception);
-      }
-
-      try {
-        KeyGroupsList allKeyGroups = out.getKeyGroupList();
-        for (int keyGroupIdx : allKeyGroups) {
-          out.startNewKeyGroup(keyGroupIdx);
-
-          DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
-
-          // if (this instanceof KeyGroupCheckpointedOperator)
-          snapshotKeyGroupState(keyGroupIdx, dov);
-
-          // We can't get all timerServices, so we just snapshot our timerService
-          // Maybe this is a normal DoFn that has no timerService
-          if (keyCoder != null) {
-            timerService.snapshotTimersForKeyGroup(dov, keyGroupIdx);
-          }
-
-        }
-      } catch (Exception exception) {
-        throw new Exception("Could not write timer service of " + getOperatorName()
-            + " to checkpoint state stream.", exception);
-      } finally {
-        try {
-          out.close();
-        } catch (Exception closeException) {
-          LOG.warn("Could not close raw keyed operator state stream for {}. This "
-              + "might have prevented deleting some state data.", getOperatorName(),
-              closeException);
-        }
-      }
-    }
-  }
-
-  @Override
-  public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception {
-    if (keyCoder != null) {
-      ((FlinkKeyGroupStateInternals) nonKeyedStateInternals).snapshotKeyGroupState(
-          keyGroupIndex, out);
-    }
-  }
-
-  @Override
-  public void initializeState(StateInitializationContext context) throws Exception {
-    if (getKeyedStateBackend() != null) {
-      int totalKeyGroups = getKeyedStateBackend().getNumberOfKeyGroups();
-      KeyGroupsList localKeyGroupRange = getKeyedStateBackend().getKeyGroupRange();
-
-      for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) {
-        DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(streamProvider.getStream());
-
-        int keyGroupIdx = streamProvider.getKeyGroupId();
-        checkArgument(localKeyGroupRange.contains(keyGroupIdx),
-            "Key Group " + keyGroupIdx + " does not belong to the local range.");
-
-        // if (this instanceof KeyGroupRestoringOperator)
-        restoreKeyGroupState(keyGroupIdx, div);
-
-        // We just initialize our timerService
-        if (keyCoder != null) {
-          if (timerService == null) {
-            final HeapInternalTimerService<Object, TimerData> localService =
-                new HeapInternalTimerService<>(
-                    totalKeyGroups,
-                    localKeyGroupRange,
-                    this,
-                    getRuntimeContext().getProcessingTimeService());
-            localService.startTimerService(getKeyedStateBackend().getKeySerializer(),
-                new CoderTypeSerializer<>(timerCoder), this);
-            timerService = localService;
-          }
-          timerService.restoreTimersForKeyGroup(div, keyGroupIdx, getUserCodeClassloader());
-        }
-      }
-    }
-  }
-
-  @Override
-  public void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception {
-    if (keyCoder != null) {
-      if (nonKeyedStateInternals == null) {
-        nonKeyedStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder,
-            getKeyedStateBackend());
-      }
-      ((FlinkKeyGroupStateInternals) nonKeyedStateInternals)
-          .restoreKeyGroupState(keyGroupIndex, in, getUserCodeClassloader());
-    }
+    super.snapshotState(context);
   }
 
   @Override
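
The initializeState() hunk above restores the pushed-back watermark hold by
scanning the restored elements for their minimum timestamp, falling back to
Long.MAX_VALUE (no hold) when nothing was pushed back. The computation in
isolation, with hypothetical timestamps:

import java.util.stream.Stream;

public class PushedBackWatermarkSketch {
  public static void main(String[] args) {
    // Timestamps of hypothetical restored pushed-back elements.
    Stream<Long> restored = Stream.of(42L, 7L, 99L);
    // Long.MAX_VALUE is the identity: an empty stream yields "no hold".
    long hold = restored.reduce(Long.MAX_VALUE, Math::min);
    System.out.println(hold); // prints 7
  }
}
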
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index fc84e4259b3..9867b393793 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -93,7 +93,7 @@ public ExecutableStageDoFnOperator(String stepName,
     super(new NoOpDoFn(),
             stepName, inputCoder, mainOutputTag, additionalOutputTags,
            outputManagerFactory, WindowingStrategy.globalDefault() /* unused */,
-            sideInputTagMapping, sideInputs, options, null /*keyCoder*/);
+            sideInputTagMapping, sideInputs, options, null /*keyCoder*/, null /* key selector */);
       this.payload = payload;
       this.jobInfo = jobInfo;
       this.contextFactory = contextFactory;
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KeyedPushedBackElementsHandler.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KeyedPushedBackElementsHandler.java
new file mode 100644
index 00000000000..f0e16005b09
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KeyedPushedBackElementsHandler.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+
+/**
+ * {@link PushedBackElementsHandler} that stores elements in Flink keyed state, for use when an
+ * operation is keyed and pushed-back data needs to stay in the correct partition when it gets
+ * moved.
+ */
+class KeyedPushedBackElementsHandler<K, T> implements PushedBackElementsHandler<T> {
+
+  static <K, T> KeyedPushedBackElementsHandler<K, T> create(
+      KeySelector<T, K> keySelector,
+      KeyedStateBackend<K> backend,
+      ListStateDescriptor<T> stateDescriptor) {
+    return new KeyedPushedBackElementsHandler<>(keySelector, backend, stateDescriptor);
+  }
+
+  private final KeySelector<T, K> keySelector;
+  private final KeyedStateBackend<K> backend;
+  private final ListStateDescriptor<T> stateDescriptor;
+
+  private KeyedPushedBackElementsHandler(
+      KeySelector<T, K> keySelector,
+      KeyedStateBackend<K> backend,
+      ListStateDescriptor<T> stateDescriptor) {
+    this.keySelector = keySelector;
+    this.backend = backend;
+    this.stateDescriptor = stateDescriptor;
+  }
+
+  @Override
+  public Stream<T> getElements() {
+
+    return backend.getKeys(stateDescriptor.getName(), VoidNamespace.INSTANCE)
+        .flatMap((key) -> {
+          try {
+            backend.setCurrentKey(key);
+
+            ListState<T> state = backend.getPartitionedState(
+                VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
+
+            return StreamSupport.stream(state.get().spliterator(), false);
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading keyed state.", e);
+          }
+
+        });
+  }
+
+  @Override
+  public void clear() throws Exception {
+    // TODO we have to collect all keys because otherwise we get ConcurrentModificationExceptions
+    // from flink. We can change this once it's fixed in Flink
+
+    List<K> keys = backend
+        .getKeys(stateDescriptor.getName(), VoidNamespace.INSTANCE)
+        .collect(Collectors.toList());
+
+    for (K key : keys) {
+      backend.setCurrentKey(key);
+
+      ListState<T> state = backend.getPartitionedState(
+          VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
+
+      state.clear();
+    }
+  }
+
+  @Override
+  public void pushBack(T element) throws Exception {
+    ListState<T> state = backend.getPartitionedState(
+        VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
+
+    backend.setCurrentKey(keySelector.getKey(element));
+    state.add(element);
+  }
+
+  @Override
+  public void pushBackAll(Iterable<T> elements) throws Exception {
+    for (T e : elements) {
+      pushBack(e);
+    }
+  }
+}
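
The TODO in clear() above works around a ConcurrentModificationException by
materializing the keys before mutating state. The same pattern in miniature,
on a plain HashMap (illustrative only):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CollectBeforeMutate {
  public static void main(String[] args) {
    Map<String, Integer> state = new HashMap<>();
    state.put("a", 1);
    state.put("b", 2);
    // Copy the keys first; removing entries while iterating the live
    // key set would throw ConcurrentModificationException.
    List<String> keys = new ArrayList<>(state.keySet());
    for (String key : keys) {
      state.remove(key);
    }
    System.out.println(state.isEmpty()); // prints true
  }
}
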
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/NonKeyedPushedBackElementsHandler.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/NonKeyedPushedBackElementsHandler.java
new file mode 100644
index 00000000000..e6a0ab65f4e
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/NonKeyedPushedBackElementsHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.flink.api.common.state.ListState;
+
+/**
+ * {@link PushedBackElementsHandler} that stores elements in a Flink operator state list.
+ */
+class NonKeyedPushedBackElementsHandler<T> implements PushedBackElementsHandler<T> {
+
+  static <T> NonKeyedPushedBackElementsHandler<T> create(ListState<T> elementState) {
+    return new NonKeyedPushedBackElementsHandler<>(elementState);
+  }
+
+  private final ListState<T> elementState;
+
+  private NonKeyedPushedBackElementsHandler(ListState<T> elementState) {
+    this.elementState = checkNotNull(elementState);
+  }
+
+  @Override
+  public Stream<T> getElements() throws Exception {
+    return StreamSupport.stream(elementState.get().spliterator(), false);
+  }
+
+  @Override
+  public void clear() {
+    elementState.clear();
+  }
+
+  @Override
+  public void pushBack(T element) throws Exception {
+    elementState.add(element);
+  }
+
+  @Override
+  public void pushBackAll(Iterable<T> elements) throws Exception {
+    for (T e : elements) {
+      // TODO: use addAll() once Flink has addAll(Iterable<T>)
+      elementState.add(e);
+    }
+  }
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/PushedBackElementsHandler.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/PushedBackElementsHandler.java
new file mode 100644
index 00000000000..98b40411080
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/PushedBackElementsHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import java.util.stream.Stream;
+
+/**
+ * Helper that keeps pushed-back data, most likely due to side inputs that are not available.
+ *
+ * <p>Implementations of this must use Flink state to make sure pushed-back data is fault tolerant.
+ *
+ * @param <T> The type of pushed back elements.
+ */
+interface PushedBackElementsHandler<T> {
+
+  /**
+   * Returns all pushed back elements.
+   */
+  Stream<T> getElements() throws Exception;
+
+  /**
+   * Clears the pushed back elements.
+   */
+  void clear() throws Exception;
+
+  /**
+   * Adds the given element to the pushed back elements.
+   */
+  void pushBack(T element) throws Exception;
+
+  /**
+   * Adds all the given elements to the pushed back elements.
+   */
+  void pushBackAll(Iterable<T> elements) throws Exception;
+}
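
For illustration only, a non-fault-tolerant in-memory implementation of this
contract (a real implementation must be backed by Flink state, as the two
classes above are):

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class InMemoryPushedBackElementsHandler<T> implements PushedBackElementsHandler<T> {

  private final List<T> elements = new ArrayList<>();

  @Override
  public Stream<T> getElements() {
    return elements.stream();
  }

  @Override
  public void clear() {
    elements.clear();
  }

  @Override
  public void pushBack(T element) {
    elements.add(element);
  }

  @Override
  public void pushBackAll(Iterable<T> newElements) {
    for (T e : newElements) {
      pushBack(e);
    }
  }
}
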
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 6f7992ec052..df008a84160 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -48,6 +48,8 @@
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -73,7 +75,8 @@ public SplittableDoFnOperator(
       Map<Integer, PCollectionView<?>> sideInputTagMapping,
       Collection<PCollectionView<?>> sideInputs,
       PipelineOptions options,
-      Coder<?> keyCoder) {
+      Coder<?> keyCoder,
+      KeySelector<WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>, ?> keySelector) {
     super(
         doFn,
         stepName,
@@ -85,7 +88,8 @@ public SplittableDoFnOperator(
         sideInputTagMapping,
         sideInputs,
         options,
-        keyCoder);
+        keyCoder,
+        keySelector);
   }
 
   @Override
@@ -98,8 +102,8 @@ public SplittableDoFnOperator(
   }
 
   @Override
-  public void open() throws Exception {
-    super.open();
+  public void initializeState(StateInitializationContext context) throws Exception {
+    super.initializeState(context);
 
     checkState(doFn instanceof ProcessFn);
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 35178a79c45..94b1b3bf3f1 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -41,6 +41,7 @@
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 
 /**
@@ -62,7 +63,8 @@ public WindowDoFnOperator(
       Map<Integer, PCollectionView<?>> sideInputTagMapping,
       Collection<PCollectionView<?>> sideInputs,
       PipelineOptions options,
-      Coder<K> keyCoder) {
+      Coder<K> keyCoder,
+      KeySelector<WindowedValue<KeyedWorkItem<K, InputT>>, ?> keySelector) {
     super(
         null,
         stepName,
@@ -74,7 +76,8 @@ public WindowDoFnOperator(
         sideInputTagMapping,
         sideInputs,
         options,
-        keyCoder);
+        keyCoder,
+        keySelector);
 
     this.systemReduceFn = systemReduceFn;
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java
index 6df1101c31c..bd8deaf82d7 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java
@@ -17,34 +17,20 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.nio.ByteBuffer;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.Coder.Context;
-import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.ValueWithRecordId;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
-import org.apache.flink.runtime.state.KeyGroupsList;
-import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.joda.time.Duration;
 
@@ -53,138 +39,59 @@
  */
 public class DedupingOperator<T> extends AbstractStreamOperator<WindowedValue<T>>
     implements OneInputStreamOperator<WindowedValue<ValueWithRecordId<T>>, WindowedValue<T>>,
-    KeyGroupCheckpointedOperator {
+    Triggerable<ByteBuffer, VoidNamespace>{
 
   private static final long MAX_RETENTION_SINCE_ACCESS = Duration.standardMinutes(10L).getMillis();
-  private static final long MAX_CACHE_SIZE = 100_000L;
 
-  private transient LoadingCache<Integer, LoadingCache<ByteBuffer, AtomicBoolean>> dedupingCache;
-  private transient KeyedStateBackend<ByteBuffer> keyedStateBackend;
+  // we keep the time when we last saw an element id for cleanup
+  private ValueStateDescriptor<Long> dedupingStateDescriptor =
+      new ValueStateDescriptor<>("dedup-cache", LongSerializer.INSTANCE);
+
+  private transient InternalTimerService<VoidNamespace> timerService;
 
   @Override
-  public void open() throws Exception {
-    super.open();
-    checkInitCache();
-    keyedStateBackend = getKeyedStateBackend();
-  }
+  public void initializeState(StateInitializationContext context) throws Exception {
+    super.initializeState(context);
 
-  private void checkInitCache() {
-    if (dedupingCache == null) {
-      dedupingCache = CacheBuilder.newBuilder().build(new KeyGroupLoader());
-    }
-  }
+    timerService =
+        getInternalTimerService("dedup-cleanup-timer", VoidNamespaceSerializer.INSTANCE, this);
 
-  private static class KeyGroupLoader extends
-      CacheLoader<Integer, LoadingCache<ByteBuffer, AtomicBoolean>> {
-    @Override
-    public LoadingCache<ByteBuffer, AtomicBoolean> load(Integer ignore) throws Exception {
-      return CacheBuilder.newBuilder()
-          .expireAfterAccess(MAX_RETENTION_SINCE_ACCESS, TimeUnit.MILLISECONDS)
-          .maximumSize(MAX_CACHE_SIZE).build(new TrueBooleanLoader());
-    }
-  }
-
-  private static class TrueBooleanLoader extends CacheLoader<ByteBuffer, AtomicBoolean> {
-    @Override
-    public AtomicBoolean load(ByteBuffer ignore) throws Exception {
-      return new AtomicBoolean(true);
-    }
   }
 
   @Override
   public void processElement(
      StreamRecord<WindowedValue<ValueWithRecordId<T>>> streamRecord) throws Exception {
-    ByteBuffer currentKey = keyedStateBackend.getCurrentKey();
-    int groupIndex = keyedStateBackend.getCurrentKeyGroupIndex();
-    if (shouldOutput(groupIndex, currentKey)) {
-      WindowedValue<ValueWithRecordId<T>> value = streamRecord.getValue();
-      output.collect(streamRecord.replace(value.withValue(value.getValue().getValue())));
-    }
-  }
 
-  private boolean shouldOutput(int groupIndex, ByteBuffer id) throws ExecutionException {
-    return dedupingCache.get(groupIndex).getUnchecked(id).getAndSet(false);
-  }
+    ValueState<Long> dedupingState = getPartitionedState(dedupingStateDescriptor);
 
-  @Override
-  public void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception {
-    checkInitCache();
-    Integer size = VarIntCoder.of().decode(in, Context.NESTED);
-    for (int i = 0; i < size; i++) {
-      byte[] idBytes = ByteArrayCoder.of().decode(in, Context.NESTED);
-      // restore the ids which not expired.
-      shouldOutput(keyGroupIndex, ByteBuffer.wrap(idBytes));
-    }
-  }
+    Long lastSeenTimestamp = dedupingState.value();
 
-  @Override
-  public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception {
-    Set<ByteBuffer> ids = dedupingCache.get(keyGroupIndex).asMap().keySet();
-    VarIntCoder.of().encode(ids.size(), out, Context.NESTED);
-    for (ByteBuffer id : ids) {
-      byte[] bytes = new byte[id.remaining()];
-      id.get(bytes);
-      id.position(id.position() - bytes.length);
-      ByteArrayCoder.of().encode(bytes, out, Context.NESTED);
+    if (lastSeenTimestamp == null) {
+      // we have never seen this, emit
+      WindowedValue<ValueWithRecordId<T>> value = streamRecord.getValue();
+      output.collect(streamRecord.replace(value.withValue(value.getValue().getValue())));
     }
+
+    long currentProcessingTime = timerService.currentProcessingTime();
+    dedupingState.update(currentProcessingTime);
+    timerService.registerProcessingTimeTimer(
+        VoidNamespace.INSTANCE, currentProcessingTime + MAX_RETENTION_SINCE_ACCESS);
   }
 
   @Override
-  public void snapshotState(StateSnapshotContext context) throws Exception {
-    // copy from AbstractStreamOperator
-    if (getKeyedStateBackend() != null) {
-      KeyedStateCheckpointOutputStream out;
-
-      try {
-        out = context.getRawKeyedOperatorStateOutput();
-      } catch (Exception exception) {
-        throw new Exception("Could not open raw keyed operator state stream for "
-            + getOperatorName() + '.', exception);
-      }
-
-      try {
-        KeyGroupsList allKeyGroups = out.getKeyGroupList();
-        for (int keyGroupIdx : allKeyGroups) {
-          out.startNewKeyGroup(keyGroupIdx);
-
-          DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
-
-          // if (this instanceof KeyGroupCheckpointedOperator)
-          snapshotKeyGroupState(keyGroupIdx, dov);
-
-        }
-      } catch (Exception exception) {
-        throw new Exception("Could not write timer service of " + getOperatorName()
-            + " to checkpoint state stream.", exception);
-      } finally {
-        try {
-          out.close();
-        } catch (Exception closeException) {
-          LOG.warn("Could not close raw keyed operator state stream for {}. This "
-                  + "might have prevented deleting some state data.", getOperatorName(),
-              closeException);
-        }
-      }
-    }
+  public void onEventTime(InternalTimer<ByteBuffer, VoidNamespace> internalTimer) {
+    // will never happen
   }
 
   @Override
-  public void initializeState(StateInitializationContext context) throws Exception {
-    if (getKeyedStateBackend() != null) {
-      KeyGroupsList localKeyGroupRange = getKeyedStateBackend().getKeyGroupRange();
-
-      for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) {
-        DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(streamProvider.getStream());
-
-        int keyGroupIdx = streamProvider.getKeyGroupId();
-        checkArgument(localKeyGroupRange.contains(keyGroupIdx),
-            "Key Group " + keyGroupIdx + " does not belong to the local range.");
-
-        // if (this instanceof KeyGroupRestoringOperator)
-        restoreKeyGroupState(keyGroupIdx, div);
-
-      }
+  public void onProcessingTime(InternalTimer<ByteBuffer, VoidNamespace> internalTimer)
+      throws Exception {
+    ValueState<Long> dedupingState = getPartitionedState(dedupingStateDescriptor);
+
+    Long lastSeenTimestamp = dedupingState.value();
+    if (lastSeenTimestamp != null
+        && lastSeenTimestamp.equals(internalTimer.getTimestamp() - MAX_RETENTION_SINCE_ACCESS)) {
+      dedupingState.clear();
     }
   }
-
 }
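
The rewritten operator replaces the in-memory Guava cache with per-id
ValueState plus a processing-time timer. A hypothetical, Flink-free model of
that logic (DedupModel is invented for illustration; the guard in onTimer
mirrors the lastSeenTimestamp check above, so an id only expires if no newer
element refreshed it after the timer was registered):

import java.util.HashMap;
import java.util.Map;

class DedupModel {
  static final long RETENTION_MS = 10 * 60 * 1000L; // MAX_RETENTION_SINCE_ACCESS

  private final Map<String, Long> lastSeen = new HashMap<>();

  // Emits only the first occurrence of an id; refreshes its timestamp on
  // every access, as processElement() above updates dedupingState.
  boolean shouldEmit(String id, long nowMs) {
    boolean firstTime = !lastSeen.containsKey(id);
    lastSeen.put(id, nowMs); // a cleanup timer would fire at nowMs + RETENTION_MS
    return firstTime;
  }

  // Called when a cleanup timer registered at (timerTimestamp - RETENTION_MS)
  // fires; a newer access would have stored a larger timestamp.
  void onTimer(String id, long timerTimestamp) {
    Long seen = lastSeen.get(id);
    if (seen != null && seen == timerTimestamp - RETENTION_MS) {
      lastSeen.remove(id);
    }
  }
}
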
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index e91b4d600a5..8df8262822f 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -73,7 +73,8 @@ public void parDoBaseClassPipelineOptionsNullTest() {
         new HashMap<>(),
         Collections.emptyList(),
         null,
-        null);
+        null, /* key coder */
+        null /* key selector */);
   }
 
   /**
@@ -97,7 +98,8 @@ public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception {
             new HashMap<>(),
             Collections.emptyList(),
             options,
-            null);
+            null, /* key coder */
+            null /* key selector */);
 
     final byte[] serialized = SerializationUtils.serialize(doFnOperator);
 
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/BoundedSourceRestoreTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/BoundedSourceRestoreTest.java
index 77016023e15..5141cd5d6da 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/BoundedSourceRestoreTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/BoundedSourceRestoreTest.java
@@ -33,6 +33,7 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.ValueWithRecordId;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamSource;
@@ -41,7 +42,6 @@
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.util.OutputTag;
 import org.junit.Test;
@@ -114,7 +114,7 @@ public void testRestore() throws Exception {
    assertTrue("Did not successfully read first batch of elements.", readFirstBatchOfElements);
 
     // draw a snapshot
-    OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+    OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
 
     // finalize checkpoint
     final ArrayList<Integer> finalizeList = new ArrayList<>();
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
index 15d76dfa3c7..7828e366ac1 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
@@ -27,8 +27,8 @@
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.ValueWithRecordId;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -73,7 +73,7 @@ public void testDeduping() throws Exception {
         contains(WindowedValue.valueInGlobalWindow(key1),
             WindowedValue.valueInGlobalWindow(key2)));
 
-    OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+    OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
 
     harness.close();
 
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 34b4befcb2d..f2a39a98cc4 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -70,8 +70,8 @@
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -127,6 +127,7 @@ public void testSingleOutput() throws Exception {
             new HashMap<>(), /* side-input mapping */
             Collections.emptyList(), /* side inputs */
             PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+            null,
             null);
 
     OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> testHarness =
@@ -184,6 +185,7 @@ public void testMultiOutputOutput() throws Exception {
             new HashMap<>(), /* side-input mapping */
             Collections.emptyList(), /* side inputs */
             PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+            null,
             null);
 
     OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> testHarness =
@@ -277,7 +279,8 @@ public void onEventTime(OnTimerContext context) {
             new HashMap<>(), /* side-input mapping */
             Collections.emptyList(), /* side inputs */
             PipelineOptionsFactory.as(FlinkPipelineOptions.class),
-            VarIntCoder.of() /* key coder */);
+            VarIntCoder.of(), /* key coder */
+            WindowedValue::getValue);
 
     OneInputStreamOperatorTestHarness<WindowedValue<Integer>, WindowedValue<String>> testHarness =
         new KeyedOneInputStreamOperatorTestHarness<>(
@@ -358,7 +361,8 @@ public void processElement(ProcessContext context) {
             new HashMap<>(), /* side-input mapping */
             Collections.emptyList(), /* side inputs */
             PipelineOptionsFactory.as(FlinkPipelineOptions.class),
-            VarIntCoder.of() /* key coder */);
+            VarIntCoder.of(), /* key coder */
+            WindowedValue::getValue);
 
     OneInputStreamOperatorTestHarness<WindowedValue<Integer>, WindowedValue<String>> testHarness =
         new KeyedOneInputStreamOperatorTestHarness<>(
@@ -464,7 +468,8 @@ public void onTimer(OnTimerContext context, @StateId(stateId) ValueState<String>
             new HashMap<>(), /* side-input mapping */
             Collections.emptyList(), /* side inputs */
             PipelineOptionsFactory.as(FlinkPipelineOptions.class),
-            StringUtf8Coder.of() /* key coder */);
+            StringUtf8Coder.of(), /* key coder */
+            kvWindowedValue -> kvWindowedValue.getValue().getKey());
 
     KeyedOneInputStreamOperatorTestHarness<
             String, WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>>
@@ -565,7 +570,8 @@ void testSideInputs(boolean keyed) throws Exception {
             sideInputMapping, /* side-input mapping */
             ImmutableList.of(view1, view2), /* side inputs */
             PipelineOptionsFactory.as(FlinkPipelineOptions.class),
-            keyCoder);
+            keyCoder,
+            keyed ? WindowedValue::getValue : null);
 
     TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, WindowedValue<String>>
         testHarness = new TwoInputStreamOperatorTestHarness<>(doFnOperator);
@@ -574,8 +580,8 @@ void testSideInputs(boolean keyed) throws Exception {
       // we use a dummy key for the second input since it is considered to be broadcast
       testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
           doFnOperator,
-          new StringKeySelector(),
-          new DummyKeySelector(),
+          WindowedValue::getValue,
+          null,
           BasicTypeInfo.STRING_TYPE_INFO);
     }
 
@@ -668,7 +674,7 @@ public void processElement(ProcessContext context,
     testHarness
         .processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow(KV.of("a", 100L))));
 
-    final OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+    OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
     testHarness.close();
 
     testHarness = createTestHarness(windowingStrategy, filterElementsEqualToCountFn, kvCoder,
@@ -721,6 +727,7 @@ public void nonKeyedParDoSideInputCheckpointing() throws Exception {
           sideInputMapping, /* side-input mapping */
           ImmutableList.of(view1, view2), /* side inputs */
           PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+          null,
           null);
 
       return new TwoInputStreamOperatorTestHarness<>(doFnOperator);
@@ -753,13 +760,14 @@ public void keyedParDoSideInputCheckpointing() throws Exception {
           sideInputMapping, /* side-input mapping */
           ImmutableList.of(view1, view2), /* side inputs */
           PipelineOptionsFactory.as(FlinkPipelineOptions.class),
-          keyCoder);
+          keyCoder,
+          WindowedValue::getValue);
 
       return new KeyedTwoInputStreamOperatorTestHarness<>(
           doFnOperator,
-          new StringKeySelector(),
+          WindowedValue::getValue,
           // we use a dummy key for the second input since it is considered to be broadcast
-          new DummyKeySelector(),
+          null,
           BasicTypeInfo.STRING_TYPE_INFO);
     });
   }
@@ -799,7 +807,7 @@ void sideInputCheckpointing(
 
     // snapshot state, throw away the operator, then restore and verify that we still match
     // main-input elements to the side-inputs that we sent earlier
-    OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+    OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
 
     testHarness = harnessFactory.create();
 
@@ -844,6 +852,7 @@ public void nonKeyedParDoPushbackDataCheckpointing() throws Exception {
           sideInputMapping, /* side-input mapping */
           ImmutableList.of(view1, view2), /* side inputs */
           PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+          null,
           null);
 
       return new TwoInputStreamOperatorTestHarness<>(doFnOperator);
@@ -877,13 +886,14 @@ public void keyedParDoPushbackDataCheckpointing() throws Exception {
           sideInputMapping, /* side-input mapping */
           ImmutableList.of(view1, view2), /* side inputs */
           PipelineOptionsFactory.as(FlinkPipelineOptions.class),
-          keyCoder);
+          keyCoder,
+          WindowedValue::getValue);
 
       return new KeyedTwoInputStreamOperatorTestHarness<>(
           doFnOperator,
-          new StringKeySelector(),
+          WindowedValue::getValue,
           // we use a dummy key for the second input since it is considered to be broadcast
-          new DummyKeySelector(),
+          null,
           BasicTypeInfo.STRING_TYPE_INFO);
     });
   }
@@ -911,7 +921,7 @@ void pushbackDataCheckpointing(
 
     // snapshot state, throw away the operator, then restore and verify that we still match
     // main-input elements to the side-inputs that we sent earlier
-    OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+    OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
 
     testHarness = harnessFactory.create();
 
@@ -1009,7 +1019,7 @@ public void onEventTime(OnTimerContext context) {
         emptyIterable());
 
     // snapshot and restore
-    final OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+    final OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
     testHarness.close();
 
     testHarness = createTestHarness(windowingStrategy, fn, inputCoder, outputCoder, outputTag,
@@ -1046,7 +1056,8 @@ public void onEventTime(OnTimerContext context) {
         new HashMap<>(), /* side-input mapping */
         Collections.emptyList(), /* side inputs */
         PipelineOptionsFactory.as(FlinkPipelineOptions.class),
-        VarIntCoder.of() /* key coder */);
+        VarIntCoder.of() /* key coder */,
+        keySelector);
 
     return new KeyedOneInputStreamOperatorTestHarness<>(doFnOperator, keySelector, keyCoderInfo);
   }
@@ -1088,6 +1099,7 @@ public void finishBundle(FinishBundleContext context) {
             new HashMap<>(), /* side-input mapping */
             Collections.emptyList(), /* side inputs */
             options,
+            null,
             null);
 
     OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> testHarness =
@@ -1100,7 +1112,7 @@ public void finishBundle(FinishBundleContext context) {
     testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("c")));
 
     // draw a snapshot
-    OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+    OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
 
     // There is a finishBundle in snapshot()
     // Elements will be buffered as part of finishing a bundle in snapshot()
@@ -1126,6 +1138,7 @@ public void finishBundle(FinishBundleContext context) {
             new HashMap<>(), /* side-input mapping */
             Collections.emptyList(), /* side inputs */
             options,
+            null,
             null);
 
     OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> newHarness =
@@ -1211,21 +1224,6 @@ public void processElement(ProcessContext c) throws Exception {
     return WindowedValue.of(value, timestamp, window, PaneInfo.NO_FIRING);
   }
 
-
-  private static class DummyKeySelector implements KeySelector<RawUnionValue, String> {
-    @Override
-    public String getKey(RawUnionValue stringWindowedValue) throws Exception {
-      return "dummy_key";
-    }
-  }
-
-  private static class StringKeySelector implements KeySelector<WindowedValue<String>, String> {
-    @Override
-    public String getKey(WindowedValue<String> stringWindowedValue) throws Exception {
-      return stringWindowedValue.getValue();
-    }
-  }
-
   private interface TestHarnessFactory<T> {
     T create() throws Exception;
   }
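
Note on the hunk above: the one-off DummyKeySelector and StringKeySelector classes can be deleted because Flink's KeySelector has a single abstract method (getKey), so call sites may pass a method reference such as WindowedValue::getValue, or null where the broadcast input needs no real key. Below is a minimal sketch of the same idiom with plain types; the identity selector, the StreamMap stand-in operator, and the SketchLambdaKeySelector class are illustrative assumptions for this note, not code from the PR, and they assume the flink-streaming-java test utilities (test-jar) are on the classpath.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.operators.StreamMap;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;

    public class SketchLambdaKeySelector {
      public static void main(String[] args) throws Exception {
        // KeySelector has one abstract method, so a lambda can stand in for a
        // dedicated class; the lambda stays serializable because KeySelector
        // itself extends Serializable.
        KeySelector<String, String> identityKey = value -> value;

        // The keyed harness takes the selector plus explicit TypeInformation
        // for the key, matching the BasicTypeInfo.STRING_TYPE_INFO usage above.
        KeyedOneInputStreamOperatorTestHarness<String, String, String> harness =
            new KeyedOneInputStreamOperatorTestHarness<>(
                new StreamMap<>((MapFunction<String, String>) String::trim),
                identityKey,
                BasicTypeInfo.STRING_TYPE_INFO);
        harness.open();
        harness.processElement(new StreamRecord<>("  hello  ", 0L));
        harness.close();
      }
    }
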
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index c1cb06f2c07..b04c76ebc6e 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -36,6 +36,7 @@
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.ValueWithRecordId;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamSource;
@@ -44,7 +45,6 @@
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.OutputTag;
@@ -355,7 +355,7 @@ public void close() {
       assertTrue("Did not successfully read first batch of elements.", 
readFirstBatchOfElements);
 
       // draw a snapshot
-      OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+      OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
 
       // test that finalizeCheckpoint on CheckpointMark is called
       final ArrayList<Integer> finalizeList = new ArrayList<>();
@@ -477,7 +477,7 @@ public void testNullCheckpoint() throws Exception {
 
       testHarness.open();
 
-      OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+      OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
 
       UnboundedSourceWrapper<
           KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java
index 5a3717dc145..8a90551abd1 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java
@@ -55,8 +55,8 @@
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -88,7 +88,7 @@ public void testRestore() throws Exception {
         Item.builder().key(2L).timestamp(3L).value(77L).window(window).build().toStreamRecord());
 
     // create snapshot
-    OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+    OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
     testHarness.close();
 
     // restore from the snapshot
@@ -148,7 +148,9 @@ public void testRestore() throws Exception {
         emptyMap(),
         emptyList(),
         PipelineOptionsFactory.as(FlinkPipelineOptions.class),
-        VarLongCoder.of()
+        VarLongCoder.of(),
+        null /* key selector */
+
     );
   }
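
Across all of the test files above, the mechanical change is the same: in Flink 1.5 the test harness snapshot() method returns OperatorSubtaskState, replacing the removed OperatorStateHandles, while the snapshot/restore call pattern itself is unchanged. Below is a minimal sketch of that pattern; the SketchSnapshotRestore class and the stateless StreamMap stand-in operator are illustrative assumptions, not Beam code, and the sketch assumes the flink-streaming-java test utilities (test-jar) are available.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
    import org.apache.flink.streaming.api.operators.StreamMap;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

    public class SketchSnapshotRestore {
      public static void main(String[] args) throws Exception {
        // Any stream operator works; a stateless map keeps the sketch small.
        OneInputStreamOperatorTestHarness<String, String> harness =
            new OneInputStreamOperatorTestHarness<>(
                new StreamMap<>((MapFunction<String, String>) String::toUpperCase));
        harness.open();
        harness.processElement(new StreamRecord<>("a", 0L));

        // In Flink 1.5, snapshot() returns OperatorSubtaskState (previously
        // OperatorStateHandles, which no longer exists).
        OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
        harness.close();

        // Restore: hand the snapshot to a fresh harness before open(), the
        // same shape as the testRestore() methods changed in this diff.
        OneInputStreamOperatorTestHarness<String, String> restored =
            new OneInputStreamOperatorTestHarness<>(
                new StreamMap<>((MapFunction<String, String>) String::toUpperCase));
        restored.initializeState(snapshot);
        restored.open();
        restored.processElement(new StreamRecord<>("b", 1L));
        restored.close();
      }
    }
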
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 110474)
    Time Spent: 5h  (was: 4h 50m)

> Update Flink Runner to Flink 1.5.0
> ----------------------------------
>
>                 Key: BEAM-3905
>                 URL: https://issues.apache.org/jira/browse/BEAM-3905
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Major
>          Time Spent: 5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
