Repository: flink
Updated Branches:
  refs/heads/release-1.2 af244aaed -> 8b7331409


[FLINK-5155] Deprecate ValueStateDescriptor constructors with default value


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b733140
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b733140
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b733140

Branch: refs/heads/release-1.2
Commit: 8b7331409cd1b903d2ca46b81f3265bc2973e659
Parents: b4c60a9
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Jan 11 12:14:13 2017 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Fri Jan 13 11:38:44 2017 +0100

----------------------------------------------------------------------
 .../state/RocksDBAsyncSnapshotTest.java         |  6 +--
 .../flink/api/common/state/ValueState.java      | 10 ++--
 .../api/common/state/ValueStateDescriptor.java  | 50 +++++++++++++++++++-
 .../AbstractKeyedCEPPatternOperator.java        |  6 +--
 .../runtime/query/QueryableStateClientTest.java |  2 +-
 .../runtime/query/netty/KvStateClientTest.java  |  2 +-
 .../query/netty/KvStateServerHandlerTest.java   | 10 ++--
 .../runtime/query/netty/KvStateServerTest.java  |  2 +-
 .../runtime/state/StateBackendTestBase.java     | 15 +++---
 .../streaming/api/datastream/KeyedStream.java   |  3 +-
 .../api/operators/StreamGroupedFold.java        |  2 +-
 .../api/operators/StreamGroupedReduce.java      |  2 +-
 .../api/windowing/triggers/DeltaTrigger.java    |  2 +-
 .../operators/AbstractStreamOperatorTest.java   |  2 +-
 .../api/operators/ProcessOperatorTest.java      |  2 +-
 .../operators/StreamingRuntimeContextTest.java  |  2 +-
 .../api/operators/co/CoProcessOperatorTest.java |  4 +-
 .../flink/streaming/api/scala/KeyedStream.scala |  3 +-
 .../api/scala/function/StatefulFunction.scala   |  2 +-
 .../StatefulUDFSavepointMigrationITCase.java    |  8 ++--
 .../flink/test/query/QueryableStateITCase.java  |  3 +-
 21 files changed, 90 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 002b16d..70f74b0 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -371,8 +371,7 @@ public class RocksDBAsyncSnapshotTest {
                        ValueState<String> state = getPartitionedState(
                                        VoidNamespace.INSTANCE,
                                        VoidNamespaceSerializer.INSTANCE,
-                                       new ValueStateDescriptor<>("count",
-                                                       
StringSerializer.INSTANCE, "hello"));
+                                       new ValueStateDescriptor<>("count", 
StringSerializer.INSTANCE));
 
                }
 
@@ -383,8 +382,7 @@ public class RocksDBAsyncSnapshotTest {
                        ValueState<String> state = getPartitionedState(
                                        VoidNamespace.INSTANCE,
                                        VoidNamespaceSerializer.INSTANCE,
-                                       new ValueStateDescriptor<>("count",
-                                                       
StringSerializer.INSTANCE, "hello"));
+                                       new ValueStateDescriptor<>("count", 
StringSerializer.INSTANCE));
 
                        state.update(element.getValue());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
index de3250a..7e42daa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
@@ -45,8 +45,11 @@ public interface ValueState<T> extends State {
         * operator instance. If state partitioning is applied, the value 
returned
         * depends on the current operator input, as the operator maintains an
         * independent state for each partition.
-        * 
-        * @return The operator state value corresponding to the current input.
+        *
+        * <p>If you didn't specify a default value when creating the {@link 
ValueStateDescriptor}
+        * this will return {@code null} when to value was previously set using 
{@link #update(Object)}.
+        *
+        * @return The state value corresponding to the current input.
         * 
         * @throws IOException Thrown if the system cannot access the state.
         */
@@ -59,8 +62,7 @@ public interface ValueState<T> extends State {
         * partitioned state is updated with null, the state for the current 
key 
         * will be removed and the default value is returned on the next access.
         * 
-        * @param value
-        *            The new value for the state.
+        * @param value The new value for the state.
         *            
         * @throws IOException Thrown if the system cannot access the state.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
index 7db9116..b3006c4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
@@ -27,6 +27,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
  * value state using
  * {@link 
org.apache.flink.api.common.functions.RuntimeContext#getState(ValueStateDescriptor)}.
  *
+ * <p>If you don't use one of the constructors that set a default value the 
value that you
+ * get when reading a {@link ValueState} using {@link ValueState#value()} will 
be {@code null}.
+ *
  * @param <T> The type of the values that the value state can hold.
  */
 @PublicEvolving
@@ -38,12 +41,16 @@ public class ValueStateDescriptor<T> extends 
StateDescriptor<ValueState<T>, T> {
         * 
         * <p>If this constructor fails (because it is not possible to describe 
the type via a class),
         * consider using the {@link #ValueStateDescriptor(String, 
TypeInformation, Object)} constructor.
-        * 
+        *
+        * @deprecated Use {@link #ValueStateDescriptor(String, Class)} instead 
and manually manage
+        * the default value by checking whether the contents of the state is 
{@code null}.
+        *
         * @param name The (unique) name for the state.
         * @param typeClass The type of the values in the state.   
         * @param defaultValue The default value that will be set when 
requesting state without setting
         *                     a value before.
         */
+       @Deprecated
        public ValueStateDescriptor(String name, Class<T> typeClass, T 
defaultValue) {
                super(name, typeClass, defaultValue);
        }
@@ -51,11 +58,15 @@ public class ValueStateDescriptor<T> extends 
StateDescriptor<ValueState<T>, T> {
        /**
         * Creates a new {@code ValueStateDescriptor} with the given name and 
default value.
         *
+        * @deprecated Use {@link #ValueStateDescriptor(String, 
TypeInformation)} instead and manually
+        * manage the default value by checking whether the contents of the 
state is {@code null}.
+        *
         * @param name The (unique) name for the state.
         * @param typeInfo The type of the values in the state.
         * @param defaultValue The default value that will be set when 
requesting state without setting
         *                     a value before.
         */
+       @Deprecated
        public ValueStateDescriptor(String name, TypeInformation<T> typeInfo, T 
defaultValue) {
                super(name, typeInfo, defaultValue);
        }
@@ -64,15 +75,52 @@ public class ValueStateDescriptor<T> extends 
StateDescriptor<ValueState<T>, T> {
         * Creates a new {@code ValueStateDescriptor} with the given name, 
default value, and the specific
         * serializer.
         *
+        * @deprecated Use {@link #ValueStateDescriptor(String, 
TypeSerializer)} instead and manually
+        * manage the default value by checking whether the contents of the 
state is {@code null}.
+        *
         * @param name The (unique) name for the state.
         * @param typeSerializer The type serializer of the values in the state.
         * @param defaultValue The default value that will be set when 
requesting state without setting
         *                     a value before.
         */
+       @Deprecated
        public ValueStateDescriptor(String name, TypeSerializer<T> 
typeSerializer, T defaultValue) {
                super(name, typeSerializer, defaultValue);
        }
 
+       /**
+        * Creates a new {@code ValueStateDescriptor} with the given name and 
type
+        *
+        * <p>If this constructor fails (because it is not possible to describe 
the type via a class),
+        * consider using the {@link #ValueStateDescriptor(String, 
TypeInformation)} constructor.
+        *
+        * @param name The (unique) name for the state.
+        * @param typeClass The type of the values in the state.
+        */
+       public ValueStateDescriptor(String name, Class<T> typeClass) {
+               super(name, typeClass, null);
+       }
+
+       /**
+        * Creates a new {@code ValueStateDescriptor} with the given name and 
type.
+        *
+        * @param name The (unique) name for the state.
+        * @param typeInfo The type of the values in the state.
+        */
+       public ValueStateDescriptor(String name, TypeInformation<T> typeInfo) {
+               super(name, typeInfo, null);
+       }
+
+       /**
+        * Creates a new {@code ValueStateDescriptor} with the given name and 
the specific serializer.
+        *
+        * @param name The (unique) name for the state.
+        * @param typeSerializer The type serializer of the values in the state.
+        */
+       public ValueStateDescriptor(String name, TypeSerializer<T> 
typeSerializer) {
+               super(name, typeSerializer, null);
+       }
+
        // 
------------------------------------------------------------------------
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index b5601ef..832a0ba 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -102,8 +102,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT> extends Abst
                        nfaOperatorState = getPartitionedState(
                                        new ValueStateDescriptor<NFA<IN>>(
                                                NFA_OPERATOR_STATE_NAME,
-                                               new NFA.Serializer<IN>(),
-                                               null));
+                                               new NFA.Serializer<IN>()));
                }
 
                @SuppressWarnings("unchecked,rawtypes")
@@ -116,8 +115,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT> extends Abst
                                                PRIORIRY_QUEUE_STATE_NAME,
                                                new PriorityQueueSerializer<>(
                                                                
streamRecordSerializer,
-                                                               new 
PriorityQueueStreamRecordFactory<IN>()),
-                                               null));
+                                                               new 
PriorityQueueStreamRecordFactory<IN>())));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
index 2076c08..2c385c1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
@@ -268,7 +268,7 @@ public class QueryableStateClientTest {
                                servers[i] = new 
KvStateServer(InetAddress.getLocalHost(), 0, 1, 1, registries[i], 
serverStats[i]);
                                servers[i].start();
                                ValueStateDescriptor<Integer> descriptor =
-                                               new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null);
+                                               new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
 
                                RegisteredBackendStateMetaInfo<VoidNamespace, 
Integer> registeredBackendStateMetaInfo = new RegisteredBackendStateMetaInfo<>(
                                                descriptor.getType(),

http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
index 0db8b31..86f8766 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
@@ -562,7 +562,7 @@ public class KvStateClientTest {
                        clientTaskExecutor = 
Executors.newFixedThreadPool(numClientsTasks);
 
                        // Create state
-                       ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null);
+                       ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
                        desc.setQueryable("any");
 
                        // Create servers

http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
index 348d4d9..e8caf57 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
@@ -88,7 +88,7 @@ public class KvStateServerHandlerTest extends TestLogger {
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
                // Register state
-               ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null);
+               ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
                desc.setQueryable("vanilla");
 
                int numKeyGroups =1;
@@ -227,7 +227,7 @@ public class KvStateServerHandlerTest extends TestLogger {
                registry.registerListener(registryListener);
 
                // Register state
-               ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null);
+               ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
                desc.setQueryable("vanilla");
 
                backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, desc);
@@ -372,7 +372,7 @@ public class KvStateServerHandlerTest extends TestLogger {
                registry.registerListener(registryListener);
 
                // Register state
-               ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null);
+               ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
                desc.setQueryable("vanilla");
 
                backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, desc);
@@ -511,7 +511,7 @@ public class KvStateServerHandlerTest extends TestLogger {
                registry.registerListener(registryListener);
 
                // Register state
-               ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null);
+               ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
                desc.setQueryable("vanilla");
 
                ValueState<Integer> state = backend.getPartitionedState(
@@ -607,7 +607,7 @@ public class KvStateServerHandlerTest extends TestLogger {
                registry.registerListener(registryListener);
 
                // Register state
-               ValueStateDescriptor<byte[]> desc = new 
ValueStateDescriptor<>("any", BytePrimitiveArraySerializer.INSTANCE, null);
+               ValueStateDescriptor<byte[]> desc = new 
ValueStateDescriptor<>("any", BytePrimitiveArraySerializer.INSTANCE);
                desc.setQueryable("vanilla");
 
                ValueState<byte[]> state = backend.getPartitionedState(

http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
index b1c4a9f..249b225 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
@@ -105,7 +105,7 @@ public class KvStateServerTest {
 
                        registry.registerListener(registryListener);
 
-                       ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null);
+                       ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
                        desc.setQueryable("vanilla");
 
                        ValueState<Integer> state = backend.getPartitionedState(

http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 9bc4c53..641e14b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -155,7 +155,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                CheckpointStreamFactory streamFactory = createStreamFactory();
                AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
-               ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id", String.class, null);
+               ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id", String.class);
                kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
                TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
@@ -253,8 +253,8 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                new KeyGroupRange(0, 0),
                                new DummyEnvironment("test_op", 1, 0));
 
-               ValueStateDescriptor<String> desc1 = new 
ValueStateDescriptor<>("a-string", StringSerializer.INSTANCE, null);
-               ValueStateDescriptor<Integer> desc2 = new 
ValueStateDescriptor<>("an-integer", IntSerializer.INSTANCE, null);
+               ValueStateDescriptor<String> desc1 = new 
ValueStateDescriptor<>("a-string", StringSerializer.INSTANCE);
+               ValueStateDescriptor<Integer> desc2 = new 
ValueStateDescriptor<>("an-integer", IntSerializer.INSTANCE);
 
                desc1.initializeSerializerUnlessSet(new ExecutionConfig());
                desc2.initializeSerializerUnlessSet(new ExecutionConfig());
@@ -822,7 +822,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                new KeyGroupRange(0, MAX_PARALLELISM - 1),
                                new DummyEnvironment("test", 1, 0));
 
-               ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id", String.class, null);
+               ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id", String.class);
                kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
                ValueState<String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
@@ -904,7 +904,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        CheckpointStreamFactory streamFactory = 
createStreamFactory();
                        AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
-                       ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id", String.class, null);
+                       ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id", String.class);
                        kvId.initializeSerializerUnlessSet(new 
ExecutionConfig());
 
                        ValueState<String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
@@ -927,7 +927,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                (TypeSerializer<String>) (TypeSerializer<?>) 
FloatSerializer.INSTANCE;
 
                        try {
-                               kvId = new ValueStateDescriptor<>("id", 
fakeStringSerializer, null);
+                               kvId = new ValueStateDescriptor<>("id", 
fakeStringSerializer);
 
                                state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -1259,8 +1259,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>(
                                "test",
-                               IntSerializer.INSTANCE,
-                               null);
+                               IntSerializer.INSTANCE);
                desc.setQueryable("banana");
 
                backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, desc);

http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 560ecab..73d8926 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -652,8 +652,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
        public QueryableStateStream<KEY, T> asQueryableState(String 
queryableStateName) {
                ValueStateDescriptor<T> valueStateDescriptor = new 
ValueStateDescriptor<T>(
                                UUID.randomUUID().toString(),
-                               getType(),
-                               null);
+                               getType());
 
                return asQueryableState(queryableStateName, 
valueStateDescriptor);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index 86fd8e4..76a18d8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -71,7 +71,7 @@ public class StreamGroupedFold<IN, OUT, KEY>
                        initialValue = outTypeSerializer.deserialize(in);
                }
                
-               ValueStateDescriptor<OUT> stateId = new 
ValueStateDescriptor<>(STATE_NAME, outTypeSerializer, null);
+               ValueStateDescriptor<OUT> stateId = new 
ValueStateDescriptor<>(STATE_NAME, outTypeSerializer);
                values = getPartitionedState(stateId);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index 48b9c2d..156f336 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -45,7 +45,7 @@ public class StreamGroupedReduce<IN> extends 
AbstractUdfStreamOperator<IN, Reduc
        @Override
        public void open() throws Exception {
                super.open();
-               ValueStateDescriptor<IN> stateId = new 
ValueStateDescriptor<>(STATE_NAME, serializer, null);
+               ValueStateDescriptor<IN> stateId = new 
ValueStateDescriptor<>(STATE_NAME, serializer);
                values = getPartitionedState(stateId);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
index 4a7262a..89cca22 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
@@ -46,7 +46,7 @@ public class DeltaTrigger<T, W extends Window> extends 
Trigger<T, W> {
        private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, 
TypeSerializer<T> stateSerializer) {
                this.deltaFunction = deltaFunction;
                this.threshold = threshold;
-               stateDesc = new ValueStateDescriptor<>("last-element", 
stateSerializer, null);
+               stateDesc = new ValueStateDescriptor<>("last-element", 
stateSerializer);
 
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 2844fbb..f4051c9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -491,7 +491,7 @@ public class AbstractStreamOperatorTest {
                private transient InternalTimerService<VoidNamespace> 
timerService;
 
                private final ValueStateDescriptor<String> stateDescriptor =
-                               new ValueStateDescriptor<>("state", 
StringSerializer.INSTANCE, null);
+                               new ValueStateDescriptor<>("state", 
StringSerializer.INSTANCE);
 
                @Override
                public void open() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
index 74fd044..89d9899 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
@@ -349,7 +349,7 @@ public class ProcessOperatorTest extends TestLogger {
                private static final long serialVersionUID = 1L;
 
                private final ValueStateDescriptor<Integer> state =
-                               new ValueStateDescriptor<>("seen-element", 
IntSerializer.INSTANCE,  null);
+                               new ValueStateDescriptor<>("seen-element", 
IntSerializer.INSTANCE);
 
                private final TimeDomain timeDomain;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index 155a16f..87c57f8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -75,7 +75,7 @@ public class StreamingRuntimeContextTest {
                                createMockEnvironment(),
                                Collections.<String, Accumulator<?, 
?>>emptyMap());
 
-               ValueStateDescriptor<TaskInfo> descr = new 
ValueStateDescriptor<>("name", TaskInfo.class, null);
+               ValueStateDescriptor<TaskInfo> descr = new 
ValueStateDescriptor<>("name", TaskInfo.class);
                context.getState(descr);
                
                StateDescriptor<?, ?> descrIntercepted = (StateDescriptor<?, 
?>) descriptorCapture.get();

http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
index a449359..eea428f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
@@ -398,7 +398,7 @@ public class CoProcessOperatorTest extends TestLogger {
                private static final long serialVersionUID = 1L;
 
                private final ValueStateDescriptor<String> state =
-                               new ValueStateDescriptor<>("seen-element", 
StringSerializer.INSTANCE, null);
+                               new ValueStateDescriptor<>("seen-element", 
StringSerializer.INSTANCE);
 
                @Override
                public void processElement1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
@@ -479,7 +479,7 @@ public class CoProcessOperatorTest extends TestLogger {
                private static final long serialVersionUID = 1L;
 
                private final ValueStateDescriptor<String> state =
-                               new ValueStateDescriptor<>("seen-element", 
StringSerializer.INSTANCE, null);
+                               new ValueStateDescriptor<>("seen-element", 
StringSerializer.INSTANCE);
 
                @Override
                public void processElement1(Integer value, Context ctx, 
Collector<String> out) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index f2999b3..b251ca6 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -474,8 +474,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) 
extends DataStream[T]
   def asQueryableState(queryableStateName: String) : QueryableStateStream[K, 
T] = {
     val stateDescriptor = new ValueStateDescriptor(
       queryableStateName,
-      dataType.createSerializer(executionConfig),
-      null.asInstanceOf[T])
+      dataType.createSerializer(executionConfig))
 
     asQueryableState(queryableStateName, stateDescriptor)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
index 2cb2761..52dc1a6 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
@@ -47,7 +47,7 @@ trait StatefulFunction[I, O, S] extends RichFunction {
   }
 
   override def open(c: Configuration) = {
-    val info = new ValueStateDescriptor[S]("state", stateSerializer, 
null.asInstanceOf[S])
+    val info = new ValueStateDescriptor[S]("state", stateSerializer)
     state = getRuntimeContext().getState(info)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
index cc21683..10a8998 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
@@ -358,7 +358,7 @@ public class StatefulUDFSavepointMigrationITCase extends 
SavepointMigrationTestB
                                new Tuple2<>("hello", 42L);
 
                private final ValueStateDescriptor<Long> stateDescriptor =
-                               new ValueStateDescriptor<Long>("state-name", 
LongSerializer.INSTANCE, null);
+                               new ValueStateDescriptor<Long>("state-name", 
LongSerializer.INSTANCE);
 
                @Override
                public void flatMap(Tuple2<Long, Long> value, 
Collector<Tuple2<Long, Long>> out) throws Exception {
@@ -385,7 +385,7 @@ public class StatefulUDFSavepointMigrationITCase extends 
SavepointMigrationTestB
                private transient Tuple2<String, Long> restoredState;
 
                private final ValueStateDescriptor<Long> stateDescriptor =
-                               new ValueStateDescriptor<Long>("state-name", 
LongSerializer.INSTANCE, null);
+                               new ValueStateDescriptor<Long>("state-name", 
LongSerializer.INSTANCE);
 
                @Override
                public void open(Configuration parameters) throws Exception {
@@ -419,7 +419,7 @@ public class StatefulUDFSavepointMigrationITCase extends 
SavepointMigrationTestB
                private static final long serialVersionUID = 1L;
 
                private final ValueStateDescriptor<Long> stateDescriptor =
-                               new ValueStateDescriptor<Long>("state-name", 
LongSerializer.INSTANCE, null);
+                               new ValueStateDescriptor<Long>("state-name", 
LongSerializer.INSTANCE);
 
                @Override
                public void flatMap(Tuple2<Long, Long> value, 
Collector<Tuple2<Long, Long>> out) throws Exception {
@@ -434,7 +434,7 @@ public class StatefulUDFSavepointMigrationITCase extends 
SavepointMigrationTestB
                private static final long serialVersionUID = 1L;
 
                private final ValueStateDescriptor<Long> stateDescriptor =
-                               new ValueStateDescriptor<Long>("state-name", 
LongSerializer.INSTANCE, null);
+                               new ValueStateDescriptor<Long>("state-name", 
LongSerializer.INSTANCE);
 
                @Override
                public void open(Configuration parameters) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index 40daeaf..a5ed6ad 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -603,8 +603,7 @@ public class QueryableStateITCase extends TestLogger {
                        // Value state
                        ValueStateDescriptor<Tuple2<Integer, Long>> valueState 
= new ValueStateDescriptor<>(
                                        "any",
-                                       source.getType(),
-                                       null);
+                                       source.getType());
 
                        QueryableStateStream<Integer, Tuple2<Integer, Long>> 
queryableState =
                                        source.keyBy(new 
KeySelector<Tuple2<Integer, Long>, Integer>() {

Reply via email to