Repository: flink Updated Branches: refs/heads/release-1.2 af244aaed -> 8b7331409
[FLINK-5155] Deprecate ValueStateDescriptor constructors with default value Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b733140 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b733140 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b733140 Branch: refs/heads/release-1.2 Commit: 8b7331409cd1b903d2ca46b81f3265bc2973e659 Parents: b4c60a9 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Wed Jan 11 12:14:13 2017 +0100 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Fri Jan 13 11:38:44 2017 +0100 ---------------------------------------------------------------------- .../state/RocksDBAsyncSnapshotTest.java | 6 +-- .../flink/api/common/state/ValueState.java | 10 ++-- .../api/common/state/ValueStateDescriptor.java | 50 +++++++++++++++++++- .../AbstractKeyedCEPPatternOperator.java | 6 +-- .../runtime/query/QueryableStateClientTest.java | 2 +- .../runtime/query/netty/KvStateClientTest.java | 2 +- .../query/netty/KvStateServerHandlerTest.java | 10 ++-- .../runtime/query/netty/KvStateServerTest.java | 2 +- .../runtime/state/StateBackendTestBase.java | 15 +++--- .../streaming/api/datastream/KeyedStream.java | 3 +- .../api/operators/StreamGroupedFold.java | 2 +- .../api/operators/StreamGroupedReduce.java | 2 +- .../api/windowing/triggers/DeltaTrigger.java | 2 +- .../operators/AbstractStreamOperatorTest.java | 2 +- .../api/operators/ProcessOperatorTest.java | 2 +- .../operators/StreamingRuntimeContextTest.java | 2 +- .../api/operators/co/CoProcessOperatorTest.java | 4 +- .../flink/streaming/api/scala/KeyedStream.scala | 3 +- .../api/scala/function/StatefulFunction.scala | 2 +- .../StatefulUDFSavepointMigrationITCase.java | 8 ++-- .../flink/test/query/QueryableStateITCase.java | 3 +- 21 files changed, 90 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index 002b16d..70f74b0 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -371,8 +371,7 @@ public class RocksDBAsyncSnapshotTest { ValueState<String> state = getPartitionedState( VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, - new ValueStateDescriptor<>("count", - StringSerializer.INSTANCE, "hello")); + new ValueStateDescriptor<>("count", StringSerializer.INSTANCE)); } @@ -383,8 +382,7 @@ public class RocksDBAsyncSnapshotTest { ValueState<String> state = getPartitionedState( VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, - new ValueStateDescriptor<>("count", - StringSerializer.INSTANCE, "hello")); + new ValueStateDescriptor<>("count", StringSerializer.INSTANCE)); state.update(element.getValue()); } http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java index de3250a..7e42daa 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java +++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java @@ -45,8 +45,11 @@ public interface ValueState<T> extends State { * operator instance. If state partitioning is applied, the value returned * depends on the current operator input, as the operator maintains an * independent state for each partition. - * - * @return The operator state value corresponding to the current input. + * + * <p>If you didn't specify a default value when creating the {@link ValueStateDescriptor} + * this will return {@code null} when no value was previously set using {@link #update(Object)}. + * + * @return The state value corresponding to the current input. * * @throws IOException Thrown if the system cannot access the state. */ @@ -59,8 +62,7 @@ public interface ValueState<T> extends State { * partitioned state is updated with null, the state for the current key * will be removed and the default value is returned on the next access. * - * @param value - * The new value for the state. + * @param value The new value for the state. * * @throws IOException Thrown if the system cannot access the state. */ http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java index 7db9116..b3006c4 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java @@ -27,6 +27,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; * value state using * {@link org.apache.flink.api.common.functions.RuntimeContext#getState(ValueStateDescriptor)}. 
* + * <p>If you don't use one of the constructors that set a default value the value that you + * get when reading a {@link ValueState} using {@link ValueState#value()} will be {@code null}. + * * @param <T> The type of the values that the value state can hold. */ @PublicEvolving @@ -38,12 +41,16 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> { * * <p>If this constructor fails (because it is not possible to describe the type via a class), * consider using the {@link #ValueStateDescriptor(String, TypeInformation, Object)} constructor. - * + * + * @deprecated Use {@link #ValueStateDescriptor(String, Class)} instead and manually manage + * the default value by checking whether the contents of the state is {@code null}. + * * @param name The (unique) name for the state. * @param typeClass The type of the values in the state. * @param defaultValue The default value that will be set when requesting state without setting * a value before. */ + @Deprecated public ValueStateDescriptor(String name, Class<T> typeClass, T defaultValue) { super(name, typeClass, defaultValue); } @@ -51,11 +58,15 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> { /** * Creates a new {@code ValueStateDescriptor} with the given name and default value. * + * @deprecated Use {@link #ValueStateDescriptor(String, TypeInformation)} instead and manually + * manage the default value by checking whether the contents of the state is {@code null}. + * * @param name The (unique) name for the state. * @param typeInfo The type of the values in the state. * @param defaultValue The default value that will be set when requesting state without setting * a value before. 
*/ + @Deprecated public ValueStateDescriptor(String name, TypeInformation<T> typeInfo, T defaultValue) { super(name, typeInfo, defaultValue); } @@ -64,15 +75,52 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> { * Creates a new {@code ValueStateDescriptor} with the given name, default value, and the specific * serializer. * + * @deprecated Use {@link #ValueStateDescriptor(String, TypeSerializer)} instead and manually + * manage the default value by checking whether the contents of the state is {@code null}. + * * @param name The (unique) name for the state. * @param typeSerializer The type serializer of the values in the state. * @param defaultValue The default value that will be set when requesting state without setting * a value before. */ + @Deprecated public ValueStateDescriptor(String name, TypeSerializer<T> typeSerializer, T defaultValue) { super(name, typeSerializer, defaultValue); } + /** + * Creates a new {@code ValueStateDescriptor} with the given name and type. + * + * <p>If this constructor fails (because it is not possible to describe the type via a class), + * consider using the {@link #ValueStateDescriptor(String, TypeInformation)} constructor. + * + * @param name The (unique) name for the state. + * @param typeClass The type of the values in the state. + */ + public ValueStateDescriptor(String name, Class<T> typeClass) { + super(name, typeClass, null); + } + + /** + * Creates a new {@code ValueStateDescriptor} with the given name and type. + * + * @param name The (unique) name for the state. + * @param typeInfo The type of the values in the state. + */ + public ValueStateDescriptor(String name, TypeInformation<T> typeInfo) { + super(name, typeInfo, null); + } + + /** + * Creates a new {@code ValueStateDescriptor} with the given name and the specific serializer. + * + * @param name The (unique) name for the state. + * @param typeSerializer The type serializer of the values in the state. 
+ */ + public ValueStateDescriptor(String name, TypeSerializer<T> typeSerializer) { + super(name, typeSerializer, null); + } + // ------------------------------------------------------------------------ @Override http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index b5601ef..832a0ba 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -102,8 +102,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst nfaOperatorState = getPartitionedState( new ValueStateDescriptor<NFA<IN>>( NFA_OPERATOR_STATE_NAME, - new NFA.Serializer<IN>(), - null)); + new NFA.Serializer<IN>())); } @SuppressWarnings("unchecked,rawtypes") @@ -116,8 +115,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst PRIORIRY_QUEUE_STATE_NAME, new PriorityQueueSerializer<>( streamRecordSerializer, - new PriorityQueueStreamRecordFactory<IN>()), - null)); + new PriorityQueueStreamRecordFactory<IN>()))); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java index 2076c08..2c385c1 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java @@ -268,7 +268,7 @@ public class QueryableStateClientTest { servers[i] = new KvStateServer(InetAddress.getLocalHost(), 0, 1, 1, registries[i], serverStats[i]); servers[i].start(); ValueStateDescriptor<Integer> descriptor = - new ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null); + new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); RegisteredBackendStateMetaInfo<VoidNamespace, Integer> registeredBackendStateMetaInfo = new RegisteredBackendStateMetaInfo<>( descriptor.getType(), http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java index 0db8b31..86f8766 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java @@ -562,7 +562,7 @@ public class KvStateClientTest { clientTaskExecutor = Executors.newFixedThreadPool(numClientsTasks); // Create state - ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null); + ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); desc.setQueryable("any"); // Create servers http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java index 348d4d9..e8caf57 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java @@ -88,7 +88,7 @@ public class KvStateServerHandlerTest extends TestLogger { EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); // Register state - ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null); + ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); desc.setQueryable("vanilla"); int numKeyGroups =1; @@ -227,7 +227,7 @@ public class KvStateServerHandlerTest extends TestLogger { registry.registerListener(registryListener); // Register state - ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null); + ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); desc.setQueryable("vanilla"); backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc); @@ -372,7 +372,7 @@ public class KvStateServerHandlerTest extends TestLogger { registry.registerListener(registryListener); // Register state - ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null); + ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); desc.setQueryable("vanilla"); backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc); @@ -511,7 +511,7 @@ public class KvStateServerHandlerTest extends TestLogger { registry.registerListener(registryListener); // Register state - ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", 
IntSerializer.INSTANCE, null); + ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); desc.setQueryable("vanilla"); ValueState<Integer> state = backend.getPartitionedState( @@ -607,7 +607,7 @@ public class KvStateServerHandlerTest extends TestLogger { registry.registerListener(registryListener); // Register state - ValueStateDescriptor<byte[]> desc = new ValueStateDescriptor<>("any", BytePrimitiveArraySerializer.INSTANCE, null); + ValueStateDescriptor<byte[]> desc = new ValueStateDescriptor<>("any", BytePrimitiveArraySerializer.INSTANCE); desc.setQueryable("vanilla"); ValueState<byte[]> state = backend.getPartitionedState( http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java index b1c4a9f..249b225 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java @@ -105,7 +105,7 @@ public class KvStateServerTest { registry.registerListener(registryListener); - ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null); + ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); desc.setQueryable("vanilla"); ValueState<Integer> state = backend.getPartitionedState( http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 9bc4c53..641e14b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -155,7 +155,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten CheckpointStreamFactory streamFactory = createStreamFactory(); AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); - ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null); + ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class); kvId.initializeSerializerUnlessSet(new ExecutionConfig()); TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE; @@ -253,8 +253,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten new KeyGroupRange(0, 0), new DummyEnvironment("test_op", 1, 0)); - ValueStateDescriptor<String> desc1 = new ValueStateDescriptor<>("a-string", StringSerializer.INSTANCE, null); - ValueStateDescriptor<Integer> desc2 = new ValueStateDescriptor<>("an-integer", IntSerializer.INSTANCE, null); + ValueStateDescriptor<String> desc1 = new ValueStateDescriptor<>("a-string", StringSerializer.INSTANCE); + ValueStateDescriptor<Integer> desc2 = new ValueStateDescriptor<>("an-integer", IntSerializer.INSTANCE); desc1.initializeSerializerUnlessSet(new ExecutionConfig()); desc2.initializeSerializerUnlessSet(new ExecutionConfig()); @@ -822,7 +822,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten new KeyGroupRange(0, MAX_PARALLELISM - 1), new DummyEnvironment("test", 1, 0)); - ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null); + ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id", String.class); kvId.initializeSerializerUnlessSet(new ExecutionConfig()); ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @@ -904,7 +904,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten CheckpointStreamFactory streamFactory = createStreamFactory(); AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); - ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null); + ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class); kvId.initializeSerializerUnlessSet(new ExecutionConfig()); ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @@ -927,7 +927,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten (TypeSerializer<String>) (TypeSerializer<?>) FloatSerializer.INSTANCE; try { - kvId = new ValueStateDescriptor<>("id", fakeStringSerializer, null); + kvId = new ValueStateDescriptor<>("id", fakeStringSerializer); state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @@ -1259,8 +1259,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>( "test", - IntSerializer.INSTANCE, - null); + IntSerializer.INSTANCE); desc.setQueryable("banana"); backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc); http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index 560ecab..73d8926 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -652,8 +652,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> { public QueryableStateStream<KEY, T> asQueryableState(String queryableStateName) { ValueStateDescriptor<T> valueStateDescriptor = new ValueStateDescriptor<T>( UUID.randomUUID().toString(), - getType(), - null); + getType()); return asQueryableState(queryableStateName, valueStateDescriptor); } http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java index 86fd8e4..76a18d8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java @@ -71,7 +71,7 @@ public class StreamGroupedFold<IN, OUT, KEY> initialValue = outTypeSerializer.deserialize(in); } - ValueStateDescriptor<OUT> stateId = new ValueStateDescriptor<>(STATE_NAME, outTypeSerializer, null); + ValueStateDescriptor<OUT> stateId = new ValueStateDescriptor<>(STATE_NAME, outTypeSerializer); values = getPartitionedState(stateId); } http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java index 48b9c2d..156f336 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java @@ -45,7 +45,7 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, Reduc @Override public void open() throws Exception { super.open(); - ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer, null); + ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer); values = getPartitionedState(stateId); } http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java index 4a7262a..89cca22 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java @@ -46,7 +46,7 @@ public class DeltaTrigger<T, W extends Window> extends Trigger<T, W> { private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) { this.deltaFunction = deltaFunction; this.threshold = threshold; - stateDesc = new ValueStateDescriptor<>("last-element", stateSerializer, null); + stateDesc = new ValueStateDescriptor<>("last-element", stateSerializer); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java index 2844fbb..f4051c9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java @@ -491,7 +491,7 @@ public class AbstractStreamOperatorTest { private transient InternalTimerService<VoidNamespace> timerService; private final ValueStateDescriptor<String> stateDescriptor = - new ValueStateDescriptor<>("state", StringSerializer.INSTANCE, null); + new ValueStateDescriptor<>("state", StringSerializer.INSTANCE); @Override public void open() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java index 74fd044..89d9899 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java @@ -349,7 +349,7 @@ public class ProcessOperatorTest extends TestLogger { private static final long serialVersionUID = 1L; private final ValueStateDescriptor<Integer> state = - new 
ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE, null); + new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE); private final TimeDomain timeDomain; http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java index 155a16f..87c57f8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java @@ -75,7 +75,7 @@ public class StreamingRuntimeContextTest { createMockEnvironment(), Collections.<String, Accumulator<?, ?>>emptyMap()); - ValueStateDescriptor<TaskInfo> descr = new ValueStateDescriptor<>("name", TaskInfo.class, null); + ValueStateDescriptor<TaskInfo> descr = new ValueStateDescriptor<>("name", TaskInfo.class); context.getState(descr); StateDescriptor<?, ?> descrIntercepted = (StateDescriptor<?, ?>) descriptorCapture.get(); http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java index a449359..eea428f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java @@ -398,7 +398,7 @@ public class CoProcessOperatorTest extends TestLogger { private static final long serialVersionUID = 1L; private final ValueStateDescriptor<String> state = - new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null); + new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE); @Override public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception { @@ -479,7 +479,7 @@ public class CoProcessOperatorTest extends TestLogger { private static final long serialVersionUID = 1L; private final ValueStateDescriptor<String> state = - new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null); + new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE); @Override public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala index f2999b3..b251ca6 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala @@ -474,8 +474,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] def asQueryableState(queryableStateName: String) : QueryableStateStream[K, T] = { val stateDescriptor = new ValueStateDescriptor( queryableStateName, - dataType.createSerializer(executionConfig), - null.asInstanceOf[T]) + dataType.createSerializer(executionConfig)) 
asQueryableState(queryableStateName, stateDescriptor) } http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala index 2cb2761..52dc1a6 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala @@ -47,7 +47,7 @@ trait StatefulFunction[I, O, S] extends RichFunction { } override def open(c: Configuration) = { - val info = new ValueStateDescriptor[S]("state", stateSerializer, null.asInstanceOf[S]) + val info = new ValueStateDescriptor[S]("state", stateSerializer) state = getRuntimeContext().getState(info) } } http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java index cc21683..10a8998 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java @@ -358,7 +358,7 @@ public class StatefulUDFSavepointMigrationITCase extends SavepointMigrationTestB new Tuple2<>("hello", 42L); private final ValueStateDescriptor<Long> 
stateDescriptor = - new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE, null); + new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); @Override public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { @@ -385,7 +385,7 @@ public class StatefulUDFSavepointMigrationITCase extends SavepointMigrationTestB private transient Tuple2<String, Long> restoredState; private final ValueStateDescriptor<Long> stateDescriptor = - new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE, null); + new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); @Override public void open(Configuration parameters) throws Exception { @@ -419,7 +419,7 @@ public class StatefulUDFSavepointMigrationITCase extends SavepointMigrationTestB private static final long serialVersionUID = 1L; private final ValueStateDescriptor<Long> stateDescriptor = - new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE, null); + new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); @Override public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { @@ -434,7 +434,7 @@ public class StatefulUDFSavepointMigrationITCase extends SavepointMigrationTestB private static final long serialVersionUID = 1L; private final ValueStateDescriptor<Long> stateDescriptor = - new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE, null); + new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE); @Override public void open(Configuration parameters) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/8b733140/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java index 40daeaf..a5ed6ad 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java @@ -603,8 +603,7 @@ public class QueryableStateITCase extends TestLogger { // Value state ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>( "any", - source.getType(), - null); + source.getType()); QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {