kamilwu commented on a change in pull request #13081:
URL: https://github.com/apache/beam/pull/13081#discussion_r503794860
##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -888,13 +888,32 @@ class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
5. The extract_output operation is invoked on the final accumulator to get
the output value.
- Note: If this **CombineFn** is used with a transform that has defaults,
- **apply** will be called with an empty list at expansion time to get the
- default value.
"""
def default_label(self):
return self.__class__.__name__
+ def default_value(self, *args, **kwargs):
+ """Returns a default reduction of an empty input.
+
+ Some combiners require a default value when reducing an empty collection,
+ which may be necessary when combining elements in an empty window.
+
+ If **CombineFn** is used with a transform that requires defaults,
+ default_value may be called during transform expansion.
+
+ Args:
+ *args: Additional arguments and side inputs.
+ **kwargs: Additional arguments and side inputs.
+ """
+ # Defalut values may be evaluated at pipeline construction time.
Review comment:
nit: typo, `Defalut` should be `Default`.
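A minimal, framework-free sketch of what the `default_value` docstring describes: the default is the reduction of an empty input, i.e. the output extracted from a freshly created accumulator with no elements added. `SketchCombineFn` and `MeanFn` are hypothetical stand-ins, not Beam's `CombineFn`; the method under review may differ (for example, it may also call lifecycle hooks or forward side inputs differently). The deep copy follows the suggestion in the next review comment.

```python
import copy


class SketchCombineFn:
  """Hypothetical stand-in for Beam's CombineFn (not the real class)."""

  def create_accumulator(self, *args, **kwargs):
    raise NotImplementedError

  def add_input(self, accumulator, element, *args, **kwargs):
    raise NotImplementedError

  def merge_accumulators(self, accumulators, *args, **kwargs):
    raise NotImplementedError

  def extract_output(self, accumulator, *args, **kwargs):
    raise NotImplementedError

  def default_value(self, *args, **kwargs):
    # Reduce an empty input: extract the output of a fresh accumulator
    # without adding any elements. Work on a deep copy so state mutated
    # here cannot leak back into the original (serialized) combiner.
    combine_copy = copy.deepcopy(self)
    accumulator = combine_copy.create_accumulator(*args, **kwargs)
    return combine_copy.extract_output(accumulator, *args, **kwargs)


class MeanFn(SketchCombineFn):
  """Hypothetical mean combiner; the accumulator is a (sum, count) pair."""

  def create_accumulator(self, *args, **kwargs):
    return 0.0, 0

  def add_input(self, accumulator, element, *args, **kwargs):
    total, count = accumulator
    return total + element, count + 1

  def merge_accumulators(self, accumulators, *args, **kwargs):
    accumulators = list(accumulators)  # Accept any iterable; we iterate twice.
    return (sum(t for t, _ in accumulators), sum(c for _, c in accumulators))

  def extract_output(self, accumulator, *args, **kwargs):
    total, count = accumulator
    # An empty window has count == 0: return NaN instead of dividing by zero.
    return total / count if count else float('nan')


fn = MeanFn()
print(fn.default_value())  # nan

acc = fn.create_accumulator()
for x in [1, 2, 3]:
  acc = fn.add_input(acc, x)
print(fn.extract_output(acc))  # 2.0
```

Returning NaN for an empty mean is just one choice made for this sketch; another combiner could return `None` or an identity value such as `0` for a sum.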
##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -888,13 +888,32 @@ class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
5. The extract_output operation is invoked on the final accumulator to get
the output value.
- Note: If this **CombineFn** is used with a transform that has defaults,
- **apply** will be called with an empty list at expansion time to get the
- default value.
"""
def default_label(self):
return self.__class__.__name__
+ def default_value(self, *args, **kwargs):
+ """Returns a default reduction of an empty input.
+
+ Some combiners require a default value when reducing an empty collection,
+ which may be necessary when combining elements in an empty window.
+
+ If **CombineFn** is used with a transform that requires defaults,
+ default_value may be called during transform expansion.
+
+ Args:
+ *args: Additional arguments and side inputs.
+ **kwargs: Additional arguments and side inputs.
+ """
+ # Defalut values may be evaluated at pipeline construction time.
+ # Make a copy to avoid passing any side-effects to the serialized pipeline
+ # representaiton.
+ combine_copy = copy.copy(self)
Review comment:
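A user can provide a combiner that contains nested combiners, e.g.
`TupleCombineFn`. `copy.copy` only makes a shallow copy, so those nested
combiners would still be shared with the original, and any side effects on them
would leak into the serialized pipeline representation. We should use
`copy.deepcopy` instead.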
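To illustrate the concern: `copy.copy` duplicates only the outer object, so a composite combiner's list of child combiners (and the children themselves) remains shared with the original, while `copy.deepcopy` clones them. `CountingFn` and `TupleLikeFn` are hypothetical stand-ins for a leaf combiner and a `TupleCombineFn`-style composite; Beam's actual `TupleCombineFn` may store its children differently.

```python
import copy


class CountingFn:
  """Hypothetical leaf combiner that mutates itself as a side effect."""

  def __init__(self):
    self.calls = 0

  def create_accumulator(self):
    self.calls += 1  # Side effect we do not want on the original object.
    return 0


class TupleLikeFn:
  """Hypothetical composite holding nested combiners, like TupleCombineFn."""

  def __init__(self, *combiners):
    self.combiners = list(combiners)

  def create_accumulator(self):
    return tuple(c.create_accumulator() for c in self.combiners)


original = TupleLikeFn(CountingFn(), CountingFn())

shallow = copy.copy(original)
shallow.create_accumulator()
# The shallow copy shares the same list and children, so the original changes.
print([c.calls for c in original.combiners])  # [1, 1]

deep = copy.deepcopy(original)
deep.create_accumulator()
# deepcopy cloned the children: only the copy's counters advance.
print([c.calls for c in original.combiners])  # [1, 1]
print([c.calls for c in deep.combiners])  # [2, 2]
```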
----------------------------------------------------------------
This is an automated message from the Apache Git Service.