robertwb commented on a change in pull request #15351:
URL: https://github.com/apache/beam/pull/15351#discussion_r696029511
##########
File path: sdks/python/apache_beam/runners/worker/operations.py
##########
@@ -277,6 +277,9 @@ def __init__(self,
self.setup_done = False
self.step_name = None # type: Optional[str]
+ def actuate_pushdown(self, fields):
Review comment:
I don't know off-hand, but it's a stack of DoFnInvokers, ParDoEvaluators,
and similar objects that should be traceable from the bundle evaluation code.
There are more layers, but I think most of them will just be pass-throughs.
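To illustrate the "mostly pass-throughs" point, here is a minimal sketch (the class names `PassThroughLayer` and `LeafOperation` are made up for illustration and are not the actual Beam classes): each intermediate layer just forwards the pushdown request, and only the leaf operation records the projected fields to apply at execution time.

```python
class PassThroughLayer:
    """A hypothetical wrapper layer that merely forwards pushdown requests."""
    def __init__(self, inner):
        self.inner = inner

    def actuate_pushdown(self, fields):
        # Pass-through: forward the requested fields to the wrapped layer.
        self.inner.actuate_pushdown(fields)


class LeafOperation:
    """A hypothetical leaf that actually acts on the projection."""
    def __init__(self):
        self.projected_fields = None

    def actuate_pushdown(self, fields):
        # The leaf records which fields to retain when processing elements.
        self.projected_fields = list(fields)


leaf = LeafOperation()
stack = PassThroughLayer(PassThroughLayer(leaf))
stack.actuate_pushdown(['a', 'b'])
```

The request threads through both wrapper layers unchanged and lands on the leaf, which is the shape one would expect when wiring `actuate_pushdown` through the invoker/evaluator stack.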
##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1196,7 +1196,12 @@ def create_operation(self,
creator, parameter_type = self._known_urns[transform_proto.spec.urn]
payload = proto_utils.parse_Bytes(
transform_proto.spec.payload, parameter_type)
- return creator(self, transform_id, transform_proto, payload, consumers)
+ operation = creator(self, transform_id, transform_proto, payload, consumers)
+ if common_urns.actuate_pushdown_annotation in transform_proto.annotations:
Review comment:
This is not the case for the legacy worker, but we should not be
developing new features on the legacy worker.
(We could probably add this information in the v1beta3 translation if we
wanted to support legacy.)
##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -197,6 +198,38 @@ def run_pipeline(self,
return self._latest_run_result
def run_via_runner_api(self, pipeline_proto):
+
+ def pushdown_projections(pipeline_proto):
+ leaf_consumers = collections.defaultdict(list)
+ for transform in pipeline_proto.components.transforms.values():
+ for pc in transform.inputs.values():
+ if not transform.subtransforms:
+ leaf_consumers[pc].append(transform)
+
+ for transform in pipeline_proto.components.transforms.values():
+ if transform.subtransforms or not transform.outputs:
+ continue
+ if common_urns.support_pushdown_annotation not in transform.annotations:
+ continue
+ # The annotations should really be per input and output.
Review comment:
If T declares that it requests or passes through projections, it should
be sure to have an implementation that supports these thinned-down inputs. I'm
still not sure how to support absorbing pushdowns without forcing the transform
to do so in its primitives.
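The leaf-consumer collection in the diff above can be sketched with plain dicts standing in for `pipeline_proto.components.transforms` (the transform names, PCollection ids, and dict shape here are made up for illustration): composites are skipped, so each PCollection maps only to the primitive transforms that actually read it.

```python
import collections

# Toy stand-in for pipeline_proto.components.transforms: a composite
# transform lists its subtransforms; primitives have an empty list.
transforms = {
    'Read': {'inputs': {}, 'outputs': {'out': 'pc1'}, 'subtransforms': []},
    'Composite': {'inputs': {'in': 'pc1'}, 'outputs': {},
                  'subtransforms': ['Map']},
    'Map': {'inputs': {'in': 'pc1'}, 'outputs': {'out': 'pc2'},
            'subtransforms': []},
}

leaf_consumers = collections.defaultdict(list)
for name, transform in transforms.items():
    for pc in transform['inputs'].values():
        # Only primitive (leaf) transforms count as consumers; the
        # composite wrapper consuming the same PCollection is ignored.
        if not transform['subtransforms']:
            leaf_consumers[pc].append(name)
```

Here `pc1` ends up with only `'Map'` as a consumer, which is the property the pushdown analysis needs: it must see the transforms that will execute, not their composite wrappers.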
##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1196,7 +1196,12 @@ def create_operation(self,
creator, parameter_type = self._known_urns[transform_proto.spec.urn]
payload = proto_utils.parse_Bytes(
transform_proto.spec.payload, parameter_type)
- return creator(self, transform_id, transform_proto, payload, consumers)
+ operation = creator(self, transform_id, transform_proto, payload, consumers)
+ if common_urns.actuate_pushdown_annotation in transform_proto.annotations:
+ operation.actuate_pushdown(
Review comment:
This is only called on transforms that previously set the
support_pushdown_annotation bit. (Currently, that's only DoFns, but any
transform could work here.) One restriction is that this is only for primitive,
execution-time transforms.
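The two-phase handshake described here can be sketched as follows (a simplified model, not the real Beam code: the annotation keys, `optimize`, `create_operation`, and `Op` are all hypothetical): a transform advertises support via one annotation, the runner's optimizer rewrites that into an actuate annotation carrying the fields, and the worker invokes the hook only when the actuate annotation is present.

```python
# Hypothetical annotation keys; the real URNs live in common_urns.
SUPPORT_PUSHDOWN = 'support_pushdown'
ACTUATE_PUSHDOWN = 'actuate_pushdown'


class Op:
    """A hypothetical primitive, execution-time operation."""
    def __init__(self):
        self.fields = None

    def actuate_pushdown(self, fields):
        self.fields = fields


def optimize(transform, requested_fields):
    # Runner side: only transforms that opted in via the support bit
    # get the actuate annotation, so the worker never calls the hook
    # on a transform that cannot handle thinned-down inputs.
    if SUPPORT_PUSHDOWN in transform['annotations']:
        transform['annotations'][ACTUATE_PUSHDOWN] = requested_fields


def create_operation(transform):
    # Worker side: mirrors the gated call in the diff above.
    op = Op()
    if ACTUATE_PUSHDOWN in transform['annotations']:
        op.actuate_pushdown(transform['annotations'][ACTUATE_PUSHDOWN])
    return op


t = {'annotations': {SUPPORT_PUSHDOWN: b''}}
optimize(t, ['a', 'b'])
op = create_operation(t)
```

The invariant this encodes is the one stated in the comment: `actuate_pushdown` is only ever reached through a transform that previously set the support bit.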
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]