This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1e1c81b7089b07a4c0ab5c7881a6617b5b27ac57 Author: Arvid Heise <[email protected]> AuthorDate: Thu Jun 18 09:51:12 2020 +0200 [FLINK-18064][python] Adding unaligned checkpoint config options. --- .../pyflink/datastream/checkpoint_config.py | 39 ++++++++++++++++++++++ .../datastream/tests/test_check_point_config.py | 16 +++++++++ 2 files changed, 55 insertions(+) diff --git a/flink-python/pyflink/datastream/checkpoint_config.py b/flink-python/pyflink/datastream/checkpoint_config.py index 3812c39..084939b 100644 --- a/flink-python/pyflink/datastream/checkpoint_config.py +++ b/flink-python/pyflink/datastream/checkpoint_config.py @@ -268,6 +268,45 @@ class CheckpointConfig(object): cleanup_mode = self._j_checkpoint_config.getExternalizedCheckpointCleanup() return ExternalizedCheckpointCleanup._from_j_externalized_checkpoint_cleanup(cleanup_mode) + def is_unaligned_checkpoints_enabled(self): + """ + Returns whether unaligned checkpoints are enabled. + + :return: ``True`` if unaligned checkpoints are enabled. + """ + return self._j_checkpoint_config.isUnalignedCheckpointsEnabled() + + def enable_unaligned_checkpoints(self, enabled=True): + """ + Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure. + + Unaligned checkpoints contain data stored in buffers as part of the checkpoint state, which + allows checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes + independent of the current throughput as checkpoint barriers are effectively not embedded + into the stream of data anymore. + + Unaligned checkpoints can only be enabled if :func:`get_checkpointing_mode` is + :data:`CheckpointingMode.EXACTLY_ONCE`. + + :param enabled: ``True`` if checkpoints should be taken in unaligned mode.
+ """ + self._j_checkpoint_config.enableUnalignedCheckpoints(enabled) + + def disable_unaligned_checkpoints(self): + """ + Disables unaligned checkpoints, which greatly reduce checkpointing times under backpressure + (experimental). + + Unaligned checkpoints contain data stored in buffers as part of the checkpoint state, which + allows checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes + independent of the current throughput as checkpoint barriers are effectively not embedded + into the stream of data anymore. + + Unaligned checkpoints can only be enabled if :func:`get_checkpointing_mode` is + :data:`CheckpointingMode.EXACTLY_ONCE`. + """ + self.enable_unaligned_checkpoints(False) + class ExternalizedCheckpointCleanup(object): """ diff --git a/flink-python/pyflink/datastream/tests/test_check_point_config.py b/flink-python/pyflink/datastream/tests/test_check_point_config.py index 4a10bb7..67b22d2 100644 --- a/flink-python/pyflink/datastream/tests/test_check_point_config.py +++ b/flink-python/pyflink/datastream/tests/test_check_point_config.py @@ -135,3 +135,19 @@ class CheckpointConfigTests(PyFlinkTestCase): self.checkpoint_config.set_prefer_checkpoint_for_recovery(True) self.assertTrue(self.checkpoint_config.is_prefer_checkpoint_for_recovery()) + + def test_is_unaligned_checkpointing_enabled(self): + + self.assertFalse(self.checkpoint_config.is_unaligned_checkpoints_enabled()) + + self.checkpoint_config.enable_unaligned_checkpoints() + + self.assertTrue(self.checkpoint_config.is_unaligned_checkpoints_enabled()) + + self.checkpoint_config.disable_unaligned_checkpoints() + + self.assertFalse(self.checkpoint_config.is_unaligned_checkpoints_enabled()) + + self.checkpoint_config.enable_unaligned_checkpoints(True) + + self.assertTrue(self.checkpoint_config.is_unaligned_checkpoints_enabled())
