dianfu commented on a change in pull request #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API URL: https://github.com/apache/flink/pull/8681#discussion_r292290488
########## File path: flink-python/pyflink/streaming/stream_execution_environment.py ########## @@ -0,0 +1,437 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from py4j.java_gateway import get_java_class + +from pyflink.common import CheckpointConfig, CheckpointingMode, RestartStrategies +from pyflink.common.execution_config import ExecutionConfig +from pyflink.common.state_backend import _from_j_state_backend +from pyflink.common.time_characteristic import TimeCharacteristic +from pyflink.java_gateway import get_gateway +from pyflink.util.utils import to_j_config, load_java_class + +__all__ = ['StreamExecutionEnvironment'] + + +class StreamExecutionEnvironment(object): + """ + The StreamExecutionEnvironment is the context in which a streaming program is executed. A + *LocalStreamEnvironment* will cause execution in the attached JVM, a + *RemoteStreamEnvironment* will cause execution on a remote setup. 
+ + The environment provides methods to control the job execution (such as setting the parallelism + or the fault tolerance/checkpointing parameters) and to interact with the outside world (data + access). + """ + + def __init__(self, j_stream_execution_environment): + self._j_stream_execution_environment = j_stream_execution_environment + + def get_config(self): + """ + Gets the config object. + + :return: The :class:`~pyflink.common.ExecutionConfig` object. + """ + return ExecutionConfig(self._j_stream_execution_environment.getConfig()) + + def set_parallelism(self, parallelism): + """ + Sets the parallelism for operations executed through this environment. + Setting a parallelism of x here will cause all operators (such as map, + batchReduce) to run with x parallel instances. This method overrides the + default parallelism for this environment. The + *LocalStreamEnvironment* uses by default a value equal to the + number of hardware contexts (CPU cores / threads). When executing the + program via the command line client from a JAR file, the default degree + of parallelism is the one configured for that setup. + + :param parallelism: The parallelism. + :return: This object. + """ + self._j_stream_execution_environment = \ + self._j_stream_execution_environment.setParallelism(parallelism) + return self + + def set_max_parallelism(self, max_parallelism): + """ + Sets the maximum degree of parallelism defined for the program. The upper limit (inclusive) + is 32767. + + The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also + defines the number of key groups used for partitioned state. + + :param max_parallelism: Maximum degree of parallelism to be used for the program, + with 0 < maxParallelism <= 2^15 - 1. + :return: This object. 
+ """ + self._j_stream_execution_environment = \ + self._j_stream_execution_environment.setMaxParallelism(max_parallelism) + return self + + def get_parallelism(self): + """ + Gets the parallelism with which operation are executed by default. + Operations can individually override this value to use a specific + parallelism. + + :return: The parallelism used by operations, unless they override that value. + """ + return self._j_stream_execution_environment.getParallelism() + + def get_max_parallelism(self): + """ + Gets the maximum degree of parallelism defined for the program. + + The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also + defines the number of key groups used for partitioned state. + + :return: Maximum degree of parallelism. + """ + return self._j_stream_execution_environment.getMaxParallelism() + + def set_buffer_timeout(self, timeout_millis): + """ + Sets the maximum time frequency (milliseconds) for the flushing of the + output buffers. By default the output buffers flush frequently to provide + low latency and to aid smooth developer experience. Setting the parameter + can result in three logical modes: + + - A positive integer triggers flushing periodically by that integer + - 0 triggers flushing after every record thus minimizing latency + - -1 triggers flushing only when the output buffer is full thus maximizing throughput + + :param timeout_millis: The maximum time between two output flushes. + :return: This object. + """ + self._j_stream_execution_environment = \ + self._j_stream_execution_environment.setBufferTimeout(timeout_millis) + return self + + def get_buffer_timeout(self): + """ + Gets the maximum time frequency (milliseconds) for the flushing of the + output buffers. For clarification on the extremal values see + :func:`set_buffer_timeout`. + + :return: The timeout of the buffer. 
+ """ + return self._j_stream_execution_environment.getBufferTimeout() + + def disable_operation_chaining(self): + """ + Disables operator chaining for streaming operators. Operator chaining + allows non-shuffle operations to be co-located in the same thread fully + avoiding serialization and de-serialization. + + :return: This object. + """ + self._j_stream_execution_environment = \ + self._j_stream_execution_environment.disableOperatorChaining() + return self + + def is_chaining_enabled(self): + """ + Returns whether operator chaining is enabled. + + :return: True if chaining is enabled, false otherwise. + """ + return self._j_stream_execution_environment.isChainingEnabled() + + def get_checkpoint_config(self): + """ + Gets the checkpoint config, which defines values like checkpoint interval, delay between + checkpoints, etc. + + :return: The :class:`~pyflink.common.CheckpointConfig`. + """ + j_checkpoint_config = self._j_stream_execution_environment.getCheckpointConfig() + return CheckpointConfig(j_checkpoint_config) + + def enable_checkpointing(self, interval, mode=None): + """ + Enables checkpointing for the streaming job. The distributed state of the streaming + dataflow will be periodically snapshotted. In case of a failure, the streaming + dataflow will be restarted from the latest completed checkpoint. + + The job draws checkpoints periodically, in the given interval. The system uses the + given :class:`~pyflink.common.CheckpointingMode` for the checkpointing ("exactly once" + vs "at least once"). The state will be stored in the configured state backend. + + .. note:: + Checkpointing iterative streaming dataflows in not properly supported at + the moment. For that reason, iterative jobs will not be started if used + with enabled checkpointing. + + :param interval: Time interval between state checkpoints in milliseconds. + :param mode: The checkpointing mode, selecting between "exactly once" and "at least once" + guaranteed. + :return: This object. 
+ """ + if mode is None: + self._j_stream_execution_environment = \ + self._j_stream_execution_environment.enableCheckpointing(interval) + else: + j_checkpointing_mode = CheckpointingMode._to_j_checkpointing_mode(mode) + self._j_stream_execution_environment.enableCheckpointing( + interval, + j_checkpointing_mode) + return self + + def get_checkpoint_interval(self): + """ + Returns the checkpointing interval or -1 if checkpointing is disabled. + + Shorthand for get_checkpoint_config().get_checkpoint_interval(). + + :return: The checkpointing interval or -1. + """ + return self._j_stream_execution_environment.getCheckpointInterval() + + def get_checkpointing_mode(self): + """ + Returns the checkpointing mode (exactly-once vs. at-least-once). + + Shorthand for get_checkpoint_config().get_checkpointing_mode(). + + :return: The :class:`~pyflink.common.CheckpointingMode`. + """ + j_checkpointing_mode = self._j_stream_execution_environment.getCheckpointingMode() + return CheckpointingMode._from_j_checkpointing_mode(j_checkpointing_mode) + + def get_state_backend(self): + """ + Gets the state backend that defines how to store and checkpoint state. + + see :func:`set_state_backend` + + see :func:`set_state_backend_factory` + + :return: The :class:`StateBackend`. + """ + j_state_backend = self._j_stream_execution_environment.getStateBackend() + return _from_j_state_backend(j_state_backend) + + def set_state_backend(self, state_backend): + """ + Sets the state backend that describes how to store and checkpoint operator state. It + defines both which data structures hold state during execution (for example hash tables, + RockDB, or other data stores) as well as where checkpointed data will be persisted. + + The :class:`~pyflink.common.MemoryStateBackend` for example maintains the state in heap + memory, as objects. It is lightweight without extra dependencies, but can checkpoint only + small states(some counters). 
+ + In contrast, the :class:`~pyflink.common.FsStateBackend` stores checkpoints of the state + (also maintained as heap objects) in files. When using a replicated file system (like HDFS, + S3, MapR FS, Alluxio, etc) this will guarantee that state is not lost upon failures of + individual nodes and that the streaming program can be executed highly available and strongly + consistent (assuming that Flink is run in high-availability mode). + + see :func:`get_state_backend` + + :param state_backend: The :class:`StateBackend`. + :return: This object. + """ + self._j_stream_execution_environment = \ + self._j_stream_execution_environment.setStateBackend(state_backend._j_state_backend) + return self + + def set_state_backend_factory(self, state_backend_factory_class_name, config): Review comment: Why added this method? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
