This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 7964466016da [FLINK-37045] Remove useless StateDescriptor of state v2 (#26009) 7964466016da is described below commit 7964466016daea1f53bf14821c85e0751e259227 Author: Zakelly <zakelly....@gmail.com> AuthorDate: Mon Jan 20 09:27:07 2025 +0800 [FLINK-37045] Remove useless StateDescriptor of state v2 (#26009) --- .../state/api/input/KeyedStateInputFormatTest.java | 4 +- .../operators/AsyncIntervalJoinOperator.java | 2 +- .../state/v2/AggregatingStateDescriptor.java | 48 ---------------------- .../runtime/state/v2/ListStateDescriptor.java | 41 ------------------ .../flink/runtime/state/v2/MapStateDescriptor.java | 47 --------------------- .../runtime/state/v2/ReducingStateDescriptor.java | 48 ---------------------- .../runtime/state/v2/ValueStateDescriptor.java | 41 ------------------ .../async/AsyncStateGroupAggFunction.java | 2 +- .../AbstractAsyncStateWindowAggProcessor.java | 2 +- .../AsyncStateDeduplicateFunctionBase.java | 2 +- .../rank/async/AbstractAsyncStateTopNFunction.java | 2 +- .../rank/async/AsyncStateFastTop1Function.java | 2 +- .../tvf/common/AsyncStateWindowAggOperator.java | 2 +- 13 files changed, 9 insertions(+), 234 deletions(-) diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java index 2e6faa0f4807..c2cb6aa597e8 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java @@ -333,12 +333,12 @@ class KeyedStateInputFormatTest { static class AsyncStatefulFunction extends RichFlatMapFunction<Integer, Void> { org.apache.flink.api.common.state.v2.ValueState<Integer> state; - org.apache.flink.runtime.state.v2.ValueStateDescriptor<Integer> asyncStateDescriptor; + org.apache.flink.api.common.state.v2.ValueStateDescriptor<Integer> asyncStateDescriptor; @Override public void open(OpenContext openContext) { asyncStateDescriptor = - new org.apache.flink.runtime.state.v2.ValueStateDescriptor<>( + new org.apache.flink.api.common.state.v2.ValueStateDescriptor<>( "state", Types.INT); state = ((StreamingRuntimeContext) getRuntimeContext()) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperator.java index 7d2c80bc5abb..1480fc0e4cdd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperator.java @@ -21,13 +21,13 @@ package org.apache.flink.runtime.asyncprocessing.operators; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable; -import org.apache.flink.runtime.state.v2.MapStateDescriptor; import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerService; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java deleted file mode 100644 index 078b189fbe12..000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.v2; - -import org.apache.flink.api.common.functions.AggregateFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -import javax.annotation.Nonnull; - -/** - * This class is deprecated and only a placeholder for compatibility for EXISTING PRs. This is never - * released before, and will be safely removed before 2.0 release. - */ -@Deprecated -public class AggregatingStateDescriptor<IN, ACC, OUT> - extends org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT> { - - public AggregatingStateDescriptor( - @Nonnull String stateId, - @Nonnull AggregateFunction<IN, ACC, OUT> aggregateFunction, - @Nonnull TypeInformation<ACC> typeInfo) { - super(stateId, aggregateFunction, typeInfo); - } - - public AggregatingStateDescriptor( - @Nonnull String stateId, - @Nonnull AggregateFunction<IN, ACC, OUT> aggregateFunction, - @Nonnull TypeSerializer<ACC> serializer) { - super(stateId, aggregateFunction, serializer); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java deleted file mode 100644 index 6e22610602c5..000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ListStateDescriptor.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.v2; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -import javax.annotation.Nonnull; - -/** - * This class is deprecated and only a placeholder for compatibility for EXISTING PRs. This is never - * released before, and will be safely removed before 2.0 release. - */ -@Deprecated -public class ListStateDescriptor<T> - extends org.apache.flink.api.common.state.v2.ListStateDescriptor<T> { - - public ListStateDescriptor(@Nonnull String stateId, @Nonnull TypeInformation<T> typeInfo) { - super(stateId, typeInfo); - } - - public ListStateDescriptor(@Nonnull String stateId, @Nonnull TypeSerializer<T> serializer) { - super(stateId, serializer); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java deleted file mode 100644 index 37573e25e645..000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.v2; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -import javax.annotation.Nonnull; - -/** - * This class is deprecated and only a placeholder for compatibility for EXISTING PRs. This is never - * released before, and will be safely removed before 2.0 release. - */ -@Deprecated -public class MapStateDescriptor<UK, UV> - extends org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> { - - public MapStateDescriptor( - @Nonnull String stateId, - @Nonnull TypeInformation<UK> userKeyTypeInfo, - @Nonnull TypeInformation<UV> userValueTypeInfo) { - super(stateId, userKeyTypeInfo, userValueTypeInfo); - } - - public MapStateDescriptor( - @Nonnull String stateId, - @Nonnull TypeSerializer<UK> userKeySerializer, - @Nonnull TypeSerializer<UV> userValueSerializer) { - super(stateId, userKeySerializer, userValueSerializer); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java deleted file mode 100644 index 053b9704eaaa..000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ReducingStateDescriptor.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.v2; - -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -import javax.annotation.Nonnull; - -/** - * This class is deprecated and only a placeholder for compatibility for EXISTING PRs. This is never - * released before, and will be safely removed before 2.0 release. - */ -@Deprecated -public class ReducingStateDescriptor<T> - extends org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> { - - public ReducingStateDescriptor( - @Nonnull String name, - @Nonnull ReduceFunction<T> reduceFunction, - @Nonnull TypeInformation<T> typeInfo) { - super(name, reduceFunction, typeInfo); - } - - public ReducingStateDescriptor( - @Nonnull String stateId, - @Nonnull ReduceFunction<T> reduceFunction, - @Nonnull TypeSerializer<T> serializer) { - super(stateId, reduceFunction, serializer); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java deleted file mode 100644 index 70e45ee9c3ad..000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ValueStateDescriptor.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.v2; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -import javax.annotation.Nonnull; - -/** - * This class is deprecated and only a placeholder for compatibility for EXISTING PRs. This is never - * released before, and will be safely removed before 2.0 release. - */ -@Deprecated -public class ValueStateDescriptor<T> - extends org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> { - - public ValueStateDescriptor(@Nonnull String stateId, @Nonnull TypeInformation<T> typeInfo) { - super(stateId, typeInfo); - } - - public ValueStateDescriptor(@Nonnull String stateId, @Nonnull TypeSerializer<T> serializer) { - super(stateId, serializer); - } -} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java index 506c36cc8991..9a58cba1c67a 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java @@ -20,7 +20,7 @@ package org.apache.flink.table.runtime.operators.aggregate.async; import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.state.v2.ValueState; -import org.apache.flink.runtime.state.v2.ValueStateDescriptor; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction; diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AbstractAsyncStateWindowAggProcessor.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AbstractAsyncStateWindowAggProcessor.java index a5f9982275e4..81a73b2a02c4 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AbstractAsyncStateWindowAggProcessor.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AbstractAsyncStateWindowAggProcessor.java @@ -19,8 +19,8 @@ package org.apache.flink.table.runtime.operators.aggregate.asyncwindow.processors; import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.state.v2.ValueStateDescriptor; import org.apache.flink.runtime.state.v2.internal.InternalValueState; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.dataview.UnsupportedStateDataViewStore; diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java index f3af0b883d48..a4c4e0edeadb 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java @@ -21,9 +21,9 @@ package org.apache.flink.table.runtime.operators.deduplicate.asyncprocessing; import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.state.v2.ValueStateDescriptor; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionBase; diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java index 3fb63d8a282f..5c982ed161ab 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java @@ -22,9 +22,9 @@ import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.core.state.StateFutureUtils; -import org.apache.flink.runtime.state.v2.ValueStateDescriptor; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateFastTop1Function.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateFastTop1Function.java index 9ff51aafa358..2704d5547fe0 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateFastTop1Function.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateFastTop1Function.java @@ -23,11 +23,11 @@ import org.apache.flink.api.common.serialization.SerializerConfigImpl; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.state.StateFutureUtils; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.runtime.state.v2.ValueStateDescriptor; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/async/tvf/common/AsyncStateWindowAggOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/async/tvf/common/AsyncStateWindowAggOperator.java index 30a84a757338..d629618638cf 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/async/tvf/common/AsyncStateWindowAggOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/async/tvf/common/AsyncStateWindowAggOperator.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.ListStateDescriptor; import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.metrics.Counter; @@ -32,7 +33,6 @@ import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; -import org.apache.flink.runtime.state.v2.ListStateDescriptor; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.KeyContext;