This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 581a4f761d231166f1471fcd1f18ae55df680983
Author: JunRuiLee <[email protected]>
AuthorDate: Fri Aug 30 11:42:26 2024 +0800

    [FLINK-36185][state-processor] Remove deprecated internal class 
BoundedOneInputStreamTaskRunner.
---
 .../output/BoundedOneInputStreamTaskRunner.java    | 96 ----------------------
 1 file changed, 96 deletions(-)

diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedOneInputStreamTaskRunner.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedOneInputStreamTaskRunner.java
deleted file mode 100644
index cefe67f7396..00000000000
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedOneInputStreamTaskRunner.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.state.api.output;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.OpenContext;
-import org.apache.flink.api.common.functions.RichMapPartitionFunction;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.state.api.functions.Timestamper;
-import org.apache.flink.state.api.runtime.SavepointEnvironment;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.util.Collector;
-
-/**
- * A {@link RichMapPartitionFunction} that serves as the runtime for a {@link 
BoundedStreamTask}.
- *
- * <p>The task is executed processing the data in a particular partition 
instead of the pulling from
- * the network stack. After all data has been processed the runner will output 
the {@link
- * OperatorSubtaskState} from the snapshot of the bounded task.
- *
- * @param <IN> Type of the input to the partition
- */
-@Deprecated
-@Internal
-public class BoundedOneInputStreamTaskRunner<IN>
-        extends RichMapPartitionFunction<IN, TaggedOperatorSubtaskState> {
-
-    private static final long serialVersionUID = 1L;
-
-    private final StreamConfig streamConfig;
-
-    private final int maxParallelism;
-
-    private final Timestamper<IN> timestamper;
-
-    private transient SavepointEnvironment env;
-
-    /**
-     * Create a new {@link BoundedOneInputStreamTaskRunner}.
-     *
-     * @param streamConfig The internal configuration for the task.
-     * @param maxParallelism The max parallelism of the operator.
-     */
-    public BoundedOneInputStreamTaskRunner(
-            StreamConfig streamConfig, int maxParallelism, Timestamper<IN> 
timestamper) {
-
-        this.streamConfig = streamConfig;
-        this.maxParallelism = maxParallelism;
-        this.timestamper = timestamper;
-    }
-
-    @Override
-    public void open(OpenContext openContext) throws Exception {
-        super.open(openContext);
-
-        env =
-                new SavepointEnvironment.Builder(
-                                getRuntimeContext(),
-                                getRuntimeContext().getExecutionConfig(),
-                                maxParallelism)
-                        .setConfiguration(streamConfig.getConfiguration())
-                        .build();
-    }
-
-    @Override
-    public void mapPartition(Iterable<IN> values, 
Collector<TaggedOperatorSubtaskState> out)
-            throws Exception {
-        BoundedStreamTask<
-                        IN,
-                        TaggedOperatorSubtaskState,
-                        ? extends OneInputStreamOperator<IN, 
TaggedOperatorSubtaskState>>
-                boundedStreamTask = new BoundedStreamTask<>(env, values, 
timestamper, out);
-        try {
-            boundedStreamTask.invoke();
-        } finally {
-            boundedStreamTask.cleanUp(null);
-        }
-    }
-}

Reply via email to