dianfu commented on a change in pull request #15635:
URL: https://github.com/apache/flink/pull/15635#discussion_r614522682
##########
File path: flink-python/pyflink/datastream/checkpoint_config.py
##########
@@ -330,6 +331,48 @@ def disable_unaligned_checkpoints(self) ->
'CheckpointConfig':
self.enable_unaligned_checkpoints(False)
return self
+ def set_alignment_timeout(self, alignment_timeout: Duration) ->
'CheckpointConfig':
+ """
+ Only relevant if :func:`enable_unaligned_checkpoints` is enabled.
+
+ If ``alignment_timeout`` has value equal to ``0``, checkpoints will
always start unaligned.
+ If ``alignment_timeout`` has value greater then ``0``, checkpoints
will start aligned. If
+ during checkpointing, checkpoint start delay exceeds this
``alignment_timeout``, alignment
+ will timeout and checkpoint will start working as unaligned checkpoint.
+
+ :param alignment_timeout: The duration until the aligned checkpoint
will be converted into
+ an unaligned checkpoint.
+ """
+
self._j_checkpoint_config.setAlignmentTimeout(alignment_timeout._j_duration)
+ return self
+
+ def get_alignment_timeout(self) -> 'CheckpointConfig':
+ """
+ Returns the alignment timeout, as configured via
:func:`set_alignment_timeout` or
+
``org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions#ALIGNMENT_TIMEOUT``.
+
+ :return: the alignment timeout.
+ """
+ return Duration(self._j_checkpoint_config.getAlignmentTimeout())
+
+ def set_force_unaligned_checkpoints(self, enabled: bool = True) ->
'CheckpointConfig':
+ """
+ Checks whether Unaligned Checkpoints are forced, despite currently
non-checkpointable
+ iteration feedback or custom partitioners.
+
+ :param forceUnalignedCheckpoints: The flag to force unaligned
checkpoints.
Review comment:
The param name in the doc, `forceUnalignedCheckpoints`, doesn't match
the actual param name `enabled`.
##########
File path: flink-python/pyflink/datastream/checkpoint_config.py
##########
@@ -330,6 +331,48 @@ def disable_unaligned_checkpoints(self) ->
'CheckpointConfig':
self.enable_unaligned_checkpoints(False)
return self
+ def set_alignment_timeout(self, alignment_timeout: Duration) ->
'CheckpointConfig':
+ """
+ Only relevant if :func:`enable_unaligned_checkpoints` is enabled.
+
+ If ``alignment_timeout`` has value equal to ``0``, checkpoints will
always start unaligned.
+ If ``alignment_timeout`` has value greater then ``0``, checkpoints
will start aligned. If
+ during checkpointing, checkpoint start delay exceeds this
``alignment_timeout``, alignment
+ will timeout and checkpoint will start working as unaligned checkpoint.
+
+ :param alignment_timeout: The duration until the aligned checkpoint
will be converted into
+ an unaligned checkpoint.
+ """
+
self._j_checkpoint_config.setAlignmentTimeout(alignment_timeout._j_duration)
+ return self
+
+ def get_alignment_timeout(self) -> 'CheckpointConfig':
Review comment:
```suggestion
def get_alignment_timeout(self) -> 'Duration':
```
##########
File path: flink-python/pyflink/datastream/checkpoint_config.py
##########
@@ -330,6 +331,48 @@ def disable_unaligned_checkpoints(self) ->
'CheckpointConfig':
self.enable_unaligned_checkpoints(False)
return self
+ def set_alignment_timeout(self, alignment_timeout: Duration) ->
'CheckpointConfig':
+ """
+ Only relevant if :func:`enable_unaligned_checkpoints` is enabled.
+
+ If ``alignment_timeout`` has value equal to ``0``, checkpoints will
always start unaligned.
+ If ``alignment_timeout`` has value greater then ``0``, checkpoints
will start aligned. If
+ during checkpointing, checkpoint start delay exceeds this
``alignment_timeout``, alignment
+ will timeout and checkpoint will start working as unaligned checkpoint.
+
+ :param alignment_timeout: The duration until the aligned checkpoint
will be converted into
+ an unaligned checkpoint.
+ """
+
self._j_checkpoint_config.setAlignmentTimeout(alignment_timeout._j_duration)
+ return self
+
+ def get_alignment_timeout(self) -> 'CheckpointConfig':
+ """
+ Returns the alignment timeout, as configured via
:func:`set_alignment_timeout` or
+
``org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions#ALIGNMENT_TIMEOUT``.
+
+ :return: the alignment timeout.
+ """
+ return Duration(self._j_checkpoint_config.getAlignmentTimeout())
+
+ def set_force_unaligned_checkpoints(self, enabled: bool = True) ->
'CheckpointConfig':
+ """
+ Checks whether Unaligned Checkpoints are forced, despite currently
non-checkpointable
Review comment:
```suggestion
Checks whether unaligned checkpoints are forced, despite currently
non-checkpointable
```
##########
File path: flink-python/pyflink/datastream/checkpoint_config.py
##########
@@ -330,6 +331,48 @@ def disable_unaligned_checkpoints(self) ->
'CheckpointConfig':
self.enable_unaligned_checkpoints(False)
return self
+ def set_alignment_timeout(self, alignment_timeout: Duration) ->
'CheckpointConfig':
+ """
+ Only relevant if :func:`enable_unaligned_checkpoints` is enabled.
+
+ If ``alignment_timeout`` has value equal to ``0``, checkpoints will
always start unaligned.
+ If ``alignment_timeout`` has value greater then ``0``, checkpoints
will start aligned. If
+ during checkpointing, checkpoint start delay exceeds this
``alignment_timeout``, alignment
+ will timeout and checkpoint will start working as unaligned checkpoint.
+
+ :param alignment_timeout: The duration until the aligned checkpoint
will be converted into
+ an unaligned checkpoint.
+ """
+
self._j_checkpoint_config.setAlignmentTimeout(alignment_timeout._j_duration)
+ return self
+
+ def get_alignment_timeout(self) -> 'CheckpointConfig':
+ """
+ Returns the alignment timeout, as configured via
:func:`set_alignment_timeout` or
+
``org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions#ALIGNMENT_TIMEOUT``.
+
+ :return: the alignment timeout.
+ """
+ return Duration(self._j_checkpoint_config.getAlignmentTimeout())
+
+ def set_force_unaligned_checkpoints(self, enabled: bool = True) ->
'CheckpointConfig':
+ """
+ Checks whether Unaligned Checkpoints are forced, despite currently
non-checkpointable
+ iteration feedback or custom partitioners.
+
+ :param forceUnalignedCheckpoints: The flag to force unaligned
checkpoints.
+ """
+ self._j_checkpoint_config.setForceUnalignedCheckpoints(enabled)
+ return self
+
+ def is_force_unaligned_checkpoints(self) -> 'CheckpointConfig':
+ """
+ Checks whether unaligned checkpoints are forced, despite iteration
feedback or custom
+ partitioners.
+
+ :return: True, if unaligned checkpoints are forced, false otherwise.
+ """
+ return self._j_checkpoint_config.isForceUnalignedCheckpoints()
Review comment:
checkstyle: there should be two blank lines here.
##########
File path: flink-python/pyflink/datastream/checkpoint_config.py
##########
@@ -330,6 +331,48 @@ def disable_unaligned_checkpoints(self) ->
'CheckpointConfig':
self.enable_unaligned_checkpoints(False)
return self
+ def set_alignment_timeout(self, alignment_timeout: Duration) ->
'CheckpointConfig':
+ """
+ Only relevant if :func:`enable_unaligned_checkpoints` is enabled.
+
+ If ``alignment_timeout`` has value equal to ``0``, checkpoints will
always start unaligned.
+ If ``alignment_timeout`` has value greater then ``0``, checkpoints
will start aligned. If
+ during checkpointing, checkpoint start delay exceeds this
``alignment_timeout``, alignment
+ will timeout and checkpoint will start working as unaligned checkpoint.
+
+ :param alignment_timeout: The duration until the aligned checkpoint
will be converted into
+ an unaligned checkpoint.
Review comment:
checkstyle issue: unexpected unindent; this continuation line should be
indented with leading spaces.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]