http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 258e30a..1e2d3ec 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -30,16 +30,19 @@ import 
org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.mockito.stubbing.OngoingStubbing;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.util.Collection;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -62,6 +65,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
        final ExecutionConfig executionConfig;
        
        final Object checkpointLock;
+
+       StreamTask<?, ?> mockTask;
+
+       AbstractStateBackend stateBackend;
        
        
        public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, 
OUT> operator) {
@@ -71,26 +78,33 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
                this.executionConfig = new ExecutionConfig();
                this.checkpointLock = new Object();
 
-               Environment env = new MockEnvironment("MockTwoInputTask", 3 * 
1024 * 1024, new MockInputSplitProvider(), 1024);
-               StreamTask<?, ?> mockTask = mock(StreamTask.class);
+               final Environment env = new MockEnvironment("MockTwoInputTask", 
3 * 1024 * 1024, new MockInputSplitProvider(), 1024);
+               mockTask = mock(StreamTask.class);
                when(mockTask.getName()).thenReturn("Mock Task");
                when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
                when(mockTask.getConfiguration()).thenReturn(config);
                when(mockTask.getEnvironment()).thenReturn(env);
                when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
-               
-               // ugly Java generic hacks
-               @SuppressWarnings("unchecked")
-               OngoingStubbing<StateBackend<?>> stubbing = 
-                               (OngoingStubbing<StateBackend<?>>) 
(OngoingStubbing<?>) when(mockTask.getStateBackend());
-               stubbing.thenReturn(MemoryStateBackend.defaultInstance());
 
-               operator.setup(mockTask, config, new MockOutput());
+               try {
+                       doAnswer(new Answer<AbstractStateBackend>() {
+                               @Override
+                               public AbstractStateBackend 
answer(InvocationOnMock invocationOnMock) throws Throwable {
+                                       final String operatorIdentifier = 
(String) invocationOnMock.getArguments()[0];
+                                       final TypeSerializer<?> keySerializer = 
(TypeSerializer<?>) invocationOnMock.getArguments()[1];
+                                       MemoryStateBackend backend = 
MemoryStateBackend.create();
+                                       backend.initializeForJob(env, 
operatorIdentifier, keySerializer);
+                                       return backend;
+                               }
+                       }).when(mockTask).createStateBackend(any(String.class), 
any(TypeSerializer.class));
+               } catch (Exception e) {
+                       e.printStackTrace();
+               }
        }
 
        public <K> void configureForKeyedStream(KeySelector<IN, K> keySelector, 
TypeInformation<K> keyType) {
                ClosureCleaner.clean(keySelector, false);
-               config.setStatePartitioner(keySelector);
+               config.setStatePartitioner(0, keySelector);
                
config.setStateKeySerializer(keyType.createSerializer(executionConfig));
        }
        
@@ -107,6 +121,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
         * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#open()}
         */
        public void open() throws Exception {
+               operator.setup(mockTask, config, new MockOutput());
+
                operator.open();
        }
 
@@ -119,13 +135,13 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
        }
 
        public void processElement(StreamRecord<IN> element) throws Exception {
-               operator.setKeyContextElement(element);
+               operator.setKeyContextElement1(element);
                operator.processElement(element);
        }
 
        public void processElements(Collection<StreamRecord<IN>> elements) 
throws Exception {
                for (StreamRecord<IN> element: elements) {
-                       operator.setKeyContextElement(element);
+                       operator.setKeyContextElement1(element);
                        operator.processElement(element);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index c586db3..e23673a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -28,7 +28,7 @@ import 
org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -76,12 +76,6 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, 
OUT> {
                when(mockTask.getEnvironment()).thenReturn(env);
                when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
 
-               // ugly Java generic hacks
-               @SuppressWarnings("unchecked")
-               OngoingStubbing<StateBackend<?>> stubbing =
-                               (OngoingStubbing<StateBackend<?>>) 
(OngoingStubbing<?>) when(mockTask.getStateBackend());
-               stubbing.thenReturn(MemoryStateBackend.defaultInstance());
-
                operator.setup(mockTask, new StreamConfig(new Configuration()), 
new MockOutput());
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 69147f6..29bf5da 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -23,7 +23,8 @@ import org.apache.flink.api.common.io.{FileInputFormat, 
InputFormat}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.flink.runtime.state.StateBackend
+import org.apache.flink.runtime.state.AbstractStateBackend
+import org.apache.flink.streaming.api.{TimeCharacteristic, CheckpointingMode}
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment 
=> JavaEnv}
 import 
org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
 import org.apache.flink.streaming.api.functions.source.SourceFunction
@@ -211,7 +212,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * program can be executed highly available and strongly consistent 
(assuming that Flink
    * is run in high-availability mode).
    */
-  def setStateBackend(backend: StateBackend[_]): StreamExecutionEnvironment = {
+  def setStateBackend(backend: AbstractStateBackend): 
StreamExecutionEnvironment = {
     javaEnv.setStateBackend(backend)
     this
   }
@@ -219,7 +220,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Returns the state backend that defines how to store and checkpoint state.
    */
-  def getStateBackend: StateBackend[_] = javaEnv.getStateBackend()
+  def getStateBackend: AbstractStateBackend = javaEnv.getStateBackend()
   
   /**
    * Sets the number of times that failed tasks are re-executed. A value of 
zero

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
index d66cfdb..dc49173 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
@@ -19,9 +19,9 @@
 package org.apache.flink.streaming.api.scala.function
 
 import org.apache.flink.api.common.functions.RichFunction
+import org.apache.flink.api.common.state.OperatorState
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.api.common.state.OperatorState
 
 /**
  * Trait implementing the functionality necessary to apply stateful functions 
in 

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 304dcb5..18c1b3c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -383,7 +383,7 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
        // 
------------------------------------------------------------------------
 
        private static class FailingSource extends 
RichEventTimeSourceFunction<Tuple2<Long, IntType>>
-                       implements Checkpointed<Integer>, CheckpointNotifier
+                       implements Checkpointed<Integer>, CheckpointListener
        {
                private static volatile boolean failedBefore = false;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 81e8f0a..7a1a879 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -25,8 +25,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -451,7 +451,7 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
        // 
------------------------------------------------------------------------
 
        private static class FailingSource extends 
RichEventTimeSourceFunction<Tuple2<Long, IntType>>
-                       implements Checkpointed<Integer>, CheckpointNotifier
+                       implements Checkpointed<Integer>, CheckpointListener
        {
                private static volatile boolean failedBefore = false;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
index 42b6230..387421e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
@@ -29,6 +29,9 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
@@ -190,14 +193,17 @@ public class PartitionedStateCheckpointingITCase extends 
StreamFaultToleranceTes
 
                private static Map<Integer, Long> allCounts = new 
ConcurrentHashMap<Integer, Long>();
 
+               private ValueStateDescriptor<Long> bCountsId = new 
ValueStateDescriptor<>("b", 0L,
+                               LongSerializer.INSTANCE);
+
                private OperatorState<NonSerializableLong> aCounts;
-               private OperatorState<Long> bCounts;
+               private ValueState<Long> bCounts;
 
                @Override
                public void open(Configuration parameters) throws IOException {
                        aCounts = getRuntimeContext().getKeyValueState(
                                        "a", NonSerializableLong.class, 
NonSerializableLong.of(0L));
-                       bCounts = getRuntimeContext().getKeyValueState("b", 
Long.class, 0L);
+                       bCounts = 
getRuntimeContext().getPartitionedState(bCountsId);
                }
 
                @Override
@@ -224,6 +230,22 @@ public class PartitionedStateCheckpointingITCase extends 
StreamFaultToleranceTes
                public static NonSerializableLong of(long value) {
                        return new NonSerializableLong(value);
                }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) return true;
+                       if (o == null || getClass() != o.getClass()) return 
false;
+
+                       NonSerializableLong that = (NonSerializableLong) o;
+
+                       return value.equals(that.value);
+
+               }
+
+               @Override
+               public int hashCode() {
+                       return value.hashCode();
+               }
        }
        
        public static class IdentityKeySelector<T> implements KeySelector<T, T> 
{

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 4e5e1b5..46c0453 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -44,7 +44,7 @@ import 
org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.state.filesystem.AbstractFileState;
+import org.apache.flink.runtime.state.filesystem.AbstractFileStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved;
@@ -53,7 +53,7 @@ import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseS
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import 
org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseSubmitTaskListener;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -365,7 +365,7 @@ public class SavepointITCase extends TestLogger {
                                for (StreamTaskState taskState : 
taskStateList.getState(
                                                
ClassLoader.getSystemClassLoader())) {
 
-                                       AbstractFileState fsState = 
(AbstractFileState) taskState.getFunctionState();
+                                       AbstractFileStateHandle fsState = 
(AbstractFileStateHandle) taskState.getFunctionState();
                                        checkpointFiles.add(new 
File(fsState.getFilePath().toUri()));
                                }
                        }
@@ -660,7 +660,7 @@ public class SavepointITCase extends TestLogger {
                                for (StreamTaskState taskState : 
taskStateList.getState(
                                                
ClassLoader.getSystemClassLoader())) {
 
-                                       AbstractFileState fsState = 
(AbstractFileState) taskState.getFunctionState();
+                                       AbstractFileStateHandle fsState = 
(AbstractFileStateHandle) taskState.getFunctionState();
                                        checkpointFiles.add(new 
File(fsState.getFilePath().toUri()));
                                }
                        }
@@ -784,7 +784,7 @@ public class SavepointITCase extends TestLogger {
        }
 
        private static class InfiniteTestSource
-                       implements SourceFunction<Integer>, CheckpointNotifier {
+                       implements SourceFunction<Integer>, CheckpointListener {
 
                private static final long serialVersionUID = 1L;
                private volatile boolean running = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
index d7c06f6..962fe84 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
@@ -21,9 +21,9 @@ package org.apache.flink.test.checkpointing;
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -47,7 +47,7 @@ import static org.junit.Assert.assertTrue;
  * A simple test that runs a streaming topology with checkpointing enabled.
  *
  * The test triggers a failure after a while and verifies that, after 
completion, the
- * state defined with either the {@link OperatorState} or the {@link 
Checkpointed}
+ * state defined with either the {@link ValueState} or the {@link Checkpointed}
  * interface reflects the "exactly once" semantics.
  * 
  * The test throttles the input until at least two checkpoints are completed, 
to make sure that
@@ -295,7 +295,7 @@ public class StateCheckpointedITCase extends 
StreamFaultToleranceTestBase {
        }
        
        private static class OnceFailingAggregator extends 
RichFlatMapFunction<PrefixCount, PrefixCount> 
-               implements Checkpointed<HashMap<String, PrefixCount>>, 
CheckpointNotifier {
+               implements Checkpointed<HashMap<String, PrefixCount>>, 
CheckpointListener {
 
                static boolean wasCheckpointedBeforeFailure = false;
                

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index 22f61b7..5fa0666 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -55,8 +55,8 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
- * Integration test for the {@link CheckpointNotifier} interface. The test 
ensures that
- * {@link CheckpointNotifier#notifyCheckpointComplete(long)} is called for 
completed
+ * Integration test for the {@link CheckpointListener} interface. The test 
ensures that
+ * {@link CheckpointListener#notifyCheckpointComplete(long)} is called for 
completed
  * checkpoints, that it is called at most once for any checkpoint id and that 
it is not
  * called for a deliberately failed checkpoint.
  *
@@ -66,7 +66,7 @@ import static org.junit.Assert.fail;
  *
  * <p>
  * Note that as a result of doing the checks on the task level there is no way 
to verify
- * that the {@link CheckpointNotifier#notifyCheckpointComplete(long)} is 
called for every
+ * that the {@link CheckpointListener#notifyCheckpointComplete(long)} is 
called for every
  * successfully completed checkpoint.
  */
 @SuppressWarnings("serial")
@@ -197,11 +197,11 @@ public class StreamCheckpointNotifierITCase {
        // 
--------------------------------------------------------------------------------------------
 
        /**
-        * Generates some Long values and as an implementation for the {@link 
CheckpointNotifier}
+        * Generates some Long values and as an implementation for the {@link 
CheckpointListener}
         * interface it stores all the checkpoint ids it has seen in a static 
list.
         */
        private static class GeneratingSourceFunction extends 
RichSourceFunction<Long>
-                       implements ParallelSourceFunction<Long>, 
CheckpointNotifier, Checkpointed<Integer> {
+                       implements ParallelSourceFunction<Long>, 
CheckpointListener, Checkpointed<Integer> {
                
                static final List<Long>[] completedCheckpoints = 
createCheckpointLists(PARALLELISM);
                
@@ -285,10 +285,10 @@ public class StreamCheckpointNotifierITCase {
 
        /**
         * Identity transform on Long values wrapping the output in a tuple. As 
an implementation
-        * for the {@link CheckpointNotifier} interface it stores all the 
checkpoint ids it has seen in a static list.
+        * for the {@link CheckpointListener} interface it stores all the 
checkpoint ids it has seen in a static list.
         */
        private static class IdentityMapFunction extends RichMapFunction<Long, 
Tuple1<Long>>
-                       implements CheckpointNotifier {
+                       implements CheckpointListener {
 
                static final List<Long>[] completedCheckpoints = 
createCheckpointLists(PARALLELISM);
 
@@ -316,10 +316,10 @@ public class StreamCheckpointNotifierITCase {
 
        /**
         * Filter on Long values supposedly letting all values through. As an 
implementation
-        * for the {@link CheckpointNotifier} interface it stores all the 
checkpoint ids
+        * for the {@link CheckpointListener} interface it stores all the 
checkpoint ids
         * it has seen in a static list.
         */
-       private static class LongRichFilterFunction extends 
RichFilterFunction<Long> implements CheckpointNotifier {
+       private static class LongRichFilterFunction extends 
RichFilterFunction<Long> implements CheckpointListener {
 
                static final List<Long>[] completedCheckpoints = 
createCheckpointLists(PARALLELISM);
                
@@ -347,11 +347,11 @@ public class StreamCheckpointNotifierITCase {
 
        /**
         * CoFlatMap on Long values as identity transform on the left input, 
while ignoring the right.
-        * As an implementation for the {@link CheckpointNotifier} interface it 
stores all the checkpoint
+        * As an implementation for the {@link CheckpointListener} interface it 
stores all the checkpoint
         * ids it has seen in a static list.
         */
        private static class LeftIdentityCoRichFlatMapFunction extends 
RichCoFlatMapFunction<Long, Long, Long>
-                       implements CheckpointNotifier {
+                       implements CheckpointListener {
 
                static final List<Long>[] completedCheckpoints = 
createCheckpointLists(PARALLELISM);
 
@@ -386,7 +386,7 @@ public class StreamCheckpointNotifierITCase {
         * Reducer that causes one failure between seeing 40% to 70% of the 
records.
         */
        private static class OnceFailingReducer extends 
RichReduceFunction<Tuple1<Long>> 
-               implements Checkpointed<Long>, CheckpointNotifier
+               implements Checkpointed<Long>, CheckpointListener
        {
                static volatile boolean hasFailed = false;
                static volatile long failureCheckpointID;

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index 500d7d3..8d59975 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -26,8 +26,8 @@ import 
org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -335,7 +335,7 @@ public class WindowCheckpointingITCase extends TestLogger {
        // 
------------------------------------------------------------------------
 
        private static class FailingSource extends 
RichSourceFunction<Tuple2<Long, IntType>>
-                       implements Checkpointed<Integer>, CheckpointNotifier
+                       implements Checkpointed<Integer>, CheckpointListener
        {
                private static volatile boolean failedBefore = false;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
index cda5a7b..bc7cebb 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
@@ -19,7 +19,7 @@
 package org.apache.flink.test.classloading.jar;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -83,7 +83,7 @@ public class CheckpointedStreamingProgram {
                }
        }
 
-       public static class StatefulMapper implements MapFunction<String, 
String>, Checkpointed<StatefulMapper>, CheckpointNotifier {
+       public static class StatefulMapper implements MapFunction<String, 
String>, Checkpointed<StatefulMapper>, CheckpointListener {
 
                private String someState;
                private boolean atLeastOneSnapshotComplete = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
index acc8569..6ae0d46 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
@@ -41,7 +41,7 @@ import org.apache.flink.runtime.testutils.TestJvmProcess;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -378,7 +378,7 @@ public class ChaosMonkeyITCase extends TestLogger {
        }
 
        public static class CheckpointedSequenceSource extends 
RichParallelSourceFunction<Long>
-                       implements Checkpointed<Long>, CheckpointNotifier {
+                       implements Checkpointed<Long>, CheckpointListener {
 
                private static final long serialVersionUID = 0L;
 
@@ -448,7 +448,7 @@ public class ChaosMonkeyITCase extends TestLogger {
        }
 
        public static class CountingSink extends RichSinkFunction<Long>
-                       implements Checkpointed<CountingSink>, 
CheckpointNotifier {
+                       implements Checkpointed<CountingSink>, 
CheckpointListener {
 
                private static final Logger LOG = 
LoggerFactory.getLogger(CountingSink.class);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
index cc4998d..737d39a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
@@ -40,7 +40,7 @@ import org.apache.flink.runtime.testutils.JobManagerProcess;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -505,7 +505,7 @@ public class JobManagerCheckpointRecoveryITCase extends 
TestLogger {
         * are exhausted.
         */
        public static class CountingSink implements SinkFunction<Long>, 
Checkpointed<CountingSink>,
-                       CheckpointNotifier {
+               CheckpointListener {
 
                private static final Logger LOG = 
LoggerFactory.getLogger(CountingSink.class);
 

Reply via email to