dianfu commented on code in PR #19758:
URL: https://github.com/apache/flink/pull/19758#discussion_r876618388
##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1548,6 +1552,20 @@ def count_window(self, size: int, slide: int = 0):
else:
return WindowedStream(self, CountSlidingWindowAssigner(size,
slide))
+ def window_all(self, window_assigner: WindowAssigner) ->
'AllWindowedStream':
Review Comment:
Could we also add `count_window_all` to align with the Java API?
##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1548,6 +1552,20 @@ def count_window(self, size: int, slide: int = 0):
else:
return WindowedStream(self, CountSlidingWindowAssigner(size,
slide))
+ def window_all(self, window_assigner: WindowAssigner) ->
'AllWindowedStream':
Review Comment:
This method should appear in DataStream instead of KeyedStream.
##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1881,6 +1899,129 @@ def _get_result_data_stream(self,
j_python_data_stream_function_operator))
+class AllWindowedStream(object):
+ """
+ A AllWindowedStream represents a data stream where the stream of elements
is split into windows
+ based on a WindowAssigner. Window emission is triggered based on a Trigger.
+
+ If an Evictor is specified it will be used to evict elements from the
window after evaluation
+ was triggered by the Trigger but before the actual evaluation of the
window.
+ When using an evictor, window performance will degrade significantly,
since pre-aggregation of
+ window results cannot be used.
+
+ Note that the AllWindowedStream is purely an API construct, during runtime
the AllWindowedStream
+ will be collapsed together with the operation over the window into one
single operation.
+ """
+
+ def __init__(self, data_stream: DataStream, window_assigner:
WindowAssigner):
+ self._input_stream = data_stream.key_by(NullByteKeySelector())
+ self._window_assigner = window_assigner
+ self._allowed_lateness = 0
+ self._late_data_output_tag = None # type: Optional[OutputTag]
+ self._window_trigger = window_assigner.get_default_trigger(
+ self._input_stream.get_execution_environment()) # type: Trigger
+
+ def get_input_type(self):
+ return
_from_java_type(self._input_stream._original_data_type_info.get_java_type_info())
+
+ def trigger(self, trigger: Trigger):
Review Comment:
```suggestion
def trigger(self, trigger: Trigger) -> 'AllWindowedStream':
```
The other methods also need to be updated.
##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1881,6 +1899,129 @@ def _get_result_data_stream(self,
j_python_data_stream_function_operator))
+class AllWindowedStream(object):
+ """
+ A AllWindowedStream represents a data stream where the stream of elements
is split into windows
+ based on a WindowAssigner. Window emission is triggered based on a Trigger.
+
+ If an Evictor is specified it will be used to evict elements from the
window after evaluation
+ was triggered by the Trigger but before the actual evaluation of the
window.
+ When using an evictor, window performance will degrade significantly,
since pre-aggregation of
+ window results cannot be used.
+
+ Note that the AllWindowedStream is purely an API construct, during runtime
the AllWindowedStream
+ will be collapsed together with the operation over the window into one
single operation.
+ """
+
+ def __init__(self, data_stream: DataStream, window_assigner:
WindowAssigner):
+ self._input_stream = data_stream.key_by(NullByteKeySelector())
+ self._window_assigner = window_assigner
+ self._allowed_lateness = 0
+ self._late_data_output_tag = None # type: Optional[OutputTag]
Review Comment:
This field is never used. I guess you missed the `sideOutputLateData` method.
##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1881,6 +1899,129 @@ def _get_result_data_stream(self,
j_python_data_stream_function_operator))
+class AllWindowedStream(object):
+ """
+ A AllWindowedStream represents a data stream where the stream of elements
is split into windows
+ based on a WindowAssigner. Window emission is triggered based on a Trigger.
+
+ If an Evictor is specified it will be used to evict elements from the
window after evaluation
+ was triggered by the Trigger but before the actual evaluation of the
window.
+ When using an evictor, window performance will degrade significantly,
since pre-aggregation of
+ window results cannot be used.
+
+ Note that the AllWindowedStream is purely an API construct, during runtime
the AllWindowedStream
+ will be collapsed together with the operation over the window into one
single operation.
+ """
+
+ def __init__(self, data_stream: DataStream, window_assigner:
WindowAssigner):
+ self._input_stream = data_stream.key_by(NullByteKeySelector())
+ self._window_assigner = window_assigner
+ self._allowed_lateness = 0
+ self._late_data_output_tag = None # type: Optional[OutputTag]
+ self._window_trigger = window_assigner.get_default_trigger(
+ self._input_stream.get_execution_environment()) # type: Trigger
+
+ def get_input_type(self):
+ return
_from_java_type(self._input_stream._original_data_type_info.get_java_type_info())
+
+ def trigger(self, trigger: Trigger):
+ """
+ Sets the Trigger that should be used to trigger window emission.
+ """
+ if isinstance(self._window_assigner, MergingWindowAssigner) \
+ and (trigger.can_merge() is not True):
+ raise TypeError("A merging window assigner cannot be used with a
trigger that does "
+ "not support merging.")
+
+ self._window_trigger = trigger
+ return self
+
+ def allowed_lateness(self, time_ms: int):
+ """
+ Sets the time by which elements are allowed to be late. Elements that
arrive behind the
+ watermark by more than the specified time will be dropped. By default,
the allowed lateness
+ is 0.
+
+ Setting an allowed lateness is only valid for event-time windows.
+ """
+ self._allowed_lateness = time_ms
+ return self
+
+ def apply(self,
+ window_function: AllWindowFunction,
+ output_type: TypeInformation = None) -> DataStream:
+ """
+ Applies the given window function to each window. The window function
is called for each
+ evaluation of the window. The output of the window function is
interpreted as a regular
+ non-windowed stream.
+
+ Note that this function requires that all data in the windows is
buffered until the window
+ is evaluated, as the function provides no means of incremental
aggregation.
+
+ :param window_function: The window function.
+ :param output_type: Type information for the result type of the window
function.
+ :return: The data stream that is the result of applying the window
function to the window.
+ """
+ internal_window_function = InternalIterableAllWindowFunction(
+ window_function) # type: InternalWindowFunction
+ list_state_descriptor = ListStateDescriptor(WINDOW_STATE_NAME,
self.get_input_type())
+ return self._get_result_data_stream(internal_window_function,
+ list_state_descriptor,
+ output_type)
+
+ def process(self,
+ process_window_function: ProcessAllWindowFunction,
+ output_type: TypeInformation = None) -> DataStream:
+ """
+ Applies the given window function to each window. The window function
is called for each
+ evaluation of the window for each key individually. The output of the
window function is
+ interpreted as a regular non-windowed stream.
+
+ Note that this function requires that all data in the windows is
buffered until the window
+ is evaluated, as the function provides no means of incremental
aggregation.
+
+ :param process_window_function: The window function.
+ :param output_type: Type information for the result type of the window
function.
+ :return: The data stream that is the result of applying the window
function to the window.
+ """
+ internal_window_function = InternalIterableProcessAllWindowFunction(
+ process_window_function) # type: InternalWindowFunction
+ list_state_descriptor = ListStateDescriptor(WINDOW_STATE_NAME,
self.get_input_type())
+ return self._get_result_data_stream(internal_window_function,
+ list_state_descriptor,
+ output_type)
+
+ def _get_result_data_stream(self,
Review Comment:
It seems that the implementation is the same as
WindowedStream._get_result_data_stream. Could we abstract it a bit to avoid
duplication?
##########
flink-python/pyflink/datastream/functions.py:
##########
@@ -38,6 +38,7 @@
'ReduceFunction',
'AggregateFunction',
'KeySelector',
+ 'NullByteKeySelector',
Review Comment:
We don't need to expose it as it's just used internally.
##########
flink-python/pyflink/datastream/functions.py:
##########
@@ -979,6 +1008,68 @@ def clear(self, context: 'ProcessWindowFunction.Context')
-> None:
pass
+class ProcessAllWindowFunction(Function, Generic[IN, OUT, W]):
+ """
+ Base interface for functions that are evaluated over keyed (grouped)
windows using a context
Review Comment:
```suggestion
Base interface for functions that are evaluated over non-keyed windows
using a context
```
##########
flink-python/pyflink/datastream/functions.py:
##########
@@ -979,6 +1008,68 @@ def clear(self, context: 'ProcessWindowFunction.Context')
-> None:
pass
+class ProcessAllWindowFunction(Function, Generic[IN, OUT, W]):
+ """
+ Base interface for functions that are evaluated over keyed (grouped)
windows using a context
+ for retrieving extra information.
+ """
+
+ class Context(ABC, Generic[W2]):
+ """
+ The context holding window metadata.
+ """
+
+ @abstractmethod
+ def window(self) -> W2:
+ """
+ :return: The window that is being evaluated.
+ """
+ pass
+
+ @abstractmethod
+ def window_state(self) -> KeyedStateStore:
+ """
+ State accessor for per-key and per-window state.
+
+ .. note::
+ If you use per-window state you have to ensure that you clean
it up by implementing
+ :func:`~ProcessWindowFunction.clear`.
+
+ :return: The :class:`KeyedStateStore` used to access per-key and
per-window states.
+ """
+ pass
+
+ @abstractmethod
+ def global_state(self) -> KeyedStateStore:
+ """
+ State accessor for per-key global state.
+ """
+ pass
+
+ @abstractmethod
+ def process(self,
+ context: 'ProcessAllWindowFunction.Context',
+ elements: Iterable[IN]) -> Iterable[OUT]:
+ """
+ Evaluates the window and outputs none or several elements.
+
+ :param context: The context in which the window is being evaluated.
+ :param elements: The elements in the window being evaluated.
+ :return: The iterable object which produces the elements to emit.
+ """
+ pass
+
+ @abstractmethod
Review Comment:
The `abstractmethod` decorator could be removed, as users don't have to override
this method. See the Java counterparts for more details. It seems that the
`clear` method in `ProcessWindowFunction` should also be updated.
##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1881,6 +1899,129 @@ def _get_result_data_stream(self,
j_python_data_stream_function_operator))
+class AllWindowedStream(object):
+ """
+ A AllWindowedStream represents a data stream where the stream of elements
is split into windows
+ based on a WindowAssigner. Window emission is triggered based on a Trigger.
+
+ If an Evictor is specified it will be used to evict elements from the
window after evaluation
+ was triggered by the Trigger but before the actual evaluation of the
window.
+ When using an evictor, window performance will degrade significantly,
since pre-aggregation of
+ window results cannot be used.
+
+ Note that the AllWindowedStream is purely an API construct, during runtime
the AllWindowedStream
+ will be collapsed together with the operation over the window into one
single operation.
+ """
+
+ def __init__(self, data_stream: DataStream, window_assigner:
WindowAssigner):
+ self._input_stream = data_stream.key_by(NullByteKeySelector())
+ self._window_assigner = window_assigner
+ self._allowed_lateness = 0
+ self._late_data_output_tag = None # type: Optional[OutputTag]
+ self._window_trigger = window_assigner.get_default_trigger(
+ self._input_stream.get_execution_environment()) # type: Trigger
+
+ def get_input_type(self):
+ return
_from_java_type(self._input_stream._original_data_type_info.get_java_type_info())
+
+ def trigger(self, trigger: Trigger):
+ """
+ Sets the Trigger that should be used to trigger window emission.
+ """
+ if isinstance(self._window_assigner, MergingWindowAssigner) \
+ and (trigger.can_merge() is not True):
+ raise TypeError("A merging window assigner cannot be used with a
trigger that does "
+ "not support merging.")
+
+ self._window_trigger = trigger
+ return self
+
+ def allowed_lateness(self, time_ms: int):
+ """
+ Sets the time by which elements are allowed to be late. Elements that
arrive behind the
+ watermark by more than the specified time will be dropped. By default,
the allowed lateness
+ is 0.
+
+ Setting an allowed lateness is only valid for event-time windows.
+ """
+ self._allowed_lateness = time_ms
+ return self
+
+ def apply(self,
+ window_function: AllWindowFunction,
+ output_type: TypeInformation = None) -> DataStream:
+ """
+ Applies the given window function to each window. The window function
is called for each
+ evaluation of the window. The output of the window function is
interpreted as a regular
+ non-windowed stream.
+
+ Note that this function requires that all data in the windows is
buffered until the window
+ is evaluated, as the function provides no means of incremental
aggregation.
+
+ :param window_function: The window function.
+ :param output_type: Type information for the result type of the window
function.
+ :return: The data stream that is the result of applying the window
function to the window.
+ """
+ internal_window_function = InternalIterableAllWindowFunction(
+ window_function) # type: InternalWindowFunction
+ list_state_descriptor = ListStateDescriptor(WINDOW_STATE_NAME,
self.get_input_type())
+ return self._get_result_data_stream(internal_window_function,
+ list_state_descriptor,
+ output_type)
+
+ def process(self,
Review Comment:
The aggregate/reduce functions are missing.
##########
flink-python/pyflink/datastream/functions.py:
##########
@@ -901,6 +914,22 @@ def apply(self, key: KEY, window: W, inputs: Iterable[IN])
-> Iterable[OUT]:
pass
+class AllWindowFunction(Function, Generic[IN, OUT, W]):
+ """
+ Base interface for functions that are evaluated over keyed (grouped)
windows.
Review Comment:
```suggestion
Base interface for functions that are evaluated over non-keyed windows.
```
##########
flink-python/pyflink/datastream/data_stream.py:
##########
@@ -1881,6 +1899,129 @@ def _get_result_data_stream(self,
j_python_data_stream_function_operator))
+class AllWindowedStream(object):
+ """
+ A AllWindowedStream represents a data stream where the stream of elements
is split into windows
+ based on a WindowAssigner. Window emission is triggered based on a Trigger.
+
+ If an Evictor is specified it will be used to evict elements from the
window after evaluation
+ was triggered by the Trigger but before the actual evaluation of the
window.
+ When using an evictor, window performance will degrade significantly,
since pre-aggregation of
+ window results cannot be used.
+
+ Note that the AllWindowedStream is purely an API construct, during runtime
the AllWindowedStream
+ will be collapsed together with the operation over the window into one
single operation.
+ """
+
+ def __init__(self, data_stream: DataStream, window_assigner:
WindowAssigner):
+ self._input_stream = data_stream.key_by(NullByteKeySelector())
+ self._window_assigner = window_assigner
+ self._allowed_lateness = 0
+ self._late_data_output_tag = None # type: Optional[OutputTag]
+ self._window_trigger = window_assigner.get_default_trigger(
+ self._input_stream.get_execution_environment()) # type: Trigger
+
+ def get_input_type(self):
+ return
_from_java_type(self._input_stream._original_data_type_info.get_java_type_info())
+
+ def trigger(self, trigger: Trigger):
+ """
+ Sets the Trigger that should be used to trigger window emission.
+ """
+ if isinstance(self._window_assigner, MergingWindowAssigner) \
+ and (trigger.can_merge() is not True):
+ raise TypeError("A merging window assigner cannot be used with a
trigger that does "
+ "not support merging.")
+
+ self._window_trigger = trigger
+ return self
+
+ def allowed_lateness(self, time_ms: int):
+ """
+ Sets the time by which elements are allowed to be late. Elements that
arrive behind the
+ watermark by more than the specified time will be dropped. By default,
the allowed lateness
+ is 0.
+
+ Setting an allowed lateness is only valid for event-time windows.
+ """
+ self._allowed_lateness = time_ms
+ return self
+
+ def apply(self,
+ window_function: AllWindowFunction,
+ output_type: TypeInformation = None) -> DataStream:
+ """
+ Applies the given window function to each window. The window function
is called for each
+ evaluation of the window. The output of the window function is
interpreted as a regular
+ non-windowed stream.
+
+ Note that this function requires that all data in the windows is
buffered until the window
+ is evaluated, as the function provides no means of incremental
aggregation.
+
+ :param window_function: The window function.
+ :param output_type: Type information for the result type of the window
function.
+ :return: The data stream that is the result of applying the window
function to the window.
+ """
+ internal_window_function = InternalIterableAllWindowFunction(
+ window_function) # type: InternalWindowFunction
+ list_state_descriptor = ListStateDescriptor(WINDOW_STATE_NAME,
self.get_input_type())
+ return self._get_result_data_stream(internal_window_function,
+ list_state_descriptor,
+ output_type)
+
+ def process(self,
+ process_window_function: ProcessAllWindowFunction,
+ output_type: TypeInformation = None) -> DataStream:
+ """
+ Applies the given window function to each window. The window function
is called for each
+ evaluation of the window for each key individually. The output of the
window function is
+ interpreted as a regular non-windowed stream.
+
+ Note that this function requires that all data in the windows is
buffered until the window
+ is evaluated, as the function provides no means of incremental
aggregation.
+
+ :param process_window_function: The window function.
+ :param output_type: Type information for the result type of the window
function.
+ :return: The data stream that is the result of applying the window
function to the window.
+ """
+ internal_window_function = InternalIterableProcessAllWindowFunction(
+ process_window_function) # type: InternalWindowFunction
+ list_state_descriptor = ListStateDescriptor(WINDOW_STATE_NAME,
self.get_input_type())
+ return self._get_result_data_stream(internal_window_function,
+ list_state_descriptor,
+ output_type)
+
+ def _get_result_data_stream(self,
+ internal_window_function:
InternalWindowFunction,
+ window_state_descriptor: StateDescriptor,
+ output_type: TypeInformation):
+ if self._window_trigger is None:
+ self._window_trigger = self._window_assigner.get_default_trigger(
+ self._input_stream.get_execution_environment())
+ window_serializer = self._window_assigner.get_window_serializer()
+ window_operation_descriptor = WindowOperationDescriptor(
+ self._window_assigner,
+ self._window_trigger,
+ self._allowed_lateness,
+ self._late_data_output_tag,
+ window_state_descriptor,
+ window_serializer,
+ internal_window_function)
+
+ from pyflink.fn_execution import flink_fn_execution_pb2
+ j_python_data_stream_function_operator, j_output_type_info = \
+ _get_one_input_stream_operator(
+ self._input_stream,
+ window_operation_descriptor,
+ flink_fn_execution_pb2.UserDefinedDataStreamFunction.WINDOW,
# type: ignore
+ output_type)
+
+ return DataStream(self._input_stream._j_data_stream.transform(
+ "WINDOW",
Review Comment:
For the operator name, I suggest keeping it consistent with the Java API by
using the class name of the window assigner instead of just `WINDOW`.
Besides, we could also use `set_description` to give a more detailed
description of the operator, just as is done in the Java WindowedStream.
##########
flink-python/pyflink/datastream/tests/test_window.py:
##########
@@ -441,6 +442,35 @@ def process(self, key, context:
ProcessWindowFunction.Context,
expected = ['(hi,2)', '(hi,2)', '(hi,2)']
self.assert_equals_sorted(expected, results)
+ def test_event_time_tumbling_window_all(self):
+ data_stream = self.env.from_collection([
+ ('hi', 1), ('hello', 2), ('hi', 3), ('hello', 4), ('hello', 5),
('hi', 8), ('hi', 9),
+ ('hi', 15)],
+ type_info=Types.TUPLE([Types.STRING(), Types.INT()])) # type:
DataStream
+ watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
+ .with_timestamp_assigner(SecondColumnTimestampAssigner())
+
+ class CountAllWindowProcessFunction(ProcessAllWindowFunction[tuple,
tuple, TimeWindow]):
+ def process(self, context: 'ProcessAllWindowFunction.Context',
+ elements: Iterable[tuple]) -> Iterable[tuple]:
+ return [
+ (context.window().start, context.window().end, len([e for
e in elements]))]
+
+ def clear(self, context: 'ProcessAllWindowFunction.Context') ->
None:
+ pass
+
+ data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
+ .key_by(lambda x: x[0], key_type=Types.STRING()) \
Review Comment:
`window_all` should be applied to a DataStream instead of a KeyedStream, so
please remove this line.
##########
docs/content.zh/docs/dev/datastream/operators/windows.md:
##########
@@ -1604,6 +1604,21 @@ val globalResults = resultsPerKey
.process(new TopKWindowFunction())
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+input = ... # type: DataStream
+
+input \
+ .key_by(<key selector>) \
+ .window(TumblingEventTimeWindows.of(Time.seconds(5)))
+ .reduce(Summer())
+
+input \
Review Comment:
This is not consistent with the Java example. Please double check that.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]