sjwiesman commented on a change in pull request #15441:
URL: https://github.com/apache/flink/pull/15441#discussion_r604849522
##########
File path: flink-python/pyflink/datastream/state_backend.py
##########
@@ -116,8 +119,280 @@ def __init__(self, j_state_backend):
self._j_state_backend = j_state_backend
+class HashMapStateBackend(StateBackend):
+ """
+ This state backend holds the working state in the memory (JVM heap) of the
TaskManagers
+ and checkpoints based on the configured CheckpointStorage.
+
+ **State Size Considerations**
+
+ Working state is kept on the TaskManager heap. If a TaskManager executes
multiple
+ tasks concurrently (if the TaskManager has multiple slots, or if
slot-sharing is used)
+ then the aggregate state of all tasks needs to fit into that TaskManager's
memory.
+
+ **Configuration**
+
+ As for all state backends, this backend can either be configured within
the application (by
+ creating the backend with the respective constructor parameters and
setting it on the execution
+ environment) or by specifying it in the Flink configuration.
+
+ If the state backend was specified in the application, it may pick up
additional configuration
+ parameters from the Flink configuration. For example, if the backend is
configured in the
+ application without a default savepoint directory, it will pick up a
default savepoint
+ directory specified in the Flink configuration of the running job/cluster.
That behavior is
+ implemented via the :func:`configure` method.
+ """
+
+ def __init__(self, j_hashmap_state_backend=None):
+ """
+ Creates a new HashMapStateBackend, setting optionally the paths to
persist checkpoint
+ metadata and savepoints to, as well as configuring state thresholds
and asynchronous
+ operations.
+
+ WARNING: Increasing the size of this value beyond the default value
+ (:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care.
+ The checkpointed state needs to be sent to the JobManager via limited
size RPC messages,
+ and the JobManager needs to be able to hold all aggregated
state in its memory.
+
+ Example:
+ ::
+ >>> state_backend = HashMapStateBackend()
+
+ :param j_hashmap_state_backend: For internal use, please keep none.
+ """
+ if j_hashmap_state_backend is None:
+ gateway = get_gateway()
+ JHashMapStateBackend =
gateway.jvm.org.apache.flink.runtime.state.hashmap\
+ .HashMapStateBackend
+
+ j_hashmap_state_backend = JHashMapStateBackend()
+
+ self._j_hashmap_state_backend = j_hashmap_state_backend
+ super(HashMapStateBackend, self).__init__(j_hashmap_state_backend)
+
+ def __str__(self):
+ return self._j_hashmap_state_backend.toString()
+
+
+class EmbeddedRocksDBStateBackend(StateBackend):
+ """
+ A State Backend that stores its state in an embedded ``RocksDB`` instance.
This state backend
+ can store very large state that exceeds memory and spills to local disk.
+
+ All key/value state (including windows) is stored in the key/value index
of RocksDB.
+ For persistence against loss of machines, please configure a
CheckpointStorage instance
+ for the Job.
+
+ The behavior of the RocksDB instances can be parametrized by setting
RocksDB Options
+ using the methods :func:`set_predefined_options` and :func:`set_options`.
+ """
+
+ def __init__(self,
+ enable_incremental_checkpointing=None,
+ j_embedded_rocks_db_state_backend=None):
+ """
+ Creates a new :class:`EmbeddedRocksDBStateBackend` for storing local
state.
+
+ Example:
+ ::
+
+ >>> state_backend = EmbeddedRocksDBStateBackend()
+
+ :param enable_incremental_checkpointing: True if incremental
checkpointing is enabled.
+ :param j_embedded_rocks_db_state_backend: For internal use, please
keep none.
+ """
+ if j_embedded_rocks_db_state_backend is None:
+ gateway = get_gateway()
+ JTernaryBoolean = gateway.jvm.org.apache.flink.util.TernaryBoolean
+ JEmbeddedRocksDBStateBackend =
gateway.jvm.org.apache.flink.contrib.streaming.state \
+ .EmbeddedRocksDBStateBackend
+
+ if enable_incremental_checkpointing not in (None, True, False):
+ raise TypeError("Unsupported input for
'enable_incremental_checkpointing': %s, "
+ "the value of the parameter should be None or"
+ "True or False.")
+
+ if enable_incremental_checkpointing is None:
+ j_enable_incremental_checkpointing = JTernaryBoolean.UNDEFINED
+ elif enable_incremental_checkpointing is True:
+ j_enable_incremental_checkpointing = JTernaryBoolean.TRUE
+ else:
+ j_enable_incremental_checkpointing = JTernaryBoolean.FALSE
+
+ j_embedded_rocks_db_state_backend = \
+
JEmbeddedRocksDBStateBackend(j_enable_incremental_checkpointing)
+
+ self._j_embedded_rocks_db_state_backend =
j_embedded_rocks_db_state_backend
+ super(EmbeddedRocksDBStateBackend,
self).__init__(j_embedded_rocks_db_state_backend)
+
+ def set_db_storage_paths(self, *paths: str):
+ """
+ Sets the directories in which the local RocksDB database puts its
files (like SST and
+ metadata files). These directories do not need to be persistent, they
can be ephemeral,
+ meaning that they are lost on a machine failure, because state in
RocksDB is persisted
+ in checkpoints.
+
+ If nothing is configured, these directories default to the
TaskManager's local
+ temporary file directories.
+
+ Each distinct state will be stored in one path, but when the state
backend creates
+ multiple states, they will store their files on different paths.
+
+ Calling this function without any paths restores the default behavior, where
the configured
+ temp directories will be used.
+
+ :param paths: The paths across which the local RocksDB database files
will be spread. This
+ parameter is optional.
+ """
+ if len(paths) < 1:
+ self._j_embedded_rocks_db_state_backend.setDbStoragePath(None)
+ else:
+ gateway = get_gateway()
+ j_path_array = gateway.new_array(gateway.jvm.String, len(paths))
+ for i in range(0, len(paths)):
+ j_path_array[i] = paths[i]
+
self._j_embedded_rocks_db_state_backend.setDbStoragePaths(j_path_array)
+
+ def get_db_storage_paths(self) -> List[str]:
+ """
+ Gets the configured local DB storage paths, or null, if none were
configured.
+
+ Under these directories on the TaskManager, RocksDB stores its SST
files and
+ metadata files. These directories do not need to be persistent, they
can be ephemeral,
+ meaning that they are lost on a machine failure, because state in
RocksDB is persisted
+ in checkpoints.
+
+ If nothing is configured, these directories default to the
TaskManager's local
+ temporary file directories.
+
+ :return: The list of configured local DB storage paths.
+ """
+ return
list(self._j_embedded_rocks_db_state_backend.getDbStoragePaths())
+
+ def is_incremental_checkpoints_enabled(self) -> bool:
+ """
+ Gets whether incremental checkpoints are enabled for this state
backend.
+
+ :return: True if incremental checkpoints are enabled, false otherwise.
+ """
+ return
self._j_embedded_rocks_db_state_backend.isIncrementalCheckpointsEnabled()
+
+ def set_predefined_options(self, options: 'PredefinedOptions'):
+ """
+ Sets the predefined options for RocksDB.
+
+ If user-configured options within ``RocksDBConfigurableOptions`` are
set (through
+ flink-conf.yaml) or a user-defined options factory is set (via
:func:`set_options`),
+ then the options from the factory are applied on top of the predefined
options specified
+ here and the customized options.
+
+ Example:
+ ::
+
+ >>>
state_backend.set_predefined_options(PredefinedOptions.SPINNING_DISK_OPTIMIZED)
+
+ :param options: The options to set (must not be null), see
:class:`PredefinedOptions`.
+ """
+ self._j_embedded_rocks_db_state_backend\
+ .setPredefinedOptions(options._to_j_predefined_options())
+
+ def get_predefined_options(self) -> 'PredefinedOptions':
+ """
+ Gets the current predefined options for RocksDB.
+ The default options (if nothing was set via
:func:`set_predefined_options`)
+ are :data:`PredefinedOptions.DEFAULT`.
+
+ If user-configured options within ``RocksDBConfigurableOptions`` are
set (through
+ flink-conf.yaml) or a user-defined options factory is set (via
:func:`set_options`),
+ then the options from the factory are applied on top of the predefined
and customized
+ options.
+
+ .. seealso:: :func:`set_predefined_options`
+
+ :return: Current predefined options.
+ """
+ j_predefined_options =
self._j_embedded_rocks_db_state_backend.getPredefinedOptions()
+ return
PredefinedOptions._from_j_predefined_options(j_predefined_options)
+
+ def set_options(self, options_factory_class_name: str):
Review comment:
No, this is fine (it's actually copied directly from
RocksDBStateBackend). This is just loading a factory class that is defined in
Java. We could think about making the factory definable in Python but that is a
separate issue.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]