dianfu commented on a change in pull request #19176:
URL: https://github.com/apache/flink/pull/19176#discussion_r831686463



##########
File path: docs/content/docs/dev/datastream/operators/windows.md
##########
@@ -557,6 +557,31 @@ input
     .aggregate(new AverageAggregate)
 ```
 {{< /tab >}}
+{{< tab "Python" >}}

Review comment:
       The Chinese doc should also be updated:
docs.zh/content/docs/dev/datastream/operators/windows.md

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1403,6 +1403,50 @@ def reduce(self,
                                             reducing_state_descriptor,
                                             output_type)
 
+    def aggregate(self,
+                  aggregate_function: AggregateFunction,
+                  window_function: Union[WindowFunction, ProcessWindowFunction] = None,
+                  accumulator_type: TypeInformation = None,
+                  result_type: TypeInformation = None) -> DataStream:

Review comment:
       ```suggestion
                     output_type: TypeInformation = None) -> DataStream:
   ```
   
   Rename `result_type` to `output_type` to keep it consistent with the other methods.

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1403,6 +1403,50 @@ def reduce(self,
                                             reducing_state_descriptor,
                                             output_type)
 
+    def aggregate(self,
+                  aggregate_function: AggregateFunction,
+                  window_function: Union[WindowFunction, ProcessWindowFunction] = None,
+                  accumulator_type: TypeInformation = None,
+                  result_type: TypeInformation = None) -> DataStream:
+        """
+        Applies the given window function to each window. The window function is called for each
+        evaluation of the window for each key individually. The output of the window function is
+        interpreted as a regular non-windowed stream.
+
+        Arriving data is incrementally aggregated using the given aggregate function. This means
+        that the window function typically has only a single value to process when called.
+
+        :param aggregate_function: The aggregation function that is used for incremental
+            aggregation.
+        :param window_function: The window function.
+        :param accumulator_type: Type information for the internal accumulator type of the
+            aggregation function.
+        :param result_type: Type information for the result type of the window function.
+        :return: The data stream that is the result of applying the window function to the window.
+        """

Review comment:
       Please add `.. versionadded:: 1.16.0` to the docstring.

##########
File path: docs/content/docs/dev/datastream/operators/windows.md
##########
@@ -676,6 +701,72 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function
 }
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+class ProcessWindowFunction(Function, Generic[IN, OUT, KEY, W]):
+
+  @abstractmethod

Review comment:
       The method `clear` is missing. The Java/Scala example should also be updated.
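
For illustration, here is a rough sketch of how the Python tab could look with `clear` included. It is a standalone mock built on `abc`, not the actual PyFlink source: the `Any`-typed context and the `CountingWindowFunction` subclass are hypothetical, and the exact signatures in `pyflink.datastream.functions` may differ.

```python
from abc import ABC, abstractmethod
from typing import Any, Generic, Iterable, TypeVar

IN = TypeVar("IN")
OUT = TypeVar("OUT")
KEY = TypeVar("KEY")
W = TypeVar("W")


class ProcessWindowFunction(ABC, Generic[IN, OUT, KEY, W]):
    """Standalone mock of the interface shown in the docs; not the PyFlink class."""

    @abstractmethod
    def process(self, key: KEY, context: Any, elements: Iterable[IN]) -> Iterable[OUT]:
        """Evaluates the window and yields zero or more output elements."""

    @abstractmethod
    def clear(self, context: Any) -> None:
        """Deletes any per-window state when the window is purged."""


class CountingWindowFunction(ProcessWindowFunction[int, str, str, Any]):
    """Hypothetical subclass: emits the number of elements per key."""

    def process(self, key, context, elements):
        yield f"{key}: {len(list(elements))}"

    def clear(self, context):
        # Nothing to clean up in this sketch.
        pass
```

With both methods declared abstract, a subclass that forgets `clear` fails at instantiation time, which is presumably the behavior the docs should convey.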

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1403,6 +1403,50 @@ def reduce(self,
                                             reducing_state_descriptor,
                                             output_type)
 
+    def aggregate(self,
+                  aggregate_function: AggregateFunction,
+                  window_function: Union[WindowFunction, ProcessWindowFunction] = None,
+                  accumulator_type: TypeInformation = None,
+                  result_type: TypeInformation = None) -> DataStream:
+        """
+        Applies the given window function to each window. The window function is called for each
+        evaluation of the window for each key individually. The output of the window function is
+        interpreted as a regular non-windowed stream.
+
+        Arriving data is incrementally aggregated using the given aggregate function. This means
+        that the window function typically has only a single value to process when called.
+

Review comment:
       How about showing a simple example of this API?
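
Something along these lines might work as the docstring example. This is only a sketch: `AverageAggregate` is written as a plain class so it runs without a cluster, whereas in PyFlink it would subclass `pyflink.datastream.functions.AggregateFunction`. The `(key, value)` element shape and the `run_window` helper are assumptions made for illustration.

```python
from typing import Iterable, Tuple

Accumulator = Tuple[int, int]  # (running sum, element count)


class AverageAggregate:
    """Averages the second field of (key, value) tuples.

    Mirrors the four-method AggregateFunction contract; in PyFlink this
    class would extend pyflink.datastream.functions.AggregateFunction.
    """

    def create_accumulator(self) -> Accumulator:
        return 0, 0

    def add(self, value: Tuple[str, int], accumulator: Accumulator) -> Accumulator:
        return accumulator[0] + value[1], accumulator[1] + 1

    def get_result(self, accumulator: Accumulator) -> float:
        return accumulator[0] / accumulator[1]

    def merge(self, acc_a: Accumulator, acc_b: Accumulator) -> Accumulator:
        return acc_a[0] + acc_b[0], acc_a[1] + acc_b[1]


def run_window(agg: AverageAggregate, elements: Iterable[Tuple[str, int]]) -> float:
    """Hypothetical local driver mimicking how one window is folded incrementally."""
    acc = agg.create_accumulator()
    for element in elements:
        acc = agg.add(element, acc)
    return agg.get_result(acc)
```

In a real job, an instance of `AverageAggregate` would be passed as the first argument to `aggregate` on the keyed, windowed stream, optionally together with a window function and the type information parameters.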

##########
File path: docs/content/docs/dev/datastream/operators/windows.md
##########
@@ -814,6 +926,27 @@ input
 
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+input = ...  # type: DataStream
+
+input \
+  .key_by(<key selector>) \
+  .window(<window assigner>) \
+  .reduce(lambda v1, v2: (v1[0], v1[1] + v2[1]),

Review comment:
       The implementation is incorrect. It matches neither the Java/Scala
example nor the description of this section: "The following
example shows how an incremental ReduceFunction can be combined with a
ProcessWindowFunction to return the smallest event in a window along with the
start time of the window."
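
A sketch of the intended logic, assuming events are `(key, value)` tuples: the reduce function keeps the smaller event, and the process function pairs it with the window start. `FakeWindow` and `FakeContext` are hypothetical stand-ins so the snippet runs locally; in PyFlink, `MyProcessWindowFunction` would subclass `ProcessWindowFunction` and the context would be supplied by the runtime.

```python
from dataclasses import dataclass
from functools import reduce
from typing import Iterable, Iterator, Tuple

Event = Tuple[str, int]  # (key, value); assumed shape for this sketch


def min_reduce(v1: Event, v2: Event) -> Event:
    """Keeps the smaller of two events, as in the Java/Scala ReduceFunction."""
    return v2 if v1[1] > v2[1] else v1


@dataclass
class FakeWindow:
    """Hypothetical stand-in for a time window."""
    start: int
    end: int


@dataclass
class FakeContext:
    """Hypothetical stand-in for the process function's context."""
    _window: FakeWindow

    def window(self) -> FakeWindow:
        return self._window


class MyProcessWindowFunction:
    """Emits (window start, smallest event) for each evaluated window."""

    def process(self, key: str, context: FakeContext,
                min_readings: Iterable[Event]) -> Iterator[Tuple[int, Event]]:
        # After incremental reduction the iterable holds a single element.
        min_reading = next(iter(min_readings))
        yield context.window().start, min_reading
```

In the docs example, these would be wired together as `.reduce(min_reduce, window_function=MyProcessWindowFunction())`, assuming `reduce` accepts the window function as a second argument in the Python API.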

##########
File path: docs/content/docs/dev/datastream/operators/windows.md
##########
@@ -1017,6 +1205,16 @@ input
     .apply(new MyWindowFunction())
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+input = ...  # type: DataStream
+
+input \
+    .key_by(<key selector>) \
+    .window(<window assigner>) \
+    .apply(new MyWindowFunction())

Review comment:
       ```suggestion
       .apply(MyWindowFunction())
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
