[FLINK-2991] Add Folding State and use in WindowOperator This enables efficient incremental aggregation of fold window.
This also adds: - WindowedStream.apply(initVal, foldFunction, windowFunction) - AllWindowedStream.apply(initVal, foldFunction, windowFunction) This closes #1605 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94cba899 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94cba899 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94cba899 Branch: refs/heads/master Commit: 94cba8998c726092e2cc80fd022ca40bf0c38ec2 Parents: d93b154 Author: Aljoscha Krettek <[email protected]> Authored: Mon Feb 8 14:56:19 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Fri Feb 12 18:51:01 2016 +0100 ---------------------------------------------------------------------- .../streaming/state/RocksDBFoldingState.java | 187 ++++++++++++++++++ .../streaming/state/RocksDBListState.java | 9 +- .../streaming/state/RocksDBReducingState.java | 14 +- .../streaming/state/RocksDBStateBackend.java | 20 +- .../streaming/state/RocksDBValueState.java | 14 +- .../contrib/streaming/state/DbStateBackend.java | 18 ++ .../flink/api/common/state/FoldingState.java | 37 ++++ .../common/state/FoldingStateDescriptor.java | 108 ++++++++++ .../flink/api/common/state/StateBackend.java | 8 + .../flink/api/common/state/StateDescriptor.java | 2 +- .../runtime/state/AbstractStateBackend.java | 20 ++ .../runtime/state/GenericFoldingState.java | 133 +++++++++++++ .../flink/runtime/state/GenericListState.java | 6 + .../runtime/state/GenericReducingState.java | 7 + .../state/filesystem/FsFoldingState.java | 145 ++++++++++++++ .../state/filesystem/FsStateBackend.java | 7 + .../runtime/state/memory/MemFoldingState.java | 118 +++++++++++ .../state/memory/MemoryStateBackend.java | 7 + .../runtime/state/StateBackendTestBase.java | 105 ++++++++++ .../api/datastream/AllWindowedStream.java | 103 ++++++++-- .../api/datastream/WindowedStream.java | 198 +++++++++++++------ .../windowing/FoldAllWindowFunction.java | 92 --------- .../windowing/FoldApplyAllWindowFunction.java | 95 +++++++++ .../windowing/FoldApplyWindowFunction.java | 95 +++++++++ .../functions/windowing/FoldWindowFunction.java | 91 --------- .../windowing/PassThroughAllWindowFunction.java | 30 +++ .../windowing/PassThroughWindowFunction.java | 30 +++ .../windowing/ReduceAllWindowFunction.java | 30 --- .../windowing/ReduceWindowFunction.java | 30 --- .../ReduceWindowFunctionWithWindow.java | 31 --- .../operators/FoldApplyWindowFunctionTest.java | 143 ++++++++++++++ .../api/operators/FoldWindowFunctionTest.java | 132 ------------- .../operators/windowing/WindowOperatorTest.java | 10 +- .../runtime/state/StateBackendITCase.java | 8 + .../streaming/api/scala/AllWindowedStream.scala | 60 ++++++ .../streaming/api/scala/WindowedStream.scala | 60 ++++++ 36 files changed, 1707 insertions(+), 496 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java new file mode 100644 index 0000000..7e4e573 --- /dev/null +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KvState; +import org.apache.flink.runtime.state.KvStateSnapshot; +import org.rocksdb.RocksDBException; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.net.URI; + +import static java.util.Objects.requireNonNull; + +/** + * {@link ReducingState} implementation that stores state in RocksDB. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <T> The type of the values that can be folded into the state. + * @param <ACC> The type of the value in the folding state. + * @param <Backend> The type of the backend that snapshots this key/value state. + */ +public class RocksDBFoldingState<K, N, T, ACC, Backend extends AbstractStateBackend> + extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> + implements FoldingState<T, ACC> { + + /** Serializer for the values */ + private final TypeSerializer<ACC> valueSerializer; + + /** This holds the name of the state and can create an initial default value for the state. */ + protected final FoldingStateDescriptor<T, ACC> stateDesc; + + /** User-specified fold function */ + private final FoldFunction<T, ACC> foldFunction; + + /** + * Creates a new {@code RocksDBFoldingState}. + * + * @param keySerializer The serializer for the keys. + * @param namespaceSerializer The serializer for the namespace. + * @param stateDesc The state identifier for the state. This contains name + * and can create a default state value. + * @param dbPath The path on the local system where RocksDB data should be stored. + * @param backupPath The path where to store backups. + */ + protected RocksDBFoldingState(TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + FoldingStateDescriptor<T, ACC> stateDesc, + File dbPath, + String backupPath) { + super(keySerializer, namespaceSerializer, dbPath, backupPath); + this.stateDesc = requireNonNull(stateDesc); + this.valueSerializer = stateDesc.getSerializer(); + this.foldFunction = stateDesc.getFoldFunction(); + } + + /** + * Creates a {@code RocksDBFoldingState} by restoring from a directory. + * + * @param keySerializer The serializer for the keys. + * @param namespaceSerializer The serializer for the namespace. + * @param stateDesc The state identifier for the state. This contains name + * and can create a default state value. + * @param dbPath The path on the local system where RocksDB data should be stored. + * @param backupPath The path where to store backups. + * @param restorePath The path on the local file system that we are restoring from. + */ + protected RocksDBFoldingState(TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + FoldingStateDescriptor<T, ACC> stateDesc, + File dbPath, + String backupPath, + String restorePath) { + super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath); + this.stateDesc = stateDesc; + this.valueSerializer = stateDesc.getSerializer(); + this.foldFunction = stateDesc.getFoldFunction(); + } + + @Override + public ACC get() { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); + try { + writeKeyAndNamespace(out); + byte[] key = baos.toByteArray(); + byte[] valueBytes = db.get(key); + if (valueBytes == null) { + return stateDesc.getDefaultValue(); + } + return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes))); + } catch (IOException|RocksDBException e) { + throw new RuntimeException("Error while retrieving data from RocksDB", e); + } + } + + @Override + public void add(T value) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); + try { + writeKeyAndNamespace(out); + byte[] key = baos.toByteArray(); + byte[] valueBytes = db.get(key); + + if (valueBytes == null) { + baos.reset(); + valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), value), out); + db.put(key, baos.toByteArray()); + } else { + ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes))); + ACC newValue = foldFunction.fold(oldValue, value); + baos.reset(); + valueSerializer.serialize(newValue, out); + db.put(key, baos.toByteArray()); + } + } catch (Exception e) { + throw new RuntimeException("Error while adding data to RocksDB", e); + } + } + + @Override + protected KvStateSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> createRocksDBSnapshot( + URI backupUri, + long checkpointId) { + return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc); + } + + private static class Snapshot<K, N, T, ACC, Backend extends AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> { + private static final long serialVersionUID = 1L; + + public Snapshot(File dbPath, + String checkpointPath, + URI backupUri, + long checkpointId, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + FoldingStateDescriptor<T, ACC> stateDesc) { + super(dbPath, + checkpointPath, + backupUri, + checkpointId, + keySerializer, + namespaceSerializer, + stateDesc); + } + + @Override + protected KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> createRocksDBState( + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + FoldingStateDescriptor<T, ACC> stateDesc, + File dbPath, + String backupPath, + String restorePath) throws Exception { + return new RocksDBFoldingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath); + } + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java index da07f75..6c55566 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java @@ -62,8 +62,9 @@ public class RocksDBListState<K, N, V> * @param keySerializer The serializer for the keys. * @param namespaceSerializer The serializer for the namespace. * @param stateDesc The state identifier for the state. This contains name - * and can create a default state value. + * and can create a default state value. * @param dbPath The path on the local system where RocksDB data should be stored. + * @param backupPath The path where to store backups. */ protected RocksDBListState(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, @@ -78,13 +79,15 @@ public class RocksDBListState<K, N, V> } /** - * Creates a new {@code RocksDBListState}. + * Creates a {@code RocksDBListState} by restoring from a directory. * * @param keySerializer The serializer for the keys. * @param namespaceSerializer The serializer for the namespace. * @param stateDesc The state identifier for the state. This contains name - * and can create a default state value. + * and can create a default state value. * @param dbPath The path on the local system where RocksDB data should be stored. + * @param backupPath The path where to store backups. + * @param restorePath The path on the local file system that we are restoring from. */ protected RocksDBListState(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java index 81f9ffb..b7ba3c7 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java @@ -63,8 +63,9 @@ public class RocksDBReducingState<K, N, V> * @param keySerializer The serializer for the keys. * @param namespaceSerializer The serializer for the namespace. * @param stateDesc The state identifier for the state. This contains name - * and can create a default state value. + * and can create a default state value. * @param dbPath The path on the local system where RocksDB data should be stored. + * @param backupPath The path where to store backups. */ protected RocksDBReducingState(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, @@ -79,6 +80,17 @@ public class RocksDBReducingState<K, N, V> this.reduceFunction = stateDesc.getReduceFunction(); } + /** + * Creates a {@code RocksDBReducingState} by restoring from a directory. + * + * @param keySerializer The serializer for the keys. + * @param namespaceSerializer The serializer for the namespace. + * @param stateDesc The state identifier for the state. This contains name + * and can create a default state value. + * @param dbPath The path on the local system where RocksDB data should be stored. + * @param backupPath The path where to store backups. + * @param restorePath The path on the local file system that we are restoring from. + */ protected RocksDBReducingState(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<V> stateDesc, http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 5b16e86..b323c5e 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -26,6 +26,8 @@ import java.util.List; import java.util.Random; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingState; @@ -242,7 +244,8 @@ public class RocksDBStateBackend extends AbstractStateBackend { @Override protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, - ValueStateDescriptor<T> stateDesc) throws Exception { + ValueStateDescriptor<T> stateDesc) throws Exception { + File dbPath = getDbPath(stateDesc.getName()); String checkpointPath = getCheckpointPath(stateDesc.getName()); @@ -252,7 +255,8 @@ public class RocksDBStateBackend extends AbstractStateBackend { @Override protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, - ListStateDescriptor<T> stateDesc) throws Exception { + ListStateDescriptor<T> stateDesc) throws Exception { + File dbPath = getDbPath(stateDesc.getName()); String checkpointPath = getCheckpointPath(stateDesc.getName()); @@ -262,7 +266,8 @@ public class RocksDBStateBackend extends AbstractStateBackend { @Override protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, - ReducingStateDescriptor<T> stateDesc) throws Exception { + ReducingStateDescriptor<T> stateDesc) throws Exception { + File dbPath = getDbPath(stateDesc.getName()); String checkpointPath = getCheckpointPath(stateDesc.getName()); @@ -271,6 +276,15 @@ public class RocksDBStateBackend extends AbstractStateBackend { } @Override + protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, + FoldingStateDescriptor<T, ACC> stateDesc) throws Exception { + + File dbPath = getDbPath(stateDesc.getName()); + String checkpointPath = getCheckpointPath(stateDesc.getName()); + return new RocksDBFoldingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath); + } + + @Override public CheckpointStateOutputStream createCheckpointStateOutputStream( long checkpointID, long timestamp) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java index 388f099..7a19153 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java @@ -54,13 +54,14 @@ public class RocksDBValueState<K, N, V> protected final ValueStateDescriptor<V> stateDesc; /** - * Creates a new {@code RocksDBReducingState}. + * Creates a new {@code RocksDBValueState}. * * @param keySerializer The serializer for the keys. * @param namespaceSerializer The serializer for the namespace. * @param stateDesc The state identifier for the state. This contains name * and can create a default state value. * @param dbPath The path on the local system where RocksDB data should be stored. + * @param backupPath The path where to store backups. */ protected RocksDBValueState(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, @@ -74,6 +75,17 @@ public class RocksDBValueState<K, N, V> this.valueSerializer = stateDesc.getSerializer(); } + /** + * Creates a {@code RocksDBValueState} by restoring from a directory. + * + * @param keySerializer The serializer for the keys. + * @param namespaceSerializer The serializer for the namespace. + * @param stateDesc The state identifier for the state. This contains name + * and can create a default state value. + * @param dbPath The path on the local system where RocksDB data should be stored. + * @param backupPath The path where to store backups. + * @param restorePath The path on the local file system that we are restoring from. + */ protected RocksDBValueState(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<V> stateDesc, http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java index 5162983..d82bfb2 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java @@ -17,6 +17,8 @@ package org.apache.flink.contrib.streaming.state; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingState; @@ -27,6 +29,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.ArrayListSerializer; +import org.apache.flink.runtime.state.GenericFoldingState; import org.apache.flink.runtime.state.GenericListState; import org.apache.flink.runtime.state.GenericReducingState; import org.apache.flink.runtime.state.StateHandle; @@ -241,6 +244,21 @@ public class DbStateBackend extends AbstractStateBackend { } @Override + protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, + FoldingStateDescriptor<T, ACC> stateDesc) throws Exception { + + if (!stateDesc.isSerializerInitialized()) { + throw new IllegalArgumentException("state descriptor serializer not initialized"); + } + + ValueStateDescriptor<ACC> valueStateDescriptor = new ValueStateDescriptor<>( + stateDesc.getName(), stateDesc.getSerializer(), stateDesc.getDefaultValue()); + + ValueState<ACC> valueState = createValueState(namespaceSerializer, valueStateDescriptor); + return new GenericFoldingState<>(valueState, stateDesc.getFoldFunction()); + } + + @Override public void initializeForJob(final Environment env, String operatorIdentifier, TypeSerializer<?> keySerializer) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java new file mode 100644 index 0000000..d328c2e --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +/** + * {@link State} interface for folding state. Elements can be added to the state, they will + * be successively added to the initial value using a + * {@link org.apache.flink.api.common.functions.FoldFunction}. The current state can be inspected. + * + * <p>The state is accessed and modified by user functions, and checkpointed consistently + * by the system as part of the distributed snapshots. + * + * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is + * automatically supplied by the system, so the function always sees the value mapped to the + * key of the current element. That way, the system can handle stream and state partitioning + * consistently together. + * + * @param <T> Type of the values folded into the state + * @param <ACC> Type of the value in the state + */ +public interface FoldingState<T, ACC> extends MergingState<T, ACC> {} http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java new file mode 100644 index 0000000..52ad712 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import static java.util.Objects.requireNonNull; + +/** + * {@link StateDescriptor} for {@link FoldingState}. This can be used to create partitioned + * folding state. + * + * @param <T> Type of the values folded int othe state + * @param <ACC> Type of the value in the state + */ +public class FoldingStateDescriptor<T, ACC> extends StateDescriptor<FoldingState<T, ACC>, ACC> { + private static final long serialVersionUID = 1L; + + + private final FoldFunction<T, ACC> foldFunction; + + /** + * Creates a new {@code FoldingStateDescriptor} with the given name, type, and initial value. + * + * <p>If this constructor fails (because it is not possible to describe the type via a class), + * consider using the {@link #FoldingStateDescriptor(String, ACC, FoldFunction, TypeInformation)} constructor. + * + * @param name The (unique) name for the state. + * @param initialValue The initial value of the fold. + * @param foldFunction The {@code FoldFunction} used to aggregate the state. + * @param typeClass The type of the values in the state. + */ + public FoldingStateDescriptor(String name, ACC initialValue, FoldFunction<T, ACC> foldFunction, Class<ACC> typeClass) { + super(name, typeClass, initialValue); + this.foldFunction = requireNonNull(foldFunction); + + if (foldFunction instanceof RichFunction) { + throw new UnsupportedOperationException("FoldFunction of FoldingState can not be a RichFunction."); + } + } + + /** + * Creates a new {@code FoldingStateDescriptor} with the given name and default value. + * + * @param name The (unique) name for the state. + * @param initialValue The initial value of the fold. + * @param foldFunction The {@code FoldFunction} used to aggregate the state. + * @param typeInfo The type of the values in the state. + */ + public FoldingStateDescriptor(String name, ACC initialValue, FoldFunction<T, ACC> foldFunction, TypeInformation<ACC> typeInfo) { + super(name, typeInfo, initialValue); + this.foldFunction = requireNonNull(foldFunction); + + if (foldFunction instanceof RichFunction) { + throw new UnsupportedOperationException("FoldFunction of FoldingState can not be a RichFunction."); + } + } + + /** + * Creates a new {@code ValueStateDescriptor} with the given name and default value. + * + * @param name The (unique) name for the state. + * @param initialValue The initial value of the fold. + * @param foldFunction The {@code FoldFunction} used to aggregate the state. + * @param typeSerializer The type serializer of the values in the state. + */ + public FoldingStateDescriptor(String name, ACC initialValue, FoldFunction<T, ACC> foldFunction, TypeSerializer<ACC> typeSerializer) { + super(name, typeSerializer, initialValue); + this.foldFunction = requireNonNull(foldFunction); + + if (foldFunction instanceof RichFunction) { + throw new UnsupportedOperationException("FoldFunction of FoldingState can not be a RichFunction."); + } + } + + // ------------------------------------------------------------------------ + + @Override + public FoldingState<T, ACC> bind(StateBackend stateBackend) throws Exception { + return stateBackend.createFoldingState(this); + } + + /** + * Returns the fold function to be used for the folding state. + */ + public FoldFunction<T, ACC> getFoldFunction() { + return foldFunction; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java index 8c7c608..a61433a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java @@ -48,4 +48,12 @@ public interface StateBackend { */ <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception; + /** + * Creates and returns a new {@link FoldingState}. + * @param stateDesc The {@code StateDescriptor} that contains the name of the state. + * + * @param <T> Type of the values folded into the state + * @param <ACC> Type of the value in the state + */ + <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java index 38087fc..4cf2371 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java @@ -229,7 +229,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl @Override public String toString() { return getClass().getSimpleName() + - "{ name=" + name + + "{name=" + name + ", defaultValue=" + defaultValue + ", serializer=" + serializer + '}'; http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java index d0c4f82..beccd86 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingState; @@ -148,6 +150,18 @@ public abstract class AbstractStateBackend implements java.io.Serializable { protected abstract <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception; /** + * Creates and returns a new {@link FoldingState}. + * + * @param namespaceSerializer TypeSerializer for the state namespace. + * @param stateDesc The {@code StateDescriptor} that contains the name of the state. + * + * @param <N> The type of the namespace. + * @param <T> Type of the values folded into the state + * @param <ACC> Type of the value in the state * + */ + abstract protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception; + + /** * Sets the current key that is used for partitioned state. * @param currentKey The current key. */ @@ -223,6 +237,12 @@ public abstract class AbstractStateBackend implements java.io.Serializable { public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception { return AbstractStateBackend.this.createReducingState(namespaceSerializer, stateDesc); } + + @Override + public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception { + return AbstractStateBackend.this.createFoldingState(namespaceSerializer, stateDesc); + } + }); keyValueStatesByName.put(stateDescriptor.getName(), (KvState) kvstate); http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java new file mode 100644 index 0000000..ef1d796 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** + * Generic implementation of {@link FoldingState} based on a wrapped {@link ValueState}. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <T> The type of the values that can be folded into the state. + * @param <ACC> The type of the value in the folding state. + * @param <Backend> The type of {@link AbstractStateBackend} that manages this {@code KvState}. + * @param <W> Generic type that extends both the underlying {@code ValueState} and {@code KvState}. + */ +public class GenericFoldingState<K, N, T, ACC, Backend extends AbstractStateBackend, W extends ValueState<ACC> & KvState<K, N, ValueState<ACC>, ValueStateDescriptor<ACC>, Backend>> + implements FoldingState<T, ACC>, KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> { + + private final W wrappedState; + private final FoldFunction<T, ACC> foldFunction; + + /** + * Creates a new {@code FoldingState} that wraps the given {@link ValueState}. The + * {@code ValueState} must have the initial value of the fold as default value. + * + * @param wrappedState The wrapped {@code ValueState} + * @param foldFunction The {@code FoldFunction} to use for folding values into the state + */ + @SuppressWarnings("unchecked") + public GenericFoldingState(ValueState<ACC> wrappedState, FoldFunction<T, ACC> foldFunction) { + if (!(wrappedState instanceof KvState)) { + throw new IllegalArgumentException("Wrapped state must be a KvState."); + } + this.wrappedState = (W) wrappedState; + this.foldFunction = foldFunction; + } + + @Override + public void setCurrentKey(K key) { + wrappedState.setCurrentKey(key); + } + + @Override + public void setCurrentNamespace(N namespace) { + wrappedState.setCurrentNamespace(namespace); + } + + @Override + public KvStateSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> snapshot( + long checkpointId, + long timestamp) throws Exception { + KvStateSnapshot<K, N, ValueState<ACC>, ValueStateDescriptor<ACC>, Backend> wrappedSnapshot = wrappedState.snapshot( + checkpointId, + timestamp); + return new Snapshot<>(wrappedSnapshot, foldFunction); + } + + @Override + public void dispose() { + wrappedState.dispose(); + } + + @Override + public ACC get() throws Exception { + return wrappedState.value(); + } + + @Override + public void add(T value) throws Exception { + ACC currentValue = wrappedState.value(); + wrappedState.update(foldFunction.fold(currentValue, value)); + } + + @Override + public void clear() { + wrappedState.clear(); + } + + private static class Snapshot<K, N, T, ACC, Backend extends AbstractStateBackend> implements KvStateSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> { + private static final long serialVersionUID = 1L; + + private final KvStateSnapshot<K, N, ValueState<ACC>, ValueStateDescriptor<ACC>, Backend> wrappedSnapshot; + + private final FoldFunction<T, ACC> foldFunction; + + public Snapshot(KvStateSnapshot<K, N, ValueState<ACC>, ValueStateDescriptor<ACC>, Backend> wrappedSnapshot, + FoldFunction<T, ACC> foldFunction) { + this.wrappedSnapshot = wrappedSnapshot; + this.foldFunction = foldFunction; + } + + @Override + @SuppressWarnings("unchecked") + public KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> restoreState( + Backend stateBackend, + TypeSerializer<K> keySerializer, + ClassLoader classLoader, + long recoveryTimestamp) throws Exception { + return new GenericFoldingState((ValueState<ACC>) wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader, recoveryTimestamp), foldFunction); + } + + @Override + public void discardState() throws Exception { + wrappedSnapshot.discardState(); + } + + @Override + public long getStateSize() throws Exception { + return wrappedSnapshot.getStateSize(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java index c20962f..fbb0170 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java @@ -40,6 +40,12 @@ public class GenericListState<K, N, T, Backend extends AbstractStateBackend, W e private final W wrappedState; + /** + * Creates a new {@code ListState} that wraps the given {@link ValueState}. The + * {@code ValueState} must have a default value of {@code null}. + * + * @param wrappedState The wrapped {@code ValueState} + */ @SuppressWarnings("unchecked") public GenericListState(ValueState<ArrayList<T>> wrappedState) { if (!(wrappedState instanceof KvState)) { http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java index 1181c66..102e25e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java @@ -39,6 +39,13 @@ public class GenericReducingState<K, N, T, Backend extends AbstractStateBackend, private final W wrappedState; private final ReduceFunction<T> reduceFunction; + /** + * Creates a new {@code ReducingState} that wraps the given {@link ValueState}. The + * {@code ValueState} must have a default value of {@code null}. + * + * @param wrappedState The wrapped {@code ValueState} + * @param reduceFunction The {@code ReduceFunction} to use for combining values. + */ @SuppressWarnings("unchecked") public GenericReducingState(ValueState<T> wrappedState, ReduceFunction<T> reduceFunction) { if (!(wrappedState instanceof KvState)) { http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java new file mode 100644 index 0000000..bba6df5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.KvState; +import org.apache.flink.runtime.state.KvStateSnapshot; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Heap-backed partitioned {@link FoldingState} that is + * snapshotted into files. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <T> The type of the values that can be folded into the state. + * @param <ACC> The type of the value in the folding state. + */ +public class FsFoldingState<K, N, T, ACC> + extends AbstractFsState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> + implements FoldingState<T, ACC> { + + private final FoldFunction<T, ACC> foldFunction; + + /** + * Creates a new and empty partitioned state. + * + * @param backend The file system state backend backing snapshots of this state + * @param keySerializer The serializer for the key. + * @param namespaceSerializer The serializer for the namespace. + * @param stateDesc The state identifier for the state. This contains name + * and can create a default state value. + */ + public FsFoldingState(FsStateBackend backend, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + FoldingStateDescriptor<T, ACC> stateDesc) { + super(backend, keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc); + this.foldFunction = stateDesc.getFoldFunction(); + } + + /** + * Creates a new key/value state with the given state contents. + * This method is used to re-create key/value state with existing data, for example from + * a snapshot. + * + * @param backend The file system state backend backing snapshots of this state + * @param keySerializer The serializer for the key. + * @param namespaceSerializer The serializer for the namespace. + * @param stateDesc The state identifier for the state. This contains name +* and can create a default state value. + * @param state The map of key/value pairs to initialize the state with. + */ + public FsFoldingState(FsStateBackend backend, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + FoldingStateDescriptor<T, ACC> stateDesc, + HashMap<N, Map<K, ACC>> state) { + super(backend, keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc, state); + this.foldFunction = stateDesc.getFoldFunction(); + } + + @Override + public ACC get() { + if (currentNSState == null) { + currentNSState = state.get(currentNamespace); + } + if (currentNSState != null) { + ACC value = currentNSState.get(currentKey); + return value != null ? value : stateDesc.getDefaultValue(); + } + return stateDesc.getDefaultValue(); + } + + @Override + public void add(T value) throws IOException { + if (currentKey == null) { + throw new RuntimeException("No key available."); + } + + if (currentNSState == null) { + currentNSState = new HashMap<>(); + state.put(currentNamespace, currentNSState); + } + + ACC currentValue = currentNSState.get(currentKey); + try { + if (currentValue == null) { + currentNSState.put(currentKey, foldFunction.fold(stateDesc.getDefaultValue(), value)); + } else { + currentNSState.put(currentKey, foldFunction.fold(currentValue, value)); + + } + } catch (Exception e) { + throw new RuntimeException("Could not add value to folding state.", e); + } + } + + @Override + public KvStateSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, FsStateBackend> createHeapSnapshot(Path filePath) { + return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), stateSerializer, stateDesc, filePath); + } + + + public static class Snapshot<K, N, T, ACC> extends AbstractFsStateSnapshot<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> { + private static final long serialVersionUID = 1L; + + public Snapshot(TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + TypeSerializer<ACC> stateSerializer, + FoldingStateDescriptor<T, ACC> stateDescs, + Path filePath) { + super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath); + } + + @Override + public KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, ACC>> stateMap) { + return new FsFoldingState<>(backend, keySerializer, namespaceSerializer, stateDesc, stateMap); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index 37c1392..77d540b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.state.filesystem; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingState; @@ -287,6 +289,11 @@ public class FsStateBackend extends AbstractStateBackend { return new FsReducingState<>(this, keySerializer, namespaceSerializer, stateDesc); } + @Override + protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, + FoldingStateDescriptor<T, ACC> stateDesc) throws Exception { + return new FsFoldingState<>(this, keySerializer, namespaceSerializer, stateDesc); + } @Override public <S extends Serializable> StateHandle<S> checkpointStateSerializable( http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java new file mode 100644 index 0000000..07b677b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.memory; + +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.KvState; +import org.apache.flink.runtime.state.KvStateSnapshot; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Heap-backed partitioned {@link FoldingState} that is + * snapshotted into a serialized memory copy. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <T> The type of the values that can be folded into the state. + * @param <ACC> The type of the value in the folding state. + */ +public class MemFoldingState<K, N, T, ACC> + extends AbstractMemState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> + implements FoldingState<T, ACC> { + + private final FoldFunction<T, ACC> foldFunction; + + public MemFoldingState(TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + FoldingStateDescriptor<T, ACC> stateDesc) { + super(keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc); + this.foldFunction = stateDesc.getFoldFunction(); + } + + public MemFoldingState(TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + FoldingStateDescriptor<T, ACC> stateDesc, + HashMap<N, Map<K, ACC>> state) { + super(keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc, state); + this.foldFunction = stateDesc.getFoldFunction(); + } + + @Override + public ACC get() { + if (currentNSState == null) { + currentNSState = state.get(currentNamespace); + } + if (currentNSState != null) { + ACC value = currentNSState.get(currentKey); + return value != null ? value : stateDesc.getDefaultValue(); + } + return stateDesc.getDefaultValue(); + } + + @Override + public void add(T value) throws IOException { + if (currentKey == null) { + throw new RuntimeException("No key available."); + } + + if (currentNSState == null) { + currentNSState = new HashMap<>(); + state.put(currentNamespace, currentNSState); + } + + ACC currentValue = currentNSState.get(currentKey); + try { + if (currentValue == null) { + currentNSState.put(currentKey, foldFunction.fold(stateDesc.getDefaultValue(), value)); + } else { + currentNSState.put(currentKey, foldFunction.fold(currentValue, value)); + + } + } catch (Exception e) { + throw new RuntimeException("Could not add value to folding state.", e); + } + } + + @Override + public KvStateSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, MemoryStateBackend> createHeapSnapshot(byte[] bytes) { + return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), stateSerializer, stateDesc, bytes); + } + + public static class Snapshot<K, N, T, ACC> extends AbstractMemStateSnapshot<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> { + private static final long serialVersionUID = 1L; + + public Snapshot(TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + TypeSerializer<ACC> stateSerializer, + FoldingStateDescriptor<T, ACC> stateDescs, byte[] data) { + super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, data); + } + + @Override + public KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, MemoryStateBackend> createMemState(HashMap<N, Map<K, ACC>> stateMap) { + return new MemFoldingState<>(keySerializer, namespaceSerializer, stateDesc, stateMap); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java index 2b7b5f1..7b9d21b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.state.memory; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingState; @@ -97,6 +99,11 @@ public class MemoryStateBackend extends AbstractStateBackend { return new MemReducingState<>(keySerializer, namespaceSerializer, stateDesc); } + @Override + public <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception { + return new MemFoldingState<>(keySerializer, namespaceSerializer, stateDesc); + } + /** * Serialized the given state into bytes using Java serialization and creates a state handle that * can re-create that state. http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 6083bd6..27dee6a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -22,7 +22,10 @@ import com.google.common.base.Joiner; import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingState; @@ -410,6 +413,108 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { } } + @Test + @SuppressWarnings("unchecked,rawtypes") + public void testFoldingState() { + try { + backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); + + FoldingStateDescriptor<Integer, String> kvId = new FoldingStateDescriptor<>("id", + "Fold-Initial:", + new FoldFunction<Integer, String>() { + private static final long serialVersionUID = 1L; + + @Override + public String fold(String acc, Integer value) throws Exception { + return acc + "," + value; + } + }, + String.class); + FoldingState<Integer, String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId); + + @SuppressWarnings("unchecked") + KvState<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B> kv = + (KvState<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B>) state; + + Joiner joiner = Joiner.on(","); + // some modifications to the state + kv.setCurrentKey(1); + assertEquals("Fold-Initial:", state.get()); + state.add(1); + kv.setCurrentKey(2); + assertEquals("Fold-Initial:", state.get()); + state.add(2); + kv.setCurrentKey(1); + assertEquals("Fold-Initial:,1", state.get()); + + // draw a snapshot + KvStateSnapshot<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B> snapshot1 = + kv.snapshot(682375462378L, 2); + + // make some more modifications + kv.setCurrentKey(1); + state.clear(); + state.add(101); + kv.setCurrentKey(2); + state.add(102); + kv.setCurrentKey(3); + state.add(103); + + // draw another snapshot + KvStateSnapshot<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B> snapshot2 = + kv.snapshot(682375462379L, 4); + + // validate the original state + kv.setCurrentKey(1); + assertEquals("Fold-Initial:,101", state.get()); + kv.setCurrentKey(2); + assertEquals("Fold-Initial:,2,102", state.get()); + kv.setCurrentKey(3); + assertEquals("Fold-Initial:,103", state.get()); + + kv.dispose(); + + // restore the first snapshot and validate it + KvState<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B> restored1 = snapshot1.restoreState( + backend, + IntSerializer.INSTANCE, + this.getClass().getClassLoader(), 10); + + snapshot1.discardState(); + + @SuppressWarnings("unchecked") + FoldingState<Integer, String> restored1State = (FoldingState<Integer, String>) restored1; + + restored1.setCurrentKey(1); + assertEquals("Fold-Initial:,1", restored1State.get()); + restored1.setCurrentKey(2); + assertEquals("Fold-Initial:,2", restored1State.get()); + + restored1.dispose(); + + // restore the second snapshot and validate it + KvState<Integer, Void, FoldingState<Integer, String>, FoldingStateDescriptor<Integer, String>, B> restored2 = snapshot2.restoreState( + backend, + IntSerializer.INSTANCE, + this.getClass().getClassLoader(), 20); + + snapshot2.discardState(); + + @SuppressWarnings("unchecked") + FoldingState<Integer, String> restored2State = (FoldingState<Integer, String>) restored2; + + restored2.setCurrentKey(1); + assertEquals("Fold-Initial:,101", restored2State.get()); + restored2.setCurrentKey(2); + assertEquals("Fold-Initial:,2,102", restored2State.get()); + restored2.setCurrentKey(3); + assertEquals("Fold-Initial:,103", restored2State.get()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } @Test public void testValueStateRestoreWithWrongSerializers() { http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 2902795..d8da998 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -33,8 +33,9 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; import org.apache.flink.streaming.api.functions.aggregation.SumAggregator; -import org.apache.flink.streaming.api.functions.windowing.FoldAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.FoldApplyAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ReduceApplyAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ReduceIterableAllWindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -144,7 +145,7 @@ public class AllWindowedStream<T, W extends Window> { function = input.getExecutionEnvironment().clean(function); String callLocation = Utils.getCallLocationName(); - String udfName = "Reduce at " + callLocation; + String udfName = "AllWindowedStream." + callLocation; SingleOutputStreamOperator<T, ?> result = createFastTimeOperatorIfValid(function, input.getType(), udfName); if (result != null) { @@ -185,13 +186,15 @@ public class AllWindowedStream<T, W extends Window> { * @return The data stream that is the result of applying the fold function to the window. */ public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function) { - //clean the closure - function = input.getExecutionEnvironment().clean(function); + if (function instanceof RichFunction) { + throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " + + "Please use apply(FoldFunction, WindowFunction) instead."); + } TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(), - Utils.getCallLocationName(), true); + Utils.getCallLocationName(), true); - return apply(new FoldAllWindowFunction<W, T, R>(initialValue, function), resultType); + return fold(initialValue, function, resultType); } /** @@ -203,9 +206,12 @@ public class AllWindowedStream<T, W extends Window> { * @return The data stream that is the result of applying the fold function to the window. */ public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) { - //clean the closure - function = input.getExecutionEnvironment().clean(function); - return apply(new FoldAllWindowFunction<W, T, R>(initialValue, function), resultType); + if (function instanceof RichFunction) { + throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " + + "Please use apply(FoldFunction, WindowFunction) instead."); + } + + return apply(initialValue, function, new PassThroughAllWindowFunction<W, R>(), resultType); } /** @@ -244,7 +250,7 @@ public class AllWindowedStream<T, W extends Window> { function = input.getExecutionEnvironment().clean(function); String callLocation = Utils.getCallLocationName(); - String udfName = "WindowApply at " + callLocation; + String udfName = "AllWindowedStream." + callLocation; SingleOutputStreamOperator<R, ?> result = createFastTimeOperatorIfValid(function, resultType, udfName); if (result != null) { @@ -321,7 +327,7 @@ public class AllWindowedStream<T, W extends Window> { preAggregator = input.getExecutionEnvironment().clean(preAggregator); String callLocation = Utils.getCallLocationName(); - String udfName = "WindowApply at " + callLocation; + String udfName = "AllWindowedStream." + callLocation; String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")"; @@ -348,6 +354,81 @@ public class AllWindowedStream<T, W extends Window> { return input.transform(opName, resultType, operator).setParallelism(1); } + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p> + * Arriving data is incrementally aggregated using the given fold function. + * + * @param initialValue The initial value of the fold. + * @param foldFunction The fold function that is used for incremental aggregation. + * @param function The window function. + * @return The data stream that is the result of applying the window function to the window. + */ + public <R> SingleOutputStreamOperator<R, ?> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function) { + TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), + Utils.getCallLocationName(), true); + + return apply(initialValue, foldFunction, function, resultType); + } + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * <p> + * Arriving data is incrementally aggregated using the given fold function. + * + * @param initialValue The initial value of the fold. + * @param foldFunction The fold function that is used for incremental aggregation. + * @param function The window function. + * @param resultType Type information for the result type of the window function + * @return The data stream that is the result of applying the window function to the window. + */ + public <R> SingleOutputStreamOperator<R, ?> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function, TypeInformation<R> resultType) { + if (foldFunction instanceof RichFunction) { + throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction."); + } + + //clean the closures + function = input.getExecutionEnvironment().clean(function); + foldFunction = input.getExecutionEnvironment().clean(foldFunction); + + String callLocation = Utils.getCallLocationName(); + String udfName = "AllWindowedStream." + callLocation; + + String opName; + + OneInputStreamOperator<T, R> operator; + + boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime; + + if (evictor != null) { + opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + evictor + ", " + udfName + ")"; + + operator = new EvictingNonKeyedWindowOperator<>(windowAssigner, + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + new HeapWindowBuffer.Factory<T>(), + new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function), + trigger, + evictor).enableSetProcessingTime(setProcessingTime); + + } else { + opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")"; + + operator = new NonKeyedWindowOperator<>(windowAssigner, + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + new HeapWindowBuffer.Factory<T>(), + new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function), + trigger).enableSetProcessingTime(setProcessingTime); + } + + return input.transform(opName, resultType, operator).setParallelism(1); + } + // ------------------------------------------------------------------------ // Aggregations on the windows // ------------------------------------------------------------------------
