http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index a1e9d7d..a419d1e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -31,7 +31,9 @@ import org.apache.flink.api.common.accumulators.Histogram; import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.cache.DistributedCache; -import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; /** @@ -187,8 +189,58 @@ public interface RuntimeContext { // -------------------------------------------------------------------------------------------- /** + * Gets the partitioned state, which is only accessible if the function is executed on + * a KeyedStream. When interacting with the state only the instance bound to the key of the + * element currently processed by the function is changed. + * Each operator may maintain multiple partitioned states, addressed with different names. + * + * <p>Because the scope of each value is the key of the currently processed element, + * and the elements are distributed by the Flink runtime, the system can transparently + * scale out and redistribute the state and KeyedStream. 
+ * + * <p>The following code example shows how to implement a continuous counter that counts + * how many times elements of a certain key occur, and emits an updated count for that + * element on each occurrence. + * + * <pre>{@code + * DataStream<MyType> stream = ...; + * KeyedStream<MyType> keyedStream = stream.keyBy("id"); + * + * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() { + * + * private ValueStateDescriptor<Long> countIdentifier = + * new ValueStateDescriptor<>("count", 0L, LongSerializer.INSTANCE); + * + * private ValueState<Long> state; + * + * public void open(Configuration cfg) { + * state = getRuntimeContext().getPartitionedState(countIdentifier); + * } + * + * public Tuple2<MyType, Long> map(MyType value) { + * long count = state.value(); + * state.update(count + 1); + * return new Tuple2<>(value, count); + * } + * }); + * + * }</pre> + * + * @param stateDescriptor The StateDescriptor that contains the name and type of the + * state that is being accessed. + * + * @param <S> The type of the state. + * + * @return The partitioned state object. + * + * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the + * function (function is not part of a KeyedStream). + */ + <S extends State> S getPartitionedState(StateDescriptor<S> stateDescriptor); + + /** * Gets the key/value state, which is only accessible if the function is executed on - * a KeyedStream. Upon calling {@link OperatorState#value()}, the key/value state will + * a KeyedStream. Upon calling {@link ValueState#value()}, the key/value state will * return the value bound to the key of the element currently processed by the function. * Each operator may maintain multiple key/value states, addressed with different names. * @@ -226,11 +278,13 @@ public interface RuntimeContext { * the TypeInformation object must be manually passed via * {@link #getKeyValueState(String, TypeInformation, Object)}. 
* + * * @param name The name of the key/value state. * @param stateType The class of the type that is stored in the state. Used to generate * serializers for managed memory and checkpointing. * @param defaultState The default state value, returned when the state is accessed and * no value has yet been set for the key. May be null. + * * @param <S> The type of the state. * * @return The key/value state access. @@ -238,11 +292,12 @@ public interface RuntimeContext { * @throws UnsupportedOperationException Thrown, if no key/value state is available for the * function (function is not part os a KeyedStream). */ - <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState); + @Deprecated + <S> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState); /** * Gets the key/value state, which is only accessible if the function is executed on - * a KeyedStream. Upon calling {@link OperatorState#value()}, the key/value state will + * a KeyedStream. Upon calling {@link ValueState#value()}, the key/value state will * return the value bound to the key of the element currently processed by the function. * Each operator may maintain multiple key/value states, addressed with different names. * @@ -275,17 +330,19 @@ public interface RuntimeContext { * * }</pre> * + * * @param name The name of the key/value state. * @param stateType The type information for the type that is stored in the state. - * Used to create serializers for managed memory and checkpoints. + * Used to create serializers for managed memory and checkpoints. * @param defaultState The default state value, returned when the state is accessed and * no value has yet been set for the key. May be null. * @param <S> The type of the state. - * + * * @return The key/value state access. * * @throws UnsupportedOperationException Thrown, if no key/value state is available for the * function (function is not part os a KeyedStream). 
*/ - <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState); + @Deprecated + <S> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState); }
http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java index 8f1b6b1..fe18994 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java @@ -34,7 +34,9 @@ import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.core.fs.Path; @@ -169,13 +171,22 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext { } @Override - public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) { + public <S extends State> S getPartitionedState(StateDescriptor<S> stateDescriptor) { + throw new UnsupportedOperationException( + "This state is only accessible by functions executed on a KeyedStream"); + + } + + @Override + @Deprecated + public <S> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) { throw new UnsupportedOperationException( "This state is only accessible by functions executed on a KeyedStream"); } @Override - public <S> OperatorState<S> getKeyValueState(String name, 
TypeInformation<S> stateType, S defaultState) { + @Deprecated + public <S> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) { throw new UnsupportedOperationException( "This state is only accessible by functions executed on a KeyedStream"); } http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java new file mode 100644 index 0000000..f803105 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +/** + * {@link State} interface for partitioned list state in Operations. + * The state is accessed and modified by user functions, and checkpointed consistently + * by the system as part of the distributed snapshots. + * + * <p>The state is only accessible by functions applied on a KeyedDataStream. 
The key is + * automatically supplied by the system, so the function always sees the value mapped to the + * key of the current element. That way, the system can handle stream and state partitioning + * consistently together. + * + * @param <T> Type of values that this list state keeps. + */ +public interface ListState<T> extends MergingState<T, Iterable<T>> {} http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java new file mode 100644 index 0000000..e391126 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import static java.util.Objects.requireNonNull; + +/** + * {@link StateDescriptor} for {@link ListState}. 
This can be used to create a partitioned + * list state using + * {@link org.apache.flink.api.common.functions.RuntimeContext#getPartitionedState(StateDescriptor)}. + * + * @param <T> The type of the values that can be added to the list state. + */ +public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>> { + private static final long serialVersionUID = 1L; + + private final TypeSerializer<T> serializer; + + /** + * Creates a new {@code ListStateDescriptor} with the given name. + * + * @param name The (unique) name for the state. + * @param serializer {@link TypeSerializer} for the state values. + */ + public ListStateDescriptor(String name, TypeSerializer<T> serializer) { + super(requireNonNull(name)); + this.serializer = requireNonNull(serializer); + } + + @Override + public ListState<T> bind(StateBackend stateBackend) throws Exception { + return stateBackend.createListState(this); + } + + /** + * Returns the {@link TypeSerializer} that can be used to serialize the value in the state. 
+ */ + public TypeSerializer<T> getSerializer() { + return serializer; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ListStateDescriptor<?> that = (ListStateDescriptor<?>) o; + + return serializer.equals(that.serializer) && name.equals(that.name); + + } + + @Override + public int hashCode() { + int result = serializer.hashCode(); + result = 31 * result + name.hashCode(); + return result; + } + + @Override + public String toString() { + return "ListStateDescriptor{" + + "serializer=" + serializer + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java new file mode 100644 index 0000000..f6c0ecb --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.state; + +import java.io.IOException; + +/** + * Base interface for partitioned state that supports adding elements and inspecting the current + * state of merged elements. Elements can either be kept in a buffer (list-like) or merged together + * into one value. + * + * <p>The state is accessed and modified by user functions, and checkpointed consistently + * by the system as part of the distributed snapshots. + * + * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is + * automatically supplied by the system, so the function always sees the value mapped to the + * key of the current element. That way, the system can handle stream and state partitioning + * consistently together. + * + * @param <IN> Type of the value that can be added to the state. + * @param <OUT> Type of the value that can be retrieved from the state. + */ +public interface MergingState<IN, OUT> extends State { + + /** + * Returns the current value for the state. When the state is not + * partitioned the returned value is the same for all inputs in a given + * operator instance. If state partitioning is applied, the value returned + * depends on the current operator input, as the operator maintains an + * independent state for each partition. + * + * @return The operator state value corresponding to the current input. + * + * @throws Exception Thrown if the system cannot access the state. + */ + OUT get() throws Exception ; + + /** + * Updates the operator state accessible by {@link #get()} by adding the given value + * to the list of values. The next time {@link #get()} is called (for the same state + * partition) the returned state will represent the updated list. + * + * @param value + * The new value for the state. + * + * @throws Exception Thrown if the system cannot access the state. 
+ */ + void add(IN value) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java index ec30f82..32ffa7f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java @@ -35,6 +35,7 @@ import java.io.IOException; * @param <T> Type of the value in the operator state */ @Public +@Deprecated public interface OperatorState<T> { /** http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java new file mode 100644 index 0000000..3e2c543 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +/** + * {@link State} interface for reducing state. Elements can be added to the state, they will + * be combined using a reduce function. The current state can be inspected. + * + * <p>The state is accessed and modified by user functions, and checkpointed consistently + * by the system as part of the distributed snapshots. + * + * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is + * automatically supplied by the system, so the function always sees the value mapped to the + * key of the current element. That way, the system can handle stream and state partitioning + * consistently together. + * + * @param <T> Type of the value in the operator state + */ +public interface ReducingState<T> extends MergingState<T, T> {} http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java new file mode 100644 index 0000000..7153a05 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import static java.util.Objects.requireNonNull; + +/** + * {@link StateDescriptor} for {@link ReducingState}. This can be used to create partitioned + * reducing state using + * {@link org.apache.flink.api.common.functions.RuntimeContext#getPartitionedState(StateDescriptor)}. + * + * @param <T> The type of the values that can be added to the reducing state. + */ +public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>> { + private static final long serialVersionUID = 1L; + + private final TypeSerializer<T> serializer; + + private final ReduceFunction<T> reduceFunction; + + /** + * Creates a new {@code ReducingStateDescriptor} with the given name and reduce function. + * + * @param name The (unique) name for the state. + * @param serializer {@link TypeSerializer} for the state values. 
+ */ + public ReducingStateDescriptor(String name, + ReduceFunction<T> reduceFunction, + TypeSerializer<T> serializer) { + super(requireNonNull(name)); + if (reduceFunction instanceof RichFunction) { + throw new UnsupportedOperationException("ReduceFunction of ReducingState can not be a RichFunction."); + } + this.serializer = requireNonNull(serializer); + this.reduceFunction = reduceFunction; + } + + @Override + public ReducingState<T> bind(StateBackend stateBackend) throws Exception { + return stateBackend.createReducingState(this); + } + + /** + * Returns the {@link TypeSerializer} that can be used to serialize the value in the state. + */ + public TypeSerializer<T> getSerializer() { + return serializer; + } + + /** + * Returns the reduce function to be used for the reducing state. + */ + public ReduceFunction<T> getReduceFunction() { + return reduceFunction; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ReducingStateDescriptor<?> that = (ReducingStateDescriptor<?>) o; + + return serializer.equals(that.serializer) && name.equals(that.name); + + } + + @Override + public int hashCode() { + int result = serializer.hashCode(); + result = 31 * result + name.hashCode(); + return result; + } + + @Override + public String toString() { + return "ReducingStateDescriptor{" + + "serializer=" + serializer + + ", reduceFunction=" + reduceFunction + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/State.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java b/flink-core/src/main/java/org/apache/flink/api/common/state/State.java new file mode 100644 index 0000000..255a735 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/State.java @@ -0,0 +1,30 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.state; + +/** + * Interface that different types of partitioned state must implement. + * + * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is + * automatically supplied by the system, so the function always sees the value mapped to the + * key of the current element. That way, the system can handle stream and state partitioning + * consistently together. + */ +public interface State { + void clear(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java new file mode 100644 index 0000000..d5adf9b --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.state; + +/** + * The {@code StateBackend} is used by {@link StateDescriptor} instances to create actual state + * representations. + */ +public interface StateBackend { + + /** + * Creates and returns a new {@link ValueState}. + * @param stateDesc The {@code StateDescriptor} that contains the name of the state. + * + * @param <T> The type of the value that the {@code ValueState} can store. + */ + <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception; + + /** + * Creates and returns a new {@link ListState}. + * @param stateDesc The {@code StateDescriptor} that contains the name of the state. + * + * @param <T> The type of the values that the {@code ListState} can store. + */ + <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception; + + /** + * Creates and returns a new {@link ReducingState}. + * @param stateDesc The {@code StateDescriptor} that contains the name of the state. + * + * @param <T> The type of the values that the {@code ReducingState} can store. 
+ */ + <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java new file mode 100644 index 0000000..f62118d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.state; + +import java.io.Serializable; + +import static java.util.Objects.requireNonNull; + +/** + * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned + * {@link State} in stateful operations. This contains the name and can create an actual state + * object given a {@link StateBackend} using {@link #bind(StateBackend)}. + * + * <p>Subclasses must correctly implement {@link #equals(Object)} and {@link #hashCode()}. 
+ * + * @param <S> The type of the State objects created from this {@code StateDescriptor}. + */ +public abstract class StateDescriptor<S extends State> implements Serializable { + private static final long serialVersionUID = 1L; + + /** Name that uniquely identifies state created from this StateDescriptor. */ + protected final String name; + + /** + * Create a new {@code StateDescriptor} with the given name. + * @param name The name of the {@code StateDescriptor}. + */ + public StateDescriptor(String name) { + this.name = requireNonNull(name);; + } + + /** + * Returns the name of this {@code StateDescriptor}. + */ + public String getName() { + return name; + } + + /** + * Creates a new {@link State} on the given {@link StateBackend}. + * + * @param stateBackend The {@code StateBackend} on which to create the {@link State}. + */ + public abstract S bind(StateBackend stateBackend) throws Exception ; + + // Force subclasses to implement + public abstract boolean equals(Object o); + + // Force subclasses to implement + public abstract int hashCode(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java new file mode 100644 index 0000000..ddb048f --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.annotation.Public; + +import java.io.IOException; + +/** + * {@link State} interface for partitioned single-value state. The value can be retrieved or + * updated. + * + * <p>The state is accessed and modified by user functions, and checkpointed consistently + * by the system as part of the distributed snapshots. + * + * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is + * automatically supplied by the system, so the function always sees the value mapped to the + * key of the current element. That way, the system can handle stream and state partitioning + * consistently together. + * + * @param <T> Type of the value in the state. + */ +@Public +public interface ValueState<T> extends State, OperatorState<T> { + + /** + * Returns the current value for the state. When the state is not + * partitioned the returned value is the same for all inputs in a given + * operator instance. If state partitioning is applied, the value returned + * depends on the current operator input, as the operator maintains an + * independent state for each partition. + * + * @return The operator state value corresponding to the current input. + * + * @throws IOException Thrown if the system cannot access the state. + */ + T value() throws IOException; + + /** + * Updates the operator state accessible by {@link #value()} to the given + * value. 
The next time {@link #value()} is called (for the same state + * partition) the returned state will represent the updated value. When a + * partitioned state is updated with null, the state for the current key + * will be removed and the default value is returned on the next access. + * + * @param value + * The new value for the state. + * + * @throws IOException Thrown if the system cannot access the state. + */ + void update(T value) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java new file mode 100644 index 0000000..bcfa46f --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.common.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +import static java.util.Objects.requireNonNull; + +/** + * {@link StateDescriptor} for {@link ValueState}. This can be used to create partitioned + * value state using + * {@link org.apache.flink.api.common.functions.RuntimeContext#getPartitionedState(StateDescriptor)}. + * + * @param <T> The type of the values that the value state can hold. + */ +public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>> { + private static final long serialVersionUID = 1L; + + private transient T defaultValue; + + private final TypeSerializer<T> serializer; + + /** + * Creates a new {@code ValueStateDescriptor} with the given name and default value. + * + * @param name The (unique) name for the state. + * @param defaultValue The default value that will be set when requesting state without setting + * a value before. + * @param serializer {@link TypeSerializer} for the state values. 
+ */ + public ValueStateDescriptor(String name, T defaultValue, TypeSerializer<T> serializer) { + super(requireNonNull(name)); + this.defaultValue = defaultValue; + this.serializer = requireNonNull(serializer); + } + + private void writeObject(final ObjectOutputStream out) throws IOException { + out.defaultWriteObject(); + + if (defaultValue == null) { + // we don't have a default value + out.writeBoolean(false); + } else { + out.writeBoolean(true); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper outView = + new DataOutputViewStreamWrapper(new DataOutputStream(baos)); + + try { + serializer.serialize(defaultValue, outView); + } catch (IOException ioe) { + throw new RuntimeException("Unable to serialize default value of type " + + defaultValue.getClass().getSimpleName() + ".", ioe); + } + + outView.close(); + + out.writeInt(baos.size()); + out.write(baos.toByteArray()); + } + + } + + private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + + boolean hasDefaultValue = in.readBoolean(); + + if (hasDefaultValue) { + int size = in.readInt(); + byte[] buffer = new byte[size]; + // read(byte[]) is allowed to return fewer bytes than requested even + // when more data follows, so a single read() call can spuriously + // fail for larger default values. readFully() fills the whole buffer + // or throws an EOFException if the stream ends early. + in.readFully(buffer); + + ByteArrayInputStream bais = new ByteArrayInputStream(buffer); + DataInputViewStreamWrapper inView = + new DataInputViewStreamWrapper(new DataInputStream(bais)); + defaultValue = serializer.deserialize(inView); + } else { + defaultValue = null; + } + } + + @Override + public ValueState<T> bind(StateBackend stateBackend) throws Exception { + return stateBackend.createValueState(this); + } + + /** + * Returns the default value. 
+ */ + public T getDefaultValue() { + if (defaultValue != null) { + return serializer.copy(defaultValue); + } else { + return null; + } + } + + /** + * Returns the {@link TypeSerializer} that can be used to serialize the value in the state. + */ + public TypeSerializer<T> getSerializer() { + return serializer; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ValueStateDescriptor<?> that = (ValueStateDescriptor<?>) o; + + return serializer.equals(that.serializer) && name.equals(that.name); + + } + + @Override + public int hashCode() { + int result = serializer.hashCode(); + result = 31 * result + name.hashCode(); + return result; + } + + @Override + public String toString() { + return "ValueStateDescriptor{" + + "name=" + name + + ", defaultValue=" + defaultValue + + ", serializer=" + serializer + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java index 68842d6..a9d5cd6 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java @@ -33,7 +33,7 @@ public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{ @Override public int hashCode() { - return TypeSerializerSingleton.class.hashCode(); + return this.getClass().hashCode(); } @Override 
http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java index baa4af8..b86830d 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java @@ -17,7 +17,7 @@ package org.apache.flink.streaming.examples.windowing; -import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; @@ -108,7 +108,7 @@ public class SessionWindowing { @Override public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception { - OperatorState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L); + ValueState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L); Long lastSeen = lastSeenState.value(); Long timeSinceLastEvent = timestamp - lastSeen; @@ -127,7 +127,7 @@ public class SessionWindowing { @Override public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception { - OperatorState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L); + ValueState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L); Long lastSeen = lastSeenState.value(); if (time - lastSeen >= sessionTimeout) { 
http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java index 49dfc21..942a3c9 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java @@ -25,11 +25,14 @@ import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.state.StateBackendTestBase; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.api.common.typeutils.base.IntSerializer; + +import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.hadoop.conf.Configuration; @@ -55,7 +58,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -public class FileStateBackendTest { +public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> { private static File TEMP_DIR; @@ -99,6 +102,20 @@ public class FileStateBackendTest { catch (Exception ignored) {} } + private URI stateBaseURI; + + @Override + protected FsStateBackend getStateBackend() throws Exception { + stateBaseURI = new URI(HDFS_ROOT_URI + 
UUID.randomUUID().toString()); + return new FsStateBackend(stateBaseURI); + + } + + @Override + protected void cleanup() throws Exception { + FileSystem.get(stateBaseURI).delete(new Path(stateBaseURI), true); + } + // ------------------------------------------------------------------------ // Tests // ------------------------------------------------------------------------ @@ -128,7 +145,7 @@ public class FileStateBackendTest { // supreme! } - backend.initializeForJob(new DummyEnvironment("test", 1, 0)); + backend.initializeForJob(new DummyEnvironment("test", 1, 0), "dummy", IntSerializer.INSTANCE); assertNotNull(backend.getCheckpointDirectory()); Path checkpointDir = backend.getCheckpointDirectory(); @@ -149,9 +166,8 @@ public class FileStateBackendTest { @Test public void testSerializableState() { try { - FsStateBackend backend = CommonTestUtils.createCopySerializable( - new FsStateBackend(randomHdfsFileUri(), 40)); - backend.initializeForJob(new DummyEnvironment("test", 1, 0)); + FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(randomHdfsFileUri(), 40)); + backend.initializeForJob(new DummyEnvironment("test", 1, 0), "dummy", IntSerializer.INSTANCE); Path checkpointDir = backend.getCheckpointDirectory(); @@ -183,9 +199,8 @@ public class FileStateBackendTest { @Test public void testStateOutputStream() { try { - FsStateBackend backend = CommonTestUtils.createCopySerializable( - new FsStateBackend(randomHdfsFileUri(), 15)); - backend.initializeForJob(new DummyEnvironment("test", 1, 0)); + FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(randomHdfsFileUri(), 15)); + backend.initializeForJob(new DummyEnvironment("test", 1, 0), "dummy", IntSerializer.INSTANCE); Path checkpointDir = backend.getCheckpointDirectory(); @@ -219,14 +234,14 @@ public class FileStateBackendTest { // use with try-with-resources StreamStateHandle handle4; - try (StateBackend.CheckpointStateOutputStream stream4 = + try 
(AbstractStateBackend.CheckpointStateOutputStream stream4 = backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) { stream4.write(state4); handle4 = stream4.closeAndGetHandle(); } // close before accessing handle - StateBackend.CheckpointStateOutputStream stream5 = + AbstractStateBackend.CheckpointStateOutputStream stream5 = backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis()); stream5.write(state4); stream5.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java deleted file mode 100644 index 23703b3..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.state; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataOutputView; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import static java.util.Objects.requireNonNull; - -/** - * Base class for key/value state implementations that are backed by a regular heap hash map. The - * concrete implementations define how the state is checkpointed. - * - * @param <K> The type of the key. - * @param <V> The type of the value. - * @param <Backend> The type of the backend that snapshots this key/value state. - */ -public abstract class AbstractHeapKvState<K, V, Backend extends StateBackend<Backend>> implements KvState<K, V, Backend> { - - /** Map containing the actual key/value pairs */ - private final HashMap<K, V> state; - - /** The serializer for the keys */ - private final TypeSerializer<K> keySerializer; - - /** The serializer for the values */ - private final TypeSerializer<V> valueSerializer; - - /** The value that is returned when no other value has been associated with a key, yet */ - private final V defaultValue; - - /** The current key, which the next value methods will refer to */ - private K currentKey; - - /** - * Creates a new empty key/value state. - * - * @param keySerializer The serializer for the keys. - * @param valueSerializer The serializer for the values. - * @param defaultValue The value that is returned when no other value has been associated with a key, yet. - */ - protected AbstractHeapKvState(TypeSerializer<K> keySerializer, - TypeSerializer<V> valueSerializer, - V defaultValue) { - this(keySerializer, valueSerializer, defaultValue, new HashMap<K, V>()); - } - - /** - * Creates a new key/value state for the given hash map of key/value pairs. - * - * @param keySerializer The serializer for the keys. - * @param valueSerializer The serializer for the values. 
- * @param defaultValue The value that is returned when no other value has been associated with a key, yet. - * @param state The state map to use in this kev/value state. May contain initial state. - */ - protected AbstractHeapKvState(TypeSerializer<K> keySerializer, - TypeSerializer<V> valueSerializer, - V defaultValue, - HashMap<K, V> state) { - this.state = requireNonNull(state); - this.keySerializer = requireNonNull(keySerializer); - this.valueSerializer = requireNonNull(valueSerializer); - this.defaultValue = defaultValue; - } - - // ------------------------------------------------------------------------ - - @Override - public V value() { - V value = state.get(currentKey); - return value != null ? value : - (defaultValue == null ? null : valueSerializer.copy(defaultValue)); - } - - @Override - public void update(V value) { - if (value != null) { - state.put(currentKey, value); - } - else { - state.remove(currentKey); - } - } - - @Override - public void setCurrentKey(K currentKey) { - this.currentKey = currentKey; - } - - @Override - public int size() { - return state.size(); - } - - @Override - public void dispose() { - state.clear(); - } - - /** - * Gets the serializer for the keys. - * @return The serializer for the keys. - */ - public TypeSerializer<K> getKeySerializer() { - return keySerializer; - } - - /** - * Gets the serializer for the values. - * @return The serializer for the values. 
- */ - public TypeSerializer<V> getValueSerializer() { - return valueSerializer; - } - - // ------------------------------------------------------------------------ - // checkpointing utilities - // ------------------------------------------------------------------------ - - protected void writeStateToOutputView(final DataOutputView out) throws IOException { - for (Map.Entry<K, V> entry : state.entrySet()) { - keySerializer.serialize(entry.getKey(), out); - valueSerializer.serialize(entry.getValue(), out); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java new file mode 100644 index 0000000..206be64 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import java.util.HashMap; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Base class for partitioned {@link ListState} implementations that are backed by a regular + * heap hash map. The concrete implementations define how the state is checkpointed. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <SV> The type of the values in the state. + * @param <S> The type of State + * @param <SD> The type of StateDescriptor for the State S + * @param <Backend> The type of the backend that snapshots this key/value state. + */ +public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend> + implements KvState<K, N, S, SD, Backend>, State { + + /** Map containing the actual key/value pairs */ + protected final HashMap<N, Map<K, SV>> state; + + /** Serializer for the state value. The state value could be a List<V>, for example. */ + protected final TypeSerializer<SV> stateSerializer; + + /** The serializer for the keys */ + protected final TypeSerializer<K> keySerializer; + + /** The serializer for the namespace */ + protected final TypeSerializer<N> namespaceSerializer; + + /** This holds the name of the state and can create an initial default value for the state. */ + protected final SD stateDesc; + + /** The current key, which the next value methods will refer to */ + protected K currentKey; + + /** The current namespace, which the access methods will refer to. */ + protected N currentNamespace = null; + + /** Cache the state map for the current namespace. */ + protected Map<K, SV> currentNSState; + + /** + * Creates a new empty key/value state. 
+ * + * @param keySerializer The serializer for the keys. + * @param namespaceSerializer The serializer for the namespace. + * @param stateDesc The state identifier for the state. This contains name + * and can create a default state value. + */ + protected AbstractHeapState(TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + TypeSerializer<SV> stateSerializer, + SD stateDesc) { + this(keySerializer, namespaceSerializer, stateSerializer, stateDesc, new HashMap<N, Map<K, SV>>()); + } + + /** + * Creates a new key/value state for the given hash map of key/value pairs. + * + * @param keySerializer The serializer for the keys. + * @param stateDesc The state identifier for the state. This contains name + * and can create a default state value. + * @param state The state map to use in this key/value state. May contain initial state. + */ + protected AbstractHeapState(TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + TypeSerializer<SV> stateSerializer, + SD stateDesc, + HashMap<N, Map<K, SV>> state) { + this.state = requireNonNull(state); + this.keySerializer = requireNonNull(keySerializer); + this.namespaceSerializer = requireNonNull(namespaceSerializer); + this.stateSerializer = stateSerializer; + this.stateDesc = stateDesc; + } + + // ------------------------------------------------------------------------ + + @Override + public final void clear() { + if (currentNSState != null) { + currentNSState.remove(currentKey); + if (currentNSState.isEmpty()) { + state.remove(currentNamespace); + currentNSState = null; + } + } + } + + @Override + public final void setCurrentKey(K currentKey) { + this.currentKey = currentKey; + } + + @Override + public final void setCurrentNamespace(N namespace) { + if (namespace != null && namespace.equals(this.currentNamespace)) { + return; + } + this.currentNamespace = namespace; + this.currentNSState = state.get(currentNamespace); + } + + /** + * Returns the number of all state pairs in this 
state, across namespaces. + */ + protected final int size() { + int size = 0; + for (Map<K, SV> namespace: state.values()) { + size += namespace.size(); + } + return size; + } + + @Override + public void dispose() { + state.clear(); + } + + /** + * Gets the serializer for the keys. + * + * @return The serializer for the keys. + */ + public final TypeSerializer<K> getKeySerializer() { + return keySerializer; + } + + /** + * Gets the serializer for the namespace. + * + * @return The serializer for the namespace. + */ + public final TypeSerializer<N> getNamespaceSerializer() { + return namespaceSerializer; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java new file mode 100644 index 0000000..958b4dc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateBackend; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.execution.Environment; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** + * A state backend defines how state is stored and snapshotted during checkpoints. + */ +public abstract class AbstractStateBackend implements java.io.Serializable { + + private static final long serialVersionUID = 4620413814639220247L; + + protected transient TypeSerializer<?> keySerializer; + + protected transient ClassLoader userCodeClassLoader; + + protected transient Object currentKey; + + /** For efficient access in setCurrentKey() */ + private transient KvState<?, ?, ?, ?, ?>[] keyValueStates; + + /** So that we can give out state when the user uses the same key. 
*/ + private transient HashMap<String, KvState<?, ?, ?, ?, ?>> keyValueStatesByName; + + /** For caching the last accessed partitioned state */ + private transient String lastName; + + @SuppressWarnings("rawtypes") + private transient KvState lastState; + + // ------------------------------------------------------------------------ + // initialization and cleanup + // ------------------------------------------------------------------------ + + /** + * This method is called by the task upon deployment to initialize the state backend for + * data for a specific job. + * + * @param env The {@link Environment} of the task that instantiated the state backend + * @param operatorIdentifier Unique identifier for naming states created by this backend + * @throws Exception Overwritten versions of this method may throw exceptions, in which + * case the job that uses the state backend is considered failed during + * deployment. + */ + public void initializeForJob(Environment env, + String operatorIdentifier, + TypeSerializer<?> keySerializer) throws Exception { + this.userCodeClassLoader = env.getUserClassLoader(); + this.keySerializer = keySerializer; + } + + /** + * Disposes all state associated with the current job. + * + * @throws Exception Exceptions may occur during disposal of the state and should be forwarded. + */ + public abstract void disposeAllStateForCurrentJob() throws Exception; + + /** + * Closes the state backend, releasing all internal resources, but does not delete any persistent + * checkpoint data. 
+ * + * @throws Exception Exceptions can be forwarded and will be logged by the system + */ + public abstract void close() throws Exception; + + public void dispose() { + if (keyValueStates != null) { + for (KvState<?, ?, ?, ?, ?> state : keyValueStates) { + state.dispose(); + } + } + } + + // ------------------------------------------------------------------------ + // key/value state + // ------------------------------------------------------------------------ + + /** + * Creates and returns a new {@link ValueState}. + * + * @param namespaceSerializer TypeSerializer for the state namespace. + * @param stateDesc The {@code StateDescriptor} that contains the name of the state. + * + * @param <N> The type of the namespace. + * @param <T> The type of the value that the {@code ValueState} can store. + */ + abstract protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception; + + /** + * Creates and returns a new {@link ListState}. + * + * @param namespaceSerializer TypeSerializer for the state namespace. + * @param stateDesc The {@code StateDescriptor} that contains the name of the state. + * + * @param <N> The type of the namespace. + * @param <T> The type of the values that the {@code ListState} can store. + */ + abstract protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception; + + /** + * Creates and returns a new {@link ReducingState}. + * + * @param namespaceSerializer TypeSerializer for the state namespace. + * @param stateDesc The {@code StateDescriptor} that contains the name of the state. + * + * @param <N> The type of the namespace. + * @param <T> The type of the values that the {@code ReducingState} can store. 
+ */ + abstract protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception; + + /** + * Sets the current key that is used for partitioned state and forwards it to every registered key/value state. + * @param currentKey The current key. + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + public void setCurrentKey(Object currentKey) { + this.currentKey = currentKey; + if (keyValueStates != null) { + for (KvState kv : keyValueStates) { + kv.setCurrentKey(currentKey); + } + } + } + + public Object getCurrentKey() { + return currentKey; + } + + /** + * Creates or retrieves a partitioned state backed by this state backend. + * + * @param stateDescriptor The state identifier for the state. This contains the name + * and can create a default state value. + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <S> The type of the state. + * + * @return The key/value state registered under the descriptor's name, created first if none exists yet. + * + * @throws Exception Thrown if no key serializer is configured; exceptions during initialization of the state should be forwarded. + */ + @SuppressWarnings({"rawtypes", "unchecked"}) + public <K, N, S extends State> S getPartitionedState(final N namespace, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S> stateDescriptor) throws Exception { + + if (keySerializer == null) { + throw new Exception("State key serializer has not been configured in the config. 
" + + "This operation cannot use partitioned state."); + } + + if (keyValueStatesByName == null) { + keyValueStatesByName = new HashMap<>(); + } + + if (lastName != null && lastName.equals(stateDescriptor.getName())) { + lastState.setCurrentNamespace(namespace); + return (S) lastState; + } + + KvState<?, ?, ?, ?, ?> previous = keyValueStatesByName.get(stateDescriptor.getName()); + if (previous != null) { + lastState = previous; + lastState.setCurrentNamespace(namespace); + lastName = stateDescriptor.getName(); + return (S) previous; + } + + // no state is registered under this name yet: create a new blank key/value state + S kvstate = stateDescriptor.bind(new StateBackend() { + @Override + public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception { + return AbstractStateBackend.this.createValueState(namespaceSerializer, stateDesc); + } + + @Override + public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception { + return AbstractStateBackend.this.createListState(namespaceSerializer, stateDesc); + } + + @Override + public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception { + return AbstractStateBackend.this.createReducingState(namespaceSerializer, stateDesc); + } + }); + + keyValueStatesByName.put(stateDescriptor.getName(), (KvState) kvstate); + keyValueStates = keyValueStatesByName.values().toArray(new KvState[keyValueStatesByName.size()]); + + lastName = stateDescriptor.getName(); + lastState = (KvState<?, ?, ?, ?, ?>) kvstate; + + ((KvState) kvstate).setCurrentKey(currentKey); + ((KvState) kvstate).setCurrentNamespace(namespace); + + return kvstate; + } + + public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception { + if (keyValueStates != null) { + HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshots = new HashMap<>(keyValueStatesByName.size()); + + for (Map.Entry<String, KvState<?, ?, ?, ?, ?>> entry : 
keyValueStatesByName.entrySet()) { + KvStateSnapshot<?, ?, ?, ?, ?> snapshot = entry.getValue().snapshot(checkpointId, timestamp); + snapshots.put(entry.getKey(), snapshot); + } + return snapshots; + } + + return null; + } + + public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception { + // We check whether the KvStates require notifications + if (keyValueStates != null) { + for (KvState<?, ?, ?, ?, ?> kvstate : keyValueStates) { + if (kvstate instanceof CheckpointListener) { + ((CheckpointListener) kvstate).notifyCheckpointComplete(checkpointId); + } + } + } + } + + /** + * Restores the given K/V state snapshots right away and registers the restored states under their names. + * @param keyValueStateSnapshots The Map of snapshots, keyed by state name; {@code null} is ignored + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + public final void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots, long recoveryTimestamp) throws Exception { + if (keyValueStateSnapshots != null) { + if (keyValueStatesByName == null) { + keyValueStatesByName = new HashMap<>(); + } + + for (Map.Entry<String, KvStateSnapshot> state : keyValueStateSnapshots.entrySet()) { + KvState kvState = state.getValue().restoreState(this, + keySerializer, + userCodeClassLoader, + recoveryTimestamp); + keyValueStatesByName.put(state.getKey(), kvState); + } + keyValueStates = keyValueStatesByName.values().toArray(new KvState[keyValueStatesByName.size()]); + } + } + + // ------------------------------------------------------------------------ + // storing state for a checkpoint + // ------------------------------------------------------------------------ + + /** + * Creates an output stream that writes into the state of the given checkpoint. When the stream + * is closed, it returns a state handle that can retrieve the state back. + * + * @param checkpointID The ID of the checkpoint. + * @param timestamp The timestamp of the checkpoint. + * @return An output stream that writes state for the given checkpoint.
+ * + * @throws Exception Exceptions may occur while creating the stream and should be forwarded. + */ + public abstract CheckpointStateOutputStream createCheckpointStateOutputStream( + long checkpointID, long timestamp) throws Exception; + + /** + * Creates a {@link DataOutputView} stream that writes into the state of the given checkpoint. + * When the stream is closed, it returns a state handle that can retrieve the state back. + * + * @param checkpointID The ID of the checkpoint. + * @param timestamp The timestamp of the checkpoint. + * @return A DataOutputView stream that writes state for the given checkpoint. + * + * @throws Exception Exceptions may occur while creating the stream and should be forwarded. + */ + public CheckpointStateOutputView createCheckpointStateOutputView( + long checkpointID, long timestamp) throws Exception { + return new CheckpointStateOutputView(createCheckpointStateOutputStream(checkpointID, timestamp)); + } + + /** + * Writes the given state into the checkpoint, and returns a handle that can retrieve the state back. + * + * @param state The state to be checkpointed. + * @param checkpointID The ID of the checkpoint. + * @param timestamp The timestamp of the checkpoint. + * @param <S> The type of the state. + * + * @return A state handle that can retrieve the checkpointed state. + * + * @throws Exception Exceptions may occur during serialization / storing the state and should be forwarded. + */ + public abstract <S extends Serializable> StateHandle<S> checkpointStateSerializable( + S state, long checkpointID, long timestamp) throws Exception; + + + // ------------------------------------------------------------------------ + // Checkpoint state output stream + // ------------------------------------------------------------------------ + + /** + * A dedicated output stream that produces a {@link StreamStateHandle} when closed.
+ */ + public static abstract class CheckpointStateOutputStream extends OutputStream { + + /** + * Closes the stream and gets a state handle that can create an input stream + * producing the data written to this stream. + * + * @return A state handle that can create an input stream producing the data written to this stream. + * @throws IOException Thrown, if the stream cannot be closed. + */ + public abstract StreamStateHandle closeAndGetHandle() throws IOException; + } + + /** + * A dedicated DataOutputView stream that produces a {@code StateHandle<DataInputView>} when closed. + */ + public static final class CheckpointStateOutputView extends DataOutputViewStreamWrapper { + + private final CheckpointStateOutputStream out; + + public CheckpointStateOutputView(CheckpointStateOutputStream out) { + super(out); + this.out = out; + } + + /** + * Closes the stream and gets a state handle that can create a DataInputView + * producing the data written to this stream. + * + * @return A state handle that can create a DataInputView producing the data written to this stream. + * @throws IOException Thrown, if the stream cannot be closed. + */ + public StateHandle<DataInputView> closeAndGetHandle() throws IOException { + return new DataInputViewHandle(out.closeAndGetHandle()); + } + + @Override + public void close() throws IOException { + out.close(); + } + } + + /** + * Simple state handle that resolves a {@link DataInputView} from a StreamStateHandle.
+ */ + private static final class DataInputViewHandle implements StateHandle<DataInputView> { + + private static final long serialVersionUID = 2891559813513532079L; + + private final StreamStateHandle stream; + + private DataInputViewHandle(StreamStateHandle stream) { + this.stream = stream; + } + + @Override + public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception { + return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader)); + } + + @Override + public void discardState() throws Exception { + stream.discardState(); + } + + @Override + public long getStateSize() throws Exception { + return stream.getStateSize(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java new file mode 100644 index 0000000..3bad8b0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; +import java.util.ArrayList; + +@SuppressWarnings("ForLoopReplaceableByForEach") +final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> { + + private static final long serialVersionUID = 1119562170939152304L; + + private final TypeSerializer<T> elementSerializer; + + public ArrayListSerializer(TypeSerializer<T> elementSerializer) { + this.elementSerializer = elementSerializer; + } + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer<ArrayList<T>> duplicate() { + TypeSerializer<T> duplicateElement = elementSerializer.duplicate(); + return duplicateElement == elementSerializer ? 
this : new ArrayListSerializer<T>(duplicateElement); + } + + @Override + public ArrayList<T> createInstance() { + return new ArrayList<>(); + } + + @Override + public ArrayList<T> copy(ArrayList<T> from) { + ArrayList<T> newList = new ArrayList<>(from.size()); + for (int i = 0; i < from.size(); i++) { + newList.add(elementSerializer.copy(from.get(i))); + } + return newList; + } + + @Override + public ArrayList<T> copy(ArrayList<T> from, ArrayList<T> reuse) { + return copy(from); + } + + @Override + public int getLength() { + return -1; // -1 marks a variable-length type + } + + @Override + public void serialize(ArrayList<T> list, DataOutputView target) throws IOException { + final int size = list.size(); + target.writeInt(size); + for (int i = 0; i < size; i++) { + elementSerializer.serialize(list.get(i), target); + } + } + + @Override + public ArrayList<T> deserialize(DataInputView source) throws IOException { + final int size = source.readInt(); + final ArrayList<T> list = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + list.add(elementSerializer.deserialize(source)); + } + return list; + } + + @Override + public ArrayList<T> deserialize(ArrayList<T> reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + // copy the element count first, then each element in order + final int num = source.readInt(); + target.writeInt(num); + for (int i = 0; i < num; i++) { + elementSerializer.copy(source, target); + } + } + + // -------------------------------------------------------------------- + + @Override + public boolean equals(Object obj) { + return obj == this || + (obj != null && obj.getClass() == getClass() && + elementSerializer.equals(((ArrayListSerializer<?>) obj).elementSerializer)); + } + + @Override + public boolean canEqual(Object obj) { + return true; + } + + @Override + public int hashCode() { + return elementSerializer.hashCode(); + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java new file mode 100644 index 0000000..1f18805 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.state; + +/** + * This interface must be implemented by functions/operations that want to receive + * a commit notification once a checkpoint has been completely acknowledged by all + * participants. + */ +public interface CheckpointListener { + + /** + * This method is called as a notification once a distributed checkpoint has been completed. + * + * <p>Note that any exception during this method will not cause the checkpoint to + * fail any more. + * + * @param checkpointId The ID of the checkpoint that has been completed. 
+ * @throws Exception Implementations may throw; the exception does not fail the already completed checkpoint. + */ + void notifyCheckpointComplete(long checkpointId) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java new file mode 100644 index 0000000..c20962f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * Generic implementation of {@link ListState} based on a wrapped {@link ValueState}.
+ * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <T> The type of the values stored in this {@code ListState}. + * @param <Backend> The type of {@link AbstractStateBackend} that manages this {@code KvState}. + * @param <W> Generic type that extends both the underlying {@code ValueState} and {@code KvState}. + */ +public class GenericListState<K, N, T, Backend extends AbstractStateBackend, W extends ValueState<ArrayList<T>> & KvState<K, N, ValueState<ArrayList<T>>, ValueStateDescriptor<ArrayList<T>>, Backend>> + implements ListState<T>, KvState<K, N, ListState<T>, ListStateDescriptor<T>, Backend> { + + private final W wrappedState; + + @SuppressWarnings("unchecked") + public GenericListState(ValueState<ArrayList<T>> wrappedState) { + if (!(wrappedState instanceof KvState)) { + throw new IllegalArgumentException("Wrapped state must be a KvState."); + } + this.wrappedState = (W) wrappedState; + } + + @Override + public void setCurrentKey(K key) { + wrappedState.setCurrentKey(key); + } + + @Override + public void setCurrentNamespace(N namespace) { + wrappedState.setCurrentNamespace(namespace); + } + + @Override + public KvStateSnapshot<K, N, ListState<T>, ListStateDescriptor<T>, Backend> snapshot( + long checkpointId, + long timestamp) throws Exception { + KvStateSnapshot<K, N, ValueState<ArrayList<T>>, ValueStateDescriptor<ArrayList<T>>, Backend> wrappedSnapshot = wrappedState.snapshot( + checkpointId, + timestamp); + return new Snapshot<>(wrappedSnapshot); + } + + @Override + public void dispose() { + wrappedState.dispose(); + } + + @Override + public Iterable<T> get() throws Exception { + ArrayList<T> result = wrappedState.value(); + if (result == null) { + return Collections.emptyList(); + } + return result; + } + + @Override + public void add(T value) throws Exception { + ArrayList<T> currentValue = wrappedState.value(); + if (currentValue == null) { + currentValue = new ArrayList<>(); + currentValue.add(value); + 
wrappedState.update(currentValue); + } else { + currentValue.add(value); + wrappedState.update(currentValue); + } + } + + @Override + public void clear() { + wrappedState.clear(); + } + + private static class Snapshot<K, N, T, Backend extends AbstractStateBackend> implements KvStateSnapshot<K, N, ListState<T>, ListStateDescriptor<T>, Backend> { + private static final long serialVersionUID = 1L; + + private final KvStateSnapshot<K, N, ValueState<ArrayList<T>>, ValueStateDescriptor<ArrayList<T>>, Backend> wrappedSnapshot; + + public Snapshot(KvStateSnapshot<K, N, ValueState<ArrayList<T>>, ValueStateDescriptor<ArrayList<T>>, Backend> wrappedSnapshot) { + this.wrappedSnapshot = wrappedSnapshot; + } + + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public KvState<K, N, ListState<T>, ListStateDescriptor<T>, Backend> restoreState( + Backend stateBackend, + TypeSerializer<K> keySerializer, + ClassLoader classLoader, + long recoveryTimestamp) throws Exception { + return new GenericListState((ValueState<ArrayList<T>>) wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader, recoveryTimestamp)); + } + + @Override + public void discardState() throws Exception { + wrappedSnapshot.discardState(); + } + + @Override + public long getStateSize() throws Exception { + return wrappedSnapshot.getStateSize(); + } + } +}
