This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 04802953a1e77182ac8a0bfc9d1a5a269dc2c0ab Author: Zakelly <zakelly....@gmail.com> AuthorDate: Wed Jan 8 19:38:20 2025 +0800 [FLINK-37045][Runtime] Merge two versions of KeyedStateStore --- .../flink/api/common/state/KeyedStateStore.java | 115 +++++++++++++++++ .../apache/flink/cep/utils/TestSharedBuffer.java | 44 +++++++ .../AbstractAsyncStateStreamOperator.java | 8 +- .../AbstractAsyncStateStreamOperatorV2.java | 7 +- .../runtime/state/DefaultKeyedStateStore.java | 138 ++++++++++++++++++++- .../runtime/state/v2/DefaultKeyedStateStore.java | 114 ----------------- .../flink/runtime/state/v2/KeyedStateStore.java | 110 ---------------- .../api/operators/StreamOperatorStateHandler.java | 25 +--- .../api/operators/StreamingRuntimeContext.java | 59 ++------- .../api/operators/StreamingRuntimeContextTest.java | 19 +-- 10 files changed, 324 insertions(+), 315 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java index 8ce0d9c2c64..69c513c388a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java @@ -18,8 +18,11 @@ package org.apache.flink.api.common.state; +import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.PublicEvolving; +import javax.annotation.Nonnull; + /** This interface contains methods for registering keyed state with a managed store. */ @PublicEvolving public interface KeyedStateStore { @@ -223,4 +226,116 @@ public interface KeyedStateStore { */ @PublicEvolving <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties); + + // -------------------------- + // State V2 creation methods + // -------------------------- + + /** + * Gets a handle to the system's {@link org.apache.flink.api.common.state.v2.ValueState}. 
The + * key/value state is only accessible if the function is executed on a KeyedStream. On each + * access, the state exposes the value for the key of the element currently processed by the + * function. Each function may have multiple partitioned states, addressed with different names. + * + * <p>Because the scope of each value is the key of the currently processed element, and the + * elements are distributed by the Flink runtime, the system can transparently scale out and + * redistribute the state and KeyedStream. + * + * @param stateProperties The descriptor defining the properties of the state. + * @param <T> The type of value stored in the state. + * @return The partitioned state object. + */ + @Experimental + default <T> org.apache.flink.api.common.state.v2.ValueState<T> getState( + @Nonnull org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties) { + return getValueState(stateProperties); + } + + /** + * Gets a handle to the system's {@link org.apache.flink.api.common.state.v2.ValueState}. The + * key/value state is only accessible if the function is executed on a KeyedStream. On each + * access, the state exposes the value for the key of the element currently processed by the + * function. Each function may have multiple partitioned states, addressed with different names. + * + * <p>Because the scope of each value is the key of the currently processed element, and the + * elements are distributed by the Flink runtime, the system can transparently scale out and + * redistribute the state and KeyedStream. + * + * @param stateProperties The descriptor defining the properties of the state. + * @param <T> The type of value stored in the state. + * @return The partitioned state object. + */ + @Experimental + <T> org.apache.flink.api.common.state.v2.ValueState<T> getValueState( + @Nonnull org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties); + + /** + * Gets a handle to the system's key / value list state. 
This state is optimized for state that + * holds lists. One can add elements to the list, or retrieve the list as a whole. This state + * is only accessible if the function is executed on a KeyedStream. + * + * @param stateProperties The descriptor defining the properties of the state. + * @param <T> The type of value stored in the state. + * @return The partitioned state object. + * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the + * function (function is not part of a KeyedStream). + */ + @Experimental + <T> org.apache.flink.api.common.state.v2.ListState<T> getListState( + @Nonnull org.apache.flink.api.common.state.v2.ListStateDescriptor<T> stateProperties); + + /** + * Gets a handle to the system's key/value map state. This state is only accessible if the + * function is executed on a KeyedStream. + * + * @param stateProperties The descriptor defining the properties of the state. + * @param <UK> The type of the user keys stored in the state. + * @param <UV> The type of the user values stored in the state. + * @return The partitioned state object. + * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the + * function (function is not part of a KeyedStream). + */ + @Experimental + <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> getMapState( + @Nonnull + org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> + stateProperties); + + /** + * Gets a handle to the system's key/value reducing state. This state is optimized for state + * that aggregates values. + * + * <p>This state is only accessible if the function is executed on a KeyedStream. + * + * @param stateProperties The descriptor defining the properties of the state. + * @param <T> The type of value stored in the state. + * @return The partitioned state object. 
+ * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the + * function (function is not part of a KeyedStream). + */ + @Experimental + <T> org.apache.flink.api.common.state.v2.ReducingState<T> getReducingState( + @Nonnull + org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> + stateProperties); + + /** + * Gets a handle to the system's key/value aggregating state. This state is only accessible if + * the function is executed on a KeyedStream. + * + * @param stateProperties The descriptor defining the properties of the state. + * @param <IN> The type of the values that are added to the state. + * @param <ACC> The type of the accumulator (intermediate aggregation state). + * @param <OUT> The type of the values that are returned from the state. + * @return The partitioned state object. + * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the + * function (function is not part of a KeyedStream). + */ + @Experimental + <IN, ACC, OUT> + org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingState( + @Nonnull + org.apache.flink.api.common.state.v2.AggregatingStateDescriptor< + IN, ACC, OUT> + stateProperties); } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java index 169df8eafa8..1676c3de5c4 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java @@ -33,6 +33,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.cep.configuration.SharedBufferCacheConfig; import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer; +import javax.annotation.Nonnull; + import java.io.IOException; import java.util.Collections; import java.util.HashMap; @@ -243,6 +245,48 @@ 
public class TestSharedBuffer<V> extends SharedBuffer<V> { }; } + @Override + public <T> org.apache.flink.api.common.state.v2.ValueState<T> getValueState( + @Nonnull + org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> + stateProperties) { + throw new UnsupportedOperationException(); + } + + @Override + public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState( + @Nonnull + org.apache.flink.api.common.state.v2.ListStateDescriptor<T> + stateProperties) { + throw new UnsupportedOperationException(); + } + + @Override + public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> getMapState( + @Nonnull + org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> + stateProperties) { + throw new UnsupportedOperationException(); + } + + @Override + public <T> org.apache.flink.api.common.state.v2.ReducingState<T> getReducingState( + @Nonnull + org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> + stateProperties) { + throw new UnsupportedOperationException(); + } + + @Override + public <IN, ACC, OUT> + org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingState( + @Nonnull + org.apache.flink.api.common.state.v2.AggregatingStateDescriptor< + IN, ACC, OUT> + stateProperties) { + throw new UnsupportedOperationException(); + } + private class CountingIterator<T> implements Iterator<T> { private final Iterator<T> iterator; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index ad9ce787cac..20b93542b37 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -22,6 +22,7 @@ import 
org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.api.common.state.v2.State; import org.apache.flink.api.common.state.v2.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -33,6 +34,7 @@ import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager; import org.apache.flink.runtime.event.WatermarkEvent; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.AsyncKeyedStateBackend; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.Input; @@ -91,7 +93,11 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStre public void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception { super.initializeState(streamTaskStateManager); - getRuntimeContext().setKeyedStateStoreV2(stateHandler.getKeyedStateStoreV2().orElse(null)); + KeyedStateStore stateStore = stateHandler.getKeyedStateStore().orElse(null); + if (stateStore instanceof DefaultKeyedStateStore) { + ((DefaultKeyedStateStore) stateStore).setSupportKeyedStateApiSetV2(); + } + final StreamTask<?, ?> containingTask = checkNotNull(getContainingTask()); environment = containingTask.getEnvironment(); final MailboxExecutor mailboxExecutor = environment.getMainMailboxExecutor(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java index 
ab3eff901fa..b9757d816bd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.asyncprocessing.operators; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.api.common.state.v2.State; import org.apache.flink.api.common.state.v2.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -30,6 +31,7 @@ import org.apache.flink.runtime.asyncprocessing.RecordContext; import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.AsyncKeyedStateBackend; +import org.apache.flink.runtime.state.DefaultKeyedStateStore; import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; @@ -90,7 +92,10 @@ public abstract class AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractSt public final void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception { super.initializeState(streamTaskStateManager); - getRuntimeContext().setKeyedStateStoreV2(stateHandler.getKeyedStateStoreV2().orElse(null)); + KeyedStateStore stateStore = stateHandler.getKeyedStateStore().orElse(null); + if (stateStore instanceof DefaultKeyedStateStore) { + ((DefaultKeyedStateStore) stateStore).setSupportKeyedStateApiSetV2(); + } final int inFlightRecordsLimit = getExecutionConfig().getAsyncInflightRecordsLimit(); final int asyncBufferSize = getExecutionConfig().getAsyncStateBufferSize(); diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java index 16c122e5ee2..f43e6b8ad0a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java @@ -35,7 +35,11 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.util.Preconditions; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkState; /** * Default implementation of KeyedStateStore that currently forwards state registration to a {@link @@ -43,13 +47,38 @@ import static java.util.Objects.requireNonNull; */ public class DefaultKeyedStateStore implements KeyedStateStore { - protected final KeyedStateBackend<?> keyedStateBackend; + @Nullable protected final KeyedStateBackend<?> keyedStateBackend; + + @Nullable protected final AsyncKeyedStateBackend<?> asyncKeyedStateBackend; protected final SerializerFactory serializerFactory; + protected SupportKeyedStateApiSet supportKeyedStateApiSet; + public DefaultKeyedStateStore( KeyedStateBackend<?> keyedStateBackend, SerializerFactory serializerFactory) { - this.keyedStateBackend = Preconditions.checkNotNull(keyedStateBackend); + this(keyedStateBackend, null, serializerFactory); + } + + public DefaultKeyedStateStore( + AsyncKeyedStateBackend<?> asyncKeyedStateBackend, SerializerFactory serializerFactory) { + this(null, asyncKeyedStateBackend, serializerFactory); + } + + public DefaultKeyedStateStore( + @Nullable KeyedStateBackend<?> keyedStateBackend, + @Nullable AsyncKeyedStateBackend<?> asyncKeyedStateBackend, + SerializerFactory serializerFactory) { + this.keyedStateBackend = keyedStateBackend; + 
this.asyncKeyedStateBackend = asyncKeyedStateBackend; this.serializerFactory = Preconditions.checkNotNull(serializerFactory); + if (keyedStateBackend != null) { + // By default, we support state v1 + this.supportKeyedStateApiSet = SupportKeyedStateApiSet.STATE_V1; + } else if (asyncKeyedStateBackend != null) { + this.supportKeyedStateApiSet = SupportKeyedStateApiSet.STATE_V2; + } else { + throw new IllegalArgumentException("The state backend must not be null."); + } } @Override @@ -112,7 +141,112 @@ public class DefaultKeyedStateStore implements KeyedStateStore { protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception { + checkState( + keyedStateBackend != null + && supportKeyedStateApiSet == SupportKeyedStateApiSet.STATE_V1, + "Current operator does not integrate the async processing logic, " + + "thus only supports state v1 APIs. Please use StateDescriptor under " + + "'org.apache.flink.runtime.state'."); return keyedStateBackend.getPartitionedState( VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor); } + + @Override + public <T> org.apache.flink.api.common.state.v2.ValueState<T> getValueState( + @Nonnull org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties) { + requireNonNull(stateProperties, "The state properties must not be null"); + try { + stateProperties.initializeSerializerUnlessSet(serializerFactory); + return getPartitionedState(stateProperties); + } catch (Exception e) { + throw new RuntimeException("Error while getting state", e); + } + } + + @Override + public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState( + @Nonnull org.apache.flink.api.common.state.v2.ListStateDescriptor<T> stateProperties) { + requireNonNull(stateProperties, "The state properties must not be null"); + try { + stateProperties.initializeSerializerUnlessSet(serializerFactory); + return getPartitionedState(stateProperties); + } catch (Exception e) { + throw new 
RuntimeException("Error while getting state", e); + } + } + + @Override + public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> getMapState( + @Nonnull + org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> + stateProperties) { + requireNonNull(stateProperties, "The state properties must not be null"); + try { + stateProperties.initializeSerializerUnlessSet(serializerFactory); + return getPartitionedState(stateProperties); + } catch (Exception e) { + throw new RuntimeException("Error while getting state", e); + } + } + + @Override + public <T> org.apache.flink.api.common.state.v2.ReducingState<T> getReducingState( + @Nonnull + org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> + stateProperties) { + requireNonNull(stateProperties, "The state properties must not be null"); + try { + stateProperties.initializeSerializerUnlessSet(serializerFactory); + return getPartitionedState(stateProperties); + } catch (Exception e) { + throw new RuntimeException("Error while getting state", e); + } + } + + @Override + public <IN, ACC, OUT> + org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingState( + @Nonnull + org.apache.flink.api.common.state.v2.AggregatingStateDescriptor< + IN, ACC, OUT> + stateProperties) { + requireNonNull(stateProperties, "The state properties must not be null"); + try { + stateProperties.initializeSerializerUnlessSet(serializerFactory); + return getPartitionedState(stateProperties); + } catch (Exception e) { + throw new RuntimeException("Error while getting state", e); + } + } + + protected <S extends org.apache.flink.api.common.state.v2.State, SV> S getPartitionedState( + org.apache.flink.api.common.state.v2.StateDescriptor<SV> stateDescriptor) + throws Exception { + checkState( + asyncKeyedStateBackend != null + && supportKeyedStateApiSet == SupportKeyedStateApiSet.STATE_V2, + "Current operator integrates the async processing logic, " + + "thus only supports state v2 APIs. 
Please use StateDescriptor under " + + "'org.apache.flink.runtime.state.v2'."); + return asyncKeyedStateBackend.getOrCreateKeyedState( + VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor); + } + + public void setSupportKeyedStateApiSetV2() { + requireNonNull( + asyncKeyedStateBackend, + "Current operator integrates the logic of async processing, " + + "thus only support state v2 APIs. Please use StateDescriptor under " + + "'org.apache.flink.runtime.state.v2'."); + supportKeyedStateApiSet = SupportKeyedStateApiSet.STATE_V2; + } + + /** + * Currently, we only support one keyed state api set. This is determined by the stream + * operator. + */ + private enum SupportKeyedStateApiSet { + STATE_V1, + STATE_V2 + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStore.java deleted file mode 100644 index 52565b7fad6..00000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStore.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.state.v2; - -import org.apache.flink.api.common.functions.SerializerFactory; -import org.apache.flink.api.common.state.v2.AggregatingState; -import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor; -import org.apache.flink.api.common.state.v2.ListState; -import org.apache.flink.api.common.state.v2.ListStateDescriptor; -import org.apache.flink.api.common.state.v2.MapState; -import org.apache.flink.api.common.state.v2.MapStateDescriptor; -import org.apache.flink.api.common.state.v2.ReducingState; -import org.apache.flink.api.common.state.v2.ReducingStateDescriptor; -import org.apache.flink.api.common.state.v2.ValueState; -import org.apache.flink.api.common.state.v2.ValueStateDescriptor; -import org.apache.flink.runtime.state.AsyncKeyedStateBackend; -import org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.util.Preconditions; - -import javax.annotation.Nonnull; - -/** Default implementation of KeyedStateStore. 
*/ -public class DefaultKeyedStateStore implements KeyedStateStore { - - private final AsyncKeyedStateBackend<?> asyncKeyedStateBackend; - protected final SerializerFactory serializerFactory; - - public DefaultKeyedStateStore( - @Nonnull AsyncKeyedStateBackend asyncKeyedStateBackend, - SerializerFactory serializerFactory) { - this.asyncKeyedStateBackend = Preconditions.checkNotNull(asyncKeyedStateBackend); - this.serializerFactory = Preconditions.checkNotNull(serializerFactory); - } - - @Override - public <T> ValueState<T> getValueState(@Nonnull ValueStateDescriptor<T> stateProperties) { - Preconditions.checkNotNull(stateProperties, "The state properties must not be null"); - try { - stateProperties.initializeSerializerUnlessSet(serializerFactory); - return asyncKeyedStateBackend.getOrCreateKeyedState( - VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateProperties); - } catch (Exception e) { - throw new RuntimeException("Error while getting state", e); - } - } - - @Override - public <T> ListState<T> getListState(@Nonnull ListStateDescriptor<T> stateProperties) { - Preconditions.checkNotNull(stateProperties, "The state properties must not be null"); - try { - stateProperties.initializeSerializerUnlessSet(serializerFactory); - return asyncKeyedStateBackend.getOrCreateKeyedState( - VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateProperties); - } catch (Exception e) { - throw new RuntimeException("Error while getting state", e); - } - } - - @Override - public <UK, UV> MapState<UK, UV> getMapState( - @Nonnull MapStateDescriptor<UK, UV> stateProperties) { - Preconditions.checkNotNull(stateProperties, "The state properties must not be null"); - try { - stateProperties.initializeSerializerUnlessSet(serializerFactory); - return asyncKeyedStateBackend.getOrCreateKeyedState( - VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateProperties); - } catch (Exception e) { - throw new RuntimeException("Error while getting state", e); - } - } - 
- @Override - public <T> ReducingState<T> getReducingState( - @Nonnull ReducingStateDescriptor<T> stateProperties) { - Preconditions.checkNotNull(stateProperties, "The state properties must not be null"); - try { - stateProperties.initializeSerializerUnlessSet(serializerFactory); - return asyncKeyedStateBackend.getOrCreateKeyedState( - VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateProperties); - } catch (Exception e) { - throw new RuntimeException("Error while getting state", e); - } - } - - @Override - public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState( - @Nonnull AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) { - Preconditions.checkNotNull(stateProperties, "The state properties must not be null"); - try { - stateProperties.initializeSerializerUnlessSet(serializerFactory); - return asyncKeyedStateBackend.getOrCreateKeyedState( - VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateProperties); - } catch (Exception e) { - throw new RuntimeException("Error while getting state", e); - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStore.java deleted file mode 100644 index b47b3b51623..00000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStore.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.v2; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.state.v2.AggregatingState; -import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor; -import org.apache.flink.api.common.state.v2.ListState; -import org.apache.flink.api.common.state.v2.ListStateDescriptor; -import org.apache.flink.api.common.state.v2.MapState; -import org.apache.flink.api.common.state.v2.MapStateDescriptor; -import org.apache.flink.api.common.state.v2.ReducingState; -import org.apache.flink.api.common.state.v2.ReducingStateDescriptor; -import org.apache.flink.api.common.state.v2.State; -import org.apache.flink.api.common.state.v2.ValueState; -import org.apache.flink.api.common.state.v2.ValueStateDescriptor; - -import javax.annotation.Nonnull; - -/** This interface contains methods for registering {@link State}. */ -@Internal -public interface KeyedStateStore { - - /** - * Gets a handle to the system's {@link ValueState}. The key/value state is only accessible if - * the function is executed on a KeyedStream. On each access, the state exposes the value for - * the key of the element currently processed by the function. Each function may have multiple - * partitioned states, addressed with different names. - * - * <p>Because the scope of each value is the key of the currently processed element, and the - * elements are distributed by the Flink runtime, the system can transparently scale out and - * redistribute the state and KeyedStream. 
- * - * @param stateProperties The descriptor defining the properties of the state. - * @param <T> The type of value stored in the state. - * @return The partitioned state object. - */ - <T> ValueState<T> getValueState(@Nonnull ValueStateDescriptor<T> stateProperties); - - /** - * Gets a handle to the system's key / value list state. This state is optimized for state that - * holds lists. One can adds elements to the list, or retrieve the list as a whole. This state - * is only accessible if the function is executed on a KeyedStream. - * - * @param stateProperties The descriptor defining the properties of the state. - * @param <T> The type of value stored in the state. - * @return The partitioned state object. - * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the - * function (function is not part os a KeyedStream). - */ - <T> ListState<T> getListState(@Nonnull ListStateDescriptor<T> stateProperties); - - /** - * Gets a handle to the system's key/value map state. This state is only accessible if the - * function is executed on a KeyedStream. - * - * @param stateProperties The descriptor defining the properties of the state. - * @param <UK> The type of the user keys stored in the state. - * @param <UV> The type of the user values stored in the state. - * @return The partitioned state object. - * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the - * function (function is not part of a KeyedStream). - */ - <UK, UV> MapState<UK, UV> getMapState(@Nonnull MapStateDescriptor<UK, UV> stateProperties); - - /** - * Gets a handle to the system's key/value reducing state. This state is optimized for state - * that aggregates values. - * - * <p>This state is only accessible if the function is executed on a KeyedStream. - * - * @param stateProperties The descriptor defining the properties of the stats. - * @param <T> The type of value stored in the state. - * @return The partitioned state object. 
- * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the - * function (function is not part of a KeyedStream). - */ - <T> ReducingState<T> getReducingState(@Nonnull ReducingStateDescriptor<T> stateProperties); - - /** - * Gets a handle to the system's key/value aggregating state. This state is only accessible if - * the function is executed on a KeyedStream. - * - * @param stateProperties The descriptor defining the properties of the stats. - * @param <IN> The type of the values that are added to the state. - * @param <ACC> The type of the accumulator (intermediate aggregation state). - * @param <OUT> The type of the values that are returned from the state. - * @return The partitioned state object. - * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the - * function (function is not part of a KeyedStream). - */ - <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState( - @Nonnull AggregatingStateDescriptor<IN, ACC, OUT> stateProperties); -} diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java index bf01d4df0c7..7ea79afa1ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java @@ -85,8 +85,6 @@ public class StreamOperatorStateHandler { @Nullable private final AsyncKeyedStateBackend<?> asyncKeyedStateBackend; - @Nullable private final org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStoreV2; - /** Backend for keyed state. This might be empty if we're not on a keyed stream. 
*/ @Nullable private final CheckpointableKeyedStateBackend<?> keyedStateBackend; @@ -103,12 +101,14 @@ public class StreamOperatorStateHandler { this.keySerializer = context.keySerializer(); this.operatorStateBackend = context.operatorStateBackend(); this.keyedStateBackend = context.keyedStateBackend(); + this.asyncKeyedStateBackend = context.asyncKeyedStateBackend(); this.closeableRegistry = closeableRegistry; - if (keyedStateBackend != null) { + if (keyedStateBackend != null || asyncKeyedStateBackend != null) { keyedStateStore = new DefaultKeyedStateStore( keyedStateBackend, + asyncKeyedStateBackend, new SerializerFactory() { @Override public <T> TypeSerializer<T> createSerializer( @@ -120,21 +120,6 @@ public class StreamOperatorStateHandler { } else { keyedStateStore = null; } - - this.asyncKeyedStateBackend = context.asyncKeyedStateBackend(); - this.keyedStateStoreV2 = - asyncKeyedStateBackend != null - ? new org.apache.flink.runtime.state.v2.DefaultKeyedStateStore( - asyncKeyedStateBackend, - new SerializerFactory() { - @Override - public <T> TypeSerializer<T> createSerializer( - TypeInformation<T> typeInformation) { - return typeInformation.createSerializer( - executionConfig.getSerializerConfig()); - } - }) - : null; } public void initializeOperatorState(CheckpointedStreamOperator streamOperator) @@ -496,10 +481,6 @@ public class StreamOperatorStateHandler { return Optional.ofNullable(keyedStateStore); } - public Optional<org.apache.flink.runtime.state.v2.KeyedStateStore> getKeyedStateStoreV2() { - return Optional.ofNullable(keyedStateStoreV2); - } - /** Custom state handling hooks to be invoked by {@link StreamOperatorStateHandler}. 
*/ public interface CheckpointedStreamOperator { void initializeState(StateInitializationContext context) throws Exception; diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index 1859ffc2f8a..d352b526490 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -55,7 +55,6 @@ import java.util.Map; import java.util.Set; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * Implementation of the {@link org.apache.flink.api.common.functions.RuntimeContext}, for streaming @@ -71,8 +70,6 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { private final String operatorUniqueID; private final ProcessingTimeService processingTimeService; private @Nullable KeyedStateStore keyedStateStore; - private @Nullable org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStoreV2; - private SupportKeyedStateApiSet supportKeyedStateApiSet; private final ExternalResourceInfoProvider externalResourceInfoProvider; @VisibleForTesting @@ -111,8 +108,6 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { this.operatorUniqueID = checkNotNull(operatorID).toString(); this.processingTimeService = processingTimeService; this.keyedStateStore = keyedStateStore; - // By default, support state v1 - this.supportKeyedStateApiSet = SupportKeyedStateApiSet.STATE_V1; this.externalResourceInfoProvider = externalResourceInfoProvider; } @@ -120,15 +115,6 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { this.keyedStateStore = keyedStateStore; } - public void setKeyedStateStoreV2( - @Nullable 
org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore) { - if (keyedStateStore != null) { - // Only if the keyedStateStore is set, this context is switch to support state v2 - this.keyedStateStoreV2 = keyedStateStore; - this.supportKeyedStateApiSet = SupportKeyedStateApiSet.STATE_V2; - } - } - // ------------------------------------------------------------------------ /** @@ -253,12 +239,6 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { private KeyedStateStore checkPreconditionsAndGetKeyedStateStore( StateDescriptor<?, ?> stateDescriptor) { checkNotNull(stateDescriptor, "The state properties must not be null"); - checkState( - supportKeyedStateApiSet == SupportKeyedStateApiSet.STATE_V1, - "Current operator integrates the logic of async processing, " - + "thus only support state v2 APIs. Please use StateDescriptor under " - + "'org.apache.flink.runtime.state.v2' or make current operator extend " - + "from AbstractStreamOperator/AbstractStreamOperatorV2."); checkNotNull( keyedStateStore, String.format( @@ -270,32 +250,28 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { // TODO: Reconstruct this after StateManager is ready in FLIP-410. 
public <T> org.apache.flink.api.common.state.v2.ValueState<T> getValueState( org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties) { - org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore = - checkPreconditionsAndGetKeyedStateStoreV2(stateProperties); + KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties); stateProperties.initializeSerializerUnlessSet(this::createSerializer); return keyedStateStore.getValueState(stateProperties); } public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState( org.apache.flink.api.common.state.v2.ListStateDescriptor<T> stateProperties) { - org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore = - checkPreconditionsAndGetKeyedStateStoreV2(stateProperties); + KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties); stateProperties.initializeSerializerUnlessSet(this::createSerializer); return keyedStateStore.getListState(stateProperties); } public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> getMapState( org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> stateProperties) { - org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore = - checkPreconditionsAndGetKeyedStateStoreV2(stateProperties); + KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties); stateProperties.initializeSerializerUnlessSet(this::createSerializer); return keyedStateStore.getMapState(stateProperties); } public <T> org.apache.flink.api.common.state.v2.ReducingState<T> getReducingState( org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> stateProperties) { - org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore = - checkPreconditionsAndGetKeyedStateStoreV2(stateProperties); + KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties); stateProperties.initializeSerializerUnlessSet(this::createSerializer); 
return keyedStateStore.getReducingState(stateProperties); } @@ -304,28 +280,20 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingState( org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) { - org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore = - checkPreconditionsAndGetKeyedStateStoreV2(stateProperties); + KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties); stateProperties.initializeSerializerUnlessSet(this::createSerializer); return keyedStateStore.getAggregatingState(stateProperties); } - private org.apache.flink.runtime.state.v2.KeyedStateStore - checkPreconditionsAndGetKeyedStateStoreV2( - org.apache.flink.api.common.state.v2.StateDescriptor<?> stateDescriptor) { + private KeyedStateStore checkPreconditionsAndGetKeyedStateStore( + org.apache.flink.api.common.state.v2.StateDescriptor<?> stateDescriptor) { checkNotNull(stateDescriptor, "The state properties must not be null"); - checkState( - supportKeyedStateApiSet == SupportKeyedStateApiSet.STATE_V2, - "Current operator does not integrate the logic of async processing, " - + "thus only support state v1 APIs. 
Please use StateDescriptor under " - + "'org.apache.flink.runtime.state' or make current operator extend from " - + "AbstractAsyncStateStreamOperator/AbstractAsyncStateStreamOperatorV2."); checkNotNull( - keyedStateStoreV2, + keyedStateStore, String.format( "Keyed state '%s' with type %s can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.", stateDescriptor.getStateId(), stateDescriptor.getType())); - return keyedStateStoreV2; + return keyedStateStore; } // ------------------ expose (read only) relevant information from the stream config -------- // @@ -338,13 +306,4 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { public boolean isCheckpointingEnabled() { return streamConfig.isCheckpointingEnabled(); } - - /** - * Currently, we only support one keyed state api set. This is determined by the stream - * operator. This will be set via {@link #setKeyedStateStore} or {@link #setKeyedStateStoreV2}. - */ - private enum SupportKeyedStateApiSet { - STATE_V1, - STATE_V2 - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java index f61696c7485..b1b71029eed 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java @@ -463,6 +463,7 @@ class StreamingRuntimeContextTest { DefaultKeyedStateStore keyedStateStore = new DefaultKeyedStateStore( keyedStateBackend, + asyncKeyedStateBackend, new SerializerFactory() { @Override public <T> TypeSerializer<T> createSerializer( @@ -497,22 +498,10 @@ class StreamingRuntimeContextTest { any(org.apache.flink.api.common.state.v2.StateDescriptor.class)); operator.initializeState(streamTaskStateManager); - if (!stateV2) { - 
operator.getRuntimeContext().setKeyedStateStore(keyedStateStore); - } else { - operator.getRuntimeContext() - .setKeyedStateStoreV2( - new org.apache.flink.runtime.state.v2.DefaultKeyedStateStore( - asyncKeyedStateBackend, - new SerializerFactory() { - @Override - public <T> TypeSerializer<T> createSerializer( - TypeInformation<T> typeInformation) { - return typeInformation.createSerializer( - config.getSerializerConfig()); - } - })); + if (stateV2) { + keyedStateStore.setSupportKeyedStateApiSetV2(); } + operator.getRuntimeContext().setKeyedStateStore(keyedStateStore); return operator; }