HuangXingBo commented on a change in pull request #15028:
URL: https://github.com/apache/flink/pull/15028#discussion_r583503604
##########
File path: flink-python/pyflink/fn_execution/state_impl.py
##########
@@ -810,6 +847,15 @@ def get_reducing_state(self, name, coder, reduce_function):
self._all_states[name] = reducing_state
return reducing_state
+ def get_aggregating_state(self, name, coder, agg_function):
Review comment:
The most logic of `get_aggregating_state` , `get_value_state`,
`get_reducing_state` and `get_list_state` are the same, maybe we can extract a
method to reduce code duplication.
##########
File path: flink-python/pyflink/fn_execution/state_impl.py
##########
@@ -847,6 +893,15 @@ def validate_reducing_state(self, name, coder):
if state._internal_state._value_coder != coder:
raise Exception("State name corrupted: %s" % name)
+ def validate_aggregating_state(self, name, coder):
Review comment:
ditto
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]