sjwiesman commented on a change in pull request #15441:
URL: https://github.com/apache/flink/pull/15441#discussion_r604849522



##########
File path: flink-python/pyflink/datastream/state_backend.py
##########
@@ -116,8 +119,280 @@ def __init__(self, j_state_backend):
         self._j_state_backend = j_state_backend
 
 
+class HashMapStateBackend(StateBackend):
+    """
+    This state backend holds the working state in the memory (JVM heap) of the 
TaskManagers
+    and checkpoints based on the configured CheckpointStorage.
+
+    **State Size Considerations**
+
+    Working state is kept on the TaskManager heap. If a TaskManager executes 
multiple
+    tasks concurrently (if the TaskManager has multiple slots, or if 
slot-sharing is used)
+    then the aggregate state of all tasks needs to fit into that TaskManager's 
memory.
+
+    **Configuration**
+
+    As for all state backends, this backend can either be configured within 
the application (by
+    creating the backend with the respective constructor parameters and 
setting it on the execution
+    environment) or by specifying it in the Flink configuration.
+
+    If the state backend was specified in the application, it may pick up 
additional configuration
+    parameters from the Flink configuration. For example, if the backend if 
configured in the
+    application without a default savepoint directory, it will pick up a 
default savepoint
+    directory specified in the Flink configuration of the running job/cluster. 
That behavior is
+    implemented via the :func:`configure` method.
+    """
+
+    def __init__(self, j_hashmap_state_backend=None):
+        """
+        Creates a new MemoryStateBackend, setting optionally the paths to 
persist checkpoint
+        metadata and savepoints to, as well as configuring state thresholds 
and asynchronous
+        operations.
+
+        WARNING: Increasing the size of this value beyond the default value
+        (:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care.
+        The checkpointed state needs to be send to the JobManager via limited 
size RPC messages,
+        and there and the JobManager needs to be able to hold all aggregated 
state in its memory.
+
+        Example:
+        ::
+            >>> state_backend = HashMapStateBackend()
+
+        :param j_hashmap_state_backend: For internal use, please keep none.
+        """
+        if j_hashmap_state_backend is None:
+            gateway = get_gateway()
+            JHashMapStateBackend = 
gateway.jvm.org.apache.flink.runtime.state.hashmap\
+                .HashMapStateBackend
+
+            j_hashmap_state_backend = JHashMapStateBackend()
+
+        self._j_hashmap_state_backend = j_hashmap_state_backend
+        super(HashMapStateBackend, self).__init__(j_hashmap_state_backend)
+
+    def __str__(self):
+        return self._j_hashmap_state_backend.toString()
+
+
+class EmbeddedRocksDBStateBackend(StateBackend):
+    """
+    A State Backend that stores its state in an embedded ``RocksDB`` instance. 
This state backend
+    can store very large state that exceeds memory and spills to local disk.
+
+    All key/value state (including windows) is stored in the key/value index 
of RocksDB.
+    For persistence against loss of machines, please configure a 
CheckpointStorage instance
+    for the Job.
+
+    The behavior of the RocksDB instances can be parametrized by setting 
RocksDB Options
+    using the methods :func:`set_predefined_options` and :func:`set_options`.
+    """
+
+    def __init__(self,
+                 enable_incremental_checkpointing=None,
+                 j_embedded_rocks_db_state_backend=None):
+        """
+        Creates a new :class:`EmbeddedRocksDBStateBackend` for storing local 
state.
+
+        Example:
+        ::
+
+            >>> state_backend = EmbeddedRocksDBStateBackend()
+
+        :param enable_incremental_checkpointing: True if incremental 
checkpointing is enabled.
+        :param j_embedded_rocks_db_state_backend: For internal use, please 
keep none.
+        """
+        if j_embedded_rocks_db_state_backend is None:
+            gateway = get_gateway()
+            JTernaryBoolean = gateway.jvm.org.apache.flink.util.TernaryBoolean
+            JEmbeddedRocksDBStateBackend = 
gateway.jvm.org.apache.flink.contrib.streaming.state \
+                .EmbeddedRocksDBStateBackend
+
+            if enable_incremental_checkpointing not in (None, True, False):
+                raise TypeError("Unsupported input for 
'enable_incremental_checkpointing': %s, "
+                                "the value of the parameter should be None or"
+                                "True or False.")
+
+            if enable_incremental_checkpointing is None:
+                j_enable_incremental_checkpointing = JTernaryBoolean.UNDEFINED
+            elif enable_incremental_checkpointing is True:
+                j_enable_incremental_checkpointing = JTernaryBoolean.TRUE
+            else:
+                j_enable_incremental_checkpointing = JTernaryBoolean.FALSE
+
+            j_embedded_rocks_db_state_backend = \
+                
JEmbeddedRocksDBStateBackend(j_enable_incremental_checkpointing)
+
+        self._j_embedded_rocks_db_state_backend = 
j_embedded_rocks_db_state_backend
+        super(EmbeddedRocksDBStateBackend, 
self).__init__(j_embedded_rocks_db_state_backend)
+
+    def set_db_storage_paths(self, *paths: str):
+        """
+        Sets the directories in which the local RocksDB database puts its 
files (like SST and
+        metadata files). These directories do not need to be persistent, they 
can be ephemeral,
+        meaning that they are lost on a machine failure, because state in 
RocksDB is persisted
+        in checkpoints.
+
+        If nothing is configured, these directories default to the 
TaskManager's local
+        temporary file directories.
+
+        Each distinct state will be stored in one path, but when the state 
backend creates
+        multiple states, they will store their files on different paths.
+
+        Passing ``None`` to this function restores the default behavior, where 
the configured
+        temp directories will be used.
+
+        :param paths: The paths across which the local RocksDB database files 
will be spread. this
+                      parameter is optional.
+        """
+        if len(paths) < 1:
+            self._j_embedded_rocks_db_state_backend.setDbStoragePath(None)
+        else:
+            gateway = get_gateway()
+            j_path_array = gateway.new_array(gateway.jvm.String, len(paths))
+            for i in range(0, len(paths)):
+                j_path_array[i] = paths[i]
+            
self._j_embedded_rocks_db_state_backend.setDbStoragePaths(j_path_array)
+
+    def get_db_storage_paths(self) -> List[str]:
+        """
+        Gets the configured local DB storage paths, or null, if none were 
configured.
+
+        Under these directories on the TaskManager, RocksDB stores its SST 
files and
+        metadata files. These directories do not need to be persistent, they 
can be ephermeral,
+        meaning that they are lost on a machine failure, because state in 
RocksDB is persisted
+        in checkpoints.
+
+        If nothing is configured, these directories default to the 
TaskManager's local
+        temporary file directories.
+
+        :return: The list of configured local DB storage paths.
+        """
+        return 
list(self._j_embedded_rocks_db_state_backend.getDbStoragePaths())
+
+    def is_incremental_checkpoints_enabled(self) -> bool:
+        """
+        Gets whether incremental checkpoints are enabled for this state 
backend.
+
+        :return: True if incremental checkpoints are enabled, false otherwise.
+        """
+        return 
self._j_embedded_rocks_db_state_backend.isIncrementalCheckpointsEnabled()
+
+    def set_predefined_options(self, options: 'PredefinedOptions'):
+        """
+        Sets the predefined options for RocksDB.
+
+        If user-configured options within ``RocksDBConfigurableOptions`` is 
set (through
+        flink-conf.yaml) or a user-defined options factory is set (via 
:func:`setOptions`),
+        then the options from the factory are applied on top of the here 
specified
+        predefined options and customized options.
+
+        Example:
+        ::
+
+            >>> 
state_backend.set_predefined_options(PredefinedOptions.SPINNING_DISK_OPTIMIZED)
+
+        :param options: The options to set (must not be null), see 
:class:`PredefinedOptions`.
+        """
+        self._j_embedded_rocks_db_state_backend\
+            .setPredefinedOptions(options._to_j_predefined_options())
+
+    def get_predefined_options(self) -> 'PredefinedOptions':
+        """
+        Gets the current predefined options for RocksDB.
+        The default options (if nothing was set via 
:func:`setPredefinedOptions`)
+        are :data:`PredefinedOptions.DEFAULT`.
+
+        If user-configured options within ``RocksDBConfigurableOptions`` is 
set (through
+        flink-conf.yaml) or a user-defined options factory is set (via 
:func:`setOptions`),
+        then the options from the factory are applied on top of the predefined 
and customized
+        options.
+
+        .. seealso:: :func:`set_predefined_options`
+
+        :return: Current predefined options.
+        """
+        j_predefined_options = 
self._j_embedded_rocks_db_state_backend.getPredefinedOptions()
+        return 
PredefinedOptions._from_j_predefined_options(j_predefined_options)
+
+    def set_options(self, options_factory_class_name: str):

Review comment:
       No, this is fine (it's copied directly from RocksDBStateBackend). It just loads an options-factory class that is defined in Java. We could consider making the factory definable in Python, but that is a separate issue.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to