ibzib commented on a change in pull request #15351:
URL: https://github.com/apache/beam/pull/15351#discussion_r692391409



##########
File path: sdks/python/apache_beam/runners/worker/operations.py
##########
@@ -277,6 +277,9 @@ def __init__(self,
     self.setup_done = False
     self.step_name = None  # type: Optional[str]
 
+  def actuate_pushdown(self, fields):

Review comment:
       Do you know what the equivalent class is in Java?

##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1196,7 +1196,12 @@ def create_operation(self,
     creator, parameter_type = self._known_urns[transform_proto.spec.urn]
     payload = proto_utils.parse_Bytes(
         transform_proto.spec.payload, parameter_type)
-    return creator(self, transform_id, transform_proto, payload, consumers)
+    operation = creator(self, transform_id, transform_proto, payload, consumers)
+    if common_urns.actuate_pushdown_annotation in transform_proto.annotations:
+      operation.actuate_pushdown(

Review comment:
       Does this assume the transform is a ParDo?
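   For concreteness, a hedged sketch of the guard this question implies, assuming actuate_pushdown is only defined on the ParDo operation class (operations.DoOperation) and that the annotation payload decodes to the projected field names (decode_fields is a hypothetical helper):
   
   operation = creator(self, transform_id, transform_proto, payload, consumers)
   if common_urns.actuate_pushdown_annotation in transform_proto.annotations:
     # Assumption: only DoOperation (ParDo) implements actuate_pushdown,
     # so skip the call for any other operation type.
     if isinstance(operation, operations.DoOperation):
       fields = decode_fields(
           transform_proto.annotations[common_urns.actuate_pushdown_annotation])
       operation.actuate_pushdown(fields)
   return operation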

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -197,6 +198,38 @@ def run_pipeline(self,
     return self._latest_run_result
 
   def run_via_runner_api(self, pipeline_proto):
+
+    def pushdown_projections(pipeline_proto):
+      leaf_consumers = collections.defaultdict(list)
+      for transform in pipeline_proto.components.transforms.values():
+        for pc in transform.inputs.values():
+          if not transform.subtransforms:
+            leaf_consumers[pc].append(transform)
+
+      for transform in pipeline_proto.components.transforms.values():
+        if transform.subtransforms or not transform.outputs:
+          continue
+        if not common_urns.support_pushdown_annotation in transform.annotations:
+          continue
+        # The annotations should really be per input and output.

Review comment:
      This is a really good point. And it will require us to be careful with composite sources. Let's say we have a composite source T:
   
   ptransform T : pcollection A -> pcoll C
   - pardo T.f : pcoll A -> pcoll B
   - ptransform T.g : pcoll B -> pcoll C
   
   If we request pushdown on pcollection C, but the actual read from source happens in T.f, what do we do? T.f doesn't know anything about pcoll C. The main problem is that we're saying arbitrary PTransforms support pushdown, but really only a single ParDo does.
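   
   To make that concrete, here is a minimal Beam sketch of such a composite (ReadFn and G are hypothetical stand-ins for the source-reading DoFn and the downstream transform):
   
   import apache_beam as beam
   
   class T(beam.PTransform):
     # Composite source: pcoll A -> pcoll C.
     def expand(self, pcoll_a):
       # T.f is the ParDo that actually reads from the source; only it
       # could honor a projection pushdown.
       pcoll_b = pcoll_a | 'f' >> beam.ParDo(ReadFn())
       # T.g produces pcoll C. A pushdown request on pcoll C lands at the
       # composite boundary, but T.f never sees pcoll C.
       pcoll_c = pcoll_b | 'g' >> G()
       return pcoll_c
   
   Per-input/per-output annotations (as the comment in the diff suggests) would let the runner target T.f's output directly instead of the composite boundary.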

##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1196,7 +1196,12 @@ def create_operation(self,
     creator, parameter_type = self._known_urns[transform_proto.spec.urn]
     payload = proto_utils.parse_Bytes(
         transform_proto.spec.payload, parameter_type)
-    return creator(self, transform_id, transform_proto, payload, consumers)
+    operation = creator(self, transform_id, transform_proto, payload, consumers)
+    if common_urns.actuate_pushdown_annotation in transform_proto.annotations:

Review comment:
      This approach requires the worker to have access to the runner API proto (or at least some derivative of it that also contains the pushdown annotations). Do you know if that's the case for Dataflow's legacy Java worker?
   
   cc @apilloud 



