dianfu commented on code in PR #19421:
URL: https://github.com/apache/flink/pull/19421#discussion_r848998698


##########
flink-python/pyflink/datastream/window.py:
##########
@@ -48,6 +48,8 @@
            'Trigger',
            'EventTimeTrigger',
            'ProcessingTimeTrigger',
+           'ContinuousEventTimeTrigger',
+           'CountTumblingWindowAssigner',

Review Comment:
   ```suggestion
              'ContinuousProcessingTimeTrigger',
   ```



##########
flink-python/pyflink/datastream/window.py:
##########
@@ -635,6 +637,90 @@ def clear(self,
         ctx.delete_event_time_timer(window.max_timestamp())
 
 
+class ContinuousEventTimeTrigger(Trigger[T, TimeWindow]):
+    """
+    A Trigger that continuously fires based on a given time interval. This 
fires based on Watermarks.
+    """
+
+    def __init__(self, interval: int):
+        self.interval = interval
+        self.state_desc = ReducingStateDescriptor("fire-time", Min, 
Types.LONG())
+        self.fire_timestamp_state = None
+
+    @staticmethod
+    def of(interval: Time) -> 'ContinuousEventTimeTrigger':
+        return ContinuousEventTimeTrigger(interval.to_milliseconds())
+
+    def on_element(self, element: T,

Review Comment:
   ```suggestion
       def on_element(self,
                      element: T,
   ```



##########
flink-python/pyflink/datastream/window.py:
##########
@@ -677,6 +763,80 @@ def clear(self,
         ctx.delete_processing_time_timer(window.max_timestamp())
 
 
+class ContinuousProcessingTimeTrigger(Trigger[T, TimeWindow]):
+    """
+    A Trigger that continuously fires based on a given time interval as 
measured by the clock of the
+    machine on which the job is running.
+    Type parameters:<W> – The type of Windows on which this trigger can 
operate.

Review Comment:
   ```suggestion
   ```



##########
flink-python/pyflink/datastream/window.py:
##########
@@ -677,6 +763,80 @@ def clear(self,
         ctx.delete_processing_time_timer(window.max_timestamp())
 
 
+class ContinuousProcessingTimeTrigger(Trigger[T, TimeWindow]):
+    """
+    A Trigger that continuously fires based on a given time interval as 
measured by the clock of the
+    machine on which the job is running.
+    Type parameters:<W> – The type of Windows on which this trigger can 
operate.
+    """
+
+    def __init__(self, interval: int):
+        self.interval = interval
+        self.state_desc = ReducingStateDescriptor("fire-time", Min, 
Types.LONG())
+        self.fire_timestamp_state = None
+
+    @staticmethod
+    def of(interval: Time) -> 'ContinuousEventTimeTrigger':
+        return ContinuousEventTimeTrigger(interval.to_milliseconds())

Review Comment:
   ```suggestion
           return ContinuousProcessingTimeTrigger(interval.to_milliseconds())
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to