[FLINK-4744] [streaming api] Followup: Unify names for operator state access 
methods and comments.

Also make JavaSerializer package private, as it is not intended for use as a 
proper TypeSerializer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10a42f95
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10a42f95
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10a42f95

Branch: refs/heads/master
Commit: 10a42f951c5143537c28a0f9df65627e5c632c4b
Parents: 56cba7e
Author: Stephan Ewen <[email protected]>
Authored: Wed Oct 5 15:30:26 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Oct 5 19:36:13 2016 +0200

----------------------------------------------------------------------
 .../api/common/state/OperatorStateStore.java    |  61 ++++++++++
 .../java/typeutils/runtime/JavaSerializer.java  | 119 ------------------
 .../state/DefaultOperatorStateBackend.java      |   8 +-
 .../flink/runtime/state/JavaSerializer.java     | 122 +++++++++++++++++++
 .../runtime/state/OperatorStateBackend.java     |   2 +
 .../flink/runtime/state/OperatorStateStore.java |  60 ---------
 .../runtime/state/OperatorStateBackendTest.java |  15 ++-
 .../kafka/FlinkKafkaConsumerBase.java           |   6 +-
 .../kafka/FlinkKafkaProducerBase.java           |   2 +-
 .../kafka/AtLeastOnceProducerTest.java          |   1 -
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  14 +--
 .../api/checkpoint/CheckpointedFunction.java    |   4 +-
 .../operators/AbstractUdfStreamOperator.java    |   6 +-
 .../operators/StreamCheckpointedOperator.java   |   5 +-
 .../streaming/runtime/tasks/StreamTask.java     |   2 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   |   4 +-
 16 files changed, 217 insertions(+), 214 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
new file mode 100644
index 0000000..03c11f6
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Interface for a backend that manages operator state.
+ */
+public interface OperatorStateStore {
+
+       String DEFAULT_OPERATOR_STATE_NAME = "_default_";
+
+       /**
+        * Creates a state descriptor of the given name that uses Java 
serialization to persist the
+        * state.
+        * 
+        * <p>This is a simple convenience method. For more flexibility on how 
state serialization
+        * should happen, use the {@link 
#getOperatorState(ListStateDescriptor)} method.
+        *
+        * @param stateName The name of state to create
+        * @return A list state using Java serialization to serialize state 
objects.
+        * @throws Exception
+        */
+       ListState<Serializable> getSerializableListState(String stateName) 
throws Exception;
+
+       /**
+        * Creates (or restores) a list state. Each state is registered under a 
unique name.
+        * The provided serializer is used to de/serialize the state in case of 
checkpointing (snapshot/restore).
+        *
+        * @param stateDescriptor The descriptor for this state, providing a 
name and serializer.
+        * @param <S> The generic type of the state
+        * 
+        * @return A list for all state partitions.
+        * @throws Exception
+        */
+       <S> ListState<S> getOperatorState(ListStateDescriptor<S> 
stateDescriptor) throws Exception;
+
+       /**
+        * Returns a set with the names of all currently registered states.
+        * @return set of names for all registered states.
+        */
+       Set<String> getRegisteredStateNames();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
deleted file mode 100644
index 3af7653..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-public class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
-
-       private static final long serialVersionUID = 1L;
-
-       private final ClassLoader userClassLoader;
-
-       public JavaSerializer() {
-               this(Thread.currentThread().getContextClassLoader());
-       }
-
-       public JavaSerializer(ClassLoader userClassLoader) {
-               this.userClassLoader = 
Preconditions.checkNotNull(userClassLoader);
-       }
-
-       @Override
-       public boolean isImmutableType() {
-               return false;
-       }
-
-       @Override
-       public TypeSerializer<T> duplicate() {
-               return this;
-       }
-
-       @Override
-       public T createInstance() {
-               return null;
-       }
-
-       @Override
-       public T copy(T from) {
-
-               try {
-                       return InstantiationUtil.clone(from);
-               } catch (IOException | ClassNotFoundException e) {
-                       throw new RuntimeException("Could not copy instance of 
" + from + '.', e);
-               }
-       }
-
-       @Override
-       public T copy(T from, T reuse) {
-               return copy(from);
-       }
-
-       @Override
-       public int getLength() {
-               return 0;
-       }
-
-       @Override
-       public void serialize(T record, DataOutputView target) throws 
IOException {
-               InstantiationUtil.serializeObject(new 
DataOutputViewStream(target), record);
-       }
-
-       @Override
-       public T deserialize(DataInputView source) throws IOException {
-               try {
-                       return InstantiationUtil.deserializeObject(new 
DataInputViewStream(source), userClassLoader);
-               } catch (ClassNotFoundException e) {
-                       throw new IOException("Could not deserialize object.", 
e);
-               }
-       }
-
-       @Override
-       public T deserialize(T reuse, DataInputView source) throws IOException {
-               return deserialize(source);
-       }
-
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-               int size = source.readInt();
-               target.writeInt(size);
-               target.write(source, size);
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               return obj instanceof JavaSerializer && 
userClassLoader.equals(((JavaSerializer<T>) obj).userClassLoader);
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof JavaSerializer;
-       }
-
-       @Override
-       public int hashCode() {
-               return getClass().hashCode();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index af97a3f..b1ab7e3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.JavaSerializer;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.memory.DataInputView;
@@ -74,15 +74,15 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
        }
 
        @Override
-       public ListState<Serializable> getDefaultPartitionableState(String 
stateName) throws Exception {
-               return getPartitionableState(new 
ListStateDescriptor<>(stateName, javaSerializer));
+       public ListState<Serializable> getSerializableListState(String 
stateName) throws Exception {
+               return getOperatorState(new ListStateDescriptor<>(stateName, 
javaSerializer));
        }
 
        /**
         * @see OperatorStateStore
         */
        @Override
-       public <S> ListState<S> getPartitionableState(
+       public <S> ListState<S> getOperatorState(
                        ListStateDescriptor<S> stateDescriptor) throws 
IOException {
 
                Preconditions.checkNotNull(stateDescriptor);

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
new file mode 100644
index 0000000..2eb9595
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+@SuppressWarnings("serial")
+@Internal
+final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
+
+       private final ClassLoader userClassLoader;
+
+       public JavaSerializer() {
+               this(Thread.currentThread().getContextClassLoader());
+       }
+
+       public JavaSerializer(ClassLoader userClassLoader) {
+               this.userClassLoader = 
Preconditions.checkNotNull(userClassLoader);
+       }
+
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       @Override
+       public TypeSerializer<T> duplicate() {
+               return this;
+       }
+
+       @Override
+       public T createInstance() {
+               return null;
+       }
+
+       @Override
+       public T copy(T from) {
+
+               try {
+                       return InstantiationUtil.clone(from);
+               } catch (IOException | ClassNotFoundException e) {
+                       throw new RuntimeException("Could not copy instance of 
" + from + '.', e);
+               }
+       }
+
+       @Override
+       public T copy(T from, T reuse) {
+               return copy(from);
+       }
+
+       @Override
+       public int getLength() {
+               return 0;
+       }
+
+       @Override
+       public void serialize(T record, DataOutputView target) throws 
IOException {
+               InstantiationUtil.serializeObject(new 
DataOutputViewStream(target), record);
+       }
+
+       @Override
+       public T deserialize(DataInputView source) throws IOException {
+               try {
+                       return InstantiationUtil.deserializeObject(new 
DataInputViewStream(source), userClassLoader);
+               } catch (ClassNotFoundException e) {
+                       throw new IOException("Could not deserialize object.", 
e);
+               }
+       }
+
+       @Override
+       public T deserialize(T reuse, DataInputView source) throws IOException {
+               return deserialize(source);
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               int size = source.readInt();
+               target.writeInt(size);
+               target.write(source, size);
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               return obj instanceof JavaSerializer && 
userClassLoader.equals(((JavaSerializer<T>) obj).userClassLoader);
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof JavaSerializer;
+       }
+
+       @Override
+       public int hashCode() {
+               return getClass().hashCode();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
index 4e980b7..83e6369 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.api.common.state.OperatorStateStore;
+
 import java.io.Closeable;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
deleted file mode 100644
index ceab87f..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.java.typeutils.runtime.JavaSerializer;
-
-import java.io.Serializable;
-import java.util.Set;
-
-/**
- * Interface for a backend that manages partitionable operator state.
- */
-public interface OperatorStateStore {
-
-       String DEFAULT_OPERATOR_STATE_NAME = "";
-
-       /**
-        * Creates a satte descriptor of the given name that uses {@link 
JavaSerializer}.
-        *
-        * @param stateName The name of state to create
-        * @return A state descriptor that uses {@link JavaSerializer}
-        * @throws Exception
-        */
-       ListState<Serializable> getDefaultPartitionableState(String stateName) 
throws Exception;
-
-       /**
-        * Creates (or restores) the partitionable state in this backend. Each 
state is registered under a unique name.
-        * The provided serializer is used to de/serialize the state in case of 
checkpointing (snapshot/restore).
-        *
-        * @param stateDescriptor The descriptr for this state, providing a 
name and serializer
-        * @param <S> The generic type of the state
-        * @return A list for all state partitions.
-        * @throws Exception
-        */
-       <S> ListState<S> getPartitionableState(ListStateDescriptor<S> 
stateDescriptor) throws Exception;
-
-       /**
-        * Returns a set with the names of all currently registered states.
-        * @return set of names for all registered states.
-        */
-       Set<String> getRegisteredStateNames();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index ff1a23d..2db8735 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.java.typeutils.runtime.JavaSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.junit.Test;
@@ -61,7 +60,7 @@ public class OperatorStateBackendTest {
                OperatorStateBackend operatorStateBackend = 
createNewOperatorStateBackend();
                ListStateDescriptor<Serializable> stateDescriptor1 = new 
ListStateDescriptor<>("test1", new JavaSerializer<>());
                ListStateDescriptor<Serializable> stateDescriptor2 = new 
ListStateDescriptor<>("test2", new JavaSerializer<>());
-               ListState<Serializable> listState1 = 
operatorStateBackend.getPartitionableState(stateDescriptor1);
+               ListState<Serializable> listState1 = 
operatorStateBackend.getOperatorState(stateDescriptor1);
                assertNotNull(listState1);
                assertEquals(1, 
operatorStateBackend.getRegisteredStateNames().size());
                Iterator<Serializable> it = listState1.get().iterator();
@@ -74,7 +73,7 @@ public class OperatorStateBackendTest {
                assertEquals(4711, it.next());
                assertTrue(!it.hasNext());
 
-               ListState<Serializable> listState2 = 
operatorStateBackend.getPartitionableState(stateDescriptor2);
+               ListState<Serializable> listState2 = 
operatorStateBackend.getOperatorState(stateDescriptor2);
                assertNotNull(listState2);
                assertEquals(2, 
operatorStateBackend.getRegisteredStateNames().size());
                assertTrue(!it.hasNext());
@@ -88,7 +87,7 @@ public class OperatorStateBackendTest {
                assertEquals(23, it.next());
                assertTrue(!it.hasNext());
 
-               ListState<Serializable> listState1b = 
operatorStateBackend.getPartitionableState(stateDescriptor1);
+               ListState<Serializable> listState1b = 
operatorStateBackend.getOperatorState(stateDescriptor1);
                assertNotNull(listState1b);
                listState1b.add(123);
                it = listState1b.get().iterator();
@@ -115,8 +114,8 @@ public class OperatorStateBackendTest {
                OperatorStateBackend operatorStateBackend = 
createNewOperatorStateBackend();
                ListStateDescriptor<Serializable> stateDescriptor1 = new 
ListStateDescriptor<>("test1", new JavaSerializer<>());
                ListStateDescriptor<Serializable> stateDescriptor2 = new 
ListStateDescriptor<>("test2", new JavaSerializer<>());
-               ListState<Serializable> listState1 = 
operatorStateBackend.getPartitionableState(stateDescriptor1);
-               ListState<Serializable> listState2 = 
operatorStateBackend.getPartitionableState(stateDescriptor2);
+               ListState<Serializable> listState1 = 
operatorStateBackend.getOperatorState(stateDescriptor1);
+               ListState<Serializable> listState2 = 
operatorStateBackend.getOperatorState(stateDescriptor2);
 
                listState1.add(42);
                listState1.add(4711);
@@ -137,8 +136,8 @@ public class OperatorStateBackendTest {
 
                        assertEquals(0, 
operatorStateBackend.getRegisteredStateNames().size());
 
-                       listState1 = 
operatorStateBackend.getPartitionableState(stateDescriptor1);
-                       listState2 = 
operatorStateBackend.getPartitionableState(stateDescriptor2);
+                       listState1 = 
operatorStateBackend.getOperatorState(stateDescriptor1);
+                       listState2 = 
operatorStateBackend.getOperatorState(stateDescriptor2);
 
                        assertEquals(2, 
operatorStateBackend.getRegisteredStateNames().size());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index a30341b..8d63345 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.OperatorStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -313,7 +313,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                this.stateStore = stateStore;
 
                ListState<Serializable> offsets =
-                               
stateStore.getDefaultPartitionableState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
+                               
stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
 
                restoreToOffset = new HashMap<>();
 
@@ -333,7 +333,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                } else {
 
                        ListState<Serializable> listState =
-                                       
stateStore.getDefaultPartitionableState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
+                                       
stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
                        listState.clear();
 
                        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index 8b87004..f0975dc 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.OperatorStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
index 766a107..d2d7fca 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.OperatorStateStore;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 45b45f0..373d6ab 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -22,7 +22,7 @@ import org.apache.commons.collections.map.LinkedMap;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.OperatorStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -98,7 +98,7 @@ public class FlinkKafkaConsumerBaseTest {
                FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, 
new LinkedMap(), false);
                OperatorStateStore operatorStateStore = 
mock(OperatorStateStore.class);
                TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = 
new TestingListState<>();
-               
when(operatorStateStore.getPartitionableState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+               
when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
 
                consumer.prepareSnapshot(17L, 17L);
 
@@ -121,10 +121,10 @@ public class FlinkKafkaConsumerBaseTest {
 
                FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new 
LinkedMap(), true);
 
-               
when(operatorStateStore.getDefaultPartitionableState(Matchers.any(String.class))).thenReturn(expectedState);
+               
when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(expectedState);
                consumer.initializeState(operatorStateStore);
 
-               
when(operatorStateStore.getDefaultPartitionableState(Matchers.any(String.class))).thenReturn(listState);
+               
when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
 
                consumer.prepareSnapshot(17L, 17L);
 
@@ -153,7 +153,7 @@ public class FlinkKafkaConsumerBaseTest {
 
                OperatorStateStore operatorStateStore = 
mock(OperatorStateStore.class);
                TestingListState<Serializable> listState = new 
TestingListState<>();
-               
when(operatorStateStore.getDefaultPartitionableState(Matchers.any(String.class))).thenReturn(listState);
+               
when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
 
                consumer.initializeState(operatorStateStore);
                consumer.prepareSnapshot(17L, 17L);
@@ -190,7 +190,7 @@ public class FlinkKafkaConsumerBaseTest {
                TestingListState<Serializable> listState2 = new 
TestingListState<>();
                TestingListState<Serializable> listState3 = new 
TestingListState<>();
 
-               
when(backend.getDefaultPartitionableState(Matchers.any(String.class))).
+               
when(backend.getSerializableListState(Matchers.any(String.class))).
                                thenReturn(listState1, listState1, listState2, 
listState3);
 
                consumer.initializeState(backend);
@@ -252,7 +252,7 @@ public class FlinkKafkaConsumerBaseTest {
 
                OperatorStateStore operatorStateStore = 
mock(OperatorStateStore.class);
                TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = 
new TestingListState<>();
-               
when(operatorStateStore.getPartitionableState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+               
when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
 
                // create 500 snapshots
                for (int i = 100; i < 600; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
index 2227201..777cb91 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.api.checkpoint;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.runtime.state.OperatorStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
 
 /**
  *
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.state.OperatorStateStore;
  * repartitionable state that needs to be checkpointed. Methods from this 
interface are called upon checkpointing and
  * restoring of state.
  *
- * On #initializeState the implementing class receives the {@link 
org.apache.flink.runtime.state.OperatorStateStore}
+ * On #initializeState the implementing class receives the {@link 
OperatorStateStore}
  * to store its state. At least before each snapshot, all persistent 
state must be stored in the state store.
  *
  * When the backend is received for initialization, the user registers states 
with the backend via

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 428442d..72f30b8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -30,7 +30,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.OperatorStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -108,7 +108,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F 
extends Function>
                        ListCheckpointed<Serializable> listCheckpointedFun = 
(ListCheckpointed<Serializable>) userFunction;
 
                        ListState<Serializable> listState = 
getOperatorStateBackend().
-                                       
getDefaultPartitionableState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
+                                       
getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
 
                        List<Serializable> list = new ArrayList<>();
 
@@ -202,7 +202,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F 
extends Function>
                                        ((ListCheckpointed<Serializable>) 
userFunction).snapshotState(checkpointId, timestamp);
 
                        ListState<Serializable> listState = 
getOperatorStateBackend().
-                                       
getDefaultPartitionableState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
+                                       
getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
 
                        listState.clear();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
index 50cdc02..d2f7e0d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 @Deprecated
@@ -45,8 +44,8 @@ public interface StreamCheckpointedOperator {
         * This method restores the operator state (if the operator is 
stateful) and the key/value state
         * (if it had been used and was initialized when the snapshot occurred).
         *
-        * <p>This method is called after {@link #setup(StreamTask, 
StreamConfig, Output)}
-        * and before {@link #open()}.
+        * <p>This method is called after {@link 
StreamOperator#setup(StreamTask, StreamConfig, Output)}
+        * and before {@link StreamOperator#open()}.
         *
         * @param in The stream from which we have to restore our state.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 88c3ba4..9802a16 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -85,7 +85,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
  *
  * The life cycle of the task is set up as follows:
  * <pre>{@code
- *  -- getPartitionableState() -> restores state of all operators in the chain
+ *  -- getOperatorState() -> restores state of all operators in the chain
  *
  *  -- invoke()
  *        |

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 31ccc28..f6e7dca 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -559,7 +559,7 @@ public class OneInputStreamTaskTest extends TestLogger {
                public void open() throws Exception {
                        super.open();
 
-                       ListState<Integer> partitionableState = 
getOperatorStateBackend().getPartitionableState(TEST_DESCRIPTOR);
+                       ListState<Integer> partitionableState = 
getOperatorStateBackend().getOperatorState(TEST_DESCRIPTOR);
 
                        if (numberSnapshotCalls == 0) {
                                for (Integer v : partitionableState.get()) {
@@ -582,7 +582,7 @@ public class OneInputStreamTaskTest extends TestLogger {
                                long checkpointId, long timestamp, 
CheckpointStreamFactory streamFactory) throws Exception {
 
                        ListState<Integer> partitionableState =
-                                       
getOperatorStateBackend().getPartitionableState(TEST_DESCRIPTOR);
+                                       
getOperatorStateBackend().getOperatorState(TEST_DESCRIPTOR);
                        partitionableState.clear();
 
                        partitionableState.add(42);

Reply via email to