This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bfdc6db0955a8ed7c7a3dcafa265f8f5df73169e
Author: Zakelly <[email protected]>
AuthorDate: Fri Feb 7 17:22:25 2025 +0800

    [FLINK-37276] Add missing state v2 access interfaces in `RuntimeContext`
---
 .../flink/api/common/functions/RuntimeContext.java | 90 ++++++++++++++++++++++
 .../functions/util/AbstractRuntimeUDFContext.java  | 37 +++++++++
 .../flink/cep/operator/CepRuntimeContext.java      | 32 ++++++++
 .../flink/cep/operator/CepRuntimeContextTest.java  | 45 +++++++++++
 .../state/api/runtime/SavepointRuntimeContext.java | 32 ++++++++
 .../api/operators/StreamingRuntimeContext.java     | 10 +++
 .../api/functions/async/RichAsyncFunction.java     | 38 +++++++++
 .../api/functions/async/RichAsyncFunctionTest.java | 73 ++++++++++++++++++
 .../async/AsyncStateGroupAggFunction.java          |  3 +-
 .../AsyncStateDeduplicateFunctionBase.java         |  3 +-
 .../rank/async/AbstractAsyncStateTopNFunction.java |  4 +-
 .../async/AsyncStateAppendOnlyTopNFunction.java    |  3 +-
 .../rank/async/AsyncStateFastTop1Function.java     |  4 +-
 13 files changed, 362 insertions(+), 12 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index b36c0adaa2a..9270ed7acd3 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.functions;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobInfo;
@@ -410,6 +411,95 @@ public interface RuntimeContext {
     @PublicEvolving
     <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> 
stateProperties);
 
+    // ------------------------------------------------------------------------
+    //  Methods for accessing state V2
+    // ------------------------------------------------------------------------
+
+    /**
+     * Gets a handle to the system's key/value state. The key/value state is 
only accessible if the
+     * function is executed on a KeyedStream. On each access, the state 
exposes the value for the
+     * key of the element currently processed by the function. Each function 
may have multiple
+     * partitioned states, addressed with different names.
+     *
+     * <p>Because the scope of each value is the key of the currently 
processed element, and the
+     * elements are distributed by the Flink runtime, the system can 
transparently scale out and
+     * redistribute the state and KeyedStream.
+     *
+     * @param stateProperties The descriptor defining the properties of the 
state.
+     * @param <T> The type of value stored in the state.
+     * @return The partitioned state object.
+     * @throws UnsupportedOperationException Thrown, if no partitioned state 
is available for the
+     *     function (function is not part of a KeyedStream).
+     */
+    @Experimental
+    <T> org.apache.flink.api.common.state.v2.ValueState<T> getState(
+            org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> 
stateProperties);
+
+    /**
+     * Gets a handle to the system's key/value list state. This state is 
similar to the state
+     * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized 
for state that holds
+     * lists. One can add elements to the list, or retrieve the list as a 
whole.
+     *
+     * @param stateProperties The descriptor defining the properties of the 
state.
+     * @param <T> The type of value stored in the state.
+     * @return The partitioned state object.
+     * @throws UnsupportedOperationException Thrown, if no partitioned state 
is available for the
+     *     function (function is not part of a KeyedStream).
+     */
+    @Experimental
+    <T> org.apache.flink.api.common.state.v2.ListState<T> getListState(
+            org.apache.flink.api.common.state.v2.ListStateDescriptor<T> 
stateProperties);
+
+    /**
+     * Gets a handle to the system's key/value reducing state. This state is 
similar to the state
+     * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized 
for state that
+     * aggregates values.
+     *
+     * @param stateProperties The descriptor defining the properties of the 
state.
+     * @param <T> The type of value stored in the state.
+     * @return The partitioned state object.
+     * @throws UnsupportedOperationException Thrown, if no partitioned state 
is available for the
+     *     function (function is not part of a KeyedStream).
+     */
+    @Experimental
+    <T> org.apache.flink.api.common.state.v2.ReducingState<T> getReducingState(
+            org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> 
stateProperties);
+
+    /**
+     * Gets a handle to the system's key/value aggregating state. This state 
is similar to the state
+     * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized 
for state that
+     * aggregates values with different types.
+     *
+     * @param stateProperties The descriptor defining the properties of the 
state.
+     * @param <IN> The type of the values that are added to the state.
+     * @param <ACC> The type of the accumulator (intermediate aggregation 
state).
+     * @param <OUT> The type of the values that are returned from the state.
+     * @return The partitioned state object.
+     * @throws UnsupportedOperationException Thrown, if no partitioned state 
is available for the
+     *     function (function is not part of a KeyedStream).
+     */
+    @Experimental
+    <IN, ACC, OUT>
+            org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> 
getAggregatingState(
+                    
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT>
+                            stateProperties);
+
+    /**
+     * Gets a handle to the system's key/value map state. This state is 
similar to the state
+     * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized 
for state that is
+     * composed of user-defined key-value pairs.
+     *
+     * @param stateProperties The descriptor defining the properties of the 
state.
+     * @param <UK> The type of the user keys stored in the state.
+     * @param <UV> The type of the user values stored in the state.
+     * @return The partitioned state object.
+     * @throws UnsupportedOperationException Thrown, if no partitioned state 
is available for the
+     *     function (function is not part of a KeyedStream).
+     */
+    @Experimental
+    <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> getMapState(
+            org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> 
stateProperties);
+
     /**
      * Get the meta information of current job.
      *
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 75efc2a82d8..38a24043947 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -232,6 +232,43 @@ public abstract class AbstractRuntimeUDFContext implements 
RuntimeContext {
                 "This state is only accessible by functions executed on a 
KeyedStream");
     }
 
+    @Override
+    public <T> org.apache.flink.api.common.state.v2.ValueState<T> getState(
+            org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> 
stateProperties) {
+        throw new UnsupportedOperationException(
+                "This state is only accessible by functions executed on a 
KeyedStream");
+    }
+
+    @Override
+    public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState(
+            org.apache.flink.api.common.state.v2.ListStateDescriptor<T> 
stateProperties) {
+        throw new UnsupportedOperationException(
+                "This state is only accessible by functions executed on a 
KeyedStream");
+    }
+
+    @Override
+    public <T> org.apache.flink.api.common.state.v2.ReducingState<T> 
getReducingState(
+            org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> 
stateProperties) {
+        throw new UnsupportedOperationException(
+                "This state is only accessible by functions executed on a 
KeyedStream");
+    }
+
+    @Override
+    public <IN, ACC, OUT>
+            org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> 
getAggregatingState(
+                    
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT>
+                            stateProperties) {
+        throw new UnsupportedOperationException(
+                "This state is only accessible by functions executed on a 
KeyedStream");
+    }
+
+    @Override
+    public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> 
getMapState(
+            org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> 
stateProperties) {
+        throw new UnsupportedOperationException(
+                "This state is only accessible by functions executed on a 
KeyedStream");
+    }
+
     @Internal
     @VisibleForTesting
     public String getAllocationIDAsString() {
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
index ee8f7dd77dd..2ae1221b28a 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
@@ -194,4 +194,36 @@ class CepRuntimeContext implements RuntimeContext {
     public <UK, UV> MapState<UK, UV> getMapState(final MapStateDescriptor<UK, 
UV> stateProperties) {
         throw new UnsupportedOperationException("State is not supported.");
     }
+
+    @Override
+    public <T> org.apache.flink.api.common.state.v2.ValueState<T> getState(
+            org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> 
stateProperties) {
+        throw new UnsupportedOperationException("State is not supported.");
+    }
+
+    @Override
+    public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState(
+            org.apache.flink.api.common.state.v2.ListStateDescriptor<T> 
stateProperties) {
+        throw new UnsupportedOperationException("State is not supported.");
+    }
+
+    @Override
+    public <T> org.apache.flink.api.common.state.v2.ReducingState<T> 
getReducingState(
+            org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> 
stateProperties) {
+        throw new UnsupportedOperationException("State is not supported.");
+    }
+
+    @Override
+    public <IN, ACC, OUT>
+            org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> 
getAggregatingState(
+                    
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT>
+                            stateProperties) {
+        throw new UnsupportedOperationException("State is not supported.");
+    }
+
+    @Override
+    public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> 
getMapState(
+            org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> 
stateProperties) {
+        throw new UnsupportedOperationException("State is not supported.");
+    }
 }
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java
index ba39e5ae085..85312f857b2 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java
@@ -191,6 +191,51 @@ public class CepRuntimeContextTest extends TestLogger {
             // expected
         }
 
+        try {
+            runtimeContext.getState(
+                    new 
org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
+                            "foobar", Integer.class));
+            fail("Expected getState to fail with unsupported operation 
exception.");
+        } catch (UnsupportedOperationException e) {
+            // expected
+        }
+
+        try {
+            runtimeContext.getListState(
+                    new 
org.apache.flink.api.common.state.v2.ListStateDescriptor<>(
+                            "foobar", Integer.class));
+            fail("Expected getListState to fail with unsupported operation 
exception.");
+        } catch (UnsupportedOperationException e) {
+            // expected
+        }
+
+        try {
+            runtimeContext.getReducingState(
+                    new 
org.apache.flink.api.common.state.v2.ReducingStateDescriptor<>(
+                            "foobar", mock(ReduceFunction.class), 
Integer.class));
+            fail("Expected getReducingState to fail with unsupported operation 
exception.");
+        } catch (UnsupportedOperationException e) {
+            // expected
+        }
+
+        try {
+            runtimeContext.getAggregatingState(
+                    new 
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<>(
+                            "foobar", mock(AggregateFunction.class), 
Integer.class));
+            fail("Expected getAggregatingState to fail with unsupported 
operation exception.");
+        } catch (UnsupportedOperationException e) {
+            // expected
+        }
+
+        try {
+            runtimeContext.getMapState(
+                    new 
org.apache.flink.api.common.state.v2.MapStateDescriptor<>(
+                            "foobar", Integer.class, String.class));
+            fail("Expected getMapState to fail with unsupported operation 
exception.");
+        } catch (UnsupportedOperationException e) {
+            // expected
+        }
+
         try {
             runtimeContext.addAccumulator("foobar", mock(Accumulator.class));
             fail("Expected addAccumulator to fail with unsupported operation 
exception.");
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java
index e39d18c68a6..10ba1b840f1 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java
@@ -232,6 +232,38 @@ public final class SavepointRuntimeContext implements 
RuntimeContext {
         return keyedStateStore.getMapState(stateProperties);
     }
 
+    @Override
+    public <T> org.apache.flink.api.common.state.v2.ValueState<T> getState(
+            org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> 
stateProperties) {
+        throw new UnsupportedOperationException("State processor api does not 
support state v2.");
+    }
+
+    @Override
+    public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState(
+            org.apache.flink.api.common.state.v2.ListStateDescriptor<T> 
stateProperties) {
+        throw new UnsupportedOperationException("State processor api does not 
support state v2.");
+    }
+
+    @Override
+    public <T> org.apache.flink.api.common.state.v2.ReducingState<T> 
getReducingState(
+            org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> 
stateProperties) {
+        throw new UnsupportedOperationException("State processor api does not 
support state v2.");
+    }
+
+    @Override
+    public <IN, ACC, OUT>
+            org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> 
getAggregatingState(
+                    
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT>
+                            stateProperties) {
+        throw new UnsupportedOperationException("State processor api does not 
support state v2.");
+    }
+
+    @Override
+    public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> 
getMapState(
+            org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> 
stateProperties) {
+        throw new UnsupportedOperationException("State processor api does not 
support state v2.");
+    }
+
     public List<StateDescriptor<?, ?>> getStateDescriptors() {
         if (registeredDescriptors.isEmpty()) {
             return Collections.emptyList();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index d352b526490..e36508e544f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -247,6 +247,12 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
         return keyedStateStore;
     }
 
+    @Override
+    public <T> org.apache.flink.api.common.state.v2.ValueState<T> getState(
+            org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> 
stateProperties) {
+        return getValueState(stateProperties);
+    }
+
     // TODO: Reconstruct this after StateManager is ready in FLIP-410.
     public <T> org.apache.flink.api.common.state.v2.ValueState<T> 
getValueState(
             org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> 
stateProperties) {
@@ -255,6 +261,7 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
         return keyedStateStore.getValueState(stateProperties);
     }
 
+    @Override
     public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState(
             org.apache.flink.api.common.state.v2.ListStateDescriptor<T> 
stateProperties) {
         KeyedStateStore keyedStateStore = 
checkPreconditionsAndGetKeyedStateStore(stateProperties);
@@ -262,6 +269,7 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
         return keyedStateStore.getListState(stateProperties);
     }
 
+    @Override
     public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> 
getMapState(
             org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> 
stateProperties) {
         KeyedStateStore keyedStateStore = 
checkPreconditionsAndGetKeyedStateStore(stateProperties);
@@ -269,6 +277,7 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
         return keyedStateStore.getMapState(stateProperties);
     }
 
+    @Override
     public <T> org.apache.flink.api.common.state.v2.ReducingState<T> 
getReducingState(
             org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> 
stateProperties) {
         KeyedStateStore keyedStateStore = 
checkPreconditionsAndGetKeyedStateStore(stateProperties);
@@ -276,6 +285,7 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
         return keyedStateStore.getReducingState(stateProperties);
     }
 
+    @Override
     public <IN, ACC, OUT>
             org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> 
getAggregatingState(
                     
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT>
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
index 5f9b33fe7e5..781af9721a8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
@@ -186,6 +186,44 @@ public abstract class RichAsyncFunction<IN, OUT> extends 
AbstractRichFunction
                     "State is not supported in rich async functions.");
         }
 
+        @Override
+        public <T> org.apache.flink.api.common.state.v2.ValueState<T> getState(
+                org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> 
stateProperties) {
+            throw new UnsupportedOperationException(
+                    "State is not supported in rich async functions.");
+        }
+
+        @Override
+        public <T> org.apache.flink.api.common.state.v2.ListState<T> 
getListState(
+                org.apache.flink.api.common.state.v2.ListStateDescriptor<T> 
stateProperties) {
+            throw new UnsupportedOperationException(
+                    "State is not supported in rich async functions.");
+        }
+
+        @Override
+        public <T> org.apache.flink.api.common.state.v2.ReducingState<T> 
getReducingState(
+                
org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> 
stateProperties) {
+            throw new UnsupportedOperationException(
+                    "State is not supported in rich async functions.");
+        }
+
+        @Override
+        public <IN, ACC, OUT>
+                org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> 
getAggregatingState(
+                        
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<
+                                        IN, ACC, OUT>
+                                stateProperties) {
+            throw new UnsupportedOperationException(
+                    "State is not supported in rich async functions.");
+        }
+
+        @Override
+        public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> 
getMapState(
+                org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, 
UV> stateProperties) {
+            throw new UnsupportedOperationException(
+                    "State is not supported in rich async functions.");
+        }
+
         @Override
         public <V, A extends Serializable> void addAccumulator(
                 String name, Accumulator<V, A> accumulator) {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
index 076cb1d66c0..c27b4e679fb 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
@@ -205,6 +205,79 @@ class RichAsyncFunctionTest {
                                                 "foobar", Integer.class, 
String.class)))
                 .isInstanceOf(UnsupportedOperationException.class);
 
+        assertThatThrownBy(
+                        () ->
+                                runtimeContext.getState(
+                                        new 
org.apache.flink.api.common.state.v2
+                                                
.ValueStateDescriptor<>("foobar", Integer.class)))
+                .isInstanceOf(UnsupportedOperationException.class);
+
+        assertThatThrownBy(
+                        () ->
+                                runtimeContext.getListState(
+                                        new 
org.apache.flink.api.common.state.v2
+                                                
.ListStateDescriptor<>("foobar", Integer.class)))
+                .isInstanceOf(UnsupportedOperationException.class);
+
+        assertThatThrownBy(
+                        () ->
+                                runtimeContext.getReducingState(
+                                        new 
org.apache.flink.api.common.state.v2
+                                                .ReducingStateDescriptor<>(
+                                                "foobar",
+                                                new ReduceFunction<Integer>() {
+                                                    private static final long 
serialVersionUID =
+                                                            
2136425961884441050L;
+
+                                                    @Override
+                                                    public Integer reduce(
+                                                            Integer value1, 
Integer value2) {
+                                                        return value1;
+                                                    }
+                                                },
+                                                Integer.class)))
+                .isInstanceOf(UnsupportedOperationException.class);
+
+        assertThatThrownBy(
+                        () ->
+                                runtimeContext.getAggregatingState(
+                                        new 
org.apache.flink.api.common.state.v2
+                                                .AggregatingStateDescriptor<>(
+                                                "foobar",
+                                                new AggregateFunction<Integer, 
Integer, Integer>() {
+
+                                                    @Override
+                                                    public Integer 
createAccumulator() {
+                                                        return null;
+                                                    }
+
+                                                    @Override
+                                                    public Integer add(
+                                                            Integer value, 
Integer accumulator) {
+                                                        return null;
+                                                    }
+
+                                                    @Override
+                                                    public Integer 
getResult(Integer accumulator) {
+                                                        return null;
+                                                    }
+
+                                                    @Override
+                                                    public Integer 
merge(Integer a, Integer b) {
+                                                        return null;
+                                                    }
+                                                },
+                                                Integer.class)))
+                .isInstanceOf(UnsupportedOperationException.class);
+
+        assertThatThrownBy(
+                        () ->
+                                runtimeContext.getMapState(
+                                        new 
org.apache.flink.api.common.state.v2
+                                                .MapStateDescriptor<>(
+                                                "foobar", Integer.class, 
String.class)))
+                .isInstanceOf(UnsupportedOperationException.class);
+
         assertThatThrownBy(
                         () ->
                                 runtimeContext.addAccumulator(
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java
index 50a7490da2a..f852e167d62 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java
@@ -21,7 +21,6 @@ package 
org.apache.flink.table.runtime.operators.aggregate.async;
 import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.state.v2.ValueState;
 import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
 import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
@@ -86,7 +85,7 @@ public class AsyncStateGroupAggFunction extends 
GroupAggFunctionBase {
             accDesc.enableTimeToLive(ttlConfig);
         }
 
-        accState = ((StreamingRuntimeContext) 
getRuntimeContext()).getValueState(accDesc);
+        accState = getRuntimeContext().getState(accDesc);
         aggHelper = new AsyncStateGroupAggHelper();
     }
 
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java
index 1b203cc4df7..ce1b661cf76 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.state.v2.ValueState;
 import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import 
org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionBase;
 
 import org.slf4j.Logger;
@@ -68,6 +67,6 @@ abstract class AsyncStateDeduplicateFunctionBase<T, K, IN, 
OUT>
         if (ttlConfig.isEnabled()) {
             stateDesc.enableTimeToLive(ttlConfig);
         }
-        state = ((StreamingRuntimeContext) 
getRuntimeContext()).getValueState(stateDesc);
+        state = getRuntimeContext().getState(stateDesc);
     }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java
index 12d2b9c7a56..d79978fa06d 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.state.v2.ValueState;
 import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.core.state.StateFutureUtils;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
@@ -78,8 +77,7 @@ public abstract class AbstractAsyncStateTopNFunction extends 
AbstractTopNFunctio
             if (ttlConfig.isEnabled()) {
                 rankStateDesc.enableTimeToLive(ttlConfig);
             }
-            rankEndState =
-                    ((StreamingRuntimeContext) 
getRuntimeContext()).getValueState(rankStateDesc);
+            rankEndState = getRuntimeContext().getState(rankStateDesc);
         }
     }
 
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateAppendOnlyTopNFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateAppendOnlyTopNFunction.java
index 962298bb989..c67615c7e23 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateAppendOnlyTopNFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateAppendOnlyTopNFunction.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.state.v2.StateFuture;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.ListTypeInfo;
 import org.apache.flink.core.state.StateFutureUtils;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
@@ -97,7 +96,7 @@ public class AsyncStateAppendOnlyTopNFunction extends 
AbstractAsyncStateTopNFunc
         if (ttlConfig.isEnabled()) {
             mapStateDescriptor.enableTimeToLive(ttlConfig);
         }
-        dataState = ((StreamingRuntimeContext) 
getRuntimeContext()).getMapState(mapStateDescriptor);
+        dataState = getRuntimeContext().getMapState(mapStateDescriptor);
 
         helper = new AsyncStateAppendOnlyTopNHelper();
 
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateFastTop1Function.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateFastTop1Function.java
index 2704d5547fe..7b22e926e86 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateFastTop1Function.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateFastTop1Function.java
@@ -30,7 +30,6 @@ import 
org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
@@ -98,8 +97,7 @@ public class AsyncStateFastTop1Function extends 
AbstractAsyncStateTopNFunction
         if (ttlConfig.isEnabled()) {
             valueStateDescriptor.enableTimeToLive(ttlConfig);
         }
-        dataState =
-                ((StreamingRuntimeContext) 
getRuntimeContext()).getValueState(valueStateDescriptor);
+        dataState = getRuntimeContext().getState(valueStateDescriptor);
 
         helper = new AsyncStateFastTop1Helper();
 


Reply via email to