Vancior commented on a change in pull request #18957:
URL: https://github.com/apache/flink/pull/18957#discussion_r823294596
##########
File path: flink-python/pyflink/datastream/window.py
##########
@@ -517,3 +538,721 @@ def __init__(self,
self.window_state_descriptor = window_state_descriptor
self.internal_window_function = internal_window_function
self.window_serializer = window_serializer
+
+
class EventTimeTrigger(Trigger[T, TimeWindow]):
    """
    A Trigger that fires once the watermark passes the end of the window to which a pane
    belongs.

    When an element arrives for a window whose max timestamp is not after the current
    watermark, the window fires immediately; otherwise an event-time timer is registered at the
    window's max timestamp and the window fires when that timer goes off.
    """

    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: TimeWindow,
                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
        deadline = window.max_timestamp()
        if deadline > ctx.get_current_watermark():
            # The watermark has not reached the end of the window yet: wait for the timer.
            ctx.register_event_time_timer(deadline)
            return TriggerResult.CONTINUE
        # The watermark is already at or past the end of the window: fire right away.
        return TriggerResult.FIRE

    def on_processing_time(self,
                           time: int,
                           window: TimeWindow,
                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
        # Processing time never influences an event-time trigger.
        return TriggerResult.CONTINUE

    def on_event_time(self,
                      time: int,
                      window: TimeWindow,
                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
        # Only the timer registered at the window's max timestamp fires the window.
        return TriggerResult.FIRE if time == window.max_timestamp() else TriggerResult.CONTINUE

    def on_merge(self,
                 window: TimeWindow,
                 ctx: 'Trigger.OnMergeContext') -> None:
        merged_deadline = window.max_timestamp()
        # Re-register the timer for the merged window only if it can still be reached.
        if merged_deadline > ctx.get_current_watermark():
            ctx.register_event_time_timer(merged_deadline)

    def can_merge(self) -> bool:
        return True

    def clear(self,
              window: TimeWindow,
              ctx: 'Trigger.TriggerContext') -> None:
        # Remove the timer registered in on_element / on_merge.
        ctx.delete_event_time_timer(window.max_timestamp())
+
+
class ProcessingTimeTrigger(Trigger[T, TimeWindow]):
    """
    A Trigger that fires once the current system time passes the end of the window to which a
    pane belongs.

    Every element registers a processing-time timer at the window's max timestamp; when that
    timer goes off the window fires.
    """

    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: TimeWindow,
                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
        ctx.register_processing_time_timer(window.max_timestamp())
        return TriggerResult.CONTINUE

    def on_processing_time(self,
                           time: int,
                           window: TimeWindow,
                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
        # The only processing-time timers this trigger registers are at the window's max
        # timestamp (see on_element / on_merge), so any callback means the window is complete.
        return TriggerResult.FIRE

    def on_event_time(self,
                      time: int,
                      window: TimeWindow,
                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
        # Event time never influences a processing-time trigger.
        return TriggerResult.CONTINUE

    def on_merge(self,
                 window: TimeWindow,
                 ctx: 'Trigger.OnMergeContext') -> None:
        window_max_timestamp = window.max_timestamp()
        # Only register a timer for the merged window if its end is still in the future.
        if window_max_timestamp > ctx.get_current_processing_time():
            ctx.register_processing_time_timer(window_max_timestamp)

    def can_merge(self) -> bool:
        return True

    def clear(self,
              window: TimeWindow,
              ctx: 'Trigger.TriggerContext') -> None:
        ctx.delete_processing_time_timer(window.max_timestamp())
+
+
class CountTrigger(Trigger[T, CountWindow]):
    """
    A Trigger that fires once the count of elements in a pane reaches the given count.

    The per-window element counter is reset each time the trigger fires, so a window that keeps
    receiving elements fires again after every further ``window_size`` elements instead of on
    every single element past the threshold.
    """

    def __init__(self, window_size: int):
        """
        :param window_size: The number of elements after which the trigger fires.
        """
        self._window_size = window_size
        self._count_state_descriptor = ReducingStateDescriptor(
            "trigger_counter", lambda a, b: a + b, Types.LONG())

    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: CountWindow,
                   ctx: Trigger.TriggerContext) -> TriggerResult:
        state = ctx.get_partitioned_state(self._count_state_descriptor)  # type: ReducingState
        state.add(1)
        if state.get() >= self._window_size:
            # Reset the counter; without this every later element would fire the window again.
            state.clear()
            # On FIRE, the window is evaluated and results are emitted. The window is not purged
            # though, all elements are retained.
            return TriggerResult.FIRE
        # No action is taken on the window.
        return TriggerResult.CONTINUE

    def on_processing_time(self,
                           time: int,
                           window: CountWindow,
                           ctx: Trigger.TriggerContext) -> TriggerResult:
        # No action is taken on the window.
        return TriggerResult.CONTINUE

    def on_event_time(self,
                      time: int,
                      window: CountWindow,
                      ctx: Trigger.TriggerContext) -> TriggerResult:
        # No action is taken on the window.
        return TriggerResult.CONTINUE

    def on_merge(self, window: CountWindow, ctx: Trigger.OnMergeContext) -> None:
        # Combine the counters of the windows being merged.
        ctx.merge_partitioned_state(self._count_state_descriptor)

    def can_merge(self) -> bool:
        return True

    def clear(self, window: CountWindow, ctx: Trigger.TriggerContext) -> None:
        state = ctx.get_partitioned_state(self._count_state_descriptor)
        state.clear()
+
+
class TumblingWindowAssigner(WindowAssigner[T, TimeWindow]):
    """
    A WindowAssigner that windows elements into fixed-size windows that cannot overlap.

    Windows are based either on the timestamp of the elements (event time) or on the current
    system time of the machine the operation is running on (processing time). ``size`` and
    ``offset`` are given in milliseconds.

    For example, in order to window into processing-time windows of 1 minute:
    ::
        >>> data_stream.key_by(lambda x: x[0], key_type=Types.STRING()) \\
        ...     .window(TumblingWindowAssigner(60000, 0, False))

    For example, in order to window into event-time windows of 1 minute:
    ::
        >>> data_stream.assign_timestamps_and_watermarks(watermark_strategy) \\
        ...     .key_by(lambda x: x[0], key_type=Types.STRING()) \\
        ...     .window(TumblingWindowAssigner(60000, 0, True))
    """

    def __init__(self, size: int, offset: int, is_event_time: bool):
        """
        :param size: The size of the windows.
        :param offset: The offset by which window starts are shifted; must satisfy
                       ``abs(offset) < size``.
        :param is_event_time: True to assign windows by element timestamp, False to assign them
                              by the current processing time.
        :raises Exception: if ``abs(offset) >= size`` (which also rejects ``size <= 0``).
        """
        if abs(offset) >= size:
            raise Exception("TumblingWindowAssigner parameters must satisfy abs(offset) < size")

        self._size = size
        self._offset = offset
        # Normalize so assign_windows and get_default_trigger always agree on the time mode,
        # even if a non-bool truthy/falsy value is passed.
        self._is_event_time = bool(is_event_time)

    def assign_windows(self,
                       element: T,
                       timestamp: int,
                       context: WindowAssigner.WindowAssignerContext) -> Collection[TimeWindow]:
        if self._is_event_time:
            if timestamp <= MIN_LONG_VALUE:
                raise Exception("Record has MIN_LONG_VALUE timestamp (= no timestamp marker). "
                                "Is the time characteristic set to 'ProcessingTime', or did you "
                                "forget to call "
                                "'data_stream.assign_timestamps_and_watermarks(...)'?")
            current_time = timestamp
        else:
            current_time = context.get_current_processing_time()
        start = TimeWindow.get_window_start_with_offset(current_time, self._offset, self._size)
        return [TimeWindow(start, start + self._size)]

    def get_default_trigger(self, env) -> Trigger[T, W]:
        if self._is_event_time:
            return EventTimeTrigger()
        return ProcessingTimeTrigger()

    def get_window_serializer(self) -> TypeSerializer[W]:
        return TimeWindowSerializer()

    def is_event_time(self) -> bool:
        return self._is_event_time

    def __repr__(self):
        # Use the stored flag, not the is_event_time method (which would render as a bound
        # method object).
        return "TumblingWindowAssigner(%s,%s,%s)" % (self._size, self._offset, self._is_event_time)
+
+
class CountTumblingWindowAssigner(WindowAssigner[T, CountWindow]):
    """
    A WindowAssigner that windows elements into fixed-size windows based on the count number
    of the elements. Windows cannot overlap.

    The n-th element (0-based) of a key is assigned to ``CountWindow(n // window_size)``.
    """

    def __init__(self, window_size: int):
        """
        Windows this KeyedStream into tumbling count windows.

        :param window_size: The size of the windows in number of elements.
        :raises Exception: if ``window_size`` is not positive.
        """
        # Validate up front instead of failing with ZeroDivisionError on the first element.
        if window_size <= 0:
            raise Exception("CountTumblingWindowAssigner parameters must satisfy window_size > 0")
        self._window_size = window_size
        self._counter_state_descriptor = ReducingStateDescriptor(
            "assigner_counter", lambda a, b: a + b, Types.LONG())

    @staticmethod
    def of(window_size: int):
        return CountTumblingWindowAssigner(window_size)

    def assign_windows(self,
                       element: T,
                       timestamp: int,
                       context: 'WindowAssigner.WindowAssignerContext') -> Collection[CountWindow]:
        counter = context.get_runtime_context().get_reducing_state(
            self._counter_state_descriptor)
        # The counter is empty (None) before the first element of a key; seed it with 0.
        if counter.get() is None:
            counter.add(0)
        result = [CountWindow(counter.get() // self._window_size)]
        counter.add(1)
        return result

    def get_default_trigger(self, env) -> Trigger[T, W]:
        return CountTrigger(self._window_size)

    def get_window_serializer(self) -> TypeSerializer[W]:
        return CountWindowSerializer()

    def is_event_time(self) -> bool:
        return False

    def __repr__(self) -> str:
        return "CountTumblingWindowAssigner(%s)" % (self._window_size)
+
+
class SlidingWindowAssigner(WindowAssigner[T, TimeWindow]):
    """
    A WindowAssigner that windows elements into sliding windows, which can possibly overlap.

    Windows are based either on the timestamp of the elements (event time) or on the current
    system time of the machine the operation is running on (processing time). ``size``,
    ``slide`` and ``offset`` are given in milliseconds.

    For example, in order to window into processing-time windows of 1 minute, every 10 seconds:
    ::
        >>> data_stream.key_by(lambda x: x[0], key_type=Types.STRING()) \\
        ...     .window(SlidingWindowAssigner(60000, 10000, 0, False))

    For example, in order to window into event-time windows of 1 minute, every 10 seconds:
    ::
        >>> data_stream.assign_timestamps_and_watermarks(watermark_strategy) \\
        ...     .key_by(lambda x: x[0], key_type=Types.STRING()) \\
        ...     .window(SlidingWindowAssigner(60000, 10000, 0, True))
    """

    def __init__(self, size: int, slide: int, offset: int, is_event_time: bool):
        """
        :param size: The size of the windows.
        :param slide: The distance between the starts of consecutive windows.
        :param offset: The offset by which window starts are shifted; must satisfy
                       ``abs(offset) < slide``.
        :param is_event_time: True to assign windows by element timestamp, False to assign them
                              by the current processing time.
        :raises Exception: if ``abs(offset) >= slide`` (which also rejects ``slide <= 0``) or
                           ``size <= 0``.
        """
        if abs(offset) >= slide or size <= 0:
            raise Exception("SlidingWindowAssigner parameters must satisfy "
                            + "abs(offset) < slide and size > 0")

        self._size = size
        self._slide = slide
        self._offset = offset
        # Normalize so assign_windows and get_default_trigger always agree on the time mode,
        # even if a non-bool truthy/falsy value is passed.
        self._is_event_time = bool(is_event_time)
        self._pane_size = math.gcd(size, slide)
        self._num_panes_per_window = size // self._pane_size

    def assign_windows(
            self,
            element: T,
            timestamp: int,
            context: 'WindowAssigner.WindowAssignerContext') -> Collection[W]:
        if self._is_event_time:
            if timestamp <= MIN_LONG_VALUE:
                raise Exception("Record has MIN_LONG_VALUE timestamp (= no timestamp marker). "
                                "Is the time characteristic set to 'ProcessingTime', "
                                "or did you forget to call "
                                "'data_stream.assign_timestamps_and_watermarks(...)'?")
            current_time = timestamp
        else:
            current_time = context.get_current_processing_time()
        last_start = TimeWindow.get_window_start_with_offset(
            current_time, self._offset, self._slide)
        # Walk back one slide at a time, emitting every window that still contains current_time
        # (i.e. every start with start > current_time - size).
        return [TimeWindow(start, start + self._size)
                for start in range(last_start, current_time - self._size, -self._slide)]

    def get_default_trigger(self, env) -> Trigger[T, W]:
        if self._is_event_time:
            return EventTimeTrigger()
        return ProcessingTimeTrigger()

    def get_window_serializer(self) -> TypeSerializer[W]:
        return TimeWindowSerializer()

    def is_event_time(self) -> bool:
        return self._is_event_time

    def __repr__(self) -> str:
        return "SlidingWindowAssigner(%s, %s, %s, %s)" % (
            self._size, self._slide, self._offset, self._is_event_time)
+
+
class CountSlidingWindowAssigner(WindowAssigner[T, CountWindow]):
    """
    A WindowAssigner that windows elements into sliding windows based on the count number of
    the elements. Windows can possibly overlap.

    The n-th element (0-based) of a key is assigned to every ``CountWindow(i)`` with ``i >= 0``
    and ``i * window_slide <= n <= i * window_slide + window_size - 1``.
    """

    def __init__(self, window_size: int, window_slide: int):
        """
        Windows this KeyedStream into sliding count windows.

        :param window_size: The size of the windows in number of elements.
        :param window_slide: The slide interval in number of elements.
        :raises Exception: if ``window_size`` or ``window_slide`` is not positive.
        """
        # Validate up front instead of failing with ZeroDivisionError on the first element.
        if window_size <= 0 or window_slide <= 0:
            raise Exception("CountSlidingWindowAssigner parameters must satisfy "
                            "window_size > 0 and window_slide > 0")
        self._window_size = window_size
        self._window_slide = window_slide
        # Per-key count of elements seen so far. Built once rather than on every element; the
        # state name is the same one the previous implementation used.
        self._count_state_descriptor = ValueStateDescriptor('slide-count-assigner', Types.LONG())

    def assign_windows(self,
                       element: T,
                       timestamp: int,
                       context: 'WindowAssigner.WindowAssignerContext') -> Collection[CountWindow]:
        count_state = context.get_runtime_context().get_state(self._count_state_descriptor)
        current_count = count_state.value()
        if current_count is None:
            current_count = 0
        count_state.update(current_count + 1)

        windows = []
        window_id = current_count // self._window_slide
        # window_id * window_slide <= current_count holds for every candidate id, so the only
        # bound to check while walking back is the window end.
        while (window_id >= 0
               and current_count <= window_id * self._window_slide + self._window_size - 1):
            windows.append(CountWindow(window_id))
            window_id -= 1
        return windows

    def get_default_trigger(self, env) -> Trigger[T, W]:
        return CountTrigger(self._window_size)

    def get_window_serializer(self) -> TypeSerializer[W]:
        return CountWindowSerializer()

    def is_event_time(self) -> bool:
        return False

    def __repr__(self):
        return "CountSlidingWindowAssigner(%s, %s)" % (self._window_size, self._window_slide)
+
+
+class SessionWindowAssigner(MergingWindowAssigner[T, TimeWindow]):
+ """
+ WindowAssigner that windows elements into sessions based on the
timestamp. Windows cannot
+ overlap.
+ """
+
+ def __init__(self, session_gap: int, is_event_time: bool):
+ if session_gap <= 0:
+ raise Exception("SessionWindowAssigner parameters must satisfy 0 <
size")
+
+ self._session_gap = session_gap
+ self._is_event_time = is_event_time
+
+ def merge_windows(self,
+ windows: Iterable[W],
+ callback: 'MergingWindowAssigner.MergeCallback[W]') ->
None:
+ window_list = [w for w in windows]
+ window_list.sort()
+ for i in range(1, len(window_list)):
Review comment:
This logic is incorrect when receiving late records; check out the Java implementation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]