This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 581a4f761d231166f1471fcd1f18ae55df680983 Author: JunRuiLee <[email protected]> AuthorDate: Fri Aug 30 11:42:26 2024 +0800 [FLINK-36185][state-processor] Remove deprecated internal class BoundedOneInputStreamTaskRunner. --- .../output/BoundedOneInputStreamTaskRunner.java | 96 ---------------------- 1 file changed, 96 deletions(-) diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedOneInputStreamTaskRunner.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedOneInputStreamTaskRunner.java deleted file mode 100644 index cefe67f7396..00000000000 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedOneInputStreamTaskRunner.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.state.api.output; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.functions.OpenContext; -import org.apache.flink.api.common.functions.RichMapPartitionFunction; -import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; -import org.apache.flink.state.api.functions.Timestamper; -import org.apache.flink.state.api.runtime.SavepointEnvironment; -import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.util.Collector; - -/** - * A {@link RichMapPartitionFunction} that serves as the runtime for a {@link BoundedStreamTask}. - * - * <p>The task is executed processing the data in a particular partition instead of the pulling from - * the network stack. After all data has been processed the runner will output the {@link - * OperatorSubtaskState} from the snapshot of the bounded task. - * - * @param <IN> Type of the input to the partition - */ -@Deprecated -@Internal -public class BoundedOneInputStreamTaskRunner<IN> - extends RichMapPartitionFunction<IN, TaggedOperatorSubtaskState> { - - private static final long serialVersionUID = 1L; - - private final StreamConfig streamConfig; - - private final int maxParallelism; - - private final Timestamper<IN> timestamper; - - private transient SavepointEnvironment env; - - /** - * Create a new {@link BoundedOneInputStreamTaskRunner}. - * - * @param streamConfig The internal configuration for the task. - * @param maxParallelism The max parallelism of the operator. - */ - public BoundedOneInputStreamTaskRunner( - StreamConfig streamConfig, int maxParallelism, Timestamper<IN> timestamper) { - - this.streamConfig = streamConfig; - this.maxParallelism = maxParallelism; - this.timestamper = timestamper; - } - - @Override - public void open(OpenContext openContext) throws Exception { - super.open(openContext); - - env = - new SavepointEnvironment.Builder( - getRuntimeContext(), - getRuntimeContext().getExecutionConfig(), - maxParallelism) - .setConfiguration(streamConfig.getConfiguration()) - .build(); - } - - @Override - public void mapPartition(Iterable<IN> values, Collector<TaggedOperatorSubtaskState> out) - throws Exception { - BoundedStreamTask< - IN, - TaggedOperatorSubtaskState, - ? extends OneInputStreamOperator<IN, TaggedOperatorSubtaskState>> - boundedStreamTask = new BoundedStreamTask<>(env, values, timestamper, out); - try { - boundedStreamTask.invoke(); - } finally { - boundedStreamTask.cleanUp(null); - } - } -}
