dianfu commented on a change in pull request #19126:
URL: https://github.com/apache/flink/pull/19126#discussion_r830706322



##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1174,6 +1174,66 @@ def process_element(self, value, ctx: 
'KeyedProcessFunction.Context'):
         return self.process(FilterKeyedProcessFunctionAdapter(func), 
self._original_data_type_info)\
             .name("Filter")
 
+    def sum(self, position_to_sum: Union[int, str]) -> 'DataStream':
+        """
+        Applies an aggregation that gives a rolling sum of the data stream at 
the
+        given position grouped by the given key. An independent aggregate is 
kept
+        per key.
+
+        Example(Tuple data to sum):
+        ::
+
+            >>> ds = env.from_collection([('a', 1), ('a', 2), ('b', 1), ('b', 
5)])
+            >>> ds.key_by(lambda x: x[0]).sum(1)
+
+        Example(Row data to sum):
+        ::
+
+            >>> ds = env.from_collection([('a', 1), ('a', 2), ('a', 3), ('b', 
1), ('b', 2)],
+            ...                                
type_info=Types.ROW([Types.STRING(), Types.INT()]))
+            >>> ds.key_by(lambda x: x[0]).sum(1)
+
+        Example(Row data with fields name to sum):
+        ::
+
+            >>> ds = env.from_collection(
+            ...     [('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)],
+            ...     type_info=Types.ROW_NAMED(["key", "value"], 
[Types.STRING(), Types.INT()])
+            ... )
+            >>> ds.key_by(lambda x: x[0]).sum("value")
+
+        :param position_to_sum:
+            The field position in the data points to sum, type can be int or 
str.
+            This is applicable to Tuple types, and :class:`pyflink.common.Row` 
types.
+        :return: The transformed DataStream.
+        """
+        if not isinstance(position_to_sum, int) and not 
isinstance(position_to_sum, str):
+            raise TypeError("The input must be of int or str type to locate 
the value to sum")
+
+        class SumReduceFunction(ReduceFunction):
+
+            def __init__(self, position_to_sum):
+                self._pos = position_to_sum
+
+            def reduce(self, value1, value2):
+                from numbers import Number
+                if not isinstance(value1[self._pos], Number):
+                    raise TypeError("The value to sum by given position must 
be of numeric type; "
+                                    f"actual {type(value1[self._pos])}, 
expected Number")
+                if isinstance(value1, tuple):
+                    value1_list = list(value1)
+                    value1_list[self._pos] = value1[self._pos] + 
value2[self._pos]
+                    value1 = tuple(value1_list)
+                elif isinstance(value1, Row):
+                    value1[self._pos] = value1[self._pos] + value2[self._pos]
+                else:
+                    raise TypeError("Sum operator only process the data of "
+                                    "Tuple type and {pyflink.common.types.Row} 
type. "

Review comment:
       ```suggestion
                                       "Tuple type and Row type. "
   ```

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1174,6 +1174,66 @@ def process_element(self, value, ctx: 
'KeyedProcessFunction.Context'):
         return self.process(FilterKeyedProcessFunctionAdapter(func), 
self._original_data_type_info)\
             .name("Filter")
 
+    def sum(self, position_to_sum: Union[int, str]) -> 'DataStream':
+        """
+        Applies an aggregation that gives a rolling sum of the data stream at 
the
+        given position grouped by the given key. An independent aggregate is 
kept
+        per key.
+
+        Example(Tuple data to sum):
+        ::
+
+            >>> ds = env.from_collection([('a', 1), ('a', 2), ('b', 1), ('b', 
5)])
+            >>> ds.key_by(lambda x: x[0]).sum(1)
+
+        Example(Row data to sum):
+        ::
+
+            >>> ds = env.from_collection([('a', 1), ('a', 2), ('a', 3), ('b', 
1), ('b', 2)],
+            ...                                
type_info=Types.ROW([Types.STRING(), Types.INT()]))
+            >>> ds.key_by(lambda x: x[0]).sum(1)
+
+        Example(Row data with fields name to sum):
+        ::
+
+            >>> ds = env.from_collection(
+            ...     [('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)],
+            ...     type_info=Types.ROW_NAMED(["key", "value"], 
[Types.STRING(), Types.INT()])
+            ... )
+            >>> ds.key_by(lambda x: x[0]).sum("value")
+
+        :param position_to_sum:
+            The field position in the data points to sum, type can be int or 
str.
+            This is applicable to Tuple types, and :class:`pyflink.common.Row` 
types.
+        :return: The transformed DataStream.
+        """
+        if not isinstance(position_to_sum, int) and not 
isinstance(position_to_sum, str):
+            raise TypeError("The input must be of int or str type to locate 
the value to sum")
+
+        class SumReduceFunction(ReduceFunction):
+
+            def __init__(self, position_to_sum):
+                self._pos = position_to_sum
+
+            def reduce(self, value1, value2):
+                from numbers import Number
+                if not isinstance(value1[self._pos], Number):
+                    raise TypeError("The value to sum by given position must 
be of numeric type; "
+                                    f"actual {type(value1[self._pos])}, 
expected Number")
+                if isinstance(value1, tuple):

Review comment:
       Is it possible that the input type is a list?

##########
File path: flink-python/pyflink/datastream/tests/test_data_stream.py
##########
@@ -1072,6 +1072,44 @@ def test_reduce_with_state(self):
         expected = ['+I[a, 0]', '+I[ab, 0]', '+I[c, 1]', '+I[cd, 1]', '+I[cde, 
1]']
         self.assert_equals_sorted(expected, results)
 
+    def test_keyed_sum_with_tuple_type(self):

Review comment:
       What about merging the added test cases into one, e.g. 
sum(xxx).sum(yyy).sum(zzz)? IT cases are expensive and we should try to reduce 
them as much as possible.

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1174,6 +1174,66 @@ def process_element(self, value, ctx: 
'KeyedProcessFunction.Context'):
         return self.process(FilterKeyedProcessFunctionAdapter(func), 
self._original_data_type_info)\
             .name("Filter")
 
+    def sum(self, position_to_sum: Union[int, str]) -> 'DataStream':
+        """
+        Applies an aggregation that gives a rolling sum of the data stream at 
the
+        given position grouped by the given key. An independent aggregate is 
kept
+        per key.
+
+        Example(Tuple data to sum):
+        ::
+
+            >>> ds = env.from_collection([('a', 1), ('a', 2), ('b', 1), ('b', 
5)])
+            >>> ds.key_by(lambda x: x[0]).sum(1)
+
+        Example(Row data to sum):
+        ::
+
+            >>> ds = env.from_collection([('a', 1), ('a', 2), ('a', 3), ('b', 
1), ('b', 2)],
+            ...                                
type_info=Types.ROW([Types.STRING(), Types.INT()]))
+            >>> ds.key_by(lambda x: x[0]).sum(1)
+
+        Example(Row data with fields name to sum):
+        ::
+
+            >>> ds = env.from_collection(
+            ...     [('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)],
+            ...     type_info=Types.ROW_NAMED(["key", "value"], 
[Types.STRING(), Types.INT()])
+            ... )
+            >>> ds.key_by(lambda x: x[0]).sum("value")
+
+        :param position_to_sum:
+            The field position in the data points to sum, type can be int or 
str.
+            This is applicable to Tuple types, and :class:`pyflink.common.Row` 
types.
+        :return: The transformed DataStream.
+        """
+        if not isinstance(position_to_sum, int) and not 
isinstance(position_to_sum, str):
+            raise TypeError("The input must be of int or str type to locate 
the value to sum")
+
+        class SumReduceFunction(ReduceFunction):
+
+            def __init__(self, position_to_sum):
+                self._pos = position_to_sum
+
+            def reduce(self, value1, value2):
+                from numbers import Number
+                if not isinstance(value1[self._pos], Number):

Review comment:
       I guess you should check value2, which is the input element?

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1174,6 +1174,66 @@ def process_element(self, value, ctx: 
'KeyedProcessFunction.Context'):
         return self.process(FilterKeyedProcessFunctionAdapter(func), 
self._original_data_type_info)\
             .name("Filter")
 
+    def sum(self, position_to_sum: Union[int, str]) -> 'DataStream':
+        """
+        Applies an aggregation that gives a rolling sum of the data stream at 
the
+        given position grouped by the given key. An independent aggregate is 
kept
+        per key.
+
+        Example(Tuple data to sum):
+        ::
+
+            >>> ds = env.from_collection([('a', 1), ('a', 2), ('b', 1), ('b', 
5)])
+            >>> ds.key_by(lambda x: x[0]).sum(1)
+
+        Example(Row data to sum):
+        ::
+
+            >>> ds = env.from_collection([('a', 1), ('a', 2), ('a', 3), ('b', 
1), ('b', 2)],
+            ...                                
type_info=Types.ROW([Types.STRING(), Types.INT()]))
+            >>> ds.key_by(lambda x: x[0]).sum(1)
+
+        Example(Row data with fields name to sum):
+        ::
+
+            >>> ds = env.from_collection(
+            ...     [('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)],
+            ...     type_info=Types.ROW_NAMED(["key", "value"], 
[Types.STRING(), Types.INT()])
+            ... )
+            >>> ds.key_by(lambda x: x[0]).sum("value")
+
+        :param position_to_sum:
+            The field position in the data points to sum, type can be int or 
str.
+            This is applicable to Tuple types, and :class:`pyflink.common.Row` 
types.
+        :return: The transformed DataStream.
+        """
+        if not isinstance(position_to_sum, int) and not 
isinstance(position_to_sum, str):
+            raise TypeError("The input must be of int or str type to locate 
the value to sum")

Review comment:
       ```suggestion
               raise TypeError("The field position must be of int or str type 
to locate the value to sum")
   ```

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1174,6 +1174,66 @@ def process_element(self, value, ctx: 
'KeyedProcessFunction.Context'):
         return self.process(FilterKeyedProcessFunctionAdapter(func), 
self._original_data_type_info)\
             .name("Filter")
 
+    def sum(self, position_to_sum: Union[int, str]) -> 'DataStream':
+        """
+        Applies an aggregation that gives a rolling sum of the data stream at 
the
+        given position grouped by the given key. An independent aggregate is 
kept
+        per key.
+
+        Example(Tuple data to sum):
+        ::
+
+            >>> ds = env.from_collection([('a', 1), ('a', 2), ('b', 1), ('b', 
5)])
+            >>> ds.key_by(lambda x: x[0]).sum(1)
+
+        Example(Row data to sum):
+        ::
+
+            >>> ds = env.from_collection([('a', 1), ('a', 2), ('a', 3), ('b', 
1), ('b', 2)],
+            ...                                
type_info=Types.ROW([Types.STRING(), Types.INT()]))
+            >>> ds.key_by(lambda x: x[0]).sum(1)
+
+        Example(Row data with fields name to sum):
+        ::
+
+            >>> ds = env.from_collection(
+            ...     [('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)],
+            ...     type_info=Types.ROW_NAMED(["key", "value"], 
[Types.STRING(), Types.INT()])
+            ... )
+            >>> ds.key_by(lambda x: x[0]).sum("value")
+
+        :param position_to_sum:
+            The field position in the data points to sum, type can be int or 
str.
+            This is applicable to Tuple types, and :class:`pyflink.common.Row` 
types.
+        :return: The transformed DataStream.
+        """
+        if not isinstance(position_to_sum, int) and not 
isinstance(position_to_sum, str):
+            raise TypeError("The input must be of int or str type to locate 
the value to sum")
+
+        class SumReduceFunction(ReduceFunction):
+
+            def __init__(self, position_to_sum):
+                self._pos = position_to_sum
+
+            def reduce(self, value1, value2):
+                from numbers import Number
+                if not isinstance(value1[self._pos], Number):
+                    raise TypeError("The value to sum by given position must 
be of numeric type; "
+                                    f"actual {type(value1[self._pos])}, 
expected Number")
+                if isinstance(value1, tuple):
+                    value1_list = list(value1)
+                    value1_list[self._pos] = value1[self._pos] + 
value2[self._pos]
+                    value1 = tuple(value1_list)
+                elif isinstance(value1, Row):
+                    value1[self._pos] = value1[self._pos] + value2[self._pos]
+                else:

Review comment:
       The Java implementation also supports the following cases:
   1) the input type is an array
   2) the input type is a basic type and the position is 0




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to