This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 50dcd81 [FLINK-22350][python][docs] Add documentation for
user-defined window in Python DataStream API
50dcd81 is described below
commit 50dcd819b0a82b307f85b3b6dbb89a7d6600331e
Author: huangxingbo <[email protected]>
AuthorDate: Wed Apr 21 17:55:27 2021 +0800
[FLINK-22350][python][docs] Add documentation for user-defined window in
Python DataStream API
This closes #15702.
---
.../docs/dev/python/datastream/operators/_index.md | 23 +
.../{operators.md => operators/overview.md} | 6 +-
.../dev/python/datastream/operators/windows.md | 585 +++++++++++++++++++++
.../docs/dev/python/datastream/operators/_index.md | 23 +
.../{operators.md => operators/overview.md} | 4 +-
.../dev/python/datastream/operators/windows.md | 585 +++++++++++++++++++++
6 files changed, 1221 insertions(+), 5 deletions(-)
diff --git a/docs/content.zh/docs/dev/python/datastream/operators/_index.md
b/docs/content.zh/docs/dev/python/datastream/operators/_index.md
new file mode 100644
index 0000000..574fa3e
--- /dev/null
+++ b/docs/content.zh/docs/dev/python/datastream/operators/_index.md
@@ -0,0 +1,23 @@
+---
+title: Operators
+bookCollapseSection: true
+weight: 21
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
diff --git a/docs/content.zh/docs/dev/python/datastream/operators.md
b/docs/content.zh/docs/dev/python/datastream/operators/overview.md
similarity index 97%
rename from docs/content.zh/docs/dev/python/datastream/operators.md
rename to docs/content.zh/docs/dev/python/datastream/operators/overview.md
index 78eba1a..8e1ac66 100644
--- a/docs/content.zh/docs/dev/python/datastream/operators.md
+++ b/docs/content.zh/docs/dev/python/datastream/operators/overview.md
@@ -1,9 +1,9 @@
---
-title: "Operators"
-weight: 21
+title: "Overview"
+weight: 1
type: docs
aliases:
- - /zh/dev/python/datastream-api-users-guide/operators.html
+ - /zh/dev/python/datastream-api-users-guide/operators.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/content.zh/docs/dev/python/datastream/operators/windows.md
b/docs/content.zh/docs/dev/python/datastream/operators/windows.md
new file mode 100644
index 0000000..a79d7c7
--- /dev/null
+++ b/docs/content.zh/docs/dev/python/datastream/operators/windows.md
@@ -0,0 +1,585 @@
+---
+title: "Windows"
+weight: 2
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Windows
+
+Windows are at the heart of processing infinite streams. Windows split the
stream into "buckets" of finite size,
+over which we can apply computations. This document focuses on how windowing
is performed in Flink and how the
+programmer can make the most of its functionality.
+
+Currently, the window operation is only supported on *keyed* streams.
+
+**Keyed Windows**
+
+ stream
+ .key_by(...)
+ .window(...) <- required: "assigner"
+ [.trigger(...)] <- optional: "trigger" (else default
trigger)
+ [.allowed_lateness(...)] <- optional: "lateness" (else zero)
+ .apply/process() <- required: "function"
+
+In the above, the commands in square brackets ([...]) are optional. This
reveals that Flink allows you to customize your
+windowing logic in many different ways so that it best fits your needs.
+
+## Window Lifecycle
+
+In a nutshell, a window is **created** as soon as the first element that
should belong to this window arrives, and the
+window is **completely removed** when the time (event or processing time)
passes its end timestamp plus the user-specified
+`allowed lateness` (see [Allowed Lateness](#allowed-lateness)). Flink
guarantees removal only for time-based
+windows and not for other types, *e.g.* global windows (see [Window
Assigners](#window-assigners)). For example, with an
+event-time-based windowing strategy that creates non-overlapping (or tumbling)
windows every 5 minutes and has an allowed
+lateness of 1 min, Flink will create a new window for the interval between
`12:00` and `12:05` when the first element with
+a timestamp that falls into this interval arrives, and it will remove it when
the watermark passes the `12:06`
+timestamp.
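
The lifecycle arithmetic in the example above can be checked with a small plain-Python sketch (illustrative only, not PyFlink API; timestamps are milliseconds since midnight):

```python
SIZE = 5 * 60 * 1000       # 5-minute tumbling windows
LATENESS = 1 * 60 * 1000   # 1 minute of allowed lateness

def window_bounds(timestamp: int):
    """Return the (start, end) of the tumbling window the element falls into."""
    start = timestamp - (timestamp % SIZE)
    return start, start + SIZE

def removal_time(window_end: int) -> int:
    """The window is removed once the watermark passes end + allowed lateness."""
    return window_end + LATENESS

minute = 60 * 1000
start, end = window_bounds(723 * minute)   # an element with timestamp 12:03
# start is 720 * minute (12:00), end is 725 * minute (12:05),
# and removal_time(end) is 726 * minute (12:06)
```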
+
+In addition, each window will have a `Trigger` (see [Triggers](#triggers)) and
a function (`WindowFunction` or `ProcessWindowFunction`)
+(see [Window Functions](#window-functions)) attached to it. The function will
contain the computation to
+be applied to the contents of the window, while the `Trigger` specifies the
conditions under which the window is
+considered ready for the function to be applied. A triggering policy might be
something like "when the number of elements
+in the window is more than 4", or "when the watermark passes the end of the
window". A trigger can also decide to
+purge a window's contents any time between its creation and removal. Purging
in this case only refers to the elements
+in the window, and *not* the window metadata. This means that new data can
still be added to that window.
+
+In the following we go into more detail for each of the components above. We
start with the required parts in the above
+snippet (see [Keyed Windows](#keyed-windows), [Window
Assigner](#window-assigners), and [Window Function](#window-functions))
+before moving to the optional ones.
+
+## Keyed Windows
+
+The first thing to specify is whether your stream should be keyed or not. This
has to be done before defining the window.
+Using the `key_by(...)` will split your infinite stream into logical keyed
streams. If `key_by(...)` is not called, your
+stream is not keyed.
+
+In the case of keyed streams, any attribute of your incoming events can be
used as a key
+(more details [here]({{< ref "docs/dev/datastream/fault-tolerance/state"
>}}#keyed-datastream)). Having a keyed stream will
+allow your windowed computation to be performed in parallel by multiple tasks,
as each logical keyed stream can be processed
+independently from the rest. All elements referring to the same key will be
sent to the same parallel task.
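
The routing guarantee in the last sentence can be illustrated with a plain-Python sketch (a simple hash partitioner for illustration, not Flink's actual key-group assignment):

```python
import zlib

def assign_task(key: str, parallelism: int) -> int:
    """Deterministically map a key to one of `parallelism` parallel tasks."""
    return zlib.crc32(key.encode("utf-8")) % parallelism

# Equal keys always land on the same task index, so each logical keyed
# stream can be processed independently of the others.
```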
+
+## Window Assigners
+
+After specifying whether your stream is keyed, the next step is to define a *window
assigner*.
+The window assigner defines how elements are assigned to windows. This is done
by specifying the `WindowAssigner`
+of your choice in the `window(...)` (for *keyed* streams) call.
+
+A `WindowAssigner` is responsible for assigning each incoming element to one
or more windows.
+You can implement a custom window assigner by extending the `WindowAssigner`
class.
+
+Time-based windows have a *start timestamp* (inclusive) and an *end timestamp*
(exclusive)
+that together describe the size of the window. In code, Flink uses
`TimeWindow` when working with
+time-based windows, which has methods for querying the start and end
timestamps, as well as an additional method `max_timestamp()` that returns
the largest allowed timestamp for a given window.
+
+In the following, we show how to define a custom *tumbling window* assigner.
For details of tumbling windows, you can
+refer to the [the relevant documentation]({{< ref
"docs/dev/datastream/operators/windows" >}}#tumbling-windows).
+
+```python
+from typing import Tuple, Collection
+
+from pyflink.common.serializer import TypeSerializer
+from pyflink.datastream import WindowAssigner, Trigger
+from pyflink.datastream.window import TimeWindow, TimeWindowSerializer
+
+class TumblingEventWindowAssigner(WindowAssigner[Tuple, TimeWindow]):
+
+ def __init__(self, size: int, offset: int, is_event_time: bool):
+ self._size = size
+ self._offset = offset
+ self._is_event_time = is_event_time
+
+ def assign_windows(self,
+ element: Tuple,
+ timestamp: int,
+ context: WindowAssigner.WindowAssignerContext) ->
Collection[TimeWindow]:
+ start = TimeWindow.get_window_start_with_offset(timestamp,
self._offset, self._size)
+ return [TimeWindow(start, start + self._size)]
+
+    def get_default_trigger(self, env) -> Trigger[Tuple, TimeWindow]:
+        # EventTimeTrigger is defined in the Triggers section below
+        return EventTimeTrigger()
+
+ def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
+ return TimeWindowSerializer()
+
+    def is_event_time(self) -> bool:
+        return self._is_event_time
+```
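
The assigner above delegates to `TimeWindow.get_window_start_with_offset` to align a timestamp to its window start. Its arithmetic can be sketched as follows (an illustration of the formula; PyFlink ships its own implementation):

```python
def get_window_start_with_offset(timestamp: int, offset: int, size: int) -> int:
    # Align the timestamp to the start of its tumbling window. The extra
    # "+ size" keeps the modulo non-negative for timestamps before the offset.
    return timestamp - (timestamp - offset + size) % size

# With size=5 and offset=0, timestamps 5..9 all map to the window [5, 10);
# with offset=2, the windows shift to [2, 7), [7, 12), ...
```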
+
+## Window Functions
+
+After defining the window assigner, we need to specify the computation that we
want
+to perform on each of these windows. This is the responsibility of the *window
function*, which is used to process the
+elements of each keyed window once the system determines that a window is
ready for processing
+(see [triggers](#triggers) for how Flink determines when a window is ready).
+
+The window function can be `ProcessWindowFunction` or `WindowFunction`. They
get an `Iterable` for all the elements contained in a
+window and additional meta information about the window to which the elements
belong.
+
+In some places where a `ProcessWindowFunction` can be used you can also use a
`WindowFunction`.
+This is an older version of ProcessWindowFunction that provides less
contextual information and
+does not have some advanced features, such as per-window keyed state. We will
look at examples for each of these variants.
+
+### ProcessWindowFunction
+
+A ProcessWindowFunction gets an Iterable containing all the elements of the
window, and a Context
+object with access to time and state information, which enables it to provide
more flexibility than
+other window functions. This comes at the cost of performance and resource
consumption, because
+elements cannot be incrementally aggregated but instead need to be buffered
internally until the
+window is considered ready for processing.
+
+The signature of `ProcessWindowFunction` looks as follows:
+
+```python
+class ProcessWindowFunction(Function, Generic[IN, OUT, KEY, W]):
+ """
+ Base interface for functions that are evaluated over keyed (grouped)
windows using a context
+ for retrieving extra information.
+ """
+
+ class Context(ABC, Generic[W2]):
+ """
+ The context holding window metadata.
+ """
+
+ @abstractmethod
+ def window(self) -> W2:
+ """
+ :return: The window that is being evaluated.
+ """
+ pass
+
+ @abstractmethod
+ def current_processing_time(self) -> int:
+ """
+ :return: The current processing time.
+ """
+ pass
+
+ @abstractmethod
+ def current_watermark(self) -> int:
+ """
+ :return: The current event-time watermark.
+ """
+ pass
+
+ @abstractmethod
+ def window_state(self) -> KeyedStateStore:
+ """
+ State accessor for per-key and per-window state.
+
+ .. note::
+ If you use per-window state you have to ensure that you clean
it up by implementing
+ :func:`~ProcessWindowFunction.clear`.
+
+ :return: The :class:`KeyedStateStore` used to access per-key and
per-window states.
+ """
+ pass
+
+ @abstractmethod
+ def global_state(self) -> KeyedStateStore:
+ """
+ State accessor for per-key global state.
+ """
+ pass
+
+ @abstractmethod
+ def process(self,
+ key: KEY,
+ content: 'ProcessWindowFunction.Context',
+ elements: Iterable[IN]) -> Iterable[OUT]:
+ """
+ Evaluates the window and outputs none or several elements.
+
+ :param key: The key for which this window is evaluated.
+ :param content: The context in which the window is being evaluated.
+ :param elements: The elements in the window being evaluated.
+ :return: The iterable object which produces the elements to emit.
+ """
+ pass
+
+ @abstractmethod
+ def clear(self, context: 'ProcessWindowFunction.Context') -> None:
+ """
+ Deletes any state in the :class:`Context` when the Window expires (the
watermark passes its
+ max_timestamp + allowed_lateness).
+
+        :param context: The context in which the window is being evaluated.
+ """
+ pass
+```
+
+The `key` parameter is the key that is extracted
+via the `KeySelector` that was specified for the `key_by()` invocation. In
case of tuple-index
+keys or string-field references this key type is always `Tuple` and you have
to manually cast
+it to a tuple of the correct size to extract the key fields.
+
+A `ProcessWindowFunction` can be defined and used like this:
+
+```python
+from typing import Tuple, Iterable
+
+from pyflink.common.typeinfo import Types
+from pyflink.datastream.window import TimeWindow
+
+class SumWindowProcessFunction(ProcessWindowFunction[Tuple, Tuple, str,
TimeWindow]):
+
+ def process(self,
+ key: str,
+ content: ProcessWindowFunction.Context,
+ elements: Iterable[Tuple]) -> Iterable[tuple]:
+ result = 0
+ for i in elements:
+ result += i[0]
+ return [(key, result)]
+
+ def clear(self, context: ProcessWindowFunction.Context) -> None:
+ pass
+
+data_stream = env.from_collection([
+ (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello')],
+ type_info=Types.TUPLE([Types.INT(), Types.STRING()])) # type: DataStream
+data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
+ .window(TumblingEventWindowAssigner()) \
+ .process(SumWindowProcessFunction(), Types.TUPLE([Types.STRING(),
Types.INT()]))
+```
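
Outside of Flink, the computation performed by `SumWindowProcessFunction` above can be checked with plain Python, as if all six elements landed in a single window pane per key (illustrative only):

```python
from collections import defaultdict

def sum_per_key(elements):
    """Same per-key summation as SumWindowProcessFunction.process."""
    sums = defaultdict(int)
    for value, key in elements:
        sums[key] += value
    return sorted(sums.items())

data = [(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello')]
# sum_per_key(data) == [('hello', 12), ('hi', 9)]
```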
+### WindowFunction
+
+In some places where a `ProcessWindowFunction` can be used you can also use a
`WindowFunction`. This
+is an older version of `ProcessWindowFunction` that provides less contextual
information and does
+not have some advanced features, such as per-window keyed state. This
interface will be deprecated
+at some point.
+
+The signature of a `WindowFunction` looks as follows:
+
+```python
+class WindowFunction(Function, Generic[IN, OUT, KEY, W]):
+ """
+ Base interface for functions that are evaluated over keyed (grouped)
windows.
+ """
+
+ @abstractmethod
+ def apply(self, key: KEY, window: W, inputs: Iterable[IN]) ->
Iterable[OUT]:
+ """
+ Evaluates the window and outputs none or several elements.
+
+ :param key: The key for which this window is evaluated.
+ :param window: The window that is being evaluated.
+ :param inputs: The elements in the window being evaluated.
+ """
+ pass
+```
+
+It can be used like this:
+
+```python
+from typing import Tuple, Iterable
+
+from pyflink.common.typeinfo import Types
+from pyflink.datastream.window import TimeWindow
+
+class SumWindowFunction(WindowFunction[Tuple, Tuple, str, TimeWindow]):
+
+ def apply(self, key: str, window: TimeWindow, inputs: Iterable[Tuple]):
+ result = 0
+ for i in inputs:
+ result += i[0]
+ return [(key, result)]
+
+data_stream = env.from_collection([
+ (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello')],
+ type_info=Types.TUPLE([Types.INT(), Types.STRING()])) # type: DataStream
+data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
+ .window(TumblingEventWindowAssigner()) \
+ .apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()]))
+```
+
+## Triggers
+
+A Trigger determines when a window (as formed by the window assigner) is ready
to be processed by the window function.
+Each WindowAssigner comes with a default Trigger. You can specify a custom
trigger using trigger(...).
+
+The signature of a `Trigger` looks as follows:
+
+```python
+class Trigger(ABC, Generic[T, W]):
+ """
+ A Trigger determines when a pane of a window should be evaluated to emit
the results for that
+ part of the window.
+
+ A pane is the bucket of elements that have the same key (assigned by the
KeySelector) and same
+ Window. An element can be in multiple panes if it was assigned to multiple
windows by the
+ WindowAssigner. These panes all have their own instance of the Trigger.
+
+ Triggers must not maintain state internally since they can be re-created
or reused for different
+ keys. All necessary state should be persisted using the state abstraction
available on the
+ TriggerContext.
+
+ When used with a MergingWindowAssigner the Trigger must return true from
:func:`can_merge` and
+    :func:`on_merge` must be properly implemented.
+ """
+
+ class TriggerContext(ABC):
+ """
+ A context object that is given to :class:`Trigger` methods to allow
them to register timer
+ callbacks and deal with state.
+ """
+
+ @abstractmethod
+ def get_current_processing_time(self) -> int:
+ """
+ :return: The current processing time.
+ """
+ pass
+
+ @abstractmethod
+ def get_metric_group(self) -> MetricGroup:
+ """
+ Returns the metric group for this :class:`Trigger`. This is the
same metric group that
+ would be returned from
+
:func:`~pyflink.datastream.functions.RuntimeContext.get_metric_group` in a user
function.
+
+ :return: The metric group.
+ """
+ pass
+
+ @abstractmethod
+ def get_current_watermark(self) -> int:
+ """
+ :return: The current watermark time.
+ """
+ pass
+
+ @abstractmethod
+ def register_processing_time_timer(self, time: int) -> None:
+ """
+ Register a system time callback. When the current system time
passes the specified time
+ :func:`~Trigger.on_processing_time` is called with the time
specified here.
+
+ :param time: The time at which to invoke
:func:`~Trigger.on_processing_time`.
+ """
+ pass
+
+ @abstractmethod
+ def register_event_time_timer(self, time: int) -> None:
+ """
+ Register an event-time callback. When the current watermark passes
the specified time
+ :func:`~Trigger.on_event_time` is called with the time specified
here.
+
+ :param time: The watermark at which to invoke
:func:`~Trigger.on_event_time`.
+ """
+ pass
+
+ @abstractmethod
+ def delete_processing_time_timer(self, time: int) -> None:
+ """
+ Delete the processing time trigger for the given time.
+ """
+ pass
+
+ @abstractmethod
+ def delete_event_time_timer(self, time: int) -> None:
+ """
+ Delete the event-time trigger for the given time.
+ """
+ pass
+
+ @abstractmethod
+ def get_partitioned_state(self, state_descriptor: StateDescriptor) ->
State:
+ """
+ Retrieves a :class:`State` object that can be used to interact
with fault-tolerant state
+ that is scoped to the window and key of the current trigger
invocation.
+
+ :param state_descriptor: The StateDescriptor that contains the
name and type of the
+ state that is being accessed.
+ :return: The partitioned state object.
+ """
+ pass
+
+ class OnMergeContext(TriggerContext):
+ """
+ Extension of :class:`TriggerContext` that is given to
:func:`~Trigger.on_merge`.
+ """
+
+ @abstractmethod
+ def merge_partitioned_state(self, state_descriptor: StateDescriptor)
-> None:
+ pass
+
+ @abstractmethod
+ def on_element(self,
+ element: T,
+ timestamp: int,
+ window: W,
+ ctx: 'Trigger.TriggerContext') -> TriggerResult:
+ """
+ Called for every element that gets added to a pane. The result of this
will determine
+ whether the pane is evaluated to emit results.
+
+ :param element: The element that arrived.
+ :param timestamp: The timestamp of the element that arrived.
+ :param window: The window to which the element is being added.
+ :param ctx: A context object that can be used to register timer
callbacks.
+ """
+ pass
+
+ @abstractmethod
+ def on_processing_time(self,
+ time: int,
+ window: W,
+ ctx: 'Trigger.TriggerContext') -> TriggerResult:
+ """
+ Called when a processing-time timer that was set using the trigger
context fires.
+
+ :param time: The timestamp at which the timer fired.
+ :param window: The window for which the timer fired.
+ :param ctx: A context object that can be used to register timer
callbacks.
+ """
+ pass
+
+ @abstractmethod
+ def on_event_time(self, time: int, window: W, ctx:
'Trigger.TriggerContext') -> TriggerResult:
+ """
+ Called when an event-time timer that was set using the trigger context
fires.
+
+ :param time: The timestamp at which the timer fired.
+ :param window: The window for which the timer fired.
+ :param ctx: A context object that can be used to register timer
callbacks.
+ """
+ pass
+
+ def can_merge(self) -> bool:
+ """
+ .. note:: If this returns true you must properly implement
:func:`~Trigger.on_merge`
+
+ :return: True if this trigger supports merging of trigger state and
can therefore be used
+ with a MergingWindowAssigner.
+ """
+ return False
+
+ @abstractmethod
+ def on_merge(self, window: W, ctx: 'Trigger.OnMergeContext') -> None:
+ """
+ Called when several windows have been merged into one window by the
:class:`WindowAssigner`.
+
+ :param window: The new window that results from the merge.
+ :param ctx: A context object that can be used to register timer
callbacks and access state.
+ """
+ pass
+
+ @abstractmethod
+ def clear(self, window: W, ctx: 'Trigger.TriggerContext') -> None:
+ """
+ Clears any state that the trigger might still hold for the given
window. This is called when
+ a window is purged. Timers set using
:func:`~TriggerContext.register_event_time_timer` and
+ :func:`~TriggerContext.register_processing_time_timer` should be
deleted here as well as
+ state acquired using :func:`~TriggerContext.get_partitioned_state`.
+ """
+ pass
+```
+
+Two things to notice about the above methods are:
+
+1) The first three (*on_element*, *on_processing_time*, and *on_event_time*)
decide how to act on their invocation event by returning a `TriggerResult`.
+The action can be one of the following:
+
+* `CONTINUE`: do nothing,
+* `FIRE`: trigger the computation,
+* `PURGE`: clear the elements in the window, and
+* `FIRE_AND_PURGE`: trigger the computation and clear the elements in the
window afterwards.
+
+2) Any of these methods can be used to register processing- or event-time
timers for future actions.
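
The four actions can be modeled as a pair of flags, which is how one might sketch them in plain Python (an illustration of the semantics above, not PyFlink's actual `TriggerResult`):

```python
from enum import Enum

class TriggerResultSketch(Enum):
    # (fire, purge) flags for each of the four actions described above.
    CONTINUE = (False, False)
    FIRE = (True, False)
    PURGE = (False, True)
    FIRE_AND_PURGE = (True, True)

    @property
    def is_fire(self) -> bool:
        return self.value[0]

    @property
    def is_purge(self) -> bool:
        return self.value[1]
```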
+
+### Fire and Purge
+
+Once a trigger determines that a window is ready for processing, it fires,
*i.e.*, it returns `FIRE` or `FIRE_AND_PURGE`. This is the signal for the
window operator
+to emit the result of the current window. Given a window with a
`ProcessWindowFunction`, all elements are passed to the `ProcessWindowFunction`.
+
+When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While `FIRE`
keeps the contents of the window, `FIRE_AND_PURGE` removes its content.
+By default, the pre-implemented triggers simply `FIRE` without purging the
window state.
+
+You can implement a custom EventTimeTrigger as follows:
+
+```python
+from typing import Tuple
+from pyflink.datastream.window import TimeWindow
+
+class EventTimeTrigger(Trigger[Tuple, TimeWindow]):
+
+    def on_element(self,
+                   element: Tuple,
+                   timestamp: int,
+                   window: TimeWindow,
+                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
+        # Register a timer for the window end so that on_event_time is
+        # invoked once the watermark passes it.
+        ctx.register_event_time_timer(window.max_timestamp())
+        return TriggerResult.CONTINUE
+
+ def on_processing_time(self,
+ time: int,
+ window: TimeWindow,
+ ctx: 'Trigger.TriggerContext') -> TriggerResult:
+ return TriggerResult.CONTINUE
+
+ def on_event_time(self,
+ time: int,
+ window: TimeWindow,
+ ctx: 'Trigger.TriggerContext') -> TriggerResult:
+ if time >= window.max_timestamp():
+ return TriggerResult.FIRE_AND_PURGE
+ else:
+ return TriggerResult.CONTINUE
+
+ def on_merge(self,
+ window: TimeWindow,
+ ctx: 'Trigger.OnMergeContext') -> None:
+ pass
+
+    def clear(self,
+              window: TimeWindow,
+              ctx: 'Trigger.TriggerContext') -> None:
+        # Remove the timer registered in on_element when the window is purged.
+        ctx.delete_event_time_timer(window.max_timestamp())
+```
+
+## Allowed Lateness
+
+When working with *event-time* windowing, it can happen that elements arrive
late, *i.e.* the watermark that Flink uses to
+keep track of the progress of event-time is already past the end timestamp of
a window to which an element belongs. See
+[event time]({{< ref "docs/dev/datastream/event-time/generating_watermarks"
>}}) and especially [late elements]({{< ref
"docs/dev/datastream/event-time/generating_watermarks" >}}#late-elements) for a
more thorough
+discussion of how Flink deals with event time.
+
+By default, late elements are dropped when the watermark is past the end of
the window. However,
+Flink allows you to specify a maximum *allowed lateness* for window operators.
Allowed lateness
+specifies by how much time elements can be late before they are dropped, and
its default value is 0.
+Elements that arrive after the watermark has passed the end of the window but
before it passes the end of
+the window plus the allowed lateness, are still added to the window. Depending
on the trigger used,
+a late but not dropped element may cause the window to fire again.
+
+In order to make this work, Flink keeps the state of windows until their
allowed lateness expires. Once this happens, Flink removes the window and
deletes its state, as
+also described in the [Window Lifecycle](#window-lifecycle) section.
+
+By default, the allowed lateness is set to `0`. That is, elements that arrive
behind the watermark will be dropped.
+
+You can specify an allowed lateness like this:
+
+```python
+data_stream.key_by(<key selector>) \
+ .window(<window assigner>) \
+ .allowed_lateness(<time>) \
+ .<windowed transformation>(<window function>)
+```
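
The drop/keep decision described in this section can be sketched as (plain Python, illustrative only):

```python
def classify(window_end: int, watermark: int, allowed_lateness: int) -> str:
    """Classify an arriving element relative to its window, per the rules above."""
    if watermark < window_end:
        return "on-time"
    if watermark < window_end + allowed_lateness:
        return "late-but-kept"   # may cause the window to fire again
    return "dropped"             # past end + allowed lateness
```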
diff --git a/docs/content/docs/dev/python/datastream/operators/_index.md
b/docs/content/docs/dev/python/datastream/operators/_index.md
new file mode 100644
index 0000000..574fa3e
--- /dev/null
+++ b/docs/content/docs/dev/python/datastream/operators/_index.md
@@ -0,0 +1,23 @@
+---
+title: Operators
+bookCollapseSection: true
+weight: 21
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
diff --git a/docs/content/docs/dev/python/datastream/operators.md
b/docs/content/docs/dev/python/datastream/operators/overview.md
similarity index 99%
rename from docs/content/docs/dev/python/datastream/operators.md
rename to docs/content/docs/dev/python/datastream/operators/overview.md
index bf49d52..30cd642 100644
--- a/docs/content/docs/dev/python/datastream/operators.md
+++ b/docs/content/docs/dev/python/datastream/operators/overview.md
@@ -1,6 +1,6 @@
---
-title: "Operators"
-weight: 21
+title: "Overview"
+weight: 1
type: docs
aliases:
- /dev/python/datastream-api-users-guide/operators.html
diff --git a/docs/content/docs/dev/python/datastream/operators/windows.md
b/docs/content/docs/dev/python/datastream/operators/windows.md
new file mode 100644
index 0000000..a79d7c7
--- /dev/null
+++ b/docs/content/docs/dev/python/datastream/operators/windows.md
@@ -0,0 +1,585 @@
+---
+title: "Windows"
+weight: 2
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Windows
+
+Windows are at the heart of processing infinite streams. Windows split the
stream into "buckets" of finite size,
+over which we can apply computations. This document focuses on how windowing
is performed in Flink and how the
+programmer can make the most of its functionality.
+
+Currently, the window operation is only supported on *keyed* streams.
+
+**Keyed Windows**
+
+ stream
+ .key_by(...)
+ .window(...) <- required: "assigner"
+ [.trigger(...)] <- optional: "trigger" (else default
trigger)
+ [.allowed_lateness(...)] <- optional: "lateness" (else zero)
+ .apply/process() <- required: "function"
+
+In the above, the commands in square brackets ([...]) are optional. This
reveals that Flink allows you to customize your
+windowing logic in many different ways so that it best fits your needs.
+
+## Window Lifecycle
+
+In a nutshell, a window is **created** as soon as the first element that
should belong to this window arrives, and the
+window is **completely removed** when the time (event or processing time)
passes its end timestamp plus the user-specified
+`allowed lateness` (see [Allowed Lateness](#allowed-lateness)). Flink
guarantees removal only for time-based
+windows and not for other types, *e.g.* global windows (see [Window
Assigners](#window-assigners)). For example, with an
+event-time-based windowing strategy that creates non-overlapping (or tumbling)
windows every 5 minutes and has an allowed
+lateness of 1 min, Flink will create a new window for the interval between
`12:00` and `12:05` when the first element with
+a timestamp that falls into this interval arrives, and it will remove it when
the watermark passes the `12:06`
+timestamp.
+
+In addition, each window will have a `Trigger` (see [Triggers](#triggers)) and
a function (`WindowFunction` or `ProcessWindowFunction`)
+(see [Window Functions](#window-functions)) attached to it. The function will
contain the computation to
+be applied to the contents of the window, while the `Trigger` specifies the
conditions under which the window is
+considered ready for the function to be applied. A triggering policy might be
something like "when the number of elements
+in the window is more than 4", or "when the watermark passes the end of the
window". A trigger can also decide to
+purge a window's contents any time between its creation and removal. Purging
in this case only refers to the elements
+in the window, and *not* the window metadata. This means that new data can
still be added to that window.
+
+In the following we go into more detail for each of the components above. We
start with the required parts in the above
+snippet (see [Keyed Windows](#keyed-windows), [Window
Assigner](#window-assigners), and [Window Function](#window-functions))
+before moving to the optional ones.
+
+## Keyed Windows
+
+The first thing to specify is whether your stream should be keyed or not. This
has to be done before defining the window.
+Using the `key_by(...)` will split your infinite stream into logical keyed
streams. If `key_by(...)` is not called, your
+stream is not keyed.
+
+In the case of keyed streams, any attribute of your incoming events can be
used as a key
+(more details [here]({{< ref "docs/dev/datastream/fault-tolerance/state"
>}}#keyed-datastream)). Having a keyed stream will
+allow your windowed computation to be performed in parallel by multiple tasks,
as each logical keyed stream can be processed
+independently from the rest. All elements referring to the same key will be
sent to the same parallel task.
+
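As a rough illustration of this guarantee, the routing can be sketched in plain Python (Flink's actual partitioner uses key groups and murmur hashing, but the property is the same: the chosen task is a pure function of the key):

```python
# Plain-Python sketch of the routing guarantee described above, not the
# actual Flink partitioner: because the target task depends only on the
# key, all elements with the same key land in the same parallel task.
def assign_task(key: str, parallelism: int) -> int:
    return sum(ord(c) for c in key) % parallelism

# Every element with key "hello" is routed to the same parallel task.
tasks = {assign_task("hello", 4) for _ in range(10)}
```
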
+## Window Assigners
+
+After specifying whether your stream is keyed or not, the next step is to define a *window
assigner*.
+The window assigner defines how elements are assigned to windows. This is done
by specifying the `WindowAssigner`
+of your choice in the `window(...)` (for *keyed* streams) call.
+
+A `WindowAssigner` is responsible for assigning each incoming element to one
or more windows.
+You can implement a custom window assigner by extending the `WindowAssigner`
class.
+
+Time-based windows have a *start timestamp* (inclusive) and an *end timestamp*
(exclusive)
+that together describe the size of the window. In code, Flink uses
`TimeWindow` when working with
time-based windows, which has methods for querying the start and end timestamps
and also an
+additional method `max_timestamp()` that returns the largest allowed timestamp
for a given window.
+
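The relationship between these timestamps can be sketched as follows (a minimal illustrative stand-in, not the actual `pyflink.datastream.window.TimeWindow` class):

```python
# A minimal illustrative stand-in for the TimeWindow semantics described
# above (not the actual pyflink.datastream.window.TimeWindow class).
class SketchTimeWindow:
    def __init__(self, start: int, end: int):
        self.start = start  # inclusive
        self.end = end      # exclusive

    def max_timestamp(self) -> int:
        # Since `end` is exclusive, the largest timestamp that still
        # belongs to this window is end - 1.
        return self.end - 1

window = SketchTimeWindow(0, 5000)  # covers timestamps 0..4999
```
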
+In the following, we show how to implement a custom *tumbling window*
assigner. For details of tumbling windows, you can
+refer to the [the relevant documentation]({{< ref
"docs/dev/datastream/operators/windows" >}}#tumbling-windows).
+
+```python
+from typing import Tuple, Collection
+
+from pyflink.common.serializer import TypeSerializer
+from pyflink.datastream import WindowAssigner, Trigger
+from pyflink.datastream.window import TimeWindow, TimeWindowSerializer
+
+class TumblingEventWindowAssigner(WindowAssigner[Tuple, TimeWindow]):
+
+ def __init__(self, size: int, offset: int, is_event_time: bool):
+ self._size = size
+ self._offset = offset
+ self._is_event_time = is_event_time
+
+ def assign_windows(self,
+ element: Tuple,
+ timestamp: int,
+ context: WindowAssigner.WindowAssignerContext) ->
Collection[TimeWindow]:
+ start = TimeWindow.get_window_start_with_offset(timestamp,
self._offset, self._size)
+ return [TimeWindow(start, start + self._size)]
+
+ def get_default_trigger(self, env) -> Trigger[Tuple, TimeWindow]:
+        # EventTimeTrigger is defined in the Fire and Purge section below.
+        return EventTimeTrigger()
+
+ def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
+ return TimeWindowSerializer()
+
+ def is_event_time(self) -> bool:
+        return self._is_event_time
+```
+
+## Window Functions
+
+After defining the window assigner, we need to specify the computation that we
want
+to perform on each of these windows. This is the responsibility of the *window
function*, which is used to process the
+elements of each keyed window once the system determines that a window is
ready for processing
+(see [triggers](#triggers) for how Flink determines when a window is ready).
+
+The window function can be a `ProcessWindowFunction` or a `WindowFunction`. They
get an `Iterable` for all the elements contained in a
+window and additional meta information about the window to which the elements
belong.
+
+In some places where a `ProcessWindowFunction` can be used you can also use a
`WindowFunction`.
+This is an older version of ProcessWindowFunction that provides less
contextual information and
+does not have some advanced features, such as per-window keyed state. We will
look at examples for each of these variants.
+
+### ProcessWindowFunction
+
+A `ProcessWindowFunction` gets an `Iterable` containing all the elements of the
window, and a `Context`
+object with access to time and state information, which enables it to provide
more flexibility than
+other window functions. This comes at the cost of performance and resource
consumption, because
+elements cannot be incrementally aggregated but instead need to be buffered
internally until the
+window is considered ready for processing.
+
+The signature of `ProcessWindowFunction` looks as follows:
+
+```python
+class ProcessWindowFunction(Function, Generic[IN, OUT, KEY, W]):
+ """
+ Base interface for functions that are evaluated over keyed (grouped)
windows using a context
+ for retrieving extra information.
+ """
+
+ class Context(ABC, Generic[W2]):
+ """
+ The context holding window metadata.
+ """
+
+ @abstractmethod
+ def window(self) -> W2:
+ """
+ :return: The window that is being evaluated.
+ """
+ pass
+
+ @abstractmethod
+ def current_processing_time(self) -> int:
+ """
+ :return: The current processing time.
+ """
+ pass
+
+ @abstractmethod
+ def current_watermark(self) -> int:
+ """
+ :return: The current event-time watermark.
+ """
+ pass
+
+ @abstractmethod
+ def window_state(self) -> KeyedStateStore:
+ """
+ State accessor for per-key and per-window state.
+
+ .. note::
+ If you use per-window state you have to ensure that you clean
it up by implementing
+ :func:`~ProcessWindowFunction.clear`.
+
+ :return: The :class:`KeyedStateStore` used to access per-key and
per-window states.
+ """
+ pass
+
+ @abstractmethod
+ def global_state(self) -> KeyedStateStore:
+ """
+ State accessor for per-key global state.
+ """
+ pass
+
+ @abstractmethod
+ def process(self,
+ key: KEY,
+                context: 'ProcessWindowFunction.Context',
+ elements: Iterable[IN]) -> Iterable[OUT]:
+ """
+ Evaluates the window and outputs none or several elements.
+
+ :param key: The key for which this window is evaluated.
+        :param context: The context in which the window is being evaluated.
+ :param elements: The elements in the window being evaluated.
+ :return: The iterable object which produces the elements to emit.
+ """
+ pass
+
+ @abstractmethod
+ def clear(self, context: 'ProcessWindowFunction.Context') -> None:
+ """
+ Deletes any state in the :class:`Context` when the Window expires (the
watermark passes its
+ max_timestamp + allowed_lateness).
+
+        :param context: The context in which the window is being evaluated.
+ """
+ pass
+```
+
+The `key` parameter is the key that is extracted
+via the `KeySelector` that was specified for the `key_by()` invocation. In
case of tuple-index
+keys or string-field references this key type is always `Tuple` and you have
to manually cast
+it to a tuple of the correct size to extract the key fields.
+
+A `ProcessWindowFunction` can be defined and used like this:
+
+```python
+from typing import Tuple, Iterable
+
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import ProcessWindowFunction
+from pyflink.datastream.window import TimeWindow
+
+class SumWindowProcessFunction(ProcessWindowFunction[Tuple, Tuple, str,
TimeWindow]):
+
+ def process(self,
+ key: str,
+                context: ProcessWindowFunction.Context,
+ elements: Iterable[Tuple]) -> Iterable[tuple]:
+ result = 0
+ for i in elements:
+ result += i[0]
+ return [(key, result)]
+
+ def clear(self, context: ProcessWindowFunction.Context) -> None:
+ pass
+
+data_stream = env.from_collection([
+ (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello')],
+ type_info=Types.TUPLE([Types.INT(), Types.STRING()])) # type: DataStream
+data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
+    .window(TumblingEventWindowAssigner(size=5000, offset=0, is_event_time=True)) \
+ .process(SumWindowProcessFunction(), Types.TUPLE([Types.STRING(),
Types.INT()]))
+```
+
+### WindowFunction
+
+In some places where a `ProcessWindowFunction` can be used you can also use a
`WindowFunction`. This
+is an older version of `ProcessWindowFunction` that provides less contextual
information and does
+not have some advanced features, such as per-window keyed state. This
interface will be deprecated
+at some point.
+
+The signature of a `WindowFunction` looks as follows:
+
+```python
+class WindowFunction(Function, Generic[IN, OUT, KEY, W]):
+ """
+ Base interface for functions that are evaluated over keyed (grouped)
windows.
+ """
+
+ @abstractmethod
+ def apply(self, key: KEY, window: W, inputs: Iterable[IN]) ->
Iterable[OUT]:
+ """
+ Evaluates the window and outputs none or several elements.
+
+ :param key: The key for which this window is evaluated.
+ :param window: The window that is being evaluated.
+ :param inputs: The elements in the window being evaluated.
+ """
+ pass
+```
+
+It can be used like this:
+
+```python
+from typing import Tuple, Iterable
+
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import WindowFunction
+from pyflink.datastream.window import TimeWindow
+
+class SumWindowFunction(WindowFunction[Tuple, Tuple, str, TimeWindow]):
+
+ def apply(self, key: str, window: TimeWindow, inputs: Iterable[Tuple]):
+ result = 0
+ for i in inputs:
+ result += i[0]
+ return [(key, result)]
+
+data_stream = env.from_collection([
+ (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello')],
+ type_info=Types.TUPLE([Types.INT(), Types.STRING()])) # type: DataStream
+data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
+    .window(TumblingEventWindowAssigner(size=5000, offset=0, is_event_time=True)) \
+ .apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()]))
+```
+
+## Triggers
+
+A `Trigger` determines when a window (as formed by the window assigner) is
ready to be processed by the window function.
+Each `WindowAssigner` comes with a default `Trigger`. You can specify a custom
trigger using `trigger(...)`.
+
+The signature of `Trigger` looks as follows:
+
+```python
+class Trigger(ABC, Generic[T, W]):
+ """
+ A Trigger determines when a pane of a window should be evaluated to emit
the results for that
+ part of the window.
+
+ A pane is the bucket of elements that have the same key (assigned by the
KeySelector) and same
+ Window. An element can be in multiple panes if it was assigned to multiple
windows by the
+ WindowAssigner. These panes all have their own instance of the Trigger.
+
+ Triggers must not maintain state internally since they can be re-created
or reused for different
+ keys. All necessary state should be persisted using the state abstraction
available on the
+ TriggerContext.
+
+ When used with a MergingWindowAssigner the Trigger must return true from
:func:`can_merge` and
+    :func:`on_merge` must be properly implemented.
+ """
+
+ class TriggerContext(ABC):
+ """
+ A context object that is given to :class:`Trigger` methods to allow
them to register timer
+ callbacks and deal with state.
+ """
+
+ @abstractmethod
+ def get_current_processing_time(self) -> int:
+ """
+ :return: The current processing time.
+ """
+ pass
+
+ @abstractmethod
+ def get_metric_group(self) -> MetricGroup:
+ """
+ Returns the metric group for this :class:`Trigger`. This is the
same metric group that
+ would be returned from
+
:func:`~pyflink.datastream.functions.RuntimeContext.get_metric_group` in a user
function.
+
+ :return: The metric group.
+ """
+ pass
+
+ @abstractmethod
+ def get_current_watermark(self) -> int:
+ """
+ :return: The current watermark time.
+ """
+ pass
+
+ @abstractmethod
+ def register_processing_time_timer(self, time: int) -> None:
+ """
+ Register a system time callback. When the current system time
passes the specified time
+ :func:`~Trigger.on_processing_time` is called with the time
specified here.
+
+ :param time: The time at which to invoke
:func:`~Trigger.on_processing_time`.
+ """
+ pass
+
+ @abstractmethod
+ def register_event_time_timer(self, time: int) -> None:
+ """
+ Register an event-time callback. When the current watermark passes
the specified time
+ :func:`~Trigger.on_event_time` is called with the time specified
here.
+
+ :param time: The watermark at which to invoke
:func:`~Trigger.on_event_time`.
+ """
+ pass
+
+ @abstractmethod
+ def delete_processing_time_timer(self, time: int) -> None:
+ """
+ Delete the processing time trigger for the given time.
+ """
+ pass
+
+ @abstractmethod
+ def delete_event_time_timer(self, time: int) -> None:
+ """
+ Delete the event-time trigger for the given time.
+ """
+ pass
+
+ @abstractmethod
+ def get_partitioned_state(self, state_descriptor: StateDescriptor) ->
State:
+ """
+ Retrieves a :class:`State` object that can be used to interact
with fault-tolerant state
+ that is scoped to the window and key of the current trigger
invocation.
+
+ :param state_descriptor: The StateDescriptor that contains the
name and type of the
+ state that is being accessed.
+ :return: The partitioned state object.
+ """
+ pass
+
+ class OnMergeContext(TriggerContext):
+ """
+ Extension of :class:`TriggerContext` that is given to
:func:`~Trigger.on_merge`.
+ """
+
+ @abstractmethod
+ def merge_partitioned_state(self, state_descriptor: StateDescriptor)
-> None:
+ pass
+
+ @abstractmethod
+ def on_element(self,
+ element: T,
+ timestamp: int,
+ window: W,
+ ctx: 'Trigger.TriggerContext') -> TriggerResult:
+ """
+ Called for every element that gets added to a pane. The result of this
will determine
+ whether the pane is evaluated to emit results.
+
+ :param element: The element that arrived.
+ :param timestamp: The timestamp of the element that arrived.
+ :param window: The window to which the element is being added.
+ :param ctx: A context object that can be used to register timer
callbacks.
+ """
+ pass
+
+ @abstractmethod
+ def on_processing_time(self,
+ time: int,
+ window: W,
+ ctx: 'Trigger.TriggerContext') -> TriggerResult:
+ """
+ Called when a processing-time timer that was set using the trigger
context fires.
+
+ :param time: The timestamp at which the timer fired.
+ :param window: The window for which the timer fired.
+ :param ctx: A context object that can be used to register timer
callbacks.
+ """
+ pass
+
+ @abstractmethod
+ def on_event_time(self, time: int, window: W, ctx:
'Trigger.TriggerContext') -> TriggerResult:
+ """
+ Called when an event-time timer that was set using the trigger context
fires.
+
+ :param time: The timestamp at which the timer fired.
+ :param window: The window for which the timer fired.
+ :param ctx: A context object that can be used to register timer
callbacks.
+ """
+ pass
+
+ def can_merge(self) -> bool:
+ """
+ .. note:: If this returns true you must properly implement
:func:`~Trigger.on_merge`
+
+ :return: True if this trigger supports merging of trigger state and
can therefore be used
+ with a MergingWindowAssigner.
+ """
+ return False
+
+ @abstractmethod
+ def on_merge(self, window: W, ctx: 'Trigger.OnMergeContext') -> None:
+ """
+ Called when several windows have been merged into one window by the
:class:`WindowAssigner`.
+
+ :param window: The new window that results from the merge.
+ :param ctx: A context object that can be used to register timer
callbacks and access state.
+ """
+ pass
+
+ @abstractmethod
+ def clear(self, window: W, ctx: 'Trigger.TriggerContext') -> None:
+ """
+ Clears any state that the trigger might still hold for the given
window. This is called when
+ a window is purged. Timers set using
:func:`~TriggerContext.register_event_time_timer` and
+ :func:`~TriggerContext.register_processing_time_timer` should be
deleted here as well as
+ state acquired using :func:`~TriggerContext.get_partitioned_state`.
+ """
+ pass
+```
+
+Two things to notice about the above methods are:
+
+1) The first three (*on_element*, *on_processing_time* and *on_event_time*)
decide how to act on their invocation event by returning a `TriggerResult`.
+The action can be one of the following:
+
+* `CONTINUE`: do nothing,
+* `FIRE`: trigger the computation,
+* `PURGE`: clear the elements in the window, and
+* `FIRE_AND_PURGE`: trigger the computation and clear the elements in the
window afterwards.
+
+2) Any of these methods can be used to register processing- or event-time
timers for future actions.
+
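As an illustration, the count-based policy mentioned earlier ("fire when the number of elements in the window is more than 4") boils down to a decision function like this (plain Python mirroring `TriggerResult`, not the Flink `Trigger` API):

```python
# Plain-Python mirror of the TriggerResult decision described above,
# not the Flink Trigger API: fire and purge once a window holds more
# than `max_count` elements, otherwise do nothing.
CONTINUE, FIRE, PURGE, FIRE_AND_PURGE = (
    "CONTINUE", "FIRE", "PURGE", "FIRE_AND_PURGE")

def on_element_decision(count_in_window: int, max_count: int = 4) -> str:
    if count_in_window > max_count:
        return FIRE_AND_PURGE
    return CONTINUE
```
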
+### Fire and Purge
+
+Once a trigger determines that a window is ready for processing, it fires,
*i.e.*, it returns `FIRE` or `FIRE_AND_PURGE`. This is the signal for the
window operator
+to emit the result of the current window. Given a window with a
`ProcessWindowFunction`, all elements are passed to the `ProcessWindowFunction`.
+
+When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While `FIRE`
keeps the contents of the window, `FIRE_AND_PURGE` removes its content.
+By default, the pre-implemented triggers simply `FIRE` without purging the
window state.
+
+You can implement a custom `EventTimeTrigger` as follows:
+
+```python
+from typing import Tuple
+from pyflink.datastream import Trigger
+from pyflink.datastream.window import TimeWindow, TriggerResult
+
+class EventTimeTrigger(Trigger[Tuple, TimeWindow]):
+
+ def on_element(self,
+ element: Tuple,
+ timestamp: int,
+ window: TimeWindow,
+ ctx: 'Trigger.TriggerContext') -> TriggerResult:
+        # Register a timer for the end of the window so that on_event_time
+        # is called once the watermark passes window.max_timestamp().
+        ctx.register_event_time_timer(window.max_timestamp())
+        return TriggerResult.CONTINUE
+
+ def on_processing_time(self,
+ time: int,
+ window: TimeWindow,
+ ctx: 'Trigger.TriggerContext') -> TriggerResult:
+ return TriggerResult.CONTINUE
+
+ def on_event_time(self,
+ time: int,
+ window: TimeWindow,
+ ctx: 'Trigger.TriggerContext') -> TriggerResult:
+ if time >= window.max_timestamp():
+ return TriggerResult.FIRE_AND_PURGE
+ else:
+ return TriggerResult.CONTINUE
+
+ def on_merge(self,
+ window: TimeWindow,
+ ctx: 'Trigger.OnMergeContext') -> None:
+ pass
+
+ def clear(self,
+ window: TimeWindow,
+ ctx: 'Trigger.TriggerContext') -> None:
+ pass
+```
+
+## Allowed Lateness
+
+When working with *event-time* windowing, it can happen that elements arrive
late, *i.e.* the watermark that Flink uses to
+keep track of the progress of event-time is already past the end timestamp of
a window to which an element belongs. See
+[event time]({{< ref "docs/dev/datastream/event-time/generating_watermarks"
>}}) and especially [late elements]({{< ref
"docs/dev/datastream/event-time/generating_watermarks" >}}#late-elements) for a
more thorough
+discussion of how Flink deals with event time.
+
+By default, late elements are dropped when the watermark is past the end of
the window. However,
+Flink allows you to specify a maximum *allowed lateness* for window operators.
Allowed lateness
+specifies by how much time elements can be late before they are dropped, and
its default value is 0.
+Elements that arrive after the watermark has passed the end of the window but
before it passes the end of
+the window plus the allowed lateness, are still added to the window. Depending
on the trigger used,
+a late but not dropped element may cause the window to fire again.
+
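The dropping rule described above can be sketched as a plain-Python predicate (illustrative only, not the Flink implementation):

```python
# Illustrative sketch (not the Flink implementation) of the rule above:
# a late element is still added to its window as long as the watermark
# has not yet passed the window end plus the allowed lateness.
def element_is_dropped(window_end: int, watermark: int,
                       allowed_lateness: int) -> bool:
    return watermark >= window_end + allowed_lateness

# Window ends at t=5000 ms with 1000 ms allowed lateness: a late element
# arriving while the watermark is at 5500 is still accepted; once the
# watermark reaches 6000 the window state is gone and the element is dropped.
```
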
+In order to make this work, Flink keeps the state of windows until their
allowed lateness expires. Once this happens, Flink removes the window and
deletes its state, as
+also described in the [Window Lifecycle](#window-lifecycle) section.
+
+By default, the allowed lateness is set to `0`. That is, elements that arrive
after the watermark has passed the end of the window will be dropped.
+
+You can specify an allowed lateness like this:
+
+```python
+data_stream.key_by(<key selector>) \
+ .window(<window assigner>) \
+ .allowed_lateness(<time>) \
+ .<windowed transformation>(<window function>)
+```