Repository: flink
Updated Branches:
  refs/heads/master 7c9bc1e51 -> 10a42f951


[FLINK-4744] [streaming api] Introduce usercode class loader to deserialize 
partitionable operator state

This closes #2598


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/56cba7ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/56cba7ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/56cba7ee

Branch: refs/heads/master
Commit: 56cba7ee9e00ef4b2493845558b614816b837d1b
Parents: 7c9bc1e
Author: Stefan Richter <[email protected]>
Authored: Wed Oct 5 10:35:13 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Oct 5 19:36:12 2016 +0200

----------------------------------------------------------------------
 .../java/typeutils/runtime/JavaSerializer.java  | 27 ++++++-----
 .../apache/flink/util/InstantiationUtil.java    | 12 -----
 .../runtime/state/AbstractStateBackend.java     |  4 +-
 .../state/DefaultOperatorStateBackend.java      | 16 ++++++-
 .../flink/runtime/state/OperatorStateStore.java | 13 ++++++
 .../state/RetrievableStreamStateHandle.java     |  2 +-
 .../zookeeper/ZooKeeperStateHandleStore.java    |  2 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 19 +++++---
 .../jobmanager/JobManagerHARecoveryTest.java    |  7 ++-
 .../runtime/state/OperatorStateBackendTest.java | 15 +++++--
 .../kafka/FlinkKafkaConsumerBase.java           | 15 ++-----
 .../kafka/FlinkKafkaConsumerBaseTest.java       | 47 ++++++++++----------
 .../api/checkpoint/ListCheckpointed.java        |  5 ---
 .../operators/AbstractUdfStreamOperator.java    | 10 ++---
 .../runtime/tasks/OneInputStreamTaskTest.java   |  8 ++--
 .../EventTimeWindowCheckpointingITCase.java     |  7 +--
 16 files changed, 118 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/56cba7ee/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
index 4ae00d1..3af7653 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
@@ -22,16 +22,25 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
 
 public class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
 
        private static final long serialVersionUID = 1L;
 
+       private final ClassLoader userClassLoader;
+
+       public JavaSerializer() {
+               this(Thread.currentThread().getContextClassLoader());
+       }
+
+       public JavaSerializer(ClassLoader userClassLoader) {
+               this.userClassLoader = 
Preconditions.checkNotNull(userClassLoader);
+       }
+
        @Override
        public boolean isImmutableType() {
                return false;
@@ -69,21 +78,15 @@ public class JavaSerializer<T extends Serializable> extends 
TypeSerializer<T> {
 
        @Override
        public void serialize(T record, DataOutputView target) throws 
IOException {
-               ObjectOutputStream oos = new ObjectOutputStream(new 
DataOutputViewStream(target));
-               oos.writeObject(record);
-               oos.flush();
+               InstantiationUtil.serializeObject(new 
DataOutputViewStream(target), record);
        }
 
        @Override
        public T deserialize(DataInputView source) throws IOException {
-               ObjectInputStream ois = new ObjectInputStream(new 
DataInputViewStream(source));
-
                try {
-                       @SuppressWarnings("unchecked")
-                       T nfa = (T) ois.readObject();
-                       return nfa;
+                       return InstantiationUtil.deserializeObject(new 
DataInputViewStream(source), userClassLoader);
                } catch (ClassNotFoundException e) {
-                       throw new RuntimeException("Could not deserialize 
NFA.", e);
+                       throw new IOException("Could not deserialize object.", 
e);
                }
        }
 
@@ -101,7 +104,7 @@ public class JavaSerializer<T extends Serializable> extends 
TypeSerializer<T> {
 
        @Override
        public boolean equals(Object obj) {
-               return obj instanceof JavaSerializer && ((JavaSerializer<T>) 
obj).canEqual(this);
+               return obj instanceof JavaSerializer && 
userClassLoader.equals(((JavaSerializer<T>) obj).userClassLoader);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/56cba7ee/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java 
b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index de4cffb..cd5c91a 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -311,18 +311,6 @@ public final class InstantiationUtil {
                }
        }
 
-       @SuppressWarnings("unchecked")
-       public static <T> T deserializeObject(byte[] bytes) throws IOException, 
ClassNotFoundException {
-               ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(bytes);
-               return deserializeObject(byteArrayInputStream);
-       }
-
-       @SuppressWarnings("unchecked")
-       public static <T> T deserializeObject(InputStream in) throws 
IOException, ClassNotFoundException {
-               ObjectInputStream objectInputStream = new ObjectInputStream(in);
-               return (T) objectInputStream.readObject();
-       }
-
        public static byte[] serializeObject(Object o) throws IOException {
                try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                                ObjectOutputStream oos = new 
ObjectOutputStream(baos)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/56cba7ee/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index c2e665b..c683a02 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -83,7 +83,7 @@ public abstract class AbstractStateBackend implements 
java.io.Serializable {
                        Environment env,
                        String operatorIdentifier
        ) throws Exception {
-               return new DefaultOperatorStateBackend();
+               return new 
DefaultOperatorStateBackend(env.getUserClassLoader());
        }
 
        /**
@@ -95,6 +95,6 @@ public abstract class AbstractStateBackend implements 
java.io.Serializable {
                        String operatorIdentifier,
                        Collection<OperatorStateHandle> restoreSnapshots
        ) throws Exception {
-               return new DefaultOperatorStateBackend(restoreSnapshots);
+               return new 
DefaultOperatorStateBackend(env.getUserClassLoader(), restoreSnapshots);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/56cba7ee/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 0bd5eeb..af97a3f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.JavaSerializer;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.memory.DataInputView;
@@ -30,6 +31,7 @@ import 
org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -46,6 +48,7 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
        private final Map<String, PartitionableListState<?>> registeredStates;
        private final Collection<OperatorStateHandle> restoreSnapshots;
        private final ClosableRegistry closeStreamOnCancelRegistry;
+       private final JavaSerializer<Serializable> javaSerializer;
 
        /**
         * Restores a OperatorStateStore (lazily) using the provided snapshots.
@@ -53,7 +56,11 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
         * @param restoreSnapshots snapshots that are available to restore 
partitionable states on request.
         */
        public DefaultOperatorStateBackend(
+                       ClassLoader userClassLoader,
                        Collection<OperatorStateHandle> restoreSnapshots) {
+
+               Preconditions.checkNotNull(userClassLoader);
+               this.javaSerializer = new JavaSerializer<>(userClassLoader);
                this.restoreSnapshots = restoreSnapshots;
                this.registeredStates = new HashMap<>();
                this.closeStreamOnCancelRegistry = new ClosableRegistry();
@@ -62,8 +69,13 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
        /**
         * Creates an empty OperatorStateStore.
         */
-       public DefaultOperatorStateBackend() {
-               this(null);
+       public DefaultOperatorStateBackend(ClassLoader userClassLoader) {
+               this(userClassLoader, null);
+       }
+
+       @Override
+       public ListState<Serializable> getDefaultPartitionableState(String 
stateName) throws Exception {
+               return getPartitionableState(new 
ListStateDescriptor<>(stateName, javaSerializer));
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/56cba7ee/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
index 6914a7c..ceab87f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.java.typeutils.runtime.JavaSerializer;
 
+import java.io.Serializable;
 import java.util.Set;
 
 /**
@@ -28,6 +30,17 @@ import java.util.Set;
  */
 public interface OperatorStateStore {
 
+       String DEFAULT_OPERATOR_STATE_NAME = "";
+
+       /**
+        * Creates a state descriptor of the given name that uses {@link 
JavaSerializer}.
+        *
+        * @param stateName The name of state to create
+        * @return A state descriptor that uses {@link JavaSerializer}
+        * @throws Exception
+        */
+       ListState<Serializable> getDefaultPartitionableState(String stateName) 
throws Exception;
+
        /**
         * Creates (or restores) the partitionable state in this backend. Each 
state is registered under a unique name.
         * The provided serializer is used to de/serialize the state in case of 
checkpointing (snapshot/restore).

http://git-wip-us.apache.org/repos/asf/flink/blob/56cba7ee/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
index 9934382..29d21ac 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
@@ -55,7 +55,7 @@ public class RetrievableStreamStateHandle<T extends 
Serializable> implements
        @Override
        public T retrieveState() throws Exception {
                try (FSDataInputStream in = openInputStream()) {
-                       return InstantiationUtil.deserializeObject(in);
+                       return InstantiationUtil.deserializeObject(in, 
Thread.currentThread().getContextClassLoader());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/56cba7ee/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index d62b13e..5623715 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -219,7 +219,7 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
                checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
                byte[] data = client.getData().forPath(pathInZooKeeper);
-               return InstantiationUtil.deserializeObject(data);
+               return InstantiationUtil.deserializeObject(data, 
Thread.currentThread().getContextClassLoader());
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/56cba7ee/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index c39e436..5fb0e6f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2478,14 +2478,19 @@ public class CheckpointCoordinatorTest {
                        for (int groupId : 
expectedHeadOpKeyGroupStateHandle.keyGroups()) {
                                long offset = 
expectedHeadOpKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
                                inputStream.seek(offset);
-                               int expectedKeyGroupState = 
InstantiationUtil.deserializeObject(inputStream);
+                               int expectedKeyGroupState =
+                                               
InstantiationUtil.deserializeObject(inputStream, 
Thread.currentThread().getContextClassLoader());
                                for (KeyGroupsStateHandle 
oneActualKeyGroupStateHandle : actualPartitionedKeyGroupState) {
                                        if 
(oneActualKeyGroupStateHandle.containsKeyGroup(groupId)) {
                                                long actualOffset = 
oneActualKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
-                                               try (FSDataInputStream 
actualInputStream =
-                                                                    
oneActualKeyGroupStateHandle.getStateHandle().openInputStream()) {
+                                               try (FSDataInputStream 
actualInputStream = oneActualKeyGroupStateHandle.
+                                                               
getStateHandle().openInputStream()) {
+
                                                        
actualInputStream.seek(actualOffset);
-                                                       int actualGroupState = 
InstantiationUtil.deserializeObject(actualInputStream);
+
+                                                       int actualGroupState = 
InstantiationUtil.
+                                                                       
deserializeObject(actualInputStream, 
Thread.currentThread().getContextClassLoader());
+
                                                        
assertEquals(expectedKeyGroupState, actualGroupState);
                                                }
                                        }
@@ -2506,7 +2511,8 @@ public class CheckpointCoordinatorTest {
                                        for (Map.Entry<String, long[]> entry : 
operatorStateHandle.getStateNameToPartitionOffsets().entrySet()) {
                                                for (long offset : 
entry.getValue()) {
                                                        in.seek(offset);
-                                                       Integer state = 
InstantiationUtil.deserializeObject(in);
+                                                       Integer state = 
InstantiationUtil.
+                                                                       
deserializeObject(in, Thread.currentThread().getContextClassLoader());
                                                        expectedResult.add(i + 
" : " + entry.getKey() + " : " + state);
                                                }
                                        }
@@ -2525,7 +2531,8 @@ public class CheckpointCoordinatorTest {
                                                        for (Map.Entry<String, 
long[]> entry : 
operatorStateHandle.getStateNameToPartitionOffsets().entrySet()) {
                                                                for (long 
offset : entry.getValue()) {
                                                                        
in.seek(offset);
-                                                                       Integer 
state = InstantiationUtil.deserializeObject(in);
+                                                                       Integer 
state = InstantiationUtil.
+                                                                               
        deserializeObject(in, Thread.currentThread().getContextClassLoader());
                                                                        
actualResult.add(i + " : " + entry.getKey() + " : " + state);
                                                                }
                                                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/56cba7ee/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 9b12cac..38231ec 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -452,7 +452,7 @@ public class JobManagerHARecoveryTest {
                        int subtaskIndex = getIndexInSubtaskGroup();
                        if (subtaskIndex < recoveredStates.length) {
                                try (FSDataInputStream in = 
chainedState.get(0).openInputStream()) {
-                                       recoveredStates[subtaskIndex] = 
InstantiationUtil.deserializeObject(in);
+                                       recoveredStates[subtaskIndex] = 
InstantiationUtil.deserializeObject(in, getUserCodeClassLoader());
                                }
                        }
                }
@@ -464,7 +464,10 @@ public class JobManagerHARecoveryTest {
                                                
InstantiationUtil.serializeObject(checkpointId));
 
                                RetrievableStreamStateHandle<Long> state = new 
RetrievableStreamStateHandle<Long>(byteStreamStateHandle);
-                               ChainedStateHandle<StreamStateHandle> 
chainedStateHandle = new 
ChainedStateHandle<StreamStateHandle>(Collections.singletonList(state));
+
+                               ChainedStateHandle<StreamStateHandle> 
chainedStateHandle =
+                                               new 
ChainedStateHandle<StreamStateHandle>(Collections.singletonList(state));
+
                                CheckpointStateHandles checkpointStateHandles =
                                                new 
CheckpointStateHandles(chainedStateHandle, null, 
Collections.<KeyGroupsStateHandle>emptyList());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/56cba7ee/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 56c8987..ff1a23d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.java.typeutils.runtime.JavaSerializer;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.junit.Test;
 
@@ -31,13 +32,21 @@ import java.util.Iterator;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class OperatorStateBackendTest {
 
        AbstractStateBackend abstractStateBackend = new 
MemoryStateBackend(1024);
 
+       static Environment createMockEnvironment() {
+               Environment env = mock(Environment.class);
+               
when(env.getUserClassLoader()).thenReturn(Thread.currentThread().getContextClassLoader());
+               return env;
+       }
+
        private OperatorStateBackend createNewOperatorStateBackend() throws 
Exception {
-               return abstractStateBackend.createOperatorStateBackend(null, 
"test-operator");
+               return 
abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), 
"test-operator");
        }
 
        @Test
@@ -123,8 +132,8 @@ public class OperatorStateBackendTest {
 
                        operatorStateBackend.dispose();
 
-                       operatorStateBackend = abstractStateBackend.
-                                       restoreOperatorStateBackend(null, 
"testOperator", Collections.singletonList(stateHandle));
+                       operatorStateBackend = 
abstractStateBackend.restoreOperatorStateBackend(
+                                       createMockEnvironment(), 
"testOperator", Collections.singletonList(stateHandle));
 
                        assertEquals(0, 
operatorStateBackend.getRegisteredStateNames().size());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/56cba7ee/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 939b77b..a30341b 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.commons.collections.map.LinkedMap;
 import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -27,7 +26,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.OperatorStateStore;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -67,8 +65,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                CheckpointedFunction {
        private static final long serialVersionUID = -6272159445203409112L;
 
-       private static final String KAFKA_OFFSETS = "kafka_offsets";
-
        protected static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
        
        /** The maximum number of pending non-committed checkpoints to track, 
to avoid memory leaks */
@@ -130,9 +126,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                checkArgument(topics.size() > 0, "You have to define at least 
one topic.");
 
                this.deserializer = checkNotNull(deserializer, 
"valueDeserializer");
-
-               TypeInformation<Tuple2<KafkaTopicPartition, Long>> typeInfo =
-                               TypeInformation.of(new 
TypeHint<Tuple2<KafkaTopicPartition, Long>>(){});
        }
 
        /**
@@ -314,13 +307,13 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
        //  Checkpoint and restore
        // 
------------------------------------------------------------------------
 
-
        @Override
        public void initializeState(OperatorStateStore stateStore) throws 
Exception {
 
                this.stateStore = stateStore;
 
-               ListState<Serializable> offsets = 
stateStore.getPartitionableState(ListCheckpointed.DEFAULT_LIST_DESCRIPTOR);
+               ListState<Serializable> offsets =
+                               
stateStore.getDefaultPartitionableState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
 
                restoreToOffset = new HashMap<>();
 
@@ -339,8 +332,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                        LOG.debug("storeOperatorState() called on closed 
source");
                } else {
 
-                       ListState<Serializable> listState = 
stateStore.getPartitionableState(ListCheckpointed.DEFAULT_LIST_DESCRIPTOR);
-
+                       ListState<Serializable> listState =
+                                       
stateStore.getDefaultPartitionableState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
                        listState.clear();
 
                        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;

http://git-wip-us.apache.org/repos/asf/flink/blob/56cba7ee/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index fc8b7e9..45b45f0 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.commons.collections.map.LinkedMap;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.OperatorStateStore;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
@@ -34,6 +33,7 @@ import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 import org.mockito.Matchers;
 
+import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -47,8 +47,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -115,31 +113,31 @@ public class FlinkKafkaConsumerBaseTest {
        public void checkRestoredCheckpointWhenFetcherNotReady() throws 
Exception {
                OperatorStateStore operatorStateStore = 
mock(OperatorStateStore.class);
 
-               TestingListState<Tuple2<KafkaTopicPartition, Long>> 
expectedState = new TestingListState<>();
+               TestingListState<Serializable> expectedState = new 
TestingListState<>();
                expectedState.add(Tuple2.of(new KafkaTopicPartition("abc", 13), 
16768L));
                expectedState.add(Tuple2.of(new KafkaTopicPartition("def", 7), 
987654321L));
 
-               TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = 
new TestingListState<>();
+               TestingListState<Serializable> listState = new 
TestingListState<>();
 
                FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new 
LinkedMap(), true);
 
-               
when(operatorStateStore.getPartitionableState(Matchers.any(ListStateDescriptor.class))).thenReturn(expectedState);
+               
when(operatorStateStore.getDefaultPartitionableState(Matchers.any(String.class))).thenReturn(expectedState);
                consumer.initializeState(operatorStateStore);
 
-               
when(operatorStateStore.getPartitionableState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+               
when(operatorStateStore.getDefaultPartitionableState(Matchers.any(String.class))).thenReturn(listState);
 
                consumer.prepareSnapshot(17L, 17L);
 
-               Set<Tuple2<KafkaTopicPartition, Long>> expected = new 
HashSet<Tuple2<KafkaTopicPartition, Long>>();
+               Set<Serializable> expected = new HashSet<>();
 
-               for (Tuple2<KafkaTopicPartition, Long> 
kafkaTopicPartitionLongTuple2 : expectedState.get()) {
-                       expected.add(kafkaTopicPartitionLongTuple2);
+               for (Serializable serializable : expectedState.get()) {
+                       expected.add(serializable);
                }
 
                int counter = 0;
 
-               for (Tuple2<KafkaTopicPartition, Long> 
kafkaTopicPartitionLongTuple2 : listState.get()) {
-                       
assertTrue(expected.contains(kafkaTopicPartitionLongTuple2));
+               for (Serializable serializable : listState.get()) {
+                       assertTrue(expected.contains(serializable));
                        counter++;
                }
 
@@ -154,8 +152,8 @@ public class FlinkKafkaConsumerBaseTest {
                FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new 
LinkedMap(), true);
 
                OperatorStateStore operatorStateStore = 
mock(OperatorStateStore.class);
-               TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = 
new TestingListState<>();
-               
when(operatorStateStore.getPartitionableState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+               TestingListState<Serializable> listState = new 
TestingListState<>();
+               
when(operatorStateStore.getDefaultPartitionableState(Matchers.any(String.class))).thenReturn(listState);
 
                consumer.initializeState(operatorStateStore);
                consumer.prepareSnapshot(17L, 17L);
@@ -188,12 +186,12 @@ public class FlinkKafkaConsumerBaseTest {
 
                OperatorStateStore backend = mock(OperatorStateStore.class);
 
-               TestingListState<Tuple2<KafkaTopicPartition, Long>> listState1 
= new TestingListState<>();
-               TestingListState<Tuple2<KafkaTopicPartition, Long>> listState2 
= new TestingListState<>();
-               TestingListState<Tuple2<KafkaTopicPartition, Long>> listState3 
= new TestingListState<>();
+               TestingListState<Serializable> listState1 = new 
TestingListState<>();
+               TestingListState<Serializable> listState2 = new 
TestingListState<>();
+               TestingListState<Serializable> listState3 = new 
TestingListState<>();
 
-               
when(backend.getPartitionableState(Matchers.any(ListStateDescriptor.class))).
-                               thenReturn(listState1, listState1, listState2, 
listState2, listState3, listState3);
+               
when(backend.getDefaultPartitionableState(Matchers.any(String.class))).
+                               thenReturn(listState1, listState1, listState2, 
listState3);
 
                consumer.initializeState(backend);
 
@@ -202,7 +200,8 @@ public class FlinkKafkaConsumerBaseTest {
 
                HashMap<KafkaTopicPartition, Long> snapshot1 = new HashMap<>();
 
-               for (Tuple2<KafkaTopicPartition, Long> 
kafkaTopicPartitionLongTuple2 : listState1.get()) {
+               for (Serializable serializable : listState1.get()) {
+                       Tuple2<KafkaTopicPartition, Long> 
kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) 
serializable;
                        snapshot1.put(kafkaTopicPartitionLongTuple2.f0, 
kafkaTopicPartitionLongTuple2.f1);
                }
 
@@ -215,7 +214,8 @@ public class FlinkKafkaConsumerBaseTest {
 
                HashMap<KafkaTopicPartition, Long> snapshot2 = new HashMap<>();
 
-               for (Tuple2<KafkaTopicPartition, Long> 
kafkaTopicPartitionLongTuple2 : listState2.get()) {
+               for (Serializable serializable : listState2.get()) {
+                       Tuple2<KafkaTopicPartition, Long> 
kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) 
serializable;
                        snapshot2.put(kafkaTopicPartitionLongTuple2.f0, 
kafkaTopicPartitionLongTuple2.f1);
                }
 
@@ -233,8 +233,9 @@ public class FlinkKafkaConsumerBaseTest {
 
                HashMap<KafkaTopicPartition, Long> snapshot3 = new HashMap<>();
 
-               for (Tuple2<KafkaTopicPartition, Long> 
kafkaTopicPartitionLongTuple2 : listState1.get()) {
-                       snapshot1.put(kafkaTopicPartitionLongTuple2.f0, 
kafkaTopicPartitionLongTuple2.f1);
+               for (Serializable serializable : listState3.get()) {
+                       Tuple2<KafkaTopicPartition, Long> 
kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) 
serializable;
+                       snapshot3.put(kafkaTopicPartitionLongTuple2.f0, 
kafkaTopicPartitionLongTuple2.f1);
                }
 
                assertEquals(state3, snapshot3);

http://git-wip-us.apache.org/repos/asf/flink/blob/56cba7ee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
index 430b2b9..1031b88 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
@@ -19,8 +19,6 @@
 package org.apache.flink.streaming.api.checkpoint;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.java.typeutils.runtime.JavaSerializer;
 
 import java.io.Serializable;
 import java.util.List;
@@ -36,9 +34,6 @@ import java.util.List;
 @PublicEvolving
 public interface ListCheckpointed<T extends Serializable> {
 
-       ListStateDescriptor<Serializable> DEFAULT_LIST_DESCRIPTOR =
-                       new ListStateDescriptor<>("", new JavaSerializer<>());
-
        /**
         * Gets the current state of the function of operator. The state must 
reflect the result of all
         * prior invocations to this function.

http://git-wip-us.apache.org/repos/asf/flink/blob/56cba7ee/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index f683d9a..428442d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.OperatorStateStore;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -70,7 +71,6 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        /** Flag to prevent duplicate function.close() calls in close() and 
dispose() */
        private transient boolean functionsClosed = false;
        
-       
        public AbstractUdfStreamOperator(F userFunction) {
                this.userFunction = requireNonNull(userFunction);
        }
@@ -107,8 +107,8 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
                        @SuppressWarnings("unchecked")
                        ListCheckpointed<Serializable> listCheckpointedFun = 
(ListCheckpointed<Serializable>) userFunction;
 
-                       ListState<Serializable> listState =
-                                       
getOperatorStateBackend().getPartitionableState(ListCheckpointed.DEFAULT_LIST_DESCRIPTOR);
+                       ListState<Serializable> listState = 
getOperatorStateBackend().
+                                       
getDefaultPartitionableState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
 
                        List<Serializable> list = new ArrayList<>();
 
@@ -201,8 +201,8 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
                        List<Serializable> partitionableState =
                                        ((ListCheckpointed<Serializable>) 
userFunction).snapshotState(checkpointId, timestamp);
 
-                       ListState<Serializable> listState =
-                                       
getOperatorStateBackend().getPartitionableState(ListCheckpointed.DEFAULT_LIST_DESCRIPTOR);
+                       ListState<Serializable> listState = 
getOperatorStateBackend().
+                                       
getDefaultPartitionableState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
 
                        listState.clear();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/56cba7ee/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 4003e59..31ccc28 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -39,12 +39,12 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -632,8 +632,10 @@ public class OneInputStreamTaskTest extends TestLogger {
 
                        assertNotNull(in);
 
-                       Serializable functionState= 
InstantiationUtil.deserializeObject(in);
-                       Integer operatorState= 
InstantiationUtil.deserializeObject(in);
+                       ClassLoader cl = 
Thread.currentThread().getContextClassLoader();
+
+                       Serializable functionState= 
InstantiationUtil.deserializeObject(in, cl);
+                       Integer operatorState= 
InstantiationUtil.deserializeObject(in, cl);
 
                        assertEquals(random.nextInt(), functionState);
                        assertEquals(random.nextInt(), (int) operatorState);

http://git-wip-us.apache.org/repos/asf/flink/blob/56cba7ee/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 0aee128..0687f66 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -30,10 +30,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -45,7 +45,6 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
-
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -62,7 +61,9 @@ import java.util.HashMap;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.flink.test.util.TestUtils.tryExecute;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * This verifies that checkpointing works correctly with event time windows. 
This is more

Reply via email to