dianfu commented on a change in pull request #8681: [FLINK-12585][python] Align 
Stream/BatchTableEnvironment with JAVA Table API
URL: https://github.com/apache/flink/pull/8681#discussion_r294755507
 
 

 ##########
 File path: flink-python/pyflink/common/execution_config.py
 ##########
 @@ -0,0 +1,705 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+
+from pyflink.common.execution_mode import ExecutionMode
+from pyflink.common.input_dependency_constraint import 
InputDependencyConstraint
+from pyflink.common.restart_strategy import RestartStrategies
+from pyflink.java_gateway import get_gateway
+from pyflink.util.utils import load_java_class
+
+if sys.version >= '3':
+    unicode = str
+
+__all__ = ['ExecutionConfig']
+
+
+class ExecutionConfig(object):
+    """
+    A config to define the behavior of the program execution. It allows to 
define (among other
+    options) the following settings:
+
+    - The default parallelism of the program, i.e., how many parallel tasks to 
use for
+      all functions that do not define a specific value directly.
+
+    - The number of retries in the case of failed executions.
+
+    - The delay between execution retries.
+
+    - The :class:`ExecutionMode` of the program: Batch or Pipelined.
+      The default execution mode is :data:`ExecutionMode.PIPELINED`
+
+    - Enabling or disabling the "closure cleaner". The closure cleaner 
pre-processes
+      the implementations of functions. In case they are (anonymous) inner 
classes,
+      it removes unused references to the enclosing class to fix certain 
serialization-related
+      problems and to reduce the size of the closure.
+
+    - The config allows to register types and serializers to increase the 
efficiency of
+      handling *generic types* and *POJOs*. This is usually only needed
+      when the functions return not only the types declared in their 
signature, but
+      also subclasses of those types.
+
+    :data:`PARALLELISM_DEFAULT`:
+
+    The flag value indicating use of the default parallelism. This value can
+    be used to reset the parallelism back to the default state.
+
+    :data:`PARALLELISM_UNKNOWN`:
+
+    The flag value indicating an unknown or unset parallelism. This value is
+    not a valid parallelism and indicates that the parallelism should remain
+    unchanged.
+    """
+
+    PARALLELISM_DEFAULT = -1
+
+    PARALLELISM_UNKNOWN = -2
+
+    def __init__(self, j_execution_config):
+        self._j_execution_config = j_execution_config
+
+    def enable_closure_cleaner(self):
+        """
+        Enables the ClosureCleaner. This analyzes user code functions and sets 
fields to null
+        that are not used. This will in most cases make closures or anonymous 
inner classes
+        serializable that were not serializable due to some Scala or Java 
implementation artifact.
+        User code must be serializable because it needs to be sent to worker 
nodes.
+
+        :return: This object.
+        """
+        self._j_execution_config = 
self._j_execution_config.enableClosureCleaner()
+        return self
+
+    def disable_closure_cleaner(self):
+        """
+        Disables the ClosureCleaner.
+
+        .. seealso:: :func:`enable_closure_cleaner`
+
+        :return: This object.
+        """
+        self._j_execution_config = 
self._j_execution_config.disableClosureCleaner()
+        return self
+
+    def is_closure_cleaner_enabled(self):
+        """
+        Returns whether the ClosureCleaner is enabled.
+
+        .. seealso:: :func:`enable_closure_cleaner`
+
+        :return: ``True`` if the ClosureCleaner is enabled, ``False`` if it is disabled.
+        """
+        return self._j_execution_config.isClosureCleanerEnabled()
+
+    def set_auto_watermark_interval(self, interval):
+        """
+        Sets the interval of the automatic watermark emission. Watermarks are 
used throughout
+        the streaming system to keep track of the progress of time. They are 
used, for example,
+        for time based windowing.
+
+        :param interval: The integer value interval between watermarks in 
milliseconds.
+        :return: This object.
+        """
+        self._j_execution_config = 
self._j_execution_config.setAutoWatermarkInterval(interval)
+        return self
+
+    def get_auto_watermark_interval(self):
+        """
+        Returns the interval of the automatic watermark emission.
+
+        .. seealso:: :func:`set_auto_watermark_interval`
+
+        :return: The integer value interval in milliseconds of the automatic 
watermark emission.
+        """
+        return self._j_execution_config.getAutoWatermarkInterval()
+
+    def set_latency_tracking_interval(self, interval):
+        """
+        Interval for sending latency tracking marks from the sources to the 
sinks.
+
+        Flink will send latency tracking marks from the sources at the 
specified interval.
+        Setting a tracking interval <= 0 disables the latency tracking.
+
+        :param interval: Integer value interval in milliseconds.
+        :return: This object.
+        """
+        self._j_execution_config = 
self._j_execution_config.setLatencyTrackingInterval(interval)
+        return self
+
+    def get_latency_tracking_interval(self):
+        """
+        Returns the latency tracking interval.
+
+        :return: The latency tracking interval in milliseconds.
+        """
+        return self._j_execution_config.getLatencyTrackingInterval()
+
+    def get_parallelism(self):
+        """
+        Gets the parallelism with which operations are executed by default. 
Operations can
+        individually override this value to use a specific parallelism.
+
+        Other operations may need to run with a different parallelism - for 
example calling
+        a reduce operation over the entire data set will involve an operation 
that runs
+        with a parallelism of one (the final reduce to the single result 
value).
+
+        :return: The parallelism used by operations, unless they override that 
value. This method
+                 returns :data:`ExecutionConfig.PARALLELISM_DEFAULT` if the 
environment's default
+                 parallelism should be used.
+        """
+        return self._j_execution_config.getParallelism()
+
+    def set_parallelism(self, parallelism):
+        """
+        Sets the parallelism for operations executed through this environment.
+        Setting a parallelism of x here will cause all operators (such as 
join, map, reduce) to run
+        with x parallel instances.
+
+        This method overrides the default parallelism for this environment.
+        The local execution environment uses by default a value equal to the 
number of hardware
+        contexts (CPU cores / threads). When executing the program via the 
command line client
+        from a JAR/Python file, the default parallelism is the one configured 
for that setup.
+
+        :param parallelism: The parallelism to use.
+        :return: This object.
+        """
+        self._j_execution_config = 
self._j_execution_config.setParallelism(parallelism)
+        return self
+
+    def get_max_parallelism(self):
+        """
+        Gets the maximum degree of parallelism defined for the program.
+
+        The maximum degree of parallelism specifies the upper limit for 
dynamic scaling. It also
+        defines the number of key groups used for partitioned state.
+
+        :return: Maximum degree of parallelism.
+        """
+        return self._j_execution_config.getMaxParallelism()
+
+    def set_max_parallelism(self, max_parallelism):
+        """
+        Sets the maximum degree of parallelism defined for the program.
+
+        The maximum degree of parallelism specifies the upper limit for 
dynamic scaling. It also
+        defines the number of key groups used for partitioned state.
+
+        :param max_parallelism: Maximum degree of parallelism to be used for 
the program.
+        """
+        self._j_execution_config.setMaxParallelism(max_parallelism)
+
+    def get_task_cancellation_interval(self):
+        """
+        Gets the interval (in milliseconds) between consecutive attempts to 
cancel a running task.
+
+        :return: The integer value interval in milliseconds.
+        """
+        return self._j_execution_config.getTaskCancellationInterval()
+
+    def set_task_cancellation_interval(self, interval):
+        """
+        Sets the configuration parameter specifying the interval (in 
milliseconds)
+        between consecutive attempts to cancel a running task.
+
+        :param interval: The integer value interval in milliseconds.
+        :return: This object.
+        """
+        self._j_execution_config = 
self._j_execution_config.setTaskCancellationInterval(interval)
+        return self
+
+    def get_task_cancellation_timeout(self):
+        """
+        Returns the timeout (in milliseconds) after which an ongoing task
+        cancellation leads to a fatal TaskManager error.
+
+        The value ``0`` means that the timeout is disabled. In
+        this case a stuck cancellation will not lead to a fatal error.
+
+        :return: The timeout in milliseconds.
+        """
+        return self._j_execution_config.getTaskCancellationTimeout()
+
+    def set_task_cancellation_timeout(self, timeout):
+        """
+        Sets the timeout (in milliseconds) after which an ongoing task 
cancellation
+        is considered failed, leading to a fatal TaskManager error.
+
+        The cluster default is configured via 
``TaskManagerOptions#TASK_CANCELLATION_TIMEOUT``.
+
+        The value ``0`` disables the timeout. In this case a stuck
+        cancellation will not lead to a fatal error.
+
+        :param timeout: The task cancellation timeout (in milliseconds).
+        :return: This object.
+        """
+        self._j_execution_config = 
self._j_execution_config.setTaskCancellationTimeout(timeout)
+        return self
+
+    def set_restart_strategy(self, restart_strategy_configuration):
+        """
+        Sets the restart strategy to be used for recovery.
+        ::
+
+            >>> config = env.get_config()
+            >>> 
config.set_restart_strategy(RestartStrategies.fixed_delay_restart(10, 1000))
+
+        :param restart_strategy_configuration: Configuration defining the 
restart strategy to use.
+        """
+        self._j_execution_config.setRestartStrategy(
+            restart_strategy_configuration._j_restart_strategy_configuration)
+
+    def get_restart_strategy(self):
+        """
+        Returns the restart strategy which has been set for the current job.
+
+        :return: The specified restart configuration.
+        """
+        return RestartStrategies._from_j_restart_strategy(
+            self._j_execution_config.getRestartStrategy())
+
+    def set_execution_mode(self, execution_mode):
+        """
+        Sets the execution mode to execute the program. The execution mode 
defines whether
+        data exchanges are performed in a batch or in a pipelined manner.
+
+        The default execution mode is :data:`ExecutionMode.PIPELINED`.
+
+        Example:
+        ::
+
+            >>> config.set_execution_mode(ExecutionMode.BATCH)
+
+        :param execution_mode: The execution mode to use.
+        """
+        self._j_execution_config.setExecutionMode(
+            ExecutionMode._to_j_execution_mode(execution_mode))
+
+    def get_execution_mode(self):
+        """
+        Gets the execution mode used to execute the program. The execution 
mode defines whether
+        data exchanges are performed in a batch or in a pipelined manner.
+
+        The default execution mode is :data:`ExecutionMode.PIPELINED`.
+
+        :return: The execution mode for the program.
+        """
+        j_execution_mode = self._j_execution_config.getExecutionMode()
+        return ExecutionMode._from_j_execution_mode(j_execution_mode)
+
+    def set_default_input_dependency_constraint(self, 
input_dependency_constraint):
+        """
+        Sets the default input dependency constraint for vertex scheduling. It 
indicates when a
+        task should be scheduled considering its inputs status.
+
 
 Review comment:
   Please direct users to `InputDependencyConstraint` for all the available 
values of `input_dependency_constraint`. Otherwise, users won't know how to use 
this interface and will have to dig into the implementation. We should also 
consider other interfaces that have similar cases.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to