dianfu commented on a change in pull request #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API URL: https://github.com/apache/flink/pull/8681#discussion_r292271670
########## File path: flink-python/pyflink/common/execution_config.py ########## @@ -0,0 +1,720 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import sys + +from pyflink.common.execution_mode import ExecutionMode +from pyflink.common.input_dependency_constraint import InputDependencyConstraint +from pyflink.common.restart_strategy import RestartStrategies +from pyflink.java_gateway import get_gateway +from pyflink.util.utils import load_java_class + +if sys.version >= '3': + unicode = str + +__all__ = ['ExecutionConfig'] + + +class ExecutionConfig(object): + """ + A config to define the behavior of the program execution. It allows to define (among other + options) the following settings: + + - The default parallelism of the program, i.e., how many parallel tasks to use for + all functions that do not define a specific value directly. + + - The number of retries in the case of failed executions. + + - The delay between execution retries. + + - The :class:`ExecutionMode` of the program: Batch or Pipelined. 
+ The default execution mode is :data:`ExecutionMode.PIPELINED` + + - Enabling or disabling the "closure cleaner". The closure cleaner pre-processes + the implementations of functions. In case they are (anonymous) inner classes, + it removes unused references to the enclosing class to fix certain serialization-related + problems and to reduce the size of the closure. + + - The config allows to register types and serializers to increase the efficiency of + handling *generic types* and *POJOs*. This is usually only needed + when the functions return not only the types declared in their signature, but + also subclasses of those types. + + :data:`PARALLELISM_DEFAULT`: + + The flag value indicating use of the default parallelism. This value can + be used to reset the parallelism back to the default state. + + :data:`PARALLELISM_UNKNOWN`: + + The flag value indicating an unknown or unset parallelism. This value is + not a valid parallelism and indicates that the parallelism should remain + unchanged. + """ + + PARALLELISM_DEFAULT = -1 + + PARALLELISM_UNKNOWN = -2 + + def __init__(self, j_execution_config): + self._j_execution_config = j_execution_config + + def enable_closure_cleaner(self): + """ + Enables the ClosureCleaner. This analyzes user code functions and sets fields to null + that are not used. This will in most cases make closures or anonymous inner classes + serializable that where not serializable due to some Scala or Java implementation artifact. + User code must be serializable because it needs to be sent to worker nodes. + + :return: This object. + """ + self._j_execution_config = self._j_execution_config.enableClosureCleaner() + return self + + def disable_closure_cleaner(self): + """ + Disables the ClosureCleaner. + + see :func:`enable_closure_cleaner` + + :return: This object. 
+ """ + self._j_execution_config = self._j_execution_config.disableClosureCleaner() + return self + + def is_closure_cleaner_enabled(self): + """ + Returns whether the ClosureCleaner is enabled. + + see :func:`enable_closure_cleaner` + + :return: ``True`` means enable and ``False`` means disable. + """ + return self._j_execution_config.isClosureCleanerEnabled() + + def set_auto_watermark_interval(self, interval): + """ + Sets the interval of the automatic watermark emission. Watermarks are used throughout + the streaming system to keep track of the progress of time. They are used, for example, + for time based windowing. + + :param interval: The integer value interval between watermarks in milliseconds. + :return: This object. + """ + self._j_execution_config = self._j_execution_config.setAutoWatermarkInterval(interval) + return self + + def get_auto_watermark_interval(self): + """ + Returns the interval of the automatic watermark emission. + + see :func:`set_auto_watermark_interval` + + :return: The integer value interval in milliseconds of the automatic watermark emission. + """ + return self._j_execution_config.getAutoWatermarkInterval() + + def set_latency_tracking_interval(self, interval): + """ + Interval for sending latency tracking marks from the sources to the sinks. + + Flink will send latency tracking marks from the sources at the specified interval. + Setting a tracking interval <= 0 disables the latency tracking. + + :param interval: Integer value interval in milliseconds. + :return: This object. + """ + self._j_execution_config = self._j_execution_config.setLatencyTrackingInterval(interval) + return self + + def get_latency_tracking_interval(self): + """ + Returns the latency tracking interval. + + :return: The latency tracking interval in milliseconds. + """ + return self._j_execution_config.getLatencyTrackingInterval() + + def is_latency_tracking_configured(self): + """ + Returns whether the latency tracking is configured. 
+ + :return: ``True`` means configured and ``False`` means not configured. + """ + return self._j_execution_config.isLatencyTrackingConfigured() Review comment: isLatencyTrackingConfigured is an internal API, so this method can be removed from the Python API. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
