dianfu commented on a change in pull request #19126:
URL: https://github.com/apache/flink/pull/19126#discussion_r830716015
##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -1174,6 +1174,66 @@ def process_element(self, value, ctx:
'KeyedProcessFunction.Context'):
return self.process(FilterKeyedProcessFunctionAdapter(func),
self._original_data_type_info)\
.name("Filter")
+ def sum(self, position_to_sum: Union[int, str]) -> 'DataStream':
+ """
+ Applies an aggregation that gives a rolling sum of the data stream at
the
+ given position grouped by the given key. An independent aggregate is
kept
+ per key.
+
+ Example(Tuple data to sum):
+ ::
+
+ >>> ds = env.from_collection([('a', 1), ('a', 2), ('b', 1), ('b',
5)])
+ >>> ds.key_by(lambda x: x[0]).sum(1)
+
+ Example(Row data to sum):
+ ::
+
+ >>> ds = env.from_collection([('a', 1), ('a', 2), ('a', 3), ('b',
1), ('b', 2)],
+ ...
type_info=Types.ROW([Types.STRING(), Types.INT()]))
+ >>> ds.key_by(lambda x: x[0]).sum(1)
+
+ Example(Row data with fields name to sum):
+ ::
+
+ >>> ds = env.from_collection(
+ ... [('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)],
+ ... type_info=Types.ROW_NAMED(["key", "value"],
[Types.STRING(), Types.INT()])
+ ... )
+ >>> ds.key_by(lambda x: x[0]).sum("value")
+
+ :param position_to_sum:
+ The field position in the data points to sum, type can be int or
str.
+ This is applicable to Tuple types, and :class:`pyflink.common.Row`
types.
+ :return: The transformed DataStream.
+ """
+ if not isinstance(position_to_sum, int) and not
isinstance(position_to_sum, str):
+ raise TypeError("The input must be of int or str type to locate
the value to sum")
+
+ class SumReduceFunction(ReduceFunction):
+
+ def __init__(self, position_to_sum):
+ self._pos = position_to_sum
+
+ def reduce(self, value1, value2):
Review comment:
What about refactoring it as follows?
```
def init_reduce_func(value1):
if isinstance(value1, tuple):
def reduce_func(v1, v2):
v1_list = list(v1)
v1_list[self._pos] = v1[self._pos] +
v2[self._pos]
return tuple(v1_list)
self._reduce_func = reduce_func
elif isinstance(value1, (list, Row)):
def reduce_func(v1, v2):
v1[self._pos] = v1[self._pos] + v2[self._pos]
self._reduce_func = reduce_func
else:
raise TypeError("Sum operator only process the data
of "
"Tuple type and
{pyflink.common.types.Row} type. "
f"Actual type: {type(value1)}")
from numbers import Number
if not isinstance(value1[self._pos], Number):
raise TypeError("The value to sum by given position must
be of numeric type; "
f"actual {type(value1[self._pos])},
expected Number")
if not self._reduce_func:
init_reduce_func(value1)
return self._reduce_func(value1, value2)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]