dianfu commented on a change in pull request #19054:
URL: https://github.com/apache/flink/pull/19054#discussion_r828775940



##########
File path: docs/content/docs/dev/datastream/operators/windows.md
##########
@@ -462,6 +462,17 @@ input
     .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+input: DataStream = ...
+
+input

Review comment:
       ```suggestion
   input \
   ```

##########
File path: docs/content/docs/dev/datastream/operators/windows.md
##########
@@ -462,6 +462,17 @@ input
     .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }

Review comment:
       Should also update file 
content.zh/docs/dev/datastream/operators/windows.md

##########
File path: docs/content/docs/dev/datastream/operators/windows.md
##########
@@ -462,6 +462,17 @@ input
     .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+input: DataStream = ...

Review comment:
       ```suggestion
   input = ...  # type: DataStream
   ```

##########
File path: docs/content/docs/dev/datastream/operators/windows.md
##########
@@ -462,6 +462,17 @@ input
     .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+input: DataStream = ...
+
+input
+    .key_by(<key selector>)
+    .window(<window assigner>)

Review comment:
       ```suggestion
       .window(<window assigner>) \
   ```

##########
File path: docs/content/docs/dev/datastream/operators/windows.md
##########
@@ -462,6 +462,17 @@ input
     .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
 ```
 {{< /tab >}}
+{{< tab "Python" >}}
+```python
+input: DataStream = ...
+
+input
+    .key_by(<key selector>)

Review comment:
       ```suggestion
       .key_by(<key selector>) \
   ```

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1347,8 +1353,58 @@ def allowed_lateness(self, time_ms: int):
         self._allowed_lateness = time_ms
         return self
 
+    def reduce(self,
+               reduce_function: Union[Callable, ReduceFunction],
+               window_function: Union[WindowFunction, ProcessWindowFunction] = 
None,
+               output_type: TypeInformation = None) -> DataStream:
+        """
+        Applies a reduce function to the window. The window function is called 
for each evaluation
+        of the window for each key individually. The output of the reduce 
function is interpreted as
+        a regular non-windowed stream.
+
+        This window will try and incrementally aggregate data as much as the 
window policies
+        permit. For example, tumbling time windows can aggregate the data, 
meaning that only one
+        element per key is stored. Sliding time windows will aggregate on the 
granularity of the
+        slide interval, so a few elements are stored per key (one per slide 
interval). Custom
+        windows may not be able to incrementally aggregate, or may need to 
store extra values in an
+        aggregation tree.
+
+        Example:
+        ::
+
+            >>> ds.key_by(lambda x: x[1]) \
+                    .window(TumblingEventTimeWindow.of(Time.seconds(5))) \

Review comment:
       ```suggestion
                       .window(TumblingEventTimeWindows.of(Time.seconds(5))) \
   ```

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1347,8 +1353,58 @@ def allowed_lateness(self, time_ms: int):
         self._allowed_lateness = time_ms
         return self
 
+    def reduce(self,
+               reduce_function: Union[Callable, ReduceFunction],
+               window_function: Union[WindowFunction, ProcessWindowFunction] = 
None,
+               output_type: TypeInformation = None) -> DataStream:
+        """
+        Applies a reduce function to the window. The window function is called 
for each evaluation
+        of the window for each key individually. The output of the reduce 
function is interpreted as
+        a regular non-windowed stream.
+
+        This window will try and incrementally aggregate data as much as the 
window policies
+        permit. For example, tumbling time windows can aggregate the data, 
meaning that only one
+        element per key is stored. Sliding time windows will aggregate on the 
granularity of the
+        slide interval, so a few elements are stored per key (one per slide 
interval). Custom
+        windows may not be able to incrementally aggregate, or may need to 
store extra values in an
+        aggregation tree.
+
+        Example:
+        ::
+
+            >>> ds.key_by(lambda x: x[1]) \
+                    .window(TumblingEventTimeWindow.of(Time.seconds(5))) \

Review comment:
       The format needs to be corrected. Currently, the generated doc looks 
like the following:
   
![image](https://user-images.githubusercontent.com/5466492/158746445-b5e7a7ec-2e58-41f0-8841-8776afd7ebeb.png)
   

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1347,8 +1353,58 @@ def allowed_lateness(self, time_ms: int):
         self._allowed_lateness = time_ms
         return self
 
+    def reduce(self,
+               reduce_function: Union[Callable, ReduceFunction],
+               window_function: Union[WindowFunction, ProcessWindowFunction] = 
None,
+               output_type: TypeInformation = None) -> DataStream:
+        """
+        Applies a reduce function to the window. The window function is called 
for each evaluation
+        of the window for each key individually. The output of the reduce 
function is interpreted as
+        a regular non-windowed stream.
+
+        This window will try and incrementally aggregate data as much as the 
window policies
+        permit. For example, tumbling time windows can aggregate the data, 
meaning that only one
+        element per key is stored. Sliding time windows will aggregate on the 
granularity of the
+        slide interval, so a few elements are stored per key (one per slide 
interval). Custom
+        windows may not be able to incrementally aggregate, or may need to 
store extra values in an
+        aggregation tree.
+
+        Example:
+        ::
+
+            >>> ds.key_by(lambda x: x[1]) \
+                    .window(TumblingEventTimeWindow.of(Time.seconds(5))) \
+                    .reduce(lambda a, b: a[0] + b[0], b[1])
+
+        :param reduce_function: The reduce function.
+        :param window_function: The window function.
+        :param output_type: Type information for the result type of the window 
function.
+        :return: The data stream that is the result of applying the reduce 
function to the window.
+
+        .. versionadded:: 1.16

Review comment:
       ```suggestion
           .. versionadded:: 1.16.0
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to