http://git-wip-us.apache.org/repos/asf/beam/blob/7d32b93e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 5ef7896..d5594e6 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -17,56 +17,32 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
+import static org.apache.beam.runners.core.TimerInternals.TimerData;
 
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ScheduledFuture;
-import javax.annotation.Nullable;
 import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
-import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternalsFactory;
-import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.joda.time.Instant;
 
 /**
@@ -76,26 +52,13 @@ import org.joda.time.Instant;
  * @param <OutputT>
  */
 public class WindowDoFnOperator<K, InputT, OutputT>
-    extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>>
-    implements Triggerable {
-
-  private final TimerInternals.TimerDataCoder timerCoder;
-
-  private transient Set<Tuple2<ByteBuffer, TimerInternals.TimerData>> watermarkTimers;
-  private transient Queue<Tuple2<ByteBuffer, TimerInternals.TimerData>> watermarkTimersQueue;
-
-  private transient Queue<Tuple2<ByteBuffer, TimerInternals.TimerData>> processingTimeTimersQueue;
-  private transient Set<Tuple2<ByteBuffer, TimerInternals.TimerData>> processingTimeTimers;
-  private transient Multiset<Long> processingTimeTimerTimestamps;
-  private transient Map<Long, ScheduledFuture<?>> processingTimeTimerFutures;
-
-  private transient FlinkTimerInternals timerInternals;
+    extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>> {
 
   private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn;
 
   public WindowDoFnOperator(
       SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn,
-      TypeInformation<WindowedValue<KeyedWorkItem<K, InputT>>> inputType,
+      Coder<WindowedValue<KeyedWorkItem<K, InputT>>> inputCoder,
       TupleTag<KV<K, OutputT>> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
       OutputManagerFactory<WindowedValue<KV<K, OutputT>>> outputManagerFactory,
@@ -106,7 +69,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
       Coder<K> keyCoder) {
     super(
         null,
-        inputType,
+        inputCoder,
         mainOutputTag,
         sideOutputTags,
         outputManagerFactory,
@@ -118,8 +81,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
 
     this.systemReduceFn = systemReduceFn;
 
-    this.timerCoder =
-        TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
+
   }
 
   @Override
@@ -151,139 +113,17 @@ public class WindowDoFnOperator<K, InputT, OutputT>
     return doFn;
   }
 
-
-  @Override
-  public void open() throws Exception {
-
-    // might already be initialized from restoreTimers()
-    if (watermarkTimers == null) {
-      watermarkTimers = new HashSet<>();
-
-      watermarkTimersQueue = new PriorityQueue<>(
-          10,
-          new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() {
-            @Override
-            public int compare(
-                Tuple2<ByteBuffer, TimerInternals.TimerData> o1,
-                Tuple2<ByteBuffer, TimerInternals.TimerData> o2) {
-              return o1.f1.compareTo(o2.f1);
-            }
-          });
-    }
-
-    if (processingTimeTimers == null) {
-      processingTimeTimers = new HashSet<>();
-      processingTimeTimerTimestamps = HashMultiset.create();
-      processingTimeTimersQueue = new PriorityQueue<>(
-          10,
-          new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() {
-            @Override
-            public int compare(
-                Tuple2<ByteBuffer, TimerInternals.TimerData> o1,
-                Tuple2<ByteBuffer, TimerInternals.TimerData> o2) {
-              return o1.f1.compareTo(o2.f1);
-            }
-          });
-    }
-
-    // ScheduledFutures are not checkpointed
-    processingTimeTimerFutures = new HashMap<>();
-
-    timerInternals = new FlinkTimerInternals();
-
-    // call super at the end because this will call getDoFn() which requires stateInternals
-    // to be set
-    super.open();
-  }
-
   @Override
   protected ExecutionContext.StepContext createStepContext() {
     return new WindowDoFnOperator.StepContext();
   }
 
-  private void registerEventTimeTimer(TimerInternals.TimerData timer) {
-    Tuple2<ByteBuffer, TimerInternals.TimerData> keyedTimer =
-        new Tuple2<>((ByteBuffer) getStateBackend().getCurrentKey(), timer);
-    if (watermarkTimers.add(keyedTimer)) {
-      watermarkTimersQueue.add(keyedTimer);
-    }
-  }
-
-  private void deleteEventTimeTimer(TimerInternals.TimerData timer) {
-    Tuple2<ByteBuffer, TimerInternals.TimerData> keyedTimer =
-        new Tuple2<>((ByteBuffer) getStateBackend().getCurrentKey(), timer);
-    if (watermarkTimers.remove(keyedTimer)) {
-      watermarkTimersQueue.remove(keyedTimer);
-    }
-  }
-
-  private void registerProcessingTimeTimer(TimerInternals.TimerData timer) {
-    Tuple2<ByteBuffer, TimerInternals.TimerData> keyedTimer =
-        new Tuple2<>((ByteBuffer) getStateBackend().getCurrentKey(), timer);
-    if (processingTimeTimers.add(keyedTimer)) {
-      processingTimeTimersQueue.add(keyedTimer);
-
-      // If this is the first timer added for this timestamp register a timer Task
-      if (processingTimeTimerTimestamps.add(timer.getTimestamp().getMillis(), 1) == 0) {
-        ScheduledFuture<?> scheduledFuture = registerTimer(timer.getTimestamp().getMillis(), this);
-        processingTimeTimerFutures.put(timer.getTimestamp().getMillis(), scheduledFuture);
-      }
-    }
-  }
-
-  private void deleteProcessingTimeTimer(TimerInternals.TimerData timer) {
-    Tuple2<ByteBuffer, TimerInternals.TimerData> keyedTimer =
-        new Tuple2<>((ByteBuffer) getStateBackend().getCurrentKey(), timer);
-    if (processingTimeTimers.remove(keyedTimer)) {
-      processingTimeTimersQueue.remove(keyedTimer);
-
-      // If there are no timers left for this timestamp, remove it from queue and cancel the
-      // timer Task
-      if (processingTimeTimerTimestamps.remove(timer.getTimestamp().getMillis(), 1) == 1) {
-        ScheduledFuture<?> triggerTaskFuture =
-            processingTimeTimerFutures.remove(timer.getTimestamp().getMillis());
-        if (triggerTaskFuture != null && !triggerTaskFuture.isDone()) {
-          triggerTaskFuture.cancel(false);
-        }
-      }
-
-    }
-  }
-
   @Override
-  public void trigger(long time) throws Exception {
-
-    //Remove information about the triggering task
-    processingTimeTimerFutures.remove(time);
-    processingTimeTimerTimestamps.setCount(time, 0);
-
-    boolean fire;
-
-    do {
-      Tuple2<ByteBuffer, TimerInternals.TimerData> timer = processingTimeTimersQueue.peek();
-      if (timer != null && timer.f1.getTimestamp().getMillis() <= time) {
-        fire = true;
-
-        processingTimeTimersQueue.remove();
-        processingTimeTimers.remove(timer);
-
-        setKeyContext(timer.f0);
-
-        pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow(
-            KeyedWorkItems.<K, InputT>timersWorkItem(
-                (K) stateInternals.getKey(),
-                Collections.singletonList(timer.f1))));
-
-      } else {
-        fire = false;
-      }
-    } while (fire);
-
-  }
-
-  @Override
-  public void processWatermark(Watermark mark) throws Exception {
-    processWatermark1(mark);
+  public void fireTimer(InternalTimer<?, TimerData> timer) {
+    pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow(
+        KeyedWorkItems.<K, InputT>timersWorkItem(
+            (K) stateInternals.getKey(),
+            Collections.singletonList(timer.getNamespace()))));
   }
 
   @Override
@@ -295,27 +135,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
     // hold back by the pushed back values waiting for side inputs
     long actualInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp());
 
-    boolean fire;
-
-    do {
-      Tuple2<ByteBuffer, TimerInternals.TimerData> timer = watermarkTimersQueue.peek();
-      if (timer != null && timer.f1.getTimestamp().getMillis() < actualInputWatermark) {
-        fire = true;
-
-        watermarkTimersQueue.remove();
-        watermarkTimers.remove(timer);
-
-        setKeyContext(timer.f0);
-
-        pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow(
-                KeyedWorkItems.<K, InputT>timersWorkItem(
-                    (K) stateInternals.getKey(),
-                    Collections.singletonList(timer.f1))));
-
-      } else {
-        fire = false;
-      }
-    } while (fire);
+    timerService.advanceWatermark(actualInputWatermark);
 
     Instant watermarkHold = stateInternals.watermarkHold();
 
@@ -331,122 +151,6 @@ public class WindowDoFnOperator<K, InputT, OutputT>
 
   }
 
-  @Override
-  public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
-    StreamTaskState result = super.snapshotOperatorState(checkpointId, timestamp);
-
-    AbstractStateBackend.CheckpointStateOutputView outputView =
-        getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
-
-    snapshotTimers(outputView);
-
-    StateHandle<DataInputView> handle = outputView.closeAndGetHandle();
-
-    // this might overwrite stuff that super checkpointed
-    result.setOperatorState(handle);
-
-    return result;
-  }
-
-  @Override
-  public void restoreState(StreamTaskState state) throws Exception {
-    super.restoreState(state);
-
-    @SuppressWarnings("unchecked")
-    StateHandle<DataInputView> operatorState =
-        (StateHandle<DataInputView>) state.getOperatorState();
-
-    DataInputView in = operatorState.getState(getUserCodeClassloader());
-
-    restoreTimers(new DataInputViewWrapper(in));
-  }
-
-  private void restoreTimers(InputStream in) throws IOException {
-    DataInputStream dataIn = new DataInputStream(in);
-
-    int numWatermarkTimers = dataIn.readInt();
-
-    watermarkTimers = new HashSet<>(numWatermarkTimers);
-
-    watermarkTimersQueue = new PriorityQueue<>(
-        Math.max(numWatermarkTimers, 1),
-        new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() {
-          @Override
-          public int compare(
-                  Tuple2<ByteBuffer, TimerInternals.TimerData> o1,
-                  Tuple2<ByteBuffer, TimerInternals.TimerData> o2) {
-            return o1.f1.compareTo(o2.f1);
-          }
-        });
-
-    for (int i = 0; i < numWatermarkTimers; i++) {
-      int length = dataIn.readInt();
-      byte[] keyBytes = new byte[length];
-      dataIn.readFully(keyBytes);
-      TimerInternals.TimerData timerData = timerCoder.decode(dataIn, Coder.Context.NESTED);
-      Tuple2<ByteBuffer, TimerInternals.TimerData> keyedTimer =
-          new Tuple2<>(ByteBuffer.wrap(keyBytes), timerData);
-      if (watermarkTimers.add(keyedTimer)) {
-        watermarkTimersQueue.add(keyedTimer);
-      }
-    }
-
-    int numProcessingTimeTimers = dataIn.readInt();
-
-    processingTimeTimers = new HashSet<>(numProcessingTimeTimers);
-    processingTimeTimersQueue = new PriorityQueue<>(
-        Math.max(numProcessingTimeTimers, 1),
-        new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() {
-          @Override
-          public int compare(
-              Tuple2<ByteBuffer, TimerInternals.TimerData> o1,
-              Tuple2<ByteBuffer, TimerInternals.TimerData> o2) {
-            return o1.f1.compareTo(o2.f1);
-          }
-        });
-
-    processingTimeTimerTimestamps = HashMultiset.create();
-    processingTimeTimerFutures = new HashMap<>();
-
-    for (int i = 0; i < numProcessingTimeTimers; i++) {
-      int length = dataIn.readInt();
-      byte[] keyBytes = new byte[length];
-      dataIn.readFully(keyBytes);
-      TimerInternals.TimerData timerData = timerCoder.decode(dataIn, Coder.Context.NESTED);
-      Tuple2<ByteBuffer, TimerInternals.TimerData> keyedTimer =
-          new Tuple2<>(ByteBuffer.wrap(keyBytes), timerData);
-      if (processingTimeTimers.add(keyedTimer)) {
-        processingTimeTimersQueue.add(keyedTimer);
-
-        //If this is the first timer added for this timestamp register a timer Task
-        if (processingTimeTimerTimestamps.add(timerData.getTimestamp().getMillis(), 1) == 0) {
-          // this registers a timer with the Flink processing-time service
-          ScheduledFuture<?> scheduledFuture =
-              registerTimer(timerData.getTimestamp().getMillis(), this);
-          processingTimeTimerFutures.put(timerData.getTimestamp().getMillis(), scheduledFuture);
-        }
-
-      }
-    }
-  }
-
-  private void snapshotTimers(OutputStream out) throws IOException {
-    DataOutputStream dataOut = new DataOutputStream(out);
-    dataOut.writeInt(watermarkTimersQueue.size());
-    for (Tuple2<ByteBuffer, TimerInternals.TimerData> timer : watermarkTimersQueue) {
-      dataOut.writeInt(timer.f0.limit());
-      dataOut.write(timer.f0.array(), 0, timer.f0.limit());
-      timerCoder.encode(timer.f1, dataOut, Coder.Context.NESTED);
-    }
-
-    dataOut.writeInt(processingTimeTimersQueue.size());
-    for (Tuple2<ByteBuffer, TimerInternals.TimerData> timer : processingTimeTimersQueue) {
-      dataOut.writeInt(timer.f0.limit());
-      dataOut.write(timer.f0.array(), 0, timer.f0.limit());
-      timerCoder.encode(timer.f1, dataOut, Coder.Context.NESTED);
-    }
-  }
-
   /**
   * {@link StepContext} for running {@link DoFn DoFns} on Flink. This does not allow
   * accessing state or timer internals.
@@ -459,75 +163,6 @@ public class WindowDoFnOperator<K, InputT, OutputT>
     }
   }
 
-  private class FlinkTimerInternals implements TimerInternals {
 
-    @Override
-    public void setTimer(
-            StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
-      throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
-    }
-
-    @Deprecated
-    @Override
-    public void setTimer(TimerData timerKey) {
-      if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
-        registerEventTimeTimer(timerKey);
-      } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
-        registerProcessingTimeTimer(timerKey);
-      } else {
-        throw new UnsupportedOperationException(
-                "Unsupported time domain: " + timerKey.getDomain());
-      }
-    }
-
-    @Deprecated
-    @Override
-    public void deleteTimer(StateNamespace namespace, String timerId) {
-      throw new UnsupportedOperationException(
-              "Canceling of a timer by ID is not yet supported.");
-    }
-
-    @Override
-    public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
-      throw new UnsupportedOperationException(
-          "Canceling of a timer by ID is not yet supported.");
-    }
-
-    @Deprecated
-    @Override
-    public void deleteTimer(TimerData timerKey) {
-      if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
-        deleteEventTimeTimer(timerKey);
-      } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
-        deleteProcessingTimeTimer(timerKey);
-      } else {
-        throw new UnsupportedOperationException(
-                "Unsupported time domain: " + timerKey.getDomain());
-      }
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return new Instant(getCurrentProcessingTime());
-    }
-
-    @Nullable
-    @Override
-    public Instant currentSynchronizedProcessingTime() {
-      return new Instant(getCurrentProcessingTime());
-    }
-
-    @Override
-    public Instant currentInputWatermarkTime() {
-      return new Instant(Math.min(currentInputWatermark, getPushbackWatermarkHold()));
-    }
-
-    @Nullable
-    @Override
-    public Instant currentOutputWatermarkTime() {
-      return new Instant(currentOutputWatermark);
-    }
-
-  }
 
 }

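The net effect of the changes above: timer bookkeeping moves from hand-rolled queues, futures, and snapshot/restore hooks to Flink's internal timer service, which checkpoints timers itself. For context, a minimal sketch of that Flink 1.2 API as an operator would use it; this is illustrative, not part of the patch, and the class name TimerSketch and service name "beam-timers" are hypothetical:

import org.apache.beam.runners.core.TimerInternals;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;

abstract class TimerSketch<K, OutT> extends AbstractStreamOperator<OutT>
    implements Triggerable<K, TimerInternals.TimerData> {

  private transient InternalTimerService<TimerInternals.TimerData> timerService;

  @Override
  public void open() throws Exception {
    super.open();
    // Flink snapshots and restores these timers with the operator state, which is
    // what makes the removed snapshotTimers()/restoreTimers() logic unnecessary.
    timerService = getInternalTimerService("beam-timers", timerDataSerializer(), this);
  }

  // Supplied by the concrete operator, e.g. a coder-backed serializer.
  abstract TypeSerializer<TimerInternals.TimerData> timerDataSerializer();

  @Override
  public void onEventTime(InternalTimer<K, TimerInternals.TimerData> timer) throws Exception {
    fireTimer(timer);
  }

  @Override
  public void onProcessingTime(InternalTimer<K, TimerInternals.TimerData> timer) throws Exception {
    fireTimer(timer);
  }

  abstract void fireTimer(InternalTimer<K, TimerInternals.TimerData> timer);
}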
http://git-wip-us.apache.org/repos/asf/beam/blob/7d32b93e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
index 909cb0e..820a9bd 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -82,10 +81,6 @@ public class BoundedSourceWrapper<OutputT>
 
   @Override
   public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
-    if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
-      throw new RuntimeException(
-          "Cannot emit watermarks, this hints at a misconfiguration/bug.");
-    }
 
     // figure out which split sources we're responsible for
     int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();

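The removed instanceof check reflects a Flink 1.2 API change: every SourceContext handed to run() can now emit watermarks, so sources no longer depend on the internal StreamSource.ManualWatermarkContext subtype. A hedged sketch of the resulting pattern, using a hypothetical minimal source:

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

class SketchSource extends RichParallelSourceFunction<Long> {
  private volatile boolean running = true;

  @Override
  public void run(SourceContext<Long> ctx) throws Exception {
    long ts = 0;
    while (running && ts < 100) {
      synchronized (ctx.getCheckpointLock()) {
        ctx.collectWithTimestamp(ts, ts);     // element with an event-time timestamp
        ctx.emitWatermark(new Watermark(ts)); // works on any SourceContext in Flink 1.2
      }
      ts++;
    }
  }

  @Override
  public void cancel() {
    running = false;
  }
}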
http://git-wip-us.apache.org/repos/asf/beam/blob/7d32b93e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 68746b2..237e5a3 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -43,10 +43,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,7 +56,7 @@ import org.slf4j.LoggerFactory;
 public class UnboundedSourceWrapper<
     OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
     extends RichParallelSourceFunction<WindowedValue<OutputT>>
-    implements Triggerable, StoppableFunction, Checkpointed<byte[]>, CheckpointListener {
+    implements ProcessingTimeCallback, StoppableFunction, Checkpointed<byte[]>, CheckpointListener {
 
   private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class);
 
@@ -85,7 +84,7 @@ public class UnboundedSourceWrapper<
 
   /**
    * The local split readers. Assigned at runtime when the wrapper is executed in parallel.
-   * Make it a field so that we can access it in {@link #trigger(long)} for
+   * Make it a field so that we can access it in {@link #onProcessingTime(long)} for
    * emitting watermarks.
    */
   private transient List<UnboundedSource.UnboundedReader<OutputT>> localReaders;
@@ -98,16 +97,16 @@ public class UnboundedSourceWrapper<
   private volatile boolean isRunning = true;
 
   /**
-   * Make it a field so that we can access it in {@link #trigger(long)} for registering new
+   * Make it a field so that we can access it in {@link #onProcessingTime(long)} for registering new
    * triggers.
    */
   private transient StreamingRuntimeContext runtimeContext;
 
   /**
-   * Make it a field so that we can access it in {@link #trigger(long)} for emitting
+   * Make it a field so that we can access it in {@link #onProcessingTime(long)} for emitting
    * watermarks.
    */
-  private transient StreamSource.ManualWatermarkContext<WindowedValue<OutputT>> context;
+  private transient SourceContext<WindowedValue<OutputT>> context;
 
   /**
    * Pending checkpoints which have not been acknowledged yet.
@@ -218,12 +217,8 @@ public class UnboundedSourceWrapper<
 
   @Override
   public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
-    if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
-      throw new RuntimeException(
-          "Cannot emit watermarks, this hints at a misconfiguration/bug.");
-    }
 
-    context = (StreamSource.ManualWatermarkContext<WindowedValue<OutputT>>) ctx;
+    context = ctx;
 
     if (localReaders.size() == 0) {
       // do nothing, but still look busy ...
@@ -405,7 +400,7 @@ public class UnboundedSourceWrapper<
   }
 
   @Override
-  public void trigger(long timestamp) throws Exception {
+  public void onProcessingTime(long timestamp) throws Exception {
     if (this.isRunning) {
       synchronized (context.getCheckpointLock()) {
         // find minimum watermark over all localReaders
@@ -426,7 +421,7 @@ public class UnboundedSourceWrapper<
     if (this.isRunning) {
       long watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval();
       long timeToNextWatermark = getTimeToNextWatermark(watermarkInterval);
-      runtime.registerTimer(timeToNextWatermark, this);
+      runtime.getProcessingTimeService().registerTimer(timeToNextWatermark, this);
     }
   }
 
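ProcessingTimeCallback is the Flink 1.2 replacement for the old Triggerable interface where a plain, key-agnostic time callback is all that is needed. A hedged sketch of the periodic re-registration loop used above for watermark emission; WatermarkLoop is a hypothetical name and the actual watermark computation is elided:

import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;

class WatermarkLoop implements ProcessingTimeCallback {
  private final StreamingRuntimeContext runtime;
  private volatile boolean isRunning = true;

  WatermarkLoop(StreamingRuntimeContext runtime) {
    this.runtime = runtime;
  }

  void start() throws Exception {
    long interval = runtime.getExecutionConfig().getAutoWatermarkInterval();
    runtime.getProcessingTimeService().registerTimer(System.currentTimeMillis() + interval, this);
  }

  @Override
  public void onProcessingTime(long timestamp) throws Exception {
    if (isRunning) {
      // ... compute the minimum watermark over all readers and emit it here ...
      start(); // re-register for the next watermark interval
    }
  }
}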

http://git-wip-us.apache.org/repos/asf/beam/blob/7d32b93e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
new file mode 100644
index 0000000..bcc3660
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.CombineContextFactory;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.MapState;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.SetState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateContext;
+import org.apache.beam.sdk.util.state.StateContexts;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+
+/**
+ * {@link StateInternals} that uses a Flink {@link DefaultOperatorStateBackend}
+ * to manage the broadcast state.
+ * The state is the same on all parallel instances of the operator,
+ * so we only need to store the state of operator-0 in the OperatorStateBackend.
+ *
+ * <p>Note: the index of the key is ignored. This is mainly used for side inputs.
+ */
+public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
+
+  private int indexInSubtaskGroup;
+  private final DefaultOperatorStateBackend stateBackend;
+  // stateName -> <namespace, state>
+  private Map<String, Map<String, ?>> stateForNonZeroOperator;
+
+  public FlinkBroadcastStateInternals(int indexInSubtaskGroup, OperatorStateBackend stateBackend) {
+    // TODO: Flink does not yet expose this through a public API
+    this.stateBackend = (DefaultOperatorStateBackend) stateBackend;
+    this.indexInSubtaskGroup = indexInSubtaskGroup;
+    if (indexInSubtaskGroup != 0) {
+      stateForNonZeroOperator = new HashMap<>();
+    }
+  }
+
+  @Override
+  public K getKey() {
+    return null;
+  }
+
+  @Override
+  public <T extends State> T state(
+      final StateNamespace namespace,
+      StateTag<? super K, T> address) {
+
+    return state(namespace, address, StateContexts.nullContext());
+  }
+
+  @Override
+  public <T extends State> T state(
+      final StateNamespace namespace,
+      StateTag<? super K, T> address,
+      final StateContext<?> context) {
+
+    return address.bind(new StateTag.StateBinder<K>() {
+
+      @Override
+      public <T> ValueState<T> bindValue(
+          StateTag<? super K, ValueState<T>> address,
+          Coder<T> coder) {
+
+        return new FlinkBroadcastValueState<>(stateBackend, address, namespace, coder);
+      }
+
+      @Override
+      public <T> BagState<T> bindBag(
+          StateTag<? super K, BagState<T>> address,
+          Coder<T> elemCoder) {
+
+        return new FlinkBroadcastBagState<>(stateBackend, address, namespace, elemCoder);
+      }
+
+      @Override
+      public <T> SetState<T> bindSet(
+          StateTag<? super K, SetState<T>> address,
+          Coder<T> elemCoder) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", SetState.class.getSimpleName()));
+      }
+
+      @Override
+      public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+          StateTag<? super K, MapState<KeyT, ValueT>> spec,
+          Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", MapState.class.getSimpleName()));
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+          AccumulatorCombiningState<InputT, AccumT, OutputT>
+      bindCombiningValue(
+          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+          Coder<AccumT> accumCoder,
+          Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+
+        return new FlinkAccumulatorCombiningState<>(
+            stateBackend, address, combineFn, namespace, accumCoder);
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+          AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+          Coder<AccumT> accumCoder,
+          final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+        return new FlinkKeyedAccumulatorCombiningState<>(
+            stateBackend,
+            address,
+            combineFn,
+            namespace,
+            accumCoder,
+            FlinkBroadcastStateInternals.this);
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+          AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+          Coder<AccumT> accumCoder,
+          CombineWithContext.KeyedCombineFnWithContext<
+              ? super K, InputT, AccumT, OutputT> combineFn) {
+        return new FlinkAccumulatorCombiningStateWithContext<>(
+            stateBackend,
+            address,
+            combineFn,
+            namespace,
+            accumCoder,
+            FlinkBroadcastStateInternals.this,
+            CombineContextFactory.createFromStateContext(context));
+      }
+
+      @Override
+      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+          StateTag<? super K, WatermarkHoldState<W>> address,
+          OutputTimeFn<? super W> outputTimeFn) {
+         throw new UnsupportedOperationException(
+             String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
+      }
+    });
+  }
+
+  /**
+   * 1. We only checkpoint state from the operator with subtask index 0, because
+   * we assume that the state is the same on all parallel instances of the operator.
+   *
+   * <p>2. A map is used to support namespaces.
+   */
+  private abstract class AbstractBroadcastState<T> {
+
+    private String name;
+    private final StateNamespace namespace;
+    private final ListStateDescriptor<Map<String, T>> flinkStateDescriptor;
+    private final DefaultOperatorStateBackend flinkStateBackend;
+
+    AbstractBroadcastState(
+        DefaultOperatorStateBackend flinkStateBackend,
+        String name,
+        StateNamespace namespace,
+        Coder<T> coder) {
+      this.name = name;
+
+      this.namespace = namespace;
+      this.flinkStateBackend = flinkStateBackend;
+
+      CoderTypeInformation<Map<String, T>> typeInfo =
+          new CoderTypeInformation<>(MapCoder.of(StringUtf8Coder.of(), coder));
+
+      flinkStateDescriptor = new ListStateDescriptor<>(name,
+          typeInfo.createSerializer(new ExecutionConfig()));
+    }
+
+    /**
+     * Get the map (namespace -> T) from index 0.
+     */
+    Map<String, T> getMap() throws Exception {
+      if (indexInSubtaskGroup == 0) {
+        return getMapFromBroadcastState();
+      } else {
+        Map<String, T> result = (Map<String, T>) stateForNonZeroOperator.get(name);
+        // maybe restore from BroadcastState of Operator-0
+        if (result == null) {
+          result = getMapFromBroadcastState();
+          if (result != null) {
+            stateForNonZeroOperator.put(name, result);
+            // the broadcast copy is no longer needed, so clear it.
+            flinkStateBackend.getBroadcastOperatorState(
+                flinkStateDescriptor).clear();
+          }
+        }
+        return result;
+      }
+    }
+
+    Map<String, T> getMapFromBroadcastState() throws Exception {
+      ListState<Map<String, T>> state = flinkStateBackend.getBroadcastOperatorState(
+          flinkStateDescriptor);
+      Iterable<Map<String, T>> iterable = state.get();
+      Map<String, T> ret = null;
+      if (iterable != null) {
+        // just use index 0
+        Iterator<Map<String, T>> iterator = iterable.iterator();
+        if (iterator.hasNext()) {
+          ret = iterator.next();
+        }
+      }
+      return ret;
+    }
+
+    /**
+     * Update the map (namespace -> T) from index 0.
+     */
+    void updateMap(Map<String, T> map) throws Exception {
+      if (indexInSubtaskGroup == 0) {
+        ListState<Map<String, T>> state = flinkStateBackend.getBroadcastOperatorState(
+            flinkStateDescriptor);
+        state.clear();
+        if (map.size() > 0) {
+          state.add(map);
+        }
+      } else {
+        if (map.size() == 0) {
+          stateForNonZeroOperator.remove(name);
+          // updateMap is always called after getMap, and getMap already clears
+          // the map in the broadcast operator state, so we don't need to clear it here.
+        } else {
+          stateForNonZeroOperator.put(name, map);
+        }
+      }
+    }
+
+    void writeInternal(T input) {
+      try {
+        Map<String, T> map = getMap();
+        if (map == null) {
+          map = new HashMap<>();
+        }
+        map.put(namespace.stringKey(), input);
+        updateMap(map);
+      } catch (Exception e) {
+        throw new RuntimeException("Error updating state.", e);
+      }
+    }
+
+    T readInternal() {
+      try {
+        Map<String, T> map = getMap();
+        if (map == null) {
+          return null;
+        } else {
+          return map.get(namespace.stringKey());
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    void clearInternal() {
+      try {
+        Map<String, T> map = getMap();
+        if (map != null) {
+          map.remove(namespace.stringKey());
+          updateMap(map);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Error clearing state.", e);
+      }
+    }
+
+  }
+
+  private class FlinkBroadcastValueState<K, T>
+      extends AbstractBroadcastState<T> implements ValueState<T> {
+
+    private final StateNamespace namespace;
+    private final StateTag<? super K, ValueState<T>> address;
+
+    FlinkBroadcastValueState(
+        DefaultOperatorStateBackend flinkStateBackend,
+        StateTag<? super K, ValueState<T>> address,
+        StateNamespace namespace,
+        Coder<T> coder) {
+      super(flinkStateBackend, address.getId(), namespace, coder);
+
+      this.namespace = namespace;
+      this.address = address;
+
+    }
+
+    @Override
+    public void write(T input) {
+      writeInternal(input);
+    }
+
+    @Override
+    public ValueState<T> readLater() {
+      return this;
+    }
+
+    @Override
+    public T read() {
+      return readInternal();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FlinkBroadcastValueState<?, ?> that = (FlinkBroadcastValueState<?, ?>) o;
+
+      return namespace.equals(that.namespace) && address.equals(that.address);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+
+    @Override
+    public void clear() {
+      clearInternal();
+    }
+  }
+
+  private class FlinkBroadcastBagState<K, T> extends AbstractBroadcastState<List<T>>
+      implements BagState<T> {
+
+    private final StateNamespace namespace;
+    private final StateTag<? super K, BagState<T>> address;
+
+    FlinkBroadcastBagState(
+        DefaultOperatorStateBackend flinkStateBackend,
+        StateTag<? super K, BagState<T>> address,
+        StateNamespace namespace,
+        Coder<T> coder) {
+      super(flinkStateBackend, address.getId(), namespace, ListCoder.of(coder));
+
+      this.namespace = namespace;
+      this.address = address;
+    }
+
+    @Override
+    public void add(T input) {
+      List<T> list = readInternal();
+      if (list == null) {
+        list = new ArrayList<>();
+      }
+      list.add(input);
+      writeInternal(list);
+    }
+
+    @Override
+    public BagState<T> readLater() {
+      return this;
+    }
+
+    @Override
+    public Iterable<T> read() {
+      List<T> result = readInternal();
+      return result != null ? result : Collections.<T>emptyList();
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public Boolean read() {
+          try {
+            List<T> result = readInternal();
+            return result == null;
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading state.", e);
+          }
+
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+      };
+    }
+
+    @Override
+    public void clear() {
+      clearInternal();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FlinkBroadcastBagState<?, ?> that = (FlinkBroadcastBagState<?, ?>) o;
+
+      return namespace.equals(that.namespace) && address.equals(that.address);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+  }
+
+  private class FlinkAccumulatorCombiningState<K, InputT, AccumT, OutputT>
+      extends AbstractBroadcastState<AccumT>
+      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+
+    private final StateNamespace namespace;
+    private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address;
+    private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
+
+    FlinkAccumulatorCombiningState(
+        DefaultOperatorStateBackend flinkStateBackend,
+        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+        Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
+        StateNamespace namespace,
+        Coder<AccumT> accumCoder) {
+      super(flinkStateBackend, address.getId(), namespace, accumCoder);
+
+      this.namespace = namespace;
+      this.address = address;
+      this.combineFn = combineFn;
+    }
+
+    @Override
+    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+      return this;
+    }
+
+    @Override
+    public void add(InputT value) {
+      AccumT current = readInternal();
+      if (current == null) {
+        current = combineFn.createAccumulator();
+      }
+      current = combineFn.addInput(current, value);
+      writeInternal(current);
+    }
+
+    @Override
+    public void addAccum(AccumT accum) {
+      AccumT current = readInternal();
+
+      if (current == null) {
+        writeInternal(accum);
+      } else {
+        current = combineFn.mergeAccumulators(Arrays.asList(current, accum));
+        writeInternal(current);
+      }
+    }
+
+    @Override
+    public AccumT getAccum() {
+      return readInternal();
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      return combineFn.mergeAccumulators(accumulators);
+    }
+
+    @Override
+    public OutputT read() {
+      AccumT accum = readInternal();
+      if (accum != null) {
+        return combineFn.extractOutput(accum);
+      } else {
+        return combineFn.extractOutput(combineFn.createAccumulator());
+      }
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public Boolean read() {
+          try {
+            return readInternal() == null;
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading state.", e);
+          }
+
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+      };
+    }
+
+    @Override
+    public void clear() {
+      clearInternal();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FlinkAccumulatorCombiningState<?, ?, ?, ?> that =
+          (FlinkAccumulatorCombiningState<?, ?, ?, ?>) o;
+
+      return namespace.equals(that.namespace) && address.equals(that.address);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+  }
+
+  private class FlinkKeyedAccumulatorCombiningState<K, InputT, AccumT, OutputT>
+      extends AbstractBroadcastState<AccumT>
+      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+
+    private final StateNamespace namespace;
+    private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address;
+    private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
+    private final FlinkBroadcastStateInternals<K> flinkStateInternals;
+
+    FlinkKeyedAccumulatorCombiningState(
+        DefaultOperatorStateBackend flinkStateBackend,
+        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+        Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
+        StateNamespace namespace,
+        Coder<AccumT> accumCoder,
+        FlinkBroadcastStateInternals<K> flinkStateInternals) {
+      super(flinkStateBackend, address.getId(), namespace, accumCoder);
+
+      this.namespace = namespace;
+      this.address = address;
+      this.combineFn = combineFn;
+      this.flinkStateInternals = flinkStateInternals;
+
+    }
+
+    @Override
+    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+      return this;
+    }
+
+    @Override
+    public void add(InputT value) {
+      try {
+        AccumT current = readInternal();
+        if (current == null) {
+          current = combineFn.createAccumulator(flinkStateInternals.getKey());
+        }
+        current = combineFn.addInput(flinkStateInternals.getKey(), current, value);
+        writeInternal(current);
+      } catch (Exception e) {
+        throw new RuntimeException("Error adding to state." , e);
+      }
+    }
+
+    @Override
+    public void addAccum(AccumT accum) {
+      try {
+        AccumT current = readInternal();
+        if (current == null) {
+          writeInternal(accum);
+        } else {
+          current = combineFn.mergeAccumulators(
+              flinkStateInternals.getKey(),
+              Arrays.asList(current, accum));
+          writeInternal(current);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Error adding to state.", e);
+      }
+    }
+
+    @Override
+    public AccumT getAccum() {
+      try {
+        return readInternal();
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators);
+    }
+
+    @Override
+    public OutputT read() {
+      try {
+        AccumT accum = readInternal();
+        if (accum != null) {
+          return combineFn.extractOutput(flinkStateInternals.getKey(), accum);
+        } else {
+          return combineFn.extractOutput(
+              flinkStateInternals.getKey(),
+              combineFn.createAccumulator(flinkStateInternals.getKey()));
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public Boolean read() {
+          try {
+            return readInternal() == null;
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading state.", e);
+          }
+
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+      };
+    }
+
+    @Override
+    public void clear() {
+      clearInternal();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FlinkKeyedAccumulatorCombiningState<?, ?, ?, ?> that =
+          (FlinkKeyedAccumulatorCombiningState<?, ?, ?, ?>) o;
+
+      return namespace.equals(that.namespace) && address.equals(that.address);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+  }
+
+  private class FlinkAccumulatorCombiningStateWithContext<K, InputT, AccumT, OutputT>
+      extends AbstractBroadcastState<AccumT>
+      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+
+    private final StateNamespace namespace;
+    private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address;
+    private final CombineWithContext.KeyedCombineFnWithContext<
+        ? super K, InputT, AccumT, OutputT> combineFn;
+    private final FlinkBroadcastStateInternals<K> flinkStateInternals;
+    private final CombineWithContext.Context context;
+
+    FlinkAccumulatorCombiningStateWithContext(
+        DefaultOperatorStateBackend flinkStateBackend,
+        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+        CombineWithContext.KeyedCombineFnWithContext<
+            ? super K, InputT, AccumT, OutputT> combineFn,
+        StateNamespace namespace,
+        Coder<AccumT> accumCoder,
+        FlinkBroadcastStateInternals<K> flinkStateInternals,
+        CombineWithContext.Context context) {
+      super(flinkStateBackend, address.getId(), namespace, accumCoder);
+
+      this.namespace = namespace;
+      this.address = address;
+      this.combineFn = combineFn;
+      this.flinkStateInternals = flinkStateInternals;
+      this.context = context;
+
+    }
+
+    @Override
+    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
+      return this;
+    }
+
+    @Override
+    public void add(InputT value) {
+      try {
+        AccumT current = readInternal();
+        if (current == null) {
+          current = combineFn.createAccumulator(flinkStateInternals.getKey(), context);
+        }
+        current = combineFn.addInput(flinkStateInternals.getKey(), current, value, context);
+        writeInternal(current);
+      } catch (Exception e) {
+        throw new RuntimeException("Error adding to state." , e);
+      }
+    }
+
+    @Override
+    public void addAccum(AccumT accum) {
+      try {
+
+        AccumT current = readInternal();
+        if (current == null) {
+          writeInternal(accum);
+        } else {
+          current = combineFn.mergeAccumulators(
+              flinkStateInternals.getKey(),
+              Arrays.asList(current, accum),
+              context);
+          writeInternal(current);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Error adding to state.", e);
+      }
+    }
+
+    @Override
+    public AccumT getAccum() {
+      try {
+        return readInternal();
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators, context);
+    }
+
+    @Override
+    public OutputT read() {
+      try {
+        AccumT accum = readInternal();
+        return combineFn.extractOutput(flinkStateInternals.getKey(), accum, context);
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public Boolean read() {
+          try {
+            return readInternal() == null;
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading state.", e);
+          }
+
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+      };
+    }
+
+    @Override
+    public void clear() {
+      clearInternal();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FlinkAccumulatorCombiningStateWithContext<?, ?, ?, ?> that =
+          (FlinkAccumulatorCombiningStateWithContext<?, ?, ?, ?>) o;
+
+      return namespace.equals(that.namespace) && address.equals(that.address);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+  }
+
+}

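For orientation, a hedged usage sketch of the new broadcast state internals: binding a BagState under the global namespace from inside an operator. StateNamespaces and StateTags are assumed to be the runners-core helpers available at this commit; the tag name and the surrounding method are illustrative:

import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.flink.runtime.state.OperatorStateBackend;

class BroadcastStateSketch {
  void sketch(int subtaskIndex, OperatorStateBackend backend) {
    FlinkBroadcastStateInternals<String> internals =
        new FlinkBroadcastStateInternals<>(subtaskIndex, backend);

    BagState<String> bag = internals.state(
        StateNamespaces.global(),
        StateTags.bag("side-input-buffer", StringUtf8Coder.of()));

    bag.add("hello");
    for (String value : bag.read()) {
      // every subtask observes the same broadcast contents after restore
    }
  }
}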
http://git-wip-us.apache.org/repos/asf/beam/blob/7d32b93e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
new file mode 100644
index 0000000..a29b1b2
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.MapState;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.SetState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateContext;
+import org.apache.beam.sdk.util.state.StateContexts;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.streaming.api.operators.HeapInternalTimerService;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * {@link StateInternals} that uses {@link KeyGroupCheckpointedOperator}
+ * to checkpoint state.
+ *
+ * <p>Note: the index of the key is ignored; only {@link BagState} is implemented.
+ *
+ * <p>The local key-group range handling follows {@link HeapInternalTimerService}.
+ */
+public class FlinkKeyGroupStateInternals<K> implements StateInternals<K> {
+
+  private final Coder<K> keyCoder;
+  private final KeyGroupsList localKeyGroupRange;
+  private KeyedStateBackend keyedStateBackend;
+  private final int localKeyGroupRangeStartIdx;
+
+  // stateName -> namespace -> (valueCoder, value)
+  private final Map<String, Tuple2<Coder<?>, Map<String, ?>>>[] stateTables;
+
+  public FlinkKeyGroupStateInternals(
+      Coder<K> keyCoder,
+      KeyedStateBackend keyedStateBackend) {
+    this.keyCoder = keyCoder;
+    this.keyedStateBackend = keyedStateBackend;
+    this.localKeyGroupRange = keyedStateBackend.getKeyGroupRange();
+    // find the starting index of the local key-group range
+    int startIdx = Integer.MAX_VALUE;
+    for (Integer keyGroupIdx : localKeyGroupRange) {
+      startIdx = Math.min(keyGroupIdx, startIdx);
+    }
+    this.localKeyGroupRangeStartIdx = startIdx;
+    stateTables = (Map<String, Tuple2<Coder<?>, Map<String, ?>>>[])
+        new Map[localKeyGroupRange.getNumberOfKeyGroups()];
+    for (int i = 0; i < stateTables.length; i++) {
+      stateTables[i] = new HashMap<>();
+    }
+  }
+
+  @Override
+  public K getKey() {
+    ByteBuffer keyBytes = (ByteBuffer) keyedStateBackend.getCurrentKey();
+    try {
+      return CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
+    } catch (CoderException e) {
+      throw new RuntimeException("Error decoding key.", e);
+    }
+  }
+
+  @Override
+  public <T extends State> T state(
+      final StateNamespace namespace,
+      StateTag<? super K, T> address) {
+
+    return state(namespace, address, StateContexts.nullContext());
+  }
+
+  @Override
+  public <T extends State> T state(
+      final StateNamespace namespace,
+      StateTag<? super K, T> address,
+      final StateContext<?> context) {
+
+    return address.bind(new StateTag.StateBinder<K>() {
+
+      @Override
+      public <T> ValueState<T> bindValue(
+          StateTag<? super K, ValueState<T>> address,
+          Coder<T> coder) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", ValueState.class.getSimpleName()));
+      }
+
+      @Override
+      public <T> BagState<T> bindBag(
+          StateTag<? super K, BagState<T>> address,
+          Coder<T> elemCoder) {
+
+        return new FlinkKeyGroupBagState<>(address, namespace, elemCoder);
+      }
+
+      @Override
+      public <T> SetState<T> bindSet(
+          StateTag<? super K, SetState<T>> address,
+          Coder<T> elemCoder) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", SetState.class.getSimpleName()));
+      }
+
+      @Override
+      public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+          StateTag<? super K, MapState<KeyT, ValueT>> spec,
+          Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", MapState.class.getSimpleName()));
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+          AccumulatorCombiningState<InputT, AccumT, OutputT>
+      bindCombiningValue(
+          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+          Coder<AccumT> accumCoder,
+          Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+        throw new UnsupportedOperationException("bindCombiningValue is not supported.");
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+          AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+          Coder<AccumT> accumCoder,
+          final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+        throw new UnsupportedOperationException("bindKeyedCombiningValue is not supported.");
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+          AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+          Coder<AccumT> accumCoder,
+          CombineWithContext.KeyedCombineFnWithContext<
+              ? super K, InputT, AccumT, OutputT> combineFn) {
+        throw new UnsupportedOperationException(
+            "bindKeyedCombiningValueWithContext is not supported.");
+      }
+
+      @Override
+      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+          StateTag<? super K, WatermarkHoldState<W>> address,
+          OutputTimeFn<? super W> outputTimeFn) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
+      }
+    });
+  }
+
+  /**
+   * Modeled after {@link Combine.CombineFn}.
+   *
+   * <p>An accumulator is kept per KeyGroup: addInput() is invoked when an element arrives,
+   * and extractOutput() combines all accumulators into the desired value when the state is read.
+   */
+  interface KeyGroupCombiner<InputT, AccumT, OutputT> {
+
+    /**
+     * Returns a new, mutable accumulator value, representing the accumulation
+     * of zero input values.
+     */
+    AccumT createAccumulator();
+
+    /**
+     * Adds the given input value to the given accumulator, returning the
+     * new accumulator value.
+     */
+    AccumT addInput(AccumT accumulator, InputT input);
+
+    /**
+     * Returns the output value that is the result of all accumulators from KeyGroups
+     * that are assigned to this operator.
+     */
+    OutputT extractOutput(Iterable<AccumT> accumulators);
+  }
+
+  private abstract class AbstractKeyGroupState<InputT, AccumT, OutputT> {
+
+    private String stateName;
+    private String namespace;
+    private Coder<AccumT> coder;
+    private KeyGroupCombiner<InputT, AccumT, OutputT> keyGroupCombiner;
+
+    AbstractKeyGroupState(
+        String stateName,
+        String namespace,
+        Coder<AccumT> coder,
+        KeyGroupCombiner<InputT, AccumT, OutputT> keyGroupCombiner) {
+      this.stateName = stateName;
+      this.namespace = namespace;
+      this.coder = coder;
+      this.keyGroupCombiner = keyGroupCombiner;
+    }
+
+    /**
+     * Determines the key group of the current key and adds the input to that group's accumulator.
+     */
+    void addInput(InputT input) {
+      int keyGroupIdx = keyedStateBackend.getCurrentKeyGroupIndex();
+      int localIdx = getIndexForKeyGroup(keyGroupIdx);
+      Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable = stateTables[localIdx];
+      Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
+      if (tuple2 == null) {
+        tuple2 = new Tuple2<>();
+        tuple2.f0 = coder;
+        tuple2.f1 = new HashMap<>();
+        stateTable.put(stateName, tuple2);
+      }
+      Map<String, AccumT> map = (Map<String, AccumT>) tuple2.f1;
+      AccumT accumulator = map.get(namespace);
+      if (accumulator == null) {
+        accumulator = keyGroupCombiner.createAccumulator();
+      }
+      accumulator = keyGroupCombiner.addInput(accumulator, input);
+      map.put(namespace, accumulator);
+    }
+
+    /**
+     * Get all accumulators and invoke extractOutput().
+     */
+    OutputT extractOutput() {
+      List<AccumT> accumulators = new ArrayList<>(stateTables.length);
+      for (Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable : stateTables) {
+        Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
+        if (tuple2 != null) {
+          AccumT accumulator = (AccumT) tuple2.f1.get(namespace);
+          if (accumulator != null) {
+            accumulators.add(accumulator);
+          }
+        }
+      }
+      return keyGroupCombiner.extractOutput(accumulators);
+    }
+
+    /**
+     * Returns false as soon as the first accumulator is found.
+     */
+    boolean isEmptyInternal() {
+      for (Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable : stateTables) {
+        Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
+        if (tuple2 != null) {
+          AccumT accumulator = (AccumT) tuple2.f1.get(namespace);
+          if (accumulator != null) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+
+    /**
+     * Clears the accumulators and removes any maps that become empty.
+     */
+    void clearInternal() {
+      for (Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable : stateTables) {
+        Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
+        if (tuple2 != null) {
+          tuple2.f1.remove(namespace);
+          if (tuple2.f1.size() == 0) {
+            stateTable.remove(stateName);
+          }
+        }
+      }
+    }
+
+  }
+
+  private int getIndexForKeyGroup(int keyGroupIdx) {
+    checkArgument(localKeyGroupRange.contains(keyGroupIdx),
+        "Key Group " + keyGroupIdx + " does not belong to the local range.");
+    return keyGroupIdx - this.localKeyGroupRangeStartIdx;
+  }
+
+  private class KeyGroupBagCombiner<T> implements KeyGroupCombiner<T, List<T>, Iterable<T>> {
+
+    @Override
+    public List<T> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<T> addInput(List<T> accumulator, T input) {
+      accumulator.add(input);
+      return accumulator;
+    }
+
+    @Override
+    public Iterable<T> extractOutput(Iterable<List<T>> accumulators) {
+      List<T> result = new ArrayList<>();
+      // Note: this copies; an unmodifiable view over the accumulators could avoid that.
+      for (List<T> list : accumulators) {
+        result.addAll(list);
+      }
+      return result;
+    }
+  }
+
+  private class FlinkKeyGroupBagState<T> extends AbstractKeyGroupState<T, List<T>, Iterable<T>>
+      implements BagState<T> {
+
+    private final StateNamespace namespace;
+    private final StateTag<? super K, BagState<T>> address;
+
+    FlinkKeyGroupBagState(
+        StateTag<? super K, BagState<T>> address,
+        StateNamespace namespace,
+        Coder<T> coder) {
+      super(address.getId(), namespace.stringKey(), ListCoder.of(coder),
+          new KeyGroupBagCombiner<T>());
+      this.namespace = namespace;
+      this.address = address;
+    }
+
+    @Override
+    public void add(T input) {
+      addInput(input);
+    }
+
+    @Override
+    public BagState<T> readLater() {
+      return this;
+    }
+
+    @Override
+    public Iterable<T> read() {
+      Iterable<T> result = extractOutput();
+      return result != null ? result : Collections.<T>emptyList();
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public Boolean read() {
+          try {
+            return isEmptyInternal();
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading state.", e);
+          }
+
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+      };
+    }
+
+    @Override
+    public void clear() {
+      clearInternal();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FlinkKeyGroupBagState<?> that = (FlinkKeyGroupBagState<?>) o;
+
+      return namespace.equals(that.namespace) && address.equals(that.address);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+  }
+
+  /**
+   * Snapshots the state {@code (stateName -> (valueCoder && (namespace -> value)))} for a given
+   * {@code keyGroupIdx}.
+   *
+   * @param keyGroupIdx the id of the key-group to be put in the snapshot.
+   * @param out the stream to write to.
+   */
+  public void snapshotKeyGroupState(int keyGroupIdx, DataOutputStream out) throws Exception {
+    int localIdx = getIndexForKeyGroup(keyGroupIdx);
+    Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable = stateTables[localIdx];
+    Preconditions.checkState(stateTable.size() <= Short.MAX_VALUE,
+        "Too many States: " + stateTable.size() + ". Currently at most "
+            + Short.MAX_VALUE + " states are supported");
+    out.writeShort(stateTable.size());
+    for (Map.Entry<String, Tuple2<Coder<?>, Map<String, ?>>> entry : stateTable.entrySet()) {
+      out.writeUTF(entry.getKey());
+      Coder coder = entry.getValue().f0;
+      InstantiationUtil.serializeObject(out, coder);
+      Map<String, ?> map = entry.getValue().f1;
+      out.writeInt(map.size());
+      for (Map.Entry<String, ?> entry1 : map.entrySet()) {
+        StringUtf8Coder.of().encode(entry1.getKey(), out, Context.NESTED);
+        coder.encode(entry1.getValue(), out, Context.NESTED);
+      }
+    }
+  }
+
+  /**
+   * Restores the state {@code (stateName -> (valueCoder && (namespace -> value)))}
+   * for a given {@code keyGroupIdx}.
+   *
+   * @param keyGroupIdx the id of the key-group whose state is restored from the snapshot.
+   * @param in the stream to read from.
+   * @param userCodeClassLoader the class loader that will be used to deserialize
+   *                            the valueCoder.
+   */
+  public void restoreKeyGroupState(int keyGroupIdx, DataInputStream in,
+                                   ClassLoader userCodeClassLoader) throws Exception {
+    int localIdx = getIndexForKeyGroup(keyGroupIdx);
+    Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable = stateTables[localIdx];
+    int numStates = in.readShort();
+    for (int i = 0; i < numStates; ++i) {
+      String stateName = in.readUTF();
+      Coder coder = InstantiationUtil.deserializeObject(in, userCodeClassLoader);
+      Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
+      if (tuple2 == null) {
+        tuple2 = new Tuple2<>();
+        tuple2.f0 = coder;
+        tuple2.f1 = new HashMap<>();
+        stateTable.put(stateName, tuple2);
+      }
+      Map<String, Object> map = (Map<String, Object>) tuple2.f1;
+      int mapSize = in.readInt();
+      for (int j = 0; j < mapSize; j++) {
+        String namespace = StringUtf8Coder.of().decode(in, Context.NESTED);
+        Object value = coder.decode(in, Context.NESTED);
+        map.put(namespace, value);
+      }
+    }
+  }
+
+}
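
As a rough usage sketch (not part of the commit above), the new internals can back a
Beam BagState and checkpoint it per key group. The backend and out arguments are
assumed to be supplied by the enclosing operator, and StateTags/StateNamespaces are
assumed to be the runners-core helpers:

import java.io.DataOutputStream;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.flink.runtime.state.KeyedStateBackend;

class KeyGroupStateSketch {
  void sketch(KeyedStateBackend backend, DataOutputStream out) throws Exception {
    FlinkKeyGroupStateInternals<String> internals =
        new FlinkKeyGroupStateInternals<>(StringUtf8Coder.of(), backend);

    // Bind a bag in the global namespace and buffer an element; it is stored in
    // the state table of the key group of the backend's current key.
    StateTag<Object, BagState<String>> tag = StateTags.bag("buffer", StringUtf8Coder.of());
    BagState<String> bag = internals.state(StateNamespaces.global(), tag);
    bag.add("hello");

    // On checkpoint, every key group in the local range is written out separately,
    // which is what lets Flink re-assign key groups when the job is rescaled.
    for (int keyGroupIdx : backend.getKeyGroupRange()) {
      internals.snapshotKeyGroupState(keyGroupIdx, out);
    }
  }
}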

http://git-wip-us.apache.org/repos/asf/beam/blob/7d32b93e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
new file mode 100644
index 0000000..d9e87d1
--- /dev/null
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
+
+import com.google.common.collect.Iterators;
+import java.util.Collections;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.MapState;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.SetState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateContext;
+import org.apache.beam.sdk.util.state.StateContexts;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+
+/**
+ * {@link StateInternals} that uses a Flink {@link OperatorStateBackend}
+ * to manage the split-distribute state.
+ *
+ * <p>Elements in ListState will be redistributed in round-robin fashion
+ * to operators when restarting with a different parallelism.
+ *
+ * <p>Note:
+ * The indices of the key and the namespace are ignored.
+ * Only BagState is currently implemented.
+ */
+public class FlinkSplitStateInternals<K> implements StateInternals<K> {
+
+  private final OperatorStateBackend stateBackend;
+
+  public FlinkSplitStateInternals(OperatorStateBackend stateBackend) {
+    this.stateBackend = stateBackend;
+  }
+
+  @Override
+  public K getKey() {
+    return null;
+  }
+
+  @Override
+  public <T extends State> T state(
+      final StateNamespace namespace,
+      StateTag<? super K, T> address) {
+
+    return state(namespace, address, StateContexts.nullContext());
+  }
+
+  @Override
+  public <T extends State> T state(
+      final StateNamespace namespace,
+      StateTag<? super K, T> address,
+      final StateContext<?> context) {
+
+    return address.bind(new StateTag.StateBinder<K>() {
+
+      @Override
+      public <T> ValueState<T> bindValue(
+          StateTag<? super K, ValueState<T>> address,
+          Coder<T> coder) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", ValueState.class.getSimpleName()));
+      }
+
+      @Override
+      public <T> BagState<T> bindBag(
+          StateTag<? super K, BagState<T>> address,
+          Coder<T> elemCoder) {
+
+        return new FlinkSplitBagState<>(stateBackend, address, namespace, elemCoder);
+      }
+
+      @Override
+      public <T> SetState<T> bindSet(
+          StateTag<? super K, SetState<T>> address,
+          Coder<T> elemCoder) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", SetState.class.getSimpleName()));
+      }
+
+      @Override
+      public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+          StateTag<? super K, MapState<KeyT, ValueT>> spec,
+          Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", MapState.class.getSimpleName()));
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+      AccumulatorCombiningState<InputT, AccumT, OutputT>
+      bindCombiningValue(
+          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+          Coder<AccumT> accumCoder,
+          Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+        throw new UnsupportedOperationException("bindCombiningValue is not supported.");
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+      AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+          Coder<AccumT> accumCoder,
+          final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+        throw new UnsupportedOperationException("bindKeyedCombiningValue is not supported.");
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+      AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+          Coder<AccumT> accumCoder,
+          CombineWithContext.KeyedCombineFnWithContext<
+              ? super K, InputT, AccumT, OutputT> combineFn) {
+        throw new UnsupportedOperationException(
+            "bindKeyedCombiningValueWithContext is not supported.");
+      }
+
+      @Override
+      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+          StateTag<? super K, WatermarkHoldState<W>> address,
+          OutputTimeFn<? super W> outputTimeFn) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
+      }
+    });
+  }
+
+  private static class FlinkSplitBagState<K, T> implements BagState<T> {
+
+    private final ListStateDescriptor<T> descriptor;
+    private OperatorStateBackend flinkStateBackend;
+    private final StateNamespace namespace;
+    private final StateTag<? super K, BagState<T>> address;
+
+    FlinkSplitBagState(
+        OperatorStateBackend flinkStateBackend,
+        StateTag<? super K, BagState<T>> address,
+        StateNamespace namespace,
+        Coder<T> coder) {
+      this.flinkStateBackend = flinkStateBackend;
+      this.namespace = namespace;
+      this.address = address;
+
+      CoderTypeInformation<T> typeInfo =
+          new CoderTypeInformation<>(coder);
+
+      descriptor = new ListStateDescriptor<>(address.getId(),
+          typeInfo.createSerializer(new ExecutionConfig()));
+    }
+
+    @Override
+    public void add(T input) {
+      try {
+        flinkStateBackend.getOperatorState(descriptor).add(input);
+      } catch (Exception e) {
+        throw new RuntimeException("Error updating state.", e);
+      }
+    }
+
+    @Override
+    public BagState<T> readLater() {
+      return this;
+    }
+
+    @Override
+    public Iterable<T> read() {
+      try {
+        Iterable<T> result = flinkStateBackend.getOperatorState(descriptor).get();
+        return result != null ? result : Collections.<T>emptyList();
+      } catch (Exception e) {
+        throw new RuntimeException("Error updating state.", e);
+      }
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public Boolean read() {
+          try {
+            Iterable<T> result = flinkStateBackend.getOperatorState(descriptor).get();
+            // PartitionableListState.get() returns an empty collection when there is
+            // no element, while keyed ListState returns null in that case.
+            return result == null || Iterators.size(result.iterator()) == 0;
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading state.", e);
+          }
+
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+      };
+    }
+
+    @Override
+    public void clear() {
+      try {
+        flinkStateBackend.getOperatorState(descriptor).clear();
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FlinkSplitBagState<?, ?> that = (FlinkSplitBagState<?, ?>) o;
+
+      return namespace.equals(that.namespace) && address.equals(that.address);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+  }
+
+}
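
A similar hedged sketch for the split-distribute variant (again, not part of the
commit): since key and namespace are ignored, Void serves as the key type, and
backend is assumed to be the operator's OperatorStateBackend:

import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.flink.runtime.state.OperatorStateBackend;

class SplitStateSketch {
  void sketch(OperatorStateBackend backend) {
    FlinkSplitStateInternals<Void> internals = new FlinkSplitStateInternals<>(backend);

    StateTag<Object, BagState<Integer>> tag = StateTags.bag("pushed-back", VarIntCoder.of());
    BagState<Integer> bag = internals.state(StateNamespaces.global(), tag);

    bag.add(42);                      // appended to the backing Flink ListState
    for (int element : bag.read()) {  // sees only elements redistributed to this instance
      System.out.println(element);
    }
    bag.clear();
  }
}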
