[ https://issues.apache.org/jira/browse/BEAM-3736?focusedWorklogId=506915&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-506915 ]
ASF GitHub Bot logged work on BEAM-3736:
----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Nov/20 14:00
Start Date: 03/Nov/20 14:00
Worklog Time Spent: 10m
Work Description: tvalentyn commented on a change in pull request #13048:
URL: https://github.com/apache/beam/pull/13048#discussion_r516397331
##########
File path: sdks/python/apache_beam/transforms/userstate.py
##########
@@ -357,6 +357,10 @@ def prefetch(self):
# The default implementation here does nothing.
pass
+ def finalize(self):
Review comment:
@robertwb do you see any concerns with adding a top-level `finalize`
method here?
##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -411,6 +411,33 @@ def visit_transform(self, transform_node):
return FlattenInputVisitor()
+ @staticmethod
+ def combinefn_visitor():
+ # Imported here to avoid circular dependencies.
+ from apache_beam.pipeline import PipelineVisitor
+ from apache_beam import core
+
+ class CombineFnVisitor(PipelineVisitor):
+ """Checks if `CombineFn` has non-default setup or teardown methods.
+ If yes, raises `ValueError`.
+ """
+ def visit_transform(self, applied_transform):
+ transform = applied_transform.transform
+ if isinstance(transform, core.ParDo) and isinstance(
+ transform.fn, core.CombineValuesDoFn):
+ if self._overrides_setup_or_teardown(transform.fn.combinefn):
+ raise ValueError(
+ 'CombineFn.setup and CombineFn.teardown are '
+ 'not supported with non-portable Dataflow '
+ 'runner. Please use Dataflow Runner V2 instead.')
+
+ @staticmethod
+ def _overrides_setup_or_teardown(combinefn):
+ return combinefn.__class__.setup is not core.CombineFn.setup or \
Review comment:
nit: prefer using `()` instead of `\` for line continuation.
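
A minimal sketch of what the parenthesized form could look like. The `CombineFn` class below is a hypothetical stand-in rather than `apache_beam.transforms.core.CombineFn`, and the `teardown` half of the condition is an assumption, since the hunk above is cut off after `or \`.

```python
class CombineFn:
    """Hypothetical stand-in for Beam's CombineFn; only the hooks are modeled."""
    def setup(self, *args, **kwargs):
        pass

    def teardown(self, *args, **kwargs):
        pass


class PlainCombineFn(CombineFn):
    """Inherits the default setup and teardown."""


class LookupCombineFn(CombineFn):
    """Overrides setup, so the check below should flag it."""
    def setup(self, *args, **kwargs):
        self.table = {'a': 1}


def overrides_setup_or_teardown(combinefn):
    # Parentheses give implicit line continuation, so no backslash is needed.
    # The teardown comparison is assumed; the quoted diff line is truncated.
    return (
        combinefn.__class__.setup is not CombineFn.setup or
        combinefn.__class__.teardown is not CombineFn.teardown)
```

The identity comparison works because, in Python 3, looking up an inherited method on a subclass returns the same function object that is defined on the base class.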
##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1975,10 +1990,13 @@ def add_input_types(transform):
return combined
if self.has_defaults:
- combine_fn = (
- self.fn if isinstance(self.fn, CombineFn) else
- CombineFn.from_callable(self.fn))
- default_value = combine_fn.apply([], *self.args, **self.kwargs)
+ combine_fn = copy.deepcopy(self.fn) if isinstance(self.fn, CombineFn) \
Review comment:
nit:
[prefer](https://www.python.org/dev/peps/pep-0008/#:~:text=The%20preferred%20way%20of%20wrapping,a%20backslash%20for%20line%20continuation.)
wrapping the expression in `()` over using the `\` line continuation token.
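
For illustration, the conditional expression might be wrapped as follows. This is only a sketch: `CombineFn` here is a hypothetical stand-in, `resolve_combine_fn` is a name invented for the example, and the `else` branch is assumed to match the removed lines in the hunk above, because the added line is cut off at the backslash.

```python
import copy


class CombineFn:
    """Hypothetical stand-in for Beam's CombineFn, just enough for the example."""
    def __init__(self, fn=None):
        self.fn = fn

    @classmethod
    def from_callable(cls, fn):
        # Wraps a plain callable; simplified compared with the real SDK.
        return cls(fn)


def resolve_combine_fn(fn):
    # The whole conditional expression sits inside parentheses, so it can
    # span lines without a trailing backslash.
    combine_fn = (
        copy.deepcopy(fn) if isinstance(fn, CombineFn) else
        CombineFn.from_callable(fn))
    return combine_fn
```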
Issue Time Tracking
-------------------
Worklog Id: (was: 506915)
Time Spent: 9h 20m (was: 9h 10m)
> Add SetUp() and TearDown() for CombineFns
> -----------------------------------------
>
> Key: BEAM-3736
> URL: https://issues.apache.org/jira/browse/BEAM-3736
> Project: Beam
> Issue Type: Improvement
> Components: beam-model, sdk-py-core
> Reporter: Chuan Yu Foo
> Assignee: Kamil Wasilewski
> Priority: P3
> Time Spent: 9h 20m
> Remaining Estimate: 0h
>
> I have a CombineFn that has a large amount of state that needs to be loaded
> once before it can add_input or merge_combiners (for example, the CombineFn
> might load up a large lookup table used for combining).
> Right now, to initialise this state, for each of the methods, I check if the
> state has already been initialised, and if not, I initialise it. It would be
> nice if CombineFn provided a SetUp() method that is called once to initialise
> this state (and a corresponding TearDown() method to clean up this state if
> necessary).
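
The difference between the workaround described in the issue and the requested hooks can be sketched in plain Python. Nothing below uses the Beam SDK: both combiner classes and the `run_combiner` driver are hypothetical, the lookup table is a toy dict, and `merge_accumulators` is used as the name of the merge step. In a real pipeline the runner, not user code, decides when the hooks are called.

```python
class LazyInitCombiner:
    """Workaround from the issue: each method checks whether state is loaded."""
    def __init__(self):
        self._table = None

    def _ensure_loaded(self):
        if self._table is None:
            self._table = {'a': 1, 'b': 2}  # stands in for an expensive load

    def create_accumulator(self):
        return 0

    def add_input(self, accumulator, element):
        self._ensure_loaded()
        return accumulator + self._table.get(element, 0)

    def merge_accumulators(self, accumulators):
        self._ensure_loaded()
        return sum(accumulators)


class SetupCombiner:
    """Requested shape: state is loaded once in setup, released in teardown."""
    def setup(self):
        self._table = {'a': 1, 'b': 2}

    def teardown(self):
        self._table = None

    def create_accumulator(self):
        return 0

    def add_input(self, accumulator, element):
        return accumulator + self._table.get(element, 0)

    def merge_accumulators(self, accumulators):
        return sum(accumulators)


def run_combiner(combiner, elements):
    """Toy driver that calls the optional hooks around a single fold."""
    if hasattr(combiner, 'setup'):
        combiner.setup()
    try:
        accumulator = combiner.create_accumulator()
        for element in elements:
            accumulator = combiner.add_input(accumulator, element)
        return combiner.merge_accumulators([accumulator])
    finally:
        if hasattr(combiner, 'teardown'):
            combiner.teardown()
```

Both classes produce the same result for the same input; the second simply moves the initialisation check out of every method.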