dianfu commented on a change in pull request #13912:
URL: https://github.com/apache/flink/pull/13912#discussion_r517724631



##########
File path: flink-python/pyflink/datastream/checkpoint_config.py
##########
@@ -330,6 +331,48 @@ def disable_unaligned_checkpoints(self) -> 
'CheckpointConfig':
         self.enable_unaligned_checkpoints(False)
         return self
 
+    def set_checkpoint_storage(self, checkpoint_storage: CheckpointStorage):

Review comment:
       ```suggestion
       def set_checkpoint_storage(self, checkpoint_storage: CheckpointStorage) 
-> 'CheckpointConfig':
   ```

##########
File path: flink-python/pyflink/datastream/checkpoint_storage.py
##########
@@ -0,0 +1,296 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from py4j.java_gateway import get_java_class
+
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'CheckpointStorage',
+    'JobManagerCheckpointStorage',
+    'FileSystemCheckpointStorage',
+    'CustomCheckpointStorage'
+]
+
+
+def _from_j_checkpoint_storage(j_checkpoint_storage):
+    if j_checkpoint_storage is None:
+        return None
+    gateway = get_gateway()
+    JCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.CheckpointStorage
+    JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .JobManagerCheckpointStorage
+    JFileSystemCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .FileSystemCheckpointStorage
+    j_clz = j_checkpoint_storage.getClass()
+
+    if not get_java_class(JCheckpointStorage).isAssignableFrom(j_clz):
+        raise TypeError("The input %s is not an instance of CheckpointStorage."
+                        % j_checkpoint_storage)
+
+    if get_java_class(JJobManagerCheckpointStorage).isAssignableFrom(j_clz):
+        return 
JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_checkpoint_storage)
+    elif get_java_class(JFileSystemCheckpointStorage).isAssignableFrom(j_clz):
+        return 
FileSystemCheckpointStorage(j_fs_checkpoint_storage=j_checkpoint_storage)
+    else:
+        return CustomCheckpointStorage(j_checkpoint_storage)  # users' 
customized state backend
+
+
+class CheckpointStorage(abc.ABC):

Review comment:
       `import abc` is missing at the top of this file; without it, `abc.ABC` is undefined here.

##########
File path: flink-python/pyflink/datastream/checkpoint_storage.py
##########
@@ -0,0 +1,296 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from py4j.java_gateway import get_java_class
+
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'CheckpointStorage',
+    'JobManagerCheckpointStorage',
+    'FileSystemCheckpointStorage',
+    'CustomCheckpointStorage'
+]
+
+
+def _from_j_checkpoint_storage(j_checkpoint_storage):
+    if j_checkpoint_storage is None:
+        return None
+    gateway = get_gateway()
+    JCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.CheckpointStorage
+    JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .JobManagerCheckpointStorage
+    JFileSystemCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .FileSystemCheckpointStorage
+    j_clz = j_checkpoint_storage.getClass()
+
+    if not get_java_class(JCheckpointStorage).isAssignableFrom(j_clz):
+        raise TypeError("The input %s is not an instance of CheckpointStorage."
+                        % j_checkpoint_storage)
+
+    if get_java_class(JJobManagerCheckpointStorage).isAssignableFrom(j_clz):
+        return 
JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_checkpoint_storage)
+    elif get_java_class(JFileSystemCheckpointStorage).isAssignableFrom(j_clz):
+        return 
FileSystemCheckpointStorage(j_fs_checkpoint_storage=j_checkpoint_storage)
+    else:
+        return CustomCheckpointStorage(j_checkpoint_storage)  # users' 
customized state backend
+
+
+class CheckpointStorage(abc.ABC):
+    """
+    CheckpointStorage defines how :class:`StateBackend`'s checkpoint their 
state for fault
+    tolerance in streaming applications. Various implementations store their 
checkpoints in
+    different fashions and have different requirements and availability 
guarantees.
+
+    For example, :class:`JobManagerCheckpointStorage` stores checkpoints in 
the memory of
+    the JobManager. It is lightweight and without additional dependencies but 
is not highly
+    available and only supports small state sizes. This checkpoint storage 
policy is convenient
+    for local testing and development.
+
+    :class`FileSystemCheckpointStorage` stores checkpoints in a filesystem. 
For systems like HDFS,
+    NFS Drives, S3, and GCS, this storage policy supports large state size, in 
the magnitude of
+    many terabytes while providing a highly available foundation for stateful 
applications. This
+    checkpoint storage policy is recommended for most production deployments.
+
+    **Raw Bytes Storage**
+
+    The **CheckpointStorage** creates services for raw bytes storage.
+
+    The raw bytes storage (through the CheckpointStreamFactory) is the 
fundamental service that
+    simply stores bytes in a fault tolerant fashion. This service is used by 
the JobManager
+    to store checkpoint and recovery metadata and is typically also used by 
the keyed- and operator
+    state backends to store checkpointed state.
+
+    **Serializability**
+
+    Checkpoint storage need to be serializable, because they distributed 
across parallel processes
+    (for distributed execution) together with the streaming application code.
+
+    Because of that, **CheckpointStorage** implementations are meant to be 
like factories
+    that create the proper states stores that provide access to the 
persistent. That way, the
+    Checkpoint Storage can be very lightweight (contain only configurations) 
which makes it easier
+    to be serializable.
+
+    **Thread Safety**
+
+    Checkpoint storage implementations have to be thread-safe. Multiple 
threads may be creating
+    streams concurrently.
+    """
+
+    def __init__(self, j_checkpoint_storage):
+        self._j_checkpoint_storage = j_checkpoint_storage
+
+
+class JobManagerCheckpointStorage(CheckpointStorage):
+    """
+     This **CheckpointStorage** checkpoints state directly to the JobManager's 
memory (hence the
+     name), but savepoints will be persisted to a file system.
+
+    This checkpoint storage is primarily  for experimentation, quick local 
setups, or for streaming
+    applications that have very small state: Because it requires checkpoints 
to go through the
+    JobManager's memory, larger state will occupy larger portions of the 
JobManager's main memory,
+    reducing operational stability.
+
+    For any other setup, the :class:`FileSystemCheckpointStorage` should be 
used. The
+    **FileSystemCheckpointStorage** checkpoints state directly to files rather 
than
+    to the JobManager's memory, thus supporting larger state sizes and more 
highly available
+    recovery.
+
+    **State Size Considerations**
+
+    State checkpointing with this state backend is subject to the following 
conditions:
+
+    - Each individual state must not exceed the configured maximum state size
+      (see :func:`get_max_state_size`.
+
+    - All state from one task (i.e., the sum of all operator states and keyed 
states from all
+      chained operators of the task) must not exceed what the RPC system 
supports, which is
+      be default < 10 MB. That limit can be configured up, but that is 
typically not advised.
+
+    - The sum of all states in the application times all retained checkpoints 
must comfortably
+      fit into the JobManager's JVM heap space.
+
+    **Persistence Guarantees**
+
+    For the use cases where the state sizes can be handled by this backend, 
the backend does
+    guarantee persistence for savepoints, externalized checkpoints (of 
configured), and checkpoints
+    (when high-availability is configured).
+
+    **Configuration**
+
+    As for all checkpoint storage, this storage policy can either be 
configured within the
+    application (by creating the storage with the respective constructor 
parameters and setting
+    it on the execution environment) or by specifying it in the Flink 
configuration.
+
+    If the checkpoint storage was specified in the application, it may pick up 
additional
+    configuration parameters from the Flink configuration. That behavior is
+    implemented via the :func:`configure` method.
+    """
+
+    # The default maximal size that the snapshotted memory state may have (5 
MiBytes).
+    DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024
+
+    def __init__(self,
+                 checkpoint_path: str = None,
+                 max_state_size: int = None,
+                 j_jobmanager_checkpoint_storage=None):
+        """
+        Creates a new JobManagerCheckpointStorage, setting optionally the 
paths to persist
+        checkpoint metadata, as well as configuring state thresholds.
+
+        WARNING: Increasing the size of this value beyond the default value
+        (:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care.
+        The checkpointed state needs to be send to the JobManager via limited 
size RPC messages,
+        and there and the JobManager needs to be able to hold all aggregated 
state in its memory.
+
+        Example:
+        ::
+            >>> checkpoint_storage = JobManagerCheckpointStorage()
+
+        :param checkpoint_path: The path to write checkpoint metadata to. If 
none, the value from
+                                the runtime configuration will be used.
+        :param max_state_size: The maximal size of the serialized state. If 
none, the
+                               :data:`DEFAULT_MAX_STATE_SIZE` will be used.
+        :param j_jobmanager_checkpoint_storage: For internal use, please keep 
none.
+        """
+        if j_jobmanager_checkpoint_storage is None:
+            gateway = get_gateway()
+            JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+                .JobManagerCheckpointStorage
+            if max_state_size is None:
+                max_state_size = 
JJobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE
+            j_jobmanager_checkpoint_storage = 
JJobManagerCheckpointStorage(checkpoint_path,
+                                                                           
max_state_size)
+
+        super(CheckpointStorage, 
self).__init__(j_jobmanager_checkpoint_storage)

Review comment:
       ```suggestion
           super(JobManagerCheckpointStorage, 
self).__init__(j_jobmanager_checkpoint_storage)
   ```

##########
File path: flink-python/pyflink/datastream/checkpoint_storage.py
##########
@@ -0,0 +1,296 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from py4j.java_gateway import get_java_class
+
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'CheckpointStorage',
+    'JobManagerCheckpointStorage',
+    'FileSystemCheckpointStorage',
+    'CustomCheckpointStorage'
+]
+
+
+def _from_j_checkpoint_storage(j_checkpoint_storage):
+    if j_checkpoint_storage is None:
+        return None
+    gateway = get_gateway()
+    JCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.CheckpointStorage
+    JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .JobManagerCheckpointStorage
+    JFileSystemCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .FileSystemCheckpointStorage
+    j_clz = j_checkpoint_storage.getClass()
+
+    if not get_java_class(JCheckpointStorage).isAssignableFrom(j_clz):
+        raise TypeError("The input %s is not an instance of CheckpointStorage."
+                        % j_checkpoint_storage)
+
+    if get_java_class(JJobManagerCheckpointStorage).isAssignableFrom(j_clz):
+        return 
JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_checkpoint_storage)
+    elif get_java_class(JFileSystemCheckpointStorage).isAssignableFrom(j_clz):
+        return 
FileSystemCheckpointStorage(j_fs_checkpoint_storage=j_checkpoint_storage)
+    else:
+        return CustomCheckpointStorage(j_checkpoint_storage)  # users' 
customized state backend
+
+
+class CheckpointStorage(abc.ABC):
+    """
+    CheckpointStorage defines how :class:`StateBackend`'s checkpoint their 
state for fault
+    tolerance in streaming applications. Various implementations store their 
checkpoints in
+    different fashions and have different requirements and availability 
guarantees.
+
+    For example, :class:`JobManagerCheckpointStorage` stores checkpoints in 
the memory of
+    the JobManager. It is lightweight and without additional dependencies but 
is not highly
+    available and only supports small state sizes. This checkpoint storage 
policy is convenient
+    for local testing and development.
+
+    :class`FileSystemCheckpointStorage` stores checkpoints in a filesystem. 
For systems like HDFS,
+    NFS Drives, S3, and GCS, this storage policy supports large state size, in 
the magnitude of
+    many terabytes while providing a highly available foundation for stateful 
applications. This
+    checkpoint storage policy is recommended for most production deployments.
+
+    **Raw Bytes Storage**
+
+    The **CheckpointStorage** creates services for raw bytes storage.
+
+    The raw bytes storage (through the CheckpointStreamFactory) is the 
fundamental service that
+    simply stores bytes in a fault tolerant fashion. This service is used by 
the JobManager
+    to store checkpoint and recovery metadata and is typically also used by 
the keyed- and operator
+    state backends to store checkpointed state.
+
+    **Serializability**
+
+    Checkpoint storage need to be serializable, because they distributed 
across parallel processes
+    (for distributed execution) together with the streaming application code.
+
+    Because of that, **CheckpointStorage** implementations are meant to be 
like factories
+    that create the proper states stores that provide access to the 
persistent. That way, the
+    Checkpoint Storage can be very lightweight (contain only configurations) 
which makes it easier
+    to be serializable.
+
+    **Thread Safety**
+
+    Checkpoint storage implementations have to be thread-safe. Multiple 
threads may be creating
+    streams concurrently.
+    """
+
+    def __init__(self, j_checkpoint_storage):
+        self._j_checkpoint_storage = j_checkpoint_storage
+
+
+class JobManagerCheckpointStorage(CheckpointStorage):
+    """
+     This **CheckpointStorage** checkpoints state directly to the JobManager's 
memory (hence the
+     name), but savepoints will be persisted to a file system.
+
+    This checkpoint storage is primarily  for experimentation, quick local 
setups, or for streaming
+    applications that have very small state: Because it requires checkpoints 
to go through the
+    JobManager's memory, larger state will occupy larger portions of the 
JobManager's main memory,
+    reducing operational stability.
+
+    For any other setup, the :class:`FileSystemCheckpointStorage` should be 
used. The
+    **FileSystemCheckpointStorage** checkpoints state directly to files rather 
than
+    to the JobManager's memory, thus supporting larger state sizes and more 
highly available
+    recovery.
+
+    **State Size Considerations**
+
+    State checkpointing with this state backend is subject to the following 
conditions:
+
+    - Each individual state must not exceed the configured maximum state size
+      (see :func:`get_max_state_size`.
+
+    - All state from one task (i.e., the sum of all operator states and keyed 
states from all
+      chained operators of the task) must not exceed what the RPC system 
supports, which is
+      be default < 10 MB. That limit can be configured up, but that is 
typically not advised.
+
+    - The sum of all states in the application times all retained checkpoints 
must comfortably
+      fit into the JobManager's JVM heap space.
+
+    **Persistence Guarantees**
+
+    For the use cases where the state sizes can be handled by this backend, 
the backend does
+    guarantee persistence for savepoints, externalized checkpoints (of 
configured), and checkpoints
+    (when high-availability is configured).
+
+    **Configuration**
+
+    As for all checkpoint storage, this storage policy can either be 
configured within the
+    application (by creating the storage with the respective constructor 
parameters and setting
+    it on the execution environment) or by specifying it in the Flink 
configuration.
+
+    If the checkpoint storage was specified in the application, it may pick up 
additional
+    configuration parameters from the Flink configuration. That behavior is
+    implemented via the :func:`configure` method.
+    """
+
+    # The default maximal size that the snapshotted memory state may have (5 
MiBytes).
+    DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024
+
+    def __init__(self,
+                 checkpoint_path: str = None,
+                 max_state_size: int = None,
+                 j_jobmanager_checkpoint_storage=None):
+        """
+        Creates a new JobManagerCheckpointStorage, setting optionally the 
paths to persist
+        checkpoint metadata, as well as configuring state thresholds.
+
+        WARNING: Increasing the size of this value beyond the default value
+        (:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care.
+        The checkpointed state needs to be send to the JobManager via limited 
size RPC messages,
+        and there and the JobManager needs to be able to hold all aggregated 
state in its memory.
+
+        Example:
+        ::
+            >>> checkpoint_storage = JobManagerCheckpointStorage()
+
+        :param checkpoint_path: The path to write checkpoint metadata to. If 
none, the value from
+                                the runtime configuration will be used.
+        :param max_state_size: The maximal size of the serialized state. If 
none, the
+                               :data:`DEFAULT_MAX_STATE_SIZE` will be used.
+        :param j_jobmanager_checkpoint_storage: For internal use, please keep 
none.
+        """
+        if j_jobmanager_checkpoint_storage is None:
+            gateway = get_gateway()
+            JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+                .JobManagerCheckpointStorage
+            if max_state_size is None:
+                max_state_size = 
JJobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE
+            j_jobmanager_checkpoint_storage = 
JJobManagerCheckpointStorage(checkpoint_path,
+                                                                           
max_state_size)
+
+        super(CheckpointStorage, 
self).__init__(j_jobmanager_checkpoint_storage)
+
+    def get_max_state_size(self) -> int:
+        """
+        Gets the maximum size that an individual state can have, as configured 
in the
+        constructor (by default :data:`DEFAULT_MAX_STATE_SIZE`).
+
+        :return: The maximum size that an individual state can have.
+        """
+        return self._j_jobmanager_checkpoint_storage.getMaxStateSize()

Review comment:
       ```suggestion
           return self._j_checkpoint_storage.getMaxStateSize()
   ```

##########
File path: flink-python/pyflink/datastream/checkpoint_storage.py
##########
@@ -0,0 +1,296 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from py4j.java_gateway import get_java_class
+
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'CheckpointStorage',
+    'JobManagerCheckpointStorage',
+    'FileSystemCheckpointStorage',
+    'CustomCheckpointStorage'
+]
+
+
+def _from_j_checkpoint_storage(j_checkpoint_storage):
+    if j_checkpoint_storage is None:
+        return None
+    gateway = get_gateway()
+    JCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.CheckpointStorage
+    JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .JobManagerCheckpointStorage
+    JFileSystemCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .FileSystemCheckpointStorage
+    j_clz = j_checkpoint_storage.getClass()
+
+    if not get_java_class(JCheckpointStorage).isAssignableFrom(j_clz):
+        raise TypeError("The input %s is not an instance of CheckpointStorage."
+                        % j_checkpoint_storage)
+
+    if get_java_class(JJobManagerCheckpointStorage).isAssignableFrom(j_clz):
+        return 
JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_checkpoint_storage)
+    elif get_java_class(JFileSystemCheckpointStorage).isAssignableFrom(j_clz):
+        return 
FileSystemCheckpointStorage(j_fs_checkpoint_storage=j_checkpoint_storage)
+    else:
+        return CustomCheckpointStorage(j_checkpoint_storage)  # users' 
customized state backend
+
+
+class CheckpointStorage(abc.ABC):
+    """
+    CheckpointStorage defines how :class:`StateBackend`'s checkpoint their 
state for fault
+    tolerance in streaming applications. Various implementations store their 
checkpoints in
+    different fashions and have different requirements and availability 
guarantees.
+
+    For example, :class:`JobManagerCheckpointStorage` stores checkpoints in 
the memory of
+    the JobManager. It is lightweight and without additional dependencies but 
is not highly
+    available and only supports small state sizes. This checkpoint storage 
policy is convenient
+    for local testing and development.
+
+    :class`FileSystemCheckpointStorage` stores checkpoints in a filesystem. 
For systems like HDFS,
+    NFS Drives, S3, and GCS, this storage policy supports large state size, in 
the magnitude of
+    many terabytes while providing a highly available foundation for stateful 
applications. This
+    checkpoint storage policy is recommended for most production deployments.
+
+    **Raw Bytes Storage**
+
+    The **CheckpointStorage** creates services for raw bytes storage.
+
+    The raw bytes storage (through the CheckpointStreamFactory) is the 
fundamental service that
+    simply stores bytes in a fault tolerant fashion. This service is used by 
the JobManager
+    to store checkpoint and recovery metadata and is typically also used by 
the keyed- and operator
+    state backends to store checkpointed state.
+
+    **Serializability**
+
+    Checkpoint storage need to be serializable, because they distributed 
across parallel processes
+    (for distributed execution) together with the streaming application code.
+
+    Because of that, **CheckpointStorage** implementations are meant to be 
like factories
+    that create the proper states stores that provide access to the 
persistent. That way, the
+    Checkpoint Storage can be very lightweight (contain only configurations) 
which makes it easier
+    to be serializable.
+
+    **Thread Safety**
+
+    Checkpoint storage implementations have to be thread-safe. Multiple 
threads may be creating
+    streams concurrently.
+    """
+
+    def __init__(self, j_checkpoint_storage):
+        self._j_checkpoint_storage = j_checkpoint_storage
+
+
+class JobManagerCheckpointStorage(CheckpointStorage):
+    """
+     This **CheckpointStorage** checkpoints state directly to the JobManager's 
memory (hence the
+     name), but savepoints will be persisted to a file system.
+
+    This checkpoint storage is primarily  for experimentation, quick local 
setups, or for streaming
+    applications that have very small state: Because it requires checkpoints 
to go through the
+    JobManager's memory, larger state will occupy larger portions of the 
JobManager's main memory,
+    reducing operational stability.
+
+    For any other setup, the :class:`FileSystemCheckpointStorage` should be 
used. The
+    **FileSystemCheckpointStorage** checkpoints state directly to files rather 
than
+    to the JobManager's memory, thus supporting larger state sizes and more 
highly available
+    recovery.
+
+    **State Size Considerations**
+
+    State checkpointing with this state backend is subject to the following 
conditions:
+
+    - Each individual state must not exceed the configured maximum state size
+      (see :func:`get_max_state_size`.
+
+    - All state from one task (i.e., the sum of all operator states and keyed 
states from all
+      chained operators of the task) must not exceed what the RPC system 
supports, which is
+      be default < 10 MB. That limit can be configured up, but that is 
typically not advised.
+
+    - The sum of all states in the application times all retained checkpoints 
must comfortably
+      fit into the JobManager's JVM heap space.
+
+    **Persistence Guarantees**
+
+    For the use cases where the state sizes can be handled by this backend, 
the backend does
+    guarantee persistence for savepoints, externalized checkpoints (of 
configured), and checkpoints
+    (when high-availability is configured).
+
+    **Configuration**
+
+    As for all checkpoint storage, this storage policy can either be 
configured within the
+    application (by creating the storage with the respective constructor 
parameters and setting
+    it on the execution environment) or by specifying it in the Flink 
configuration.
+
+    If the checkpoint storage was specified in the application, it may pick up 
additional
+    configuration parameters from the Flink configuration. That behavior is
+    implemented via the :func:`configure` method.
+    """
+
+    # The default maximal size that the snapshotted memory state may have (5 
MiBytes).
+    DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024
+
+    def __init__(self,
+                 checkpoint_path: str = None,
+                 max_state_size: int = None,
+                 j_jobmanager_checkpoint_storage=None):
+        """
+        Creates a new JobManagerCheckpointStorage, setting optionally the 
paths to persist
+        checkpoint metadata, as well as configuring state thresholds.
+
+        WARNING: Increasing the size of this value beyond the default value
+        (:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care.
+        The checkpointed state needs to be send to the JobManager via limited 
size RPC messages,
+        and there and the JobManager needs to be able to hold all aggregated 
state in its memory.
+
+        Example:
+        ::
+            >>> checkpoint_storage = JobManagerCheckpointStorage()
+
+        :param checkpoint_path: The path to write checkpoint metadata to. If 
none, the value from
+                                the runtime configuration will be used.
+        :param max_state_size: The maximal size of the serialized state. If 
none, the
+                               :data:`DEFAULT_MAX_STATE_SIZE` will be used.
+        :param j_jobmanager_checkpoint_storage: For internal use, please keep 
none.
+        """
+        if j_jobmanager_checkpoint_storage is None:
+            gateway = get_gateway()
+            JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+                .JobManagerCheckpointStorage
+            if max_state_size is None:
+                max_state_size = 
JJobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE
+            j_jobmanager_checkpoint_storage = 
JJobManagerCheckpointStorage(checkpoint_path,
+                                                                           
max_state_size)
+
+        super(CheckpointStorage, 
self).__init__(j_jobmanager_checkpoint_storage)
+
+    def get_max_state_size(self) -> int:
+        """
+        Gets the maximum size that an individual state can have, as configured 
in the
+        constructor (by default :data:`DEFAULT_MAX_STATE_SIZE`).
+
+        :return: The maximum size that an individual state can have.
+        """
+        return self._j_jobmanager_checkpoint_storage.getMaxStateSize()
+
+    def __str__(self):
+        return self._j_jobmanager_checkpoint_storage.toString()

Review comment:
       ```suggestion
           return self._j_checkpoint_storage.toString()
   ```

##########
File path: flink-python/pyflink/datastream/__init__.py
##########
@@ -95,11 +95,17 @@
     'SourceFunction',
     'StateBackend',
     'MapFunction',
+    'HashMapStateBackend',

Review comment:
       These classes need to be imported; otherwise, the sphinx check will fail.

##########
File path: flink-python/pyflink/datastream/checkpoint_storage.py
##########
@@ -0,0 +1,296 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from py4j.java_gateway import get_java_class
+
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'CheckpointStorage',
+    'JobManagerCheckpointStorage',
+    'FileSystemCheckpointStorage',
+    'CustomCheckpointStorage'
+]
+
+
+def _from_j_checkpoint_storage(j_checkpoint_storage):
+    if j_checkpoint_storage is None:
+        return None
+    gateway = get_gateway()
+    JCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.CheckpointStorage
+    JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .JobManagerCheckpointStorage
+    JFileSystemCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .FileSystemCheckpointStorage
+    j_clz = j_checkpoint_storage.getClass()
+
+    if not get_java_class(JCheckpointStorage).isAssignableFrom(j_clz):
+        raise TypeError("The input %s is not an instance of CheckpointStorage."
+                        % j_checkpoint_storage)
+
+    if get_java_class(JJobManagerCheckpointStorage).isAssignableFrom(j_clz):
+        return 
JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_checkpoint_storage)
+    elif get_java_class(JFileSystemCheckpointStorage).isAssignableFrom(j_clz):
+        return 
FileSystemCheckpointStorage(j_fs_checkpoint_storage=j_checkpoint_storage)
+    else:
+        return CustomCheckpointStorage(j_checkpoint_storage)  # users' 
customized state backend
+
+
+class CheckpointStorage(abc.ABC):
+    """
+    CheckpointStorage defines how :class:`StateBackend`'s checkpoint their 
state for fault
+    tolerance in streaming applications. Various implementations store their 
checkpoints in
+    different fashions and have different requirements and availability 
guarantees.
+
+    For example, :class:`JobManagerCheckpointStorage` stores checkpoints in 
the memory of
+    the JobManager. It is lightweight and without additional dependencies but 
is not highly
+    available and only supports small state sizes. This checkpoint storage 
policy is convenient
+    for local testing and development.
+
+    :class:`FileSystemCheckpointStorage` stores checkpoints in a filesystem. 
For systems like HDFS,
+    NFS Drives, S3, and GCS, this storage policy supports large state size, in 
the magnitude of
+    many terabytes while providing a highly available foundation for stateful 
applications. This
+    checkpoint storage policy is recommended for most production deployments.
+
+    **Raw Bytes Storage**
+
+    The **CheckpointStorage** creates services for raw bytes storage.
+
+    The raw bytes storage (through the CheckpointStreamFactory) is the 
fundamental service that
+    simply stores bytes in a fault tolerant fashion. This service is used by 
the JobManager
+    to store checkpoint and recovery metadata and is typically also used by 
the keyed- and operator
+    state backends to store checkpointed state.
+
+    **Serializability**
+
+    Checkpoint storages need to be serializable, because they are distributed 
across parallel processes
+    (for distributed execution) together with the streaming application code.
+
+    Because of that, **CheckpointStorage** implementations are meant to be 
like factories
+    that create the proper states stores that provide access to the 
persistent. That way, the
+    Checkpoint Storage can be very lightweight (contain only configurations) 
which makes it easier
+    to be serializable.
+
+    **Thread Safety**
+
+    Checkpoint storage implementations have to be thread-safe. Multiple 
threads may be creating
+    streams concurrently.
+    """
+
+    def __init__(self, j_checkpoint_storage):
+        self._j_checkpoint_storage = j_checkpoint_storage
+
+
+class JobManagerCheckpointStorage(CheckpointStorage):
+    """
+     This **CheckpointStorage** checkpoints state directly to the JobManager's 
memory (hence the
+     name), but savepoints will be persisted to a file system.
+
+    This checkpoint storage is primarily  for experimentation, quick local 
setups, or for streaming
+    applications that have very small state: Because it requires checkpoints 
to go through the
+    JobManager's memory, larger state will occupy larger portions of the 
JobManager's main memory,
+    reducing operational stability.
+
+    For any other setup, the :class:`FileSystemCheckpointStorage` should be 
used. The
+    **FileSystemCheckpointStorage** checkpoints state directly to files rather 
than
+    to the JobManager's memory, thus supporting larger state sizes and more 
highly available
+    recovery.
+
+    **State Size Considerations**
+
+    State checkpointing with this state backend is subject to the following 
conditions:
+
+    - Each individual state must not exceed the configured maximum state size
+      (see :func:`get_max_state_size`).
+
+    - All state from one task (i.e., the sum of all operator states and keyed 
states from all
+      chained operators of the task) must not exceed what the RPC system 
supports, which is
+      by default < 10 MB. That limit can be configured up, but that is 
typically not advised.
+
+    - The sum of all states in the application times all retained checkpoints 
must comfortably
+      fit into the JobManager's JVM heap space.
+
+    **Persistence Guarantees**
+
+    For the use cases where the state sizes can be handled by this backend, 
the backend does
+    guarantee persistence for savepoints, externalized checkpoints (if 
configured), and checkpoints
+    (when high-availability is configured).
+
+    **Configuration**
+
+    As for all checkpoint storage, this storage policy can either be 
configured within the
+    application (by creating the storage with the respective constructor 
parameters and setting
+    it on the execution environment) or by specifying it in the Flink 
configuration.
+
+    If the checkpoint storage was specified in the application, it may pick up 
additional
+    configuration parameters from the Flink configuration. That behavior is
+    implemented via the :func:`configure` method.
+    """
+
+    # The default maximal size that the snapshotted memory state may have (5 
MiBytes).
+    DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024
+
+    def __init__(self,
+                 checkpoint_path: str = None,
+                 max_state_size: int = None,
+                 j_jobmanager_checkpoint_storage=None):
+        """
+        Creates a new JobManagerCheckpointStorage, setting optionally the 
paths to persist
+        checkpoint metadata, as well as configuring state thresholds.
+
+        WARNING: Increasing the size of this value beyond the default value
+        (:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care.
+        The checkpointed state needs to be sent to the JobManager via limited 
size RPC messages,
+        and the JobManager needs to be able to hold all aggregated 
state in its memory.
+
+        Example:
+        ::
+            >>> checkpoint_storage = JobManagerCheckpointStorage()
+
+        :param checkpoint_path: The path to write checkpoint metadata to. If 
none, the value from
+                                the runtime configuration will be used.
+        :param max_state_size: The maximal size of the serialized state. If 
none, the
+                               :data:`DEFAULT_MAX_STATE_SIZE` will be used.
+        :param j_jobmanager_checkpoint_storage: For internal use, please keep 
none.
+        """
+        if j_jobmanager_checkpoint_storage is None:
+            gateway = get_gateway()
+            JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+                .JobManagerCheckpointStorage
+            if max_state_size is None:
+                max_state_size = 
JJobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE
+            j_jobmanager_checkpoint_storage = 
JJobManagerCheckpointStorage(checkpoint_path,
+                                                                           
max_state_size)
+
+        super(CheckpointStorage, 
self).__init__(j_jobmanager_checkpoint_storage)
+
+    def get_max_state_size(self) -> int:
+        """
+        Gets the maximum size that an individual state can have, as configured 
in the
+        constructor (by default :data:`DEFAULT_MAX_STATE_SIZE`).
+
+        :return: The maximum size that an individual state can have.
+        """
+        return self._j_jobmanager_checkpoint_storage.getMaxStateSize()
+
+    def __str__(self):
+        return self._j_jobmanager_checkpoint_storage.toString()
+
+
+class FileSystemCheckpointStorage(CheckpointStorage):
+    """
+    This checkpoint storage checkpoints state as files to a file system (hence 
the name).
+
+    Each checkpoint individually will store all its files in a subdirectory 
that includes the
+    checkpoint number, such as 
``hdfs://namenode:port/flink-checkpoints/chk-17/``.
+
+    **State Size Considerations**
+
+    This checkpoint storage stores small state chunks directly with the 
metadata, to avoid creating
+    many small files. The threshold for that is configurable. When increasing 
this threshold, the
+    size of the checkpoint metadata increases. The checkpoint metadata of all 
retained completed
+    checkpoints needs to fit into the JobManager's heap memory. This is 
typically not a problem,
+    unless the threshold :func:`get_min_file_size_threshold` is increased 
significantly.
+
+    **Persistence Guarantees**
+
+    Checkpoints from this storage instance are as persistent and available as 
filesystem that is
+    written to. If the file system is a persistent distributed file system, 
this state backend
+    supports highly available setups. The backend additionally supports 
savepoints and externalized
+    checkpoints.
+
+    **Configuration**
+
+    As for all checkpoint storage, this storage can either be configured 
within the application (by
+    creating the instance with the respective constructor parameters and 
setting it on the execution
+    environment) or by specifying it in the Flink configuration.
+
+    If the checkpoint storage was specified in the application, it may pick up 
additional
+    configuration parameters from the Flink configuration. That behavior is 
implemented via
+    the :func:`configure` method.
+    """
+
+    def __init__(self,
+                 checkpoint_directory_uri: str = None,
+                 file_state_size_threshold: int = None,
+                 write_buffer_size: int = None,
+                 j_fs_checkpoint_storage=None):
+
+        if j_fs_checkpoint_storage is None:
+            gateway = get_gateway()
+            JFileSystemCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+                .FileSystemCheckpointStorage
+            JPath = gateway.jvm.org.apache.flink.core.fs.Path
+            if checkpoint_directory_uri is None:
+                raise ValueError("The parameter 'checkpoint_directory_uri' is 
required!")
+            j_checkpoint_directory_uri = 
JPath(checkpoint_directory_uri).toUri()
+
+            if file_state_size_threshold is None:
+                file_state_size_threshold = -1
+
+            if write_buffer_size is None:
+                write_buffer_size = -1
+
+            j_fs_checkpoint_storage = 
JFileSystemCheckpointStorage(j_checkpoint_directory_uri,
+                                                                   
file_state_size_threshold,
+                                                                   
write_buffer_size)
+
+        super(FileSystemCheckpointStorage, 
self).__init__(j_fs_checkpoint_storage)
+
+    def get_checkpoint_path(self) -> str:
+        """
+        Gets the base directory where all the checkpoints are stored.
+        The job-specific checkpoint directory is created inside this directory.
+
+        :return: The base directory for checkpoints.
+        """
+        return self._j_fs_checkpoint_storage.getCheckpointPath().toString()
+
+    def get_min_file_size_threshold(self) -> int:
+        """
+        Gets the threshold below which state is stored as part of the 
metadata, rather than in
+        files. This threshold ensures that the backend does not create a large 
amount of very
+        small files, where potentially the file pointers are larger than the 
state itself.
+
+        If not explicitly configured, this is the default value of
+        
``org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD``.
+
+        :return: The file size threshold, in bytes.
+        """
+        return self._j_fs_checkpoint_storage.getMinFileSizeThreshold()
+
+    def get_write_buffer_size(self) -> int:
+        """
+        Gets the write buffer size for created checkpoint stream.
+
+        If not explicitly configured, this is the default value of
+        
``org.apache.flink.configuration.CheckpointingOptions.FS_WRITE_BUFFER_SIZE``.
+
+        :return: The write buffer size, in bytes.
+        """
+        return self._j_fs_checkpoint_storage.getWriteBufferSize()

Review comment:
       ```suggestion
           return self._j_checkpoint_storage.getWriteBufferSize()
   ```

##########
File path: flink-python/pyflink/datastream/checkpoint_storage.py
##########
@@ -0,0 +1,296 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from py4j.java_gateway import get_java_class
+
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'CheckpointStorage',
+    'JobManagerCheckpointStorage',
+    'FileSystemCheckpointStorage',
+    'CustomCheckpointStorage'
+]
+
+
+def _from_j_checkpoint_storage(j_checkpoint_storage):
+    if j_checkpoint_storage is None:
+        return None
+    gateway = get_gateway()
+    JCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.CheckpointStorage
+    JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .JobManagerCheckpointStorage
+    JFileSystemCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .FileSystemCheckpointStorage
+    j_clz = j_checkpoint_storage.getClass()
+
+    if not get_java_class(JCheckpointStorage).isAssignableFrom(j_clz):
+        raise TypeError("The input %s is not an instance of CheckpointStorage."
+                        % j_checkpoint_storage)
+
+    if get_java_class(JJobManagerCheckpointStorage).isAssignableFrom(j_clz):
+        return 
JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_checkpoint_storage)
+    elif get_java_class(JFileSystemCheckpointStorage).isAssignableFrom(j_clz):
+        return 
FileSystemCheckpointStorage(j_fs_checkpoint_storage=j_checkpoint_storage)
+    else:
+        return CustomCheckpointStorage(j_checkpoint_storage)  # users' 
customized state backend
+
+
+class CheckpointStorage(abc.ABC):
+    """
+    CheckpointStorage defines how :class:`StateBackend`'s checkpoint their 
state for fault
+    tolerance in streaming applications. Various implementations store their 
checkpoints in
+    different fashions and have different requirements and availability 
guarantees.
+
+    For example, :class:`JobManagerCheckpointStorage` stores checkpoints in 
the memory of
+    the JobManager. It is lightweight and without additional dependencies but 
is not highly
+    available and only supports small state sizes. This checkpoint storage 
policy is convenient
+    for local testing and development.
+
+    :class:`FileSystemCheckpointStorage` stores checkpoints in a filesystem. 
For systems like HDFS,
+    NFS Drives, S3, and GCS, this storage policy supports large state size, in 
the magnitude of
+    many terabytes while providing a highly available foundation for stateful 
applications. This
+    checkpoint storage policy is recommended for most production deployments.
+
+    **Raw Bytes Storage**
+
+    The **CheckpointStorage** creates services for raw bytes storage.
+
+    The raw bytes storage (through the CheckpointStreamFactory) is the 
fundamental service that
+    simply stores bytes in a fault tolerant fashion. This service is used by 
the JobManager
+    to store checkpoint and recovery metadata and is typically also used by 
the keyed- and operator
+    state backends to store checkpointed state.
+
+    **Serializability**
+
+    Checkpoint storages need to be serializable, because they are distributed 
across parallel processes
+    (for distributed execution) together with the streaming application code.
+
+    Because of that, **CheckpointStorage** implementations are meant to be 
like factories
+    that create the proper states stores that provide access to the 
persistent. That way, the
+    Checkpoint Storage can be very lightweight (contain only configurations) 
which makes it easier
+    to be serializable.
+
+    **Thread Safety**
+
+    Checkpoint storage implementations have to be thread-safe. Multiple 
threads may be creating
+    streams concurrently.
+    """
+
+    def __init__(self, j_checkpoint_storage):
+        self._j_checkpoint_storage = j_checkpoint_storage
+
+
+class JobManagerCheckpointStorage(CheckpointStorage):
+    """
+     This **CheckpointStorage** checkpoints state directly to the JobManager's 
memory (hence the
+     name), but savepoints will be persisted to a file system.
+
+    This checkpoint storage is primarily  for experimentation, quick local 
setups, or for streaming
+    applications that have very small state: Because it requires checkpoints 
to go through the
+    JobManager's memory, larger state will occupy larger portions of the 
JobManager's main memory,
+    reducing operational stability.
+
+    For any other setup, the :class:`FileSystemCheckpointStorage` should be 
used. The
+    **FileSystemCheckpointStorage** checkpoints state directly to files rather 
than
+    to the JobManager's memory, thus supporting larger state sizes and more 
highly available
+    recovery.
+
+    **State Size Considerations**
+
+    State checkpointing with this state backend is subject to the following 
conditions:
+
+    - Each individual state must not exceed the configured maximum state size
+      (see :func:`get_max_state_size`).
+
+    - All state from one task (i.e., the sum of all operator states and keyed 
states from all
+      chained operators of the task) must not exceed what the RPC system 
supports, which is
+      by default < 10 MB. That limit can be configured up, but that is 
typically not advised.
+
+    - The sum of all states in the application times all retained checkpoints 
must comfortably
+      fit into the JobManager's JVM heap space.
+
+    **Persistence Guarantees**
+
+    For the use cases where the state sizes can be handled by this backend, 
the backend does
+    guarantee persistence for savepoints, externalized checkpoints (if 
configured), and checkpoints
+    (when high-availability is configured).
+
+    **Configuration**
+
+    As for all checkpoint storage, this storage policy can either be 
configured within the
+    application (by creating the storage with the respective constructor 
parameters and setting
+    it on the execution environment) or by specifying it in the Flink 
configuration.
+
+    If the checkpoint storage was specified in the application, it may pick up 
additional
+    configuration parameters from the Flink configuration. That behavior is
+    implemented via the :func:`configure` method.
+    """
+
+    # The default maximal size that the snapshotted memory state may have (5 
MiBytes).
+    DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024
+
+    def __init__(self,
+                 checkpoint_path: str = None,
+                 max_state_size: int = None,
+                 j_jobmanager_checkpoint_storage=None):
+        """
+        Creates a new JobManagerCheckpointStorage, setting optionally the 
paths to persist
+        checkpoint metadata, as well as configuring state thresholds.
+
+        WARNING: Increasing the size of this value beyond the default value
+        (:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care.
+        The checkpointed state needs to be sent to the JobManager via limited 
size RPC messages,
+        and the JobManager needs to be able to hold all aggregated 
state in its memory.
+
+        Example:
+        ::
+            >>> checkpoint_storage = JobManagerCheckpointStorage()
+
+        :param checkpoint_path: The path to write checkpoint metadata to. If 
none, the value from
+                                the runtime configuration will be used.
+        :param max_state_size: The maximal size of the serialized state. If 
none, the
+                               :data:`DEFAULT_MAX_STATE_SIZE` will be used.
+        :param j_jobmanager_checkpoint_storage: For internal use, please keep 
none.
+        """
+        if j_jobmanager_checkpoint_storage is None:
+            gateway = get_gateway()
+            JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+                .JobManagerCheckpointStorage
+            if max_state_size is None:
+                max_state_size = 
JJobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE
+            j_jobmanager_checkpoint_storage = 
JJobManagerCheckpointStorage(checkpoint_path,
+                                                                           
max_state_size)
+
+        super(CheckpointStorage, 
self).__init__(j_jobmanager_checkpoint_storage)
+
+    def get_max_state_size(self) -> int:
+        """
+        Gets the maximum size that an individual state can have, as configured 
in the
+        constructor (by default :data:`DEFAULT_MAX_STATE_SIZE`).
+
+        :return: The maximum size that an individual state can have.
+        """
+        return self._j_jobmanager_checkpoint_storage.getMaxStateSize()
+
+    def __str__(self):
+        return self._j_jobmanager_checkpoint_storage.toString()
+
+
+class FileSystemCheckpointStorage(CheckpointStorage):
+    """
+    This checkpoint storage checkpoints state as files to a file system (hence 
the name).
+
+    Each checkpoint individually will store all its files in a subdirectory 
that includes the
+    checkpoint number, such as 
``hdfs://namenode:port/flink-checkpoints/chk-17/``.
+
+    **State Size Considerations**
+
+    This checkpoint storage stores small state chunks directly with the 
metadata, to avoid creating
+    many small files. The threshold for that is configurable. When increasing 
this threshold, the
+    size of the checkpoint metadata increases. The checkpoint metadata of all 
retained completed
+    checkpoints needs to fit into the JobManager's heap memory. This is 
typically not a problem,
+    unless the threshold :func:`get_min_file_size_threshold` is increased 
significantly.
+
+    **Persistence Guarantees**
+
+    Checkpoints from this storage instance are as persistent and available as 
filesystem that is
+    written to. If the file system is a persistent distributed file system, 
this state backend
+    supports highly available setups. The backend additionally supports 
savepoints and externalized
+    checkpoints.
+
+    **Configuration**
+
+    As for all checkpoint storage, this storage can either be configured 
within the application (by
+    creating the instance with the respective constructor parameters and 
setting it on the execution
+    environment) or by specifying it in the Flink configuration.
+
+    If the checkpoint storage was specified in the application, it may pick up 
additional
+    configuration parameters from the Flink configuration. That behavior is 
implemented via
+    the :func:`configure` method.
+    """
+
+    def __init__(self,
+                 checkpoint_directory_uri: str = None,
+                 file_state_size_threshold: int = None,
+                 write_buffer_size: int = None,
+                 j_fs_checkpoint_storage=None):
+
+        if j_fs_checkpoint_storage is None:
+            gateway = get_gateway()
+            JFileSystemCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+                .FileSystemCheckpointStorage
+            JPath = gateway.jvm.org.apache.flink.core.fs.Path
+            if checkpoint_directory_uri is None:
+                raise ValueError("The parameter 'checkpoint_directory_uri' is 
required!")
+            j_checkpoint_directory_uri = 
JPath(checkpoint_directory_uri).toUri()
+
+            if file_state_size_threshold is None:
+                file_state_size_threshold = -1
+
+            if write_buffer_size is None:
+                write_buffer_size = -1
+
+            j_fs_checkpoint_storage = 
JFileSystemCheckpointStorage(j_checkpoint_directory_uri,
+                                                                   
file_state_size_threshold,
+                                                                   
write_buffer_size)
+
+        super(FileSystemCheckpointStorage, 
self).__init__(j_fs_checkpoint_storage)
+
+    def get_checkpoint_path(self) -> str:
+        """
+        Gets the base directory where all the checkpoints are stored.
+        The job-specific checkpoint directory is created inside this directory.
+
+        :return: The base directory for checkpoints.
+        """
+        return self._j_fs_checkpoint_storage.getCheckpointPath().toString()
+
+    def get_min_file_size_threshold(self) -> int:
+        """
+        Gets the threshold below which state is stored as part of the 
metadata, rather than in
+        files. This threshold ensures that the backend does not create a large 
amount of very
+        small files, where potentially the file pointers are larger than the 
state itself.
+
+        If not explicitly configured, this is the default value of
+        
``org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD``.
+
+        :return: The file size threshold, in bytes.
+        """
+        return self._j_fs_checkpoint_storage.getMinFileSizeThreshold()

Review comment:
       ```suggestion
           return self._j_checkpoint_storage.getMinFileSizeThreshold()
   ```

##########
File path: flink-python/pyflink/datastream/checkpoint_storage.py
##########
@@ -0,0 +1,296 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from py4j.java_gateway import get_java_class
+
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'CheckpointStorage',
+    'JobManagerCheckpointStorage',
+    'FileSystemCheckpointStorage',
+    'CustomCheckpointStorage'
+]
+
+
+def _from_j_checkpoint_storage(j_checkpoint_storage):
+    if j_checkpoint_storage is None:
+        return None
+    gateway = get_gateway()
+    JCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.CheckpointStorage
+    JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .JobManagerCheckpointStorage
+    JFileSystemCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .FileSystemCheckpointStorage
+    j_clz = j_checkpoint_storage.getClass()
+
+    if not get_java_class(JCheckpointStorage).isAssignableFrom(j_clz):
+        raise TypeError("The input %s is not an instance of CheckpointStorage."
+                        % j_checkpoint_storage)
+
+    if get_java_class(JJobManagerCheckpointStorage).isAssignableFrom(j_clz):
+        return 
JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_checkpoint_storage)
+    elif get_java_class(JFileSystemCheckpointStorage).isAssignableFrom(j_clz):
+        return 
FileSystemCheckpointStorage(j_fs_checkpoint_storage=j_checkpoint_storage)
+    else:
+        return CustomCheckpointStorage(j_checkpoint_storage)  # users' 
customized state backend
+
+
+class CheckpointStorage(abc.ABC):
+    """
+    CheckpointStorage defines how :class:`StateBackend`'s checkpoint their 
state for fault
+    tolerance in streaming applications. Various implementations store their 
checkpoints in
+    different fashions and have different requirements and availability 
guarantees.
+
+    For example, :class:`JobManagerCheckpointStorage` stores checkpoints in 
the memory of
+    the JobManager. It is lightweight and without additional dependencies but 
is not highly
+    available and only supports small state sizes. This checkpoint storage 
policy is convenient
+    for local testing and development.
+
+    :class`FileSystemCheckpointStorage` stores checkpoints in a filesystem. 
For systems like HDFS,
+    NFS Drives, S3, and GCS, this storage policy supports large state size, in 
the magnitude of
+    many terabytes while providing a highly available foundation for stateful 
applications. This
+    checkpoint storage policy is recommended for most production deployments.
+
+    **Raw Bytes Storage**
+
+    The **CheckpointStorage** creates services for raw bytes storage.
+
+    The raw bytes storage (through the CheckpointStreamFactory) is the 
fundamental service that
+    simply stores bytes in a fault tolerant fashion. This service is used by 
the JobManager
+    to store checkpoint and recovery metadata and is typically also used by 
the keyed- and operator
+    state backends to store checkpointed state.
+
+    **Serializability**
+
+    Checkpoint storage need to be serializable, because they distributed 
across parallel processes
+    (for distributed execution) together with the streaming application code.
+
+    Because of that, **CheckpointStorage** implementations are meant to be 
like factories
+    that create the proper states stores that provide access to the 
persistent. That way, the
+    Checkpoint Storage can be very lightweight (contain only configurations) 
which makes it easier
+    to be serializable.
+
+    **Thread Safety**
+
+    Checkpoint storage implementations have to be thread-safe. Multiple 
threads may be creating
+    streams concurrently.
+    """
+
+    def __init__(self, j_checkpoint_storage):
+        self._j_checkpoint_storage = j_checkpoint_storage
+
+
+class JobManagerCheckpointStorage(CheckpointStorage):
+    """
+     This **CheckpointStorage** checkpoints state directly to the JobManager's 
memory (hence the
+     name), but savepoints will be persisted to a file system.
+
+    This checkpoint storage is primarily  for experimentation, quick local 
setups, or for streaming
+    applications that have very small state: Because it requires checkpoints 
to go through the
+    JobManager's memory, larger state will occupy larger portions of the 
JobManager's main memory,
+    reducing operational stability.
+
+    For any other setup, the :class:`FileSystemCheckpointStorage` should be 
used. The
+    **FileSystemCheckpointStorage** checkpoints state directly to files rather 
than
+    to the JobManager's memory, thus supporting larger state sizes and more 
highly available
+    recovery.
+
+    **State Size Considerations**
+
+    State checkpointing with this state backend is subject to the following 
conditions:
+
+    - Each individual state must not exceed the configured maximum state size
+      (see :func:`get_max_state_size`.
+
+    - All state from one task (i.e., the sum of all operator states and keyed 
states from all
+      chained operators of the task) must not exceed what the RPC system 
supports, which is
+      be default < 10 MB. That limit can be configured up, but that is 
typically not advised.
+
+    - The sum of all states in the application times all retained checkpoints 
must comfortably
+      fit into the JobManager's JVM heap space.
+
+    **Persistence Guarantees**
+
+    For the use cases where the state sizes can be handled by this backend, 
the backend does
+    guarantee persistence for savepoints, externalized checkpoints (of 
configured), and checkpoints
+    (when high-availability is configured).
+
+    **Configuration**
+
+    As for all checkpoint storage, this storage policy can either be 
configured within the
+    application (by creating the storage with the respective constructor 
parameters and setting
+    it on the execution environment) or by specifying it in the Flink 
configuration.
+
+    If the checkpoint storage was specified in the application, it may pick up 
additional
+    configuration parameters from the Flink configuration. That behavior is
+    implemented via the :func:`configure` method.
+    """
+
+    # The default maximal size that the snapshotted memory state may have (5 
MiBytes).
+    DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024
+
+    def __init__(self,
+                 checkpoint_path: str = None,
+                 max_state_size: int = None,
+                 j_jobmanager_checkpoint_storage=None):
+        """
+        Creates a new JobManagerCheckpointStorage, setting optionally the 
paths to persist
+        checkpoint metadata, as well as configuring state thresholds.
+
+        WARNING: Increasing the size of this value beyond the default value
+        (:data:`DEFAULT_MAX_STATE_SIZE`) should be done with care.
+        The checkpointed state needs to be send to the JobManager via limited 
size RPC messages,
+        and there and the JobManager needs to be able to hold all aggregated 
state in its memory.
+
+        Example:
+        ::
+            >>> checkpoint_storage = JobManagerCheckpointStorage()
+
+        :param checkpoint_path: The path to write checkpoint metadata to. If 
none, the value from
+                                the runtime configuration will be used.
+        :param max_state_size: The maximal size of the serialized state. If 
none, the
+                               :data:`DEFAULT_MAX_STATE_SIZE` will be used.
+        :param j_jobmanager_checkpoint_storage: For internal use, please keep 
none.
+        """
+        if j_jobmanager_checkpoint_storage is None:
+            gateway = get_gateway()
+            JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+                .JobManagerCheckpointStorage
+            if max_state_size is None:
+                max_state_size = 
JJobManagerCheckpointStorage.DEFAULT_MAX_STATE_SIZE
+            j_jobmanager_checkpoint_storage = 
JJobManagerCheckpointStorage(checkpoint_path,
+                                                                           
max_state_size)
+
+        super(CheckpointStorage, 
self).__init__(j_jobmanager_checkpoint_storage)
+
+    def get_max_state_size(self) -> int:
+        """
+        Gets the maximum size that an individual state can have, as configured 
in the
+        constructor (by default :data:`DEFAULT_MAX_STATE_SIZE`).
+
+        :return: The maximum size that an individual state can have.
+        """
+        return self._j_jobmanager_checkpoint_storage.getMaxStateSize()
+
+    def __str__(self):
+        return self._j_jobmanager_checkpoint_storage.toString()
+
+
+class FileSystemCheckpointStorage(CheckpointStorage):
+    """
+    This checkpoint storage checkpoints state as files to a file system (hence 
the name).
+
+    Each checkpoint individually will store all its files in a subdirectory 
that includes the
+    checkpoint number, such as 
``hdfs://namenode:port/flink-checkpoints/chk-17/``.
+
+    **State Size Considerations**
+
+    This checkpoint storage stores small state chunks directly with the 
metadata, to avoid creating
+    many small files. The threshold for that is configurable. When increasing 
this threshold, the
+    size of the checkpoint metadata increases. The checkpoint metadata of all 
retained completed
+    checkpoints needs to fit into the JobManager's heap memory. This is 
typically not a problem,
+    unless the threshold :func:`get_min_file_size_threshold` is increased 
significantly.
+
+    **Persistence Guarantees**
+
+    Checkpoints from this storage instance are as persistent and available as 
filesystem that is
+    written to. If the file system is a persistent distributed file system, 
this state backend
+    supports highly available setups. The backend additionally supports 
savepoints and externalized
+    checkpoints.
+
+    **Configuration**
+
+    As for all checkpoint storage, this storage can either be configured 
within the application (by
+    creating the instance with the respective constructor parameters and 
setting it on the execution
+    environment) or by specifying it in the Flink configuration.
+
+    If the checkpoint storage was specified in the application, it may pick up 
additional
+    configuration parameters from the Flink configuration. That behavior is 
implemented via
+    the :func:`configure` method.
+    """
+
+    def __init__(self,
+                 checkpoint_directory_uri: str = None,
+                 file_state_size_threshold: int = None,
+                 write_buffer_size: int = None,
+                 j_fs_checkpoint_storage=None):
+
+        if j_fs_checkpoint_storage is None:
+            gateway = get_gateway()
+            JFileSystemCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+                .FileSystemCheckpointStorage
+            JPath = gateway.jvm.org.apache.flink.core.fs.Path
+            if checkpoint_directory_uri is None:
+                raise ValueError("The parameter 'checkpoint_directory_uri' is 
required!")
+            j_checkpoint_directory_uri = 
JPath(checkpoint_directory_uri).toUri()
+
+            if file_state_size_threshold is None:
+                file_state_size_threshold = -1
+
+            if write_buffer_size is None:
+                write_buffer_size = -1
+
+            j_fs_checkpoint_storage = 
JFileSystemCheckpointStorage(j_checkpoint_directory_uri,
+                                                                   
file_state_size_threshold,
+                                                                   
write_buffer_size)
+
+        super(FileSystemCheckpointStorage, 
self).__init__(j_fs_checkpoint_storage)
+
+    def get_checkpoint_path(self) -> str:
+        """
+        Gets the base directory where all the checkpoints are stored.
+        The job-specific checkpoint directory is created inside this directory.
+
+        :return: The base directory for checkpoints.
+        """
+        return self._j_fs_checkpoint_storage.getCheckpointPath().toString()

Review comment:
       ```suggestion
           return self._j_checkpoint_storage.getCheckpointPath().toString()
   ```

##########
File path: flink-python/pyflink/datastream/checkpoint_config.py
##########
@@ -330,6 +331,48 @@ def disable_unaligned_checkpoints(self) -> 
'CheckpointConfig':
         self.enable_unaligned_checkpoints(False)
         return self
 
+    def set_checkpoint_storage(self, checkpoint_storage: CheckpointStorage):
+        """
+        Sets the checkpoint storage that describes how to checkpoint operator 
state. It
+        defines where checkpointed data will be persisted.
+        The :class:`~pyflink.datastream.JobManagerCheckpointStorage` is 
lightweight without extra
+        dependencies, but can checkpoint only small states(some counters).
+        In contrast, the 
:class:`~pyflink.datastream.FileSystemCheckpointStorage` stores checkpoints
+        of the state in files. When using a replicated file system (like HDFS,
+        S3, MapR FS, Alluxio, etc) this will guarantee that state is not lost 
upon failures of
+        individual nodes and that streaming program can be executed highly 
available and strongly
+        consistent(assuming that Flink is run in high-availability mode). This 
checkpoint storage is
+        recommended for most production deployments.
+
+        The build-in state backend includes:
+            :class:`~pyflink.datastream.JobManagerCheckpointStorage`,
+            and :class:`~pyflink.datastream.FileSystemCheckpointStorage`.
+
+        .. seealso:: :func:`get_checkpoint_storage`
+
+        Example:
+        ::
+            >>> env.get_checkpoint_config()\
+                
.set_checkpoint_storage(FileSystemCheckpointStorage("file://var/checkpoints/"))
+        :param checkpoint_storage: The :class:`CheckpointStorage`.
+        """
+
+        
self._j_checkpoint_config.setCheckpointStorage(checkpoint_storage._j_checkpoint_storage)
+        return self
+
+    def get_checkpoint_storage(self) -> CheckpointStorage:
+        """
+        Returns the default :class:`CheckpointStorage` that has been 
configured for the Job.
+
+        .. seealso:: :func:`set_checkpoint_storage`
+
+        :return: The :class:`CheckpointStorage` that has been configured for 
the job.
+        or None if none has been set.

Review comment:
       Correct the indentation; otherwise, the sphinx check will fail.

##########
File path: flink-python/pyflink/datastream/checkpoint_storage.py
##########
@@ -0,0 +1,296 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from py4j.java_gateway import get_java_class
+
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'CheckpointStorage',
+    'JobManagerCheckpointStorage',
+    'FileSystemCheckpointStorage',
+    'CustomCheckpointStorage'
+]
+
+
+def _from_j_checkpoint_storage(j_checkpoint_storage):
+    if j_checkpoint_storage is None:
+        return None
+    gateway = get_gateway()
+    JCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.CheckpointStorage
+    JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .JobManagerCheckpointStorage
+    JFileSystemCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .FileSystemCheckpointStorage
+    j_clz = j_checkpoint_storage.getClass()
+
+    if not get_java_class(JCheckpointStorage).isAssignableFrom(j_clz):
+        raise TypeError("The input %s is not an instance of CheckpointStorage."
+                        % j_checkpoint_storage)
+
+    if get_java_class(JJobManagerCheckpointStorage).isAssignableFrom(j_clz):
+        return 
JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_checkpoint_storage)
+    elif get_java_class(JFileSystemCheckpointStorage).isAssignableFrom(j_clz):
+        return 
FileSystemCheckpointStorage(j_fs_checkpoint_storage=j_checkpoint_storage)
+    else:
+        return CustomCheckpointStorage(j_checkpoint_storage)  # users' 
customized state backend
+
+
+class CheckpointStorage(abc.ABC):
+    """
+    CheckpointStorage defines how :class:`StateBackend`'s checkpoint their 
state for fault
+    tolerance in streaming applications. Various implementations store their 
checkpoints in
+    different fashions and have different requirements and availability 
guarantees.
+
+    For example, :class:`JobManagerCheckpointStorage` stores checkpoints in 
the memory of
+    the JobManager. It is lightweight and without additional dependencies but 
is not highly
+    available and only supports small state sizes. This checkpoint storage 
policy is convenient
+    for local testing and development.
+
+    :class`FileSystemCheckpointStorage` stores checkpoints in a filesystem. 
For systems like HDFS,
+    NFS Drives, S3, and GCS, this storage policy supports large state size, in 
the magnitude of
+    many terabytes while providing a highly available foundation for stateful 
applications. This
+    checkpoint storage policy is recommended for most production deployments.
+
+    **Raw Bytes Storage**
+
+    The **CheckpointStorage** creates services for raw bytes storage.
+
+    The raw bytes storage (through the CheckpointStreamFactory) is the 
fundamental service that
+    simply stores bytes in a fault tolerant fashion. This service is used by 
the JobManager
+    to store checkpoint and recovery metadata and is typically also used by 
the keyed- and operator
+    state backends to store checkpointed state.
+
+    **Serializability**
+
+    Checkpoint storage need to be serializable, because they distributed 
across parallel processes
+    (for distributed execution) together with the streaming application code.
+
+    Because of that, **CheckpointStorage** implementations are meant to be 
like factories
+    that create the proper states stores that provide access to the 
persistent. That way, the
+    Checkpoint Storage can be very lightweight (contain only configurations) 
which makes it easier
+    to be serializable.
+
+    **Thread Safety**
+
+    Checkpoint storage implementations have to be thread-safe. Multiple 
threads may be creating
+    streams concurrently.
+    """
+
+    def __init__(self, j_checkpoint_storage):
+        self._j_checkpoint_storage = j_checkpoint_storage
+
+
+class JobManagerCheckpointStorage(CheckpointStorage):
+    """
+     This **CheckpointStorage** checkpoints state directly to the JobManager's 
memory (hence the
+     name), but savepoints will be persisted to a file system.

Review comment:
       ```suggestion
       name), but savepoints will be persisted to a file system.
   ```

##########
File path: flink-python/pyflink/datastream/tests/test_check_point_config.py
##########
@@ -151,3 +152,12 @@ def test_is_unaligned_checkpointing_enabled(self):
         self.checkpoint_config.enable_unaligned_checkpoints(True)
 
         
self.assertTrue(self.checkpoint_config.is_unaligned_checkpoints_enabled())
+
+    def test_set_checkpoint_storage(self):
+
+        self.assertNone(self.checkpoint_config.get_checkpoint_storage())

Review comment:
       ```suggestion
           self.assertIsNone(self.checkpoint_config.get_checkpoint_storage())
   ```

##########
File path: flink-python/pyflink/datastream/stream_execution_environment.py
##########
@@ -280,7 +276,38 @@ def set_state_backend(self, state_backend: StateBackend) 
-> 'StreamExecutionEnvi
             
self._j_stream_execution_environment.setStateBackend(state_backend._j_state_backend)
         return self
 
-    def set_restart_strategy(self, restart_strategy_configuration: 
RestartStrategyConfiguration):
+    def set_default_savepoint_directory(self, savepoint_path: str):
+        """
+        Sets the default savepoint directory, where savepoints will be written 
to
+        if none is explicitly provided when triggered.
+
+        .. seealso: :func:`get_default_savepoint_directory`
+
+        Example:
+        ::
+
+            >>> env.set_default_savepoint_directory("file://var/savepoints")
+
+        :param savepoint_path: The savepoint directory
+        :return: This object.
+        """
+
+        self._j_stream_execution_environment = \
+            
self._j_stream_execution_environment.setDefaultSavepointDirectory(savepoint_path)
+        return self
+
+    def get_default_savepoint_directory(self) -> Optional[str]:
+        """
+        Gets the default savepoint directory, or None is none has been set.
+
+        .. seealso: :func:`set_default_savepoint_directory
+
+        :return: The default savepoint directory
+        """
+
+        return 
self._j_stream_execution_environment.setDefaultSavepointDirectory()

Review comment:
       ```suggestion
           return 
self._j_stream_execution_environment.getDefaultSavepointDirectory()
   ```

##########
File path: flink-python/pyflink/datastream/checkpoint_config.py
##########
@@ -330,6 +331,48 @@ def disable_unaligned_checkpoints(self) -> 
'CheckpointConfig':
         self.enable_unaligned_checkpoints(False)
         return self
 
+    def set_checkpoint_storage(self, checkpoint_storage: CheckpointStorage):
+        """
+        Sets the checkpoint storage that describes how to checkpoint operator 
state. It
+        defines where checkpointed data will be persisted.
+        The :class:`~pyflink.datastream.JobManagerCheckpointStorage` is 
lightweight without extra
+        dependencies, but can checkpoint only small states(some counters).
+        In contrast, the 
:class:`~pyflink.datastream.FileSystemCheckpointStorage` stores checkpoints
+        of the state in files. When using a replicated file system (like HDFS,
+        S3, MapR FS, Alluxio, etc) this will guarantee that state is not lost 
upon failures of
+        individual nodes and that streaming program can be executed highly 
available and strongly
+        consistent(assuming that Flink is run in high-availability mode). This 
checkpoint storage is
+        recommended for most production deployments.
+
+        The build-in state backend includes:
+            :class:`~pyflink.datastream.JobManagerCheckpointStorage`,
+            and :class:`~pyflink.datastream.FileSystemCheckpointStorage`.
+
+        .. seealso:: :func:`get_checkpoint_storage`
+
+        Example:

Review comment:
       We need to add empty lines around the example as follows; otherwise, the 
sphinx check will fail. Besides, we also need to remove the *\* after 
*env.get_checkpoint_config()* and add *...* before *.set_checkpoint_storage* so 
that the example is well formatted in the generated sphinx doc:
   ```
   Example:
   ::
   
   >>> env.get_checkpoint_config()
   ...    
.set_checkpoint_storage(FileSystemCheckpointStorage("file://var/checkpoints/"))
   
   :param checkpoint_storage: The :class:`CheckpointStorage`.
   ```

##########
File path: flink-python/pyflink/datastream/checkpoint_storage.py
##########
@@ -0,0 +1,296 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from py4j.java_gateway import get_java_class
+
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'CheckpointStorage',
+    'JobManagerCheckpointStorage',
+    'FileSystemCheckpointStorage',
+    'CustomCheckpointStorage'
+]
+
+
+def _from_j_checkpoint_storage(j_checkpoint_storage):
+    if j_checkpoint_storage is None:
+        return None
+    gateway = get_gateway()
+    JCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.CheckpointStorage
+    JJobManagerCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .JobManagerCheckpointStorage
+    JFileSystemCheckpointStorage = 
gateway.jvm.org.apache.flink.runtime.state.storage \
+        .FileSystemCheckpointStorage
+    j_clz = j_checkpoint_storage.getClass()
+
+    if not get_java_class(JCheckpointStorage).isAssignableFrom(j_clz):
+        raise TypeError("The input %s is not an instance of CheckpointStorage."
+                        % j_checkpoint_storage)
+
+    if get_java_class(JJobManagerCheckpointStorage).isAssignableFrom(j_clz):
+        return 
JobManagerCheckpointStorage(j_jobmanager_checkpoint_storage=j_checkpoint_storage)
+    elif get_java_class(JFileSystemCheckpointStorage).isAssignableFrom(j_clz):
+        return 
FileSystemCheckpointStorage(j_fs_checkpoint_storage=j_checkpoint_storage)
+    else:
+        return CustomCheckpointStorage(j_checkpoint_storage)  # users' 
customized state backend
+
+
+class CheckpointStorage(abc.ABC):
+    """
+    CheckpointStorage defines how :class:`StateBackend`'s checkpoint their 
state for fault
+    tolerance in streaming applications. Various implementations store their 
checkpoints in
+    different fashions and have different requirements and availability 
guarantees.
+
+    For example, :class:`JobManagerCheckpointStorage` stores checkpoints in 
the memory of
+    the JobManager. It is lightweight and without additional dependencies but 
is not highly
+    available and only supports small state sizes. This checkpoint storage 
policy is convenient
+    for local testing and development.
+
+    :class`FileSystemCheckpointStorage` stores checkpoints in a filesystem. 
For systems like HDFS,
+    NFS Drives, S3, and GCS, this storage policy supports large state size, in 
the magnitude of
+    many terabytes while providing a highly available foundation for stateful 
applications. This
+    checkpoint storage policy is recommended for most production deployments.
+
+    **Raw Bytes Storage**
+
+    The **CheckpointStorage** creates services for raw bytes storage.
+
+    The raw bytes storage (through the CheckpointStreamFactory) is the 
fundamental service that
+    simply stores bytes in a fault tolerant fashion. This service is used by 
the JobManager
+    to store checkpoint and recovery metadata and is typically also used by 
the keyed- and operator
+    state backends to store checkpointed state.
+
+    **Serializability**
+
+    Checkpoint storage need to be serializable, because they distributed 
across parallel processes
+    (for distributed execution) together with the streaming application code.
+
+    Because of that, **CheckpointStorage** implementations are meant to be 
like factories
+    that create the proper states stores that provide access to the 
persistent. That way, the
+    Checkpoint Storage can be very lightweight (contain only configurations) 
which makes it easier
+    to be serializable.
+
+    **Thread Safety**
+
+    Checkpoint storage implementations have to be thread-safe. Multiple 
threads may be creating
+    streams concurrently.
+    """
+
+    def __init__(self, j_checkpoint_storage):
+        self._j_checkpoint_storage = j_checkpoint_storage
+
+
+class JobManagerCheckpointStorage(CheckpointStorage):
+    """
+     This **CheckpointStorage** checkpoints state directly to the JobManager's 
memory (hence the

Review comment:
       ```suggestion
       This **CheckpointStorage** checkpoints state directly to the 
JobManager's memory (hence the
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to