This is an automated email from the ASF dual-hosted git repository.
zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c999e7e9a5da54d403360d23e9a39694d32101c4 Author: Zakelly <zakelly....@gmail.com> AuthorDate: Wed Jan 8 20:10:29 2025 +0800 [FLINK-37045][Runtime] Merge two versions of OperatorStateStore --- .../flink/api/common/state/OperatorStateStore.java | 84 +++++++++++++++ .../impl/context/AbstractPartitionedContext.java | 2 +- .../impl/context/DefaultPartitionedContext.java | 2 +- .../impl/context/DefaultStateManager.java | 2 +- .../DefaultTwoOutputPartitionedContext.java | 2 +- .../impl/operators/TwoOutputProcessOperator.java | 2 +- .../flink/runtime/state/OperatorStateBackend.java | 4 - .../flink/runtime/state/v2/OperatorStateStore.java | 119 --------------------- .../collect/utils/MockOperatorStateStore.java | 3 +- 9 files changed, 90 insertions(+), 130 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java index cb005c64319..437fd28a1b4 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.state; +import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.PublicEvolving; import java.util.Set; @@ -99,6 +100,89 @@ public interface OperatorStateStore { */ <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception; + /** + * Creates (or restores) a {@link BroadcastState broadcast state}. This type of state can only + * be created to store the state of a {@code BroadcastStream}. Each state is registered under a + * unique name. The provided serializer is used to de/serialize the state in case of + * checkpointing (snapshot/restore). The returned broadcast state has {@code key-value} format. 
+ * + * <p><b>CAUTION: the user has to guarantee that all task instances store the same elements in + * this type of state.</b> + * + * <p>Each operator instance individually maintains and stores elements in the broadcast state. + * The fact that the incoming stream is a broadcast one guarantees that all instances see all + * the elements. Upon recovery or re-scaling, the same state is given to each of the instances. + * To avoid hotspots, each task reads its previous partition, and if there are more tasks (scale + * up), then the new instances read from the old instances in a round robin fashion. This is why + * each instance has to guarantee that it stores the same elements as the rest. If not, upon + * recovery or rescaling you may have unpredictable redistribution of the partitions, thus + * unpredictable results. + * + * @param stateDescriptor The descriptor for this state, providing a name, a serializer for the + * keys and one for the values. + * @param <K> The type of the keys in the broadcast state. + * @param <V> The type of the values in the broadcast state. + * @return The Broadcast State + */ + @Experimental + <K, V> BroadcastState<K, V> getBroadcastState( + org.apache.flink.api.common.state.v2.MapStateDescriptor<K, V> stateDescriptor) + throws Exception; + + /** + * Creates (or restores) a list state. Each state is registered under a unique name. The + * provided serializer is used to de/serialize the state in case of checkpointing + * (snapshot/restore). + * + * <p>Note the semantic differences between an operator list state and a keyed list state (see + * {@link + * KeyedStateStore#getListState(org.apache.flink.api.common.state.v2.ListStateDescriptor)}). + * Under the context of operator state, the list is a collection of state items that are + * independent of each other and eligible for redistribution across operator instances in case + * of changed operator parallelism. 
In other words, these state items are the finest granularity + * at which non-keyed state can be redistributed, and should not be correlated with each other. + * + * <p>The redistribution scheme of this list state upon operator rescaling is a round-robin + * pattern, such that the logical whole state (a concatenation of all the lists of state + * elements previously managed by each operator before the restore) is evenly divided into as + * many sublists as there are parallel operators. + * + * @param stateDescriptor The descriptor for this state, providing a name and serializer. + * @param <S> The generic type of the state + * @return A list for all state partitions. + */ + @Experimental + <S> org.apache.flink.api.common.state.v2.ListState<S> getListState( + org.apache.flink.api.common.state.v2.ListStateDescriptor<S> stateDescriptor) + throws Exception; + + /** + * Creates (or restores) a list state. Each state is registered under a unique name. The + * provided serializer is used to de/serialize the state in case of checkpointing + * (snapshot/restore). + * + * <p>Note the semantic differences between an operator list state and a keyed list state (see + * {@link + * KeyedStateStore#getListState(org.apache.flink.api.common.state.v2.ListStateDescriptor)}). + * Under the context of operator state, the list is a collection of state items that are + * independent of each other and eligible for redistribution across operator instances in case + * of changed operator parallelism. In other words, these state items are the finest granularity + * at which non-keyed state can be redistributed, and should not be correlated with each other. 
+ * + * <p>The redistribution scheme of this list state upon operator rescaling is a broadcast + * pattern, such that the logical whole state (a concatenation of all the lists of state + * elements previously managed by each operator before the restore) is restored to all parallel + * operators so that each of them will get the union of all state items before the restore. + * + * @param stateDescriptor The descriptor for this state, providing a name and serializer. + * @param <S> The generic type of the state + * @return A list for all state partitions. + */ + @Experimental + <S> org.apache.flink.api.common.state.v2.ListState<S> getUnionListState( + org.apache.flink.api.common.state.v2.ListStateDescriptor<S> stateDescriptor) + throws Exception; + /** * Returns a set with the names of all currently registered states. * diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/AbstractPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/AbstractPartitionedContext.java index 1c2bfa29efb..b9776a719e8 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/AbstractPartitionedContext.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/AbstractPartitionedContext.java @@ -18,13 +18,13 @@ package org.apache.flink.datastream.impl.context; +import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.datastream.api.context.BasePartitionedContext; import org.apache.flink.datastream.api.context.JobInfo; import org.apache.flink.datastream.api.context.ProcessingTimeManager; import org.apache.flink.datastream.api.context.RuntimeContext; import org.apache.flink.datastream.api.context.TaskInfo; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.runtime.state.v2.OperatorStateStore; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import java.util.function.BiConsumer; diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java index 589d289e02e..52b3e08b9a8 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java @@ -18,11 +18,11 @@ package org.apache.flink.datastream.impl.context; +import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.datastream.api.context.NonPartitionedContext; import org.apache.flink.datastream.api.context.PartitionedContext; import org.apache.flink.datastream.api.context.ProcessingTimeManager; import org.apache.flink.datastream.api.context.RuntimeContext; -import org.apache.flink.runtime.state.v2.OperatorStateStore; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import java.util.function.BiConsumer; diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java index 39018a4b5ff..71ef3855369 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.BroadcastStateDeclaration; import org.apache.flink.api.common.state.ListStateDeclaration; import org.apache.flink.api.common.state.MapStateDeclaration; +import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.common.state.ReducingStateDeclaration; import org.apache.flink.api.common.state.StateDeclaration; import 
org.apache.flink.api.common.state.ValueStateDeclaration; @@ -38,7 +39,6 @@ import org.apache.flink.api.common.state.v2.ValueState; import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.datastream.api.context.StateManager; -import org.apache.flink.runtime.state.v2.OperatorStateStore; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.util.Preconditions; diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputPartitionedContext.java index 557ce04f8e0..591b6ae4084 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputPartitionedContext.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputPartitionedContext.java @@ -18,11 +18,11 @@ package org.apache.flink.datastream.impl.context; +import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.datastream.api.context.ProcessingTimeManager; import org.apache.flink.datastream.api.context.RuntimeContext; import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext; import org.apache.flink.datastream.api.context.TwoOutputPartitionedContext; -import org.apache.flink.runtime.state.v2.OperatorStateStore; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import java.util.function.BiConsumer; diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java index de189c83b1a..aefbc55a1f6 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java +++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java @@ -19,6 +19,7 @@ package org.apache.flink.datastream.impl.operators; import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.common.watermark.WatermarkHandlingResult; import org.apache.flink.api.common.watermark.WatermarkHandlingStrategy; import org.apache.flink.datastream.api.context.ProcessingTimeManager; @@ -32,7 +33,6 @@ import org.apache.flink.datastream.impl.context.DefaultTwoOutputPartitionedConte import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager; import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; import org.apache.flink.runtime.event.WatermarkEvent; -import org.apache.flink.runtime.state.v2.OperatorStateStore; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java index eb2433e384a..b2e7f76a005 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java @@ -26,13 +26,9 @@ import java.io.Closeable; /** * Interface that combines both, the user facing {@link OperatorStateStore} interface and the system * interface {@link Snapshotable}. - * - * <p>This also combines the state v1's and v2's {@link OperatorStateStore}, but there is no good - * reason behind this. We will split these when there's different implementation for state v2. 
*/ public interface OperatorStateBackend extends OperatorStateStore, - org.apache.flink.runtime.state.v2.OperatorStateStore, Snapshotable<SnapshotResult<OperatorStateHandle>>, Closeable, Disposable { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/OperatorStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/OperatorStateStore.java deleted file mode 100644 index 50bd0c0a8d0..00000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/OperatorStateStore.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.v2; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.state.BroadcastState; -import org.apache.flink.api.common.state.v2.ListState; -import org.apache.flink.api.common.state.v2.ListStateDescriptor; -import org.apache.flink.api.common.state.v2.MapStateDescriptor; - -import java.util.Set; - -/** This interface contains methods for registering operator state with a managed store. */ -@Internal -public interface OperatorStateStore { - - /** - * Creates (or restores) a {@link BroadcastState broadcast state}. 
This type of state can only - * be created to store the state of a {@code BroadcastStream}. Each state is registered under a - * unique name. The provided serializer is used to de/serialize the state in case of - * checkpointing (snapshot/restore). The returned broadcast state has {@code key-value} format. - * - * <p><b>CAUTION: the user has to guarantee that all task instances store the same elements in - * this type of state.</b> - * - * <p>Each operator instance individually maintains and stores elements in the broadcast state. - * The fact that the incoming stream is a broadcast one guarantees that all instances see all - * the elements. Upon recovery or re-scaling, the same state is given to each of the instances. - * To avoid hotspots, each task reads its previous partition, and if there are more tasks (scale - * up), then the new instances read from the old instances in a round robin fashion. This is why - * each instance has to guarantee that it stores the same elements as the rest. If not, upon - * recovery or rescaling you may have unpredictable redistribution of the partitions, thus - * unpredictable results. - * - * @param stateDescriptor The descriptor for this state, providing a name, a serializer for the - * keys and one for the values. - * @param <K> The type of the keys in the broadcast state. - * @param <V> The type of the values in the broadcast state. - * @return The Broadcast State - */ - <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) - throws Exception; - - /** - * Creates (or restores) a list state. Each state is registered under a unique name. The - * provided serializer is used to de/serialize the state in case of checkpointing - * (snapshot/restore). - * - * <p>Note the semantic differences between an operator list state and a keyed list state (see - * {@link KeyedStateStore#getListState(ListStateDescriptor)}). 
Under the context of operator - * state, the list is a collection of state items that are independent of each other and - * eligible for redistribution across operator instances in case of changed operator - * parallelism. In other words, these state items are the finest granularity at which non-keyed - * state can be redistributed, and should not be correlated with each other. - * - * <p>The redistribution scheme of this list state upon operator rescaling is a round-robin - * pattern, such that the logical whole state (a concatenation of all the lists of state - * elements previously managed by each operator before the restore) is evenly divided into as - * many sublists as there are parallel operators. - * - * @param stateDescriptor The descriptor for this state, providing a name and serializer. - * @param <S> The generic type of the state - * @return A list for all state partitions. - */ - <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception; - - /** - * Creates (or restores) a list state. Each state is registered under a unique name. The - * provided serializer is used to de/serialize the state in case of checkpointing - * (snapshot/restore). - * - * <p>Note the semantic differences between an operator list state and a keyed list state (see - * {@link KeyedStateStore#getListState(ListStateDescriptor)}). Under the context of operator - * state, the list is a collection of state items that are independent of each other and - * eligible for redistribution across operator instances in case of changed operator - * parallelism. In other words, these state items are the finest granularity at which non-keyed - * state can be redistributed, and should not be correlated with each other. 
- * - * <p>The redistribution scheme of this list state upon operator rescaling is a broadcast - * pattern, such that the logical whole state (a concatenation of all the lists of state - * elements previously managed by each operator before the restore) is restored to all parallel - * operators so that each of them will get the union of all state items before the restore. - * - * @param stateDescriptor The descriptor for this state, providing a name and serializer. - * @param <S> The generic type of the state - * @return A list for all state partitions. - */ - <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception; - - /** - * Returns a set with the names of all currently registered states. - * - * @return set of names for all registered states. - */ - Set<String> getRegisteredStateNames(); - - /** - * Returns a set with the names of all currently registered broadcast states. - * - * @return set of names for all registered broadcast states. - */ - Set<String> getRegisteredBroadcastStateNames(); -} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java index 4c6a2e82968..87e9b4312ea 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java @@ -33,8 +33,7 @@ import java.util.Set; /** An {@link OperatorStateStore} for testing purpose. */ @SuppressWarnings("rawtypes") -public class MockOperatorStateStore - implements OperatorStateStore, org.apache.flink.runtime.state.v2.OperatorStateStore { +public class MockOperatorStateStore implements OperatorStateStore { private final Map<Long, Map<String, TestUtils.MockListState>> historyStateMap;