This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink.git
commit bfdc6db0955a8ed7c7a3dcafa265f8f5df73169e Author: Zakelly <[email protected]> AuthorDate: Fri Feb 7 17:22:25 2025 +0800 [FLINK-37276] Add missing state v2 access interfaces in `RuntimeContext` --- .../flink/api/common/functions/RuntimeContext.java | 90 ++++++++++++++++++++++ .../functions/util/AbstractRuntimeUDFContext.java | 37 +++++++++ .../flink/cep/operator/CepRuntimeContext.java | 32 ++++++++ .../flink/cep/operator/CepRuntimeContextTest.java | 45 +++++++++++ .../state/api/runtime/SavepointRuntimeContext.java | 32 ++++++++ .../api/operators/StreamingRuntimeContext.java | 10 +++ .../api/functions/async/RichAsyncFunction.java | 38 +++++++++ .../api/functions/async/RichAsyncFunctionTest.java | 73 ++++++++++++++++++ .../async/AsyncStateGroupAggFunction.java | 3 +- .../AsyncStateDeduplicateFunctionBase.java | 3 +- .../rank/async/AbstractAsyncStateTopNFunction.java | 4 +- .../async/AsyncStateAppendOnlyTopNFunction.java | 3 +- .../rank/async/AsyncStateFastTop1Function.java | 4 +- 13 files changed, 362 insertions(+), 12 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index b36c0adaa2a..9270ed7acd3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.functions; +import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobInfo; @@ -410,6 +411,95 @@ public interface RuntimeContext { @PublicEvolving <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties); + // ------------------------------------------------------------------------ + // Methods for accessing state V2 + // ------------------------------------------------------------------------ + + /** + * Gets a handle to the system's key/value state. The key/value state is only accessible if the + * function is executed on a KeyedStream. On each access, the state exposes the value for the + * key of the element currently processed by the function. Each function may have multiple + * partitioned states, addressed with different names. + * + * <p>Because the scope of each value is the key of the currently processed element, and the + * elements are distributed by the Flink runtime, the system can transparently scale out and + * redistribute the state and KeyedStream. + * + * @param stateProperties The descriptor defining the properties of the stats. + * @param <T> The type of value stored in the state. + * @return The partitioned state object. + * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the + * function (function is not part of a KeyedStream). + */ + @Experimental + <T> org.apache.flink.api.common.state.v2.ValueState<T> getState( + org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties); + + /** + * Gets a handle to the system's key/value list state. This state is similar to the state + * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that holds + * lists. One can add elements to the list, or retrieve the list as a whole. + * + * @param stateProperties The descriptor defining the properties of the stats. + * @param <T> The type of value stored in the state. + * @return The partitioned state object. + * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the + * function (function is not part os a KeyedStream). + */ + @Experimental + <T> org.apache.flink.api.common.state.v2.ListState<T> getListState( + org.apache.flink.api.common.state.v2.ListStateDescriptor<T> stateProperties); + + /** + * Gets a handle to the system's key/value reducing state. This state is similar to the state + * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that + * aggregates values. + * + * @param stateProperties The descriptor defining the properties of the stats. + * @param <T> The type of value stored in the state. + * @return The partitioned state object. + * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the + * function (function is not part of a KeyedStream). + */ + @Experimental + <T> org.apache.flink.api.common.state.v2.ReducingState<T> getReducingState( + org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> stateProperties); + + /** + * Gets a handle to the system's key/value aggregating state. This state is similar to the state + * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that + * aggregates values with different types. + * + * @param stateProperties The descriptor defining the properties of the stats. + * @param <IN> The type of the values that are added to the state. + * @param <ACC> The type of the accumulator (intermediate aggregation state). + * @param <OUT> The type of the values that are returned from the state. + * @return The partitioned state object. + * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the + * function (function is not part of a KeyedStream). + */ + @Experimental + <IN, ACC, OUT> + org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingState( + org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT> + stateProperties); + + /** + * Gets a handle to the system's key/value map state. This state is similar to the state + * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that is + * composed of user-defined key-value pairs + * + * @param stateProperties The descriptor defining the properties of the stats. + * @param <UK> The type of the user keys stored in the state. + * @param <UV> The type of the user values stored in the state. + * @return The partitioned state object. + * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the + * function (function is not part of a KeyedStream). + */ + @Experimental + <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> getMapState( + org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> stateProperties); + /** * Get the meta information of current job. * diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java index 75efc2a82d8..38a24043947 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java @@ -232,6 +232,43 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext { "This state is only accessible by functions executed on a KeyedStream"); } + @Override + public <T> org.apache.flink.api.common.state.v2.ValueState<T> getState( + org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException( + "This state is only accessible by functions executed on a KeyedStream"); + } + + @Override + public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState( + org.apache.flink.api.common.state.v2.ListStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException( + "This state is only accessible by functions executed on a KeyedStream"); + } + + @Override + public <T> org.apache.flink.api.common.state.v2.ReducingState<T> getReducingState( + org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException( + "This state is only accessible by functions executed on a KeyedStream"); + } + + @Override + public <IN, ACC, OUT> + org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingState( + org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT> + stateProperties) { + throw new UnsupportedOperationException( + "This state is only accessible by functions executed on a KeyedStream"); + } + + @Override + public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> getMapState( + org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> stateProperties) { + throw new UnsupportedOperationException( + "This state is only accessible by functions executed on a KeyedStream"); + } + @Internal @VisibleForTesting public String getAllocationIDAsString() { diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java index ee8f7dd77dd..2ae1221b28a 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java @@ -194,4 +194,36 @@ class CepRuntimeContext implements RuntimeContext { public <UK, UV> MapState<UK, UV> getMapState(final MapStateDescriptor<UK, UV> stateProperties) { throw new UnsupportedOperationException("State is not supported."); } + + @Override + public <T> org.apache.flink.api.common.state.v2.ValueState<T> getState( + org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException("State is not supported."); + } + + @Override + public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState( + org.apache.flink.api.common.state.v2.ListStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException("State is not supported."); + } + + @Override + public <T> org.apache.flink.api.common.state.v2.ReducingState<T> getReducingState( + org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException("State is not supported."); + } + + @Override + public <IN, ACC, OUT> + org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingState( + org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT> + stateProperties) { + throw new UnsupportedOperationException("State is not supported."); + } + + @Override + public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> getMapState( + org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> stateProperties) { + throw new UnsupportedOperationException("State is not supported."); + } } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java index ba39e5ae085..85312f857b2 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java @@ -191,6 +191,51 @@ public class CepRuntimeContextTest extends TestLogger { // expected } + try { + runtimeContext.getState( + new org.apache.flink.api.common.state.v2.ValueStateDescriptor<>( + "foobar", Integer.class)); + fail("Expected getState to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + + try { + runtimeContext.getListState( + new org.apache.flink.api.common.state.v2.ListStateDescriptor<>( + "foobar", Integer.class)); + fail("Expected getListState to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + + try { + runtimeContext.getReducingState( + new org.apache.flink.api.common.state.v2.ReducingStateDescriptor<>( + "foobar", mock(ReduceFunction.class), Integer.class)); + fail("Expected getReducingState to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + + try { + runtimeContext.getAggregatingState( + new org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<>( + "foobar", mock(AggregateFunction.class), Integer.class)); + fail("Expected getAggregatingState to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + + try { + runtimeContext.getMapState( + new org.apache.flink.api.common.state.v2.MapStateDescriptor<>( + "foobar", Integer.class, String.class)); + fail("Expected getMapState to fail with unsupported operation exception."); + } catch (UnsupportedOperationException e) { + // expected + } + try { runtimeContext.addAccumulator("foobar", mock(Accumulator.class)); fail("Expected addAccumulator to fail with unsupported operation exception."); diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java index e39d18c68a6..10ba1b840f1 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java @@ -232,6 +232,38 @@ public final class SavepointRuntimeContext implements RuntimeContext { return keyedStateStore.getMapState(stateProperties); } + @Override + public <T> org.apache.flink.api.common.state.v2.ValueState<T> getState( + org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException("State processor api does not support state v2."); + } + + @Override + public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState( + org.apache.flink.api.common.state.v2.ListStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException("State processor api does not support state v2."); + } + + @Override + public <T> org.apache.flink.api.common.state.v2.ReducingState<T> getReducingState( + org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException("State processor api does not support state v2."); + } + + @Override + public <IN, ACC, OUT> + org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingState( + org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT> + stateProperties) { + throw new UnsupportedOperationException("State processor api does not support state v2."); + } + + @Override + public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> getMapState( + org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> stateProperties) { + throw new UnsupportedOperationException("State processor api does not support state v2."); + } + public List<StateDescriptor<?, ?>> getStateDescriptors() { if (registeredDescriptors.isEmpty()) { return Collections.emptyList(); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index d352b526490..e36508e544f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -247,6 +247,12 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { return keyedStateStore; } + @Override + public <T> org.apache.flink.api.common.state.v2.ValueState<T> getState( + org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties) { + return getValueState(stateProperties); + } + // TODO: Reconstruct this after StateManager is ready in FLIP-410. public <T> org.apache.flink.api.common.state.v2.ValueState<T> getValueState( org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties) { @@ -255,6 +261,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { return keyedStateStore.getValueState(stateProperties); } + @Override public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState( org.apache.flink.api.common.state.v2.ListStateDescriptor<T> stateProperties) { KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties); @@ -262,6 +269,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { return keyedStateStore.getListState(stateProperties); } + @Override public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> getMapState( org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> stateProperties) { KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties); @@ -269,6 +277,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { return keyedStateStore.getMapState(stateProperties); } + @Override public <T> org.apache.flink.api.common.state.v2.ReducingState<T> getReducingState( org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> stateProperties) { KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties); @@ -276,6 +285,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { return keyedStateStore.getReducingState(stateProperties); } + @Override public <IN, ACC, OUT> org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingState( org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT> diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java index 5f9b33fe7e5..781af9721a8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java @@ -186,6 +186,44 @@ public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction "State is not supported in rich async functions."); } + @Override + public <T> org.apache.flink.api.common.state.v2.ValueState<T> getState( + org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException( + "State is not supported in rich async functions."); + } + + @Override + public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState( + org.apache.flink.api.common.state.v2.ListStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException( + "State is not supported in rich async functions."); + } + + @Override + public <T> org.apache.flink.api.common.state.v2.ReducingState<T> getReducingState( + org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException( + "State is not supported in rich async functions."); + } + + @Override + public <IN, ACC, OUT> + org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingState( + org.apache.flink.api.common.state.v2.AggregatingStateDescriptor< + IN, ACC, OUT> + stateProperties) { + throw new UnsupportedOperationException( + "State is not supported in rich async functions."); + } + + @Override + public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> getMapState( + org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> stateProperties) { + throw new UnsupportedOperationException( + "State is not supported in rich async functions."); + } + @Override public <V, A extends Serializable> void addAccumulator( String name, Accumulator<V, A> accumulator) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java index 076cb1d66c0..c27b4e679fb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java @@ -205,6 +205,79 @@ class RichAsyncFunctionTest { "foobar", Integer.class, String.class))) .isInstanceOf(UnsupportedOperationException.class); + assertThatThrownBy( + () -> + runtimeContext.getState( + new org.apache.flink.api.common.state.v2 + .ValueStateDescriptor<>("foobar", Integer.class))) + .isInstanceOf(UnsupportedOperationException.class); + + assertThatThrownBy( + () -> + runtimeContext.getListState( + new org.apache.flink.api.common.state.v2 + .ListStateDescriptor<>("foobar", Integer.class))) + .isInstanceOf(UnsupportedOperationException.class); + + assertThatThrownBy( + () -> + runtimeContext.getReducingState( + new org.apache.flink.api.common.state.v2 + .ReducingStateDescriptor<>( + "foobar", + new ReduceFunction<Integer>() { + private static final long serialVersionUID = + 2136425961884441050L; + + @Override + public Integer reduce( + Integer value1, Integer value2) { + return value1; + } + }, + Integer.class))) + .isInstanceOf(UnsupportedOperationException.class); + + assertThatThrownBy( + () -> + runtimeContext.getAggregatingState( + new org.apache.flink.api.common.state.v2 + .AggregatingStateDescriptor<>( + "foobar", + new AggregateFunction<Integer, Integer, Integer>() { + + @Override + public Integer createAccumulator() { + return null; + } + + @Override + public Integer add( + Integer value, Integer accumulator) { + return null; + } + + @Override + public Integer getResult(Integer accumulator) { + return null; + } + + @Override + public Integer merge(Integer a, Integer b) { + return null; + } + }, + Integer.class))) + .isInstanceOf(UnsupportedOperationException.class); + + assertThatThrownBy( + () -> + runtimeContext.getMapState( + new org.apache.flink.api.common.state.v2 + .MapStateDescriptor<>( + "foobar", Integer.class, String.class))) + .isInstanceOf(UnsupportedOperationException.class); + assertThatThrownBy( () -> runtimeContext.addAccumulator( diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java index 50a7490da2a..f852e167d62 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java @@ -21,7 +21,6 @@ package org.apache.flink.table.runtime.operators.aggregate.async; import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.state.v2.ValueState; import org.apache.flink.api.common.state.v2.ValueStateDescriptor; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction; import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; @@ -86,7 +85,7 @@ public class AsyncStateGroupAggFunction extends GroupAggFunctionBase { accDesc.enableTimeToLive(ttlConfig); } - accState = ((StreamingRuntimeContext) getRuntimeContext()).getValueState(accDesc); + accState = getRuntimeContext().getState(accDesc); aggHelper = new AsyncStateGroupAggHelper(); } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java index 1b203cc4df7..ce1b661cf76 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java @@ -24,7 +24,6 @@ import org.apache.flink.api.common.state.v2.ValueState; import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionBase; import org.slf4j.Logger; @@ -68,6 +67,6 @@ abstract class AsyncStateDeduplicateFunctionBase<T, K, IN, OUT> if (ttlConfig.isEnabled()) { stateDesc.enableTimeToLive(ttlConfig); } - state = ((StreamingRuntimeContext) getRuntimeContext()).getValueState(stateDesc); + state = getRuntimeContext().getState(stateDesc); } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java index 12d2b9c7a56..d79978fa06d 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java @@ -25,7 +25,6 @@ import org.apache.flink.api.common.state.v2.ValueState; import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.core.state.StateFutureUtils; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; @@ -78,8 +77,7 @@ public abstract class AbstractAsyncStateTopNFunction extends AbstractTopNFunctio if (ttlConfig.isEnabled()) { rankStateDesc.enableTimeToLive(ttlConfig); } - rankEndState = - ((StreamingRuntimeContext) getRuntimeContext()).getValueState(rankStateDesc); + rankEndState = getRuntimeContext().getState(rankStateDesc); } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateAppendOnlyTopNFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateAppendOnlyTopNFunction.java index 962298bb989..c67615c7e23 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateAppendOnlyTopNFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateAppendOnlyTopNFunction.java @@ -27,7 +27,6 @@ import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.ListTypeInfo; import org.apache.flink.core.state.StateFutureUtils; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; @@ -97,7 +96,7 @@ public class AsyncStateAppendOnlyTopNFunction extends AbstractAsyncStateTopNFunc if (ttlConfig.isEnabled()) { mapStateDescriptor.enableTimeToLive(ttlConfig); } - dataState = ((StreamingRuntimeContext) getRuntimeContext()).getMapState(mapStateDescriptor); + dataState = getRuntimeContext().getMapState(mapStateDescriptor); helper = new AsyncStateAppendOnlyTopNHelper(); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateFastTop1Function.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateFastTop1Function.java index 2704d5547fe..7b22e926e86 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateFastTop1Function.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateFastTop1Function.java @@ -30,7 +30,6 @@ import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; @@ -98,8 +97,7 @@ public class AsyncStateFastTop1Function extends AbstractAsyncStateTopNFunction if (ttlConfig.isEnabled()) { valueStateDescriptor.enableTimeToLive(ttlConfig); } - dataState = - ((StreamingRuntimeContext) getRuntimeContext()).getValueState(valueStateDescriptor); + dataState = getRuntimeContext().getState(valueStateDescriptor); helper = new AsyncStateFastTop1Helper();
