javacaoyu commented on a change in pull request #19126:
URL: https://github.com/apache/flink/pull/19126#discussion_r829785324



##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1168,6 +1168,96 @@ def process_element(self, value, ctx: 
'KeyedProcessFunction.Context'):
         return self.process(FilterKeyedProcessFunctionAdapter(func), 
self._original_data_type_info)\
             .name("Filter")
 
+    def sum(self, position_to_sum: Union[int, str]) -> 'DataStream':
+        """
+        Applies an aggregation that gives a rolling sum of the data stream at 
the
+        given position grouped by the given key. An independent aggregate is 
kept
+        per key.
+
+        Example(Tuple data to sum):
+        ::
+
+            >>> ds = env.from_collection([('a', 1), ('a', 2), ('b', 1), ('b', 
5)])
+            >>> ds.key_by(lambda x: x[0]).sum(1)
+
+        Example(Row data to sum):
+        ::
+
+            >>> ds = self.env.from_collection([('a', 1), ('a', 2), ('a', 3), 
('b', 1), ('b', 2)],
+            ...                                
type_info=Types.ROW([Types.STRING(), Types.INT()]))
+            >>> ds.key_by(lambda x: x[0]).sum(1)
+
+        Example(Row data with fields name to sum):
+        ::
+
+            >>> ds = self.env.from_collection(
+            ...     [('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)],
+            ...     type_info=Types.ROW_NAMED(["key", "value"], 
[Types.STRING(), Types.INT()])
+            ... )
+            >>> ds.key_by(lambda x: x[0]).sum("value")
+
+        :param position_to_sum:
+            The field position in the data points to sum, type can be int or 
str.
+            This is applicable to Tuple types, and {pyflink.common.types.Row} 
types.
+        :return: The transformed DataStream.
+        """
+        if not isinstance(position_to_sum, int) and not 
isinstance(position_to_sum, str):
+            raise TypeError("The input must be a int or str type for locate 
the value to sum")
+
+        output_type = 
_from_java_type(self._original_data_type_info.get_java_type_info())
+
+        class SumKeyedProcessFunctionAdapter(KeyedProcessFunction):

Review comment:
       Its a good idea
   By logic, Apply reduce as the underlying implementation of sum operation is 
good design i think.
   I'll try to apply ReduceFunction from the new design to see if it's easier 
to implement.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to