dianfu commented on a change in pull request #15635:
URL: https://github.com/apache/flink/pull/15635#discussion_r614522682



##########
File path: flink-python/pyflink/datastream/checkpoint_config.py
##########
@@ -330,6 +331,48 @@ def disable_unaligned_checkpoints(self) -> 
'CheckpointConfig':
         self.enable_unaligned_checkpoints(False)
         return self
 
+    def set_alignment_timeout(self, alignment_timeout: Duration) -> 
'CheckpointConfig':
+        """
+        Only relevant if :func:`enable_unaligned_checkpoints` is enabled.
+
+        If ``alignment_timeout`` has value equal to ``0``, checkpoints will 
always start unaligned.
+        If ``alignment_timeout`` has value greater then ``0``, checkpoints 
will start aligned. If
+        during checkpointing, checkpoint start delay exceeds this 
``alignment_timeout``, alignment
+        will timeout and checkpoint will start working as unaligned checkpoint.
+
+        :param alignment_timeout: The duration until the aligned checkpoint 
will be converted into
+        an unaligned checkpoint.
+        """
+        
self._j_checkpoint_config.setAlignmentTimeout(alignment_timeout._j_duration)
+        return self
+
+    def get_alignment_timeout(self) -> 'CheckpointConfig':
+        """
+        Returns the alignment timeout, as configured via 
:func:`set_alignment_timeout` or
+        
``org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions#ALIGNMENT_TIMEOUT``.
+
+        :return: the alignment timeout.
+        """
+        return Duration(self._j_checkpoint_config.getAlignmentTimeout())
+
+    def set_force_unaligned_checkpoints(self, enabled: bool = True) -> 
'CheckpointConfig':
+        """
+        Checks whether Unaligned Checkpoints are forced, despite currently 
non-checkpointable
+        iteration feedback or custom partitioners.
+
+        :param forceUnalignedCheckpoints: The flag to force unaligned 
checkpoints.

Review comment:
       The param name in the docstring, `forceUnalignedCheckpoints`, doesn't
match the actual param name, `enabled`.

##########
File path: flink-python/pyflink/datastream/checkpoint_config.py
##########
@@ -330,6 +331,48 @@ def disable_unaligned_checkpoints(self) -> 
'CheckpointConfig':
         self.enable_unaligned_checkpoints(False)
         return self
 
+    def set_alignment_timeout(self, alignment_timeout: Duration) -> 
'CheckpointConfig':
+        """
+        Only relevant if :func:`enable_unaligned_checkpoints` is enabled.
+
+        If ``alignment_timeout`` has value equal to ``0``, checkpoints will 
always start unaligned.
+        If ``alignment_timeout`` has value greater then ``0``, checkpoints 
will start aligned. If
+        during checkpointing, checkpoint start delay exceeds this 
``alignment_timeout``, alignment
+        will timeout and checkpoint will start working as unaligned checkpoint.
+
+        :param alignment_timeout: The duration until the aligned checkpoint 
will be converted into
+        an unaligned checkpoint.
+        """
+        
self._j_checkpoint_config.setAlignmentTimeout(alignment_timeout._j_duration)
+        return self
+
+    def get_alignment_timeout(self) -> 'CheckpointConfig':

Review comment:
       ```suggestion
       def get_alignment_timeout(self) -> 'Duration':
   ```

##########
File path: flink-python/pyflink/datastream/checkpoint_config.py
##########
@@ -330,6 +331,48 @@ def disable_unaligned_checkpoints(self) -> 
'CheckpointConfig':
         self.enable_unaligned_checkpoints(False)
         return self
 
+    def set_alignment_timeout(self, alignment_timeout: Duration) -> 
'CheckpointConfig':
+        """
+        Only relevant if :func:`enable_unaligned_checkpoints` is enabled.
+
+        If ``alignment_timeout`` has value equal to ``0``, checkpoints will 
always start unaligned.
+        If ``alignment_timeout`` has value greater then ``0``, checkpoints 
will start aligned. If
+        during checkpointing, checkpoint start delay exceeds this 
``alignment_timeout``, alignment
+        will timeout and checkpoint will start working as unaligned checkpoint.
+
+        :param alignment_timeout: The duration until the aligned checkpoint 
will be converted into
+        an unaligned checkpoint.
+        """
+        
self._j_checkpoint_config.setAlignmentTimeout(alignment_timeout._j_duration)
+        return self
+
+    def get_alignment_timeout(self) -> 'CheckpointConfig':
+        """
+        Returns the alignment timeout, as configured via 
:func:`set_alignment_timeout` or
+        
``org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions#ALIGNMENT_TIMEOUT``.
+
+        :return: the alignment timeout.
+        """
+        return Duration(self._j_checkpoint_config.getAlignmentTimeout())
+
+    def set_force_unaligned_checkpoints(self, enabled: bool = True) -> 
'CheckpointConfig':
+        """
+        Checks whether Unaligned Checkpoints are forced, despite currently 
non-checkpointable

Review comment:
       ```suggestion
           Checks whether unaligned checkpoints are forced, despite currently 
non-checkpointable
   ```

##########
File path: flink-python/pyflink/datastream/checkpoint_config.py
##########
@@ -330,6 +331,48 @@ def disable_unaligned_checkpoints(self) -> 
'CheckpointConfig':
         self.enable_unaligned_checkpoints(False)
         return self
 
+    def set_alignment_timeout(self, alignment_timeout: Duration) -> 
'CheckpointConfig':
+        """
+        Only relevant if :func:`enable_unaligned_checkpoints` is enabled.
+
+        If ``alignment_timeout`` has value equal to ``0``, checkpoints will 
always start unaligned.
+        If ``alignment_timeout`` has value greater then ``0``, checkpoints 
will start aligned. If
+        during checkpointing, checkpoint start delay exceeds this 
``alignment_timeout``, alignment
+        will timeout and checkpoint will start working as unaligned checkpoint.
+
+        :param alignment_timeout: The duration until the aligned checkpoint 
will be converted into
+        an unaligned checkpoint.
+        """
+        
self._j_checkpoint_config.setAlignmentTimeout(alignment_timeout._j_duration)
+        return self
+
+    def get_alignment_timeout(self) -> 'CheckpointConfig':
+        """
+        Returns the alignment timeout, as configured via 
:func:`set_alignment_timeout` or
+        
``org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions#ALIGNMENT_TIMEOUT``.
+
+        :return: the alignment timeout.
+        """
+        return Duration(self._j_checkpoint_config.getAlignmentTimeout())
+
+    def set_force_unaligned_checkpoints(self, enabled: bool = True) -> 
'CheckpointConfig':
+        """
+        Checks whether Unaligned Checkpoints are forced, despite currently 
non-checkpointable
+        iteration feedback or custom partitioners.
+
+        :param forceUnalignedCheckpoints: The flag to force unaligned 
checkpoints.
+        """
+        self._j_checkpoint_config.setForceUnalignedCheckpoints(enabled)
+        return self
+
+    def is_force_unaligned_checkpoints(self) -> 'CheckpointConfig':
+        """
+        Checks whether unaligned checkpoints are forced, despite iteration 
feedback or custom
+        partitioners.
+
+        :return: True, if unaligned checkpoints are forced, false otherwise.
+        """
+        return self._j_checkpoint_config.isForceUnalignedCheckpoints()
 

Review comment:
       Checkstyle: there should be two blank lines here.

##########
File path: flink-python/pyflink/datastream/checkpoint_config.py
##########
@@ -330,6 +331,48 @@ def disable_unaligned_checkpoints(self) -> 
'CheckpointConfig':
         self.enable_unaligned_checkpoints(False)
         return self
 
+    def set_alignment_timeout(self, alignment_timeout: Duration) -> 
'CheckpointConfig':
+        """
+        Only relevant if :func:`enable_unaligned_checkpoints` is enabled.
+
+        If ``alignment_timeout`` has value equal to ``0``, checkpoints will 
always start unaligned.
+        If ``alignment_timeout`` has value greater then ``0``, checkpoints 
will start aligned. If
+        during checkpointing, checkpoint start delay exceeds this 
``alignment_timeout``, alignment
+        will timeout and checkpoint will start working as unaligned checkpoint.
+
+        :param alignment_timeout: The duration until the aligned checkpoint 
will be converted into
+        an unaligned checkpoint.

Review comment:
       Checkstyle issue: unexpected unindent. The continuation line should be
indented with additional leading spaces.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to