dianfu commented on a change in pull request #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API URL: https://github.com/apache/flink/pull/8681#discussion_r292287750
########## File path: flink-python/pyflink/streaming/stream_execution_environment.py ########## @@ -0,0 +1,437 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from py4j.java_gateway import get_java_class + +from pyflink.common import CheckpointConfig, CheckpointingMode, RestartStrategies +from pyflink.common.execution_config import ExecutionConfig +from pyflink.common.state_backend import _from_j_state_backend +from pyflink.common.time_characteristic import TimeCharacteristic +from pyflink.java_gateway import get_gateway +from pyflink.util.utils import to_j_config, load_java_class + +__all__ = ['StreamExecutionEnvironment'] + + +class StreamExecutionEnvironment(object): + """ + The StreamExecutionEnvironment is the context in which a streaming program is executed. A + *LocalStreamEnvironment* will cause execution in the attached JVM, a + *RemoteStreamEnvironment* will cause execution on a remote setup. 
+ + The environment provides methods to control the job execution (such as setting the parallelism + or the fault tolerance/checkpointing parameters) and to interact with the outside world (data + access). + """ + + def __init__(self, j_stream_execution_environment): + self._j_stream_execution_environment = j_stream_execution_environment + + def get_config(self): + """ + Gets the config object. + + :return: The :class:`~pyflink.common.ExecutionConfig` object. + """ + return ExecutionConfig(self._j_stream_execution_environment.getConfig()) + + def set_parallelism(self, parallelism): + """ + Sets the parallelism for operations executed through this environment. + Setting a parallelism of x here will cause all operators (such as map, + batchReduce) to run with x parallel instances. This method overrides the + default parallelism for this environment. The + *LocalStreamEnvironment* uses by default a value equal to the + number of hardware contexts (CPU cores / threads). When executing the + program via the command line client from a JAR file, the default degree + of parallelism is the one configured for that setup. + + :param parallelism: The parallelism. + :return: This object. + """ + self._j_stream_execution_environment = \ + self._j_stream_execution_environment.setParallelism(parallelism) + return self + + def set_max_parallelism(self, max_parallelism): + """ + Sets the maximum degree of parallelism defined for the program. The upper limit (inclusive) + is 32767. + + The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also + defines the number of key groups used for partitioned state. + + :param max_parallelism: Maximum degree of parallelism to be used for the program, + with 0 < maxParallelism <= 2^15 - 1. + :return: This object. 
+ """ + self._j_stream_execution_environment = \ + self._j_stream_execution_environment.setMaxParallelism(max_parallelism) + return self + + def get_parallelism(self): + """ + Gets the parallelism with which operation are executed by default. + Operations can individually override this value to use a specific + parallelism. + + :return: The parallelism used by operations, unless they override that value. + """ + return self._j_stream_execution_environment.getParallelism() + + def get_max_parallelism(self): + """ + Gets the maximum degree of parallelism defined for the program. + + The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also + defines the number of key groups used for partitioned state. + + :return: Maximum degree of parallelism. + """ + return self._j_stream_execution_environment.getMaxParallelism() + + def set_buffer_timeout(self, timeout_millis): + """ + Sets the maximum time frequency (milliseconds) for the flushing of the + output buffers. By default the output buffers flush frequently to provide + low latency and to aid smooth developer experience. Setting the parameter + can result in three logical modes: + + - A positive integer triggers flushing periodically by that integer + - 0 triggers flushing after every record thus minimizing latency + - -1 triggers flushing only when the output buffer is full thus maximizing throughput + + :param timeout_millis: The maximum time between two output flushes. + :return: This object. + """ + self._j_stream_execution_environment = \ + self._j_stream_execution_environment.setBufferTimeout(timeout_millis) + return self + + def get_buffer_timeout(self): + """ + Gets the maximum time frequency (milliseconds) for the flushing of the + output buffers. For clarification on the extremal values see + :func:`set_buffer_timeout`. + + :return: The timeout of the buffer. 
+ """ + return self._j_stream_execution_environment.getBufferTimeout() + + def disable_operation_chaining(self): Review comment: operation -> operator ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
