http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 2036f69..f638ddd 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -50,6 +50,7 @@ import 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -317,7 +318,7 @@ public class StreamMockEnvironment implements Environment {
        @Override
        public void acknowledgeCheckpoint(
                        long checkpointId,
-                       ChainedStateHandle<StreamStateHandle> 
chainedStateHandle, List<KeyGroupsStateHandle> keyGroupStateHandles,
+                       CheckpointStateHandles checkpointStateHandles,
                        long synchronousDurationMillis, long 
asynchronousDurationMillis,
                        long bytesBufferedInAlignment, long 
alignmentDurationNanos) {
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 430c6de..247edd6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -24,12 +24,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.mockito.invocation.InvocationOnMock;
@@ -41,11 +43,12 @@ import java.util.Collections;
 import java.util.concurrent.RunnableFuture;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.doAnswer;
 
 /**
  * Extension of {@link OneInputStreamOperatorTestHarness} that allows the 
operator to get
- * a {@link KeyedStateBackend}.
+ * a {@link AbstractKeyedStateBackend}.
  *
  */
 public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
@@ -53,7 +56,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, 
OUT>
 
        // in case the operator creates one we store it here so that we
        // can snapshot its state
-       private KeyedStateBackend<?> keyedStateBackend = null;
+       private AbstractKeyedStateBackend<?> keyedStateBackend = null;
 
        // when we restore we keep the state here so that we can call restore
        // when the operator requests the keyed state backend
@@ -114,7 +117,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, 
OUT>
                                        final KeyGroupRange keyGroupRange = 
(KeyGroupRange) invocationOnMock.getArguments()[2];
 
                                        if(keyedStateBackend != null) {
-                                               keyedStateBackend.close();
+                                               keyedStateBackend.dispose();
                                        }
 
                                        if (restoredKeyedState == null) {
@@ -148,7 +151,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, 
OUT>
        }
 
        /**
-        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#snapshotState(org.apache.flink.core.fs.FSDataOutputStream,
 long, long)} ()}
+        *
         */
        @Override
        public StreamStateHandle snapshot(long checkpointId, long timestamp) 
throws Exception {
@@ -159,7 +162,9 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, 
OUT>
                CheckpointStreamFactory.CheckpointStateOutputStream outStream =
                                
streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
 
-               operator.snapshotState(outStream, checkpointId, timestamp);
+               if (operator instanceof StreamCheckpointedOperator) {
+                       ((StreamCheckpointedOperator) 
operator).snapshotState(outStream, checkpointId, timestamp);
+               }
 
                if (keyedStateBackend != null) {
                        RunnableFuture<KeyGroupsStateHandle> 
keyedSnapshotRunnable = keyedStateBackend.snapshot(checkpointId,
@@ -180,17 +185,21 @@ public class KeyedOneInputStreamOperatorTestHarness<K, 
IN, OUT>
        }
 
        /**
-        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#restoreState(org.apache.flink.core.fs.FSDataInputStream)}
 ()}
+        * 
         */
        @Override
        public void restore(StreamStateHandle snapshot) throws Exception {
-               FSDataInputStream inStream = snapshot.openInputStream();
-               operator.restoreState(inStream);
+               try (FSDataInputStream inStream = snapshot.openInputStream()) {
+
+                       if (operator instanceof StreamCheckpointedOperator) {
+                               ((StreamCheckpointedOperator) 
operator).restoreState(inStream);
+                       }
 
-               byte keyedStatePresent = (byte) inStream.read();
-               if (keyedStatePresent == 1) {
-                       ObjectInputStream ois = new ObjectInputStream(inStream);
-                       this.restoredKeyedState = (KeyGroupsStateHandle) 
ois.readObject();
+                       byte keyedStatePresent = (byte) inStream.read();
+                       if (keyedStatePresent == 1) {
+                               ObjectInputStream ois = new 
ObjectInputStream(inStream);
+                               this.restoredKeyedState = 
(KeyGroupsStateHandle) ois.readObject();
+                       }
                }
        }
 
@@ -200,7 +209,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, 
OUT>
        public void close() throws Exception {
                super.close();
                if(keyedStateBackend != null) {
-                       keyedStateBackend.close();
+                       keyedStateBackend.dispose();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index acf046a..d6f46fd 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
@@ -39,7 +40,6 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -204,14 +204,18 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
        }
 
        /**
-        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#snapshotState(FSDataOutputStream,
 long, long)} ()}
+        *
         */
        public StreamStateHandle snapshot(long checkpointId, long timestamp) 
throws Exception {
                CheckpointStreamFactory.CheckpointStateOutputStream outStream = 
stateBackend.createStreamFactory(
                                new JobID(),
                                
"test_op").createCheckpointStateOutputStream(checkpointId, timestamp);
-               operator.snapshotState(outStream, checkpointId, timestamp);
-               return outStream.closeAndGetHandle();
+               if(operator instanceof StreamCheckpointedOperator) {
+                       ((StreamCheckpointedOperator) 
operator).snapshotState(outStream, checkpointId, timestamp);
+                       return outStream.closeAndGetHandle();
+               } else {
+                       throw new RuntimeException("Operator is not 
StreamCheckpointedOperator");
+               }
        }
 
        /**
@@ -222,10 +226,16 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
        }
 
        /**
-        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#restoreState(org.apache.flink.core.fs.FSDataInputStream)}
 ()}
+        *
         */
        public void restore(StreamStateHandle snapshot) throws Exception {
-               operator.restoreState(snapshot.openInputStream());
+               if(operator instanceof StreamCheckpointedOperator) {
+                       try (FSDataInputStream in = snapshot.openInputStream()) 
{
+                               ((StreamCheckpointedOperator) 
operator).restoreState(in);
+                       }
+               } else {
+                       throw new RuntimeException("Operator is not 
StreamCheckpointedOperator");
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
index c12bcb9..5874f56 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -35,6 +36,8 @@ import 
org.apache.flink.streaming.api.operators.StreamGroupedFold;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
 import org.junit.Assert;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Queue;
 import java.util.Random;
 
@@ -180,7 +183,7 @@ public class UdfStreamOperatorCheckpointingITCase extends 
StreamFaultToleranceTe
         */
        private static class OnceFailingIdentityMapFunction
                        extends RichMapFunction<Tuple2<Integer, Long>, 
Tuple2<Integer, Long>> 
-                       implements Checkpointed<Long> {
+                       implements ListCheckpointed<Long> {
 
                private static volatile boolean hasFailed = false;
 
@@ -211,15 +214,16 @@ public class UdfStreamOperatorCheckpointingITCase extends 
StreamFaultToleranceTe
                        return value;
                }
 
-
                @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return count;
+               public List<Long> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+                       return Collections.singletonList(count);
                }
 
                @Override
-               public void restoreState(Long state) {
-                       count = state;
+               public void restoreState(List<Long> state) throws Exception {
+                       if(!state.isEmpty()) {
+                               count = state.get(0);
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index 694f006..2a635ab 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -21,17 +21,18 @@ package org.apache.flink.test.streaming.runtime;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.junit.Test;
@@ -66,7 +67,7 @@ public class StateBackendITCase extends 
StreamingMultipleProgramsTestBase {
                                @Override
                                public void open(Configuration parameters) 
throws Exception {
                                        super.open(parameters);
-                                       
getRuntimeContext().getKeyValueState("test", String.class, "");
+                                       getRuntimeContext().getState(new 
ValueStateDescriptor<Integer>("Test", Integer.class, 0));
                                }
 
                                @Override
@@ -99,7 +100,8 @@ public class StateBackendITCase extends 
StreamingMultipleProgramsTestBase {
                }
 
                @Override
-               public <K> KeyedStateBackend<K> 
createKeyedStateBackend(Environment env,
+               public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+                               Environment env,
                                JobID jobID,
                                String operatorIdentifier,
                                TypeSerializer<K> keySerializer,
@@ -110,7 +112,8 @@ public class StateBackendITCase extends 
StreamingMultipleProgramsTestBase {
                }
 
                @Override
-               public <K> KeyedStateBackend<K> 
restoreKeyedStateBackend(Environment env,
+               public <K> AbstractKeyedStateBackend<K> 
restoreKeyedStateBackend(
+                               Environment env,
                                JobID jobID,
                                String operatorIdentifier,
                                TypeSerializer<K> keySerializer,

Reply via email to