[FLINK-3312] Add simple constructors for State Descriptors
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/180cd3f6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/180cd3f6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/180cd3f6 Branch: refs/heads/master Commit: 180cd3f608731208f5b5ed71e3eb80ae5ccdf5fc Parents: 456d0ab Author: Stephan Ewen <[email protected]> Authored: Wed Feb 3 17:44:07 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Wed Feb 3 20:28:41 2016 +0100 ---------------------------------------------------------------------- .../streaming/state/AbstractRocksDBState.java | 4 +- .../contrib/streaming/state/DbStateBackend.java | 22 +- .../state/DBStateCheckpointingTest.java | 13 +- .../streaming/state/DbStateBackendTest.java | 16 +- .../api/common/state/ListStateDescriptor.java | 82 +++--- .../common/state/ReducingStateDescriptor.java | 97 ++++--- .../apache/flink/api/common/state/State.java | 8 +- .../flink/api/common/state/StateBackend.java | 9 +- .../flink/api/common/state/StateDescriptor.java | 264 ++++++++++++++++++- .../api/common/state/ValueStateDescriptor.java | 162 +++--------- .../common/state/ListStateDescriptorTest.java | 91 +++++++ .../state/ReducingStateDescriptorTest.java | 105 ++++++++ .../common/state/ValueStateDescriptorTest.java | 104 ++++++++ .../examples/windowing/SessionWindowing.java | 6 +- .../flink/runtime/state/AbstractHeapState.java | 2 +- .../runtime/state/AbstractStateBackend.java | 7 +- .../org/apache/flink/runtime/state/KvState.java | 2 +- .../flink/runtime/state/KvStateSnapshot.java | 2 +- .../state/filesystem/AbstractFsState.java | 2 +- .../filesystem/AbstractFsStateSnapshot.java | 2 +- .../runtime/state/memory/AbstractMemState.java | 2 +- .../state/memory/AbstractMemStateSnapshot.java | 2 +- .../runtime/state/StateBackendTestBase.java | 31 ++- .../api/operators/AbstractStreamOperator.java | 4 +- .../api/operators/StreamGroupedFold.java | 11 +- 
.../api/operators/StreamGroupedReduce.java | 2 +- .../api/operators/StreamingRuntimeContext.java | 5 +- .../triggers/ContinuousEventTimeTrigger.java | 8 +- .../ContinuousProcessingTimeTrigger.java | 11 +- .../api/windowing/triggers/CountTrigger.java | 10 +- .../api/windowing/triggers/DeltaTrigger.java | 2 +- .../api/windowing/triggers/Trigger.java | 2 +- .../windowing/EvictingWindowOperator.java | 8 +- .../windowing/NonKeyedWindowOperator.java | 5 +- .../operators/windowing/WindowOperator.java | 13 +- .../operators/StreamingRuntimeContextTest.java | 156 +++++++++++ ...AlignedProcessingTimeWindowOperatorTest.java | 2 +- ...AlignedProcessingTimeWindowOperatorTest.java | 3 +- .../api/scala/function/StatefulFunction.scala | 2 +- .../EventTimeWindowCheckpointingITCase.java | 4 +- .../PartitionedStateCheckpointingITCase.java | 10 +- .../StreamCheckpointingITCase.java | 4 +- 42 files changed, 953 insertions(+), 344 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index 6dbe16c..3ecec5d 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -62,7 +62,7 @@ import static java.util.Objects.requireNonNull; * @param <SD> The type of {@link StateDescriptor}. * @param <Backend> The type of the backend that snapshots this key/value state. 
*/ -public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend> +public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> implements KvState<K, N, S, SD, Backend>, State { private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBState.class); @@ -258,7 +258,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta } } - public static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend> implements KvStateSnapshot<K, N, S, SD, Backend> { + public static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> implements KvStateSnapshot<K, N, S, SD, Backend> { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSnapshot.class); http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java index c55b3c0..1d1ccd7 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java @@ -195,6 +195,11 @@ public class DbStateBackend extends AbstractStateBackend { @Override protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, 
ValueStateDescriptor<T> stateDesc) throws Exception { + + if (!stateDesc.isSerializerInitialized()) { + throw new IllegalArgumentException("state descriptor serializer not initialized"); + } + String stateName = operatorIdentifier + "_"+ stateDesc.getName(); return new LazyDbValueState<>( @@ -210,7 +215,14 @@ public class DbStateBackend extends AbstractStateBackend { @Override protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception { - ValueStateDescriptor<ArrayList<T>> valueStateDescriptor = new ValueStateDescriptor<>(stateDesc.getName(), null, new ArrayListSerializer<>(stateDesc.getSerializer())); + + if (!stateDesc.isSerializerInitialized()) { + throw new IllegalArgumentException("state descriptor serializer not initialized"); + } + + ValueStateDescriptor<ArrayList<T>> valueStateDescriptor = new ValueStateDescriptor<>(stateDesc.getName(), + new ArrayListSerializer<>(stateDesc.getSerializer()), null); + ValueState<ArrayList<T>> valueState = createValueState(namespaceSerializer, valueStateDescriptor); return new GenericListState<>(valueState); } @@ -220,7 +232,13 @@ public class DbStateBackend extends AbstractStateBackend { protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception { - ValueStateDescriptor<T> valueStateDescriptor = new ValueStateDescriptor<>(stateDesc.getName(), null, stateDesc.getSerializer()); + if (!stateDesc.isSerializerInitialized()) { + throw new IllegalArgumentException("state descriptor serializer not initialized"); + } + + ValueStateDescriptor<T> valueStateDescriptor = new ValueStateDescriptor<>( + stateDesc.getName(), stateDesc.getSerializer(), null); + ValueState<T> valueState = createValueState(namespaceSerializer, valueStateDescriptor); return new GenericReducingState<>(valueState, stateDesc.getReduceFunction()); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java index 0afdada..cc2147f 100644 --- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java +++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java @@ -31,14 +31,13 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.io.FileUtils; + import org.apache.derby.drda.NetworkServerControl; -import org.apache.flink.api.common.ExecutionConfig; + import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.memory.MemoryStateBackend; @@ -210,7 +209,7 @@ public class DBStateCheckpointingTest extends StreamFaultToleranceTestBase { failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; count = 0; sum = getRuntimeContext().getState( - new ValueStateDescriptor<>("my_state", 0L, LongSerializer.INSTANCE)); + new ValueStateDescriptor<>("my_state", Long.class, 0L)); } @Override @@ -238,11 +237,9 @@ public class 
DBStateCheckpointingTest extends StreamFaultToleranceTestBase { @Override public void open(Configuration parameters) throws IOException { aCounts = getRuntimeContext().getState( - new ValueStateDescriptor<>("a", NonSerializableLong.of(0L), - new KryoSerializer<>(NonSerializableLong.class, new ExecutionConfig()))); + new ValueStateDescriptor<>("a", NonSerializableLong.class, NonSerializableLong.of(0L))); - bCounts = getRuntimeContext().getState( - new ValueStateDescriptor<>("b", 0L, LongSerializer.INSTANCE)); + bCounts = getRuntimeContext().getState(new ValueStateDescriptor<>("b", Long.class, 0L)); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java index 34adf75..d4883dd 100644 --- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java +++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java @@ -63,7 +63,6 @@ import org.junit.Test; import com.google.common.base.Optional; import static org.junit.Assert.*; -import static org.junit.Assert.fail; public class DbStateBackendTest { @@ -201,7 +200,7 @@ public class DbStateBackendTest { backend.initializeForJob(env, "dummy_test_kv", IntSerializer.INSTANCE); ValueState<String> state = backend.createValueState(IntSerializer.INSTANCE, - new ValueStateDescriptor<>("state1", null, StringSerializer.INSTANCE)); + new ValueStateDescriptor<>("state1", StringSerializer.INSTANCE, null)); LazyDbValueState<Integer, Integer, String> kv = 
(LazyDbValueState<Integer, Integer, String>) state; @@ -455,9 +454,14 @@ public class DbStateBackendTest { backend2.initializeForJob(new DummyEnvironment("test", 3, 1), "dummy_2", StringSerializer.INSTANCE); backend3.initializeForJob(new DummyEnvironment("test", 3, 2), "dummy_3", StringSerializer.INSTANCE); - ValueState<String> s1State = backend1.createValueState(StringSerializer.INSTANCE, new ValueStateDescriptor<>("a1", null, StringSerializer.INSTANCE)); - ValueState<String> s2State = backend2.createValueState(StringSerializer.INSTANCE, new ValueStateDescriptor<>("a2", null, StringSerializer.INSTANCE)); - ValueState<String> s3State = backend3.createValueState(StringSerializer.INSTANCE, new ValueStateDescriptor<>("a3", null, StringSerializer.INSTANCE)); + ValueState<String> s1State = backend1.createValueState(StringSerializer.INSTANCE, + new ValueStateDescriptor<>("a1", StringSerializer.INSTANCE, null)); + + ValueState<String> s2State = backend2.createValueState(StringSerializer.INSTANCE, + new ValueStateDescriptor<>("a2", StringSerializer.INSTANCE, null)); + + ValueState<String> s3State = backend3.createValueState(StringSerializer.INSTANCE, + new ValueStateDescriptor<>("a3", StringSerializer.INSTANCE, null)); LazyDbValueState<?, ?, ?> s1 = (LazyDbValueState<?, ?, ?>) s1State; LazyDbValueState<?, ?, ?> s2 = (LazyDbValueState<?, ?, ?>) s2State; @@ -520,7 +524,7 @@ public class DbStateBackendTest { backend.initializeForJob(env, "dummy_test_caching", IntSerializer.INSTANCE); ValueState<String> state = backend.createValueState(IntSerializer.INSTANCE, - new ValueStateDescriptor<>("state1", "a", StringSerializer.INSTANCE)); + new ValueStateDescriptor<>("state1", StringSerializer.INSTANCE, "a")); LazyDbValueState<Integer, Integer, String> kv = (LazyDbValueState<Integer, Integer, String>) state; http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java 
---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java index e391126..9f266d3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,82 +6,68 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.common.state; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; -import static java.util.Objects.requireNonNull; - /** - * {@link StateDescriptor} for {@link ListState}. This can be used to create a partitioned + * A {@link StateDescriptor} for {@link ListState}. This can be used to create a partitioned * list state using - * {@link org.apache.flink.api.common.functions.RuntimeContext#getPartitionedState(StateDescriptor)}. + * {@link org.apache.flink.api.common.functions.RuntimeContext#getListState(ListStateDescriptor)}. 
* * @param <T> The type of the values that can be added to the list state. */ -public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>> { +public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> { private static final long serialVersionUID = 1L; - private final TypeSerializer<T> serializer; - /** - * Creates a new {@code ListStateDescriptor} with the given name. + * Creates a new {@code ListStateDescriptor} with the given name and list element type. + * + * <p>If this constructor fails (because it is not possible to describe the type via a class), + * consider using the {@link #ListStateDescriptor(String, TypeInformation)} constructor. * * @param name The (unique) name for the state. - * @param serializer {@link TypeSerializer} for the state values. + * @param typeClass The type of the values in the state. */ - public ListStateDescriptor(String name, TypeSerializer<T> serializer) { - super(requireNonNull(name)); - this.serializer = requireNonNull(serializer); - } - - @Override - public ListState<T> bind(StateBackend stateBackend) throws Exception { - return stateBackend.createListState(this); + public ListStateDescriptor(String name, Class<T> typeClass) { + super(name, typeClass, null); } /** - * Returns the {@link TypeSerializer} that can be used to serialize the value in the state. + * Creates a new {@code ListStateDescriptor} with the given name and list element type. + * + * @param name The (unique) name for the state. + * @param typeInfo The type of the values in the state. 
*/ - public TypeSerializer<T> getSerializer() { - return serializer; + public ListStateDescriptor(String name, TypeInformation<T> typeInfo) { + super(name, typeInfo, null); } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - ListStateDescriptor<?> that = (ListStateDescriptor<?>) o; - - return serializer.equals(that.serializer) && name.equals(that.name); - - } - - @Override - public int hashCode() { - int result = serializer.hashCode(); - result = 31 * result + name.hashCode(); - return result; + /** + * Creates a new {@code ListStateDescriptor} with the given name and list element type. + * + * @param name The (unique) name for the state. + * @param typeSerializer The type serializer for the list values. + */ + public ListStateDescriptor(String name, TypeSerializer<T> typeSerializer) { + super(name, typeSerializer, null); } + + // ------------------------------------------------------------------------ @Override - public String toString() { - return "ListStateDescriptor{" + - "serializer=" + serializer + - '}'; + public ListState<T> bind(StateBackend stateBackend) throws Exception { + return stateBackend.createListState(this); } } http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java index 7153a05..1ef65a3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor 
license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,6 +20,7 @@ package org.apache.flink.api.common.state; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import static java.util.Objects.requireNonNull; @@ -27,80 +28,70 @@ import static java.util.Objects.requireNonNull; /** * {@link StateDescriptor} for {@link ReducingState}. This can be used to create partitioned * reducing state using - * {@link org.apache.flink.api.common.functions.RuntimeContext#getPartitionedState(StateDescriptor)}. + * {@link org.apache.flink.api.common.functions.RuntimeContext#getReducingState(ReducingStateDescriptor)}. * * @param <T> The type of the values that can be added to the list state. */ -public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>> { +public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>, T> { private static final long serialVersionUID = 1L; - - private final TypeSerializer<T> serializer; - + + private final ReduceFunction<T> reduceFunction; /** - * Creates a new {@code ReducingStateDescriptor} with the given name and reduce function. + * Creates a new {@code ReducingStateDescriptor} with the given name, type, and default value. 
+ * + * <p>If this constructor fails (because it is not possible to describe the type via a class), + * consider using the {@link #ReducingStateDescriptor(String, ReduceFunction, TypeInformation)} constructor. * * @param name The (unique) name for the state. - * @param serializer {@link TypeSerializer} for the state values. + * @param reduceFunction The {@code ReduceFunction} used to aggregate the state. + * @param typeClass The type of the values in the state. */ - public ReducingStateDescriptor(String name, - ReduceFunction<T> reduceFunction, - TypeSerializer<T> serializer) { - super(requireNonNull(name)); + public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) { + super(name, typeClass, null); + this.reduceFunction = requireNonNull(reduceFunction); + if (reduceFunction instanceof RichFunction) { throw new UnsupportedOperationException("ReduceFunction of ReducingState can not be a RichFunction."); } - this.serializer = requireNonNull(serializer); - this.reduceFunction = reduceFunction; - } - - @Override - public ReducingState<T> bind(StateBackend stateBackend) throws Exception { - return stateBackend.createReducingState(this); } /** - * Returns the {@link TypeSerializer} that can be used to serialize the value in the state. + * Creates a new {@code ReducingStateDescriptor} with the given name, reduce function, and type information. + * + * @param name The (unique) name for the state. + * @param reduceFunction The {@code ReduceFunction} used to aggregate the state. + * @param typeInfo The type of the values in the state. */ - public TypeSerializer<T> getSerializer() { - return serializer; + public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, TypeInformation<T> typeInfo) { + super(name, typeInfo, null); + this.reduceFunction = requireNonNull(reduceFunction); } /** - * Returns the reduce function to be used for the reducing state. + * Creates a new {@code ReducingStateDescriptor} with the given name, reduce function, and type serializer. 
+ * + * @param name The (unique) name for the state. + * @param reduceFunction The {@code ReduceFunction} used to aggregate the state. + * @param typeSerializer The type serializer of the values in the state. */ - public ReduceFunction<T> getReduceFunction() { - return reduceFunction; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - ReducingStateDescriptor<?> that = (ReducingStateDescriptor<?>) o; - - return serializer.equals(that.serializer) && name.equals(that.name); - + public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, TypeSerializer<T> typeSerializer) { + super(name, typeSerializer, null); + this.reduceFunction = requireNonNull(reduceFunction); } + // ------------------------------------------------------------------------ + @Override - public int hashCode() { - int result = serializer.hashCode(); - result = 31 * result + name.hashCode(); - return result; + public ReducingState<T> bind(StateBackend stateBackend) throws Exception { + return stateBackend.createReducingState(this); } - @Override - public String toString() { - return "ReducingStateDescriptor{" + - "serializer=" + serializer + - ", reduceFunction=" + reduceFunction + - '}'; + /** + * Returns the reduce function to be used for the reducing state. 
+ */ + public ReduceFunction<T> getReduceFunction() { + return reduceFunction; } } http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/main/java/org/apache/flink/api/common/state/State.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java b/flink-core/src/main/java/org/apache/flink/api/common/state/State.java index 5a7650e..b97658b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/State.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java index d5adf9b..8c7c608 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,15 +6,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.api.common.state; /** http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java index 737133f..38087fc 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,17 +6,30 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.api.common.state; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.io.Serializable; import static java.util.Objects.requireNonNull; @@ -30,21 +43,80 @@ import static java.util.Objects.requireNonNull; * * @param <S> The type of the State objects created from this {@code StateDescriptor}. */ -public abstract class StateDescriptor<S extends State> implements Serializable { +public abstract class StateDescriptor<S extends State, T> implements Serializable { private static final long serialVersionUID = 1L; /** Name that uniquely identifies state created from this StateDescriptor. */ - protected final String name; + private final String name; + + /** The serializer for the type. May be eagerly initialized in the constructor, + * or lazily once the type is serialized or an ExecutionConfig is provided. */ + private TypeSerializer<T> serializer; + + /** The default value returned by the state when no other value is bound to a key */ + private transient T defaultValue; + /** The type information describing the value type. Only used to lazily create the serializer + * and dropped during serialization */ + private transient TypeInformation<T> typeInfo; + + // ------------------------------------------------------------------------ + /** - * Create a new {@code StateDescriptor} with the given name. + * Create a new {@code StateDescriptor} with the given name and the given type serializer. + * * @param name The name of the {@code StateDescriptor}. 
+ * @param serializer The type serializer for the values in the state. + * @param defaultValue The default value that will be set when requesting state without setting + * a value before. */ - public StateDescriptor(String name) { - this.name = requireNonNull(name); + protected StateDescriptor(String name, TypeSerializer<T> serializer, T defaultValue) { + this.name = requireNonNull(name, "name must not be null"); + this.serializer = requireNonNull(serializer, "serializer must not be null"); + this.defaultValue = defaultValue; } /** + * Create a new {@code StateDescriptor} with the given name and the given type information. + * + * @param name The name of the {@code StateDescriptor}. + * @param typeInfo The type information for the values in the state. + * @param defaultValue The default value that will be set when requesting state without setting + * a value before. + */ + protected StateDescriptor(String name, TypeInformation<T> typeInfo, T defaultValue) { + this.name = requireNonNull(name, "name must not be null"); + this.typeInfo = requireNonNull(typeInfo, "type information must not be null"); + this.defaultValue = defaultValue; + } + + /** + * Create a new {@code StateDescriptor} with the given name and the given type information. + * + * <p>If this constructor fails (because it is not possible to describe the type via a class), + * consider using the {@link #StateDescriptor(String, TypeInformation, Object)} constructor. + * + * @param name The name of the {@code StateDescriptor}. + * @param type The class of the type of values in the state. + * @param defaultValue The default value that will be set when requesting state without setting + * a value before. 
+ */ + protected StateDescriptor(String name, Class<T> type, T defaultValue) { + this.name = requireNonNull(name, "name must not be null"); + requireNonNull(type, "type class must not be null"); + + try { + this.typeInfo = TypeExtractor.createTypeInfo(type); + } catch (Exception e) { + throw new RuntimeException("Cannot create full type information based on the given class. If the type has generics, please", e); + } + + this.defaultValue = defaultValue; + } + + // ------------------------------------------------------------------------ + + /** * Returns the name of this {@code StateDescriptor}. */ public String getName() { @@ -52,15 +124,179 @@ public abstract class StateDescriptor<S extends State> implements Serializable { } /** + * Returns the default value. + */ + public T getDefaultValue() { + if (defaultValue != null) { + if (serializer != null) { + return serializer.copy(defaultValue); + } else { + throw new IllegalStateException("Serializer not yet initialized."); + } + } else { + return null; + } + } + + /** + * Returns the {@link TypeSerializer} that can be used to serialize the value in the state. + * Note that the serializer may initialized lazily and is only guaranteed to exist after + * calling {@link #initializeSerializerUnlessSet(ExecutionConfig)}. + */ + public TypeSerializer<T> getSerializer() { + if (serializer != null) { + return serializer; + } else { + throw new IllegalStateException("Serializer not yet initialized."); + } + } + + /** * Creates a new {@link State} on the given {@link StateBackend}. * * @param stateBackend The {@code StateBackend} on which to create the {@link State}. */ - public abstract S bind(StateBackend stateBackend) throws Exception ; + public abstract S bind(StateBackend stateBackend) throws Exception; + + // ------------------------------------------------------------------------ - // Force subclasses to implement - public abstract boolean equals(Object o); + /** + * Checks whether the serializer has been initialized. 
Serializer initialization is lazy, + * to allow parametrization of serializers with an {@link ExecutionConfig} via + * {@link #initializeSerializerUnlessSet(ExecutionConfig)}. + * + * @return True if the serializers have been initialized, false otherwise. + */ + public boolean isSerializerInitialized() { + return serializer != null; + } + + /** + * Initializes the serializer, unless it has been initialized before. + * + * @param executionConfig The execution config to use when creating the serializer. + */ + public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) { + if (serializer == null) { + if (typeInfo != null) { + serializer = typeInfo.createSerializer(executionConfig); + } else { + throw new IllegalStateException( + "Cannot initialize serializer after TypeInformation was dropped during serialization"); + } + } + } + + /** + * This method should be called by subclasses prior to serialization. Because the TypeInformation is + * not always serializable, it is 'transient' and dropped during serialization. Hence, the descriptor + * needs to make sure that the serializer is created before the TypeInformation is dropped. 
+ */ + private void ensureSerializerCreated() { + if (serializer == null) { + if (typeInfo != null) { + serializer = typeInfo.createSerializer(new ExecutionConfig()); + } else { + throw new IllegalStateException( + "Cannot initialize serializer after TypeInformation was dropped during serialization"); + } + } + } + + // ------------------------------------------------------------------------ + // Standard Utils + // ------------------------------------------------------------------------ + + @Override + public int hashCode() { + return name.hashCode() + 41; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + else if (o == null || getClass() != o.getClass()) { + return false; + } + else { + StateDescriptor<?, ?> that = (StateDescriptor<?, ?>) o; + return this.name.equals(that.name); + } + } - // Force subclasses to implement - public abstract int hashCode(); + @Override + public String toString() { + return getClass().getSimpleName() + + "{ name=" + name + + ", defaultValue=" + defaultValue + + ", serializer=" + serializer + + '}'; + } + + // ------------------------------------------------------------------------ + // Serialization + // ------------------------------------------------------------------------ + + private void writeObject(final ObjectOutputStream out) throws IOException { + // make sure we have a serializer before the type information gets lost + ensureSerializerCreated(); + + // write all the non-transient fields + out.defaultWriteObject(); + + // write the non-serializable default value field + if (defaultValue == null) { + // we don't have a default value + out.writeBoolean(false); + } else { + // we have a default value + out.writeBoolean(true); + + byte[] serializedDefaultValue; + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos)) + { + serializer.serialize(defaultValue, outView); + outView.flush(); + 
serializedDefaultValue = baos.toByteArray(); + } + catch (Exception e) { + throw new IOException("Unable to serialize default value of type " + + defaultValue.getClass().getSimpleName() + ".", e); + } + + out.writeInt(serializedDefaultValue.length); + out.write(serializedDefaultValue); + } + } + + private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { + // read the non-transient fields + in.defaultReadObject(); + + // read the default value field + boolean hasDefaultValue = in.readBoolean(); + if (hasDefaultValue) { + int size = in.readInt(); + byte[] buffer = new byte[size]; + int bytesRead = in.read(buffer); + + if (bytesRead != size) { + throw new RuntimeException("Read size does not match expected size."); + } + + try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer); + DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) + { + defaultValue = serializer.deserialize(inView); + } + catch (Exception e) { + throw new IOException("Unable to deserialize default value.", e); + } + } else { + defaultValue = null; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java index bcfa46f..f949c57 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -6,161 +6,75 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.common.state; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - -import static java.util.Objects.requireNonNull; /** * {@link StateDescriptor} for {@link ValueState}. This can be used to create partitioned * value state using - * {@link org.apache.flink.api.common.functions.RuntimeContext#getPartitionedState(StateDescriptor)}. + * {@link org.apache.flink.api.common.functions.RuntimeContext#getState(ValueStateDescriptor)}. * * @param <T> The type of the values that the value state can hold. 
*/ -public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>> { +public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> { private static final long serialVersionUID = 1L; - - private transient T defaultValue; - - private final TypeSerializer<T> serializer; - + /** - * Creates a new {@code ValueStateDescriptor} with the given name and default value. - * + * Creates a new {@code ValueStateDescriptor} with the given name, type, and default value. + * + * <p>If this constructor fails (because it is not possible to describe the type via a class), + * consider using the {@link #ValueStateDescriptor(String, TypeInformation, Object)} constructor. + * * @param name The (unique) name for the state. + * @param typeClass The type of the values in the state. * @param defaultValue The default value that will be set when requesting state without setting * a value before. - * @param serializer {@link TypeSerializer} for the state values. */ - public ValueStateDescriptor(String name, T defaultValue, TypeSerializer<T> serializer) { - super(requireNonNull(name)); - this.defaultValue = defaultValue; - this.serializer = requireNonNull(serializer); - } - - private void writeObject(final ObjectOutputStream out) throws IOException { - out.defaultWriteObject(); - - if (defaultValue == null) { - // we don't have a default value - out.writeBoolean(false); - } else { - out.writeBoolean(true); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputViewStreamWrapper outView = - new DataOutputViewStreamWrapper(new DataOutputStream(baos)); - - try { - serializer.serialize(defaultValue, outView); - } catch (IOException ioe) { - throw new RuntimeException("Unable to serialize default value of type " + - defaultValue.getClass().getSimpleName() + ".", ioe); - } - - outView.close(); - - out.writeInt(baos.size()); - out.write(baos.toByteArray()); - } - - } - - private void readObject(final ObjectInputStream in) throws IOException, 
ClassNotFoundException { - in.defaultReadObject(); - - boolean hasDefaultValue = in.readBoolean(); - - if (hasDefaultValue) { - int size = in.readInt(); - byte[] buffer = new byte[size]; - int bytesRead = in.read(buffer); - - if (bytesRead != size) { - throw new RuntimeException("Read size does not match expected size."); - } - - ByteArrayInputStream bais = new ByteArrayInputStream(buffer); - DataInputViewStreamWrapper inView = - new DataInputViewStreamWrapper(new DataInputStream(bais)); - defaultValue = serializer.deserialize(inView); - } else { - defaultValue = null; - } - } - - @Override - public ValueState<T> bind(StateBackend stateBackend) throws Exception { - return stateBackend.createValueState(this); + public ValueStateDescriptor(String name, Class<T> typeClass, T defaultValue) { + super(name, typeClass, defaultValue); } /** - * Returns the default value. + * Creates a new {@code ValueStateDescriptor} with the given name and default value. + * + * @param name The (unique) name for the state. + * @param typeInfo The type of the values in the state. + * @param defaultValue The default value that will be set when requesting state without setting + * a value before. */ - public T getDefaultValue() { - if (defaultValue != null) { - return serializer.copy(defaultValue); - } else { - return null; - } + public ValueStateDescriptor(String name, TypeInformation<T> typeInfo, T defaultValue) { + super(name, typeInfo, defaultValue); } /** - * Returns the {@link TypeSerializer} that can be used to serialize the value in the state. + * Creates a new {@code ValueStateDescriptor} with the given name, default value, and the specific + * serializer. + * + * @param name The (unique) name for the state. + * @param typeSerializer The type serializer of the values in the state. + * @param defaultValue The default value that will be set when requesting state without setting + * a value before. 
*/ - public TypeSerializer<T> getSerializer() { - return serializer; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - ValueStateDescriptor<?> that = (ValueStateDescriptor<?>) o; - - return serializer.equals(that.serializer) && name.equals(that.name); - - } - - @Override - public int hashCode() { - int result = serializer.hashCode(); - result = 31 * result + name.hashCode(); - return result; + public ValueStateDescriptor(String name, TypeSerializer<T> typeSerializer, T defaultValue) { + super(name, typeSerializer, defaultValue); } + // ------------------------------------------------------------------------ + @Override - public String toString() { - return "ValueStateDescriptor{" + - "name=" + name + - ", defaultValue=" + defaultValue + - ", serializer=" + serializer + - '}'; + public ValueState<T> bind(StateBackend stateBackend) throws Exception { + return stateBackend.createValueState(this); } } http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java new file mode 100644 index 0000000..6dc00f0 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.CommonTestUtils; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class ListStateDescriptorTest { + + @Test + public void testValueStateDescriptorEagerSerializer() throws Exception { + + TypeSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig()); + + ListStateDescriptor<String> descr = + new ListStateDescriptor<String>("testName", serializer); + + assertEquals("testName", descr.getName()); + assertNotNull(descr.getSerializer()); + assertEquals(serializer, descr.getSerializer()); + + ListStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr); + + assertEquals("testName", copy.getName()); + assertNotNull(copy.getSerializer()); + assertEquals(serializer, copy.getSerializer()); + } + + @Test + public void testValueStateDescriptorLazySerializer() throws Exception { + // some different registered value + ExecutionConfig cfg = new ExecutionConfig(); + cfg.registerKryoType(TaskInfo.class); + + 
ListStateDescriptor<Path> descr = + new ListStateDescriptor<Path>("testName", Path.class); + + try { + descr.getSerializer(); + fail("should cause an exception"); + } catch (IllegalStateException ignored) {} + + descr.initializeSerializerUnlessSet(cfg); + + assertNotNull(descr.getSerializer()); + assertTrue(descr.getSerializer() instanceof KryoSerializer); + + assertTrue(((KryoSerializer<?>) descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0); + } + + @Test + public void testValueStateDescriptorAutoSerializer() throws Exception { + + ListStateDescriptor<String> descr = + new ListStateDescriptor<String>("testName", String.class); + + ListStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr); + + assertEquals("testName", copy.getName()); + assertNotNull(copy.getSerializer()); + assertEquals(StringSerializer.INSTANCE, copy.getSerializer()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java new file mode 100644 index 0000000..0bac930 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.CommonTestUtils; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +public class ReducingStateDescriptorTest { + + @Test + public void testValueStateDescriptorEagerSerializer() throws Exception { + + @SuppressWarnings("unchecked") + ReduceFunction<String> reducer = mock(ReduceFunction.class); + + TypeSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig()); + + ReducingStateDescriptor<String> descr = + new ReducingStateDescriptor<String>("testName", reducer, serializer); + + assertEquals("testName", descr.getName()); + assertNotNull(descr.getSerializer()); + assertEquals(serializer, descr.getSerializer()); + + ReducingStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr); + + assertEquals("testName", copy.getName()); + assertNotNull(copy.getSerializer()); + assertEquals(serializer, copy.getSerializer()); + } + + 
@Test + public void testValueStateDescriptorLazySerializer() throws Exception { + + @SuppressWarnings("unchecked") + ReduceFunction<Path> reducer = mock(ReduceFunction.class); + + // some different registered value + ExecutionConfig cfg = new ExecutionConfig(); + cfg.registerKryoType(TaskInfo.class); + + ReducingStateDescriptor<Path> descr = + new ReducingStateDescriptor<Path>("testName", reducer, Path.class); + + try { + descr.getSerializer(); + fail("should cause an exception"); + } catch (IllegalStateException ignored) {} + + descr.initializeSerializerUnlessSet(cfg); + + assertNotNull(descr.getSerializer()); + assertTrue(descr.getSerializer() instanceof KryoSerializer); + + assertTrue(((KryoSerializer<?>) descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0); + } + + @Test + public void testValueStateDescriptorAutoSerializer() throws Exception { + + @SuppressWarnings("unchecked") + ReduceFunction<String> reducer = mock(ReduceFunction.class); + + ReducingStateDescriptor<String> descr = + new ReducingStateDescriptor<String>("testName", reducer, String.class); + + ReducingStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr); + + assertEquals("testName", copy.getName()); + assertNotNull(copy.getSerializer()); + assertEquals(StringSerializer.INSTANCE, copy.getSerializer()); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java new file mode 100644 index 0000000..d03cc47 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.CommonTestUtils; + +import org.junit.Test; + +import java.io.File; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class ValueStateDescriptorTest { + + @Test + public void testValueStateDescriptorEagerSerializer() throws Exception { + + TypeSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig()); + String defaultValue = "le-value-default"; + + ValueStateDescriptor<String> descr = + new ValueStateDescriptor<String>("testName", serializer, defaultValue); + + assertEquals("testName", descr.getName()); + assertEquals(defaultValue, descr.getDefaultValue()); + 
assertNotNull(descr.getSerializer()); + assertEquals(serializer, descr.getSerializer()); + + ValueStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr); + + assertEquals("testName", copy.getName()); + assertEquals(defaultValue, copy.getDefaultValue()); + assertNotNull(copy.getSerializer()); + assertEquals(serializer, copy.getSerializer()); + } + + @Test + public void testValueStateDescriptorLazySerializer() throws Exception { + + // some default value that goes to the generic serializer + Path defaultValue = new Path(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).toURI()); + + // some different registered value + ExecutionConfig cfg = new ExecutionConfig(); + cfg.registerKryoType(TaskInfo.class); + + ValueStateDescriptor<Path> descr = + new ValueStateDescriptor<Path>("testName", Path.class, defaultValue); + + try { + descr.getSerializer(); + fail("should cause an exception"); + } catch (IllegalStateException ignored) {} + + descr.initializeSerializerUnlessSet(cfg); + + assertNotNull(descr.getSerializer()); + assertTrue(descr.getSerializer() instanceof KryoSerializer); + + assertTrue(((KryoSerializer<?>) descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0); + } + + @Test + public void testValueStateDescriptorAutoSerializer() throws Exception { + + String defaultValue = "le-value-default"; + + ValueStateDescriptor<String> descr = + new ValueStateDescriptor<String>("testName", String.class, defaultValue); + + ValueStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr); + + assertEquals("testName", copy.getName()); + assertEquals(defaultValue, copy.getDefaultValue()); + assertNotNull(copy.getSerializer()); + assertEquals(StringSerializer.INSTANCE, copy.getSerializer()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java 
---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java index dafe86f..bd82800 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java @@ -17,10 +17,8 @@ package org.apache.flink.streaming.examples.windowing; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; @@ -103,8 +101,8 @@ public class SessionWindowing { private final Long sessionTimeout; - private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("last-seen", -1L, - BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig())); + private final ValueStateDescriptor<Long> stateDesc = + new ValueStateDescriptor<>("last-seen", Long.class, -1L); public SessionTrigger(Long sessionTimeout) { http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java index 206be64..8e77752 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java @@ -39,7 +39,7 @@ import static java.util.Objects.requireNonNull; * @param <SD> The type of StateDescriptor for the State S * @param <Backend> The type of the backend that snapshots this key/value state. */ -public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend> +public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> implements KvState<K, N, S, SD, Backend>, State { /** Map containing the actual key/value pairs */ http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java index 958b4dc..e989af3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingState; @@ -178,12 +179,16 @@ public abstract class AbstractStateBackend implements java.io.Serializable { * @throws Exception Exceptions may occur during initialization of the state and should be forwarded. 
*/ @SuppressWarnings({"rawtypes", "unchecked"}) - public <K, N, S extends State> S getPartitionedState(final N namespace, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S> stateDescriptor) throws Exception { + public <K, N, S extends State> S getPartitionedState(final N namespace, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception { if (keySerializer == null) { throw new Exception("State key serializer has not been configured in the config. " + "This operation cannot use partitioned state."); } + + if (!stateDescriptor.isSerializerInitialized()) { + stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); + } if (keyValueStatesByName == null) { keyValueStatesByName = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java index 7a97dc0..89de000 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java @@ -35,7 +35,7 @@ import org.apache.flink.api.common.state.StateDescriptor; * @param <SD> The type of the {@link StateDescriptor} for state {@code S}. * @param <Backend> The type of {@link AbstractStateBackend} that manages this {@code KvState}. */ -public interface KvState<K, N, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend> { +public interface KvState<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> { /** * Sets the current key, which will be used when using the state access methods. 
http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java index ce72135..245427e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java @@ -39,7 +39,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; * @param <SD> The type of the {@link StateDescriptor} * @param <Backend> The type of the backend that can restore the state from this snapshot. */ -public interface KvStateSnapshot<K, N, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend> extends java.io.Serializable { +public interface KvStateSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> extends java.io.Serializable { /** * Loads the key/value state back from this snapshot. 
http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java index 5035953..3cae629 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java @@ -41,7 +41,7 @@ import java.util.Map; * @param <S> The type of State * @param <SD> The type of StateDescriptor for the State S */ -public abstract class AbstractFsState<K, N, SV, S extends State, SD extends StateDescriptor<S>> +public abstract class AbstractFsState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> extends AbstractHeapState<K, N, SV, S, SD, FsStateBackend> { /** The file system state backend backing snapshots of this state */ http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java index c1e0f12..432a9e6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java @@ -39,7 +39,7 @@ import java.util.Map; * @param <N> The type of the namespace in the snapshot state. * @param <SV> The type of the state value. 
*/ -public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S>> extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD, FsStateBackend> { +public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD, FsStateBackend> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java index 816c883..cae673d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java @@ -39,7 +39,7 @@ import java.util.Map; * @param <S> The type of State * @param <SD> The type of StateDescriptor for the State S */ -public abstract class AbstractMemState<K, N, SV, S extends State, SD extends StateDescriptor<S>> +public abstract class AbstractMemState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> extends AbstractHeapState<K, N, SV, S, SD, MemoryStateBackend> { public AbstractMemState(TypeSerializer<K> keySerializer, http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java index d2efd53..5d4f0d8 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java @@ -36,7 +36,7 @@ import java.util.Map; * @param <N> The type of the namespace in the snapshot state. * @param <SV> The type of the value in the snapshot state. */ -public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S>> implements KvStateSnapshot<K, N, S, SD, MemoryStateBackend> { +public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> implements KvStateSnapshot<K, N, S, SD, MemoryStateBackend> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 82ab3b3..20a46a6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.state; import com.google.common.base.Joiner; + +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -29,11 +31,10 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.FloatSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; -import 
org.apache.flink.api.common.typeutils.base.IntValueSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.types.IntValue; + import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -67,7 +68,9 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); - ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", null, StringSerializer.INSTANCE); + ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null); + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + ValueState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") @@ -149,7 +152,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { try { backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); - ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", StringSerializer.INSTANCE); + ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class); ListState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") @@ -246,7 +249,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { return value1 + "," + value2; } }, - StringSerializer.INSTANCE); + String.class); ReducingState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") @@ -336,12 +339,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { "test_op", IntSerializer.INSTANCE); - ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", - null, - 
StringSerializer.INSTANCE); - ValueState<String> state = backend.getPartitionedState(null, - VoidSerializer.INSTANCE, - kvId); + ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null); + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + + ValueState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") KvState<Integer, Void, ValueState<String>, ValueStateDescriptor<String>, B> kv = @@ -379,7 +380,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { try { backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); - ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", StringSerializer.INSTANCE); + ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class); ListState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") @@ -427,7 +428,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { return value1 + "," + value2; } }, - StringSerializer.INSTANCE); + String.class); ReducingState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") @@ -468,7 +469,9 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { try { backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); - ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", new IntValue(-1), IntValueSerializer.INSTANCE); + ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1)); + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + ValueState<IntValue> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") 
http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index f8f26b5..6cb04b3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -254,7 +254,7 @@ public abstract class AbstractStreamOperator<OUT> * @throws IllegalStateException Thrown, if the key/value state was already initialized. * @throws Exception Thrown, if the state backend cannot create the key/value state. */ - protected <S extends State> S getPartitionedState(StateDescriptor<S> stateDescriptor) throws Exception { + protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception { return getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, stateDescriptor); } @@ -265,7 +265,7 @@ public abstract class AbstractStreamOperator<OUT> * @throws Exception Thrown, if the state backend cannot create the key/value state. 
*/ @SuppressWarnings("unchecked") - protected <S extends State, N> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S> stateDescriptor) throws Exception { + protected <S extends State, N> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception { return getStateBackend().getPartitionedState(namespace, (TypeSerializer<Object>) namespaceSerializer, stateDescriptor); } http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java index e627ec8..0b80884 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java @@ -64,10 +64,13 @@ public class StreamGroupedFold<IN, OUT, KEY> "operator. 
Probably the setOutputType method was not called."); } - ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue); - DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais); - initialValue = outTypeSerializer.deserialize(in); - ValueStateDescriptor<OUT> stateId = new ValueStateDescriptor<>(STATE_NAME, null, outTypeSerializer); + try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue); + DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais)) + { + initialValue = outTypeSerializer.deserialize(in); + } + + ValueStateDescriptor<OUT> stateId = new ValueStateDescriptor<>(STATE_NAME, outTypeSerializer, null); values = getPartitionedState(stateId); } http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java index c054563..2dd7762 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java @@ -44,7 +44,7 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, Reduc @Override public void open() throws Exception { super.open(); - ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, null, serializer); + ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer, null); values = getPartitionedState(stateId); } http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index f99ab93..c9cc4a7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -112,6 +112,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) { requireNonNull(stateProperties, "The state properties must not be null"); try { + stateProperties.initializeSerializerUnlessSet(getExecutionConfig()); return operator.getPartitionedState(stateProperties); } catch (Exception e) { throw new RuntimeException("Error while getting state", e); @@ -122,6 +123,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) { requireNonNull(stateProperties, "The state properties must not be null"); try { + stateProperties.initializeSerializerUnlessSet(getExecutionConfig()); return operator.getPartitionedState(stateProperties); } catch (Exception e) { throw new RuntimeException("Error while getting state", e); @@ -132,6 +134,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) { requireNonNull(stateProperties, "The state properties must not be null"); try { + stateProperties.initializeSerializerUnlessSet(getExecutionConfig()); return operator.getPartitionedState(stateProperties); } catch (Exception e) { throw new RuntimeException("Error while getting state", e); @@ -163,7 +166,7 @@ public class StreamingRuntimeContext 
extends AbstractRuntimeUDFContext { requireNonNull(stateType, "The state type information must not be null"); ValueStateDescriptor<S> stateProps = - new ValueStateDescriptor<>(name, defaultState, stateType.createSerializer(getExecutionConfig())); + new ValueStateDescriptor<>(name, stateType, defaultState); return getState(stateProps); } http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java index 21e35db..02a935c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java @@ -18,10 +18,10 @@ package org.apache.flink.streaming.api.windowing.triggers; import com.google.common.annotations.VisibleForTesting; -import org.apache.flink.api.common.ExecutionConfig; + import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.BooleanSerializer; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.Window; @@ -38,8 +38,8 @@ public class ContinuousEventTimeTrigger<W extends Window> implements Trigger<Obj private final long interval; - private final ValueStateDescriptor<Boolean> stateDesc = new ValueStateDescriptor<>("first", true, - BasicTypeInfo.BOOLEAN_TYPE_INFO.createSerializer(new ExecutionConfig())); + private final 
ValueStateDescriptor<Boolean> stateDesc = + new ValueStateDescriptor<>("first", BooleanSerializer.INSTANCE, true); private ContinuousEventTimeTrigger(long interval) { this.interval = interval; http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java index 10c975f..25d9508 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,13 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.streaming.api.windowing.triggers; import com.google.common.annotations.VisibleForTesting; -import org.apache.flink.api.common.ExecutionConfig; + import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.Window; @@ -36,8 +37,8 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge private final long interval; - private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("fire-timestamp", 0L, - BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig())); + private final ValueStateDescriptor<Long> stateDesc = + new ValueStateDescriptor<>("fire-timestamp", LongSerializer.INSTANCE, 0L); private ContinuousProcessingTimeTrigger(long interval) {
