dianfu commented on a change in pull request #8681: [FLINK-12585][python] Align Stream/BatchTableEnvironment with JAVA Table API
URL: https://github.com/apache/flink/pull/8681#discussion_r292742628
 
 

 ##########
 File path: flink-python/pyflink/common/state_backend.py
 ##########
 @@ -0,0 +1,797 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+from abc import ABCMeta
+
+from py4j.java_gateway import get_java_class
+
+from pyflink.java_gateway import get_gateway
+from pyflink.util.utils import to_j_config, load_java_class
+
+__all__ = [
+    'StateBackend',
+    'MemoryStateBackend',
+    'FsStateBackend',
+    'RocksDBStateBackend',
+    'CustomStateBackend',
+    'PredefinedOptions']
+
+if sys.version > '3':
+    xrange = range
+
+
+def _configure(configurable_state_backend, config):
+    gateway = get_gateway()
+    j_config = to_j_config(config)
+    context_class_loader = gateway.jvm.Thread.currentThread().getContextClassLoader()
+    return configurable_state_backend._j_state_backend.configure(j_config, context_class_loader)
+
+
+def _from_j_state_backend(j_state_backend):
+    if j_state_backend is None:
+        return None
+    gateway = get_gateway()
+    JStateBackend = gateway.jvm.org.apache.flink.runtime.state.StateBackend
+    JMemoryStateBackend = gateway.jvm.org.apache.flink.runtime.state.memory \
+        .MemoryStateBackend
+    JFsStateBackend = gateway.jvm.org.apache.flink.runtime.state.filesystem \
+        .FsStateBackend
+    JRocksDBStateBackend = gateway.jvm.org.apache.flink.contrib.streaming.state \
+        .RocksDBStateBackend
+    j_clz = j_state_backend.getClass()
+
+    if not get_java_class(JStateBackend).isAssignableFrom(j_clz):
+        raise TypeError("The input %s is not an instance of StateBackend." % j_state_backend)
+
+    if get_java_class(JMemoryStateBackend).isAssignableFrom(j_state_backend.getClass()):
+        return MemoryStateBackend(j_memory_state_backend=j_state_backend)
+    elif get_java_class(JFsStateBackend).isAssignableFrom(j_state_backend.getClass()):
+        return FsStateBackend(j_fs_state_backend=j_state_backend)
+    elif get_java_class(JRocksDBStateBackend).isAssignableFrom(j_state_backend.getClass()):
+        return RocksDBStateBackend(j_rocks_db_state_backend=j_state_backend)
+    else:
+        return CustomStateBackend(j_state_backend)  # users' customized state backend
+
+
+class StateBackend(object):
+    """
+    A **State Backend** defines how the state of a streaming application is stored and
+    checkpointed. Different State Backends store their state in different fashions, and use
+    different data structures to hold the state of a running application.
+
+    For example, the :class:`MemoryStateBackend` keeps working state in the memory of the
+    TaskManager and stores checkpoints in the memory of the JobManager. The backend is
+    lightweight and without additional dependencies, but not highly available and supports only
+    small state.
+
+    The :class:`FsStateBackend` keeps working state in the memory of the TaskManager and stores
+    state checkpoints in a filesystem (typically a replicated highly-available filesystem,
+    like `HDFS <https://hadoop.apache.org/>`_, `Ceph <https://ceph.com/>`_,
+    `S3 <https://aws.amazon.com/documentation/s3/>`_, `GCS <https://cloud.google.com/storage/>`_,
+    etc).
+
+    The :class:`RocksDBStateBackend` stores working state in `RocksDB <http://rocksdb.org/>`_,
+    and checkpoints the state by default to a filesystem (similar to the :class:`FsStateBackend`).
+
+    **Raw Bytes Storage and Backends**
+
+    The :class:`StateBackend` creates services for *raw bytes storage* and for *keyed state*
+    and *operator state*.
+
+    The *raw bytes storage* (through the `org.apache.flink.runtime.state.CheckpointStreamFactory`)
+    is the fundamental service that simply stores bytes in a fault-tolerant fashion. This service
+    is used by the JobManager to store checkpoint and recovery metadata and is typically also used
+    by the keyed- and operator state backends to store checkpointed state.
+
+    The `org.apache.flink.runtime.state.AbstractKeyedStateBackend` and
+    `org.apache.flink.runtime.state.OperatorStateBackend` created by this state backend define how
+    to hold the working state for keys and operators. They also define how to checkpoint that
+    state, frequently using the raw bytes storage (via the
+    `org.apache.flink.runtime.state.CheckpointStreamFactory`). However, it is also possible that
+    for example a keyed state backend simply implements the bridge to a key/value store, and that
+    it does not need to store anything in the raw byte storage upon a checkpoint.
+
+    **Serializability**
+
+    State Backends need to be serializable (`java.io.Serializable`), because they are distributed
+    across parallel processes (for distributed execution) together with the streaming application
+    code.
+
+    Because of that, :class:`StateBackend` implementations are meant to be like *factories* that
+    create the proper state stores that provide access to the persistent storage and hold the
+    keyed- and operator state data structures. That way, the State Backend can be very lightweight
+    (contain only configurations), which makes it easier to serialize.
+
+    **Thread Safety**
+
+    State backend implementations have to be thread-safe. Multiple threads may be creating
+    streams and keyed-/operator state backends concurrently.
+    """
+
+    __metaclass__ = ABCMeta
+
+    def __init__(self, j_state_backend):
+        self._j_state_backend = j_state_backend
+
+
+class MemoryStateBackend(StateBackend):
+    """
+    This state backend holds the working state in the memory (JVM heap) of the TaskManagers.
+    The state backend checkpoints state directly to the JobManager's memory (hence the backend's
+    name), but the checkpoints will be persisted to a file system for high-availability setups and
+    savepoints. The MemoryStateBackend is consequently a FileSystem-based backend that can work
+    without a file system dependency in simple setups.
+
+    This state backend should be used only for experimentation, quick local setups,
+    or for streaming applications that have very small state: Because it requires checkpoints to
+    go through the JobManager's memory, larger state will occupy larger portions of the
+    JobManager's main memory, reducing operational stability.
+    For any other setup, the :class:`FsStateBackend` should be used. The :class:`FsStateBackend`
+    holds the working state on the TaskManagers in the same way, but checkpoints state directly to
+    files rather than to the JobManager's memory, thus supporting large state sizes.
+
+    **State Size Considerations**
+
+    State checkpointing with this state backend is subject to the following conditions:
+
+    - Each individual state must not exceed the configured maximum state size
+      (see :func:`get_max_state_size`).
+
+    - All state from one task (i.e., the sum of all operator states and keyed states from all
+      chained operators of the task) must not exceed what the RPC system supports, which is
+      by default < 10 MB. That limit can be configured higher, but that is typically not advised.
+
+    - The sum of all states in the application times all retained checkpoints must comfortably
+      fit into the JobManager's JVM heap space.
+
+    **Persistence Guarantees**
+
+    For the use cases where the state sizes can be handled by this backend, the backend does
+    guarantee persistence for savepoints, externalized checkpoints (if configured), and checkpoints
+    (when high-availability is configured).
+
+    **Configuration**
+
+    As for all state backends, this backend can either be configured within the application (by
+    creating the backend with the respective constructor parameters and setting it on the execution
+    environment) or by specifying it in the Flink configuration.
+
+    If the state backend was specified in the application, it may pick up additional configuration
+    parameters from the Flink configuration. For example, if the backend is configured in the
+    application without a default savepoint directory, it will pick up a default savepoint
+    directory specified in the Flink configuration of the running job/cluster. That behavior is
+    implemented via the :func:`configure` method.
+    """
+
+    # The default maximal size that the snapshotted memory state may have (5 MiBytes).
+    DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024
+
+    def __init__(self,
+                 checkpoint_path=None,
+                 savepoint_path=None,
+                 max_state_size=None,
+                 using_asynchronous_snapshots=None,
+                 j_memory_state_backend=None):
+        """
+        Creates a new MemoryStateBackend, optionally setting the paths to persist checkpoint
+        metadata and savepoints to, as well as configuring state thresholds and asynchronous
+        operations.
+
+        WARNING: Increasing the size of this value beyond the default value
+        (:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care.
+        The checkpointed state needs to be sent to the JobManager via RPC messages of limited
+        size, and the JobManager needs to be able to hold all aggregated state in its memory.
+
+        :param checkpoint_path: The path to write checkpoint metadata to. If none, the value from
+                                the runtime configuration will be used.
+        :param savepoint_path: The path to write savepoints to. If none, the value from
+                               the runtime configuration will be used.
+        :param max_state_size: The maximal size of the serialized state. If none, the
+                               :data:`DEFAULT_MAX_STATE_SIZE` will be used.
+        :param using_asynchronous_snapshots: Flag to switch between synchronous and asynchronous
+                                             snapshot mode. If none, the value configured in the
+                                             runtime configuration will be used.
+        :param j_memory_state_backend: For internal use, please keep it None.
+        """
+        if j_memory_state_backend is None:
+            gateway = get_gateway()
+            JTernaryBoolean = gateway.jvm.org.apache.flink.util.TernaryBoolean
+            JMemoryStateBackend = gateway.jvm.org.apache.flink.runtime.state.memory\
+                .MemoryStateBackend
+            if using_asynchronous_snapshots is None:
+                j_asynchronous_snapshots = JTernaryBoolean.UNDEFINED
+            elif using_asynchronous_snapshots is True:
+                j_asynchronous_snapshots = JTernaryBoolean.TRUE
+            elif using_asynchronous_snapshots is False:
+                j_asynchronous_snapshots = JTernaryBoolean.FALSE
+            else:
+                raise TypeError("Unsupported input for 'using_asynchronous_snapshots': %s, "
+                                "the value of the parameter should be None, "
+                                "True or False." % using_asynchronous_snapshots)
+            if max_state_size is None:
+                max_state_size = JMemoryStateBackend.DEFAULT_MAX_STATE_SIZE
+            j_memory_state_backend = JMemoryStateBackend(checkpoint_path,
+                                                         savepoint_path,
+                                                         max_state_size,
+                                                         j_asynchronous_snapshots)
+
+        self._j_memory_state_backend = j_memory_state_backend
+        super(MemoryStateBackend, self).__init__(j_memory_state_backend)
+
+    def configure(self, config):
+        """
+        Creates a copy of this state backend that uses the values defined in the configuration
+        for fields that were not specified in this state backend.
+
+        :param config: The configuration dict.
+        :return: The re-configured variant of the state backend.
+        """
+        return MemoryStateBackend(j_memory_state_backend=_configure(self, config))
+
+    def get_max_state_size(self):
+        """
+        Gets the maximum size that an individual state can have, as configured in the
+        constructor (by default :data:`DEFAULT_MAX_STATE_SIZE`).
+
+        :return: The maximum size that an individual state can have.
+        """
+        return self._j_memory_state_backend.getMaxStateSize()
+
+    def is_using_asynchronous_snapshots(self):
+        """
+        Gets whether the key/value data structures are asynchronously snapshotted.
+
+        If not explicitly configured, this is the default value of
+        ``org.apache.flink.configuration.CheckpointingOptions.ASYNC_SNAPSHOTS``.
+
+        :return: True if the key/value data structures are asynchronously snapshotted,
+                 false otherwise.
+        """
+        return self._j_memory_state_backend.isUsingAsynchronousSnapshots()
+
+    def __str__(self):
+        return self._j_memory_state_backend.toString()
+
+
+class FsStateBackend(StateBackend):
+    """
+    This state backend holds the working state in the memory (JVM heap) of the TaskManagers.
+    The state backend checkpoints state as files to a file system (hence the backend's name).
+
+    Each checkpoint individually will store all its files in a subdirectory that includes the
+    checkpoint number, such as ``hdfs://namenode:port/flink-checkpoints/chk-17/``.
+
+    **State Size Considerations**
+
+    Working state is kept on the TaskManager heap. If a TaskManager executes multiple
+    tasks concurrently (if the TaskManager has multiple slots, or if slot-sharing is used)
+    then the aggregate state of all tasks needs to fit into that TaskManager's memory.
+
+    This state backend stores small state chunks directly with the metadata, to avoid creating
+    many small files. The threshold for that is configurable. When increasing this threshold, the
+    size of the checkpoint metadata increases. The checkpoint metadata of all retained completed
+    checkpoints needs to fit into the JobManager's heap memory. This is typically not a problem,
+    unless the threshold :func:`get_min_file_size_threshold` is increased significantly.
+
+    **Persistence Guarantees**
+
+    Checkpoints from this state backend are as persistent and available as the filesystem that is
+    written to. If the file system is a persistent distributed file system, this state backend
+    supports highly available setups. The backend additionally supports savepoints and externalized
+    checkpoints.
+
+    **Configuration**
+
+    As for all state backends, this backend can either be configured within the application (by
+    creating the backend with the respective constructor parameters and setting it on the execution
+    environment) or by specifying it in the Flink configuration.
+
+    If the state backend was specified in the application, it may pick up additional configuration
+    parameters from the Flink configuration. For example, if the backend is configured in the
+    application without a default savepoint directory, it will pick up a default savepoint
+    directory specified in the Flink configuration of the running job/cluster. That behavior is
+    implemented via the :func:`configure` method.
+    """
+
+    def __init__(self,
+                 checkpoint_directory_uri=None,
+                 default_savepoint_directory_uri=None,
+                 file_state_size_threshold=None,
+                 using_asynchronous_snapshots=None,
+                 j_fs_state_backend=None):
+        """
+        Creates a new state backend that stores its checkpoint data in the file system and location
+        defined by the given URI.
+
+        A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or
+        'S3://') must be accessible via ``org.apache.flink.core.fs.FileSystem.get(URI)``.
+
+        For a state backend targeting HDFS, this means that the URI must either specify the
+        authority (host and port), or that the Hadoop configuration that describes that information
+        must be in the classpath.
+
+        :param checkpoint_directory_uri: The path to write checkpoint metadata to, required.
+        :param default_savepoint_directory_uri: The path to write savepoints to. If none, the value
+                                                from the runtime configuration will be used, or
+                                                savepoint target locations need to be passed when
+                                                triggering a savepoint.
+        :param file_state_size_threshold: State below this size will be stored as part of the
+                                          metadata, rather than in files. If none, the value
+                                          configured in the runtime configuration will be used, or
+                                          the default value (1KB) if nothing is configured.
+        :param using_asynchronous_snapshots: Flag to switch between synchronous and asynchronous
+                                             snapshot mode. If none, the value configured in
+                                             the runtime configuration will be used.
+        :param j_fs_state_backend: For internal use, please keep it None.
+        """
+        if j_fs_state_backend is None:
+            gateway = get_gateway()
+            JTernaryBoolean = gateway.jvm.org.apache.flink.util.TernaryBoolean
+            JFsStateBackend = gateway.jvm.org.apache.flink.runtime.state.filesystem\
+                .FsStateBackend
+            JPath = gateway.jvm.org.apache.flink.core.fs.Path
+            if checkpoint_directory_uri is None:
+                raise ValueError("The parameter 'checkpoint_directory_uri' is required!")
+            j_checkpoint_directory_uri = JPath(checkpoint_directory_uri).toUri()
+
+            if default_savepoint_directory_uri is None:
+                j_default_savepoint_directory_uri = None
+            else:
+                j_default_savepoint_directory_uri = JPath(default_savepoint_directory_uri).toUri()
+
+            if file_state_size_threshold is None:
+                file_state_size_threshold = -1
+
+            if using_asynchronous_snapshots is None:
+                j_asynchronous_snapshots = JTernaryBoolean.UNDEFINED
+            elif using_asynchronous_snapshots is True:
+                j_asynchronous_snapshots = JTernaryBoolean.TRUE
+            elif using_asynchronous_snapshots is False:
+                j_asynchronous_snapshots = JTernaryBoolean.FALSE
+            else:
+                raise TypeError("Unsupported input for 'using_asynchronous_snapshots': %s, "
+                                "the value of the parameter should be None, "
+                                "True or False." % using_asynchronous_snapshots)
+
+            j_fs_state_backend = JFsStateBackend(j_checkpoint_directory_uri,
+                                                 j_default_savepoint_directory_uri,
+                                                 file_state_size_threshold,
+                                                 j_asynchronous_snapshots)
+
+        self._j_fs_state_backend = j_fs_state_backend
+        super(FsStateBackend, self).__init__(j_fs_state_backend)
+
+    def configure(self, config):
+        """
+        Creates a copy of this state backend that uses the values defined in the configuration
+        for fields that were not specified in this state backend.
+
+        :param config: The configuration dict.
+        :return: The re-configured variant of the state backend.
+        """
+        return FsStateBackend(j_fs_state_backend=_configure(self, config))
+
+    def get_checkpoint_path(self):
+        """
+        Gets the base directory where all the checkpoints are stored.
+        The job-specific checkpoint directory is created inside this directory.
+
+        :return: The base directory for checkpoints.
+        """
+        return self._j_fs_state_backend.getCheckpointPath().toString()
+
+    def get_min_file_size_threshold(self):
+        """
+        Gets the threshold below which state is stored as part of the metadata, rather than in
+        files. This threshold ensures that the backend does not create a large amount of very
+        small files, where potentially the file pointers are larger than the state itself.
+
+        If not explicitly configured, this is the default value of
+        ``org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD``.
+
+        :return: The file size threshold, in bytes.
+        """
+        return self._j_fs_state_backend.getMinFileSizeThreshold()
+
+    def is_using_asynchronous_snapshots(self):
+        """
+        Gets whether the key/value data structures are asynchronously snapshotted.
+
+        If not explicitly configured, this is the default value of
+        ``org.apache.flink.configuration.CheckpointingOptions.ASYNC_SNAPSHOTS``.
+
+        :return: True if the key/value data structures are asynchronously snapshotted,
+                 false otherwise.
+        """
+        return self._j_fs_state_backend.isUsingAsynchronousSnapshots()
+
+
+class RocksDBStateBackend(StateBackend):
+    """
+    A State Backend that stores its state in ``RocksDB``. This state backend can
+    store very large state that exceeds memory and spills to disk.
+
+    All key/value state (including windows) is stored in the key/value index of RocksDB.
+    For persistence against loss of machines, checkpoints take a snapshot of the
+    RocksDB database, and persist that snapshot in a file system (by default) or
+    another configurable state backend.
+
+    The behavior of the RocksDB instances can be parametrized by setting RocksDB Options
+    using the methods :func:`set_predefined_options` and :func:`set_options`.
+    """
+
+    def __init__(self,
+                 checkpoint_data_uri=None,
+                 enable_incremental_checkpointing=None,
+                 checkpoint_stream_backend=None,
+                 j_rocks_db_state_backend=None):
+        """
+        Creates a new :class:`RocksDBStateBackend` that stores its checkpoint data in the given
+        state backend or at the location of the given URI.
+
+        If using a state backend, one would typically supply a filesystem or database state backend
+        here, where the snapshots from RocksDB would be stored.
+
+        If using a URI, a state backend that stores checkpoints in HDFS or S3 must specify the file
+        system host and port in the URI, or have the Hadoop configuration that describes the file
+        system (host / high-availability group / possibly credentials) either referenced from the
+        Flink config, or included in the classpath.
+
+        :param checkpoint_data_uri: The URI describing the filesystem and path to the checkpoint
+                                    data directory.
+        :param enable_incremental_checkpointing: True if incremental checkpointing is enabled.
+        :param checkpoint_stream_backend: The backend to write the checkpoint streams to.
+        :param j_rocks_db_state_backend: For internal use, please keep it None.
+        """
+        if j_rocks_db_state_backend is None:
+            gateway = get_gateway()
+            JTernaryBoolean = gateway.jvm.org.apache.flink.util.TernaryBoolean
+            JRocksDBStateBackend = gateway.jvm.org.apache.flink.contrib.streaming.state \
+                .RocksDBStateBackend
+
+            if enable_incremental_checkpointing not in (None, True, False):
+                raise TypeError("Unsupported input for 'enable_incremental_checkpointing': %s, "
+                                "the value of the parameter should be None, "
+                                "True or False." % enable_incremental_checkpointing)
+
+            if checkpoint_data_uri is not None:
+                if enable_incremental_checkpointing is None:
+                    j_rocks_db_state_backend = JRocksDBStateBackend(checkpoint_data_uri)
+                else:
+                    j_rocks_db_state_backend = \
+                        JRocksDBStateBackend(checkpoint_data_uri, enable_incremental_checkpointing)
+            elif isinstance(checkpoint_stream_backend, StateBackend):
+                if enable_incremental_checkpointing is None:
+                    j_enable_incremental_checkpointing = JTernaryBoolean.UNDEFINED
+                elif enable_incremental_checkpointing is True:
+                    j_enable_incremental_checkpointing = JTernaryBoolean.TRUE
+                else:
+                    j_enable_incremental_checkpointing = JTernaryBoolean.FALSE
+
+                j_rocks_db_state_backend = \
+                    JRocksDBStateBackend(checkpoint_stream_backend._j_state_backend,
+                                         j_enable_incremental_checkpointing)
+
+        self._j_rocks_db_state_backend = j_rocks_db_state_backend
+        super(RocksDBStateBackend, self).__init__(j_rocks_db_state_backend)
+
+    def configure(self, config):
 
 Review comment:
   The method `configure` is used internally and can be removed. What do you think?
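   
   For context, here is a minimal usage sketch of the backends defined in this file, based only
   on the constructors shown in the diff above. The `set_state_backend(...)` call on the Python
   StreamExecutionEnvironment is an assumption for illustration and is not part of this file;
   only the backend construction itself follows the signatures in the patch.
   
       from pyflink.common.state_backend import FsStateBackend, RocksDBStateBackend
   
       # Working state stays on the TaskManager heap; checkpoints go to the given file system.
       fs_backend = FsStateBackend(
           checkpoint_directory_uri="hdfs://namenode:port/flink-checkpoints",
           using_asynchronous_snapshots=True)
   
       # RocksDB keeps working state on local disk and snapshots it through the given
       # checkpoint stream backend, with incremental checkpointing enabled.
       rocks_backend = RocksDBStateBackend(
           checkpoint_stream_backend=fs_backend,
           enable_incremental_checkpointing=True)
   
       # Hypothetical: hand the backend to the execution environment (not part of this file).
       # env.set_state_backend(rocks_backend)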

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
