http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
index 3fcfb46..725cbf6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,12 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.windowing.triggers;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
 import java.io.IOException;
@@ -35,8 +35,8 @@ public class CountTrigger<W extends Window> implements 
Trigger<Object, W> {
 
        private final long maxCount;
 
-       private final ValueStateDescriptor<Long> stateDesc = new 
ValueStateDescriptor<>("count", 0L,
-               BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new 
ExecutionConfig()));
+       private final ValueStateDescriptor<Long> stateDesc =
+                       new ValueStateDescriptor<>("count", 
LongSerializer.INSTANCE, 0L);
 
 
        private CountTrigger(long maxCount) {

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
index 3135961..55c719a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
@@ -43,7 +43,7 @@ public class DeltaTrigger<T, W extends Window> implements 
Trigger<T, W> {
        private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, 
TypeSerializer<T> stateSerializer) {
                this.deltaFunction = deltaFunction;
                this.threshold = threshold;
-               stateDesc = new ValueStateDescriptor<>("last-element", null, 
stateSerializer);
+               stateDesc = new ValueStateDescriptor<>("last-element", 
stateSerializer, null);
 
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index 5c71355..fb61064 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -179,7 +179,7 @@ public interface Trigger<T, W extends Window> extends 
Serializable {
                 * @throws UnsupportedOperationException Thrown, if no 
partitioned state is available for the
                 *                                       function (function is 
not part of a KeyedStream).
                 */
-               <S extends State> S getPartitionedState(StateDescriptor<S> 
stateDescriptor);
+               <S extends State> S getPartitionedState(StateDescriptor<S, ?> 
stateDescriptor);
 
                /**
                 * Retrieves a {@link ValueState} object that can be used to 
interact with

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index f163de1..41ec91a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -55,13 +55,13 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
 
        private final Evictor<? super IN, ? super W> evictor;
 
-       private final StateDescriptor<? extends ListState<StreamRecord<IN>>> 
windowStateDescriptor;
+       private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> 
windowStateDescriptor;
 
        public EvictingWindowOperator(WindowAssigner<? super IN, W> 
windowAssigner,
                TypeSerializer<W> windowSerializer,
                KeySelector<IN, K> keySelector,
                TypeSerializer<K> keySerializer,
-               StateDescriptor<? extends ListState<StreamRecord<IN>>> 
windowStateDescriptor,
+               StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> 
windowStateDescriptor,
                WindowFunction<Iterable<IN>, OUT, K, W> windowFunction,
                Trigger<? super IN, ? super W> trigger,
                Evictor<? super IN, ? super W> evictor) {
@@ -161,7 +161,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
        @Override
        @VisibleForTesting
        @SuppressWarnings("unchecked, rawtypes")
-       public StateDescriptor<? extends MergingState<IN, Iterable<IN>>> 
getStateDescriptor() {
-               return (StateDescriptor<? extends MergingState<IN, 
Iterable<IN>>>) windowStateDescriptor;
+       public StateDescriptor<? extends MergingState<IN, Iterable<IN>>, ?> 
getStateDescriptor() {
+               return (StateDescriptor<? extends MergingState<IN, 
Iterable<IN>>, ?>) windowStateDescriptor;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 1b712d9..d7dbaf5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -448,13 +448,14 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
                        requireNonNull(name, "The name of the state must not be 
null");
                        requireNonNull(stateType, "The state type information 
must not be null");
 
-                       ValueStateDescriptor<S> stateDesc = new 
ValueStateDescriptor<>(name, defaultState, 
stateType.createSerializer(getExecutionConfig()));
+                       ValueStateDescriptor<S> stateDesc = new 
ValueStateDescriptor<>(
+                                       name, 
stateType.createSerializer(getExecutionConfig()), defaultState);
                        return getPartitionedState(stateDesc);
                }
 
                @Override
                @SuppressWarnings("rawtypes, unchecked")
-               public <S extends State> S getPartitionedState(final 
StateDescriptor<S> stateDescriptor) {
+               public <S extends State> S getPartitionedState(final 
StateDescriptor<S, ?> stateDescriptor) {
                        if (!(stateDescriptor instanceof ValueStateDescriptor)) 
{
                                throw new 
UnsupportedOperationException("NonKeyedWindowOperator Triggers only " +
                                        "support ValueState.");

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index d562925..eccaeee 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -102,7 +102,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
 
        protected final Trigger<? super IN, ? super W> trigger;
 
-       protected final StateDescriptor<? extends MergingState<IN, ACC>> 
windowStateDescriptor;
+       protected final StateDescriptor<? extends MergingState<IN, ACC>, ?> 
windowStateDescriptor;
 
        /**
         * If this is true, the current processing time is set as the timestamp 
of incoming elements.
@@ -167,7 +167,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                TypeSerializer<W> windowSerializer,
                KeySelector<IN, K> keySelector,
                TypeSerializer<K> keySerializer,
-               StateDescriptor<? extends MergingState<IN, ACC>> 
windowStateDescriptor,
+               StateDescriptor<? extends MergingState<IN, ACC>, ?> 
windowStateDescriptor,
                WindowFunction<ACC, OUT, K, W> windowFunction,
                Trigger<? super IN, ? super W> trigger) {
 
@@ -374,15 +374,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                        requireNonNull(name, "The name of the state must not be 
null");
                        requireNonNull(stateType, "The state type information 
must not be null");
 
-                       ValueStateDescriptor<S> stateDesc = new 
ValueStateDescriptor<>(name, defaultState, 
stateType.createSerializer(getExecutionConfig()));
+                       ValueStateDescriptor<S> stateDesc = new 
ValueStateDescriptor<>(name, stateType.createSerializer(getExecutionConfig()), 
defaultState);
                        return getPartitionedState(stateDesc);
                }
 
                @SuppressWarnings("unchecked")
-               public <S extends State> S 
getPartitionedState(StateDescriptor<S> stateDescriptor) {
+               public <S extends State> S 
getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
                        try {
-                               return 
WindowOperator.this.getPartitionedState(window, windowSerializer,
-                                       stateDescriptor);
+                               return 
WindowOperator.this.getPartitionedState(window, windowSerializer, 
stateDescriptor);
                        } catch (Exception e) {
                                throw new RuntimeException("Could not retrieve 
state", e);
                        }
@@ -608,7 +607,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
        }
 
        @VisibleForTesting
-       public StateDescriptor<? extends MergingState<IN, ACC>> 
getStateDescriptor() {
+       public StateDescriptor<? extends MergingState<IN, ACC>, ?> 
getStateDescriptor() {
                return windowStateDescriptor;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
new file mode 100644
index 0000000..72e02c2
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.execution.Environment;
+
+import org.junit.Test;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Collections;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+public class StreamingRuntimeContextTest {
+       
+       @Test
+       public void testValueStateInstantiation() throws Exception {
+               
+               final ExecutionConfig config = new ExecutionConfig();
+               config.registerKryoType(Path.class);
+               
+               final AtomicReference<Object> descriptorCapture = new 
AtomicReference<>();
+               
+               StreamingRuntimeContext context = new StreamingRuntimeContext(
+                               createMockOp(descriptorCapture, config),
+                               createMockEnvironment(),
+                               Collections.<String, Accumulator<?, 
?>>emptyMap());
+
+               ValueStateDescriptor<TaskInfo> descr = new 
ValueStateDescriptor<>("name", TaskInfo.class, null);
+               context.getState(descr);
+               
+               StateDescriptor<?, ?> descrIntercepted = (StateDescriptor<?, 
?>) descriptorCapture.get();
+               TypeSerializer<?> serializer = descrIntercepted.getSerializer();
+               
+               // check that the Path class is really registered, i.e., the 
execution config was applied
+               assertTrue(serializer instanceof KryoSerializer);
+               assertTrue(((KryoSerializer<?>) 
serializer).getKryo().getRegistration(Path.class).getId() > 0);
+       }
+
+       @Test
+       public void testReduceingStateInstantiation() throws Exception {
+
+               final ExecutionConfig config = new ExecutionConfig();
+               config.registerKryoType(Path.class);
+
+               final AtomicReference<Object> descriptorCapture = new 
AtomicReference<>();
+
+               StreamingRuntimeContext context = new StreamingRuntimeContext(
+                               createMockOp(descriptorCapture, config),
+                               createMockEnvironment(),
+                               Collections.<String, Accumulator<?, 
?>>emptyMap());
+
+               @SuppressWarnings("unchecked")
+               ReduceFunction<TaskInfo> reducer = (ReduceFunction<TaskInfo>) 
mock(ReduceFunction.class);
+               
+               ReducingStateDescriptor<TaskInfo> descr = 
+                               new ReducingStateDescriptor<>("name", reducer, 
TaskInfo.class);
+               
+               context.getReducingState(descr);
+
+               StateDescriptor<?, ?> descrIntercepted = (StateDescriptor<?, 
?>) descriptorCapture.get();
+               TypeSerializer<?> serializer = descrIntercepted.getSerializer();
+
+               // check that the Path class is really registered, i.e., the 
execution config was applied
+               assertTrue(serializer instanceof KryoSerializer);
+               assertTrue(((KryoSerializer<?>) 
serializer).getKryo().getRegistration(Path.class).getId() > 0);
+       }
+
+       @Test
+       public void testListStateInstantiation() throws Exception {
+
+               final ExecutionConfig config = new ExecutionConfig();
+               config.registerKryoType(Path.class);
+
+               final AtomicReference<Object> descriptorCapture = new 
AtomicReference<>();
+
+               StreamingRuntimeContext context = new StreamingRuntimeContext(
+                               createMockOp(descriptorCapture, config),
+                               createMockEnvironment(),
+                               Collections.<String, Accumulator<?, 
?>>emptyMap());
+
+               ListStateDescriptor<TaskInfo> descr = new 
ListStateDescriptor<>("name", TaskInfo.class);
+               context.getListState(descr);
+
+               StateDescriptor<?, ?> descrIntercepted = (StateDescriptor<?, 
?>) descriptorCapture.get();
+               TypeSerializer<?> serializer = descrIntercepted.getSerializer();
+
+               // check that the Path class is really registered, i.e., the 
execution config was applied
+               assertTrue(serializer instanceof KryoSerializer);
+               assertTrue(((KryoSerializer<?>) 
serializer).getKryo().getRegistration(Path.class).getId() > 0);
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  
+       // 
------------------------------------------------------------------------
+       
+       @SuppressWarnings("unchecked")
+       private static AbstractStreamOperator<?> createMockOp(
+                       final AtomicReference<Object> ref, final 
ExecutionConfig config) throws Exception {
+               
+               AbstractStreamOperator<?> operatorMock = 
mock(AbstractStreamOperator.class);
+               when(operatorMock.getExecutionConfig()).thenReturn(config);
+               
+               
when(operatorMock.getPartitionedState(any(StateDescriptor.class))).thenAnswer(
+                               new Answer<Object>() {
+                                       
+                                       @Override
+                                       public Object answer(InvocationOnMock 
invocationOnMock) throws Throwable {
+                                               
ref.set(invocationOnMock.getArguments()[0]);
+                                               return null;
+                                       }
+                               });
+               
+               return operatorMock;
+       }
+       
+       private static Environment createMockEnvironment() {
+               Environment env = mock(Environment.class);
+               
when(env.getUserClassLoader()).thenReturn(StreamingRuntimeContextTest.class.getClassLoader());
+               
when(env.getDistributedCacheEntries()).thenReturn(Collections.<String, 
Future<Path>>emptyMap());
+               when(env.getTaskInfo()).thenReturn(new TaskInfo("test task", 0, 
1, 1));
+               return env;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index f2f8c5a..b6e51c6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -763,7 +763,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                public void open(Configuration parameters) {
                        assertNotNull(getRuntimeContext());
                        state = getRuntimeContext().getState(
-                                       new 
ValueStateDescriptor<>("totalCount", 0, IntSerializer.INSTANCE));
+                                       new 
ValueStateDescriptor<>("totalCount", Integer.class, 0));
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index e88f6de..3429215 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -943,8 +943,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        
                        // start with one, so the final count is correct and we 
test that we do not
                        // initialize with 0 always by default
-                       state = getRuntimeContext().getState(
-                                       new 
ValueStateDescriptor<>("totalCount", 1, IntSerializer.INSTANCE));
+                       state = getRuntimeContext().getState(new 
ValueStateDescriptor<>("totalCount", Integer.class, 1));
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
index 5e1d1db..ff6aca4 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
@@ -45,7 +45,7 @@ trait StatefulFunction[I, O, S] extends RichFunction {
   }
 
   override def open(c: Configuration) = {
-    val info = new ValueStateDescriptor[S]("state", null.asInstanceOf[S], 
stateSerializer)
+    val info = new ValueStateDescriptor[S]("state", stateSerializer, 
null.asInstanceOf[S])
     state = getRuntimeContext().getState(info)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 19ff090..2039528 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.test.checkpointing;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
@@ -45,6 +44,7 @@ import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -224,7 +224,7 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                                                        
assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
                                                        open = true;
                                                        count = 
getRuntimeContext().getState(
-                                                                       new 
ValueStateDescriptor<>("count", 0, IntSerializer.INSTANCE));
+                                                                       new 
ValueStateDescriptor<>("count", Integer.class, 0));
                                                }
 
                                                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
index a2d6c24..0728b41 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
@@ -27,14 +27,11 @@ import java.util.Map.Entry;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -173,7 +170,7 @@ public class PartitionedStateCheckpointingITCase extends 
StreamFaultToleranceTes
                        failurePos = (new Random().nextLong() % (failurePosMax 
- failurePosMin)) + failurePosMin;
                        count = 0;
                        sum = getRuntimeContext().getState(
-                                       new ValueStateDescriptor<>("my_state", 
0L, LongSerializer.INSTANCE));
+                                       new ValueStateDescriptor<>("my_state", 
Long.class, 0L));
                }
 
                @Override
@@ -202,11 +199,10 @@ public class PartitionedStateCheckpointingITCase extends 
StreamFaultToleranceTes
                public void open(Configuration parameters) throws IOException {
                        
                        aCounts = getRuntimeContext().getState(
-                                       new ValueStateDescriptor<>("a", 
NonSerializableLong.of(0L), 
-                                                       new 
KryoSerializer<>(NonSerializableLong.class, new ExecutionConfig())));
+                                       new ValueStateDescriptor<>("a", 
NonSerializableLong.class, NonSerializableLong.of(0L)));
                        
                        bCounts = getRuntimeContext().getState(
-                                       new ValueStateDescriptor<>("b", 0L, 
LongSerializer.INSTANCE));
+                                       new ValueStateDescriptor<>("b", 
Long.class, 0L));
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index d8131c7..27c0f80 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -22,7 +22,6 @@ import 
org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -255,8 +254,7 @@ public class StreamCheckpointingITCase extends 
StreamFaultToleranceTestBase {
                        failurePos = (new Random().nextLong() % (failurePosMax 
- failurePosMin)) + failurePosMin;
                        count = 0;
                        
-                       pCount = getRuntimeContext().getState(
-                                       new ValueStateDescriptor<>("pCount", 
0L, LongSerializer.INSTANCE));
+                       pCount = getRuntimeContext().getState(new 
ValueStateDescriptor<>("pCount", Long.class, 0L));
                }
                
                @Override

Reply via email to