Vancior commented on a change in pull request #18957:
URL: https://github.com/apache/flink/pull/18957#discussion_r823283875
##########
File path: flink-python/pyflink/datastream/window.py
##########
@@ -517,3 +538,721 @@ def __init__(self,
self.window_state_descriptor = window_state_descriptor
self.internal_window_function = internal_window_function
self.window_serializer = window_serializer
+
+
class EventTimeTrigger(Trigger[T, TimeWindow]):
    """
    A Trigger that fires once the watermark passes the end of the window to which a pane
    belongs.

    While the watermark is still before the window's max timestamp, each element (and each
    merge) registers an event-time timer at that max timestamp; the trigger fires when that
    timer goes off.
    """

    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: TimeWindow,
                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
        """
        Fire right away if the watermark has already reached the window's max timestamp;
        otherwise register an event-time timer for it and keep waiting.
        """
        end_of_window = window.max_timestamp()
        if ctx.get_current_watermark() < end_of_window:
            ctx.register_event_time_timer(end_of_window)
            # Nothing happens to the window yet.
            return TriggerResult.CONTINUE
        return TriggerResult.FIRE

    def on_processing_time(self,
                           time: int,
                           window: TimeWindow,
                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
        """Processing-time timers are irrelevant to this trigger; always continue."""
        return TriggerResult.CONTINUE

    def on_event_time(self,
                      time: int,
                      window: TimeWindow,
                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
        """Fire only for the timer registered at the window's max timestamp."""
        return TriggerResult.FIRE if time == window.max_timestamp() else TriggerResult.CONTINUE

    def on_merge(self,
                 window: TimeWindow,
                 ctx: 'Trigger.OnMergeContext') -> None:
        """
        Re-register the end-of-window timer for the merged window, unless the watermark
        has already passed it.
        """
        end_of_window = window.max_timestamp()
        if ctx.get_current_watermark() < end_of_window:
            ctx.register_event_time_timer(end_of_window)

    def can_merge(self) -> bool:
        """This trigger supports merging windows."""
        return True

    def clear(self,
              window: TimeWindow,
              ctx: 'Trigger.TriggerContext') -> None:
        """Remove the end-of-window event-time timer registered for ``window``."""
        ctx.delete_event_time_timer(window.max_timestamp())
+
+
class ProcessingTimeTrigger(Trigger[T, TimeWindow]):
    """
    A Trigger that fires once the current system time passes the end of the window to which
    a pane belongs.

    Each element (and each merge whose window end is still in the future) registers a
    processing-time timer at the window's max timestamp; the trigger fires when a
    processing-time timer goes off.
    """

    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: TimeWindow,
                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
        """Register a processing-time timer at the window's max timestamp and continue."""
        ctx.register_processing_time_timer(window.max_timestamp())
        # No action is taken on the window.
        return TriggerResult.CONTINUE

    def on_processing_time(self,
                           time: int,
                           window: TimeWindow,
                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
        """Fire whenever a processing-time timer goes off for this window."""
        return TriggerResult.FIRE

    def on_event_time(self,
                      time: int,
                      window: TimeWindow,
                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
        """Event-time timers are irrelevant to this trigger; always continue."""
        return TriggerResult.CONTINUE

    def on_merge(self,
                 window: TimeWindow,
                 ctx: 'Trigger.OnMergeContext') -> None:
        """
        Re-register the end-of-window timer for the merged window, unless the current
        processing time has already passed it.
        """
        window_max_timestamp = window.max_timestamp()
        if window_max_timestamp > ctx.get_current_processing_time():
            ctx.register_processing_time_timer(window_max_timestamp)

    def can_merge(self) -> bool:
        """This trigger supports merging windows."""
        return True

    def clear(self,
              window: TimeWindow,
              ctx: 'Trigger.TriggerContext') -> None:
        """Remove the end-of-window processing-time timer registered for ``window``."""
        ctx.delete_processing_time_timer(window.max_timestamp())
+
+
class CountTrigger(Trigger[T, CountWindow]):
    """
    A Trigger that fires once the count of elements in a pane reaches the given count.

    The per-window counter is reset each time the trigger fires. With a window size of
    ``n``, the trigger therefore fires on the n-th, 2n-th, 3n-th, ... element of the pane,
    not on every element after the n-th one.
    """

    def __init__(self, window_size: int):
        """
        :param window_size: Number of elements that must be added to the pane before the
                            trigger fires.
        """
        self._window_size = window_size
        # Running element count for the window: each element adds 1 and the reducing
        # state sums the additions.
        self._count_state_descriptor = ReducingStateDescriptor(
            "trigger_counter", lambda a, b: a + b, Types.LONG())

    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: CountWindow,
                   ctx: Trigger.TriggerContext) -> TriggerResult:
        """
        Count the element and fire once ``window_size`` elements have been counted since
        the last firing.
        """
        state = ctx.get_partitioned_state(self._count_state_descriptor)  # type: ReducingState
        state.add(1)
        if state.get() >= self._window_size:
            # Reset the counter so the next FIRE happens after another `window_size`
            # elements; without this every subsequent element would fire the window again.
            state.clear()
            # On FIRE, the window is evaluated and results are emitted. The window is not
            # purged though, all elements are retained.
            return TriggerResult.FIRE
        # No action is taken on the window.
        return TriggerResult.CONTINUE

    def on_processing_time(self,
                           time: int,
                           window: CountWindow,
                           ctx: Trigger.TriggerContext) -> TriggerResult:
        """Timers are irrelevant to a count trigger; always continue."""
        # No action is taken on the window.
        return TriggerResult.CONTINUE

    def on_event_time(self,
                      time: int,
                      window: CountWindow,
                      ctx: Trigger.TriggerContext) -> TriggerResult:
        """Timers are irrelevant to a count trigger; always continue."""
        # No action is taken on the window.
        return TriggerResult.CONTINUE

    def on_merge(self, window: CountWindow, ctx: Trigger.OnMergeContext) -> None:
        """Combine the counters of the windows being merged into the merged window."""
        ctx.merge_partitioned_state(self._count_state_descriptor)

    def can_merge(self) -> bool:
        """This trigger supports merging windows."""
        return True

    def clear(self, window: CountWindow, ctx: Trigger.TriggerContext) -> None:
        """Drop the element counter kept for ``window``."""
        state = ctx.get_partitioned_state(self._count_state_descriptor)
        state.clear()
+
+
+class TumblingWindowAssigner(WindowAssigner[T, TimeWindow]):
+ """
+ A WindowAssigner that windows elements into windows based on the current
system time of the
+ machine the operation is running on. Windows cannot overlap.
+ For example, in order to window into windows of 1 minute, every 10 seconds:
+ ::
+ >>> data_stream.key_by(lambda x: x[0], key_type=Types.STRING()) \
+ >>> .window(TumblingWindowAssigner(Time.minutes(1),
Time.seconds(10), False))
+
+ A WindowAssigner that windows elements into windows based on the timestamp
of the elements.
+ Windows cannot overlap.
+ For example, in order to window into windows of 1 minute:
+ ::
+ >>>
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
+ >>> .key_by(lambda x: x[0], key_type=Types.STRING()) \
+ >>> .window(TumblingWindowAssigner(60000, 0, True))
+ """
+
+ def __init__(self, size: int, offset: int, is_event_time: bool):
+ if abs(offset) >= size:
+ raise Exception("TumblingWindowAssigner parameters must satisfy
abs(offset) < size")
+
+ self._size = size
+ self._offset = offset
+ self._is_event_time = is_event_time
+
+ def assign_windows(self,
+ element: T,
+ timestamp: int,
+ context: WindowAssigner.WindowAssignerContext) ->
Collection[TimeWindow]:
+ if self._is_event_time is False:
+ current_processing_time = context.get_current_processing_time()
+ start =
TimeWindow.get_window_start_with_offset(current_processing_time, self._offset,
+ self._size)
+ return [TimeWindow(start, start + self._size)]
+ else:
+ if timestamp > MIN_LONG_VALUE:
+ start = TimeWindow.get_window_start_with_offset(timestamp,
self._offset, self._size)
+ return [TimeWindow(start, start + self._size)]
+ else:
+ raise Exception("Record has MIN_LONG_VALUE timestamp (= no
timestamp marker). "
Review comment:
maybe use `jvm.java.lang.Long.MIN_VALUE` instead?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]