dianfu commented on a change in pull request #15441:
URL: https://github.com/apache/flink/pull/15441#discussion_r604566912



##########
File path: flink-python/pyflink/datastream/checkpoint_storage.py
##########
@@ -0,0 +1,344 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from abc import ABCMeta
+
+from py4j.java_gateway import get_java_class
+from typing import Optional
+
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'CheckpointStorage',
+    'JobManagerCheckpointStorage',
+    'FileSystemCheckpointStorage',
+    'CustomCheckpointStorage']
+
+
+def _from_j_checkpoint_storage(j_checkpoint_storage):
+    if j_checkpoint_storage is None:
+        return None
+    gateway = get_gateway()
+    JCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.CheckpointStorage
+    JJobManagerCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage \
+        .JobManagerCheckpointStorage
+    JFileSystemCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage \
+        .FileSystemCheckpointStorage
+
+    j_clz = j_checkpoint_storage.getClass()
+
+    if not get_java_class(JCheckpointStorage).isAssignableFrom(j_clz):
+        raise TypeError("%s is not an instance of CheckpointStorage." % j_checkpoint_storage)
+
+    if get_java_class(JJobManagerCheckpointStorage).isAssignableFrom(j_clz):
+        return JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_checkpoint_storage)
+    elif get_java_class(JFileSystemCheckpointStorage).isAssignableFrom(j_clz):
+        return FileSystemCheckpointStorage(j_filesystem_checkpoint_storage=j_checkpoint_storage)
+    else:
+        return CustomCheckpointStorage(j_checkpoint_storage)
+
+
+class CheckpointStorage(object, metaclass=ABCMeta):
+    """
+    Checkpoint storage defines how :class:`StateBackend`'s store their state for fault-tolerance
+    in streaming applications. Various implementations store their checkpoints in different fashions
+    and have different requirements and availability guarantees.
+
+    For example, :class:`JobManagerCheckpointStorage` stores checkpoints in the memory of the
+    `JobManager`. It is lightweight and without additional dependencies but is not scalable
+    and only supports small state sizes. This checkpoint storage policy is convenient for local
+    testing and development.
+
+    :class:`FileSystemCheckpointStorage` stores checkpoints in a filesystem. For systems like HDFS,
+    NFS drives, S3, and GCS, this storage policy supports large state size, in the magnitude of many
+    terabytes while providing a highly available foundation for streaming applications. This
+    checkpoint storage policy is recommended for most production deployments.
+
+    **Raw Bytes Storage**
+
+    The `CheckpointStorage` creates services for raw bytes storage.
+
+    The raw bytes storage (through the CheckpointStreamFactory) is the fundamental service that
+    simply stores bytes in a fault tolerant fashion. This service is used by the JobManager to
+    store checkpoint and recovery metadata and is typically also used by the keyed- and operator-
+    state backends to store checkpoint state.
+
+    **Serializability**
+
+    Implementations need to be serializable (`java.io.Serializable`), because they are distributed
+    across parallel processes (for distributed execution) together with the streaming application
+    code.
+
+    Because of that, `CheckpointStorage` implementations are meant to be like _factories_ that create
+    the proper state stores that provide access to the persistent layer. That way, the storage
+    policy can be very lightweight (contain only configurations) which makes it easier to be
+    serializable.
+
+    **Thread Safety**
+
+    Checkpoint storage implementations have to be thread-safe. Multiple threads may be creating
+    streams concurrently.
+    """
+
+    def __init__(self, j_checkpoint_storage):
+        self._j_checkpoint_storage = j_checkpoint_storage
+
+
+class JobManagerCheckpointStorage(CheckpointStorage):
+    """
+    The `CheckpointStorage` checkpoints state directly to the JobManager's memory (hence the
+    name), but savepoints will be persisted to a file system.
+
+    This checkpoint storage is primarily for experimentation, quick local setups, or for streaming
+    applications that have very small state: Because it requires checkpoints to go through the
+    JobManager's memory, larger state will occupy larger portions of the JobManager's main memory,
+    reducing operational stability. For any other setup, the `FileSystemCheckpointStorage`
+    should be used. The `FileSystemCheckpointStorage` checkpoints state directly to files
+    rather than to the JobManager's memory, thus supporting larger state sizes and more highly
+    available recovery.
+
+    **State Size Considerations**
+
+    State checkpointing with this checkpoint storage is subject to the following conditions:
+
+    - Each individual state must not exceed the configured maximum state size
+      (see :func:`get_max_state_size`).
+
+    - All state from one task (i.e., the sum of all operator states and keyed states from all
+      chained operators of the task) must not exceed what the RPC system supports, which is
+      by default < 10 MB. That limit can be configured up, but that is typically not advised.
+
+    - The sum of all states in the application times all retained checkpoints must comfortably
+      fit into the JobManager's JVM heap space.
+
+    **Persistence Guarantees**
+
+    For the use cases where the state sizes can be handled by this storage, it does
+    guarantee persistence for savepoints, externalized checkpoints (if configured), and checkpoints
+    (when high-availability is configured).
+
+    **Configuration**
+
+    As for all checkpoint storage, this type can either be configured within the application (by
+    creating the storage with the respective constructor parameters and setting it on the execution
+    environment) or by specifying it in the Flink configuration.
+
+    If the storage was specified in the application, it may pick up additional configuration
+    parameters from the Flink configuration. For example, if the backend is configured in the
+    application without a default savepoint directory, it will pick up a default savepoint
+    directory specified in the Flink configuration of the running job/cluster. That behavior is
+    implemented via the :func:`configure` method.
+    """
+
+    # The default maximal size that the snapshotted memory state may have (5 MiBytes).
+    DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024
+
+    def __init__(self,
+                 checkpoint_path=None,
+                 max_state_size=None,
+                 j_jobmanager_checkpoint_storage=None):
+        """
+        Creates a new JobManagerCheckpointStorage, setting optionally the paths to persist
+        checkpoint metadata to, as well as configuring state thresholds.
+
+        WARNING: Increasing the size of this value beyond the default value
+        (:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care.
+        The checkpointed state needs to be sent to the JobManager via limited size RPC messages,
+        and the JobManager needs to be able to hold all aggregated state in its memory.
+
+        Example:
+        ::
+            >>> checkpoint_storage = JobManagerCheckpointStorage()
+
+        :param checkpoint_path: The path to write checkpoint metadata to. If none, the value from
+                                the runtime configuration will be used.
+        :param max_state_size: The maximal size of the serialized state. If none, the
+                               :data:`DEFAULT_MAX_STATE_SIZE` will be used.
+        :param j_jobmanager_checkpoint_storage: For internal use, please keep none.
+        """
+        if j_jobmanager_checkpoint_storage is None:
+            gateway = get_gateway()
+            JJobManagerCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage\
+                .JobManagerCheckpointStorage
+            JPath = gateway.jvm.org.apache.flink.core.fs.Path
+
+            if checkpoint_path is not None:
+                checkpoint_path = JPath(checkpoint_path)
+            if max_state_size is None:
+                max_state_size = JJobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE
+            j_jobmanager_checkpoint_storage = JJobManagerCheckpointStorage(checkpoint_path,
+                                                                           max_state_size)
+
+        self._j_jobmanager_checkpoint_storage = j_jobmanager_checkpoint_storage

Review comment:
       The instance variable `_j_jobmanager_checkpoint_storage` could be removed and we could use the `_j_checkpoint_storage` defined in the super class CheckpointStorage.
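
       For illustration, a rough sketch of what that could look like (constructor logic unchanged from the diff above; only the duplicate field is dropped, and accessors read the field set by `CheckpointStorage.__init__`):
       ```python
           def __init__(self,
                        checkpoint_path=None,
                        max_state_size=None,
                        j_jobmanager_checkpoint_storage=None):
               if j_jobmanager_checkpoint_storage is None:
                   gateway = get_gateway()
                   JJobManagerCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage \
                       .JobManagerCheckpointStorage
                   JPath = gateway.jvm.org.apache.flink.core.fs.Path
                   if checkpoint_path is not None:
                       checkpoint_path = JPath(checkpoint_path)
                   if max_state_size is None:
                       max_state_size = JJobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE
                   j_jobmanager_checkpoint_storage = JJobManagerCheckpointStorage(checkpoint_path,
                                                                                  max_state_size)
               # no separate _j_jobmanager_checkpoint_storage field; the superclass keeps the
               # wrapped Java object in _j_checkpoint_storage
               super(JobManagerCheckpointStorage, self).__init__(j_jobmanager_checkpoint_storage)

           def get_checkpoint_path(self) -> Optional[str]:
               return self._j_checkpoint_storage.getCheckpointPath().toString()

           def __str__(self):
               return self._j_checkpoint_storage.toString()
       ```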

##########
File path: flink-python/pyflink/datastream/state_backend.py
##########
@@ -116,8 +119,280 @@ def __init__(self, j_state_backend):
         self._j_state_backend = j_state_backend
 
 
+class HashMapStateBackend(StateBackend):
+    """
+    This state backend holds the working state in the memory (JVM heap) of the TaskManagers
+    and checkpoints based on the configured CheckpointStorage.
+
+    **State Size Considerations**
+
+    Working state is kept on the TaskManager heap. If a TaskManager executes multiple
+    tasks concurrently (if the TaskManager has multiple slots, or if slot-sharing is used)
+    then the aggregate state of all tasks needs to fit into that TaskManager's memory.
+
+    **Configuration**
+
+    As for all state backends, this backend can either be configured within the application (by
+    creating the backend with the respective constructor parameters and setting it on the execution
+    environment) or by specifying it in the Flink configuration.
+
+    If the state backend was specified in the application, it may pick up additional configuration
+    parameters from the Flink configuration. For example, if the backend is configured in the
+    application without a default savepoint directory, it will pick up a default savepoint
+    directory specified in the Flink configuration of the running job/cluster. That behavior is
+    implemented via the :func:`configure` method.
+    """
+
+    def __init__(self, j_hashmap_state_backend=None):
+        """
+        Creates a new HashMapStateBackend, setting optionally the paths to persist checkpoint
+        metadata and savepoints to, as well as configuring state thresholds and asynchronous
+        operations.
+
+        WARNING: Increasing the size of this value beyond the default value
+        (:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care.
+        The checkpointed state needs to be sent to the JobManager via limited size RPC messages,
+        and the JobManager needs to be able to hold all aggregated state in its memory.
+
+        Example:
+        ::
+            >>> state_backend = HashMapStateBackend()
+
+        :param j_hashmap_state_backend: For internal use, please keep none.
+        """
+        if j_hashmap_state_backend is None:
+            gateway = get_gateway()
+            JHashMapStateBackend = gateway.jvm.org.apache.flink.runtime.state.hashmap\
+                .HashMapStateBackend
+
+            j_hashmap_state_backend = JHashMapStateBackend()
+
+        self._j_hashmap_state_backend = j_hashmap_state_backend

Review comment:
       `_j_hashmap_state_backend` could be removed
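
       For example, roughly (relying on the `_j_state_backend` field set in `StateBackend.__init__`):
       ```python
           def __init__(self, j_hashmap_state_backend=None):
               if j_hashmap_state_backend is None:
                   gateway = get_gateway()
                   JHashMapStateBackend = gateway.jvm.org.apache.flink.runtime.state.hashmap \
                       .HashMapStateBackend
                   j_hashmap_state_backend = JHashMapStateBackend()
               # no _j_hashmap_state_backend field; StateBackend.__init__ already stores the
               # Java object in _j_state_backend
               super(HashMapStateBackend, self).__init__(j_hashmap_state_backend)

           def __str__(self):
               return self._j_state_backend.toString()
       ```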

##########
File path: flink-python/pyflink/datastream/checkpoint_storage.py
##########
@@ -0,0 +1,344 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from abc import ABCMeta
+
+from py4j.java_gateway import get_java_class
+from typing import Optional
+
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'CheckpointStorage',
+    'JobManagerCheckpointStorage',
+    'FileSystemCheckpointStorage',
+    'CustomCheckpointStorage']
+
+
+def _from_j_checkpoint_storage(j_checkpoint_storage):
+    if j_checkpoint_storage is None:
+        return None
+    gateway = get_gateway()
+    JCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.CheckpointStorage
+    JJobManagerCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage \
+        .JobManagerCheckpointStorage
+    JFileSystemCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage \
+        .FileSystemCheckpointStorage
+
+    j_clz = j_checkpoint_storage.getClass()
+
+    if not get_java_class(JCheckpointStorage).isAssignableFrom(j_clz):
+        raise TypeError("%s is not an instance of CheckpointStorage." % j_checkpoint_storage)
+
+    if get_java_class(JJobManagerCheckpointStorage).isAssignableFrom(j_clz):
+        return JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_checkpoint_storage)
+    elif get_java_class(JFileSystemCheckpointStorage).isAssignableFrom(j_clz):
+        return FileSystemCheckpointStorage(j_filesystem_checkpoint_storage=j_checkpoint_storage)
+    else:
+        return CustomCheckpointStorage(j_checkpoint_storage)
+
+
+class CheckpointStorage(object, metaclass=ABCMeta):
+    """
+    Checkpoint storage defines how :class:`StateBackend`'s store their state for fault-tolerance
+    in streaming applications. Various implementations store their checkpoints in different fashions
+    and have different requirements and availability guarantees.
+
+    For example, :class:`JobManagerCheckpointStorage` stores checkpoints in the memory of the
+    `JobManager`. It is lightweight and without additional dependencies but is not scalable
+    and only supports small state sizes. This checkpoint storage policy is convenient for local
+    testing and development.
+
+    :class:`FileSystemCheckpointStorage` stores checkpoints in a filesystem. For systems like HDFS,
+    NFS drives, S3, and GCS, this storage policy supports large state size, in the magnitude of many
+    terabytes while providing a highly available foundation for streaming applications. This
+    checkpoint storage policy is recommended for most production deployments.
+
+    **Raw Bytes Storage**
+
+    The `CheckpointStorage` creates services for raw bytes storage.
+
+    The raw bytes storage (through the CheckpointStreamFactory) is the fundamental service that
+    simply stores bytes in a fault tolerant fashion. This service is used by the JobManager to
+    store checkpoint and recovery metadata and is typically also used by the keyed- and operator-
+    state backends to store checkpoint state.
+
+    **Serializability**
+
+    Implementations need to be serializable (`java.io.Serializable`), because they are distributed
+    across parallel processes (for distributed execution) together with the streaming application
+    code.
+
+    Because of that, `CheckpointStorage` implementations are meant to be like _factories_ that create
+    the proper state stores that provide access to the persistent layer. That way, the storage
+    policy can be very lightweight (contain only configurations) which makes it easier to be
+    serializable.
+
+    **Thread Safety**
+
+    Checkpoint storage implementations have to be thread-safe. Multiple threads may be creating
+    streams concurrently.
+    """
+
+    def __init__(self, j_checkpoint_storage):
+        self._j_checkpoint_storage = j_checkpoint_storage
+
+
+class JobManagerCheckpointStorage(CheckpointStorage):
+    """
+    The `CheckpointStorage` checkpoints state directly to the JobManager's memory (hence the
+    name), but savepoints will be persisted to a file system.
+
+    This checkpoint storage is primarily for experimentation, quick local setups, or for streaming
+    applications that have very small state: Because it requires checkpoints to go through the
+    JobManager's memory, larger state will occupy larger portions of the JobManager's main memory,
+    reducing operational stability. For any other setup, the `FileSystemCheckpointStorage`
+    should be used. The `FileSystemCheckpointStorage` checkpoints state directly to files
+    rather than to the JobManager's memory, thus supporting larger state sizes and more highly
+    available recovery.
+
+    **State Size Considerations**
+
+    State checkpointing with this checkpoint storage is subject to the following conditions:
+
+    - Each individual state must not exceed the configured maximum state size
+      (see :func:`get_max_state_size`).
+
+    - All state from one task (i.e., the sum of all operator states and keyed states from all
+      chained operators of the task) must not exceed what the RPC system supports, which is
+      by default < 10 MB. That limit can be configured up, but that is typically not advised.
+
+    - The sum of all states in the application times all retained checkpoints must comfortably
+      fit into the JobManager's JVM heap space.
+
+    **Persistence Guarantees**
+
+    For the use cases where the state sizes can be handled by this storage, it does
+    guarantee persistence for savepoints, externalized checkpoints (if configured), and checkpoints
+    (when high-availability is configured).
+
+    **Configuration**
+
+    As for all checkpoint storage, this type can either be configured within the application (by
+    creating the storage with the respective constructor parameters and setting it on the execution
+    environment) or by specifying it in the Flink configuration.
+
+    If the storage was specified in the application, it may pick up additional configuration
+    parameters from the Flink configuration. For example, if the backend is configured in the
+    application without a default savepoint directory, it will pick up a default savepoint
+    directory specified in the Flink configuration of the running job/cluster. That behavior is
+    implemented via the :func:`configure` method.
+    """
+
+    # The default maximal size that the snapshotted memory state may have (5 MiBytes).
+    DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024
+
+    def __init__(self,
+                 checkpoint_path=None,
+                 max_state_size=None,
+                 j_jobmanager_checkpoint_storage=None):
+        """
+        Creates a new JobManagerCheckpointStorage, setting optionally the paths to persist
+        checkpoint metadata to, as well as configuring state thresholds.
+
+        WARNING: Increasing the size of this value beyond the default value
+        (:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care.
+        The checkpointed state needs to be sent to the JobManager via limited size RPC messages,
+        and the JobManager needs to be able to hold all aggregated state in its memory.
+
+        Example:
+        ::
+            >>> checkpoint_storage = JobManagerCheckpointStorage()
+
+        :param checkpoint_path: The path to write checkpoint metadata to. If none, the value from
+                                the runtime configuration will be used.
+        :param max_state_size: The maximal size of the serialized state. If none, the
+                               :data:`DEFAULT_MAX_STATE_SIZE` will be used.
+        :param j_jobmanager_checkpoint_storage: For internal use, please keep none.
+        """
+        if j_jobmanager_checkpoint_storage is None:
+            gateway = get_gateway()
+            JJobManagerCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage\
+                .JobManagerCheckpointStorage
+            JPath = gateway.jvm.org.apache.flink.core.fs.Path
+
+            if checkpoint_path is not None:
+                checkpoint_path = JPath(checkpoint_path)
+            if max_state_size is None:
+                max_state_size = JJobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE
+            j_jobmanager_checkpoint_storage = JJobManagerCheckpointStorage(checkpoint_path,
+                                                                           max_state_size)
+
+        self._j_jobmanager_checkpoint_storage = j_jobmanager_checkpoint_storage
+        super(JobManagerCheckpointStorage, self).__init__(j_jobmanager_checkpoint_storage)
+
+    def get_checkpoint_path(self) -> Optional[str]:
+        """
+        Gets the base directory where all the checkpoints are stored.
+        The job-specific checkpoint directory is created inside this directory.
+
+        :return: The base directory for checkpoints.
+        """
+        return self._j_jobmanager_checkpoint_storage.getCheckpointPath().toString()
+
+    def get_savepoint_path(self) -> Optional[str]:
+        """
+        Gets the base directory where all the savepoints are stored.
+        The job-specific savepoint directory is created inside this directory.
+
+        :return: The base directory for savepoints.
+        """
+        return self._j_jobmanager_checkpoint_storage.getCheckpointPath().toString()
+
+    def __str__(self):
+        return self._j_jobmanager_checkpoint_storage.toString()
+
+
+class FileSystemCheckpointStorage(CheckpointStorage):
+    """
+    `FileSystemCheckpointStorage` checkpoints state as files to a filesystem.
+
+    Each checkpoint will store all its files in a subdirectory that includes the
+    checkpoint number, such as `hdfs://namenode:port/flink-checkpoints/chk-17/`.
+
+    **State Size Considerations**
+
+    This checkpoint storage stores small state chunks directly with the metadata, to avoid creating
+    many small files. The threshold for that is configurable. When increasing this threshold, the
+    size of the checkpoint metadata increases. The checkpoint metadata of all retained completed
+    checkpoints needs to fit into the JobManager's heap memory. This is typically not a problem,
+    unless the threshold `get_min_file_size_threshold` is increased significantly.
+
+    **Persistence Guarantees**
+
+    Checkpoints from this checkpoint storage are as persistent and available as the filesystem
+    that it is written to. If the file system is a persistent distributed file system, this
+    checkpoint storage supports highly available setups. The backend additionally supports
+    savepoints and externalized checkpoints.
+
+    **Configuration**
+
+    As for all checkpoint storage policies, this backend can either be configured within the
+    application (by creating the storage with the respective constructor parameters and setting
+    it on the execution environment) or by specifying it in the Flink configuration.
+
+    If the checkpoint storage was specified in the application, it may pick up additional
+    configuration parameters from the Flink configuration. For example, if the storage is configured
+    in the application without a default savepoint directory, it will pick up a default savepoint
+    directory specified in the Flink configuration of the running job/cluster.
+    """
+
+    # Maximum size of state that is stored with the metadata, rather than in files (1 MiByte).
+    MAX_FILE_STATE_THRESHOLD = 1024 * 1024
+
+    def __init__(self,
+                 checkpoint_path=None,
+                 file_state_size_threshold=None,
+                 write_buffer_size=-1,
+                 j_filesystem_checkpoint_storage=None):
+        """
+        Creates a new FileSystemCheckpointStorage, setting the paths for the checkpoint data
+        in a file system.
+
+        All file systems for the file system scheme in the URI (e.g., `file://`, `hdfs://`, or
+        `s3://`) must be accessible via `FileSystem#get`.
+
+        For a Job targeting HDFS, this means that the URI must either specify the authority (host
+        and port), or the Hadoop configuration that describes that information must be in the
+        classpath.
+
+        Example:
+        ::
+            >>> checkpoint_storage = FileSystemCheckpointStorage("hdfs://checkpoints")
+
+        :param checkpoint_path: The path to write checkpoint metadata to. If none, the value from
+                                the runtime configuration will be used.
+        :param file_state_size_threshold: State below this size will be stored as part of the
+                                        metadata, rather than in files. If -1, the value configured
+                                        in the runtime configuration will be used, or the default
+                                        value (1KB) if nothing is configured.
+        :param write_buffer_size: Write buffer size used to serialize state. If -1, the value
+                                    configured in the runtime configuration will be used, or the
+                                    default value (4KB) if nothing is configured.
+        :param j_filesystem_checkpoint_storage: For internal use, please keep none.
+        """
+        if j_filesystem_checkpoint_storage is None:
+            gateway = get_gateway()
+            JFileSystemCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage\
+                .FileSystemCheckpointStorage
+            JPath = gateway.jvm.org.apache.flink.core.fs.Path
+
+            if checkpoint_path is None:
+                raise ValueError("checkpoint_path must not be None")
+            else:
+                checkpoint_path = JPath(checkpoint_path)
+
+            if file_state_size_threshold is None:
+                file_state_size_threshold = FileSystemCheckpointStorage.MAX_FILE_STATE_THRESHOLD
+
+            j_filesystem_checkpoint_storage = JFileSystemCheckpointStorage(
+                checkpoint_path,
+                file_state_size_threshold,
+                write_buffer_size)
+
+        self._j_filesystem_checkpoint_storage = j_filesystem_checkpoint_storage
+        super(FileSystemCheckpointStorage, self).__init__(j_filesystem_checkpoint_storage)
+
+    def get_checkpoint_path(self) -> str:
+        """
+        Gets the base directory where all the checkpoints are stored.
+        The job-specific checkpoint directory is created inside this directory.
+
+        :return: The base directory for checkpoints.
+        """
+        return self._j_filesystem_checkpoint_storage.getCheckpointPath().toString()
+
+    def get_savepoint_path(self) -> Optional[str]:
+        """
+        Gets the base directory where all the savepoints are stored.
+        The job-specific savepoint directory is created inside this directory.
+
+        :return: The base directory for savepoints.
+        """
+        return self._j_filesystem_checkpoint_storage.getCheckpointPath().toString()
+
+    def get_min_file_size_threshold(self) -> int:
+        """
+        Gets the threshold below which state is stored as part of the metadata, rather than in
+        files. This threshold ensures the backend does not create a large amount of small files,
+        where potentially the file pointers are larger than the state itself.
+        """
+        return self._j_filesystem_checkpoint_storage.getMinFileSizeThreshold()
+
+    def get_write_buffer_size(self) -> int:
+        """
+        Gets the write buffer size for created checkpoint streams.
+        """
+        return self._j_filesystem_checkpoint_storage.getWriteBufferSize()
+
+    def __str__(self):
+        return self._j_filesystem_checkpoint_storage.toString()
+
+
+class CustomCheckpointStorage(CheckpointStorage):
+    """
+    A wrapper of customized java checkpoint storae created from the provided `StateBackendFactory`.

Review comment:
       ```suggestion
       A wrapper of customized java checkpoint storage created from the provided `StateBackendFactory`.
       ```

##########
File path: flink-python/pyflink/datastream/state_backend.py
##########
@@ -116,8 +119,280 @@ def __init__(self, j_state_backend):
         self._j_state_backend = j_state_backend
 
 
+class HashMapStateBackend(StateBackend):
+    """
+    This state backend holds the working state in the memory (JVM heap) of the TaskManagers
+    and checkpoints based on the configured CheckpointStorage.
+
+    **State Size Considerations**
+
+    Working state is kept on the TaskManager heap. If a TaskManager executes multiple
+    tasks concurrently (if the TaskManager has multiple slots, or if slot-sharing is used)
+    then the aggregate state of all tasks needs to fit into that TaskManager's memory.
+
+    **Configuration**
+
+    As for all state backends, this backend can either be configured within the application (by
+    creating the backend with the respective constructor parameters and setting it on the execution
+    environment) or by specifying it in the Flink configuration.
+
+    If the state backend was specified in the application, it may pick up additional configuration
+    parameters from the Flink configuration. For example, if the backend is configured in the
+    application without a default savepoint directory, it will pick up a default savepoint
+    directory specified in the Flink configuration of the running job/cluster. That behavior is
+    implemented via the :func:`configure` method.
+    """
+
+    def __init__(self, j_hashmap_state_backend=None):
+        """
+        Creates a new HashMapStateBackend, setting optionally the paths to persist checkpoint
+        metadata and savepoints to, as well as configuring state thresholds and asynchronous
+        operations.
+
+        WARNING: Increasing the size of this value beyond the default value
+        (:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care.
+        The checkpointed state needs to be sent to the JobManager via limited size RPC messages,
+        and the JobManager needs to be able to hold all aggregated state in its memory.
+
+        Example:
+        ::
+            >>> state_backend = HashMapStateBackend()
+
+        :param j_hashmap_state_backend: For internal use, please keep none.
+        """
+        if j_hashmap_state_backend is None:
+            gateway = get_gateway()
+            JHashMapStateBackend = gateway.jvm.org.apache.flink.runtime.state.hashmap\
+                .HashMapStateBackend
+
+            j_hashmap_state_backend = JHashMapStateBackend()
+
+        self._j_hashmap_state_backend = j_hashmap_state_backend
+        super(HashMapStateBackend, self).__init__(j_hashmap_state_backend)
+
+    def __str__(self):
+        return self._j_hashmap_state_backend.toString()
+
+
+class EmbeddedRocksDBStateBackend(StateBackend):
+    """
+    A State Backend that stores its state in an embedded ``RocksDB`` instance. This state backend
+    can store very large state that exceeds memory and spills to local disk.
+
+    All key/value state (including windows) is stored in the key/value index of RocksDB.
+    For persistence against loss of machines, please configure a CheckpointStorage instance
+    for the Job.
+
+    The behavior of the RocksDB instances can be parametrized by setting RocksDB Options
+    using the methods :func:`set_predefined_options` and :func:`set_options`.
+    """
+
+    def __init__(self,
+                 enable_incremental_checkpointing=None,
+                 j_embedded_rocks_db_state_backend=None):
+        """
+        Creates a new :class:`EmbeddedRocksDBStateBackend` for storing local state.
+
+        Example:
+        ::
+
+            >>> state_backend = EmbeddedRocksDBStateBackend()
+
+        :param enable_incremental_checkpointing: True if incremental checkpointing is enabled.
+        :param j_embedded_rocks_db_state_backend: For internal use, please keep none.
+        """
+        if j_embedded_rocks_db_state_backend is None:
+            gateway = get_gateway()
+            JTernaryBoolean = gateway.jvm.org.apache.flink.util.TernaryBoolean
+            JEmbeddedRocksDBStateBackend = gateway.jvm.org.apache.flink.contrib.streaming.state \
+                .EmbeddedRocksDBStateBackend
+
+            if enable_incremental_checkpointing not in (None, True, False):
+                raise TypeError("Unsupported input for 'enable_incremental_checkpointing': %s, "
+                                "the value of the parameter should be None or "
+                                "True or False." % enable_incremental_checkpointing)
+
+            if enable_incremental_checkpointing is None:
+                j_enable_incremental_checkpointing = JTernaryBoolean.UNDEFINED
+            elif enable_incremental_checkpointing is True:
+                j_enable_incremental_checkpointing = JTernaryBoolean.TRUE
+            else:
+                j_enable_incremental_checkpointing = JTernaryBoolean.FALSE
+
+            j_embedded_rocks_db_state_backend = \
+                JEmbeddedRocksDBStateBackend(j_enable_incremental_checkpointing)
+
+        self._j_embedded_rocks_db_state_backend = j_embedded_rocks_db_state_backend

Review comment:
       `_j_embedded_rocks_db_state_backend` could be removed
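
       As above, a rough sketch of the same simplification (only the tail of the constructor is shown; the TernaryBoolean handling stays as in the diff):
       ```python
                   j_embedded_rocks_db_state_backend = \
                       JEmbeddedRocksDBStateBackend(j_enable_incremental_checkpointing)

               # no _j_embedded_rocks_db_state_backend field; StateBackend.__init__ keeps the
               # Java object in _j_state_backend for later accessors to use
               super(EmbeddedRocksDBStateBackend, self).__init__(j_embedded_rocks_db_state_backend)

           def __str__(self):
               return self._j_state_backend.toString()
       ```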

##########
File path: flink-python/pyflink/datastream/checkpoint_storage.py
##########
@@ -0,0 +1,344 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from abc import ABCMeta
+
+from py4j.java_gateway import get_java_class
+from typing import Optional
+
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'CheckpointStorage',
+    'JobManagerCheckpointStorage',
+    'FileSystemCheckpointStorage',
+    'CustomCheckpointStorage']
+
+
+def _from_j_checkpoint_storage(j_checkpoint_storage):
+    if j_checkpoint_storage is None:
+        return None
+    gateway = get_gateway()
+    JCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.CheckpointStorage
+    JJobManagerCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage \
+        .JobManagerCheckpointStorage
+    JFileSystemCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage \
+        .FileSystemCheckpointStorage
+
+    j_clz = j_checkpoint_storage.getClass()
+
+    if not get_java_class(JCheckpointStorage).isAssignableFrom(j_clz):
+        raise TypeError("%s is not an instance of CheckpointStorage." % j_checkpoint_storage)
+
+    if get_java_class(JJobManagerCheckpointStorage).isAssignableFrom(j_clz):
+        return JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_checkpoint_storage)
+    elif get_java_class(JFileSystemCheckpointStorage).isAssignableFrom(j_clz):
+        return FileSystemCheckpointStorage(j_filesystem_checkpoint_storage=j_checkpoint_storage)
+    else:
+        return CustomCheckpointStorage(j_checkpoint_storage)
+
+
+class CheckpointStorage(object, metaclass=ABCMeta):
+    """
+    Checkpoint storage defines how :class:`StateBackend`'s store their state for fault-tolerance
+    in streaming applications. Various implementations store their checkpoints in different fashions
+    and have different requirements and availability guarantees.
+
+    For example, :class:`JobManagerCheckpointStorage` stores checkpoints in the memory of the
+    `JobManager`. It is lightweight and without additional dependencies but is not scalable
+    and only supports small state sizes. This checkpoint storage policy is convenient for local
+    testing and development.
+
+    :class:`FileSystemCheckpointStorage` stores checkpoints in a filesystem. For systems like HDFS,
+    NFS drives, S3, and GCS, this storage policy supports large state size, in the magnitude of many
+    terabytes while providing a highly available foundation for streaming applications. This
+    checkpoint storage policy is recommended for most production deployments.
+
+    **Raw Bytes Storage**
+
+    The `CheckpointStorage` creates services for raw bytes storage.
+
+    The raw bytes storage (through the CheckpointStreamFactory) is the fundamental service that
+    simply stores bytes in a fault tolerant fashion. This service is used by the JobManager to
+    store checkpoint and recovery metadata and is typically also used by the keyed- and operator-
+    state backends to store checkpoint state.
+
+    **Serializability**
+
+    Implementations need to be serializable (`java.io.Serializable`), because they are distributed
+    across parallel processes (for distributed execution) together with the streaming application
+    code.
+
+    Because of that, `CheckpointStorage` implementations are meant to be like _factories_ that create
+    the proper state stores that provide access to the persistent layer. That way, the storage
+    policy can be very lightweight (contain only configurations) which makes it easier to be
+    serializable.
+
+    **Thread Safety**
+
+    Checkpoint storage implementations have to be thread-safe. Multiple threads may be creating
+    streams concurrently.
+    """
+
+    def __init__(self, j_checkpoint_storage):
+        self._j_checkpoint_storage = j_checkpoint_storage
+
+
+class JobManagerCheckpointStorage(CheckpointStorage):
+    """
+    The `CheckpointStorage` checkpoints state directly to the JobManager's memory (hence the
+    name), but savepoints will be persisted to a file system.
+
+    This checkpoint storage is primarily for experimentation, quick local setups, or for streaming
+    applications that have very small state: Because it requires checkpoints to go through the
+    JobManager's memory, larger state will occupy larger portions of the JobManager's main memory,
+    reducing operational stability. For any other setup, the `FileSystemCheckpointStorage`
+    should be used. The `FileSystemCheckpointStorage` checkpoints state directly to files
+    rather than to the JobManager's memory, thus supporting larger state sizes and more highly
+    available recovery.
+
+    **State Size Considerations**
+
+    State checkpointing with this checkpoint storage is subject to the following conditions:
+
+    - Each individual state must not exceed the configured maximum state size
+      (see :func:`get_max_state_size`).
+
+    - All state from one task (i.e., the sum of all operator states and keyed states from all
+      chained operators of the task) must not exceed what the RPC system supports, which is
+      by default < 10 MB. That limit can be configured up, but that is typically not advised.
+
+    - The sum of all states in the application times all retained checkpoints must comfortably
+      fit into the JobManager's JVM heap space.
+
+    **Persistence Guarantees**
+
+    For the use cases where the state sizes can be handled by this storage, it does
+    guarantee persistence for savepoints, externalized checkpoints (if configured), and checkpoints
+    (when high-availability is configured).
+
+    **Configuration**
+
+    As for all checkpoint storage, this type can either be configured within the application (by
+    creating the storage with the respective constructor parameters and setting it on the execution
+    environment) or by specifying it in the Flink configuration.
+
+    If the storage was specified in the application, it may pick up additional configuration
+    parameters from the Flink configuration. For example, if the backend is configured in the
+    application without a default savepoint directory, it will pick up a default savepoint
+    directory specified in the Flink configuration of the running job/cluster. That behavior is
+    implemented via the :func:`configure` method.
+    """
+
+    # The default maximal size that the snapshotted memory state may have (5 MiBytes).
+    DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024
+
+    def __init__(self,
+                 checkpoint_path=None,
+                 max_state_size=None,
+                 j_jobmanager_checkpoint_storage=None):
+        """
+        Creates a new JobManagerCheckpointStorage, setting optionally the paths to persist
+        checkpoint metadata to, as well as configuring state thresholds.
+
+        WARNING: Increasing the size of this value beyond the default value
+        (:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care.
+        The checkpointed state needs to be sent to the JobManager via limited size RPC messages,
+        and the JobManager needs to be able to hold all aggregated state in its memory.
+
+        Example:
+        ::
+            >>> checkpoint_storage = JobManagerCheckpointStorage()
+
+        :param checkpoint_path: The path to write checkpoint metadata to. If none, the value from
+                                the runtime configuration will be used.
+        :param max_state_size: The maximal size of the serialized state. If none, the
+                               :data:`DEFAULT_MAX_STATE_SIZE` will be used.
+        :param j_jobmanager_checkpoint_storage: For internal use, please keep none.
+        """
+        if j_jobmanager_checkpoint_storage is None:
+            gateway = get_gateway()
+            JJobManagerCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage\
+                .JobManagerCheckpointStorage
+            JPath = gateway.jvm.org.apache.flink.core.fs.Path
+
+            if checkpoint_path is not None:
+                checkpoint_path = JPath(checkpoint_path)
+            if max_state_size is None:
+                max_state_size = JJobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE
+            j_jobmanager_checkpoint_storage = JJobManagerCheckpointStorage(checkpoint_path,
+                                                                           max_state_size)
+
+        self._j_jobmanager_checkpoint_storage = j_jobmanager_checkpoint_storage
+        super(JobManagerCheckpointStorage, self).__init__(j_jobmanager_checkpoint_storage)
+
+    def get_checkpoint_path(self) -> Optional[str]:
+        """
+        Gets the base directory where all the checkpoints are stored.
+        The job-specific checkpoint directory is created inside this directory.
+
+        :return: The base directory for checkpoints.
+        """
+        return self._j_jobmanager_checkpoint_storage.getCheckpointPath().toString()
+
+    def get_savepoint_path(self) -> Optional[str]:
+        """
+        Gets the base directory where all the savepoints are stored.
+        The job-specific savepoint directory is created inside this directory.
+
+        :return: The base directory for savepoints.
+        """
+        return self._j_jobmanager_checkpoint_storage.getCheckpointPath().toString()
+
+    def __str__(self):
+        return self._j_jobmanager_checkpoint_storage.toString()
+
+
+class FileSystemCheckpointStorage(CheckpointStorage):
+    """
+    `FileSystemCheckpointStorage` checkpoints state as files to a filesystem.
+
+    Each checkpoint will store all its files in a subdirectory that includes the
+    checkpoint number, such as `hdfs://namenode:port/flink-checkpoints/chk-17/`.
+
+    **State Size Considerations**
+
+    This checkpoint storage stores small state chunks directly with the metadata, to avoid creating
+    many small files. The threshold for that is configurable. When increasing this threshold, the
+    size of the checkpoint metadata increases. The checkpoint metadata of all retained completed
+    checkpoints needs to fit into the JobManager's heap memory. This is typically not a problem,
+    unless the threshold `get_min_file_size_threshold` is increased significantly.
+
+    **Persistence Guarantees**
+
+    Checkpoints from this checkpoint storage are as persistent and available as the filesystem
+    that it is written to. If the file system is a persistent distributed file system, this
+    checkpoint storage supports highly available setups. The backend additionally supports
+    savepoints and externalized checkpoints.
+
+    **Configuration**
+
+    As for all checkpoint storage policies, this backend can either be configured within the
+    application (by creating the storage with the respective constructor parameters and setting
+    it on the execution environment) or by specifying it in the Flink configuration.
+
+    If the checkpoint storage was specified in the application, it may pick up additional
+    configuration parameters from the Flink configuration. For example, if the storage is configured
+    in the application without a default savepoint directory, it will pick up a default savepoint
+    directory specified in the Flink configuration of the running job/cluster.
+    """
+
+    # Maximum size of state that is stored with the metadata, rather than in files (1 MiByte).
+    MAX_FILE_STATE_THRESHOLD = 1024 * 1024
+
+    def __init__(self,
+                 checkpoint_path=None,
+                 file_state_size_threshold=None,
+                 write_buffer_size=-1,
+                 j_filesystem_checkpoint_storage=None):
+        """
+        Creates a new FileSystemCheckpointStorage, setting the paths for the checkpoint data
+        in a file system.
+
+        All file systems for the file system scheme in the URI (e.g., `file://`, `hdfs://`, or
+        `s3://`) must be accessible via `FileSystem#get`.
+
+        For a Job targeting HDFS, this means that the URI must either specify the authority (host
+        and port), or the Hadoop configuration that describes that information must be in the
+        classpath.
+
+        Example:
+        ::
+            >>> checkpoint_storage = FileSystemCheckpointStorage("hdfs://checkpoints")
+
+        :param checkpoint_path: The path to write checkpoint metadata to. If none, the value from
+                                the runtime configuration will be used.
+        :param file_state_size_threshold: State below this size will be stored as part of the
+                                        metadata, rather than in files. If -1, the value configured
+                                        in the runtime configuration will be used, or the default
+                                        value (1KB) if nothing is configured.
+        :param write_buffer_size: Write buffer size used to serialize state. If -1, the value
+                                    configured in the runtime configuration will be used, or the
+                                    default value (4KB) if nothing is configured.
+        :param j_filesystem_checkpoint_storage: For internal use, please keep none.
+        """
+        if j_filesystem_checkpoint_storage is None:
+            gateway = get_gateway()
+            JFileSystemCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage\
+                .FileSystemCheckpointStorage
+            JPath = gateway.jvm.org.apache.flink.core.fs.Path
+
+            if checkpoint_path is None:
+                raise ValueError("checkpoint_path must not be None")
+            else:
+                checkpoint_path = JPath(checkpoint_path)
+
+            if file_state_size_threshold is None:
+                file_state_size_threshold = FileSystemCheckpointStorage.MAX_FILE_STATE_THRESHOLD
+
+            j_filesystem_checkpoint_storage = JFileSystemCheckpointStorage(
+                checkpoint_path,
+                file_state_size_threshold,
+                write_buffer_size)
+
+        self._j_filesystem_checkpoint_storage = j_filesystem_checkpoint_storage
+        super(FileSystemCheckpointStorage, self).__init__(j_filesystem_checkpoint_storage)
+
+    def get_checkpoint_path(self) -> str:
+        """
+        Gets the base directory where all the checkpoints are stored.
+        The job-specific checkpoint directory is created inside this directory.
+
+        :return: The base directory for checkpoints.
+        """
+        return self._j_filesystem_checkpoint_storage.getCheckpointPath().toString()
+
+    def get_savepoint_path(self) -> Optional[str]:
+        """
+        Gets the base directory where all the savepoints are stored.
+        The job-specific savepoint directory is created inside this directory.
+
+        :return: The base directory for savepoints.
+        """
+        return self._j_filesystem_checkpoint_storage.getCheckpointPath().toString()

Review comment:
       ```suggestion
           return self._j_filesystem_checkpoint_storage.getSavepointPath().toString()
       ```

##########
File path: flink-python/pyflink/datastream/checkpoint_storage.py
##########
@@ -0,0 +1,344 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from abc import ABCMeta
+
+from py4j.java_gateway import get_java_class
+from typing import Optional
+
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'CheckpointStorage',
+    'JobManagerCheckpointStorage',
+    'FileSystemCheckpointStorage',
+    'CustomCheckpointStorage']
+
+
+def _from_j_checkpoint_storage(j_checkpoint_storage):
+    if j_checkpoint_storage is None:
+        return None
+    gateway = get_gateway()
+    JCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.CheckpointStorage
+    JJobManagerCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage \
+        .JobManagerCheckpointStorage
+    JFileSystemCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage \
+        .FileSystemCheckpointStorage
+
+    j_clz = j_checkpoint_storage.getClass()
+
+    if not get_java_class(JCheckpointStorage).isAssignableFrom(j_clz):
+        raise TypeError("%s is not an instance of CheckpointStorage." % j_checkpoint_storage)
+
+    if get_java_class(JJobManagerCheckpointStorage).isAssignableFrom(j_clz):
+        return JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_checkpoint_storage)
+    elif get_java_class(JFileSystemCheckpointStorage).isAssignableFrom(j_clz):
+        return FileSystemCheckpointStorage(j_filesystem_checkpoint_storage=j_checkpoint_storage)
+    else:
+        return CustomCheckpointStorage(j_checkpoint_storage)
+
+
+class CheckpointStorage(object, metaclass=ABCMeta):
+    """
+    Checkpoint storage defines how :class:`StateBackend`'s store their state for fault-tolerance
+    in streaming applications. Various implementations store their checkpoints in different fashions
+    and have different requirements and availability guarantees.
+
+    For example, :class:`JobManagerCheckpointStorage` stores checkpoints in the memory of the
+    `JobManager`. It is lightweight and without additional dependencies but is not scalable
+    and only supports small state sizes. This checkpoint storage policy is convenient for local
+    testing and development.
+
+    :class:`FileSystemCheckpointStorage` stores checkpoints in a filesystem. For systems like HDFS,
+    NFS drives, S3, and GCS, this storage policy supports large state size, in the magnitude of many
+    terabytes while providing a highly available foundation for streaming applications. This
+    checkpoint storage policy is recommended for most production deployments.
+
+    **Raw Bytes Storage**
+
+    The `CheckpointStorage` creates services for raw bytes storage.
+
+    The raw bytes storage (through the CheckpointStreamFactory) is the fundamental service that
+    simply stores bytes in a fault tolerant fashion. This service is used by the JobManager to
+    store checkpoint and recovery metadata and is typically also used by the keyed- and operator-
+    state backends to store checkpoint state.
+
+    **Serializability**
+
+    Implementations need to be serializable (`java.io.Serializable`), because they are distributed
+    across parallel processes (for distributed execution) together with the streaming application
+    code.
+
+    Because of that, `CheckpointStorage` implementations are meant to be like _factories_ that create
+    the proper state stores that provide access to the persistent layer. That way, the storage
+    policy can be very lightweight (contain only configurations) which makes it easier to be
+    serializable.
+
+    **Thread Safety**
+
+    Checkpoint storage implementations have to be thread-safe. Multiple threads may be creating
+    streams concurrently.
+    """
+
+    def __init__(self, j_checkpoint_storage):
+        self._j_checkpoint_storage = j_checkpoint_storage
+
+
+class JobManagerCheckpointStorage(CheckpointStorage):
+    """
+    The `CheckpointStorage` checkpoints state directly to the JobManager's memory (hence the
+    name), but savepoints will be persisted to a file system.
+
+    This checkpoint storage is primarily for experimentation, quick local setups, or for streaming
+    applications that have very small state: Because it requires checkpoints to go through the
+    JobManager's memory, larger state will occupy larger portions of the JobManager's main memory,
+    reducing operational stability. For any other setup, the `FileSystemCheckpointStorage`
+    should be used. The `FileSystemCheckpointStorage` checkpoints state directly to files
+    rather than to the JobManager's memory, thus supporting larger state sizes and more highly
+    available recovery.
+
+    **State Size Considerations**
+
+    State checkpointing with this checkpoint storage is subject to the following conditions:
+
+    - Each individual state must not exceed the configured maximum state size
+      (see :func:`get_max_state_size`).
+
+    - All state from one task (i.e., the sum of all operator states and keyed states from all
+      chained operators of the task) must not exceed what the RPC system supports, which is
+      by default < 10 MB. That limit can be configured up, but that is typically not advised.
+
+    - The sum of all states in the application times all retained checkpoints must comfortably
+      fit into the JobManager's JVM heap space.
+
+    **Persistence Guarantees**
+
+    For the use cases where the state sizes can be handled by this storage, it does
+    guarantee persistence for savepoints, externalized checkpoints (if configured), and checkpoints
+    (when high-availability is configured).
+
+    **Configuration**
+
+    As for all checkpoint storage, this type can either be configured within the application (by
+    creating the storage with the respective constructor parameters and setting it on the execution
+    environment) or by specifying it in the Flink configuration.
+
+    If the storage was specified in the application, it may pick up additional configuration
+    parameters from the Flink configuration. For example, if the backend is configured in the
+    application without a default savepoint directory, it will pick up a default savepoint
+    directory specified in the Flink configuration of the running job/cluster. That behavior is
+    implemented via the :func:`configure` method.
+    """
+
+    # The default maximal size that the snapshotted memory state may have (5 
MiBytes).
+    DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024
+
+    def __init__(self,
+                 checkpoint_path=None,
+                 max_state_size=None,
+                 j_jobmanager_checkpoint_storage=None):
+        """
+        Creates a new JobManagerCheckpointStorage, setting optionally the 
paths to persist
+        checkpoint metadata to, as well as configuring state thresholds.
+
+        WARNING: Increasing `max_state_size` beyond the default value
+        (:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care. The checkpointed state needs
+        to be sent to the JobManager via limited size RPC messages, and the JobManager needs to
+        be able to hold all aggregated state in its memory.
+
+        Example:
+        ::
+            >>> checkpoint_storage = JobManagerCheckpointStorage()
+
+        :param checkpoint_path: The path to write checkpoint metadata to. If 
none, the value from
+                                the runtime configuration will be used.
+        :param max_state_size: The maximal size of the serialized state. If 
none, the
+                               :data:`DEFAULT_MAX_STATE_SIZE` will be used.
+        :param j_jobmanager_checkpoint_storage: For internal use, please keep 
none.
+        """
+        if j_jobmanager_checkpoint_storage is None:
+            gateway = get_gateway()
+            JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage\
+                .JobManagerCheckpointStorage
+            JPath = gateway.jvm.org.apache.flink.core.fs.Path
+
+            if checkpoint_path is not None:
+                checkpoint_path = JPath(checkpoint_path)
+            if max_state_size is None:
+                max_state_size = 
JJobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE
+            j_jobmanager_checkpoint_storage = JJobManagerCheckpointStorage(checkpoint_path,
+                                                                           max_state_size)
+
+        self._j_jobmanager_checkpoint_storage = j_jobmanager_checkpoint_storage
+        super(JobManagerCheckpointStorage, 
self).__init__(j_jobmanager_checkpoint_storage)
+
+    def get_checkpoint_path(self) -> Optional[str]:
+        """
+        Gets the base directory where all the checkpoints are stored.
+        The job-specific checkpoint directory is created inside this directory.
+
+        :return: The base directory for checkpoints.
+        """
+        return 
self._j_jobmanager_checkpoint_storage.getCheckpointPath().toString()
+
+    def get_savepoint_path(self) -> Optional[str]:
+        """
+        Gets the base directory where all the savepoints are stored.
+        The job-specific savepoint directory is created inside this directory.
+
+        :return: The base directory for savepoints.
+        """
+        return 
self._j_jobmanager_checkpoint_storage.getCheckpointPath().toString()

Review comment:
       should also consider the null case
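
A minimal sketch of a null-aware getter (hedged: it assumes the underlying Java
`getSavepointPath()` can return null, which py4j surfaces in Python as `None`):

```python
    def get_savepoint_path(self) -> Optional[str]:
        """
        Gets the base directory where all the savepoints are stored, or None if no
        savepoint directory has been configured.
        """
        # py4j maps a Java null return value to Python None, so guard before calling
        # toString() on it.
        j_path = self._j_jobmanager_checkpoint_storage.getSavepointPath()
        if j_path is None:
            return None
        return j_path.toString()
```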

##########
File path: flink-python/pyflink/datastream/checkpoint_storage.py
##########
@@ -0,0 +1,344 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from abc import ABCMeta
+
+from py4j.java_gateway import get_java_class
+from typing import Optional
+
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'CheckpointStorage',
+    'JobManagerCheckpointStorage',
+    'FileSystemCheckpointStorage',
+    'CustomCheckpointStorage']
+
+
+def _from_j_checkpoint_storage(j_checkpoint_storage):
+    if j_checkpoint_storage is None:
+        return None
+    gateway = get_gateway()
+    JCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.CheckpointStorage
+    JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .JobManagerCheckpointStorage
+    JFileSystemCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .FileSystemCheckpointStorage
+
+    j_clz = j_checkpoint_storage.getClass()
+
+    if not get_java_class(JCheckpointStorage).isAssignableFrom(j_clz):
+        raise TypeError("%s is not an instance of CheckpointStorage." % 
j_checkpoint_storage)
+
+    if get_java_class(JJobManagerCheckpointStorage).isAssignableFrom(j_clz):
+        return 
JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_clz)
+    elif get_java_class(JFileSystemCheckpointStorage).isAssignableFrom(j_clz):
+        return 
FileSystemCheckpointStorage(j_filesystem_checkpoint_storage=j_clz)
+    else:
+        return CustomCheckpointStorage(j_checkpoint_storage)
+
+
+class CheckpointStorage(object, metaclass=ABCMeta):
+    """
+    Checkpoint storage defines how :class:`StateBackend`s store their state for fault-tolerance
+    in streaming applications. Various implementations store their checkpoints in different
+    fashions and have different requirements and availability guarantees.
+
+    For example, :class:`JobManagerCheckpointStorage` stores checkpoints in the memory of the
+    `JobManager`. It is lightweight and without additional dependencies but is not scalable and
+    only supports small state sizes. This checkpoint storage policy is convenient for local
+    testing and development.
+
+    :class:`FileSystemCheckpointStorage` stores checkpoints in a filesystem. For systems like
+    HDFS, NFS drives, S3, and GCS, this storage policy supports large state sizes, in the
+    magnitude of many terabytes, while providing a highly available foundation for streaming
+    applications. This checkpoint storage policy is recommended for most production deployments.
+
+    **Raw Bytes Storage**
+
+    The `CheckpointStorage` creates services for raw bytes storage.
+
+    The raw bytes storage (through the CheckpointStreamFactory) is the fundamental service that
+    simply stores bytes in a fault tolerant fashion. This service is used by the JobManager to
+    store checkpoint and recovery metadata and is typically also used by the keyed- and
+    operator-state backends to store checkpoint state.
+
+    **Serializability**
+
+    Implementations need to be serializable (`java.io.Serializable`), because they are
+    distributed across parallel processes (for distributed execution) together with the
+    streaming application code.
+
+    Because of that, `CheckpointStorage` implementations are meant to be like _factories_ that
+    create the proper state stores that provide access to the persistent layer. That way, the
+    storage policy can be very lightweight (contain only configurations), which makes it easier
+    to serialize.
+
+    **Thread Safety**
+
+    Checkpoint storage implementations have to be thread-safe. Multiple 
threads may be creating
+    streams concurrently.
+    """
+
+    def __init__(self, j_checkpoint_storage):
+        self._j_checkpoint_storage = j_checkpoint_storage
+
+
+class JobManagerCheckpointStorage(CheckpointStorage):
+    """
+    The `CheckpointStorage` checkpoints state directly to the JobManager's memory (hence the
+    name), but savepoints will be persisted to a file system.
+
+    This checkpoint storage is primarily for experimentation, quick local setups, or for
+    streaming applications that have very small state: Because it requires checkpoints to go
+    through the JobManager's memory, larger state will occupy larger portions of the
+    JobManager's main memory, reducing operational stability. For any other setup, the
+    `FileSystemCheckpointStorage` should be used. The `FileSystemCheckpointStorage` checkpoints
+    state directly to files rather than to the JobManager's memory, thus supporting larger state
+    sizes and more highly available recovery.
+
+    **State Size Considerations**
+
+    State checkpointing with this checkpoint storage is subject to the following conditions:
+
+    - Each individual state must not exceed the configured maximum state size
+      (see :func:`get_max_state_size`).
+
+    - All state from one task (i.e., the sum of all operator states and keyed states from all
+      chained operators of the task) must not exceed what the RPC system supports, which is
+      by default < 10 MB. That limit can be configured up, but that is typically not advised.
+
+    - The sum of all states in the application times all retained checkpoints must comfortably
+      fit into the JobManager's JVM heap space.
+
+    **Persistence Guarantees**
+
+    For the use cases where the state sizes can be handled by this storage, it does guarantee
+    persistence for savepoints, externalized checkpoints (if configured), and checkpoints
+    (when high-availability is configured).
+
+    **Configuration**
+
+    As for all checkpoint storage, this type can either be configured within the application (by
+    creating the storage with the respective constructor parameters and setting it on the
+    execution environment) or by specifying it in the Flink configuration.
+
+    If the storage was specified in the application, it may pick up additional configuration
+    parameters from the Flink configuration. For example, if the storage is configured in the
+    application without a default savepoint directory, it will pick up a default savepoint
+    directory specified in the Flink configuration of the running job/cluster. That behavior is
+    implemented via the :func:`configure` method.
+    """
+
+    # The default maximal size that the snapshotted memory state may have (5 
MiBytes).
+    DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024
+
+    def __init__(self,
+                 checkpoint_path=None,
+                 max_state_size=None,
+                 j_jobmanager_checkpoint_storage=None):
+        """
+        Creates a new JobManagerCheckpointStorage, setting optionally the 
paths to persist
+        checkpoint metadata to, as well as configuring state thresholds.
+
+        WARNING: Increasing `max_state_size` beyond the default value
+        (:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care. The checkpointed state needs
+        to be sent to the JobManager via limited size RPC messages, and the JobManager needs to
+        be able to hold all aggregated state in its memory.
+
+        Example:
+        ::
+            >>> checkpoint_storage = JobManagerCheckpointStorage()
+
+        :param checkpoint_path: The path to write checkpoint metadata to. If 
none, the value from
+                                the runtime configuration will be used.
+        :param max_state_size: The maximal size of the serialized state. If 
none, the
+                               :data:`DEFAULT_MAX_STATE_SIZE` will be used.
+        :param j_jobmanager_checkpoint_storage: For internal use, please keep 
none.
+        """
+        if j_jobmanager_checkpoint_storage is None:
+            gateway = get_gateway()
+            JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage\
+                .JobManagerCheckpointStorage
+            JPath = gateway.jvm.org.apache.flink.core.fs.Path
+
+            if checkpoint_path is not None:
+                checkpoint_path = JPath(checkpoint_path)
+            if max_state_size is None:
+                max_state_size = 
JJobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE
+            j_jobmanager_checkpoint_storage = JJobManagerCheckpointStorage(checkpoint_path,
+                                                                           max_state_size)
+
+        self._j_jobmanager_checkpoint_storage = j_jobmanager_checkpoint_storage
+        super(JobManagerCheckpointStorage, 
self).__init__(j_jobmanager_checkpoint_storage)
+
+    def get_checkpoint_path(self) -> Optional[str]:
+        """
+        Gets the base directory where all the checkpoints are stored.
+        The job-specific checkpoint directory is created inside this directory.
+
+        :return: The base directory for checkpoints.
+        """
+        return 
self._j_jobmanager_checkpoint_storage.getCheckpointPath().toString()
+
+    def get_savepoint_path(self) -> Optional[str]:
+        """
+        Gets the base directory where all the savepoints are stored.
+        The job-specific savepoint directory is created inside this directory.
+
+        :return: The base directory for savepoints.
+        """
+        return 
self._j_jobmanager_checkpoint_storage.getCheckpointPath().toString()

Review comment:
       ```suggestion
           return self._j_jobmanager_checkpoint_storage.getSavepointPath().toString()
       ```

##########
File path: flink-python/pyflink/datastream/checkpoint_storage.py
##########
@@ -0,0 +1,344 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from abc import ABCMeta
+
+from py4j.java_gateway import get_java_class
+from typing import Optional
+
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'CheckpointStorage',
+    'JobManagerCheckpointStorage',
+    'FileSystemCheckpointStorage',
+    'CustomCheckpointStorage']
+
+
+def _from_j_checkpoint_storage(j_checkpoint_storage):
+    if j_checkpoint_storage is None:
+        return None
+    gateway = get_gateway()
+    JCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.CheckpointStorage
+    JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .JobManagerCheckpointStorage
+    JFileSystemCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .FileSystemCheckpointStorage
+
+    j_clz = j_checkpoint_storage.getClass()
+
+    if not get_java_class(JCheckpointStorage).isAssignableFrom(j_clz):
+        raise TypeError("%s is not an instance of CheckpointStorage." % 
j_checkpoint_storage)
+
+    if get_java_class(JJobManagerCheckpointStorage).isAssignableFrom(j_clz):
+        return 
JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_clz)
+    elif get_java_class(JFileSystemCheckpointStorage).isAssignableFrom(j_clz):
+        return 
FileSystemCheckpointStorage(j_filesystem_checkpoint_storage=j_clz)
+    else:
+        return CustomCheckpointStorage(j_checkpoint_storage)
+
+
+class CheckpointStorage(object, metaclass=ABCMeta):
+    """
+    Checkpoint storage defines how :class:`StateBackend`s store their state for fault-tolerance
+    in streaming applications. Various implementations store their checkpoints in different
+    fashions and have different requirements and availability guarantees.
+
+    For example, :class:`JobManagerCheckpointStorage` stores checkpoints in the memory of the
+    `JobManager`. It is lightweight and without additional dependencies but is not scalable and
+    only supports small state sizes. This checkpoint storage policy is convenient for local
+    testing and development.
+
+    :class:`FileSystemCheckpointStorage` stores checkpoints in a filesystem. For systems like
+    HDFS, NFS drives, S3, and GCS, this storage policy supports large state sizes, in the
+    magnitude of many terabytes, while providing a highly available foundation for streaming
+    applications. This checkpoint storage policy is recommended for most production deployments.
+
+    **Raw Bytes Storage**
+
+    The `CheckpointStorage` creates services for raw bytes storage.
+
+    The raw bytes storage (through the CheckpointStreamFactory) is the fundamental service that
+    simply stores bytes in a fault tolerant fashion. This service is used by the JobManager to
+    store checkpoint and recovery metadata and is typically also used by the keyed- and
+    operator-state backends to store checkpoint state.
+
+    **Serializability**
+
+    Implementations need to be serializable (`java.io.Serializable`), because they are
+    distributed across parallel processes (for distributed execution) together with the
+    streaming application code.
+
+    Because of that, `CheckpointStorage` implementations are meant to be like _factories_ that
+    create the proper state stores that provide access to the persistent layer. That way, the
+    storage policy can be very lightweight (contain only configurations), which makes it easier
+    to serialize.
+
+    **Thread Safety**
+
+    Checkpoint storage implementations have to be thread-safe. Multiple 
threads may be creating
+    streams concurrently.
+    """
+
+    def __init__(self, j_checkpoint_storage):
+        self._j_checkpoint_storage = j_checkpoint_storage
+
+
+class JobManagerCheckpointStorage(CheckpointStorage):
+    """
+    The `CheckpointStorage` checkpoints state directly to the JobManager's memory (hence the
+    name), but savepoints will be persisted to a file system.
+
+    This checkpoint storage is primarily for experimentation, quick local setups, or for
+    streaming applications that have very small state: Because it requires checkpoints to go
+    through the JobManager's memory, larger state will occupy larger portions of the
+    JobManager's main memory, reducing operational stability. For any other setup, the
+    `FileSystemCheckpointStorage` should be used. The `FileSystemCheckpointStorage` checkpoints
+    state directly to files rather than to the JobManager's memory, thus supporting larger state
+    sizes and more highly available recovery.
+
+    **State Size Considerations**
+
+    State checkpointing with this checkpoint storage is subject to the following conditions:
+
+    - Each individual state must not exceed the configured maximum state size
+      (see :func:`get_max_state_size`).
+
+    - All state from one task (i.e., the sum of all operator states and keyed states from all
+      chained operators of the task) must not exceed what the RPC system supports, which is
+      by default < 10 MB. That limit can be configured up, but that is typically not advised.
+
+    - The sum of all states in the application times all retained checkpoints must comfortably
+      fit into the JobManager's JVM heap space.
+
+    **Persistence Guarantees**
+
+    For the use cases where the state sizes can be handled by this storage, it does guarantee
+    persistence for savepoints, externalized checkpoints (if configured), and checkpoints
+    (when high-availability is configured).
+
+    **Configuration**
+
+    As for all checkpoint storage, this type can either be configured within the application (by
+    creating the storage with the respective constructor parameters and setting it on the
+    execution environment) or by specifying it in the Flink configuration.
+
+    If the storage was specified in the application, it may pick up additional configuration
+    parameters from the Flink configuration. For example, if the storage is configured in the
+    application without a default savepoint directory, it will pick up a default savepoint
+    directory specified in the Flink configuration of the running job/cluster. That behavior is
+    implemented via the :func:`configure` method.
+    """
+
+    # The default maximal size that the snapshotted memory state may have (5 
MiBytes).
+    DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024
+
+    def __init__(self,
+                 checkpoint_path=None,
+                 max_state_size=None,
+                 j_jobmanager_checkpoint_storage=None):
+        """
+        Creates a new JobManagerCheckpointStorage, setting optionally the 
paths to persist
+        checkpoint metadata to, as well as configuring state thresholds.
+
+        WARNING: Increasing `max_state_size` beyond the default value
+        (:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care. The checkpointed state needs
+        to be sent to the JobManager via limited size RPC messages, and the JobManager needs to
+        be able to hold all aggregated state in its memory.
+
+        Example:
+        ::
+            >>> checkpoint_storage = JobManagerCheckpointStorage()
+
+        :param checkpoint_path: The path to write checkpoint metadata to. If 
none, the value from
+                                the runtime configuration will be used.
+        :param max_state_size: The maximal size of the serialized state. If 
none, the
+                               :data:`DEFAULT_MAX_STATE_SIZE` will be used.
+        :param j_jobmanager_checkpoint_storage: For internal use, please keep 
none.
+        """
+        if j_jobmanager_checkpoint_storage is None:
+            gateway = get_gateway()
+            JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage\
+                .JobManagerCheckpointStorage
+            JPath = gateway.jvm.org.apache.flink.core.fs.Path
+
+            if checkpoint_path is not None:
+                checkpoint_path = JPath(checkpoint_path)
+            if max_state_size is None:
+                max_state_size = 
JJobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE
+            j_jobmanager_checkpoint_storage = JJobManagerCheckpointStorage(checkpoint_path,
+                                                                           max_state_size)
+
+        self._j_jobmanager_checkpoint_storage = j_jobmanager_checkpoint_storage
+        super(JobManagerCheckpointStorage, 
self).__init__(j_jobmanager_checkpoint_storage)
+
+    def get_checkpoint_path(self) -> Optional[str]:
+        """
+        Gets the base directory where all the checkpoints are stored.
+        The job-specific checkpoint directory is created inside this directory.
+
+        :return: The base directory for checkpoints.
+        """
+        return 
self._j_jobmanager_checkpoint_storage.getCheckpointPath().toString()

Review comment:
       Should consider the case that self._j_jobmanager_checkpoint_storage.getCheckpointPath() returns null.
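
One possible shape for the fix, sketched under the assumption that both path getters share the
same null behavior: a small module-level helper (the name `_to_optional_str` is illustrative,
not part of the PR) that both `get_checkpoint_path` and `get_savepoint_path` can call.

```python
def _to_optional_str(j_path) -> Optional[str]:
    """Maps a Java Path (or null) returned over py4j to an Optional[str]."""
    return None if j_path is None else j_path.toString()


# Illustrative usage inside JobManagerCheckpointStorage:
#     def get_checkpoint_path(self) -> Optional[str]:
#         return _to_optional_str(self._j_jobmanager_checkpoint_storage.getCheckpointPath())
```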

##########
File path: flink-python/pyflink/datastream/checkpoint_storage.py
##########
@@ -0,0 +1,344 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from abc import ABCMeta
+
+from py4j.java_gateway import get_java_class
+from typing import Optional
+
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'CheckpointStorage',
+    'JobManagerCheckpointStorage',
+    'FileSystemCheckpointStorage',
+    'CustomCheckpointStorage']
+
+
+def _from_j_checkpoint_storage(j_checkpoint_storage):
+    if j_checkpoint_storage is None:
+        return None
+    gateway = get_gateway()
+    JCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.CheckpointStorage
+    JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .JobManagerCheckpointStorage
+    JFileSystemCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .FileSystemCheckpointStorage
+
+    j_clz = j_checkpoint_storage.getClass()
+
+    if not get_java_class(JCheckpointStorage).isAssignableFrom(j_clz):
+        raise TypeError("%s is not an instance of CheckpointStorage." % 
j_checkpoint_storage)
+
+    if get_java_class(JJobManagerCheckpointStorage).isAssignableFrom(j_clz):
+        return 
JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_clz)
+    elif get_java_class(JFileSystemCheckpointStorage).isAssignableFrom(j_clz):
+        return 
FileSystemCheckpointStorage(j_filesystem_checkpoint_storage=j_clz)
+    else:
+        return CustomCheckpointStorage(j_checkpoint_storage)
+
+
+class CheckpointStorage(object, metaclass=ABCMeta):
+    """
+    Checkpoint storage defines how :class:`StateBackend`s store their state for fault-tolerance
+    in streaming applications. Various implementations store their checkpoints in different
+    fashions and have different requirements and availability guarantees.
+
+    For example, :class:`JobManagerCheckpointStorage` stores checkpoints in the memory of the
+    `JobManager`. It is lightweight and without additional dependencies but is not scalable and
+    only supports small state sizes. This checkpoint storage policy is convenient for local
+    testing and development.
+
+    :class:`FileSystemCheckpointStorage` stores checkpoints in a filesystem. For systems like
+    HDFS, NFS drives, S3, and GCS, this storage policy supports large state sizes, in the
+    magnitude of many terabytes, while providing a highly available foundation for streaming
+    applications. This checkpoint storage policy is recommended for most production deployments.
+
+    **Raw Bytes Storage**
+
+    The `CheckpointStorage` creates services for raw bytes storage.
+
+    The raw bytes storage (through the CheckpointStreamFactory) is the fundamental service that
+    simply stores bytes in a fault tolerant fashion. This service is used by the JobManager to
+    store checkpoint and recovery metadata and is typically also used by the keyed- and
+    operator-state backends to store checkpoint state.
+
+    **Serializability**
+
+    Implementations need to be serializable (`java.io.Serializable`), because they are
+    distributed across parallel processes (for distributed execution) together with the
+    streaming application code.
+
+    Because of that, `CheckpointStorage` implementations are meant to be like _factories_ that
+    create the proper state stores that provide access to the persistent layer. That way, the
+    storage policy can be very lightweight (contain only configurations), which makes it easier
+    to serialize.
+
+    **Thread Safety**
+
+    Checkpoint storage implementations have to be thread-safe. Multiple 
threads may be creating
+    streams concurrently.
+    """
+
+    def __init__(self, j_checkpoint_storage):
+        self._j_checkpoint_storage = j_checkpoint_storage
+
+
+class JobManagerCheckpointStorage(CheckpointStorage):
+    """
+    The `CheckpointStorage` checkpoints state directly to the JobManager's memory (hence the
+    name), but savepoints will be persisted to a file system.
+
+    This checkpoint storage is primarily for experimentation, quick local setups, or for
+    streaming applications that have very small state: Because it requires checkpoints to go
+    through the JobManager's memory, larger state will occupy larger portions of the
+    JobManager's main memory, reducing operational stability. For any other setup, the
+    `FileSystemCheckpointStorage` should be used. The `FileSystemCheckpointStorage` checkpoints
+    state directly to files rather than to the JobManager's memory, thus supporting larger state
+    sizes and more highly available recovery.
+
+    **State Size Considerations**
+
+    State checkpointing with this checkpoint storage is subject to the following conditions:
+
+    - Each individual state must not exceed the configured maximum state size
+      (see :func:`get_max_state_size`).
+
+    - All state from one task (i.e., the sum of all operator states and keyed states from all
+      chained operators of the task) must not exceed what the RPC system supports, which is
+      by default < 10 MB. That limit can be configured up, but that is typically not advised.
+
+    - The sum of all states in the application times all retained checkpoints must comfortably
+      fit into the JobManager's JVM heap space.
+
+    **Persistence Guarantees**
+
+    For the use cases where the state sizes can be handled by this storage, it does guarantee
+    persistence for savepoints, externalized checkpoints (if configured), and checkpoints
+    (when high-availability is configured).
+
+    **Configuration**
+
+    As for all checkpoint storage, this type can either be configured within the application (by
+    creating the storage with the respective constructor parameters and setting it on the
+    execution environment) or by specifying it in the Flink configuration.
+
+    If the storage was specified in the application, it may pick up additional configuration
+    parameters from the Flink configuration. For example, if the storage is configured in the
+    application without a default savepoint directory, it will pick up a default savepoint
+    directory specified in the Flink configuration of the running job/cluster. That behavior is
+    implemented via the :func:`configure` method.
+    """
+
+    # The default maximal size that the snapshotted memory state may have (5 
MiBytes).
+    DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024
+
+    def __init__(self,
+                 checkpoint_path=None,
+                 max_state_size=None,
+                 j_jobmanager_checkpoint_storage=None):
+        """
+        Creates a new JobManagerCheckpointStorage, setting optionally the 
paths to persist
+        checkpoint metadata to, as well as configuring state thresholds.
+
+        WARNING: Increasing `max_state_size` beyond the default value
+        (:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care. The checkpointed state needs
+        to be sent to the JobManager via limited size RPC messages, and the JobManager needs to
+        be able to hold all aggregated state in its memory.
+
+        Example:
+        ::
+            >>> checkpoint_storage = JobManagerCheckpointStorage()
+
+        :param checkpoint_path: The path to write checkpoint metadata to. If 
none, the value from
+                                the runtime configuration will be used.
+        :param max_state_size: The maximal size of the serialized state. If 
none, the
+                               :data:`DEFAULT_MAX_STATE_SIZE` will be used.
+        :param j_jobmanager_checkpoint_storage: For internal use, please keep 
none.
+        """
+        if j_jobmanager_checkpoint_storage is None:
+            gateway = get_gateway()
+            JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage\
+                .JobManagerCheckpointStorage
+            JPath = gateway.jvm.org.apache.flink.core.fs.Path
+
+            if checkpoint_path is not None:
+                checkpoint_path = JPath(checkpoint_path)
+            if max_state_size is None:
+                max_state_size = 
JJobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE
+            j_jobmanager_checkpoint_storage = JJobManagerCheckpointStorage(checkpoint_path,
+                                                                           max_state_size)
+
+        self._j_jobmanager_checkpoint_storage = j_jobmanager_checkpoint_storage
+        super(JobManagerCheckpointStorage, 
self).__init__(j_jobmanager_checkpoint_storage)
+
+    def get_checkpoint_path(self) -> Optional[str]:
+        """
+        Gets the base directory where all the checkpoints are stored.
+        The job-specific checkpoint directory is created inside this directory.
+
+        :return: The base directory for checkpoints.
+        """
+        return 
self._j_jobmanager_checkpoint_storage.getCheckpointPath().toString()
+
+    def get_savepoint_path(self) -> Optional[str]:
+        """
+        Gets the base directory where all the savepoints are stored.
+        The job-specific savepoint directory is created inside this directory.
+
+        :return: The base directory for savepoints.
+        """
+        return 
self._j_jobmanager_checkpoint_storage.getCheckpointPath().toString()
+
+    def __str__(self):
+        return self._j_jobmanager_checkpoint_storage.toString()
+
+
+class FileSystemCheckpointStorage(CheckpointStorage):
+    """
+    `FileSystemCheckpointStorage` checkpoints state as files to a filesystem.
+
+    Each checkpoint will store all its files in a subdirectory that includes the checkpoint
+    number, such as `hdfs://namenode:port/flink-checkpoints/chk-17/`.
+
+    **State Size Considerations**
+
+    This checkpoint storage stores small state chunks directly with the 
metadata, to avoid creating
+    many small files. The threshold for that is configurable. When increasing 
this threshold, the
+    size of the checkpoint metadata increases. The checkpoint metadata of all 
retained completed
+    checkpoints needs to fit into the JobManager's heap memory. This is typically not a problem,
+    unless the threshold `get_min_file_size_threshold` is increased significantly.
+
+    **Persistence Guarantees**
+
+    Checkpoints from this checkpoint storage are as persistent and available 
as the filesystem
+    that it is written to. If the file system is a persistent distributed file 
system, this
+    checkpoint storage supports highly available setups. The backend 
additionally supports
+    savepoints and externalized checkpoints.
+
+    **Configuration**
+
+    As for all checkpoint storage policies, this backend can either be 
configured within the
+    application (by creating the storage with the respective constructor 
parameters and setting
+    it on the execution environment) or by specifying it in the Flink 
configuration.
+
+    If the checkpoint storage was specified in the application, it may pick up 
additional
+    configuration parameters from the Flink configuration. For example, if the 
storage is configured
+    in the application without a default savepoint directory, it will pick up 
a default savepoint
+    directory specified in the Flink configuration of the running job/cluster.
+    """
+
+    # Maximum size of state that is stored with the metadata, rather than in 
files (1 MiByte).
+    MAX_FILE_STATE_THRESHOLD = 1024 * 1024
+
+    def __init__(self,
+                 checkpoint_path=None,
+                 file_state_size_threshold=None,
+                 write_buffer_size=-1,
+                 j_filesystem_checkpoint_storage=None):
+        """
+        Creates a new FileSystemCheckpointStorage, setting the paths for the 
checkpoint data
+        in a file system.
+
+        All file systems for the file system scheme in the URI (e.g., 
`file://`, `hdfs://`, or
+        `s3://`) must be accessible via `FileSystem#get`.
+
+        For a Job targeting HDFS, this means that the URI must either specify the authority
+        (host and port), or the Hadoop configuration that describes that information must be in
+        the classpath.
+
+        Example:
+        ::
+            >>> checkpoint_storage = 
FileSystemCheckpointStorage("hdfs://checkpoints")
+
+        :param checkpoint_path: The path to write checkpoint metadata to. If 
none, the value from
+                                the runtime configuration will be used.
+        :param file_state_size_threshold: State below this size will be stored 
as part of the
+                                        metadata, rather than in files. If -1, 
the value configured
+                                        in the runtime configuration will be 
used, or the default
+                                        value (1KB) if nothing is configured.
+        :param write_buffer_size: Write buffer size used to serialize state. 
If -1, the value
+                                    configured in the runtime configuration 
will be used, or the
+                                    default value (4KB) if nothing is 
configured.
+        :param j_filesystem_checkpoint_storage: For internal use, please keep 
none.
+        """
+        if j_filesystem_checkpoint_storage is None:
+            gateway = get_gateway()
+            JFileSystemCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage\
+                .FileSystemCheckpointStorage
+            JPath = gateway.jvm.org.apache.flink.core.fs.Path
+
+            if checkpoint_path is None:
+                raise ValueError("checkpoint_path must not be None")
+            else:
+                checkpoint_path = JPath(checkpoint_path)
+
+            if file_state_size_threshold is None:
+                file_state_size_threshold = 
FileSystemCheckpointStorage.MAX_FILE_STATE_THRESHOLD
+
+            j_filesystem_checkpoint_storage = JFileSystemCheckpointStorage(
+                checkpoint_path,
+                file_state_size_threshold,
+                write_buffer_size)
+
+        self._j_filesystem_checkpoint_storage = j_filesystem_checkpoint_storage
+        super(FileSystemCheckpointStorage, 
self).__init__(j_filesystem_checkpoint_storage)
+
+    def get_checkpoint_path(self) -> str:
+        """
+        Gets the base directory where all the checkpoints are stored.
+        The job-specific checkpoint directory is created inside this directory.
+
+        :return: The base directory for checkpoints.
+        """
+        return 
self._j_filesystem_checkpoint_storage.getCheckpointPath().toString()
+
+    def get_savepoint_path(self) -> Optional[str]:
+        """
+        Gets the base directory where all the savepoints are stored.
+        The job-specific savepoint directory is created inside this directory.
+
+        :return: The base directory for savepoints.
+        """
+        return self._j_filesystem_checkpoint_storage.getSavepointPath().toString()
+
+    def get_min_file_size_threshold(self) -> int:
+        """
+        Gets the threshold below which state is stored as part of the metadata, rather than in
+        files. This threshold ensures the backend does not create a large number of small files,
+        where potentially the file pointers are larger than the state itself.
+        """
+        return self._j_filesystem_checkpoint_storage.getMinFileSizeThreshold()
+
+    def get_write_buffer_size(self) -> int:
+        """
+        Gets the write buffer size for created checkpoint streams.
+        """
+        return self._j_filesystem_checkpoint_storage.getWriteBufferSize()
+
+    def __str__(self):
+        return self._j_filesystem_checkpoint_storage.toString()
+
+
+class CustomCheckpointStorage(CheckpointStorage):
+    """
+    A wrapper of a customized Java checkpoint storage created from the provided
+    `StateBackendFactory`.
+    """
+
+    def __init__(self, j_custom_checkpoint_storage):
+        super(CheckpointStorage, self).__init__(j_custom_checkpoint_storage)

Review comment:
       ```suggestion
           super(CustomCheckpointStorage, self).__init__(j_custom_checkpoint_storage)
       ```
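
For context, a standalone illustration (not PR code) of why the original call misbehaves:
`super(CheckpointStorage, self)` starts the MRO lookup *after* `CheckpointStorage`, so the call
resolves to `object.__init__`, which rejects the extra argument on Python 3.

```python
class Base:
    def __init__(self, payload):
        self.payload = payload


class Sub(Base):
    def __init__(self, payload):
        # Wrong: the lookup starts after Base, so this resolves to object.__init__.
        super(Base, self).__init__(payload)


Sub("x")  # TypeError: object.__init__() takes exactly one argument
```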

##########
File path: flink-python/pyflink/datastream/state_backend.py
##########
@@ -116,8 +119,280 @@ def __init__(self, j_state_backend):
         self._j_state_backend = j_state_backend
 
 
+class HashMapStateBackend(StateBackend):
+    """
+    This state backend holds the working state in the memory (JVM heap) of the 
TaskManagers
+    and checkpoints based on the configured CheckpointStorage.
+
+    **State Size Considerations**
+
+    Working state is kept on the TaskManager heap. If a TaskManager executes 
multiple
+    tasks concurrently (if the TaskManager has multiple slots, or if 
slot-sharing is used)
+    then the aggregate state of all tasks needs to fit into that TaskManager's 
memory.
+
+    **Configuration**
+
+    As for all state backends, this backend can either be configured within 
the application (by
+    creating the backend with the respective constructor parameters and 
setting it on the execution
+    environment) or by specifying it in the Flink configuration.
+
+    If the state backend was specified in the application, it may pick up 
additional configuration
+    parameters from the Flink configuration. For example, if the backend is configured in the
+    application without a default savepoint directory, it will pick up a 
default savepoint
+    directory specified in the Flink configuration of the running job/cluster. 
That behavior is
+    implemented via the :func:`configure` method.
+    """
+
+    def __init__(self, j_hashmap_state_backend=None):
+        """
+        Creates a new HashMapStateBackend, which holds the working state on the TaskManager
+        heap and checkpoints based on the configured CheckpointStorage.
+
+        Example:
+        ::
+            >>> state_backend = HashMapStateBackend()
+
+        :param j_hashmap_state_backend: For internal use, please keep none.
+        """
+        if j_hashmap_state_backend is None:
+            gateway = get_gateway()
+            JHashMapStateBackend = 
gateway.jvm.org.apache.flink.runtime.state.hashmap\
+                .HashMapStateBackend
+
+            j_hashmap_state_backend = JHashMapStateBackend()
+
+        self._j_hashmap_state_backend = j_hashmap_state_backend
+        super(HashMapStateBackend, self).__init__(j_hashmap_state_backend)
+
+    def __str__(self):
+        return self._j_hashmap_state_backend.toString()
+
+
+class EmbeddedRocksDBStateBackend(StateBackend):
+    """
+    A State Backend that stores its state in an embedded ``RocksDB`` instance. 
This state backend
+    can store very large state that exceeds memory and spills to local disk.
+
+    All key/value state (including windows) is stored in the key/value index 
of RocksDB.
+    For persistence against loss of machines, please configure a 
CheckpointStorage instance
+    for the Job.
+
+    The behavior of the RocksDB instances can be parametrized by setting 
RocksDB Options
+    using the methods :func:`set_predefined_options` and :func:`set_options`.
+    """
+
+    def __init__(self,
+                 enable_incremental_checkpointing=None,
+                 j_embedded_rocks_db_state_backend=None):
+        """
+        Creates a new :class:`EmbeddedRocksDBStateBackend` for storing local 
state.
+
+        Example:
+        ::
+
+            >>> state_backend = EmbeddedRocksDBStateBackend()
+
+        :param enable_incremental_checkpointing: True if incremental 
checkpointing is enabled.
+        :param j_embedded_rocks_db_state_backend: For internal use, please 
keep none.
+        """
+        if j_embedded_rocks_db_state_backend is None:
+            gateway = get_gateway()
+            JTernaryBoolean = gateway.jvm.org.apache.flink.util.TernaryBoolean
+            JEmbeddedRocksDBStateBackend = 
gateway.jvm.org.apache.flink.contrib.streaming.state \
+                .EmbeddedRocksDBStateBackend
+
+            if enable_incremental_checkpointing not in (None, True, False):
+                raise TypeError("Unsupported input for 'enable_incremental_checkpointing': %s, "
+                                "the value of the parameter should be None, True or False."
+                                % enable_incremental_checkpointing)
+
+            if enable_incremental_checkpointing is None:
+                j_enable_incremental_checkpointing = JTernaryBoolean.UNDEFINED
+            elif enable_incremental_checkpointing is True:
+                j_enable_incremental_checkpointing = JTernaryBoolean.TRUE
+            else:
+                j_enable_incremental_checkpointing = JTernaryBoolean.FALSE
+
+            j_embedded_rocks_db_state_backend = \
+                
JEmbeddedRocksDBStateBackend(j_enable_incremental_checkpointing)
+
+        self._j_embedded_rocks_db_state_backend = 
j_embedded_rocks_db_state_backend
+        super(EmbeddedRocksDBStateBackend, 
self).__init__(j_embedded_rocks_db_state_backend)
+
+    def set_db_storage_paths(self, *paths: str):
+        """
+        Sets the directories in which the local RocksDB database puts its 
files (like SST and
+        metadata files). These directories do not need to be persistent, they 
can be ephemeral,
+        meaning that they are lost on a machine failure, because state in 
RocksDB is persisted
+        in checkpoints.
+
+        If nothing is configured, these directories default to the 
TaskManager's local
+        temporary file directories.
+
+        Each distinct state will be stored in one path, but when the state 
backend creates
+        multiple states, they will store their files on different paths.
+
+        Passing ``None`` to this function restores the default behavior, where 
the configured
+        temp directories will be used.
+
+        :param paths: The paths across which the local RocksDB database files will be spread.
+                      This parameter is optional.
+        """
+        if len(paths) < 1:
+            self._j_embedded_rocks_db_state_backend.setDbStoragePath(None)
+        else:
+            gateway = get_gateway()
+            j_path_array = gateway.new_array(gateway.jvm.String, len(paths))
+            for i in range(0, len(paths)):
+                j_path_array[i] = paths[i]
+            
self._j_embedded_rocks_db_state_backend.setDbStoragePaths(j_path_array)
+
+    def get_db_storage_paths(self) -> List[str]:
+        """
+        Gets the configured local DB storage paths, or None, if none were configured.
+
+        Under these directories on the TaskManager, RocksDB stores its SST files and
+        metadata files. These directories do not need to be persistent, they can be ephemeral,
+        meaning that they are lost on a machine failure, because state in 
RocksDB is persisted
+        in checkpoints.
+
+        If nothing is configured, these directories default to the 
TaskManager's local
+        temporary file directories.
+
+        :return: The list of configured local DB storage paths.
+        """
+        return 
list(self._j_embedded_rocks_db_state_backend.getDbStoragePaths())
+
+    def is_incremental_checkpoints_enabled(self) -> bool:
+        """
+        Gets whether incremental checkpoints are enabled for this state 
backend.
+
+        :return: True if incremental checkpoints are enabled, false otherwise.
+        """
+        return 
self._j_embedded_rocks_db_state_backend.isIncrementalCheckpointsEnabled()
+
+    def set_predefined_options(self, options: 'PredefinedOptions'):
+        """
+        Sets the predefined options for RocksDB.
+
+        If user-configured options within ``RocksDBConfigurableOptions`` are set (through
+        flink-conf.yaml) or a user-defined options factory is set (via :func:`set_options`),
+        then the options from the factory are applied on top of the predefined options
+        specified here and the customized options.
+
+        Example:
+        ::
+
+            >>> 
state_backend.set_predefined_options(PredefinedOptions.SPINNING_DISK_OPTIMIZED)
+
+        :param options: The options to set (must not be null), see 
:class:`PredefinedOptions`.
+        """
+        self._j_embedded_rocks_db_state_backend\
+            .setPredefinedOptions(options._to_j_predefined_options())
+
+    def get_predefined_options(self) -> 'PredefinedOptions':
+        """
+        Gets the current predefined options for RocksDB.
+        The default options (if nothing was set via :func:`set_predefined_options`)
+        are :data:`PredefinedOptions.DEFAULT`.
+
+        If user-configured options within ``RocksDBConfigurableOptions`` are set (through
+        flink-conf.yaml) or a user-defined options factory is set (via :func:`set_options`),
+        then the options from the factory are applied on top of the predefined and customized
+        options.
+
+        .. seealso:: :func:`set_predefined_options`
+
+        :return: Current predefined options.
+        """
+        j_predefined_options = 
self._j_embedded_rocks_db_state_backend.getPredefinedOptions()
+        return 
PredefinedOptions._from_j_predefined_options(j_predefined_options)
+
+    def set_options(self, options_factory_class_name: str):

Review comment:
       I guess it's not enough to only pass the class name of the 
options_factory. For example, for DefaultConfigurableOptionsFactory, there are 
many methods in it to configure it.
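
To make the concern concrete, a hedged sketch (the wiring below is illustrative only and not
the PR's API): loading a factory purely from its class name yields an instance with default
settings and leaves no hook for the configuration methods mentioned above.

```python
gateway = get_gateway()
# Instantiating by class name gives back an unconfigured factory instance.
j_factory_class = gateway.jvm.java.lang.Class.forName(
    "org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory")
j_factory = j_factory_class.newInstance()
# Any tuning of the factory would have to happen on j_factory itself before it is handed to
# the backend, which an API that only accepts a class name string cannot express.
```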

##########
File path: flink-python/pyflink/datastream/checkpoint_storage.py
##########
@@ -0,0 +1,344 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from abc import ABCMeta
+
+from py4j.java_gateway import get_java_class
+from typing import Optional
+
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'CheckpointStorage',
+    'JobManagerCheckpointStorage',
+    'FileSystemCheckpointStorage',
+    'CustomCheckpointStorage']
+
+
+def _from_j_checkpoint_storage(j_checkpoint_storage):
+    if j_checkpoint_storage is None:
+        return None
+    gateway = get_gateway()
+    JCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.CheckpointStorage
+    JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .JobManagerCheckpointStorage
+    JFileSystemCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .FileSystemCheckpointStorage
+
+    j_clz = j_checkpoint_storage.getClass()
+
+    if not get_java_class(JCheckpointStorage).isAssignableFrom(j_clz):
+        raise TypeError("%s is not an instance of CheckpointStorage." % 
j_checkpoint_storage)
+
+    if get_java_class(JJobManagerCheckpointStorage).isAssignableFrom(j_clz):
+        return 
JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_clz)
+    elif get_java_class(JFileSystemCheckpointStorage).isAssignableFrom(j_clz):
+        return 
FileSystemCheckpointStorage(j_filesystem_checkpoint_storage=j_clz)
+    else:
+        return CustomCheckpointStorage(j_checkpoint_storage)
+
+
+class CheckpointStorage(object, metaclass=ABCMeta):
+    """
+    Checkpoint storage defines how :class:`StateBackend`s store their state for fault-tolerance
+    in streaming applications. Various implementations store their checkpoints in different
+    fashions and have different requirements and availability guarantees.
+
+    For example, :class:`JobManagerCheckpointStorage` stores checkpoints in the memory of the
+    `JobManager`. It is lightweight and without additional dependencies but is not scalable and
+    only supports small state sizes. This checkpoint storage policy is convenient for local
+    testing and development.
+
+    :class:`FileSystemCheckpointStorage` stores checkpoints in a filesystem. For systems like
+    HDFS, NFS drives, S3, and GCS, this storage policy supports large state sizes, in the
+    magnitude of many terabytes, while providing a highly available foundation for streaming
+    applications. This checkpoint storage policy is recommended for most production deployments.
+
+    **Raw Bytes Storage**
+
+    The `CheckpointStorage` creates services for raw bytes storage.
+
+    The raw bytes storage (through the CheckpointStreamFactory) is the fundamental service that
+    simply stores bytes in a fault tolerant fashion. This service is used by the JobManager to
+    store checkpoint and recovery metadata and is typically also used by the keyed- and
+    operator-state backends to store checkpoint state.
+
+    **Serializability**
+
+    Implementations need to be serializable (`java.io.Serializable`), because they are
+    distributed across parallel processes (for distributed execution) together with the
+    streaming application code.
+
+    Because of that, `CheckpointStorage` implementations are meant to be like _factories_ that
+    create the proper state stores that provide access to the persistent layer. That way, the
+    storage policy can be very lightweight (contain only configurations), which makes it easier
+    to serialize.
+
+    **Thread Safety**
+
+    Checkpoint storage implementations have to be thread-safe. Multiple 
threads may be creating
+    streams concurrently.
+    """
+
+    def __init__(self, j_checkpoint_storage):
+        self._j_checkpoint_storage = j_checkpoint_storage
+
+
+class JobManagerCheckpointStorage(CheckpointStorage):
+    """
+    The `CheckpointStorage` checkpoints state directly to the JobManager's 
memory (hence the
+    name), but savepoints will be persisted to a file system.
+
+    This checkpoint storage is primarily for experimentation, quick local setups, or for
+    streaming applications that have very small state: because it requires checkpoints to go
+    through the JobManager's memory, larger state will occupy larger portions of the
+    JobManager's main memory, reducing operational stability. For any other setup, the
+    `FileSystemCheckpointStorage` should be used. The `FileSystemCheckpointStorage` checkpoints
+    state directly to files rather than to the JobManager's memory, thus supporting larger
+    state sizes and more highly available recovery.
+
+    **State Size Considerations**
+
+    State checkpointing with this checkpoint storage is subject to the following conditions:
+
+    - Each individual state must not exceed the configured maximum state size
+      (see :func:`get_max_state_size`).
+
+    - All state from one task (i.e., the sum of all operator states and keyed states from all
+      chained operators of the task) must not exceed what the RPC system supports, which is
+      by default < 10 MB. That limit can be configured up, but that is typically not advised.
+
+    - The sum of all states in the application times all retained checkpoints must comfortably
+      fit into the JobManager's JVM heap space.
+
+    **Persistence Guarantees**
+
+    For the use cases where the state sizes can be handled by this storage, it does
+    guarantee persistence for savepoints, externalized checkpoints (if configured), and
+    checkpoints (when high-availability is configured).
+
+    **Configuration**
+
+    As for all checkpoint storage, this type can either be configured within the application (by
+    creating the storage with the respective constructor parameters and setting it on the
+    execution environment) or by specifying it in the Flink configuration.
+
+    If the storage was specified in the application, it may pick up additional configuration
+    parameters from the Flink configuration. For example, if the backend is configured in the
+    application without a default savepoint directory, it will pick up a default savepoint
+    directory specified in the Flink configuration of the running job/cluster. That behavior is
+    implemented via the :func:`configure` method.
+    """
+
+    # The default maximal size that the snapshotted memory state may have (5 MiBytes).
+    DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024
+
+    def __init__(self,
+                 checkpoint_path=None,
+                 max_state_size=None,
+                 j_jobmanager_checkpoint_storage=None):
+        """
+        Creates a new JobManagerCheckpointStorage, optionally setting the paths to persist
+        checkpoint metadata to, as well as configuring state thresholds.
+
+        WARNING: Increasing this value beyond the default
+        (:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care.
+        The checkpointed state needs to be sent to the JobManager via limited-size RPC messages,
+        and the JobManager needs to be able to hold all aggregated state in its memory.
+
+        Example:
+        ::
+            >>> checkpoint_storage = JobManagerCheckpointStorage()
+
+        :param checkpoint_path: The path to write checkpoint metadata to. If None, the value
+                                from the runtime configuration will be used.
+        :param max_state_size: The maximal size of the serialized state. If None, the
+                               :data:`DEFAULT_MAX_STATE_SIZE` will be used.
+        :param j_jobmanager_checkpoint_storage: For internal use, please keep it as None.
+        """
+        if j_jobmanager_checkpoint_storage is None:
+            gateway = get_gateway()
+            JJobManagerCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage\
+                .JobManagerCheckpointStorage
+            JPath = gateway.jvm.org.apache.flink.core.fs.Path
+
+            if checkpoint_path is not None:
+                checkpoint_path = JPath(checkpoint_path)
+            if max_state_size is None:
+                max_state_size = JJobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE
+            j_jobmanager_checkpoint_storage = JJobManagerCheckpointStorage(checkpoint_path,
+                                                                           max_state_size)
+
+        self._j_jobmanager_checkpoint_storage = j_jobmanager_checkpoint_storage
+        super(JobManagerCheckpointStorage, self).__init__(j_jobmanager_checkpoint_storage)
+
+    def get_checkpoint_path(self) -> Optional[str]:
+        """
+        Gets the base directory where all the checkpoints are stored.
+        The job-specific checkpoint directory is created inside this directory.
+
+        :return: The base directory for checkpoints.
+        """
+        return self._j_jobmanager_checkpoint_storage.getCheckpointPath().toString()
+
+    def get_savepoint_path(self) -> Optional[str]:
+        """
+        Gets the base directory where all the savepoints are stored.
+        The job-specific savepoint directory is created inside this directory.
+
+        :return: The base directory for savepoints.
+        """
+        return self._j_jobmanager_checkpoint_storage.getSavepointPath().toString()
+
+    def __str__(self):
+        return self._j_jobmanager_checkpoint_storage.toString()
+
+
+class FileSystemCheckpointStorage(CheckpointStorage):
+    """
+    `FileSystemCheckpointStorage` checkpoints state as files to a filesystem.
+
+    Each checkpoint will store all its files in a subdirectory that includes the
+    checkpoint number, such as `hdfs://namenode:port/flink-checkpoints/chk-17/`.
+
+    **State Size Considerations**
+
+    This checkpoint storage stores small state chunks directly with the metadata, to avoid
+    creating many small files. The threshold for that is configurable. When increasing this
+    threshold, the size of the checkpoint metadata increases. The checkpoint metadata of all
+    retained completed checkpoints needs to fit into the JobManager's heap memory. This is
+    typically not a problem, unless the threshold `get_min_file_size_threshold` is increased
+    significantly.
+
+    **Persistence Guarantees**
+
+    Checkpoints from this checkpoint storage are as persistent and available as the filesystem
+    that they are written to. If the file system is a persistent distributed file system, this
+    checkpoint storage supports highly available setups. The backend additionally supports
+    savepoints and externalized checkpoints.
+
+    **Configuration**
+
+    As for all checkpoint storage policies, this backend can either be configured within the
+    application (by creating the storage with the respective constructor parameters and setting
+    it on the execution environment) or by specifying it in the Flink configuration.
+
+    If the checkpoint storage was specified in the application, it may pick up additional
+    configuration parameters from the Flink configuration. For example, if the storage is
+    configured in the application without a default savepoint directory, it will pick up a
+    default savepoint directory specified in the Flink configuration of the running job/cluster.
+    """
+
+    # Maximum size of state that is stored with the metadata, rather than in files (1 MiByte).
+    MAX_FILE_STATE_THRESHOLD = 1024 * 1024
+
+    def __init__(self,
+                 checkpoint_path=None,
+                 file_state_size_threshold=None,
+                 write_buffer_size=-1,
+                 j_filesystem_checkpoint_storage=None):
+        """
+        Creates a new FileSystemCheckpointStorage, setting the paths for the checkpoint data
+        in a file system.
+
+        All file systems for the file system scheme in the URI (e.g., `file://`, `hdfs://`, or
+        `s3://`) must be accessible via `FileSystem#get`.
+
+        For a Job targeting HDFS, this means that the URI must either specify the authority
+        (host and port), or the Hadoop configuration that describes that information must be
+        in the classpath.
+
+        Example:
+        ::
+            >>> checkpoint_storage = FileSystemCheckpointStorage("hdfs://checkpoints")
+
+        :param checkpoint_path: The path to write checkpoint metadata to. This argument is
+                                required and must not be None.
+        :param file_state_size_threshold: State below this size will be stored as part of the
+                                          metadata, rather than in files. If -1, the value
+                                          configured in the runtime configuration will be used,
+                                          or the default value (1KB) if nothing is configured.
+        :param write_buffer_size: Write buffer size used to serialize state. If -1, the value
+                                  configured in the runtime configuration will be used, or the
+                                  default value (4KB) if nothing is configured.
+        :param j_filesystem_checkpoint_storage: For internal use, please keep it as None.
+        """
+        if j_filesystem_checkpoint_storage is None:
+            gateway = get_gateway()
+            JFileSystemCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage\
+                .FileSystemCheckpointStorage
+            JPath = gateway.jvm.org.apache.flink.core.fs.Path
+
+            if checkpoint_path is None:
+                raise ValueError("checkpoint_path must not be None")
+            else:
+                checkpoint_path = JPath(checkpoint_path)
+
+            if file_state_size_threshold is None:
+                file_state_size_threshold = FileSystemCheckpointStorage.MAX_FILE_STATE_THRESHOLD
+
+            j_filesystem_checkpoint_storage = JFileSystemCheckpointStorage(
+                checkpoint_path,
+                file_state_size_threshold,
+                write_buffer_size)
+
+        self._j_filesystem_checkpoint_storage = j_filesystem_checkpoint_storage
+        super(FileSystemCheckpointStorage, self).__init__(j_filesystem_checkpoint_storage)
+
+    def get_checkpoint_path(self) -> str:
+        """
+        Gets the base directory where all the checkpoints are stored.
+        The job-specific checkpoint directory is created inside this directory.
+
+        :return: The base directory for checkpoints.
+        """
+        return self._j_filesystem_checkpoint_storage.getCheckpointPath().toString()
+
+    def get_savepoint_path(self) -> Optional[str]:
+        """
+        Gets the base directory where all the savepoints are stored.
+        The job-specific savepoint directory is created inside this directory.
+
+        :return: The base directory for savepoints.
+        """
+        return self._j_filesystem_checkpoint_storage.getSavepointPath().toString()

Review comment:
       should consider the null case
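       e.g. when no checkpoint path was configured, `getCheckpointPath()` returns null on the
       Java side, so calling `toString()` on it would fail. A rough sketch of the kind of guard
       meant here (illustrative only, not the final wording):

           def get_checkpoint_path(self) -> Optional[str]:
               # getCheckpointPath() may return null when no path was configured
               j_path = self._j_jobmanager_checkpoint_storage.getCheckpointPath()
               if j_path is None:
                   return None
               return j_path.toString()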

##########
File path: flink-python/pyflink/datastream/checkpoint_storage.py
##########
@@ -0,0 +1,344 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from abc import ABCMeta
+
+from py4j.java_gateway import get_java_class
+from typing import Optional
+
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'CheckpointStorage',
+    'JobManagerCheckpointStorage',
+    'FileSystemCheckpointStorage',
+    'CustomCheckpointStorage']
+
+
+def _from_j_checkpoint_storage(j_checkpoint_storage):
+    if j_checkpoint_storage is None:
+        return None
+    gateway = get_gateway()
+    JCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.CheckpointStorage
+    JJobManagerCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage \
+        .JobManagerCheckpointStorage
+    JFileSystemCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage \
+        .FileSystemCheckpointStorage
+
+    j_clz = j_checkpoint_storage.getClass()
+
+    if not get_java_class(JCheckpointStorage).isAssignableFrom(j_clz):
+        raise TypeError("%s is not an instance of CheckpointStorage." % j_checkpoint_storage)
+
+    if get_java_class(JJobManagerCheckpointStorage).isAssignableFrom(j_clz):
+        return JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_checkpoint_storage)
+    elif get_java_class(JFileSystemCheckpointStorage).isAssignableFrom(j_clz):
+        return FileSystemCheckpointStorage(j_filesystem_checkpoint_storage=j_checkpoint_storage)
+    else:
+        return CustomCheckpointStorage(j_checkpoint_storage)
+
+
+class CheckpointStorage(object, metaclass=ABCMeta):
+    """
+    Checkpoint storage defines how a :class:`StateBackend` stores its state for fault-tolerance
+    in streaming applications. Various implementations store their checkpoints in different
+    fashions and have different requirements and availability guarantees.
+
+    For example, :class:`JobManagerCheckpointStorage` stores checkpoints in the memory of the
+    `JobManager`. It is lightweight and without additional dependencies, but it is not scalable
+    and only supports small state sizes. This checkpoint storage policy is convenient for local
+    testing and development.
+
+    :class:`FileSystemCheckpointStorage` stores checkpoints in a filesystem. For systems like
+    HDFS, NFS drives, S3, and GCS, this storage policy supports large state sizes, on the order of
+    many terabytes, while providing a highly available foundation for streaming applications. This
+    checkpoint storage policy is recommended for most production deployments.
+
+    **Raw Bytes Storage**
+
+    The `CheckpointStorage` creates services for raw bytes storage.
+
+    The raw bytes storage (through the CheckpointStreamFactory) is the fundamental service that
+    simply stores bytes in a fault tolerant fashion. This service is used by the JobManager to
+    store checkpoint and recovery metadata and is typically also used by the keyed- and operator-
+    state backends to store checkpoint state.
+
+    **Serializability**
+
+    Implementations need to be serializable (`java.io.Serializable`), because they are
+    distributed across parallel processes (for distributed execution) together with the
+    streaming application code.
+
+    Because of that, `CheckpointStorage` implementations are meant to be like _factories_ that
+    create the proper state stores that provide access to the persistent layer. That way, the
+    storage policy can be very lightweight (containing only configuration), which makes it
+    easier to serialize.
+
+    **Thread Safety**
+
+    Checkpoint storage implementations have to be thread-safe. Multiple threads may be creating
+    streams concurrently.
+    """
+
+    def __init__(self, j_checkpoint_storage):
+        self._j_checkpoint_storage = j_checkpoint_storage
+
+
+class JobManagerCheckpointStorage(CheckpointStorage):
+    """
+    The `CheckpointStorage` checkpoints state directly to the JobManager's memory (hence the
+    name), but savepoints will be persisted to a file system.
+
+    This checkpoint storage is primarily for experimentation, quick local setups, or for
+    streaming applications that have very small state: because it requires checkpoints to go
+    through the JobManager's memory, larger state will occupy larger portions of the
+    JobManager's main memory, reducing operational stability. For any other setup, the
+    `FileSystemCheckpointStorage` should be used. The `FileSystemCheckpointStorage` checkpoints
+    state directly to files rather than to the JobManager's memory, thus supporting larger
+    state sizes and more highly available recovery.
+
+    **State Size Considerations**
+
+    State checkpointing with this checkpoint storage is subject to the following conditions:
+
+    - Each individual state must not exceed the configured maximum state size
+      (see :func:`get_max_state_size`).
+
+    - All state from one task (i.e., the sum of all operator states and keyed states from all
+      chained operators of the task) must not exceed what the RPC system supports, which is
+      by default < 10 MB. That limit can be configured up, but that is typically not advised.
+
+    - The sum of all states in the application times all retained checkpoints must comfortably
+      fit into the JobManager's JVM heap space.
+
+    **Persistence Guarantees**
+
+    For the use cases where the state sizes can be handled by this storage, it does
+    guarantee persistence for savepoints, externalized checkpoints (if configured), and
+    checkpoints (when high-availability is configured).
+
+    **Configuration**
+
+    As for all checkpoint storage, this type can either be configured within the application (by
+    creating the storage with the respective constructor parameters and setting it on the
+    execution environment) or by specifying it in the Flink configuration.
+
+    If the storage was specified in the application, it may pick up additional configuration
+    parameters from the Flink configuration. For example, if the backend is configured in the
+    application without a default savepoint directory, it will pick up a default savepoint
+    directory specified in the Flink configuration of the running job/cluster. That behavior is
+    implemented via the :func:`configure` method.
+    """
+
+    # The default maximal size that the snapshotted memory state may have (5 MiBytes).
+    DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024
+
+    def __init__(self,
+                 checkpoint_path=None,
+                 max_state_size=None,
+                 j_jobmanager_checkpoint_storage=None):
+        """
+        Creates a new JobManagerCheckpointStorage, optionally setting the paths to persist
+        checkpoint metadata to, as well as configuring state thresholds.
+
+        WARNING: Increasing this value beyond the default
+        (:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care.
+        The checkpointed state needs to be sent to the JobManager via limited-size RPC messages,
+        and the JobManager needs to be able to hold all aggregated state in its memory.
+
+        Example:
+        ::
+            >>> checkpoint_storage = JobManagerCheckpointStorage()
+
+        :param checkpoint_path: The path to write checkpoint metadata to. If None, the value
+                                from the runtime configuration will be used.
+        :param max_state_size: The maximal size of the serialized state. If None, the
+                               :data:`DEFAULT_MAX_STATE_SIZE` will be used.
+        :param j_jobmanager_checkpoint_storage: For internal use, please keep it as None.
+        """
+        if j_jobmanager_checkpoint_storage is None:
+            gateway = get_gateway()
+            JJobManagerCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage\
+                .JobManagerCheckpointStorage
+            JPath = gateway.jvm.org.apache.flink.core.fs.Path
+
+            if checkpoint_path is not None:
+                checkpoint_path = JPath(checkpoint_path)
+            if max_state_size is None:
+                max_state_size = JJobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE
+            j_jobmanager_checkpoint_storage = JJobManagerCheckpointStorage(checkpoint_path,
+                                                                           max_state_size)
+
+        self._j_jobmanager_checkpoint_storage = j_jobmanager_checkpoint_storage
+        super(JobManagerCheckpointStorage, self).__init__(j_jobmanager_checkpoint_storage)
+
+    def get_checkpoint_path(self) -> Optional[str]:
+        """
+        Gets the base directory where all the checkpoints are stored.
+        The job-specific checkpoint directory is created inside this directory.
+
+        :return: The base directory for checkpoints.
+        """
+        return self._j_jobmanager_checkpoint_storage.getCheckpointPath().toString()
+
+    def get_savepoint_path(self) -> Optional[str]:
+        """
+        Gets the base directory where all the savepoints are stored.
+        The job-specific savepoint directory is created inside this directory.
+
+        :return: The base directory for savepoints.
+        """
+        return self._j_jobmanager_checkpoint_storage.getSavepointPath().toString()
+
+    def __str__(self):
+        return self._j_jobmanager_checkpoint_storage.toString()
+
+
+class FileSystemCheckpointStorage(CheckpointStorage):
+    """
+    `FileSystemCheckpointStorage` checkpoints state as files to a filesystem.
+
+    Each checkpoint will store all its files in a subdirectory that includes the
+    checkpoint number, such as `hdfs://namenode:port/flink-checkpoints/chk-17/`.
+
+    **State Size Considerations**
+
+    This checkpoint storage stores small state chunks directly with the metadata, to avoid
+    creating many small files. The threshold for that is configurable. When increasing this
+    threshold, the size of the checkpoint metadata increases. The checkpoint metadata of all
+    retained completed checkpoints needs to fit into the JobManager's heap memory. This is
+    typically not a problem, unless the threshold `get_min_file_size_threshold` is increased
+    significantly.
+
+    **Persistence Guarantees**
+
+    Checkpoints from this checkpoint storage are as persistent and available as the filesystem
+    that they are written to. If the file system is a persistent distributed file system, this
+    checkpoint storage supports highly available setups. The backend additionally supports
+    savepoints and externalized checkpoints.
+
+    **Configuration**
+
+    As for all checkpoint storage policies, this backend can either be configured within the
+    application (by creating the storage with the respective constructor parameters and setting
+    it on the execution environment) or by specifying it in the Flink configuration.
+
+    If the checkpoint storage was specified in the application, it may pick up additional
+    configuration parameters from the Flink configuration. For example, if the storage is
+    configured in the application without a default savepoint directory, it will pick up a
+    default savepoint directory specified in the Flink configuration of the running job/cluster.
+    """
+
+    # Maximum size of state that is stored with the metadata, rather than in files (1 MiByte).
+    MAX_FILE_STATE_THRESHOLD = 1024 * 1024
+
+    def __init__(self,
+                 checkpoint_path=None,
+                 file_state_size_threshold=None,
+                 write_buffer_size=-1,
+                 j_filesystem_checkpoint_storage=None):
+        """
+        Creates a new FileSystemCheckpointStorage, setting the paths for the checkpoint data
+        in a file system.
+
+        All file systems for the file system scheme in the URI (e.g., `file://`, `hdfs://`, or
+        `s3://`) must be accessible via `FileSystem#get`.
+
+        For a Job targeting HDFS, this means that the URI must either specify the authority
+        (host and port), or the Hadoop configuration that describes that information must be
+        in the classpath.
+
+        Example:
+        ::
+            >>> checkpoint_storage = FileSystemCheckpointStorage("hdfs://checkpoints")
+
+        :param checkpoint_path: The path to write checkpoint metadata to. This argument is
+                                required and must not be None.
+        :param file_state_size_threshold: State below this size will be stored as part of the
+                                          metadata, rather than in files. If -1, the value
+                                          configured in the runtime configuration will be used,
+                                          or the default value (1KB) if nothing is configured.
+        :param write_buffer_size: Write buffer size used to serialize state. If -1, the value
+                                  configured in the runtime configuration will be used, or the
+                                  default value (4KB) if nothing is configured.
+        :param j_filesystem_checkpoint_storage: For internal use, please keep it as None.
+        """
+        if j_filesystem_checkpoint_storage is None:
+            gateway = get_gateway()
+            JFileSystemCheckpointStorage = gateway.jvm.org.apache.flink.runtime.state.storage\
+                .FileSystemCheckpointStorage
+            JPath = gateway.jvm.org.apache.flink.core.fs.Path
+
+            if checkpoint_path is None:
+                raise ValueError("checkpoint_path must not be None")
+            else:
+                checkpoint_path = JPath(checkpoint_path)
+
+            if file_state_size_threshold is None:
+                file_state_size_threshold = FileSystemCheckpointStorage.MAX_FILE_STATE_THRESHOLD
+
+            j_filesystem_checkpoint_storage = JFileSystemCheckpointStorage(
+                checkpoint_path,
+                file_state_size_threshold,
+                write_buffer_size)
+
+        self._j_filesystem_checkpoint_storage = j_filesystem_checkpoint_storage

Review comment:
       _j_filesystem_checkpoint_storage could be removed
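       i.e. the base class constructor already stores the same reference in `_j_checkpoint_storage`,
       so the extra field on the subclass is redundant. A rough sketch of the idea (illustrative
       only):

           class FileSystemCheckpointStorage(CheckpointStorage):

               def __init__(self, checkpoint_path=None, file_state_size_threshold=None,
                            write_buffer_size=-1, j_filesystem_checkpoint_storage=None):
                   if j_filesystem_checkpoint_storage is None:
                       ...  # build the Java FileSystemCheckpointStorage object as above
                   # rely on the reference kept by the base class instead of a second field
                   super(FileSystemCheckpointStorage, self).__init__(j_filesystem_checkpoint_storage)

               def get_checkpoint_path(self) -> str:
                   return self._j_checkpoint_storage.getCheckpointPath().toString()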




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

