This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 04802953a1e77182ac8a0bfc9d1a5a269dc2c0ab
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Wed Jan 8 19:38:20 2025 +0800

    [FLINK-37045][Runtime] Merge two versions of KeyedStateStore
---
 .../flink/api/common/state/KeyedStateStore.java    | 115 +++++++++++++++++
 .../apache/flink/cep/utils/TestSharedBuffer.java   |  44 +++++++
 .../AbstractAsyncStateStreamOperator.java          |   8 +-
 .../AbstractAsyncStateStreamOperatorV2.java        |   7 +-
 .../runtime/state/DefaultKeyedStateStore.java      | 138 ++++++++++++++++++++-
 .../runtime/state/v2/DefaultKeyedStateStore.java   | 114 -----------------
 .../flink/runtime/state/v2/KeyedStateStore.java    | 110 ----------------
 .../api/operators/StreamOperatorStateHandler.java  |  25 +---
 .../api/operators/StreamingRuntimeContext.java     |  59 ++-------
 .../api/operators/StreamingRuntimeContextTest.java |  19 +--
 10 files changed, 324 insertions(+), 315 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
index 8ce0d9c2c64..69c513c388a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.api.common.state;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.PublicEvolving;
 
+import javax.annotation.Nonnull;
+
 /** This interface contains methods for registering keyed state with a managed 
store. */
 @PublicEvolving
 public interface KeyedStateStore {
@@ -223,4 +226,116 @@ public interface KeyedStateStore {
      */
     @PublicEvolving
     <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> 
stateProperties);
+
+    // --------------------------
+    // State V2 creation methods
+    // --------------------------
+
+    /**
+     * Gets a handle to the system's {@link 
org.apache.flink.api.common.state.v2.ValueState}. The
+     * key/value state is only accessible if the function is executed on a 
KeyedStream. On each
+     * access, the state exposes the value for the key of the element 
currently processed by the
+     * function. Each function may have multiple partitioned states, addressed 
with different names.
+     *
+     * <p>Because the scope of each value is the key of the currently 
processed element, and the
+     * elements are distributed by the Flink runtime, the system can 
transparently scale out and
+     * redistribute the state and KeyedStream.
+     *
+     * @param stateProperties The descriptor defining the properties of the 
state.
+     * @param <T> The type of value stored in the state.
+     * @return The partitioned state object.
+     */
+    @Experimental
+    default <T> org.apache.flink.api.common.state.v2.ValueState<T> getState(
+            @Nonnull 
org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties) {
+        return getValueState(stateProperties);
+    }
+
+    /**
+     * Gets a handle to the system's {@link 
org.apache.flink.api.common.state.v2.ValueState}. The
+     * key/value state is only accessible if the function is executed on a 
KeyedStream. On each
+     * access, the state exposes the value for the key of the element 
currently processed by the
+     * function. Each function may have multiple partitioned states, addressed 
with different names.
+     *
+     * <p>Because the scope of each value is the key of the currently 
processed element, and the
+     * elements are distributed by the Flink runtime, the system can 
transparently scale out and
+     * redistribute the state and KeyedStream.
+     *
+     * @param stateProperties The descriptor defining the properties of the 
state.
+     * @param <T> The type of value stored in the state.
+     * @return The partitioned state object.
+     */
+    @Experimental
+    <T> org.apache.flink.api.common.state.v2.ValueState<T> getValueState(
+            @Nonnull 
org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties);
+
+    /**
+     * Gets a handle to the system's key / value list state. This state is 
optimized for state that
+     * holds lists. One can adds elements to the list, or retrieve the list as 
a whole. This state
+     * is only accessible if the function is executed on a KeyedStream.
+     *
+     * @param stateProperties The descriptor defining the properties of the 
state.
+     * @param <T> The type of value stored in the state.
+     * @return The partitioned state object.
+     * @throws UnsupportedOperationException Thrown, if no partitioned state 
is available for the
+     *     function (function is not part os a KeyedStream).
+     */
+    @Experimental
+    <T> org.apache.flink.api.common.state.v2.ListState<T> getListState(
+            @Nonnull 
org.apache.flink.api.common.state.v2.ListStateDescriptor<T> stateProperties);
+
+    /**
+     * Gets a handle to the system's key/value map state. This state is only 
accessible if the
+     * function is executed on a KeyedStream.
+     *
+     * @param stateProperties The descriptor defining the properties of the 
state.
+     * @param <UK> The type of the user keys stored in the state.
+     * @param <UV> The type of the user values stored in the state.
+     * @return The partitioned state object.
+     * @throws UnsupportedOperationException Thrown, if no partitioned state 
is available for the
+     *     function (function is not part of a KeyedStream).
+     */
+    @Experimental
+    <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> getMapState(
+            @Nonnull
+                    
org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV>
+                            stateProperties);
+
+    /**
+     * Gets a handle to the system's key/value reducing state. This state is 
optimized for state
+     * that aggregates values.
+     *
+     * <p>This state is only accessible if the function is executed on a 
KeyedStream.
+     *
+     * @param stateProperties The descriptor defining the properties of the 
stats.
+     * @param <T> The type of value stored in the state.
+     * @return The partitioned state object.
+     * @throws UnsupportedOperationException Thrown, if no partitioned state 
is available for the
+     *     function (function is not part of a KeyedStream).
+     */
+    @Experimental
+    <T> org.apache.flink.api.common.state.v2.ReducingState<T> getReducingState(
+            @Nonnull
+                    
org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T>
+                            stateProperties);
+
+    /**
+     * Gets a handle to the system's key/value aggregating state. This state 
is only accessible if
+     * the function is executed on a KeyedStream.
+     *
+     * @param stateProperties The descriptor defining the properties of the 
stats.
+     * @param <IN> The type of the values that are added to the state.
+     * @param <ACC> The type of the accumulator (intermediate aggregation 
state).
+     * @param <OUT> The type of the values that are returned from the state.
+     * @return The partitioned state object.
+     * @throws UnsupportedOperationException Thrown, if no partitioned state 
is available for the
+     *     function (function is not part of a KeyedStream).
+     */
+    @Experimental
+    <IN, ACC, OUT>
+            org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> 
getAggregatingState(
+                    @Nonnull
+                            
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<
+                                            IN, ACC, OUT>
+                                    stateProperties);
 }
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
index 169df8eafa8..1676c3de5c4 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
@@ -33,6 +33,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.configuration.SharedBufferCacheConfig;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -243,6 +245,48 @@ public class TestSharedBuffer<V> extends SharedBuffer<V> {
             };
         }
 
+        @Override
+        public <T> org.apache.flink.api.common.state.v2.ValueState<T> 
getValueState(
+                @Nonnull
+                        
org.apache.flink.api.common.state.v2.ValueStateDescriptor<T>
+                                stateProperties) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <T> org.apache.flink.api.common.state.v2.ListState<T> 
getListState(
+                @Nonnull
+                        
org.apache.flink.api.common.state.v2.ListStateDescriptor<T>
+                                stateProperties) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> 
getMapState(
+                @Nonnull
+                        
org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV>
+                                stateProperties) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <T> org.apache.flink.api.common.state.v2.ReducingState<T> 
getReducingState(
+                @Nonnull
+                        
org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T>
+                                stateProperties) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <IN, ACC, OUT>
+                org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> 
getAggregatingState(
+                        @Nonnull
+                                
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<
+                                                IN, ACC, OUT>
+                                        stateProperties) {
+            throw new UnsupportedOperationException();
+        }
+
         private class CountingIterator<T> implements Iterator<T> {
 
             private final Iterator<T> iterator;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
index ad9ce787cac..20b93542b37 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.api.common.state.v2.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -33,6 +34,7 @@ import 
org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
 import org.apache.flink.runtime.event.WatermarkEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
 import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.Input;
@@ -91,7 +93,11 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> 
extends AbstractStre
     public void initializeState(StreamTaskStateInitializer 
streamTaskStateManager)
             throws Exception {
         super.initializeState(streamTaskStateManager);
-        
getRuntimeContext().setKeyedStateStoreV2(stateHandler.getKeyedStateStoreV2().orElse(null));
+        KeyedStateStore stateStore = 
stateHandler.getKeyedStateStore().orElse(null);
+        if (stateStore instanceof DefaultKeyedStateStore) {
+            ((DefaultKeyedStateStore) 
stateStore).setSupportKeyedStateApiSetV2();
+        }
+
         final StreamTask<?, ?> containingTask = 
checkNotNull(getContainingTask());
         environment = containingTask.getEnvironment();
         final MailboxExecutor mailboxExecutor = 
environment.getMainMailboxExecutor();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
index ab3eff901fa..b9757d816bd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.asyncprocessing.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.api.common.state.v2.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.asyncprocessing.RecordContext;
 import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
 import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
@@ -90,7 +92,10 @@ public abstract class 
AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractSt
     public final void initializeState(StreamTaskStateInitializer 
streamTaskStateManager)
             throws Exception {
         super.initializeState(streamTaskStateManager);
-        
getRuntimeContext().setKeyedStateStoreV2(stateHandler.getKeyedStateStoreV2().orElse(null));
+        KeyedStateStore stateStore = 
stateHandler.getKeyedStateStore().orElse(null);
+        if (stateStore instanceof DefaultKeyedStateStore) {
+            ((DefaultKeyedStateStore) 
stateStore).setSupportKeyedStateApiSetV2();
+        }
 
         final int inFlightRecordsLimit = 
getExecutionConfig().getAsyncInflightRecordsLimit();
         final int asyncBufferSize = 
getExecutionConfig().getAsyncStateBufferSize();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
index 16c122e5ee2..f43e6b8ad0a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
@@ -35,7 +35,11 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Default implementation of KeyedStateStore that currently forwards state 
registration to a {@link
@@ -43,13 +47,38 @@ import static java.util.Objects.requireNonNull;
  */
 public class DefaultKeyedStateStore implements KeyedStateStore {
 
-    protected final KeyedStateBackend<?> keyedStateBackend;
+    @Nullable protected final KeyedStateBackend<?> keyedStateBackend;
+
+    @Nullable protected final AsyncKeyedStateBackend<?> asyncKeyedStateBackend;
     protected final SerializerFactory serializerFactory;
 
+    protected SupportKeyedStateApiSet supportKeyedStateApiSet;
+
     public DefaultKeyedStateStore(
             KeyedStateBackend<?> keyedStateBackend, SerializerFactory 
serializerFactory) {
-        this.keyedStateBackend = Preconditions.checkNotNull(keyedStateBackend);
+        this(keyedStateBackend, null, serializerFactory);
+    }
+
+    public DefaultKeyedStateStore(
+            AsyncKeyedStateBackend<?> asyncKeyedStateBackend, 
SerializerFactory serializerFactory) {
+        this(null, asyncKeyedStateBackend, serializerFactory);
+    }
+
+    public DefaultKeyedStateStore(
+            @Nullable KeyedStateBackend<?> keyedStateBackend,
+            @Nullable AsyncKeyedStateBackend<?> asyncKeyedStateBackend,
+            SerializerFactory serializerFactory) {
+        this.keyedStateBackend = keyedStateBackend;
+        this.asyncKeyedStateBackend = asyncKeyedStateBackend;
         this.serializerFactory = Preconditions.checkNotNull(serializerFactory);
+        if (keyedStateBackend != null) {
+            // By default, we support state v1
+            this.supportKeyedStateApiSet = SupportKeyedStateApiSet.STATE_V1;
+        } else if (asyncKeyedStateBackend != null) {
+            this.supportKeyedStateApiSet = SupportKeyedStateApiSet.STATE_V2;
+        } else {
+            throw new IllegalArgumentException("The state backend must not be 
null.");
+        }
     }
 
     @Override
@@ -112,7 +141,112 @@ public class DefaultKeyedStateStore implements 
KeyedStateStore {
 
     protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> 
stateDescriptor)
             throws Exception {
+        checkState(
+                keyedStateBackend != null
+                        && supportKeyedStateApiSet == 
SupportKeyedStateApiSet.STATE_V1,
+                "Current operator does not integrate the async processing 
logic, "
+                        + "thus only supports state v1 APIs. Please use 
StateDescriptor under "
+                        + "'org.apache.flink.runtime.state'.");
         return keyedStateBackend.getPartitionedState(
                 VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, 
stateDescriptor);
     }
+
+    @Override
+    public <T> org.apache.flink.api.common.state.v2.ValueState<T> 
getValueState(
+            @Nonnull 
org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties) {
+        requireNonNull(stateProperties, "The state properties must not be 
null");
+        try {
+            stateProperties.initializeSerializerUnlessSet(serializerFactory);
+            return getPartitionedState(stateProperties);
+        } catch (Exception e) {
+            throw new RuntimeException("Error while getting state", e);
+        }
+    }
+
+    @Override
+    public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState(
+            @Nonnull 
org.apache.flink.api.common.state.v2.ListStateDescriptor<T> stateProperties) {
+        requireNonNull(stateProperties, "The state properties must not be 
null");
+        try {
+            stateProperties.initializeSerializerUnlessSet(serializerFactory);
+            return getPartitionedState(stateProperties);
+        } catch (Exception e) {
+            throw new RuntimeException("Error while getting state", e);
+        }
+    }
+
+    @Override
+    public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> 
getMapState(
+            @Nonnull
+                    
org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV>
+                            stateProperties) {
+        requireNonNull(stateProperties, "The state properties must not be 
null");
+        try {
+            stateProperties.initializeSerializerUnlessSet(serializerFactory);
+            return getPartitionedState(stateProperties);
+        } catch (Exception e) {
+            throw new RuntimeException("Error while getting state", e);
+        }
+    }
+
+    @Override
+    public <T> org.apache.flink.api.common.state.v2.ReducingState<T> 
getReducingState(
+            @Nonnull
+                    
org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T>
+                            stateProperties) {
+        requireNonNull(stateProperties, "The state properties must not be 
null");
+        try {
+            stateProperties.initializeSerializerUnlessSet(serializerFactory);
+            return getPartitionedState(stateProperties);
+        } catch (Exception e) {
+            throw new RuntimeException("Error while getting state", e);
+        }
+    }
+
+    @Override
+    public <IN, ACC, OUT>
+            org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> 
getAggregatingState(
+                    @Nonnull
+                            
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<
+                                            IN, ACC, OUT>
+                                    stateProperties) {
+        requireNonNull(stateProperties, "The state properties must not be 
null");
+        try {
+            stateProperties.initializeSerializerUnlessSet(serializerFactory);
+            return getPartitionedState(stateProperties);
+        } catch (Exception e) {
+            throw new RuntimeException("Error while getting state", e);
+        }
+    }
+
+    protected <S extends org.apache.flink.api.common.state.v2.State, SV> S 
getPartitionedState(
+            org.apache.flink.api.common.state.v2.StateDescriptor<SV> 
stateDescriptor)
+            throws Exception {
+        checkState(
+                asyncKeyedStateBackend != null
+                        && supportKeyedStateApiSet == 
SupportKeyedStateApiSet.STATE_V2,
+                "Current operator integrates the async processing logic, "
+                        + "thus only supports state v2 APIs. Please use 
StateDescriptor under "
+                        + "'org.apache.flink.runtime.state.v2'.");
+        return asyncKeyedStateBackend.getOrCreateKeyedState(
+                VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, 
stateDescriptor);
+    }
+
+    public void setSupportKeyedStateApiSetV2() {
+        requireNonNull(
+                asyncKeyedStateBackend,
+                "Current operator integrates the logic of async processing, "
+                        + "thus only support state v2 APIs. Please use 
StateDescriptor under "
+                        + "'org.apache.flink.runtime.state.v2'.");
+        supportKeyedStateApiSet = SupportKeyedStateApiSet.STATE_V2;
+    }
+
+    /**
+     * Currently, we only support one keyed state api set. This is determined 
by the stream
+     * operator.
+     */
+    private enum SupportKeyedStateApiSet {
+        STATE_V1,
+        STATE_V2
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStore.java
deleted file mode 100644
index 52565b7fad6..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/DefaultKeyedStateStore.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.v2;
-
-import org.apache.flink.api.common.functions.SerializerFactory;
-import org.apache.flink.api.common.state.v2.AggregatingState;
-import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.v2.ListState;
-import org.apache.flink.api.common.state.v2.ListStateDescriptor;
-import org.apache.flink.api.common.state.v2.MapState;
-import org.apache.flink.api.common.state.v2.MapStateDescriptor;
-import org.apache.flink.api.common.state.v2.ReducingState;
-import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.v2.ValueState;
-import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
-import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nonnull;
-
-/** Default implementation of KeyedStateStore. */
-public class DefaultKeyedStateStore implements KeyedStateStore {
-
-    private final AsyncKeyedStateBackend<?> asyncKeyedStateBackend;
-    protected final SerializerFactory serializerFactory;
-
-    public DefaultKeyedStateStore(
-            @Nonnull AsyncKeyedStateBackend asyncKeyedStateBackend,
-            SerializerFactory serializerFactory) {
-        this.asyncKeyedStateBackend = 
Preconditions.checkNotNull(asyncKeyedStateBackend);
-        this.serializerFactory = Preconditions.checkNotNull(serializerFactory);
-    }
-
-    @Override
-    public <T> ValueState<T> getValueState(@Nonnull ValueStateDescriptor<T> 
stateProperties) {
-        Preconditions.checkNotNull(stateProperties, "The state properties must 
not be null");
-        try {
-            stateProperties.initializeSerializerUnlessSet(serializerFactory);
-            return asyncKeyedStateBackend.getOrCreateKeyedState(
-                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, 
stateProperties);
-        } catch (Exception e) {
-            throw new RuntimeException("Error while getting state", e);
-        }
-    }
-
-    @Override
-    public <T> ListState<T> getListState(@Nonnull ListStateDescriptor<T> 
stateProperties) {
-        Preconditions.checkNotNull(stateProperties, "The state properties must 
not be null");
-        try {
-            stateProperties.initializeSerializerUnlessSet(serializerFactory);
-            return asyncKeyedStateBackend.getOrCreateKeyedState(
-                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, 
stateProperties);
-        } catch (Exception e) {
-            throw new RuntimeException("Error while getting state", e);
-        }
-    }
-
-    @Override
-    public <UK, UV> MapState<UK, UV> getMapState(
-            @Nonnull MapStateDescriptor<UK, UV> stateProperties) {
-        Preconditions.checkNotNull(stateProperties, "The state properties must 
not be null");
-        try {
-            stateProperties.initializeSerializerUnlessSet(serializerFactory);
-            return asyncKeyedStateBackend.getOrCreateKeyedState(
-                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, 
stateProperties);
-        } catch (Exception e) {
-            throw new RuntimeException("Error while getting state", e);
-        }
-    }
-
-    @Override
-    public <T> ReducingState<T> getReducingState(
-            @Nonnull ReducingStateDescriptor<T> stateProperties) {
-        Preconditions.checkNotNull(stateProperties, "The state properties must 
not be null");
-        try {
-            stateProperties.initializeSerializerUnlessSet(serializerFactory);
-            return asyncKeyedStateBackend.getOrCreateKeyedState(
-                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, 
stateProperties);
-        } catch (Exception e) {
-            throw new RuntimeException("Error while getting state", e);
-        }
-    }
-
-    @Override
-    public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
-            @Nonnull AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) 
{
-        Preconditions.checkNotNull(stateProperties, "The state properties must 
not be null");
-        try {
-            stateProperties.initializeSerializerUnlessSet(serializerFactory);
-            return asyncKeyedStateBackend.getOrCreateKeyedState(
-                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, 
stateProperties);
-        } catch (Exception e) {
-            throw new RuntimeException("Error while getting state", e);
-        }
-    }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStore.java
deleted file mode 100644
index b47b3b51623..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/KeyedStateStore.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.v2;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.state.v2.AggregatingState;
-import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.v2.ListState;
-import org.apache.flink.api.common.state.v2.ListStateDescriptor;
-import org.apache.flink.api.common.state.v2.MapState;
-import org.apache.flink.api.common.state.v2.MapStateDescriptor;
-import org.apache.flink.api.common.state.v2.ReducingState;
-import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.v2.State;
-import org.apache.flink.api.common.state.v2.ValueState;
-import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
-
-import javax.annotation.Nonnull;
-
-/** This interface contains methods for registering {@link State}. */
-@Internal
-public interface KeyedStateStore {
-
-    /**
-     * Gets a handle to the system's {@link ValueState}. The key/value state 
is only accessible if
-     * the function is executed on a KeyedStream. On each access, the state 
exposes the value for
-     * the key of the element currently processed by the function. Each 
function may have multiple
-     * partitioned states, addressed with different names.
-     *
-     * <p>Because the scope of each value is the key of the currently 
processed element, and the
-     * elements are distributed by the Flink runtime, the system can 
transparently scale out and
-     * redistribute the state and KeyedStream.
-     *
-     * @param stateProperties The descriptor defining the properties of the 
state.
-     * @param <T> The type of value stored in the state.
-     * @return The partitioned state object.
-     */
-    <T> ValueState<T> getValueState(@Nonnull ValueStateDescriptor<T> 
stateProperties);
-
-    /**
-     * Gets a handle to the system's key / value list state. This state is 
optimized for state that
-     * holds lists. One can adds elements to the list, or retrieve the list as 
a whole. This state
-     * is only accessible if the function is executed on a KeyedStream.
-     *
-     * @param stateProperties The descriptor defining the properties of the 
state.
-     * @param <T> The type of value stored in the state.
-     * @return The partitioned state object.
-     * @throws UnsupportedOperationException Thrown, if no partitioned state 
is available for the
-     *     function (function is not part os a KeyedStream).
-     */
-    <T> ListState<T> getListState(@Nonnull ListStateDescriptor<T> 
stateProperties);
-
-    /**
-     * Gets a handle to the system's key/value map state. This state is only 
accessible if the
-     * function is executed on a KeyedStream.
-     *
-     * @param stateProperties The descriptor defining the properties of the 
state.
-     * @param <UK> The type of the user keys stored in the state.
-     * @param <UV> The type of the user values stored in the state.
-     * @return The partitioned state object.
-     * @throws UnsupportedOperationException Thrown, if no partitioned state 
is available for the
-     *     function (function is not part of a KeyedStream).
-     */
-    <UK, UV> MapState<UK, UV> getMapState(@Nonnull MapStateDescriptor<UK, UV> 
stateProperties);
-
-    /**
-     * Gets a handle to the system's key/value reducing state. This state is 
optimized for state
-     * that aggregates values.
-     *
-     * <p>This state is only accessible if the function is executed on a 
KeyedStream.
-     *
-     * @param stateProperties The descriptor defining the properties of the 
stats.
-     * @param <T> The type of value stored in the state.
-     * @return The partitioned state object.
-     * @throws UnsupportedOperationException Thrown, if no partitioned state 
is available for the
-     *     function (function is not part of a KeyedStream).
-     */
-    <T> ReducingState<T> getReducingState(@Nonnull ReducingStateDescriptor<T> 
stateProperties);
-
-    /**
-     * Gets a handle to the system's key/value aggregating state. This state 
is only accessible if
-     * the function is executed on a KeyedStream.
-     *
-     * @param stateProperties The descriptor defining the properties of the 
stats.
-     * @param <IN> The type of the values that are added to the state.
-     * @param <ACC> The type of the accumulator (intermediate aggregation 
state).
-     * @param <OUT> The type of the values that are returned from the state.
-     * @return The partitioned state object.
-     * @throws UnsupportedOperationException Thrown, if no partitioned state 
is available for the
-     *     function (function is not part of a KeyedStream).
-     */
-    <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
-            @Nonnull AggregatingStateDescriptor<IN, ACC, OUT> stateProperties);
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
index bf01d4df0c7..7ea79afa1ef 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
@@ -85,8 +85,6 @@ public class StreamOperatorStateHandler {
 
     @Nullable private final AsyncKeyedStateBackend<?> asyncKeyedStateBackend;
 
-    @Nullable private final org.apache.flink.runtime.state.v2.KeyedStateStore 
keyedStateStoreV2;
-
     /** Backend for keyed state. This might be empty if we're not on a keyed 
stream. */
     @Nullable private final CheckpointableKeyedStateBackend<?> 
keyedStateBackend;
 
@@ -103,12 +101,14 @@ public class StreamOperatorStateHandler {
         this.keySerializer = context.keySerializer();
         this.operatorStateBackend = context.operatorStateBackend();
         this.keyedStateBackend = context.keyedStateBackend();
+        this.asyncKeyedStateBackend = context.asyncKeyedStateBackend();
         this.closeableRegistry = closeableRegistry;
 
-        if (keyedStateBackend != null) {
+        if (keyedStateBackend != null || asyncKeyedStateBackend != null) {
             keyedStateStore =
                     new DefaultKeyedStateStore(
                             keyedStateBackend,
+                            asyncKeyedStateBackend,
                             new SerializerFactory() {
                                 @Override
                                 public <T> TypeSerializer<T> createSerializer(
@@ -120,21 +120,6 @@ public class StreamOperatorStateHandler {
         } else {
             keyedStateStore = null;
         }
-
-        this.asyncKeyedStateBackend = context.asyncKeyedStateBackend();
-        this.keyedStateStoreV2 =
-                asyncKeyedStateBackend != null
-                        ? new 
org.apache.flink.runtime.state.v2.DefaultKeyedStateStore(
-                                asyncKeyedStateBackend,
-                                new SerializerFactory() {
-                                    @Override
-                                    public <T> TypeSerializer<T> 
createSerializer(
-                                            TypeInformation<T> 
typeInformation) {
-                                        return 
typeInformation.createSerializer(
-                                                
executionConfig.getSerializerConfig());
-                                    }
-                                })
-                        : null;
     }
 
     public void initializeOperatorState(CheckpointedStreamOperator 
streamOperator)
@@ -496,10 +481,6 @@ public class StreamOperatorStateHandler {
         return Optional.ofNullable(keyedStateStore);
     }
 
-    public Optional<org.apache.flink.runtime.state.v2.KeyedStateStore> 
getKeyedStateStoreV2() {
-        return Optional.ofNullable(keyedStateStoreV2);
-    }
-
     /** Custom state handling hooks to be invoked by {@link 
StreamOperatorStateHandler}. */
     public interface CheckpointedStreamOperator {
         void initializeState(StateInitializationContext context) throws 
Exception;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index 1859ffc2f8a..d352b526490 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -55,7 +55,6 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Implementation of the {@link 
org.apache.flink.api.common.functions.RuntimeContext}, for streaming
@@ -71,8 +70,6 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
     private final String operatorUniqueID;
     private final ProcessingTimeService processingTimeService;
     private @Nullable KeyedStateStore keyedStateStore;
-    private @Nullable org.apache.flink.runtime.state.v2.KeyedStateStore 
keyedStateStoreV2;
-    private SupportKeyedStateApiSet supportKeyedStateApiSet;
     private final ExternalResourceInfoProvider externalResourceInfoProvider;
 
     @VisibleForTesting
@@ -111,8 +108,6 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
         this.operatorUniqueID = checkNotNull(operatorID).toString();
         this.processingTimeService = processingTimeService;
         this.keyedStateStore = keyedStateStore;
-        // By default, support state v1
-        this.supportKeyedStateApiSet = SupportKeyedStateApiSet.STATE_V1;
         this.externalResourceInfoProvider = externalResourceInfoProvider;
     }
 
@@ -120,15 +115,6 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
         this.keyedStateStore = keyedStateStore;
     }
 
-    public void setKeyedStateStoreV2(
-            @Nullable org.apache.flink.runtime.state.v2.KeyedStateStore 
keyedStateStore) {
-        if (keyedStateStore != null) {
-            // Only if the keyedStateStore is set, this context is switch to 
support state v2
-            this.keyedStateStoreV2 = keyedStateStore;
-            this.supportKeyedStateApiSet = SupportKeyedStateApiSet.STATE_V2;
-        }
-    }
-
     // ------------------------------------------------------------------------
 
     /**
@@ -253,12 +239,6 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
     private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(
             StateDescriptor<?, ?> stateDescriptor) {
         checkNotNull(stateDescriptor, "The state properties must not be null");
-        checkState(
-                supportKeyedStateApiSet == SupportKeyedStateApiSet.STATE_V1,
-                "Current operator integrates the logic of async processing, "
-                        + "thus only support state v2 APIs. Please use 
StateDescriptor under "
-                        + "'org.apache.flink.runtime.state.v2' or make current 
operator extend "
-                        + "from 
AbstractStreamOperator/AbstractStreamOperatorV2.");
         checkNotNull(
                 keyedStateStore,
                 String.format(
@@ -270,32 +250,28 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
     // TODO: Reconstruct this after StateManager is ready in FLIP-410.
     public <T> org.apache.flink.api.common.state.v2.ValueState<T> 
getValueState(
             org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> 
stateProperties) {
-        org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore =
-                checkPreconditionsAndGetKeyedStateStoreV2(stateProperties);
+        KeyedStateStore keyedStateStore = 
checkPreconditionsAndGetKeyedStateStore(stateProperties);
         stateProperties.initializeSerializerUnlessSet(this::createSerializer);
         return keyedStateStore.getValueState(stateProperties);
     }
 
     public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState(
             org.apache.flink.api.common.state.v2.ListStateDescriptor<T> 
stateProperties) {
-        org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore =
-                checkPreconditionsAndGetKeyedStateStoreV2(stateProperties);
+        KeyedStateStore keyedStateStore = 
checkPreconditionsAndGetKeyedStateStore(stateProperties);
         stateProperties.initializeSerializerUnlessSet(this::createSerializer);
         return keyedStateStore.getListState(stateProperties);
     }
 
     public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> 
getMapState(
             org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> 
stateProperties) {
-        org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore =
-                checkPreconditionsAndGetKeyedStateStoreV2(stateProperties);
+        KeyedStateStore keyedStateStore = 
checkPreconditionsAndGetKeyedStateStore(stateProperties);
         stateProperties.initializeSerializerUnlessSet(this::createSerializer);
         return keyedStateStore.getMapState(stateProperties);
     }
 
     public <T> org.apache.flink.api.common.state.v2.ReducingState<T> 
getReducingState(
             org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> 
stateProperties) {
-        org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore =
-                checkPreconditionsAndGetKeyedStateStoreV2(stateProperties);
+        KeyedStateStore keyedStateStore = 
checkPreconditionsAndGetKeyedStateStore(stateProperties);
         stateProperties.initializeSerializerUnlessSet(this::createSerializer);
         return keyedStateStore.getReducingState(stateProperties);
     }
@@ -304,28 +280,20 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
             org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> 
getAggregatingState(
                     
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT>
                             stateProperties) {
-        org.apache.flink.runtime.state.v2.KeyedStateStore keyedStateStore =
-                checkPreconditionsAndGetKeyedStateStoreV2(stateProperties);
+        KeyedStateStore keyedStateStore = 
checkPreconditionsAndGetKeyedStateStore(stateProperties);
         stateProperties.initializeSerializerUnlessSet(this::createSerializer);
         return keyedStateStore.getAggregatingState(stateProperties);
     }
 
-    private org.apache.flink.runtime.state.v2.KeyedStateStore
-            checkPreconditionsAndGetKeyedStateStoreV2(
-                    org.apache.flink.api.common.state.v2.StateDescriptor<?> 
stateDescriptor) {
+    private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(
+            org.apache.flink.api.common.state.v2.StateDescriptor<?> 
stateDescriptor) {
         checkNotNull(stateDescriptor, "The state properties must not be null");
-        checkState(
-                supportKeyedStateApiSet == SupportKeyedStateApiSet.STATE_V2,
-                "Current operator does not integrate the logic of async 
processing, "
-                        + "thus only support state v1 APIs. Please use 
StateDescriptor under "
-                        + "'org.apache.flink.runtime.state' or make current 
operator extend from "
-                        + 
"AbstractAsyncStateStreamOperator/AbstractAsyncStateStreamOperatorV2.");
         checkNotNull(
-                keyedStateStoreV2,
+                keyedStateStore,
                 String.format(
                         "Keyed state '%s' with type %s can only be used on a 
'keyed stream', i.e., after a 'keyBy()' operation.",
                         stateDescriptor.getStateId(), 
stateDescriptor.getType()));
-        return keyedStateStoreV2;
+        return keyedStateStore;
     }
 
     // ------------------ expose (read only) relevant information from the 
stream config -------- //
@@ -338,13 +306,4 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
     public boolean isCheckpointingEnabled() {
         return streamConfig.isCheckpointingEnabled();
     }
-
-    /**
-     * Currently, we only support one keyed state api set. This is determined 
by the stream
-     * operator. This will be set via {@link #setKeyedStateStore} or {@link 
#setKeyedStateStoreV2}.
-     */
-    private enum SupportKeyedStateApiSet {
-        STATE_V1,
-        STATE_V2
-    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index f61696c7485..b1b71029eed 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -463,6 +463,7 @@ class StreamingRuntimeContextTest {
         DefaultKeyedStateStore keyedStateStore =
                 new DefaultKeyedStateStore(
                         keyedStateBackend,
+                        asyncKeyedStateBackend,
                         new SerializerFactory() {
                             @Override
                             public <T> TypeSerializer<T> createSerializer(
@@ -497,22 +498,10 @@ class StreamingRuntimeContextTest {
                         
any(org.apache.flink.api.common.state.v2.StateDescriptor.class));
 
         operator.initializeState(streamTaskStateManager);
-        if (!stateV2) {
-            operator.getRuntimeContext().setKeyedStateStore(keyedStateStore);
-        } else {
-            operator.getRuntimeContext()
-                    .setKeyedStateStoreV2(
-                            new 
org.apache.flink.runtime.state.v2.DefaultKeyedStateStore(
-                                    asyncKeyedStateBackend,
-                                    new SerializerFactory() {
-                                        @Override
-                                        public <T> TypeSerializer<T> 
createSerializer(
-                                                TypeInformation<T> 
typeInformation) {
-                                            return 
typeInformation.createSerializer(
-                                                    
config.getSerializerConfig());
-                                        }
-                                    }));
+        if (stateV2) {
+            keyedStateStore.setSupportKeyedStateApiSetV2();
         }
+        operator.getRuntimeContext().setKeyedStateStore(keyedStateStore);
 
         return operator;
     }

Reply via email to