This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c999e7e9a5da54d403360d23e9a39694d32101c4
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Wed Jan 8 20:10:29 2025 +0800

    [FLINK-37045][Runtime] Merge two versions of OperatorStateStore
---
 .../flink/api/common/state/OperatorStateStore.java |  84 +++++++++++++++
 .../impl/context/AbstractPartitionedContext.java   |   2 +-
 .../impl/context/DefaultPartitionedContext.java    |   2 +-
 .../impl/context/DefaultStateManager.java          |   2 +-
 .../DefaultTwoOutputPartitionedContext.java        |   2 +-
 .../impl/operators/TwoOutputProcessOperator.java   |   2 +-
 .../flink/runtime/state/OperatorStateBackend.java  |   4 -
 .../flink/runtime/state/v2/OperatorStateStore.java | 119 ---------------------
 .../collect/utils/MockOperatorStateStore.java      |   3 +-
 9 files changed, 90 insertions(+), 130 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
index cb005c64319..437fd28a1b4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.state;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.PublicEvolving;
 
 import java.util.Set;
@@ -99,6 +100,89 @@ public interface OperatorStateStore {
      */
     <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
 
+    /**
+     * Creates (or restores) a {@link BroadcastState broadcast state}. This type of state can only
+     * be created to store the state of a {@code BroadcastStream}. Each state is registered under a
+     * unique name. The provided serializer is used to de/serialize the state in case of
+     * checkpointing (snapshot/restore). The returned broadcast state has {@code key-value} format.
+     *
+     * <p><b>CAUTION: the user has to guarantee that all task instances store the same elements in
+     * this type of state.</b>
+     *
+     * <p>Each operator instance individually maintains and stores elements in the broadcast state.
+     * The fact that the incoming stream is a broadcast one guarantees that all instances see all
+     * the elements. Upon recovery or re-scaling, the same state is given to each of the instances.
+     * To avoid hotspots, each task reads its previous partition, and if there are more tasks (scale
+     * up), then the new instances read from the old instances in a round robin fashion. This is why
+     * each instance has to guarantee that it stores the same elements as the rest. If not, upon
+     * recovery or rescaling you may have unpredictable redistribution of the partitions, thus
+     * unpredictable results.
+     *
+     * @param stateDescriptor The descriptor for this state, providing a name, a serializer for the
+     *     keys and one for the values.
+     * @param <K> The type of the keys in the broadcast state.
+     * @param <V> The type of the values in the broadcast state.
+     * @return The Broadcast State
+     */
+    @Experimental
+    <K, V> BroadcastState<K, V> getBroadcastState(
+            org.apache.flink.api.common.state.v2.MapStateDescriptor<K, V> stateDescriptor)
+            throws Exception;
+
+    /**
+     * Creates (or restores) a list state. Each state is registered under a unique name. The
+     * provided serializer is used to de/serialize the state in case of checkpointing
+     * (snapshot/restore).
+     *
+     * <p>Note the semantic differences between an operator list state and a keyed list state (see
+     * {@link
+     * KeyedStateStore#getListState(org.apache.flink.api.common.state.v2.ListStateDescriptor)}).
+     * Under the context of operator state, the list is a collection of state items that are
+     * independent of each other and eligible for redistribution across operator instances in case
+     * of changed operator parallelism. In other words, these state items are the finest granularity
+     * at which non-keyed state can be redistributed, and should not be correlated with each other.
+     *
+     * <p>The redistribution scheme of this list state upon operator rescaling is a round-robin
+     * pattern, such that the logical whole state (a concatenation of all the lists of state
+     * elements previously managed by each operator before the restore) is evenly divided into as
+     * many sublists as there are parallel operators.
+     *
+     * @param stateDescriptor The descriptor for this state, providing a name and serializer.
+     * @param <S> The generic type of the state
+     * @return A list for all state partitions.
+     */
+    @Experimental
+    <S> org.apache.flink.api.common.state.v2.ListState<S> getListState(
+            org.apache.flink.api.common.state.v2.ListStateDescriptor<S> stateDescriptor)
+            throws Exception;
+
+    /**
+     * Creates (or restores) a list state. Each state is registered under a unique name. The
+     * provided serializer is used to de/serialize the state in case of checkpointing
+     * (snapshot/restore).
+     *
+     * <p>Note the semantic differences between an operator list state and a keyed list state (see
+     * {@link
+     * KeyedStateStore#getListState(org.apache.flink.api.common.state.v2.ListStateDescriptor)}).
+     * Under the context of operator state, the list is a collection of state items that are
+     * independent of each other and eligible for redistribution across operator instances in case
+     * of changed operator parallelism. In other words, these state items are the finest granularity
+     * at which non-keyed state can be redistributed, and should not be correlated with each other.
+     *
+     * <p>The redistribution scheme of this list state upon operator rescaling is a broadcast
+     * pattern, such that the logical whole state (a concatenation of all the lists of state
+     * elements previously managed by each operator before the restore) is restored to all parallel
+     * operators so that each of them will get the union of all state items before the restore.
+     *
+     * @param stateDescriptor The descriptor for this state, providing a name and serializer.
+     * @param <S> The generic type of the state
+     * @return A list for all state partitions.
+     */
+    @Experimental
+    <S> org.apache.flink.api.common.state.v2.ListState<S> getUnionListState(
+            org.apache.flink.api.common.state.v2.ListStateDescriptor<S> stateDescriptor)
+            throws Exception;
+
     /**
      * Returns a set with the names of all currently registered states.
      *
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/AbstractPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/AbstractPartitionedContext.java
index 1c2bfa29efb..b9776a719e8 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/AbstractPartitionedContext.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/AbstractPartitionedContext.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.datastream.impl.context;
 
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.datastream.api.context.BasePartitionedContext;
 import org.apache.flink.datastream.api.context.JobInfo;
 import org.apache.flink.datastream.api.context.ProcessingTimeManager;
 import org.apache.flink.datastream.api.context.RuntimeContext;
 import org.apache.flink.datastream.api.context.TaskInfo;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.v2.OperatorStateStore;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
 import java.util.function.BiConsumer;
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java
index 589d289e02e..52b3e08b9a8 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.datastream.impl.context;
 
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.datastream.api.context.NonPartitionedContext;
 import org.apache.flink.datastream.api.context.PartitionedContext;
 import org.apache.flink.datastream.api.context.ProcessingTimeManager;
 import org.apache.flink.datastream.api.context.RuntimeContext;
-import org.apache.flink.runtime.state.v2.OperatorStateStore;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
 import java.util.function.BiConsumer;
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java
index 39018a4b5ff..71ef3855369 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.BroadcastState;
 import org.apache.flink.api.common.state.BroadcastStateDeclaration;
 import org.apache.flink.api.common.state.ListStateDeclaration;
 import org.apache.flink.api.common.state.MapStateDeclaration;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.state.ReducingStateDeclaration;
 import org.apache.flink.api.common.state.StateDeclaration;
 import org.apache.flink.api.common.state.ValueStateDeclaration;
@@ -38,7 +39,6 @@ import org.apache.flink.api.common.state.v2.ValueState;
 import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.datastream.api.context.StateManager;
-import org.apache.flink.runtime.state.v2.OperatorStateStore;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.util.Preconditions;
 
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputPartitionedContext.java
index 557ce04f8e0..591b6ae4084 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputPartitionedContext.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputPartitionedContext.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.datastream.impl.context;
 
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.datastream.api.context.ProcessingTimeManager;
 import org.apache.flink.datastream.api.context.RuntimeContext;
 import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext;
 import org.apache.flink.datastream.api.context.TwoOutputPartitionedContext;
-import org.apache.flink.runtime.state.v2.OperatorStateStore;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
 import java.util.function.BiConsumer;
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
index de189c83b1a..aefbc55a1f6 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.datastream.impl.operators;
 
 import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.watermark.WatermarkHandlingResult;
 import org.apache.flink.api.common.watermark.WatermarkHandlingStrategy;
 import org.apache.flink.datastream.api.context.ProcessingTimeManager;
@@ -32,7 +33,6 @@ import org.apache.flink.datastream.impl.context.DefaultTwoOutputPartitionedConte
 import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
 import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
 import org.apache.flink.runtime.event.WatermarkEvent;
-import org.apache.flink.runtime.state.v2.OperatorStateStore;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
index eb2433e384a..b2e7f76a005 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
@@ -26,13 +26,9 @@ import java.io.Closeable;
 /**
  * Interface that combines both, the user facing {@link OperatorStateStore} interface and the system
  * interface {@link Snapshotable}.
- *
- * <p>This also combines the state v1's and v2's {@link OperatorStateStore}, but there is no good
- * reason behind this. We will split these when there's different implementation for state v2.
  */
 public interface OperatorStateBackend
         extends OperatorStateStore,
-                org.apache.flink.runtime.state.v2.OperatorStateStore,
                 Snapshotable<SnapshotResult<OperatorStateHandle>>,
                 Closeable,
                 Disposable {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/OperatorStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/OperatorStateStore.java
deleted file mode 100644
index 50bd0c0a8d0..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/OperatorStateStore.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.v2;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.state.BroadcastState;
-import org.apache.flink.api.common.state.v2.ListState;
-import org.apache.flink.api.common.state.v2.ListStateDescriptor;
-import org.apache.flink.api.common.state.v2.MapStateDescriptor;
-
-import java.util.Set;
-
-/** This interface contains methods for registering operator state with a managed store. */
-@Internal
-public interface OperatorStateStore {
-
-    /**
-     * Creates (or restores) a {@link BroadcastState broadcast state}. This type of state can only
-     * be created to store the state of a {@code BroadcastStream}. Each state is registered under a
-     * unique name. The provided serializer is used to de/serialize the state in case of
-     * checkpointing (snapshot/restore). The returned broadcast state has {@code key-value} format.
-     *
-     * <p><b>CAUTION: the user has to guarantee that all task instances store the same elements in
-     * this type of state.</b>
-     *
-     * <p>Each operator instance individually maintains and stores elements in the broadcast state.
-     * The fact that the incoming stream is a broadcast one guarantees that all instances see all
-     * the elements. Upon recovery or re-scaling, the same state is given to each of the instances.
-     * To avoid hotspots, each task reads its previous partition, and if there are more tasks (scale
-     * up), then the new instances read from the old instances in a round robin fashion. This is why
-     * each instance has to guarantee that it stores the same elements as the rest. If not, upon
-     * recovery or rescaling you may have unpredictable redistribution of the partitions, thus
-     * unpredictable results.
-     *
-     * @param stateDescriptor The descriptor for this state, providing a name, a serializer for the
-     *     keys and one for the values.
-     * @param <K> The type of the keys in the broadcast state.
-     * @param <V> The type of the values in the broadcast state.
-     * @return The Broadcast State
-     */
-    <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
-            throws Exception;
-
-    /**
-     * Creates (or restores) a list state. Each state is registered under a unique name. The
-     * provided serializer is used to de/serialize the state in case of checkpointing
-     * (snapshot/restore).
-     *
-     * <p>Note the semantic differences between an operator list state and a keyed list state (see
-     * {@link KeyedStateStore#getListState(ListStateDescriptor)}). Under the context of operator
-     * state, the list is a collection of state items that are independent of each other and
-     * eligible for redistribution across operator instances in case of changed operator
-     * parallelism. In other words, these state items are the finest granularity at which non-keyed
-     * state can be redistributed, and should not be correlated with each other.
-     *
-     * <p>The redistribution scheme of this list state upon operator rescaling is a round-robin
-     * pattern, such that the logical whole state (a concatenation of all the lists of state
-     * elements previously managed by each operator before the restore) is evenly divided into as
-     * many sublists as there are parallel operators.
-     *
-     * @param stateDescriptor The descriptor for this state, providing a name and serializer.
-     * @param <S> The generic type of the state
-     * @return A list for all state partitions.
-     */
-    <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
-
-    /**
-     * Creates (or restores) a list state. Each state is registered under a unique name. The
-     * provided serializer is used to de/serialize the state in case of checkpointing
-     * (snapshot/restore).
-     *
-     * <p>Note the semantic differences between an operator list state and a keyed list state (see
-     * {@link KeyedStateStore#getListState(ListStateDescriptor)}). Under the context of operator
-     * state, the list is a collection of state items that are independent of each other and
-     * eligible for redistribution across operator instances in case of changed operator
-     * parallelism. In other words, these state items are the finest granularity at which non-keyed
-     * state can be redistributed, and should not be correlated with each other.
-     *
-     * <p>The redistribution scheme of this list state upon operator rescaling is a broadcast
-     * pattern, such that the logical whole state (a concatenation of all the lists of state
-     * elements previously managed by each operator before the restore) is restored to all parallel
-     * operators so that each of them will get the union of all state items before the restore.
-     *
-     * @param stateDescriptor The descriptor for this state, providing a name and serializer.
-     * @param <S> The generic type of the state
-     * @return A list for all state partitions.
-     */
-    <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
-
-    /**
-     * Returns a set with the names of all currently registered states.
-     *
-     * @return set of names for all registered states.
-     */
-    Set<String> getRegisteredStateNames();
-
-    /**
-     * Returns a set with the names of all currently registered broadcast states.
-     *
-     * @return set of names for all registered broadcast states.
-     */
-    Set<String> getRegisteredBroadcastStateNames();
-}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java
index 4c6a2e82968..87e9b4312ea 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java
@@ -33,8 +33,7 @@ import java.util.Set;
 
 /** An {@link OperatorStateStore} for testing purpose. */
 @SuppressWarnings("rawtypes")
-public class MockOperatorStateStore
-        implements OperatorStateStore, org.apache.flink.runtime.state.v2.OperatorStateStore {
+public class MockOperatorStateStore implements OperatorStateStore {
 
     private final Map<Long, Map<String, TestUtils.MockListState>> historyStateMap;
 

Reply via email to