This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8931ce4494ee730126cb856c8a3a8c59a1d5dcf6
Author: codenohup <[email protected]>
AuthorDate: Tue Oct 1 19:46:11 2024 +0800

    [FLINK-19153] Remove deprecated ExecutionMode in flink-core module
---
 .../apache/flink/api/common/ExecutionConfig.java   |  60 ----------
 .../org/apache/flink/api/common/ExecutionMode.java |  92 ---------------
 .../docs/reference/pyflink.common/config.rst       |  11 --
 flink-python/pyflink/common/__init__.py            |   2 -
 flink-python/pyflink/common/execution_config.py    |  39 -------
 flink-python/pyflink/common/execution_mode.py      |  82 -------------
 .../pyflink/common/tests/test_execution_config.py  |  20 +---
 .../flink/runtime/io/network/DataExchangeMode.java | 127 ---------------------
 .../runtime/io/network/DataExchangeModeTest.java   |  50 --------
 .../scheduler/benchmark/JobConfiguration.java      |  48 ++++----
 .../benchmark/SchedulerBenchmarkUtils.java         |   4 +-
 .../streaming/api/datastream/DataStreamUtils.java  |  38 ------
 .../recovery/BatchFineGrainedRecoveryITCase.java   |   5 +-
 13 files changed, 27 insertions(+), 551 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index fc8c3480c24..9dd729cae5b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -24,7 +24,6 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.SerializerConfig;
 import org.apache.flink.api.common.serialization.SerializerConfigImpl;
-import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DescribedEnum;
@@ -48,7 +47,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 
-import static org.apache.flink.configuration.ConfigOptions.key;
 import static org.apache.flink.configuration.description.TextElement.text;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -61,8 +59,6 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  *       functions that do not define a specific value directly.
  *   <li>The number of retries in the case of failed executions.
  *   <li>The delay between execution retries.
- *   <li>The {@link ExecutionMode} of the program: Batch or Pipelined. The 
default execution mode is
- *       {@link ExecutionMode#PIPELINED}
  *   <li>Enabling or disabling the "closure cleaner". The closure cleaner 
pre-processes the
  *       implementations of functions. In case they are (anonymous) inner 
classes, it removes unused
  *       references to the enclosing class to fix certain 
serialization-related problems and to
@@ -104,22 +100,6 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
      */
     public static final int PARALLELISM_UNKNOWN = -2;
 
-    /**
-     * Internal {@link ConfigOption}s, that are not exposed and it's not 
possible to configure them
-     * via config files. We are defining them here, so that we can store them 
in the {@link
-     * #configuration}.
-     *
-     * <p>If you decide to expose any of those {@link ConfigOption}s, please 
double-check if the
-     * key, type and descriptions are sensible, as the initial values are 
arbitrary.
-     */
-    // 
--------------------------------------------------------------------------------------------
-
-    private static final ConfigOption<ExecutionMode> EXECUTION_MODE =
-            key("hidden.execution.mode")
-                    .enumType(ExecutionMode.class)
-                    .defaultValue(ExecutionMode.PIPELINED)
-                    .withDescription("Defines how data exchange happens - 
batch or pipelined");
-
     // 
--------------------------------------------------------------------------------------------
 
     /**
@@ -408,46 +388,6 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
         return configuration.getOptional(JobManagerOptions.SCHEDULER);
     }
 
-    /**
-     * Sets the execution mode to execute the program. The execution mode 
defines whether data
-     * exchanges are performed in a batch or on a pipelined manner.
-     *
-     * <p>The default execution mode is {@link ExecutionMode#PIPELINED}.
-     *
-     * @param executionMode The execution mode to use.
-     * @deprecated The {@link ExecutionMode} is deprecated because it's only 
used in DataSet APIs.
-     *     All Flink DataSet APIs are deprecated since Flink 1.18 and will be 
removed in a future
-     *     Flink major version. You can still build your application in 
DataSet, but you should move
-     *     to either the DataStream and/or Table API.
-     * @see <a 
href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741">
-     *     FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and 
deprecate the DataSet
-     *     API</a>
-     */
-    @Deprecated
-    public void setExecutionMode(ExecutionMode executionMode) {
-        configuration.set(EXECUTION_MODE, executionMode);
-    }
-
-    /**
-     * Gets the execution mode used to execute the program. The execution mode 
defines whether data
-     * exchanges are performed in a batch or on a pipelined manner.
-     *
-     * <p>The default execution mode is {@link ExecutionMode#PIPELINED}.
-     *
-     * @return The execution mode for the program.
-     * @deprecated The {@link ExecutionMode} is deprecated because it's only 
used in DataSet APIs.
-     *     All Flink DataSet APIs are deprecated since Flink 1.18 and will be 
removed in a future
-     *     Flink major version. You can still build your application in 
DataSet, but you should move
-     *     to either the DataStream and/or Table API.
-     * @see <a 
href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741">
-     *     FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and 
deprecate the DataSet
-     *     API</a>
-     */
-    @Deprecated
-    public ExecutionMode getExecutionMode() {
-        return configuration.get(EXECUTION_MODE);
-    }
-
     /**
      * Enables the Flink runtime to auto-generate UID's for operators.
      *
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionMode.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionMode.java
deleted file mode 100644
index e60aad819aa..00000000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionMode.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common;
-
-import org.apache.flink.annotation.Public;
-
-/**
- * The execution mode specifies how a batch program is executed in terms of 
data exchange:
- * pipelining or batched.
- *
- * @deprecated The {@link ExecutionMode} is deprecated because it's only used 
in DataSet APIs. All
- *     Flink DataSet APIs are deprecated since Flink 1.18 and will be removed 
in a future Flink
- *     major version. You can still build your application in DataSet, but you 
should move to either
- *     the DataStream and/or Table API.
- * @see <a 
href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741">
- *     FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate 
the DataSet API</a>
- */
-@Deprecated
-@Public
-public enum ExecutionMode {
-
-    /**
-     * Executes the program in a pipelined fashion (including shuffles and 
broadcasts), except for
-     * data exchanges that are susceptible to deadlocks when pipelining. These 
data exchanges are
-     * performed in a batch manner.
-     *
-     * <p>An example of situations that are susceptible to deadlocks (when 
executed in a pipelined
-     * manner) are data flows that branch (one data set consumed by multiple 
operations) and re-join
-     * later:
-     *
-     * <pre>{@code
-     * DataSet data = ...;
-     * DataSet mapped1 = data.map(new MyMapper());
-     * DataSet mapped2 = data.map(new AnotherMapper());
-     * mapped1.join(mapped2).where(...).equalTo(...);
-     * }</pre>
-     */
-    PIPELINED,
-
-    /**
-     * Executes the program in a pipelined fashion (including shuffles and 
broadcasts),
-     * <strong>including</strong> data exchanges that are susceptible to 
deadlocks when executed via
-     * pipelining.
-     *
-     * <p>Usually, {@link #PIPELINED} is the preferable option, which 
pipelines most data exchanges
-     * and only uses batch data exchanges in situations that are susceptible 
to deadlocks.
-     *
-     * <p>This option should only be used with care and only in situations 
where the programmer is
-     * sure that the program is safe for full pipelining and that Flink was 
too conservative when
-     * choosing the batch exchange at a certain point.
-     */
-    PIPELINED_FORCED,
-
-    // This is for later, we are missing a bit of infrastructure for this.
-    // /**
-    //  * The execution mode starts executing the program in a pipelined 
fashion
-    //  * (except for deadlock prone situations), similar to the {@link 
#PIPELINED}
-    //  * option. In the case of a task failure, re-execution happens in a 
batched
-    //  * mode, as defined for the {@link #BATCH} option.
-    //  */
-    // PIPELINED_WITH_BATCH_FALLBACK,
-
-    /**
-     * This mode executes all shuffles and broadcasts in a batch fashion, 
while pipelining data
-     * between operations that exchange data only locally between one producer 
and one consumer.
-     */
-    BATCH,
-
-    /**
-     * This mode executes the program in a strict batch way, including all 
points where data is
-     * forwarded locally from one producer to one consumer. This mode is 
typically more expensive to
-     * execute than the {@link #BATCH} mode. It does guarantee that no 
successive operations are
-     * ever executed concurrently.
-     */
-    BATCH_FORCED
-}
diff --git a/flink-python/docs/reference/pyflink.common/config.rst 
b/flink-python/docs/reference/pyflink.common/config.rst
index fa9611e409a..144cfcd938c 100644
--- a/flink-python/docs/reference/pyflink.common/config.rst
+++ b/flink-python/docs/reference/pyflink.common/config.rst
@@ -51,17 +51,6 @@ ExecutionConfig
     ExecutionConfig
 
 
-ExecutionMode
--------------
-
-.. currentmodule:: pyflink.common.execution_mode
-
-.. autosummary::
-    :toctree: api/
-
-    ExecutionMode
-
-
 RestartStrategy
 ---------------
 
diff --git a/flink-python/pyflink/common/__init__.py 
b/flink-python/pyflink/common/__init__.py
index 19f76febc7d..82328522561 100644
--- a/flink-python/pyflink/common/__init__.py
+++ b/flink-python/pyflink/common/__init__.py
@@ -52,7 +52,6 @@ from pyflink.common.completable_future import 
CompletableFuture
 from pyflink.common.config_options import ConfigOption, ConfigOptions
 from pyflink.common.configuration import Configuration
 from pyflink.common.execution_config import ExecutionConfig
-from pyflink.common.execution_mode import ExecutionMode
 from pyflink.common.input_dependency_constraint import 
InputDependencyConstraint
 from pyflink.common.job_client import JobClient
 from pyflink.common.job_execution_result import JobExecutionResult
@@ -81,7 +80,6 @@ __all__ = [
     'SimpleStringSchema',
     'Encoder',
     'CompletableFuture',
-    'ExecutionMode',
     'InputDependencyConstraint',
     'JobClient',
     'JobExecutionResult',
diff --git a/flink-python/pyflink/common/execution_config.py 
b/flink-python/pyflink/common/execution_config.py
index db46d957551..efdf0c2de8d 100644
--- a/flink-python/pyflink/common/execution_config.py
+++ b/flink-python/pyflink/common/execution_config.py
@@ -18,7 +18,6 @@
 
 from typing import Dict, List
 
-from pyflink.common.execution_mode import ExecutionMode
 from pyflink.java_gateway import get_gateway
 
 __all__ = ['ExecutionConfig']
@@ -36,9 +35,6 @@ class ExecutionConfig(object):
 
     - The delay between execution retries.
 
-    - The :class:`ExecutionMode` of the program: Batch or Pipelined.
-      The default execution mode is :data:`ExecutionMode.PIPELINED`
-
     - Enabling or disabling the "closure cleaner". The closure cleaner 
pre-processes
       the implementations of functions. In case they are (anonymous) inner 
classes,
       it removes unused references to the enclosing class to fix certain 
serialization-related
@@ -246,41 +242,6 @@ class ExecutionConfig(object):
         self._j_execution_config = 
self._j_execution_config.setTaskCancellationTimeout(timeout)
         return self
 
-    def set_execution_mode(self, execution_mode: ExecutionMode) -> 
'ExecutionConfig':
-        """
-        Sets the execution mode to execute the program. The execution mode 
defines whether
-        data exchanges are performed in a batch or on a pipelined manner.
-
-        The default execution mode is :data:`ExecutionMode.PIPELINED`.
-
-        Example:
-        ::
-
-            >>> config.set_execution_mode(ExecutionMode.BATCH)
-
-        :param execution_mode: The execution mode to use. The execution mode 
could be
-                               :data:`ExecutionMode.PIPELINED`,
-                               :data:`ExecutionMode.PIPELINED_FORCED`,
-                               :data:`ExecutionMode.BATCH` or
-                               :data:`ExecutionMode.BATCH_FORCED`.
-        """
-        
self._j_execution_config.setExecutionMode(execution_mode._to_j_execution_mode())
-        return self
-
-    def get_execution_mode(self) -> 'ExecutionMode':
-        """
-        Gets the execution mode used to execute the program. The execution 
mode defines whether
-        data exchanges are performed in a batch or on a pipelined manner.
-
-        The default execution mode is :data:`ExecutionMode.PIPELINED`.
-
-        .. seealso:: :func:`set_execution_mode`
-
-        :return: The execution mode for the program.
-        """
-        j_execution_mode = self._j_execution_config.getExecutionMode()
-        return ExecutionMode._from_j_execution_mode(j_execution_mode)
-
     def enable_force_kryo(self) -> 'ExecutionConfig':
         """
         Force TypeExtractor to use Kryo serializer for POJOS even though we 
could analyze as POJO.
diff --git a/flink-python/pyflink/common/execution_mode.py 
b/flink-python/pyflink/common/execution_mode.py
deleted file mode 100644
index 5252024526c..00000000000
--- a/flink-python/pyflink/common/execution_mode.py
+++ /dev/null
@@ -1,82 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from enum import Enum
-
-from pyflink.java_gateway import get_gateway
-
-__all__ = ['ExecutionMode']
-
-
-class ExecutionMode(Enum):
-    """
-    The execution mode specifies how a batch program is executed in terms
-    of data exchange: pipelining or batched.
-
-    :data:`PIPELINED`:
-
-    Executes the program in a pipelined fashion (including shuffles and 
broadcasts),
-    except for data exchanges that are susceptible to deadlocks when 
pipelining.
-    These data exchanges are performed in a batch manner.
-
-    An example of situations that are susceptible to deadlocks (when executed 
in a
-    pipelined manner) are data flows that branch (one data set consumed by 
multiple
-    operations) and re-join later.
-
-
-    :data:`PIPELINED_FORCED`:
-
-    Executes the program in a pipelined fashion (including shuffles and 
broadcasts),
-    **including** data exchanges that are susceptible to deadlocks when
-    executed via pipelining.
-
-    Usually, PIPELINED is the preferable option, which pipelines most
-    data exchanges and only uses batch data exchanges in situations that are
-    susceptible to deadlocks.
-
-    This option should only be used with care and only in situations where the
-    programmer is sure that the program is safe for full pipelining and that
-    Flink was too conservative when choosing the batch exchange at a certain
-    point.
-
-    :data:`BATCH`:
-
-    This mode executes all shuffles and broadcasts in a batch fashion, while
-    pipelining data between operations that exchange data only locally
-    between one producer and one consumer.
-
-    :data:`BATCH_FORCED`:
-
-    This mode executes the program in a strict batch way, including all points
-    where data is forwarded locally from one producer to one consumer. This 
mode
-    is typically more expensive to execute than the BATCH mode. It does
-    guarantee that no successive operations are ever executed concurrently.
-    """
-
-    PIPELINED = 0
-    PIPELINED_FORCED = 1
-    BATCH = 2
-    BATCH_FORCED = 3
-
-    @staticmethod
-    def _from_j_execution_mode(j_execution_mode) -> 'ExecutionMode':
-        return ExecutionMode[j_execution_mode.name()]
-
-    def _to_j_execution_mode(self):
-        gateway = get_gateway()
-        JExecutionMode = gateway.jvm.org.apache.flink.api.common.ExecutionMode
-        return getattr(JExecutionMode, self.name)
diff --git a/flink-python/pyflink/common/tests/test_execution_config.py 
b/flink-python/pyflink/common/tests/test_execution_config.py
index e08e5a23e83..3a8c278959f 100644
--- a/flink-python/pyflink/common/tests/test_execution_config.py
+++ b/flink-python/pyflink/common/tests/test_execution_config.py
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.common import (ExecutionConfig, ExecutionMode, Configuration)
+from pyflink.common import (ExecutionConfig, Configuration)
 from pyflink.java_gateway import get_gateway
 from pyflink.testing.test_case_utils import PyFlinkTestCase
 from pyflink.util.java_utils import get_j_env_configuration
@@ -91,24 +91,6 @@ class ExecutionConfigTests(PyFlinkTestCase):
 
         
self.assertEqual(self.execution_config.get_task_cancellation_timeout(), 3000)
 
-    def test_get_set_execution_mode(self):
-
-        self.execution_config.set_execution_mode(ExecutionMode.BATCH)
-
-        self.assertEqual(self.execution_config.get_execution_mode(), 
ExecutionMode.BATCH)
-
-        self.execution_config.set_execution_mode(ExecutionMode.PIPELINED)
-
-        self.assertEqual(self.execution_config.get_execution_mode(), 
ExecutionMode.PIPELINED)
-
-        self.execution_config.set_execution_mode(ExecutionMode.BATCH_FORCED)
-
-        self.assertEqual(self.execution_config.get_execution_mode(), 
ExecutionMode.BATCH_FORCED)
-
-        
self.execution_config.set_execution_mode(ExecutionMode.PIPELINED_FORCED)
-
-        self.assertEqual(self.execution_config.get_execution_mode(), 
ExecutionMode.PIPELINED_FORCED)
-
     def test_disable_enable_auto_generated_uids(self):
 
         self.execution_config.disable_auto_generated_uids()
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/DataExchangeMode.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/DataExchangeMode.java
deleted file mode 100644
index 74e63f760ba..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/DataExchangeMode.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-
-/** Defines how the data exchange between two specific operators happens. */
-public enum DataExchangeMode {
-
-    /**
-     * The data exchange is streamed, sender and receiver are online at the 
same time, and the
-     * receiver back-pressures the sender.
-     */
-    PIPELINED,
-
-    /**
-     * The data exchange is decoupled. The sender first produces its entire 
result and finishes.
-     * After that, the receiver is started and may consume the data.
-     */
-    BATCH,
-
-    /**
-     * The data exchange starts like in {@link #PIPELINED} and falls back to 
{@link #BATCH} for
-     * recovery runs.
-     */
-    PIPELINE_WITH_BATCH_FALLBACK;
-
-    // ------------------------------------------------------------------------
-
-    public static DataExchangeMode getForForwardExchange(ExecutionMode mode) {
-        return FORWARD[mode.ordinal()];
-    }
-
-    public static DataExchangeMode getForShuffleOrBroadcast(ExecutionMode 
mode) {
-        return SHUFFLE[mode.ordinal()];
-    }
-
-    public static DataExchangeMode getPipelineBreakingExchange(ExecutionMode 
mode) {
-        return BREAKING[mode.ordinal()];
-    }
-
-    /**
-     * Computes the mode of data exchange to be used for a given execution 
mode and ship strategy.
-     * The type of the data exchange depends also on whether this connection 
has been identified to
-     * require pipeline breaking for deadlock avoidance.
-     *
-     * <ul>
-     *   <li>If the connection is set to be pipeline breaking, this returns 
the pipeline breaking
-     *       variant of the execution mode {@link
-     *       
org.apache.flink.runtime.io.network.DataExchangeMode#getPipelineBreakingExchange(org.apache.flink.api.common.ExecutionMode)}.
-     *   <li>If the data exchange is a simple FORWARD (one-to-one 
communication), this returns
-     *       {@link
-     *       
org.apache.flink.runtime.io.network.DataExchangeMode#getForForwardExchange(org.apache.flink.api.common.ExecutionMode)}.
-     *   <li>If otherwise, this returns {@link
-     *       
org.apache.flink.runtime.io.network.DataExchangeMode#getForShuffleOrBroadcast(org.apache.flink.api.common.ExecutionMode)}.
-     * </ul>
-     *
-     * @param shipStrategy The ship strategy (FORWARD, PARTITION, BROADCAST, 
...) of the runtime
-     *     data exchange.
-     * @return The data exchange mode for the connection, given the concrete 
ship strategy.
-     */
-    public static DataExchangeMode select(
-            ExecutionMode executionMode, ShipStrategyType shipStrategy, 
boolean breakPipeline) {
-
-        if (shipStrategy == null || shipStrategy == ShipStrategyType.NONE) {
-            throw new IllegalArgumentException("shipStrategy may not be null 
or NONE");
-        }
-        if (executionMode == null) {
-            throw new IllegalArgumentException("executionMode may not mbe 
null");
-        }
-
-        if (breakPipeline) {
-            return getPipelineBreakingExchange(executionMode);
-        } else if (shipStrategy == ShipStrategyType.FORWARD) {
-            return getForForwardExchange(executionMode);
-        } else {
-            return getForShuffleOrBroadcast(executionMode);
-        }
-    }
-
-    // ------------------------------------------------------------------------
-
-    private static final DataExchangeMode[] FORWARD =
-            new DataExchangeMode[ExecutionMode.values().length];
-
-    private static final DataExchangeMode[] SHUFFLE =
-            new DataExchangeMode[ExecutionMode.values().length];
-
-    private static final DataExchangeMode[] BREAKING =
-            new DataExchangeMode[ExecutionMode.values().length];
-
-    // initialize the map between execution modes and exchange modes in
-    static {
-        FORWARD[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED;
-        SHUFFLE[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED;
-        BREAKING[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED;
-
-        FORWARD[ExecutionMode.PIPELINED.ordinal()] = PIPELINED;
-        SHUFFLE[ExecutionMode.PIPELINED.ordinal()] = PIPELINED;
-        BREAKING[ExecutionMode.PIPELINED.ordinal()] = BATCH;
-
-        FORWARD[ExecutionMode.BATCH.ordinal()] = PIPELINED;
-        SHUFFLE[ExecutionMode.BATCH.ordinal()] = BATCH;
-        BREAKING[ExecutionMode.BATCH.ordinal()] = BATCH;
-
-        FORWARD[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH;
-        SHUFFLE[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH;
-        BREAKING[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH;
-    }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java
deleted file mode 100644
index da5506c78c4..00000000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DataExchangeModeTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import org.apache.flink.api.common.ExecutionMode;
-
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** This test verifies that the data exchange modes are defined for every 
execution mode. */
-class DataExchangeModeTest {
-
-    @Test
-    void testForward() {
-        for (ExecutionMode mode : ExecutionMode.values()) {
-            
assertThat(DataExchangeMode.getForForwardExchange(mode)).isNotNull();
-        }
-    }
-
-    @Test
-    void testShuffleAndBroadcast() {
-        for (ExecutionMode mode : ExecutionMode.values()) {
-            
assertThat(DataExchangeMode.getForShuffleOrBroadcast(mode)).isNotNull();
-        }
-    }
-
-    @Test
-    void testPipelineBreaking() {
-        for (ExecutionMode mode : ExecutionMode.values()) {
-            
assertThat(DataExchangeMode.getPipelineBreakingExchange(mode)).isNotNull();
-        }
-    }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/JobConfiguration.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/JobConfiguration.java
index 5a049d98172..20ee4d94300 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/JobConfiguration.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/JobConfiguration.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.scheduler.benchmark;
 
-import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import 
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -26,15 +26,15 @@ import org.apache.flink.runtime.jobgraph.JobType;
 
 /**
  * {@link JobConfiguration} contains the configuration of a STREAMING/BATCH 
job. It concludes {@link
- * DistributionPattern}, {@link ResultPartitionType}, {@link JobType}, {@link 
ExecutionMode}, {@link
- * HybridPartitionDataConsumeConstraint}.
+ * DistributionPattern}, {@link ResultPartitionType}, {@link JobType}, {@link 
RuntimeExecutionMode},
+ * {@link HybridPartitionDataConsumeConstraint}.
  */
 public enum JobConfiguration {
     STREAMING(
             DistributionPattern.ALL_TO_ALL,
             ResultPartitionType.PIPELINED,
             JobType.STREAMING,
-            ExecutionMode.PIPELINED,
+            RuntimeExecutionMode.STREAMING,
             4000,
             false),
 
@@ -42,7 +42,7 @@ public enum JobConfiguration {
             DistributionPattern.ALL_TO_ALL,
             ResultPartitionType.BLOCKING,
             JobType.BATCH,
-            ExecutionMode.BATCH,
+            RuntimeExecutionMode.BATCH,
             4000,
             false),
 
@@ -50,7 +50,7 @@ public enum JobConfiguration {
             DistributionPattern.ALL_TO_ALL,
             ResultPartitionType.HYBRID_FULL,
             JobType.BATCH,
-            ExecutionMode.BATCH,
+            RuntimeExecutionMode.BATCH,
             HybridPartitionDataConsumeConstraint.UNFINISHED_PRODUCERS,
             4000,
             false),
@@ -59,7 +59,7 @@ public enum JobConfiguration {
             DistributionPattern.ALL_TO_ALL,
             ResultPartitionType.HYBRID_FULL,
             JobType.BATCH,
-            ExecutionMode.BATCH,
+            RuntimeExecutionMode.BATCH,
             HybridPartitionDataConsumeConstraint.ONLY_FINISHED_PRODUCERS,
             4000,
             false),
@@ -68,7 +68,7 @@ public enum JobConfiguration {
             DistributionPattern.ALL_TO_ALL,
             ResultPartitionType.HYBRID_FULL,
             JobType.BATCH,
-            ExecutionMode.BATCH,
+            RuntimeExecutionMode.BATCH,
             HybridPartitionDataConsumeConstraint.ALL_PRODUCERS_FINISHED,
             4000,
             false),
@@ -77,7 +77,7 @@ public enum JobConfiguration {
             DistributionPattern.ALL_TO_ALL,
             ResultPartitionType.PIPELINED,
             JobType.STREAMING,
-            ExecutionMode.PIPELINED,
+            RuntimeExecutionMode.STREAMING,
             10,
             false),
 
@@ -85,7 +85,7 @@ public enum JobConfiguration {
             DistributionPattern.ALL_TO_ALL,
             ResultPartitionType.BLOCKING,
             JobType.BATCH,
-            ExecutionMode.BATCH,
+            RuntimeExecutionMode.BATCH,
             10,
             false),
 
@@ -93,7 +93,7 @@ public enum JobConfiguration {
             DistributionPattern.ALL_TO_ALL,
             ResultPartitionType.HYBRID_FULL,
             JobType.BATCH,
-            ExecutionMode.BATCH,
+            RuntimeExecutionMode.BATCH,
             HybridPartitionDataConsumeConstraint.UNFINISHED_PRODUCERS,
             10,
             false),
@@ -102,7 +102,7 @@ public enum JobConfiguration {
             DistributionPattern.ALL_TO_ALL,
             ResultPartitionType.HYBRID_FULL,
             JobType.BATCH,
-            ExecutionMode.BATCH,
+            RuntimeExecutionMode.BATCH,
             HybridPartitionDataConsumeConstraint.ONLY_FINISHED_PRODUCERS,
             10,
             false),
@@ -111,7 +111,7 @@ public enum JobConfiguration {
             DistributionPattern.ALL_TO_ALL,
             ResultPartitionType.HYBRID_FULL,
             JobType.BATCH,
-            ExecutionMode.BATCH,
+            RuntimeExecutionMode.BATCH,
             HybridPartitionDataConsumeConstraint.ALL_PRODUCERS_FINISHED,
             10,
             false),
@@ -120,7 +120,7 @@ public enum JobConfiguration {
             DistributionPattern.ALL_TO_ALL,
             ResultPartitionType.PIPELINED,
             JobType.STREAMING,
-            ExecutionMode.PIPELINED,
+            RuntimeExecutionMode.STREAMING,
             4000,
             true),
 
@@ -128,7 +128,7 @@ public enum JobConfiguration {
             DistributionPattern.ALL_TO_ALL,
             ResultPartitionType.BLOCKING,
             JobType.BATCH,
-            ExecutionMode.BATCH,
+            RuntimeExecutionMode.BATCH,
             4000,
             true),
 
@@ -136,7 +136,7 @@ public enum JobConfiguration {
             DistributionPattern.ALL_TO_ALL,
             ResultPartitionType.PIPELINED,
             JobType.STREAMING,
-            ExecutionMode.PIPELINED,
+            RuntimeExecutionMode.STREAMING,
             10,
             true),
 
@@ -144,7 +144,7 @@ public enum JobConfiguration {
             DistributionPattern.ALL_TO_ALL,
             ResultPartitionType.BLOCKING,
             JobType.BATCH,
-            ExecutionMode.BATCH,
+            RuntimeExecutionMode.BATCH,
             10,
             true);
 
@@ -152,7 +152,7 @@ public enum JobConfiguration {
     private final DistributionPattern distributionPattern;
     private final ResultPartitionType resultPartitionType;
     private final JobType jobType;
-    private final ExecutionMode executionMode;
+    private final RuntimeExecutionMode runtimeExecutionMode;
     private final boolean evenlySpreadOutSlots;
     private final HybridPartitionDataConsumeConstraint 
hybridPartitionDataConsumeConstraint;
 
@@ -160,14 +160,14 @@ public enum JobConfiguration {
             DistributionPattern distributionPattern,
             ResultPartitionType resultPartitionType,
             JobType jobType,
-            ExecutionMode executionMode,
+            RuntimeExecutionMode runtimeExecutionMode,
             int parallelism,
             boolean evenlySpreadOutSlots) {
         this(
                 distributionPattern,
                 resultPartitionType,
                 jobType,
-                executionMode,
+                runtimeExecutionMode,
                 HybridPartitionDataConsumeConstraint.UNFINISHED_PRODUCERS,
                 parallelism,
                 evenlySpreadOutSlots);
@@ -177,14 +177,14 @@ public enum JobConfiguration {
             DistributionPattern distributionPattern,
             ResultPartitionType resultPartitionType,
             JobType jobType,
-            ExecutionMode executionMode,
+            RuntimeExecutionMode runtimeExecutionMode,
             HybridPartitionDataConsumeConstraint 
hybridPartitionDataConsumeConstraint,
             int parallelism,
             boolean evenlySpreadOutSlots) {
         this.distributionPattern = distributionPattern;
         this.resultPartitionType = resultPartitionType;
         this.jobType = jobType;
-        this.executionMode = executionMode;
+        this.runtimeExecutionMode = runtimeExecutionMode;
         this.hybridPartitionDataConsumeConstraint = 
hybridPartitionDataConsumeConstraint;
         this.parallelism = parallelism;
         this.evenlySpreadOutSlots = evenlySpreadOutSlots;
@@ -206,8 +206,8 @@ public enum JobConfiguration {
         return jobType;
     }
 
-    public ExecutionMode getExecutionMode() {
-        return executionMode;
+    public RuntimeExecutionMode getRuntimeExecutionMode() {
+        return runtimeExecutionMode;
     }
 
     public HybridPartitionDataConsumeConstraint 
getHybridPartitionDataConsumeConstraint() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
index a60b3fd584b..af718fdd7cf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
@@ -95,9 +95,7 @@ public class SchedulerBenchmarkUtils {
 
         jobGraph.setJobType(jobConfiguration.getJobType());
 
-        final ExecutionConfig executionConfig = new ExecutionConfig();
-        executionConfig.setExecutionMode(jobConfiguration.getExecutionMode());
-        jobGraph.setExecutionConfig(executionConfig);
+        jobGraph.setExecutionConfig(new ExecutionConfig());
 
         return jobGraph;
     }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
index 6c65053c46e..a35494a7143 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
@@ -24,48 +24,10 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 
-import java.util.Iterator;
-
 /** A collection of utilities for {@link DataStream DataStreams}. */
 @Experimental
 public final class DataStreamUtils {
 
-    /**
-     * Triggers the distributed execution of the streaming dataflow and 
returns an iterator over the
-     * elements of the given DataStream.
-     *
-     * <p>The DataStream application is executed in the regular distributed 
manner on the target
-     * environment, and the events from the stream are polled back to this 
application process and
-     * thread through Flink's REST API.
-     *
-     * @deprecated Please use {@link DataStream#executeAndCollect()}.
-     */
-    @Deprecated
-    public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) {
-        return collect(stream, "Data Stream Collect");
-    }
-
-    /**
-     * Triggers the distributed execution of the streaming dataflow and 
returns an iterator over the
-     * elements of the given DataStream.
-     *
-     * <p>The DataStream application is executed in the regular distributed 
manner on the target
-     * environment, and the events from the stream are polled back to this 
application process and
-     * thread through Flink's REST API.
-     *
-     * @deprecated Please use {@link DataStream#executeAndCollect()}.
-     */
-    @Deprecated
-    public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream, String 
executionJobName) {
-        try {
-            return stream.executeAndCollect(executionJobName);
-        } catch (Exception e) {
-            // this "wrap as unchecked" step is here only to preserve the 
exception signature
-            // backwards compatible.
-            throw new RuntimeException("Failed to execute data stream", e);
-        }
-    }
-
     // ------------------------------------------------------------------------
     //  Deriving a KeyedStream from a stream already partitioned by key
     //  without a shuffle
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
index b46910033db..1a7843dc88d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.recovery;
 
-import org.apache.flink.api.common.ExecutionMode;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -274,9 +273,7 @@ public class BatchFineGrainedRecoveryITCase extends 
TestLogger {
         StreamExecutionEnvironment env = new 
TestStreamEnvironment(miniCluster, 1);
         RestartStrategyUtils.configureFixedDelayRestartStrategy(
                 env, MAX_JOB_RESTART_ATTEMPTS, Duration.ofMillis(10));
-        env.getConfig()
-                .setExecutionMode(
-                        ExecutionMode.BATCH_FORCED); // forces all partitions 
to be blocking
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         return env;
     }
 


Reply via email to