This is an automated email from the ASF dual-hosted git repository.
hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5fb8d24c1c3 [FLINK-28921][python][docs] Optimize the Python DataStream Window Documentation
5fb8d24c1c3 is described below
commit 5fb8d24c1c369ab33ffe807d76e54a2080c56668
Author: huangxingbo <[email protected]>
AuthorDate: Fri Aug 12 11:33:27 2022 +0800
[FLINK-28921][python][docs] Optimize the Python DataStream Window Documentation
This closes #20557.
---
.../docs/dev/datastream/operators/windows.md | 126 +++--
.../dev/python/datastream/operators/windows.md | 564 +--------------------
.../docs/dev/datastream/operators/windows.md | 127 +++--
.../dev/python/datastream/operators/windows.md | 564 +--------------------
4 files changed, 170 insertions(+), 1211 deletions(-)
diff --git a/docs/content.zh/docs/dev/datastream/operators/windows.md b/docs/content.zh/docs/dev/datastream/operators/windows.md
index 8dd7b198b37..de284b6326b 100644
--- a/docs/content.zh/docs/dev/datastream/operators/windows.md
+++ b/docs/content.zh/docs/dev/datastream/operators/windows.md
@@ -35,6 +35,9 @@ under the License.
**Keyed Windows**
+{{< tabs "Keyed Windows" >}}
+
+{{< tab "Java/Scala" >}}
stream
.keyBy(...) <- 仅 keyed 窗口需要
.window(...) <- 必填项:"assigner"
@@ -45,8 +48,26 @@ under the License.
.reduce/aggregate/apply() <- 必填项:"function"
[.getSideOutput(...)] <- 可选项:"output tag"
+{{< /tab >}}
+
+{{< tab "Python" >}}
+ stream
+ .key_by(...) <- 仅 keyed 窗口需要
+ .window(...) <- 必填项:"assigner"
+ [.trigger(...)] <- 可选项:"trigger" (省略则使用默认 trigger)
+ [.allowed_lateness(...)] <- 可选项:"lateness" (省略则为 0)
+ [.side_output_late_data(...)] <- 可选项:"output tag" (省略则不对迟到数据使用 side output)
+ .reduce/aggregate/apply() <- 必填项:"function"
+ [.get_side_output(...)] <- 可选项:"output tag"
+
+{{< /tab >}}
+{{< /tabs >}}
+
**Non-Keyed Windows**
+{{< tabs "Non-Keyed Windows" >}}
+
+{{< tab "Java/Scala" >}}
stream
.windowAll(...) <- 必填项:"assigner"
[.trigger(...)] <- 可选项:"trigger" (else default trigger)
@@ -56,8 +77,23 @@ under the License.
.reduce/aggregate/apply() <- 必填项:"function"
[.getSideOutput(...)] <- 可选项:"output tag"
+{{< /tab >}}
+
+{{< tab "Python" >}}
+ stream
+ .window_all(...) <- 必填项:"assigner"
+ [.trigger(...)] <- 可选项:"trigger" (else default trigger)
+ [.allowed_lateness(...)] <- 可选项:"lateness" (else zero)
+ [.side_output_late_data(...)] <- 可选项:"output tag" (else no side output for late data)
+ .reduce/aggregate/apply() <- 必填项:"function"
+ [.get_side_output(...)] <- 可选项:"output tag"
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
上面方括号([...])中的命令是可选的。也就是说,Flink 允许你自定义多样化的窗口操作来满足你的需求。
-{{< hint info >}} Note: Non-Keyed windows 在 Python DataStream API 中还不支持. {{< /hint >}}
+{{< hint info >}} Note: `Evictor` 在 Python DataStream API 中还不支持. {{< /hint >}}
## 窗口的生命周期
@@ -813,51 +849,51 @@ class ProcessWindowFunction(Function, Generic[IN, OUT, KEY, W]):
"""
pass
-class Context(ABC, Generic[W2]):
- """
- The context holding window metadata.
- """
-
- @abstractmethod
- def window(self) -> W2:
- """
- :return: The window that is being evaluated.
- """
- pass
-
- @abstractmethod
- def current_processing_time(self) -> int:
- """
- :return: The current processing time.
- """
- pass
-
- @abstractmethod
- def current_watermark(self) -> int:
- """
- :return: The current event-time watermark.
- """
- pass
-
- @abstractmethod
- def window_state(self) -> KeyedStateStore:
+ class Context(ABC, Generic[W2]):
"""
- State accessor for per-key and per-window state.
-
- .. note::
- If you use per-window state you have to ensure that you clean it up by implementing
- :func:`~ProcessWindowFunction.clear`.
-
- :return: The :class:`KeyedStateStore` used to access per-key and per-window states.
+ The context holding window metadata.
"""
- pass
-
- @abstractmethod
- def global_state(self) -> KeyedStateStore:
- """
- State accessor for per-key global state.
- """
- pass
+
+ @abstractmethod
+ def window(self) -> W2:
+ """
+ :return: The window that is being evaluated.
+ """
+ pass
+
+ @abstractmethod
+ def current_processing_time(self) -> int:
+ """
+ :return: The current processing time.
+ """
+ pass
+
+ @abstractmethod
+ def current_watermark(self) -> int:
+ """
+ :return: The current event-time watermark.
+ """
+ pass
+
+ @abstractmethod
+ def window_state(self) -> KeyedStateStore:
+ """
+ State accessor for per-key and per-window state.
+
+ .. note::
+ If you use per-window state you have to ensure that you clean it up by implementing
+ :func:`~ProcessWindowFunction.clear`.
+
+ :return: The :class:`KeyedStateStore` used to access per-key and per-window states.
+ """
+ pass
+
+ @abstractmethod
+ def global_state(self) -> KeyedStateStore:
+ """
+ State accessor for per-key global state.
+ """
+ pass
```
{{< /tab >}}
{{< /tabs >}}
@@ -1413,6 +1449,8 @@ Flink 内置有三个 evictor:
指定一个 evictor 可以避免预聚合,因为窗口中的所有元素在计算前都必须经过 evictor。
{{< /hint >}}
+{{< hint info >}} Note: `Evictor` 在 Python DataStream API 中还不支持. {{< /hint >}}
+
Flink 不对窗口中元素的顺序做任何保证。也就是说,即使 evictor 从窗口缓存的开头移除一个元素,这个元素也不一定是最先或者最后到达窗口的。
diff --git a/docs/content.zh/docs/dev/python/datastream/operators/windows.md b/docs/content.zh/docs/dev/python/datastream/operators/windows.md
index a79d7c744db..cbb31f936d1 100644
--- a/docs/content.zh/docs/dev/python/datastream/operators/windows.md
+++ b/docs/content.zh/docs/dev/python/datastream/operators/windows.md
@@ -2,6 +2,8 @@
title: "Windows"
weight: 2
type: docs
+aliases:
+- /zh/docs/dev/python/datastream/operators/windows.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
@@ -22,564 +24,4 @@ specific language governing permissions and limitations
under the License.
-->
-# Windows
-
-Windows are at the heart of processing infinite streams. Windows split the
stream into "buckets" of finite size,
-over which we can apply computations. This document focuses on how windowing
is performed in Flink and how the
-programmer can benefit to the maximum from its offered functionality.
-
-Currently, the widow operation is only supported in *keyed* streams
-
-**Keyed Windows**
-
- stream
- .key_by(...)
- .window(...) <- required: "assigner"
- [.trigger(...)] <- optional: "trigger" (else default
trigger)
- [.allowed_lateness(...)] <- optional: "lateness" (else zero)
- .apply/process() <- required: "function"
-
-In the above, the commands in square brackets ([...]) are optional. This
reveals that Flink allows you to customize your
-windowing logic in many different ways so that it best fits your needs.
-
-## Window Lifecycle
-
-In a nutshell, a window is **created** as soon as the first element that
should belong to this window arrives, and the
-window is **completely removed** when the time (event or processing time)
passes its end timestamp plus the user-specified
-`allowed lateness` (see [Allowed Lateness](#allowed-lateness)). Flink
guarantees removal only for time-based
-windows and not for other types, *e.g.* global windows (see [Window
Assigners](#window-assigners)). For example, with an
-event-time-based windowing strategy that creates non-overlapping (or tumbling)
windows every 5 minutes and has an allowed
-lateness of 1 min, Flink will create a new window for the interval between
`12:00` and `12:05` when the first element with
-a timestamp that falls into this interval arrives, and it will remove it when
the watermark passes the `12:06`
-timestamp.
-
-In addition, each window will have a `Trigger` (see [Triggers](#triggers)) and
a function (`WindowFunction` or `ProcessWindowFunction`)
-(see [Window Functions](#window-functions)) attached to it. The function will
contain the computation to
-be applied to the contents of the window, while the `Trigger` specifies the
conditions under which the window is
-considered ready for the function to be applied. A triggering policy might be
something like "when the number of elements
-in the window is more than 4", or "when the watermark passes the end of the
window". A trigger can also decide to
-purge a window's contents any time between its creation and removal. Purging
in this case only refers to the elements
-in the window, and *not* the window metadata. This means that new data can
still be added to that window.
-
-In the following we go into more detail for each of the components above. We
start with the required parts in the above
-snippet (see [Keyed Windows](#keyed-windows), [Window
Assigner](#window-assigners), and [Window Function](#window-functions))
-before moving to the optional ones.
-
-## Keyed Windows
-
-The first thing to specify is whether your stream should be keyed or not. This
has to be done before defining the window.
-Using the `key_by(...)` will split your infinite stream into logical keyed
streams. If `key_by(...)` is not called, your
-stream is not keyed.
-
-In the case of keyed streams, any attribute of your incoming events can be
used as a key
-(more details [here]({{< ref "docs/dev/datastream/fault-tolerance/state"
>}}#keyed-datastream)). Having a keyed stream will
-allow your windowed computation to be performed in parallel by multiple tasks,
as each logical keyed stream can be processed
-independently from the rest. All elements referring to the same key will be
sent to the same parallel task.
-
-## Window Assigners
-
-After specifying your stream is keyed, the next step is to define a *window
assigner*.
-The window assigner defines how elements are assigned to windows. This is done
by specifying the `WindowAssigner`
-of your choice in the `window(...)` (for *keyed* streams) call.
-
-A `WindowAssigner` is responsible for assigning each incoming element to one
or more windows.
-You can implement a custom window assigner by extending the `WindowAssigner`
class.
-
-Time-based windows have a *start timestamp* (inclusive) and an *end timestamp*
(exclusive)
-that together describe the size of the window. In code, Flink uses
`TimeWindow` when working with
-time-based windows which has methods for querying the start- and end-timestamp
and also an
-additional method `max_timestamp()` that returns the largest allowed timestamp
for a given windows.
-
-In the following, we show how to custom a *tumbling windows* assigner. For
details of Tumbling Windows, you can
-refer to the [the relevant documentation]({{< ref
"docs/dev/datastream/operators/windows" >}}#tumbling-windows).
-
-```python
-from typing import Tuple, Collection
-
-from pyflink.common.serializer import TypeSerializer
-from pyflink.datastream import WindowAssigner, Trigger
-from pyflink.datastream.window import TimeWindow, TimeWindowSerializer
-
-class TumblingEventWindowAssigner(WindowAssigner[Tuple, TimeWindow]):
-
- def __init__(self, size: int, offset: int, is_event_time: bool):
- self._size = size
- self._offset = offset
- self._is_event_time = is_event_time
-
- def assign_windows(self,
- element: Tuple,
- timestamp: int,
- context: WindowAssigner.WindowAssignerContext) -> Collection[TimeWindow]:
- start = TimeWindow.get_window_start_with_offset(timestamp, self._offset, self._size)
- return [TimeWindow(start, start + self._size)]
-
- def get_default_trigger(self, env) -> Trigger[Tuple, TimeWindow]:
- return EventTimeTrigger()
-
- def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
- return TimeWindowSerializer()
-
- def is_event_time(self) -> bool:
- return False
-```
-
-## Window Functions
-
-After defining the window assigner, we need to specify the computation that we
want
-to perform on each of these windows. This is the responsibility of the *window
function*, which is used to process the
-elements of each keyed window once the system determines that a window is
ready for processing
-(see [triggers](#triggers) for how Flink determines when a window is ready).
-
-The window function can be `ProcessWindowFunction` or `WindowFunction`. They
get an `Iterable` for all the elements contained in a
-window and additional meta information about the window to which the elements
belong.
-
-In some places where a `ProcessWindowFunction` can be used you can also use a
`WindowFunction`.
-This is an older version of ProcessWindowFunction that provides less
contextual information and
-does not have some advances features, such as per-window keyed state. We will
look at examples for each of these variants.
-
-### ProcessWindowFunction
-
-A ProcessWindowFunction gets an Iterable containing all the elements of the
window, and a Context
-object with access to time and state information, which enables it to provide
more flexibility than
-other window functions. This comes at the cost of performance and resource
consumption, because
-elements cannot be incrementally aggregated but instead need to be buffered
internally until the
-window is considered ready for processing.
-
-The signature of `ProcessWindowFunction` looks as follows:
-
-```python
-class ProcessWindowFunction(Function, Generic[IN, OUT, KEY, W]):
- """
- Base interface for functions that are evaluated over keyed (grouped)
windows using a context
- for retrieving extra information.
- """
-
- class Context(ABC, Generic[W2]):
- """
- The context holding window metadata.
- """
-
- @abstractmethod
- def window(self) -> W2:
- """
- :return: The window that is being evaluated.
- """
- pass
-
- @abstractmethod
- def current_processing_time(self) -> int:
- """
- :return: The current processing time.
- """
- pass
-
- @abstractmethod
- def current_watermark(self) -> int:
- """
- :return: The current event-time watermark.
- """
- pass
-
- @abstractmethod
- def window_state(self) -> KeyedStateStore:
- """
- State accessor for per-key and per-window state.
-
- .. note::
- If you use per-window state you have to ensure that you clean
it up by implementing
- :func:`~ProcessWindowFunction.clear`.
-
- :return: The :class:`KeyedStateStore` used to access per-key and
per-window states.
- """
- pass
-
- @abstractmethod
- def global_state(self) -> KeyedStateStore:
- """
- State accessor for per-key global state.
- """
- pass
-
- @abstractmethod
- def process(self,
- key: KEY,
- content: 'ProcessWindowFunction.Context',
- elements: Iterable[IN]) -> Iterable[OUT]:
- """
- Evaluates the window and outputs none or several elements.
-
- :param key: The key for which this window is evaluated.
- :param content: The context in which the window is being evaluated.
- :param elements: The elements in the window being evaluated.
- :return: The iterable object which produces the elements to emit.
- """
- pass
-
- @abstractmethod
- def clear(self, context: 'ProcessWindowFunction.Context') -> None:
- """
- Deletes any state in the :class:`Context` when the Window expires (the
watermark passes its
- max_timestamp + allowed_lateness).
-
- :param context: The context to which the window is being evaluated.
- """
- pass
-```
-
-The `key` parameter is the key that is extracted
-via the `KeySelector` that was specified for the `key_by()` invocation. In
case of tuple-index
-keys or string-field references this key type is always `Tuple` and you have
to manually cast
-it to a tuple of the correct size to extract the key fields.
-
-A `ProcessWindowFunction` can be defined and used like this:
-
-```python
-from typing import Tuple, Iterable
-
-from pyflink.common.typeinfo import Types
-from pyflink.datastream.window import TimeWindow
-
-class SumWindowProcessFunction(ProcessWindowFunction[Tuple, Tuple, str, TimeWindow]):
-
- def process(self,
- key: str,
- content: ProcessWindowFunction.Context,
- elements: Iterable[Tuple]) -> Iterable[tuple]:
- result = 0
- for i in elements:
- result += i[0]
- return [(key, result)]
-
- def clear(self, context: ProcessWindowFunction.Context) -> None:
- pass
-
-data_stream = env.from_collection([
- (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello')],
- type_info=Types.TUPLE([Types.INT(), Types.STRING()])) # type: DataStream
-data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
- .window(TumblingEventWindowAssigner()) \
- .process(SumWindowProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT()]))
-```
-### WindowFunction
-
-In some places where a `ProcessWindowFunction` can be used you can also use a
`WindowFunction`. This
-is an older version of `ProcessWindowFunction` that provides less contextual
information and does
-not have some advances features, such as per-window keyed state. This
interface will be deprecated
-at some point.
-
-The signature of a `WindowFunction` looks as follows:
-
-```python
-class WindowFunction(Function, Generic[IN, OUT, KEY, W]):
- """
- Base interface for functions that are evaluated over keyed (grouped)
windows.
- """
-
- @abstractmethod
- def apply(self, key: KEY, window: W, inputs: Iterable[IN]) -> Iterable[OUT]:
- """
- Evaluates the window and outputs none or several elements.
-
- :param key: The key for which this window is evaluated.
- :param window: The window that is being evaluated.
- :param inputs: The elements in the window being evaluated.
- """
- pass
-```
-
-It can be used like this:
-
-```python
-from typing import Tuple, Iterable
-
-from pyflink.common.typeinfo import Types
-from pyflink.datastream.window import TimeWindow
-
-class SumWindowFunction(WindowFunction[Tuple, Tuple, str, TimeWindow]):
-
- def apply(self, key: str, window: TimeWindow, inputs: Iterable[Tuple]):
- result = 0
- for i in inputs:
- result += i[0]
- return [(key, result)]
-
-data_stream = env.from_collection([
- (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello')],
- type_info=Types.TUPLE([Types.INT(), Types.STRING()])) # type: DataStream
-data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
- .window(TumblingEventWindowAssigner()) \
- .apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()]))
-```
-
-## Triggers
-
-A Trigger determines when a window (as formed by the window assigner) is ready
to be processed by the window function.
-Each WindowAssigner comes with a default Trigger. You can specify a custom
trigger using trigger(...).
-
-The signature of `ProcessWindowFunction` looks as follows:
-
-```python
-class Trigger(ABC, Generic[T, W]):
- """
- A Trigger determines when a pane of a window should be evaluated to emit
the results for that
- part of the window.
-
- A pane is the bucket of elements that have the same key (assigned by the
KeySelector) and same
- Window. An element can be in multiple panes if it was assigned to multiple
windows by the
- WindowAssigner. These panes all have their own instance of the Trigger.
-
- Triggers must not maintain state internally since they can be re-created
or reused for different
- keys. All necessary state should be persisted using the state abstraction
available on the
- TriggerContext.
-
- When used with a MergingWindowAssigner the Trigger must return true from
:func:`can_merge` and
- :func:`on_merge` most be properly implemented.
- """
-
- class TriggerContext(ABC):
- """
- A context object that is given to :class:`Trigger` methods to allow
them to register timer
- callbacks and deal with state.
- """
-
- @abstractmethod
- def get_current_processing_time(self) -> int:
- """
- :return: The current processing time.
- """
- pass
-
- @abstractmethod
- def get_metric_group(self) -> MetricGroup:
- """
- Returns the metric group for this :class:`Trigger`. This is the
same metric group that
- would be returned from
- :func:`~pyflink.datasteam.functions.RuntimeContext.get_metric_group` in a user function.
-
- :return: The metric group.
- """
- pass
-
- @abstractmethod
- def get_current_watermark(self) -> int:
- """
- :return: The current watermark time.
- """
- pass
-
- @abstractmethod
- def register_processing_time_timer(self, time: int) -> None:
- """
- Register a system time callback. When the current system time
passes the specified time
- :func:`~Trigger.on_processing_time` is called with the time
specified here.
-
- :param time: The time at which to invoke
:func:`~Trigger.on_processing_time`.
- """
- pass
-
- @abstractmethod
- def register_event_time_timer(self, time: int) -> None:
- """
- Register an event-time callback. When the current watermark passes
the specified time
- :func:`~Trigger.on_event_time` is called with the time specified
here.
-
- :param time: The watermark at which to invoke
:func:`~Trigger.on_event_time`.
- """
- pass
-
- @abstractmethod
- def delete_processing_time_timer(self, time: int) -> None:
- """
- Delete the processing time trigger for the given time.
- """
- pass
-
- @abstractmethod
- def delete_event_time_timer(self, time: int) -> None:
- """
- Delete the event-time trigger for the given time.
- """
- pass
-
- @abstractmethod
- def get_partitioned_state(self, state_descriptor: StateDescriptor) -> State:
- """
- Retrieves a :class:`State` object that can be used to interact
with fault-tolerant state
- that is scoped to the window and key of the current trigger
invocation.
-
- :param state_descriptor: The StateDescriptor that contains the
name and type of the
- state that is being accessed.
- :return: The partitioned state object.
- """
- pass
-
- class OnMergeContext(TriggerContext):
- """
- Extension of :class:`TriggerContext` that is given to
:func:`~Trigger.on_merge`.
- """
-
- @abstractmethod
- def merge_partitioned_state(self, state_descriptor: StateDescriptor) -> None:
- pass
-
- @abstractmethod
- def on_element(self,
- element: T,
- timestamp: int,
- window: W,
- ctx: 'Trigger.TriggerContext') -> TriggerResult:
- """
- Called for every element that gets added to a pane. The result of this
will determine
- whether the pane is evaluated to emit results.
-
- :param element: The element that arrived.
- :param timestamp: The timestamp of the element that arrived.
- :param window: The window to which the element is being added.
- :param ctx: A context object that can be used to register timer
callbacks.
- """
- pass
-
- @abstractmethod
- def on_processing_time(self,
- time: int,
- window: W,
- ctx: 'Trigger.TriggerContext') -> TriggerResult:
- """
- Called when a processing-time timer that was set using the trigger
context fires.
-
- :param time: The timestamp at which the timer fired.
- :param window: The window for which the timer fired.
- :param ctx: A context object that can be used to register timer
callbacks.
- """
- pass
-
- @abstractmethod
- def on_event_time(self, time: int, window: W, ctx: 'Trigger.TriggerContext') -> TriggerResult:
- """
- Called when an event-time timer that was set using the trigger context
fires.
-
- :param time: The timestamp at which the timer fired.
- :param window: The window for which the timer fired.
- :param ctx: A context object that can be used to register timer
callbacks.
- """
- pass
-
- def can_merge(self) -> bool:
- """
- .. note:: If this returns true you must properly implement
:func:`~Trigger.on_merge`
-
- :return: True if this trigger supports merging of trigger state and
can therefore be used
- with a MergingWindowAssigner.
- """
- return False
-
- @abstractmethod
- def on_merge(self, window: W, ctx: 'Trigger.OnMergeContext') -> None:
- """
- Called when several windows have been merged into one window by the
:class:`WindowAssigner`.
-
- :param window: The new window that results from the merge.
- :param ctx: A context object that can be used to register timer
callbacks and access state.
- """
- pass
-
- @abstractmethod
- def clear(self, window: W, ctx: 'Trigger.TriggerContext') -> None:
- """
- Clears any state that the trigger might still hold for the given
window. This is called when
- a window is purged. Timers set using
:func:`~TriggerContext.register_event_time_timer` and
- :func:`~TriggerContext.register_processing_time_timer` should be
deleted here as well as
- state acquired using :func:`~TriggerContext.get_partitioned_state`.
- """
- pass
-```
-
-Two things to notice about the above methods are:
-
-1) The first three(*on_element*, *on_processing_time* and *on_event_time*)
decide how to act on their invocation event by returning a `TriggerResult`.
-The action can be one of the following:
-
-* `CONTINUE`: do nothing,
-* `FIRE`: trigger the computation,
-* `PURGE`: clear the elements in the window, and
-* `FIRE_AND_PURGE`: trigger the computation and clear the elements in the
window afterwards.
-
-2) Any of these methods can be used to register processing- or event-time
timers for future actions.
-
-### Fire and Purge
-
-Once a trigger determines that a window is ready for processing, it fires,
*i.e.*, it returns `FIRE` or `FIRE_AND_PURGE`. This is the signal for the
window operator
-to emit the result of the current window. Given a window with a
`ProcessWindowFunction`, all elements are passed to the `ProcessWindowFunction`.
-
-When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While `FIRE`
keeps the contents of the window, `FIRE_AND_PURGE` removes its content.
-By default, the pre-implemented triggers simply `FIRE` without purging the
window state.
-
-You can implement a custom EventTimeTrigger as follows:
-
-```python
-from typing import Tuple
-from pyflink.datastream.window import TimeWindow
-
-class EventTimeTrigger(Trigger[Tuple, TimeWindow]):
-
- def on_element(self,
- element: Tuple,
- timestamp: int,
- window: TimeWindow,
- ctx: 'Trigger.TriggerContext') -> TriggerResult:
- return TriggerResult.CONTINUE
-
- def on_processing_time(self,
- time: int,
- window: TimeWindow,
- ctx: 'Trigger.TriggerContext') -> TriggerResult:
- return TriggerResult.CONTINUE
-
- def on_event_time(self,
- time: int,
- window: TimeWindow,
- ctx: 'Trigger.TriggerContext') -> TriggerResult:
- if time >= window.max_timestamp():
- return TriggerResult.FIRE_AND_PURGE
- else:
- return TriggerResult.CONTINUE
-
- def on_merge(self,
- window: TimeWindow,
- ctx: 'Trigger.OnMergeContext') -> None:
- pass
-
- def clear(self,
- window: TimeWindow,
- ctx: 'Trigger.TriggerContext') -> None:
- pass
-```
-
-## Allowed Lateness
-
-When working with *event-time* windowing, it can happen that elements arrive
late, *i.e.* the watermark that Flink uses to
-keep track of the progress of event-time is already past the end timestamp of
a window to which an element belongs. See
-[event time]({{< ref "docs/dev/datastream/event-time/generating_watermarks"
>}}) and especially [late elements]({{< ref
"docs/dev/datastream/event-time/generating_watermarks" >}}#late-elements) for a
more thorough
-discussion of how Flink deals with event time.
-
-By default, late elements are dropped when the watermark is past the end of
the window. However,
-Flink allows to specify a maximum *allowed lateness* for window operators.
Allowed lateness
-specifies by how much time elements can be late before they are dropped, and
its default value is 0.
-Elements that arrive after the watermark has passed the end of the window but
before it passes the end of
-the window plus the allowed lateness, are still added to the window. Depending
on the trigger used,
-a late but not dropped element may cause the window to fire again.
-
-In order to make this work, Flink keeps the state of windows until their
allowed lateness expires. Once this happens, Flink removes the window and
deletes its state, as
-also described in the [Window Lifecycle](#window-lifecycle) section.
-
-By default, the allowed lateness is set to `0`. That is, elements that arrive
behind the watermark will be dropped.
-
-You can specify an allowed lateness like this:
-
-```python
-data_stream.key_by(<key selector>) \
- .window(<window assigner>) \
- .allowed_lateness(<time>) \
- .<windowed transformation>(<window function>)
-```
+<meta http-equiv="refresh" content="0; url={{< ref "docs/dev/datastream/operators/windows" >}} "/>
diff --git a/docs/content/docs/dev/datastream/operators/windows.md b/docs/content/docs/dev/datastream/operators/windows.md
index 44bf56cb5aa..582d357582a 100644
--- a/docs/content/docs/dev/datastream/operators/windows.md
+++ b/docs/content/docs/dev/datastream/operators/windows.md
@@ -37,6 +37,9 @@ for the rest of the page.
**Keyed Windows**
+{{< tabs "Keyed Windows" >}}
+
+{{< tab "Java/Scala" >}}
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
@@ -47,8 +50,26 @@ for the rest of the page.
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
+{{< /tab >}}
+
+{{< tab "Python" >}}
+ stream
+ .key_by(...)
+ .window(...) <- required: "assigner"
+ [.trigger(...)] <- optional: "trigger" (else default trigger)
+ [.allowed_lateness(...)] <- optional: "lateness" (else zero)
+ [.side_output_late_data(...)] <- optional: "output tag" (else no side output for late data)
+ .reduce/aggregate/apply() <- required: "function"
+ [.get_side_output(...)] <- optional: "output tag"
+
+{{< /tab >}}
+{{< /tabs >}}
+
**Non-Keyed Windows**
+{{< tabs "Non-Keyed Windows" >}}
+
+{{< tab "Java/Scala" >}}
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
@@ -58,10 +79,24 @@ for the rest of the page.
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
+{{< /tab >}}
+
+{{< tab "Python" >}}
+ stream
+ .window_all(...) <- required: "assigner"
+ [.trigger(...)] <- optional: "trigger" (else default trigger)
+ [.allowed_lateness(...)] <- optional: "lateness" (else zero)
+ [.side_output_late_data(...)] <- optional: "output tag" (else no side output for late data)
+ .reduce/aggregate/apply() <- required: "function"
+ [.get_side_output(...)] <- optional: "output tag"
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
In the above, the commands in square brackets ([...]) are optional. This reveals that Flink allows you to customize your
windowing logic in many different ways so that it best fits your needs.
-{{< hint info >}} Note: Non-Keyed windows is still not supported in Python DataStream API. {{< /hint >}}
-
+{{< hint info >}} Note: `Evictor` is still not supported in Python DataStream API. {{< /hint >}}
## Window Lifecycle
@@ -841,51 +876,51 @@ class ProcessWindowFunction(Function, Generic[IN, OUT, KEY, W]):
"""
pass
-class Context(ABC, Generic[W2]):
- """
- The context holding window metadata.
- """
-
- @abstractmethod
- def window(self) -> W2:
+ class Context(ABC, Generic[W2]):
"""
- :return: The window that is being evaluated.
+ The context holding window metadata.
"""
- pass
-
- @abstractmethod
- def current_processing_time(self) -> int:
- """
- :return: The current processing time.
- """
- pass
-
- @abstractmethod
- def current_watermark(self) -> int:
- """
- :return: The current event-time watermark.
- """
- pass
-
- @abstractmethod
- def window_state(self) -> KeyedStateStore:
- """
- State accessor for per-key and per-window state.
-
- .. note::
- If you use per-window state you have to ensure that you clean it up by implementing
- :func:`~ProcessWindowFunction.clear`.
-
- :return: The :class:`KeyedStateStore` used to access per-key and per-window states.
- """
- pass
-
- @abstractmethod
- def global_state(self) -> KeyedStateStore:
- """
- State accessor for per-key global state.
- """
- pass
+
+ @abstractmethod
+ def window(self) -> W2:
+ """
+ :return: The window that is being evaluated.
+ """
+ pass
+
+ @abstractmethod
+ def current_processing_time(self) -> int:
+ """
+ :return: The current processing time.
+ """
+ pass
+
+ @abstractmethod
+ def current_watermark(self) -> int:
+ """
+ :return: The current event-time watermark.
+ """
+ pass
+
+ @abstractmethod
+ def window_state(self) -> KeyedStateStore:
+ """
+ State accessor for per-key and per-window state.
+
+ .. note::
+ If you use per-window state you have to ensure that you clean it up by implementing
+ :func:`~ProcessWindowFunction.clear`.
+
+ :return: The :class:`KeyedStateStore` used to access per-key and per-window states.
+ """
+ pass
+
+ @abstractmethod
+ def global_state(self) -> KeyedStateStore:
+ """
+ State accessor for per-key global state.
+ """
+ pass
```
{{< /tab >}}
{{< /tabs >}}
@@ -1455,6 +1490,8 @@ elements of a window have to be passed to the evictor before applying the comput
This means windows with evictors will create significantly more state.
{{< /hint >}}
+{{< hint info >}} Note: `Evictor` is still not supported in Python DataStream API. {{< /hint >}}
+
Flink provides no guarantees about the order of the elements within
a window. This implies that although an evictor may remove elements from the
beginning of the window, these are not
necessarily the ones that arrive first or last.
diff --git a/docs/content/docs/dev/python/datastream/operators/windows.md b/docs/content/docs/dev/python/datastream/operators/windows.md
index a79d7c744db..bcf555c8b84 100644
--- a/docs/content/docs/dev/python/datastream/operators/windows.md
+++ b/docs/content/docs/dev/python/datastream/operators/windows.md
@@ -2,6 +2,8 @@
title: "Windows"
weight: 2
type: docs
+aliases:
+- /docs/dev/python/datastream/operators/windows.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
@@ -22,564 +24,4 @@ specific language governing permissions and limitations
under the License.
-->
-# Windows
-
-Windows are at the heart of processing infinite streams. Windows split the
stream into "buckets" of finite size,
-over which we can apply computations. This document focuses on how windowing
is performed in Flink and how the
-programmer can benefit to the maximum from its offered functionality.
-
-Currently, the widow operation is only supported in *keyed* streams
-
-**Keyed Windows**
-
- stream
- .key_by(...)
- .window(...) <- required: "assigner"
- [.trigger(...)] <- optional: "trigger" (else default
trigger)
- [.allowed_lateness(...)] <- optional: "lateness" (else zero)
- .apply/process() <- required: "function"
-
-In the above, the commands in square brackets ([...]) are optional. This
reveals that Flink allows you to customize your
-windowing logic in many different ways so that it best fits your needs.
-
-## Window Lifecycle
-
-In a nutshell, a window is **created** as soon as the first element that
should belong to this window arrives, and the
-window is **completely removed** when the time (event or processing time)
passes its end timestamp plus the user-specified
-`allowed lateness` (see [Allowed Lateness](#allowed-lateness)). Flink
guarantees removal only for time-based
-windows and not for other types, *e.g.* global windows (see [Window Assigners](#window-assigners)). For example, with an
-event-time-based windowing strategy that creates non-overlapping (or tumbling)
windows every 5 minutes and has an allowed
-lateness of 1 min, Flink will create a new window for the interval between
`12:00` and `12:05` when the first element with
-a timestamp that falls into this interval arrives, and it will remove it when
the watermark passes the `12:06`
-timestamp.
-
-In addition, each window will have a `Trigger` (see [Triggers](#triggers)) and
a function (`WindowFunction` or `ProcessWindowFunction`)
-(see [Window Functions](#window-functions)) attached to it. The function will
contain the computation to
-be applied to the contents of the window, while the `Trigger` specifies the
conditions under which the window is
-considered ready for the function to be applied. A triggering policy might be
something like "when the number of elements
-in the window is more than 4", or "when the watermark passes the end of the
window". A trigger can also decide to
-purge a window's contents any time between its creation and removal. Purging
in this case only refers to the elements
-in the window, and *not* the window metadata. This means that new data can
still be added to that window.
-
-In the following we go into more detail for each of the components above. We
start with the required parts in the above
-snippet (see [Keyed Windows](#keyed-windows), [Window Assigner](#window-assigners), and [Window Function](#window-functions))
-before moving to the optional ones.
-
-## Keyed Windows
-
-The first thing to specify is whether your stream should be keyed or not. This
has to be done before defining the window.
-Using the `key_by(...)` will split your infinite stream into logical keyed
streams. If `key_by(...)` is not called, your
-stream is not keyed.
-
-In the case of keyed streams, any attribute of your incoming events can be
used as a key
-(more details [here]({{< ref "docs/dev/datastream/fault-tolerance/state" >}}#keyed-datastream)). Having a keyed stream will
-allow your windowed computation to be performed in parallel by multiple tasks,
as each logical keyed stream can be processed
-independently from the rest. All elements referring to the same key will be
sent to the same parallel task.
-
-## Window Assigners
-
-After specifying your stream is keyed, the next step is to define a *window
assigner*.
-The window assigner defines how elements are assigned to windows. This is done
by specifying the `WindowAssigner`
-of your choice in the `window(...)` (for *keyed* streams) call.
-
-A `WindowAssigner` is responsible for assigning each incoming element to one
or more windows.
-You can implement a custom window assigner by extending the `WindowAssigner`
class.
-
-Time-based windows have a *start timestamp* (inclusive) and an *end timestamp*
(exclusive)
-that together describe the size of the window. In code, Flink uses
`TimeWindow` when working with
-time-based windows which has methods for querying the start- and end-timestamp
and also an
-additional method `max_timestamp()` that returns the largest allowed timestamp
for a given windows.
-
-In the following, we show how to custom a *tumbling windows* assigner. For
details of Tumbling Windows, you can
-refer to the [the relevant documentation]({{< ref "docs/dev/datastream/operators/windows" >}}#tumbling-windows).
-
-```python
-from typing import Tuple, Collection
-
-from pyflink.common.serializer import TypeSerializer
-from pyflink.datastream import WindowAssigner, Trigger
-from pyflink.datastream.window import TimeWindow, TimeWindowSerializer
-
-class TumblingEventWindowAssigner(WindowAssigner[Tuple, TimeWindow]):
-
- def __init__(self, size: int, offset: int, is_event_time: bool):
- self._size = size
- self._offset = offset
- self._is_event_time = is_event_time
-
- def assign_windows(self,
- element: Tuple,
- timestamp: int,
- context: WindowAssigner.WindowAssignerContext) -> Collection[TimeWindow]:
- start = TimeWindow.get_window_start_with_offset(timestamp, self._offset, self._size)
- return [TimeWindow(start, start + self._size)]
-
- def get_default_trigger(self, env) -> Trigger[Tuple, TimeWindow]:
- return EventTimeTrigger()
-
- def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
- return TimeWindowSerializer()
-
- def is_event_time(self) -> bool:
- return False
-```
-
-## Window Functions
-
-After defining the window assigner, we need to specify the computation that we
want
-to perform on each of these windows. This is the responsibility of the *window
function*, which is used to process the
-elements of each keyed window once the system determines that a window is
ready for processing
-(see [triggers](#triggers) for how Flink determines when a window is ready).
-
-The window function can be `ProcessWindowFunction` or `WindowFunction`. They
get an `Iterable` for all the elements contained in a
-window and additional meta information about the window to which the elements
belong.
-
-In some places where a `ProcessWindowFunction` can be used you can also use a
`WindowFunction`.
-This is an older version of ProcessWindowFunction that provides less
contextual information and
-does not have some advances features, such as per-window keyed state. We will
look at examples for each of these variants.
-
-### ProcessWindowFunction
-
-A ProcessWindowFunction gets an Iterable containing all the elements of the
window, and a Context
-object with access to time and state information, which enables it to provide
more flexibility than
-other window functions. This comes at the cost of performance and resource
consumption, because
-elements cannot be incrementally aggregated but instead need to be buffered
internally until the
-window is considered ready for processing.
-
-The signature of `ProcessWindowFunction` looks as follows:
-
-```python
-class ProcessWindowFunction(Function, Generic[IN, OUT, KEY, W]):
- """
- Base interface for functions that are evaluated over keyed (grouped)
windows using a context
- for retrieving extra information.
- """
-
- class Context(ABC, Generic[W2]):
- """
- The context holding window metadata.
- """
-
- @abstractmethod
- def window(self) -> W2:
- """
- :return: The window that is being evaluated.
- """
- pass
-
- @abstractmethod
- def current_processing_time(self) -> int:
- """
- :return: The current processing time.
- """
- pass
-
- @abstractmethod
- def current_watermark(self) -> int:
- """
- :return: The current event-time watermark.
- """
- pass
-
- @abstractmethod
- def window_state(self) -> KeyedStateStore:
- """
- State accessor for per-key and per-window state.
-
- .. note::
- If you use per-window state you have to ensure that you clean
it up by implementing
- :func:`~ProcessWindowFunction.clear`.
-
- :return: The :class:`KeyedStateStore` used to access per-key and
per-window states.
- """
- pass
-
- @abstractmethod
- def global_state(self) -> KeyedStateStore:
- """
- State accessor for per-key global state.
- """
- pass
-
- @abstractmethod
- def process(self,
- key: KEY,
- content: 'ProcessWindowFunction.Context',
- elements: Iterable[IN]) -> Iterable[OUT]:
- """
- Evaluates the window and outputs none or several elements.
-
- :param key: The key for which this window is evaluated.
- :param content: The context in which the window is being evaluated.
- :param elements: The elements in the window being evaluated.
- :return: The iterable object which produces the elements to emit.
- """
- pass
-
- @abstractmethod
- def clear(self, context: 'ProcessWindowFunction.Context') -> None:
- """
- Deletes any state in the :class:`Context` when the Window expires (the
watermark passes its
- max_timestamp + allowed_lateness).
-
- :param context: The context to which the window is being evaluated.
- """
- pass
-```
-
-The `key` parameter is the key that is extracted
-via the `KeySelector` that was specified for the `key_by()` invocation. In
case of tuple-index
-keys or string-field references this key type is always `Tuple` and you have
to manually cast
-it to a tuple of the correct size to extract the key fields.
-
-A `ProcessWindowFunction` can be defined and used like this:
-
-```python
-from typing import Tuple, Iterable
-
-from pyflink.common.typeinfo import Types
-from pyflink.datastream.window import TimeWindow
-
-class SumWindowProcessFunction(ProcessWindowFunction[Tuple, Tuple, str, TimeWindow]):
-
- def process(self,
- key: str,
- content: ProcessWindowFunction.Context,
- elements: Iterable[Tuple]) -> Iterable[tuple]:
- result = 0
- for i in elements:
- result += i[0]
- return [(key, result)]
-
- def clear(self, context: ProcessWindowFunction.Context) -> None:
- pass
-
-data_stream = env.from_collection([
- (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello')],
- type_info=Types.TUPLE([Types.INT(), Types.STRING()])) # type: DataStream
-data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
- .window(TumblingEventWindowAssigner()) \
- .process(SumWindowProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT()]))
-```
-### WindowFunction
-
-In some places where a `ProcessWindowFunction` can be used you can also use a
`WindowFunction`. This
-is an older version of `ProcessWindowFunction` that provides less contextual
information and does
-not have some advances features, such as per-window keyed state. This
interface will be deprecated
-at some point.
-
-The signature of a `WindowFunction` looks as follows:
-
-```python
-class WindowFunction(Function, Generic[IN, OUT, KEY, W]):
- """
- Base interface for functions that are evaluated over keyed (grouped)
windows.
- """
-
- @abstractmethod
- def apply(self, key: KEY, window: W, inputs: Iterable[IN]) -> Iterable[OUT]:
- """
- Evaluates the window and outputs none or several elements.
-
- :param key: The key for which this window is evaluated.
- :param window: The window that is being evaluated.
- :param inputs: The elements in the window being evaluated.
- """
- pass
-```
-
-It can be used like this:
-
-```python
-from typing import Tuple, Iterable
-
-from pyflink.common.typeinfo import Types
-from pyflink.datastream.window import TimeWindow
-
-class SumWindowFunction(WindowFunction[Tuple, Tuple, str, TimeWindow]):
-
- def apply(self, key: str, window: TimeWindow, inputs: Iterable[Tuple]):
- result = 0
- for i in inputs:
- result += i[0]
- return [(key, result)]
-
-data_stream = env.from_collection([
- (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello')],
- type_info=Types.TUPLE([Types.INT(), Types.STRING()])) # type: DataStream
-data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
- .window(TumblingEventWindowAssigner()) \
- .apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()]))
-```
-
-## Triggers
-
-A Trigger determines when a window (as formed by the window assigner) is ready
to be processed by the window function.
-Each WindowAssigner comes with a default Trigger. You can specify a custom
trigger using trigger(...).
-
-The signature of `ProcessWindowFunction` looks as follows:
-
-```python
-class Trigger(ABC, Generic[T, W]):
- """
- A Trigger determines when a pane of a window should be evaluated to emit
the results for that
- part of the window.
-
- A pane is the bucket of elements that have the same key (assigned by the
KeySelector) and same
- Window. An element can be in multiple panes if it was assigned to multiple
windows by the
- WindowAssigner. These panes all have their own instance of the Trigger.
-
- Triggers must not maintain state internally since they can be re-created
or reused for different
- keys. All necessary state should be persisted using the state abstraction
available on the
- TriggerContext.
-
- When used with a MergingWindowAssigner the Trigger must return true from
:func:`can_merge` and
- :func:`on_merge` most be properly implemented.
- """
-
- class TriggerContext(ABC):
- """
- A context object that is given to :class:`Trigger` methods to allow
them to register timer
- callbacks and deal with state.
- """
-
- @abstractmethod
- def get_current_processing_time(self) -> int:
- """
- :return: The current processing time.
- """
- pass
-
- @abstractmethod
- def get_metric_group(self) -> MetricGroup:
- """
- Returns the metric group for this :class:`Trigger`. This is the
same metric group that
- would be returned from
- :func:`~pyflink.datasteam.functions.RuntimeContext.get_metric_group` in a user function.
-
- :return: The metric group.
- """
- pass
-
- @abstractmethod
- def get_current_watermark(self) -> int:
- """
- :return: The current watermark time.
- """
- pass
-
- @abstractmethod
- def register_processing_time_timer(self, time: int) -> None:
- """
- Register a system time callback. When the current system time
passes the specified time
- :func:`~Trigger.on_processing_time` is called with the time
specified here.
-
- :param time: The time at which to invoke
:func:`~Trigger.on_processing_time`.
- """
- pass
-
- @abstractmethod
- def register_event_time_timer(self, time: int) -> None:
- """
- Register an event-time callback. When the current watermark passes
the specified time
- :func:`~Trigger.on_event_time` is called with the time specified
here.
-
- :param time: The watermark at which to invoke
:func:`~Trigger.on_event_time`.
- """
- pass
-
- @abstractmethod
- def delete_processing_time_timer(self, time: int) -> None:
- """
- Delete the processing time trigger for the given time.
- """
- pass
-
- @abstractmethod
- def delete_event_time_timer(self, time: int) -> None:
- """
- Delete the event-time trigger for the given time.
- """
- pass
-
- @abstractmethod
- def get_partitioned_state(self, state_descriptor: StateDescriptor) -> State:
- """
- Retrieves a :class:`State` object that can be used to interact
with fault-tolerant state
- that is scoped to the window and key of the current trigger
invocation.
-
- :param state_descriptor: The StateDescriptor that contains the
name and type of the
- state that is being accessed.
- :return: The partitioned state object.
- """
- pass
-
- class OnMergeContext(TriggerContext):
- """
- Extension of :class:`TriggerContext` that is given to
:func:`~Trigger.on_merge`.
- """
-
- @abstractmethod
- def merge_partitioned_state(self, state_descriptor: StateDescriptor) -> None:
- pass
-
- @abstractmethod
- def on_element(self,
- element: T,
- timestamp: int,
- window: W,
- ctx: 'Trigger.TriggerContext') -> TriggerResult:
- """
- Called for every element that gets added to a pane. The result of this
will determine
- whether the pane is evaluated to emit results.
-
- :param element: The element that arrived.
- :param timestamp: The timestamp of the element that arrived.
- :param window: The window to which the element is being added.
- :param ctx: A context object that can be used to register timer
callbacks.
- """
- pass
-
- @abstractmethod
- def on_processing_time(self,
- time: int,
- window: W,
- ctx: 'Trigger.TriggerContext') -> TriggerResult:
- """
- Called when a processing-time timer that was set using the trigger
context fires.
-
- :param time: The timestamp at which the timer fired.
- :param window: The window for which the timer fired.
- :param ctx: A context object that can be used to register timer
callbacks.
- """
- pass
-
- @abstractmethod
- def on_event_time(self, time: int, window: W, ctx: 'Trigger.TriggerContext') -> TriggerResult:
- """
- Called when an event-time timer that was set using the trigger context
fires.
-
- :param time: The timestamp at which the timer fired.
- :param window: The window for which the timer fired.
- :param ctx: A context object that can be used to register timer
callbacks.
- """
- pass
-
- def can_merge(self) -> bool:
- """
- .. note:: If this returns true you must properly implement
:func:`~Trigger.on_merge`
-
- :return: True if this trigger supports merging of trigger state and
can therefore be used
- with a MergingWindowAssigner.
- """
- return False
-
- @abstractmethod
- def on_merge(self, window: W, ctx: 'Trigger.OnMergeContext') -> None:
- """
- Called when several windows have been merged into one window by the
:class:`WindowAssigner`.
-
- :param window: The new window that results from the merge.
- :param ctx: A context object that can be used to register timer
callbacks and access state.
- """
- pass
-
- @abstractmethod
- def clear(self, window: W, ctx: 'Trigger.TriggerContext') -> None:
- """
- Clears any state that the trigger might still hold for the given
window. This is called when
- a window is purged. Timers set using
:func:`~TriggerContext.register_event_time_timer` and
- :func:`~TriggerContext.register_processing_time_timer` should be
deleted here as well as
- state acquired using :func:`~TriggerContext.get_partitioned_state`.
- """
- pass
-```
-
-Two things to notice about the above methods are:
-
-1) The first three(*on_element*, *on_processing_time* and *on_event_time*)
decide how to act on their invocation event by returning a `TriggerResult`.
-The action can be one of the following:
-
-* `CONTINUE`: do nothing,
-* `FIRE`: trigger the computation,
-* `PURGE`: clear the elements in the window, and
-* `FIRE_AND_PURGE`: trigger the computation and clear the elements in the
window afterwards.
-
-2) Any of these methods can be used to register processing- or event-time
timers for future actions.
-
-### Fire and Purge
-
-Once a trigger determines that a window is ready for processing, it fires,
*i.e.*, it returns `FIRE` or `FIRE_AND_PURGE`. This is the signal for the
window operator
-to emit the result of the current window. Given a window with a
`ProcessWindowFunction`, all elements are passed to the `ProcessWindowFunction`.
-
-When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While `FIRE`
keeps the contents of the window, `FIRE_AND_PURGE` removes its content.
-By default, the pre-implemented triggers simply `FIRE` without purging the
window state.
-
-You can implement a custom EventTimeTrigger as follows:
-
-```python
-from typing import Tuple
-from pyflink.datastream.window import TimeWindow
-
-class EventTimeTrigger(Trigger[Tuple, TimeWindow]):
-
- def on_element(self,
- element: Tuple,
- timestamp: int,
- window: TimeWindow,
- ctx: 'Trigger.TriggerContext') -> TriggerResult:
- return TriggerResult.CONTINUE
-
- def on_processing_time(self,
- time: int,
- window: TimeWindow,
- ctx: 'Trigger.TriggerContext') -> TriggerResult:
- return TriggerResult.CONTINUE
-
- def on_event_time(self,
- time: int,
- window: TimeWindow,
- ctx: 'Trigger.TriggerContext') -> TriggerResult:
- if time >= window.max_timestamp():
- return TriggerResult.FIRE_AND_PURGE
- else:
- return TriggerResult.CONTINUE
-
- def on_merge(self,
- window: TimeWindow,
- ctx: 'Trigger.OnMergeContext') -> None:
- pass
-
- def clear(self,
- window: TimeWindow,
- ctx: 'Trigger.TriggerContext') -> None:
- pass
-```
-
-## Allowed Lateness
-
-When working with *event-time* windowing, it can happen that elements arrive
late, *i.e.* the watermark that Flink uses to
-keep track of the progress of event-time is already past the end timestamp of
a window to which an element belongs. See
-[event time]({{< ref "docs/dev/datastream/event-time/generating_watermarks" >}}) and especially [late elements]({{< ref "docs/dev/datastream/event-time/generating_watermarks" >}}#late-elements) for a more thorough
-discussion of how Flink deals with event time.
-
-By default, late elements are dropped when the watermark is past the end of
the window. However,
-Flink allows to specify a maximum *allowed lateness* for window operators.
Allowed lateness
-specifies by how much time elements can be late before they are dropped, and
its default value is 0.
-Elements that arrive after the watermark has passed the end of the window but
before it passes the end of
-the window plus the allowed lateness, are still added to the window. Depending
on the trigger used,
-a late but not dropped element may cause the window to fire again.
-
-In order to make this work, Flink keeps the state of windows until their
allowed lateness expires. Once this happens, Flink removes the window and
deletes its state, as
-also described in the [Window Lifecycle](#window-lifecycle) section.
-
-By default, the allowed lateness is set to `0`. That is, elements that arrive
behind the watermark will be dropped.
-
-You can specify an allowed lateness like this:
-
-```python
-data_stream.key_by(<key selector>) \
- .window(<window assigner>) \
- .allowed_lateness(<time>) \
- .<windowed transformation>(<window function>)
-```
+<meta http-equiv="refresh" content="0; url={{< ref "docs/dev/datastream/operators/windows" >}} "/>