[FLINK-3312] Add simple constructors for State Descriptors

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/180cd3f6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/180cd3f6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/180cd3f6

Branch: refs/heads/master
Commit: 180cd3f608731208f5b5ed71e3eb80ae5ccdf5fc
Parents: 456d0ab
Author: Stephan Ewen <[email protected]>
Authored: Wed Feb 3 17:44:07 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Feb 3 20:28:41 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/AbstractRocksDBState.java   |   4 +-
 .../contrib/streaming/state/DbStateBackend.java |  22 +-
 .../state/DBStateCheckpointingTest.java         |  13 +-
 .../streaming/state/DbStateBackendTest.java     |  16 +-
 .../api/common/state/ListStateDescriptor.java   |  82 +++---
 .../common/state/ReducingStateDescriptor.java   |  97 ++++---
 .../apache/flink/api/common/state/State.java    |   8 +-
 .../flink/api/common/state/StateBackend.java    |   9 +-
 .../flink/api/common/state/StateDescriptor.java | 264 ++++++++++++++++++-
 .../api/common/state/ValueStateDescriptor.java  | 162 +++---------
 .../common/state/ListStateDescriptorTest.java   |  91 +++++++
 .../state/ReducingStateDescriptorTest.java      | 105 ++++++++
 .../common/state/ValueStateDescriptorTest.java  | 104 ++++++++
 .../examples/windowing/SessionWindowing.java    |   6 +-
 .../flink/runtime/state/AbstractHeapState.java  |   2 +-
 .../runtime/state/AbstractStateBackend.java     |   7 +-
 .../org/apache/flink/runtime/state/KvState.java |   2 +-
 .../flink/runtime/state/KvStateSnapshot.java    |   2 +-
 .../state/filesystem/AbstractFsState.java       |   2 +-
 .../filesystem/AbstractFsStateSnapshot.java     |   2 +-
 .../runtime/state/memory/AbstractMemState.java  |   2 +-
 .../state/memory/AbstractMemStateSnapshot.java  |   2 +-
 .../runtime/state/StateBackendTestBase.java     |  31 ++-
 .../api/operators/AbstractStreamOperator.java   |   4 +-
 .../api/operators/StreamGroupedFold.java        |  11 +-
 .../api/operators/StreamGroupedReduce.java      |   2 +-
 .../api/operators/StreamingRuntimeContext.java  |   5 +-
 .../triggers/ContinuousEventTimeTrigger.java    |   8 +-
 .../ContinuousProcessingTimeTrigger.java        |  11 +-
 .../api/windowing/triggers/CountTrigger.java    |  10 +-
 .../api/windowing/triggers/DeltaTrigger.java    |   2 +-
 .../api/windowing/triggers/Trigger.java         |   2 +-
 .../windowing/EvictingWindowOperator.java       |   8 +-
 .../windowing/NonKeyedWindowOperator.java       |   5 +-
 .../operators/windowing/WindowOperator.java     |  13 +-
 .../operators/StreamingRuntimeContextTest.java  | 156 +++++++++++
 ...AlignedProcessingTimeWindowOperatorTest.java |   2 +-
 ...AlignedProcessingTimeWindowOperatorTest.java |   3 +-
 .../api/scala/function/StatefulFunction.scala   |   2 +-
 .../EventTimeWindowCheckpointingITCase.java     |   4 +-
 .../PartitionedStateCheckpointingITCase.java    |  10 +-
 .../StreamCheckpointingITCase.java              |   4 +-
 42 files changed, 953 insertions(+), 344 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 6dbe16c..3ecec5d 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -62,7 +62,7 @@ import static java.util.Objects.requireNonNull;
  * @param <SD> The type of {@link StateDescriptor}.
  * @param <Backend> The type of the backend that snapshots this key/value 
state.
  */
-public abstract class AbstractRocksDBState<K, N, S extends State, SD extends 
StateDescriptor<S>, Backend extends AbstractStateBackend>
+public abstract class AbstractRocksDBState<K, N, S extends State, SD extends 
StateDescriptor<S, ?>, Backend extends AbstractStateBackend>
        implements KvState<K, N, S, SD, Backend>, State {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(AbstractRocksDBState.class);
@@ -258,7 +258,7 @@ public abstract class AbstractRocksDBState<K, N, S extends 
State, SD extends Sta
                }
        }
 
-       public static abstract class AbstractRocksDBSnapshot<K, N, S extends 
State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend> 
implements KvStateSnapshot<K, N, S, SD, Backend> {
+       public static abstract class AbstractRocksDBSnapshot<K, N, S extends 
State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> 
implements KvStateSnapshot<K, N, S, SD, Backend> {
                private static final long serialVersionUID = 1L;
 
                private static final Logger LOG = 
LoggerFactory.getLogger(AbstractRocksDBSnapshot.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
 
b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
index c55b3c0..1d1ccd7 100644
--- 
a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
+++ 
b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
@@ -195,6 +195,11 @@ public class DbStateBackend extends AbstractStateBackend {
        @Override
        protected <N, T> ValueState<T> createValueState(TypeSerializer<N> 
namespaceSerializer,
                ValueStateDescriptor<T> stateDesc) throws Exception {
+               
+               if (!stateDesc.isSerializerInitialized()) {
+                       throw new IllegalArgumentException("state descriptor 
serializer not initialized");
+               }
+               
                String stateName = operatorIdentifier + "_"+ 
stateDesc.getName();
 
                return new LazyDbValueState<>(
@@ -210,7 +215,14 @@ public class DbStateBackend extends AbstractStateBackend {
        @Override
        protected <N, T> ListState<T> createListState(TypeSerializer<N> 
namespaceSerializer,
                ListStateDescriptor<T> stateDesc) throws Exception {
-               ValueStateDescriptor<ArrayList<T>> valueStateDescriptor = new 
ValueStateDescriptor<>(stateDesc.getName(), null, new 
ArrayListSerializer<>(stateDesc.getSerializer()));
+
+               if (!stateDesc.isSerializerInitialized()) {
+                       throw new IllegalArgumentException("state descriptor 
serializer not initialized");
+               }
+               
+               ValueStateDescriptor<ArrayList<T>> valueStateDescriptor = new 
ValueStateDescriptor<>(stateDesc.getName(), 
+                               new 
ArrayListSerializer<>(stateDesc.getSerializer()), null);
+               
                ValueState<ArrayList<T>> valueState = 
createValueState(namespaceSerializer, valueStateDescriptor);
                return new GenericListState<>(valueState);
        }
@@ -220,7 +232,13 @@ public class DbStateBackend extends AbstractStateBackend {
        protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> 
namespaceSerializer,
                ReducingStateDescriptor<T> stateDesc) throws Exception {
 
-               ValueStateDescriptor<T> valueStateDescriptor = new 
ValueStateDescriptor<>(stateDesc.getName(), null, stateDesc.getSerializer());
+               if (!stateDesc.isSerializerInitialized()) {
+                       throw new IllegalArgumentException("state descriptor 
serializer not initialized");
+               }
+               
+               ValueStateDescriptor<T> valueStateDescriptor = new 
ValueStateDescriptor<>(
+                               stateDesc.getName(), stateDesc.getSerializer(), 
null);
+               
                ValueState<T> valueState = 
createValueState(namespaceSerializer, valueStateDescriptor);
                return new GenericReducingState<>(valueState, 
stateDesc.getReduceFunction());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java
 
b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java
index 0afdada..cc2147f 100644
--- 
a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java
+++ 
b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java
@@ -31,14 +31,13 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.io.FileUtils;
+
 import org.apache.derby.drda.NetworkServerControl;
-import org.apache.flink.api.common.ExecutionConfig;
+
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -210,7 +209,7 @@ public class DBStateCheckpointingTest extends 
StreamFaultToleranceTestBase {
                        failurePos = (new Random().nextLong() % (failurePosMax 
- failurePosMin)) + failurePosMin;
                        count = 0;
                        sum = getRuntimeContext().getState(
-                                       new ValueStateDescriptor<>("my_state", 
0L, LongSerializer.INSTANCE));
+                                       new ValueStateDescriptor<>("my_state", 
Long.class, 0L));
                }
 
                @Override
@@ -238,11 +237,9 @@ public class DBStateCheckpointingTest extends 
StreamFaultToleranceTestBase {
                @Override
                public void open(Configuration parameters) throws IOException {
                        aCounts = getRuntimeContext().getState(
-                                       new ValueStateDescriptor<>("a", 
NonSerializableLong.of(0L), 
-                                                       new 
KryoSerializer<>(NonSerializableLong.class, new ExecutionConfig())));
+                                       new ValueStateDescriptor<>("a", 
NonSerializableLong.class, NonSerializableLong.of(0L)));
                        
-                       bCounts = getRuntimeContext().getState(
-                                       new ValueStateDescriptor<>("b", 0L, 
LongSerializer.INSTANCE));
+                       bCounts = getRuntimeContext().getState(new 
ValueStateDescriptor<>("b", Long.class, 0L));
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
 
b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
index 34adf75..d4883dd 100644
--- 
a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
+++ 
b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
@@ -63,7 +63,6 @@ import org.junit.Test;
 import com.google.common.base.Optional;
 
 import static org.junit.Assert.*;
-import static org.junit.Assert.fail;
 
 public class DbStateBackendTest {
 
@@ -201,7 +200,7 @@ public class DbStateBackendTest {
                        backend.initializeForJob(env, "dummy_test_kv", 
IntSerializer.INSTANCE);
 
                        ValueState<String> state = 
backend.createValueState(IntSerializer.INSTANCE,
-                               new ValueStateDescriptor<>("state1", null, 
StringSerializer.INSTANCE));
+                               new ValueStateDescriptor<>("state1", 
StringSerializer.INSTANCE, null));
 
                        LazyDbValueState<Integer, Integer, String> kv = 
(LazyDbValueState<Integer, Integer, String>) state;
 
@@ -455,9 +454,14 @@ public class DbStateBackendTest {
                backend2.initializeForJob(new DummyEnvironment("test", 3, 1), 
"dummy_2", StringSerializer.INSTANCE);
                backend3.initializeForJob(new DummyEnvironment("test", 3, 2), 
"dummy_3", StringSerializer.INSTANCE);
 
-               ValueState<String> s1State = 
backend1.createValueState(StringSerializer.INSTANCE, new 
ValueStateDescriptor<>("a1", null, StringSerializer.INSTANCE));
-               ValueState<String> s2State = 
backend2.createValueState(StringSerializer.INSTANCE, new 
ValueStateDescriptor<>("a2", null, StringSerializer.INSTANCE));
-               ValueState<String> s3State = 
backend3.createValueState(StringSerializer.INSTANCE, new 
ValueStateDescriptor<>("a3", null, StringSerializer.INSTANCE));
+               ValueState<String> s1State = 
backend1.createValueState(StringSerializer.INSTANCE, 
+                               new ValueStateDescriptor<>("a1", 
StringSerializer.INSTANCE, null));
+               
+               ValueState<String> s2State = 
backend2.createValueState(StringSerializer.INSTANCE, 
+                               new ValueStateDescriptor<>("a2", 
StringSerializer.INSTANCE, null));
+               
+               ValueState<String> s3State = 
backend3.createValueState(StringSerializer.INSTANCE, 
+                               new ValueStateDescriptor<>("a3", 
StringSerializer.INSTANCE, null));
 
                LazyDbValueState<?, ?, ?> s1 = (LazyDbValueState<?, ?, ?>) 
s1State;
                LazyDbValueState<?, ?, ?> s2 = (LazyDbValueState<?, ?, ?>) 
s2State;
@@ -520,7 +524,7 @@ public class DbStateBackendTest {
                backend.initializeForJob(env, "dummy_test_caching", 
IntSerializer.INSTANCE);
 
                ValueState<String> state = 
backend.createValueState(IntSerializer.INSTANCE,
-                       new ValueStateDescriptor<>("state1", "a", 
StringSerializer.INSTANCE));
+                       new ValueStateDescriptor<>("state1", 
StringSerializer.INSTANCE, "a"));
 
                LazyDbValueState<Integer, Integer, String> kv = 
(LazyDbValueState<Integer, Integer, String>) state;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index e391126..9f266d3 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,82 +6,68 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.common.state;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
-import static java.util.Objects.requireNonNull;
-
 /**
- * {@link StateDescriptor} for {@link ListState}. This can be used to create a 
partitioned
+ * A {@link StateDescriptor} for {@link ListState}. This can be used to create 
a partitioned
  * list state using
- * {@link 
org.apache.flink.api.common.functions.RuntimeContext#getPartitionedState(StateDescriptor)}.
+ * {@link 
org.apache.flink.api.common.functions.RuntimeContext#getListState(ListStateDescriptor)}.
  *
  * @param <T> The type of the values that can be added to the list state.
  */
-public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>> {
+public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
        private static final long serialVersionUID = 1L;
 
-       private final TypeSerializer<T> serializer;
-
        /**
-        * Creates a new {@code ListStateDescriptor} with the given name.
+        * Creates a new {@code ListStateDescriptor} with the given name and 
list element type.
+        *
+        * <p>If this constructor fails (because it is not possible to describe 
the type via a class),
+        * consider using the {@link #ListStateDescriptor(String, 
TypeInformation)} constructor.
         *
         * @param name The (unique) name for the state.
-        * @param serializer {@link TypeSerializer} for the state values.
+        * @param typeClass The type of the values in the state.
         */
-       public ListStateDescriptor(String name, TypeSerializer<T> serializer) {
-               super(requireNonNull(name));
-               this.serializer = requireNonNull(serializer);
-       }
-
-       @Override
-       public ListState<T> bind(StateBackend stateBackend) throws Exception {
-               return stateBackend.createListState(this);
+       public ListStateDescriptor(String name, Class<T> typeClass) {
+               super(name, typeClass, null);
        }
 
        /**
-        * Returns the {@link TypeSerializer} that can be used to serialize the 
value in the state.
+        * Creates a new {@code ListStateDescriptor} with the given name and 
list element type.
+        *
+        * @param name The (unique) name for the state.
+        * @param typeInfo The type of the values in the state.
         */
-       public TypeSerializer<T> getSerializer() {
-               return serializer;
+       public ListStateDescriptor(String name, TypeInformation<T> typeInfo) {
+               super(name, typeInfo, null);
        }
 
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
-
-               return serializer.equals(that.serializer) && 
name.equals(that.name);
-
-       }
-
-       @Override
-       public int hashCode() {
-               int result = serializer.hashCode();
-               result = 31 * result + name.hashCode();
-               return result;
+       /**
+        * Creates a new {@code ListStateDescriptor} with the given name and 
list element type.
+        *
+        * @param name The (unique) name for the state.
+        * @param typeSerializer The type serializer for the list values.
+        */
+       public ListStateDescriptor(String name, TypeSerializer<T> 
typeSerializer) {
+               super(name, typeSerializer, null);
        }
+       
+       // 
------------------------------------------------------------------------
 
        @Override
-       public String toString() {
-               return "ListStateDescriptor{" +
-                       "serializer=" + serializer +
-                       '}';
+       public ListState<T> bind(StateBackend stateBackend) throws Exception {
+               return stateBackend.createListState(this);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
index 7153a05..1ef65a3 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.state;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 import static java.util.Objects.requireNonNull;
@@ -27,80 +28,70 @@ import static java.util.Objects.requireNonNull;
 /**
  * {@link StateDescriptor} for {@link ReducingState}. This can be used to 
create partitioned
  * reducing state using
- * {@link 
org.apache.flink.api.common.functions.RuntimeContext#getPartitionedState(StateDescriptor)}.
+ * {@link 
org.apache.flink.api.common.functions.RuntimeContext#getReducingState(ReducingStateDescriptor)}.
  *
  * @param <T> The type of the values that can be added to the list state.
  */
-public class ReducingStateDescriptor<T> extends 
StateDescriptor<ReducingState<T>> {
+public class ReducingStateDescriptor<T> extends 
StateDescriptor<ReducingState<T>, T> {
        private static final long serialVersionUID = 1L;
-
-       private final TypeSerializer<T> serializer;
-
+       
+       
        private final ReduceFunction<T> reduceFunction;
 
        /**
-        * Creates a new {@code ReducingStateDescriptor} with the given name 
and reduce function.
+        * Creates a new {@code ReducingStateDescriptor} with the given name, 
reduce function, and type class.
+        *
+        * <p>If this constructor fails (because it is not possible to describe 
the type via a class),
+        * consider using the {@link #ReducingStateDescriptor(String, 
ReduceFunction, TypeInformation)} constructor.
         *
         * @param name The (unique) name for the state.
-        * @param serializer {@link TypeSerializer} for the state values.
+        * @param reduceFunction The {@code ReduceFunction} used to aggregate 
the state.   
+        * @param typeClass The type of the values in the state.
         */
-       public ReducingStateDescriptor(String name,
-                       ReduceFunction<T> reduceFunction,
-                       TypeSerializer<T> serializer) {
-               super(requireNonNull(name));
+       public ReducingStateDescriptor(String name, ReduceFunction<T> 
reduceFunction, Class<T> typeClass) {
+               super(name, typeClass, null);
+               this.reduceFunction = requireNonNull(reduceFunction);
+
                if (reduceFunction instanceof RichFunction) {
                        throw new UnsupportedOperationException("ReduceFunction 
of ReducingState can not be a RichFunction.");
                }
-               this.serializer = requireNonNull(serializer);
-               this.reduceFunction = reduceFunction;
-       }
-
-       @Override
-       public ReducingState<T> bind(StateBackend stateBackend) throws 
Exception {
-               return stateBackend.createReducingState(this);
        }
 
        /**
-        * Returns the {@link TypeSerializer} that can be used to serialize the 
value in the state.
+        * Creates a new {@code ReducingStateDescriptor} with the given name, 
reduce function, and type information.
+        *
+        * @param name The (unique) name for the state.
+        * @param reduceFunction The {@code ReduceFunction} used to aggregate 
the state.
+        * @param typeInfo The type of the values in the state.
         */
-       public TypeSerializer<T> getSerializer() {
-               return serializer;
+       public ReducingStateDescriptor(String name, ReduceFunction<T> 
reduceFunction, TypeInformation<T> typeInfo) {
+               super(name, typeInfo, null);
+               this.reduceFunction = requireNonNull(reduceFunction);
        }
 
        /**
-        * Returns the reduce function to be used for the reducing state.
+        * Creates a new {@code ReducingStateDescriptor} with the given name, 
reduce function, and type serializer.
+        *
+        * @param name The (unique) name for the state.
+        * @param reduceFunction The {@code ReduceFunction} used to aggregate 
the state.
+        * @param typeSerializer The type serializer of the values in the state.
         */
-       public ReduceFunction<T> getReduceFunction() {
-               return reduceFunction;
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               ReducingStateDescriptor<?> that = (ReducingStateDescriptor<?>) 
o;
-
-               return serializer.equals(that.serializer) && 
name.equals(that.name);
-
+       public ReducingStateDescriptor(String name, ReduceFunction<T> 
reduceFunction, TypeSerializer<T> typeSerializer) {
+               super(name, typeSerializer, null);
+               this.reduceFunction = requireNonNull(reduceFunction);
        }
 
+       // 
------------------------------------------------------------------------
+       
        @Override
-       public int hashCode() {
-               int result = serializer.hashCode();
-               result = 31 * result + name.hashCode();
-               return result;
+       public ReducingState<T> bind(StateBackend stateBackend) throws 
Exception {
+               return stateBackend.createReducingState(this);
        }
 
-       @Override
-       public String toString() {
-               return "ReducingStateDescriptor{" +
-                       "serializer=" + serializer +
-                       ", reduceFunction=" + reduceFunction +
-                       '}';
+       /**
+        * Returns the reduce function to be used for the reducing state.
+        */
+       public ReduceFunction<T> getReduceFunction() {
+               return reduceFunction;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
index 5a7650e..b97658b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
index d5adf9b..8c7c608 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,15 +6,16 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.common.state;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 737133f..38087fc 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,17 +6,30 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.common.state;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 
 import static java.util.Objects.requireNonNull;
@@ -30,21 +43,80 @@ import static java.util.Objects.requireNonNull;
  *
  * @param <S> The type of the State objects created from this {@code 
StateDescriptor}.
  */
-public abstract class StateDescriptor<S extends State> implements Serializable 
{
+public abstract class StateDescriptor<S extends State, T> implements 
Serializable {
        private static final long serialVersionUID = 1L;
 
        /** Name that uniquely identifies state created from this 
StateDescriptor. */
-       protected final String name;
+       private final String name;
+
+       /** The serializer for the type. May be eagerly initialized in the 
constructor,
+        * or lazily once the type is serialized or an ExecutionConfig is 
provided. */
+       private TypeSerializer<T> serializer;
+
+       /** The default value returned by the state when no other value is 
bound to a key */
+       private transient T defaultValue;
 
+       /** The type information describing the value type. Only used to lazily 
create the serializer
+        * and dropped during serialization */
+       private transient TypeInformation<T> typeInfo;
+       
+       // 
------------------------------------------------------------------------
+       
        /**
-        * Create a new {@code StateDescriptor} with the given name.
+        * Create a new {@code StateDescriptor} with the given name and the 
given type serializer.
+        * 
         * @param name The name of the {@code StateDescriptor}.
+        * @param serializer The type serializer for the values in the state.
+        * @param defaultValue The default value that will be set when 
requesting state without setting
+        *                     a value before.
         */
-       public StateDescriptor(String name) {
-               this.name = requireNonNull(name);
+       protected StateDescriptor(String name, TypeSerializer<T> serializer, T 
defaultValue) {
+               this.name = requireNonNull(name, "name must not be null");
+               this.serializer = requireNonNull(serializer, "serializer must 
not be null");
+               this.defaultValue = defaultValue;
        }
 
        /**
+        * Create a new {@code StateDescriptor} with the given name and the 
given type information.
+        *
+        * @param name The name of the {@code StateDescriptor}.
+        * @param typeInfo The type information for the values in the state.
+        * @param defaultValue The default value that will be set when 
requesting state without setting
+        *                     a value before.   
+        */
+       protected StateDescriptor(String name, TypeInformation<T> typeInfo, T 
defaultValue) {
+               this.name = requireNonNull(name, "name must not be null");
+               this.typeInfo = requireNonNull(typeInfo, "type information must 
not be null");
+               this.defaultValue = defaultValue;
+       }
+
+       /**
+        * Create a new {@code StateDescriptor} with the given name and the given type class.
+        * The type information is extracted from the class; the serializer is created lazily.
+        * 
+        * <p>If this constructor fails (because it is not possible to describe the type via a class),
+        * consider using the {@link #StateDescriptor(String, TypeInformation, Object)} constructor.
+        *
+        * @param name The name of the {@code StateDescriptor}.
+        * @param type The class of the type of values in the state.
+        * @param defaultValue The default value that will be set when requesting state without setting
+        *                     a value before.
+        * @throws NullPointerException If the name or the type class is null.
+        * @throws RuntimeException If no type information can be created from the given class.
+        */
+       protected StateDescriptor(String name, Class<T> type, T defaultValue) {
+               this.name = requireNonNull(name, "name must not be null");
+               requireNonNull(type, "type class must not be null");
+               
+               try {
+                       this.typeInfo = TypeExtractor.createTypeInfo(type);
+               } catch (Exception e) {
+                       // tell the user how to recover: the TypeInformation-based constructor
+                       // can describe types that a plain class cannot
+                       throw new RuntimeException("Cannot create full type information based on the given class. " +
+                                       "If the type has generics, please pass the TypeInformation instead of the class.", e);
+               }
+
+               this.defaultValue = defaultValue;
+       }
+
+       // 
------------------------------------------------------------------------
+       
+       /**
         * Returns the name of this {@code StateDescriptor}.
         */
        public String getName() {
@@ -52,15 +124,179 @@ public abstract class StateDescriptor<S extends State> 
implements Serializable {
        }
 
        /**
+        * Returns the default value.
+        */
+       public T getDefaultValue() {
+               if (defaultValue != null) {
+                       if (serializer != null) {
+                               return serializer.copy(defaultValue);
+                       } else {
+                               throw new IllegalStateException("Serializer not 
yet initialized.");
+                       }
+               } else {
+                       return null;
+               }
+       }
+
+       /**
+        * Returns the {@link TypeSerializer} that can be used to serialize the 
value in the state.
+        * Note that the serializer may be initialized lazily and is only 
guaranteed to exist after
+        * calling {@link #initializeSerializerUnlessSet(ExecutionConfig)}.
+        */
+       public TypeSerializer<T> getSerializer() {
+               if (serializer != null) {
+                       return serializer;
+               } else {
+                       throw new IllegalStateException("Serializer not yet 
initialized.");
+               }
+       }
+       
+       /**
         * Creates a new {@link State} on the given {@link StateBackend}.
         *
         * @param stateBackend The {@code StateBackend} on which to create the 
{@link State}.
         */
-       public abstract S bind(StateBackend stateBackend) throws Exception ;
+       public abstract S bind(StateBackend stateBackend) throws Exception;
+       
+       // 
------------------------------------------------------------------------
 
-       // Force subclasses to implement
-       public abstract boolean equals(Object o);
+       /**
+        * Checks whether the serializer has been initialized. Serializer 
initialization is lazy,
+        * to allow parametrization of serializers with an {@link 
ExecutionConfig} via
+        * {@link #initializeSerializerUnlessSet(ExecutionConfig)}.
+        * 
+        * @return True if the serializers have been initialized, false 
otherwise.
+        */
+       public boolean isSerializerInitialized() {
+               return serializer != null;
+       }
+
+       /**
+        * Initializes the serializer, unless it has been initialized before.
+        * 
+        * @param executionConfig The execution config to use when creating the 
serializer.
+        */
+       public void initializeSerializerUnlessSet(ExecutionConfig 
executionConfig) {
+               if (serializer == null) {
+                       if (typeInfo != null) {
+                               serializer = 
typeInfo.createSerializer(executionConfig);
+                       } else {
+                               throw new IllegalStateException(
+                                               "Cannot initialize serializer 
after TypeInformation was dropped during serialization");
+                       }
+               }
+       }
+       
+       /**
+        * This method is called by {@code writeObject} prior to serialization. 
Because the TypeInformation is
+        * not always serializable, it is 'transient' and dropped during 
serialization. Hence, the descriptor
+        * needs to make sure that the serializer is created before the 
TypeInformation is dropped. 
+        */
+       private void ensureSerializerCreated() {
+               if (serializer == null) {
+                       if (typeInfo != null) {
+                               serializer = typeInfo.createSerializer(new 
ExecutionConfig());
+                       } else {
+                               throw new IllegalStateException(
+                                               "Cannot initialize serializer 
after TypeInformation was dropped during serialization");
+                       }
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Standard Utils
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public int hashCode() {
+               return name.hashCode() + 41;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               else if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               else {
+                       StateDescriptor<?, ?> that = (StateDescriptor<?, ?>) o;
+                       return this.name.equals(that.name);
+               }
+       }
 
-       // Force subclasses to implement
-       public abstract int hashCode();
+       @Override
+       public String toString() {
+               return getClass().getSimpleName() + 
+                               "{ name=" + name +
+                               ", defaultValue=" + defaultValue +
+                               ", serializer=" + serializer +
+                               '}';
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Serialization
+       // 
------------------------------------------------------------------------
+
+       private void writeObject(final ObjectOutputStream out) throws 
IOException {
+               // make sure we have a serializer before the type information 
gets lost
+               ensureSerializerCreated();
+
+               // write all the non-transient fields
+               out.defaultWriteObject();
+
+               // write the non-serializable default value field
+               if (defaultValue == null) {
+                       // we don't have a default value
+                       out.writeBoolean(false);
+               } else {
+                       // we have a default value
+                       out.writeBoolean(true);
+
+                       byte[] serializedDefaultValue;
+                       try (ByteArrayOutputStream baos = new 
ByteArrayOutputStream(); 
+                                       DataOutputViewStreamWrapper outView = 
new DataOutputViewStreamWrapper(baos))
+                       {
+                               serializer.serialize(defaultValue, outView);
+                               outView.flush();
+                               serializedDefaultValue = baos.toByteArray();
+                       }
+                       catch (Exception e) {
+                               throw new IOException("Unable to serialize 
default value of type " +
+                                               
defaultValue.getClass().getSimpleName() + ".", e);
+                       }
+
+                       out.writeInt(serializedDefaultValue.length);
+                       out.write(serializedDefaultValue);
+               }
+       }
+
+       /**
+        * Restores this descriptor from the given stream: first the non-transient fields, then
+        * the default value, which is stored as a presence flag followed by a length-prefixed
+        * byte array that is decoded with the descriptor's serializer.
+        *
+        * @param in The stream to read the descriptor from.
+        * @throws IOException If the stream ends prematurely or the default value cannot be deserialized.
+        * @throws ClassNotFoundException If the class of a serialized field cannot be found.
+        */
+       private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
+               // read the non-transient fields
+               in.defaultReadObject();
+
+               // read the default value field
+               boolean hasDefaultValue = in.readBoolean();
+               if (hasDefaultValue) {
+                       int size = in.readInt();
+                       byte[] buffer = new byte[size];
+
+                       // read(byte[]) may return fewer bytes than requested even though more data
+                       // follows; readFully fills the whole buffer or fails with an EOFException
+                       in.readFully(buffer);
+
+                       try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
+                                       DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais))
+                       {
+                               defaultValue = serializer.deserialize(inView);
+                       }
+                       catch (Exception e) {
+                               throw new IOException("Unable to deserialize default value.", e);
+                       }
+               } else {
+                       defaultValue = null;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
index bcfa46f..f949c57 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,161 +6,75 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.common.state;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import static java.util.Objects.requireNonNull;
 
 /**
  * {@link StateDescriptor} for {@link ValueState}. This can be used to create 
partitioned
  * value state using
- * {@link 
org.apache.flink.api.common.functions.RuntimeContext#getPartitionedState(StateDescriptor)}.
+ * {@link 
org.apache.flink.api.common.functions.RuntimeContext#getState(ValueStateDescriptor)}.
  *
  * @param <T> The type of the values that the value state can hold.
  */
-public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>> {
+public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> 
{
        private static final long serialVersionUID = 1L;
-
-       private transient T defaultValue;
-
-       private final TypeSerializer<T> serializer;
-
+       
        /**
-        * Creates a new {@code ValueStateDescriptor} with the given name and 
default value.
-        *
+        * Creates a new {@code ValueStateDescriptor} with the given name, 
type, and default value.
+        * 
+        * <p>If this constructor fails (because it is not possible to describe 
the type via a class),
+        * consider using the {@link #ValueStateDescriptor(String, 
TypeInformation, Object)} constructor.
+        * 
         * @param name The (unique) name for the state.
+        * @param typeClass The type of the values in the state.   
         * @param defaultValue The default value that will be set when 
requesting state without setting
         *                     a value before.
-        * @param serializer {@link TypeSerializer} for the state values.
         */
-       public ValueStateDescriptor(String name, T defaultValue, 
TypeSerializer<T> serializer) {
-               super(requireNonNull(name));
-               this.defaultValue = defaultValue;
-               this.serializer = requireNonNull(serializer);
-       }
-
-       private void writeObject(final ObjectOutputStream out) throws 
IOException {
-               out.defaultWriteObject();
-
-               if (defaultValue == null) {
-                       // we don't have a default value
-                       out.writeBoolean(false);
-               } else {
-                       out.writeBoolean(true);
-
-                       ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
-                       DataOutputViewStreamWrapper outView =
-                                       new DataOutputViewStreamWrapper(new 
DataOutputStream(baos));
-
-                       try {
-                               serializer.serialize(defaultValue, outView);
-                       } catch (IOException ioe) {
-                               throw new RuntimeException("Unable to serialize 
default value of type " +
-                                               
defaultValue.getClass().getSimpleName() + ".", ioe);
-                       }
-
-                       outView.close();
-
-                       out.writeInt(baos.size());
-                       out.write(baos.toByteArray());
-               }
-
-       }
-
-       private void readObject(final ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-               in.defaultReadObject();
-
-               boolean hasDefaultValue = in.readBoolean();
-
-               if (hasDefaultValue) {
-                       int size = in.readInt();
-                       byte[] buffer = new byte[size];
-                       int bytesRead = in.read(buffer);
-
-                       if (bytesRead != size) {
-                               throw new RuntimeException("Read size does not 
match expected size.");
-                       }
-
-                       ByteArrayInputStream bais = new 
ByteArrayInputStream(buffer);
-                       DataInputViewStreamWrapper inView =
-                                       new DataInputViewStreamWrapper(new 
DataInputStream(bais));
-                       defaultValue = serializer.deserialize(inView);
-               } else {
-                       defaultValue = null;
-               }
-       }
-
-       @Override
-       public ValueState<T> bind(StateBackend stateBackend) throws Exception {
-               return stateBackend.createValueState(this);
+       public ValueStateDescriptor(String name, Class<T> typeClass, T 
defaultValue) {
+               super(name, typeClass, defaultValue);
        }
 
        /**
-        * Returns the default value.
+        * Creates a new {@code ValueStateDescriptor} with the given name and 
default value.
+        *
+        * @param name The (unique) name for the state.
+        * @param typeInfo The type of the values in the state.
+        * @param defaultValue The default value that will be set when 
requesting state without setting
+        *                     a value before.
         */
-       public T getDefaultValue() {
-               if (defaultValue != null) {
-                       return serializer.copy(defaultValue);
-               } else {
-                       return null;
-               }
+       public ValueStateDescriptor(String name, TypeInformation<T> typeInfo, T 
defaultValue) {
+               super(name, typeInfo, defaultValue);
        }
 
        /**
-        * Returns the {@link TypeSerializer} that can be used to serialize the 
value in the state.
+        * Creates a new {@code ValueStateDescriptor} with the given name, 
default value, and the specific
+        * serializer.
+        *
+        * @param name The (unique) name for the state.
+        * @param typeSerializer The type serializer of the values in the state.
+        * @param defaultValue The default value that will be set when 
requesting state without setting
+        *                     a value before.
         */
-       public TypeSerializer<T> getSerializer() {
-               return serializer;
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               ValueStateDescriptor<?> that = (ValueStateDescriptor<?>) o;
-
-               return serializer.equals(that.serializer) && 
name.equals(that.name);
-
-       }
-
-       @Override
-       public int hashCode() {
-               int result = serializer.hashCode();
-               result = 31 * result + name.hashCode();
-               return result;
+       public ValueStateDescriptor(String name, TypeSerializer<T> 
typeSerializer, T defaultValue) {
+               super(name, typeSerializer, defaultValue);
        }
 
+       // 
------------------------------------------------------------------------
+       
        @Override
-       public String toString() {
-               return "ValueStateDescriptor{" +
-                               "name=" + name +
-                               ", defaultValue=" + defaultValue +
-                               ", serializer=" + serializer +
-                               '}';
+       public ValueState<T> bind(StateBackend stateBackend) throws Exception {
+               return stateBackend.createValueState(this);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
new file mode 100644
index 0000000..6dc00f0
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ListStateDescriptorTest {
+       
+       @Test
+       public void testValueStateDescriptorEagerSerializer() throws Exception {
+
+               TypeSerializer<String> serializer = new 
KryoSerializer<>(String.class, new ExecutionConfig());
+               
+               ListStateDescriptor<String> descr = 
+                               new ListStateDescriptor<String>("testName", 
serializer);
+               
+               assertEquals("testName", descr.getName());
+               assertNotNull(descr.getSerializer());
+               assertEquals(serializer, descr.getSerializer());
+
+               ListStateDescriptor<String> copy = 
CommonTestUtils.createCopySerializable(descr);
+
+               assertEquals("testName", copy.getName());
+               assertNotNull(copy.getSerializer());
+               assertEquals(serializer, copy.getSerializer());
+       }
+
+       @Test
+       public void testValueStateDescriptorLazySerializer() throws Exception {
+               // some different registered value
+               ExecutionConfig cfg = new ExecutionConfig();
+               cfg.registerKryoType(TaskInfo.class);
+
+               ListStateDescriptor<Path> descr =
+                               new ListStateDescriptor<Path>("testName", 
Path.class);
+               
+               try {
+                       descr.getSerializer();
+                       fail("should cause an exception");
+               } catch (IllegalStateException ignored) {}
+
+               descr.initializeSerializerUnlessSet(cfg);
+               
+               assertNotNull(descr.getSerializer());
+               assertTrue(descr.getSerializer() instanceof KryoSerializer);
+
+               assertTrue(((KryoSerializer<?>) 
descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
+       }
+
+       @Test
+       public void testValueStateDescriptorAutoSerializer() throws Exception {
+
+               ListStateDescriptor<String> descr =
+                               new ListStateDescriptor<String>("testName", 
String.class);
+
+               ListStateDescriptor<String> copy = 
CommonTestUtils.createCopySerializable(descr);
+
+               assertEquals("testName", copy.getName());
+               assertNotNull(copy.getSerializer());
+               assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
new file mode 100644
index 0000000..0bac930
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+public class ReducingStateDescriptorTest {
+       
+       @Test
+       public void testValueStateDescriptorEagerSerializer() throws Exception {
+
+               @SuppressWarnings("unchecked")
+               ReduceFunction<String> reducer = mock(ReduceFunction.class); 
+               
+               TypeSerializer<String> serializer = new 
KryoSerializer<>(String.class, new ExecutionConfig());
+               
+               ReducingStateDescriptor<String> descr = 
+                               new ReducingStateDescriptor<String>("testName", 
reducer, serializer);
+               
+               assertEquals("testName", descr.getName());
+               assertNotNull(descr.getSerializer());
+               assertEquals(serializer, descr.getSerializer());
+
+               ReducingStateDescriptor<String> copy = 
CommonTestUtils.createCopySerializable(descr);
+
+               assertEquals("testName", copy.getName());
+               assertNotNull(copy.getSerializer());
+               assertEquals(serializer, copy.getSerializer());
+       }
+
+       @Test
+       public void testValueStateDescriptorLazySerializer() throws Exception {
+
+               @SuppressWarnings("unchecked")
+               ReduceFunction<Path> reducer = mock(ReduceFunction.class);
+               
+               // some different registered value
+               ExecutionConfig cfg = new ExecutionConfig();
+               cfg.registerKryoType(TaskInfo.class);
+
+               ReducingStateDescriptor<Path> descr =
+                               new ReducingStateDescriptor<Path>("testName", 
reducer, Path.class);
+
+               try {
+                       descr.getSerializer();
+                       fail("should cause an exception");
+               } catch (IllegalStateException ignored) {}
+
+               descr.initializeSerializerUnlessSet(cfg);
+               
+               assertNotNull(descr.getSerializer());
+               assertTrue(descr.getSerializer() instanceof KryoSerializer);
+
+               assertTrue(((KryoSerializer<?>) 
descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
+       }
+
+       @Test
+       public void testValueStateDescriptorAutoSerializer() throws Exception {
+
+               @SuppressWarnings("unchecked")
+               ReduceFunction<String> reducer = mock(ReduceFunction.class);
+
+               ReducingStateDescriptor<String> descr =
+                               new ReducingStateDescriptor<String>("testName", 
reducer, String.class);
+
+               ReducingStateDescriptor<String> copy = 
CommonTestUtils.createCopySerializable(descr);
+
+               assertEquals("testName", copy.getName());
+               assertNotNull(copy.getSerializer());
+               assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
new file mode 100644
index 0000000..d03cc47
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ValueStateDescriptorTest {
+       
+       @Test
+       public void testValueStateDescriptorEagerSerializer() throws Exception {
+
+               TypeSerializer<String> serializer = new 
KryoSerializer<>(String.class, new ExecutionConfig());
+               String defaultValue = "le-value-default";
+               
+               ValueStateDescriptor<String> descr = 
+                               new ValueStateDescriptor<String>("testName", 
serializer, defaultValue);
+               
+               assertEquals("testName", descr.getName());
+               assertEquals(defaultValue, descr.getDefaultValue());
+               assertNotNull(descr.getSerializer());
+               assertEquals(serializer, descr.getSerializer());
+
+               ValueStateDescriptor<String> copy = 
CommonTestUtils.createCopySerializable(descr);
+
+               assertEquals("testName", copy.getName());
+               assertEquals(defaultValue, copy.getDefaultValue());
+               assertNotNull(copy.getSerializer());
+               assertEquals(serializer, copy.getSerializer());
+       }
+
+       @Test
+       public void testValueStateDescriptorLazySerializer() throws Exception {
+               
+               // some default value that goes to the generic serializer
+               Path defaultValue = new Path(new 
File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).toURI());
+               
+               // some different registered value
+               ExecutionConfig cfg = new ExecutionConfig();
+               cfg.registerKryoType(TaskInfo.class);
+
+               ValueStateDescriptor<Path> descr =
+                               new ValueStateDescriptor<Path>("testName", 
Path.class, defaultValue);
+
+               try {
+                       descr.getSerializer();
+                       fail("should cause an exception");
+               } catch (IllegalStateException ignored) {}
+
+               descr.initializeSerializerUnlessSet(cfg);
+               
+               assertNotNull(descr.getSerializer());
+               assertTrue(descr.getSerializer() instanceof KryoSerializer);
+
+               assertTrue(((KryoSerializer<?>) 
descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
+       }
+
+       @Test
+       public void testValueStateDescriptorAutoSerializer() throws Exception {
+               
+               String defaultValue = "le-value-default";
+
+               ValueStateDescriptor<String> descr =
+                               new ValueStateDescriptor<String>("testName", 
String.class, defaultValue);
+
+               ValueStateDescriptor<String> copy = 
CommonTestUtils.createCopySerializable(descr);
+
+               assertEquals("testName", copy.getName());
+               assertEquals(defaultValue, copy.getDefaultValue());
+               assertNotNull(copy.getSerializer());
+               assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index dafe86f..bd82800 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -17,10 +17,8 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -103,8 +101,8 @@ public class SessionWindowing {
 
                private final Long sessionTimeout;
 
-               private final ValueStateDescriptor<Long> stateDesc = new 
ValueStateDescriptor<>("last-seen", -1L,
-                       BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new 
ExecutionConfig()));
+               private final ValueStateDescriptor<Long> stateDesc = 
+                               new ValueStateDescriptor<>("last-seen", 
Long.class, -1L);
 
 
                public SessionTrigger(Long sessionTimeout) {

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java
index 206be64..8e77752 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java
@@ -39,7 +39,7 @@ import static java.util.Objects.requireNonNull;
  * @param <SD> The type of StateDescriptor for the State S
  * @param <Backend> The type of the backend that snapshots this key/value 
state.
  */
-public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends 
StateDescriptor<S>, Backend extends AbstractStateBackend>
+public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends 
StateDescriptor<S, ?>, Backend extends AbstractStateBackend>
                implements KvState<K, N, S, SD, Backend>, State {
 
        /** Map containing the actual key/value pairs */

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 958b4dc..e989af3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -178,12 +179,16 @@ public abstract class AbstractStateBackend implements 
java.io.Serializable {
         * @throws Exception Exceptions may occur during initialization of the 
state and should be forwarded.
         */
        @SuppressWarnings({"rawtypes", "unchecked"})
-       public <K, N, S extends State> S getPartitionedState(final N namespace, 
final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S> 
stateDescriptor) throws Exception {
+       public <K, N, S extends State> S getPartitionedState(final N namespace, 
final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> 
stateDescriptor) throws Exception {
 
                if (keySerializer == null) {
                        throw new Exception("State key serializer has not been 
configured in the config. " +
                                        "This operation cannot use partitioned 
state.");
                }
+               
+               if (!stateDescriptor.isSerializerInitialized()) {
+                       stateDescriptor.initializeSerializerUnlessSet(new 
ExecutionConfig());
+               }
 
                if (keyValueStatesByName == null) {
                        keyValueStatesByName = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
index 7a97dc0..89de000 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
@@ -35,7 +35,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
  * @param <SD> The type of the {@link StateDescriptor} for state {@code S}.
  * @param <Backend> The type of {@link AbstractStateBackend} that manages this 
{@code KvState}.
  */
-public interface KvState<K, N, S extends State, SD extends StateDescriptor<S>, 
Backend extends AbstractStateBackend> {
+public interface KvState<K, N, S extends State, SD extends StateDescriptor<S, 
?>, Backend extends AbstractStateBackend> {
 
        /**
         * Sets the current key, which will be used when using the state access 
methods.

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
index ce72135..245427e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
@@ -39,7 +39,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
  * @param <SD> The type of the {@link StateDescriptor}
  * @param <Backend> The type of the backend that can restore the state from 
this snapshot.
  */
-public interface KvStateSnapshot<K, N, S extends State, SD extends 
StateDescriptor<S>, Backend extends AbstractStateBackend> extends 
java.io.Serializable {
+public interface KvStateSnapshot<K, N, S extends State, SD extends 
StateDescriptor<S, ?>, Backend extends AbstractStateBackend> extends 
java.io.Serializable {
 
        /**
         * Loads the key/value state back from this snapshot.

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java
index 5035953..3cae629 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java
@@ -41,7 +41,7 @@ import java.util.Map;
  * @param <S> The type of State
  * @param <SD> The type of StateDescriptor for the State S
  */
-public abstract class AbstractFsState<K, N, SV, S extends State, SD extends 
StateDescriptor<S>>
+public abstract class AbstractFsState<K, N, SV, S extends State, SD extends 
StateDescriptor<S, ?>>
                extends AbstractHeapState<K, N, SV, S, SD, FsStateBackend> {
 
        /** The file system state backend backing snapshots of this state */

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
index c1e0f12..432a9e6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
@@ -39,7 +39,7 @@ import java.util.Map;
  * @param <N> The type of the namespace in the snapshot state.
  * @param <SV> The type of the state value.
  */
-public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD 
extends StateDescriptor<S>> extends AbstractFileStateHandle implements 
KvStateSnapshot<K, N, S, SD, FsStateBackend> {
+public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD 
extends StateDescriptor<S, ?>> extends AbstractFileStateHandle implements 
KvStateSnapshot<K, N, S, SD, FsStateBackend> {
 
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java
index 816c883..cae673d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java
@@ -39,7 +39,7 @@ import java.util.Map;
  * @param <S> The type of State
  * @param <SD> The type of StateDescriptor for the State S
  */
-public abstract class AbstractMemState<K, N, SV, S extends State, SD extends 
StateDescriptor<S>>
+public abstract class AbstractMemState<K, N, SV, S extends State, SD extends 
StateDescriptor<S, ?>>
                extends AbstractHeapState<K, N, SV, S, SD, MemoryStateBackend> {
 
        public AbstractMemState(TypeSerializer<K> keySerializer,

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
index d2efd53..5d4f0d8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
@@ -36,7 +36,7 @@ import java.util.Map;
  * @param <N> The type of the namespace in the snapshot state.
  * @param <SV> The type of the value in the snapshot state.
  */
-public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD 
extends StateDescriptor<S>> implements KvStateSnapshot<K, N, S, SD, 
MemoryStateBackend> {
+public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD 
extends StateDescriptor<S, ?>> implements KvStateSnapshot<K, N, S, SD, 
MemoryStateBackend> {
 
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 82ab3b3..20a46a6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.state;
 
 import com.google.common.base.Joiner;
+
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -29,11 +31,10 @@ import 
org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.types.IntValue;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -67,7 +68,9 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
 
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
 
-                       ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id", null, StringSerializer.INSTANCE);
+                       ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id", String.class, null);
+                       kvId.initializeSerializerUnlessSet(new 
ExecutionConfig());
+               
                        ValueState<String> state = 
backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
 
                        @SuppressWarnings("unchecked")
@@ -149,7 +152,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                try {
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
 
-                       ListStateDescriptor<String> kvId = new 
ListStateDescriptor<>("id", StringSerializer.INSTANCE);
+                       ListStateDescriptor<String> kvId = new 
ListStateDescriptor<>("id", String.class);
                        ListState<String> state = 
backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
 
                        @SuppressWarnings("unchecked")
@@ -246,7 +249,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                                                return value1 + "," + value2;
                                        }
                                },
-                               StringSerializer.INSTANCE);
+                               String.class);
                        ReducingState<String> state = 
backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
 
                        @SuppressWarnings("unchecked")
@@ -336,12 +339,10 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                                "test_op",
                                IntSerializer.INSTANCE);
 
-                       ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id",
-                               null,
-                               StringSerializer.INSTANCE);
-                       ValueState<String> state = 
backend.getPartitionedState(null,
-                               VoidSerializer.INSTANCE,
-                               kvId);
+                       ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id", String.class, null);
+                       kvId.initializeSerializerUnlessSet(new 
ExecutionConfig());
+                       
+                       ValueState<String> state = 
backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
 
                        @SuppressWarnings("unchecked")
                        KvState<Integer, Void, ValueState<String>, 
ValueStateDescriptor<String>, B> kv =
@@ -379,7 +380,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                try {
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
 
-                       ListStateDescriptor<String> kvId = new 
ListStateDescriptor<>("id", StringSerializer.INSTANCE);
+                       ListStateDescriptor<String> kvId = new 
ListStateDescriptor<>("id", String.class);
                        ListState<String> state = 
backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
 
                        @SuppressWarnings("unchecked")
@@ -427,7 +428,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                                                return value1 + "," + value2;
                                        }
                                },
-                               StringSerializer.INSTANCE);
+                               String.class);
                        ReducingState<String> state = 
backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
 
                        @SuppressWarnings("unchecked")
@@ -468,7 +469,9 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                try {
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
 
-                       ValueStateDescriptor<IntValue> kvId = new 
ValueStateDescriptor<>("id", new IntValue(-1), IntValueSerializer.INSTANCE);
+                       ValueStateDescriptor<IntValue> kvId = new 
ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1));
+                       kvId.initializeSerializerUnlessSet(new 
ExecutionConfig());
+                       
                        ValueState<IntValue> state = 
backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
 
                        @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index f8f26b5..6cb04b3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -254,7 +254,7 @@ public abstract class AbstractStreamOperator<OUT>
         * @throws IllegalStateException Thrown, if the key/value state was 
already initialized.
         * @throws Exception Thrown, if the state backend cannot create the 
key/value state.
         */
-       protected <S extends State> S getPartitionedState(StateDescriptor<S> 
stateDescriptor) throws Exception {
+       protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> 
stateDescriptor) throws Exception {
                return getStateBackend().getPartitionedState(null, 
VoidSerializer.INSTANCE, stateDescriptor);
        }
 
@@ -265,7 +265,7 @@ public abstract class AbstractStreamOperator<OUT>
         * @throws Exception Thrown, if the state backend cannot create the 
key/value state.
         */
        @SuppressWarnings("unchecked")
-       protected <S extends State, N> S getPartitionedState(N namespace, 
TypeSerializer<N> namespaceSerializer, StateDescriptor<S> stateDescriptor) 
throws Exception {
+       protected <S extends State, N> S getPartitionedState(N namespace, 
TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) 
throws Exception {
                return getStateBackend().getPartitionedState(namespace, 
(TypeSerializer<Object>) namespaceSerializer,
                        stateDescriptor);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index e627ec8..0b80884 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -64,10 +64,13 @@ public class StreamGroupedFold<IN, OUT, KEY>
                                        "operator. Probably the setOutputType 
method was not called.");
                }
 
-               ByteArrayInputStream bais = new 
ByteArrayInputStream(serializedInitialValue);
-               DataInputViewStreamWrapper in = new 
DataInputViewStreamWrapper(bais);
-               initialValue = outTypeSerializer.deserialize(in);
-               ValueStateDescriptor<OUT> stateId = new 
ValueStateDescriptor<>(STATE_NAME, null, outTypeSerializer);
+               try (ByteArrayInputStream bais = new 
ByteArrayInputStream(serializedInitialValue);
+                       DataInputViewStreamWrapper in = new 
DataInputViewStreamWrapper(bais))
+               {
+                       initialValue = outTypeSerializer.deserialize(in);
+               }
+               
+               ValueStateDescriptor<OUT> stateId = new 
ValueStateDescriptor<>(STATE_NAME, outTypeSerializer, null);
                values = getPartitionedState(stateId);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index c054563..2dd7762 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -44,7 +44,7 @@ public class StreamGroupedReduce<IN> extends 
AbstractUdfStreamOperator<IN, Reduc
        @Override
        public void open() throws Exception {
                super.open();
-               ValueStateDescriptor<IN> stateId = new 
ValueStateDescriptor<>(STATE_NAME, null, serializer);
+               ValueStateDescriptor<IN> stateId = new 
ValueStateDescriptor<>(STATE_NAME, serializer, null);
                values = getPartitionedState(stateId);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index f99ab93..c9cc4a7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -112,6 +112,7 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
        public <T> ValueState<T> getState(ValueStateDescriptor<T> 
stateProperties) {
                requireNonNull(stateProperties, "The state properties must not 
be null");
                try {
+                       
stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
                        return operator.getPartitionedState(stateProperties);
                } catch (Exception e) {
                        throw new RuntimeException("Error while getting state", 
e);
@@ -122,6 +123,7 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
        public <T> ListState<T> getListState(ListStateDescriptor<T> 
stateProperties) {
                requireNonNull(stateProperties, "The state properties must not 
be null");
                try {
+                       
stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
                        return operator.getPartitionedState(stateProperties);
                } catch (Exception e) {
                        throw new RuntimeException("Error while getting state", 
e);
@@ -132,6 +134,7 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
        public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> 
stateProperties) {
                requireNonNull(stateProperties, "The state properties must not 
be null");
                try {
+                       
stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
                        return operator.getPartitionedState(stateProperties);
                } catch (Exception e) {
                        throw new RuntimeException("Error while getting state", 
e);
@@ -163,7 +166,7 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
                requireNonNull(stateType, "The state type information must not 
be null");
 
                ValueStateDescriptor<S> stateProps = 
-                               new ValueStateDescriptor<>(name, defaultState, 
stateType.createSerializer(getExecutionConfig()));
+                               new ValueStateDescriptor<>(name, stateType, 
defaultState);
                return getState(stateProps);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 21e35db..02a935c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -18,10 +18,10 @@
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
+
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
@@ -38,8 +38,8 @@ public class ContinuousEventTimeTrigger<W extends Window> implements Trigger<Obj
 
        private final long interval;
 
-       private final ValueStateDescriptor<Boolean> stateDesc = new 
ValueStateDescriptor<>("first", true,
-               BasicTypeInfo.BOOLEAN_TYPE_INFO.createSerializer(new 
ExecutionConfig()));
+       private final ValueStateDescriptor<Boolean> stateDesc = 
+                       new ValueStateDescriptor<>("first", 
BooleanSerializer.INSTANCE, true);
 
        private ContinuousEventTimeTrigger(long interval) {
                this.interval = interval;

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index 10c975f..25d9508 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,13 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
+
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
@@ -36,8 +37,8 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge
 
        private final long interval;
 
-       private final ValueStateDescriptor<Long> stateDesc = new 
ValueStateDescriptor<>("fire-timestamp", 0L,
-               BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new 
ExecutionConfig()));
+       private final ValueStateDescriptor<Long> stateDesc = 
+                       new ValueStateDescriptor<>("fire-timestamp", 
LongSerializer.INSTANCE, 0L);
 
 
        private ContinuousProcessingTimeTrigger(long interval) {

Reply via email to