bzablocki commented on code in PR #28310:
URL: https://github.com/apache/beam/pull/28310#discussion_r1317401680


##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -208,6 +220,56 @@ def get_outputs(self, transform_name):
   def compute_outputs(self, transform_id):
     return expand_transform(self._transforms_by_uuid[transform_id], self)
 
+  def best_provider(self, t, input_providers):
+    if isinstance(t, dict):
+      spec = t
+    else:
+      spec = self._transforms_by_uuid[self.get_transform_id(t)]
+    possible_providers = [
+        p for p in self.providers[spec['type']] if p.available()
+    ]
+    if not possible_providers:
+      raise ValueError(
+          'No available provider for type %r at %s' %
+          (spec['type'], identify_object(spec)))
+
+    # Only one choice, no need to rank further.
+    if len(possible_providers) == 1:
+      return possible_providers[0]
+
+    def best_matches(
+        possible_providers: Iterable[yaml_provider.Provider],
+        adjacent_provider_options: Iterable[Iterable[yaml_provider.Provider]]
+    ) -> Iterable[yaml_provider.Provider]:
+      providers_by_score = collections.defaultdict(list)
+      for p in possible_providers:
+        providers_by_score[sum(
+            max(p.affinity(ap) for ap in apo)
+            for apo in adjacent_provider_options)].append(p)
+      return providers_by_score[max(providers_by_score.keys())]

Review Comment:
   Can `providers_by_score` be empty here, so that `max(providers_by_score.keys())` raises a `ValueError`?
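For reference, `max()` raises `ValueError` when given an empty iterable. `best_matches` has two such calls. The outer one would fail only if `possible_providers` were empty, which the early returns above appear to rule out. The inner one would fail if any entry of `adjacent_provider_options` were empty. Below is a minimal defensive sketch. It assumes that an empty group should contribute a score of 0 and that `affinity` is a plain two-argument function, a hypothetical simplification of `p.affinity(ap)`:

```python
import collections
from typing import Callable, Dict, Iterable, List, Sequence, TypeVar

P = TypeVar("P")


def best_matches(
    candidates: Iterable[P],
    adjacent_options: Iterable[Sequence[P]],
    affinity: Callable[[P, P], int],
) -> List[P]:
    """Returns the candidates with the highest total affinity score.

    Hypothetical variant of the reviewed helper that never calls max()
    on an empty sequence.
    """
    # Materialize once: the groups are re-read for every candidate, so a
    # one-shot generator would otherwise be exhausted after the first one.
    groups = list(adjacent_options)
    by_score: Dict[int, List[P]] = collections.defaultdict(list)
    for c in candidates:
        # An empty group contributes 0 instead of raising ValueError.
        score = sum(
            max((affinity(c, o) for o in options), default=0)
            for options in groups)
        by_score[score].append(c)
    if not by_score:
        # No candidates at all: return an empty list rather than raising.
        return []
    return by_score[max(by_score)]
```

Returning an empty list is only one option. Raising a `ValueError` that names the transform's location, as `best_provider` already does, would be equally reasonable.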



##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -208,6 +220,56 @@ def get_outputs(self, transform_name):
   def compute_outputs(self, transform_id):
     return expand_transform(self._transforms_by_uuid[transform_id], self)
 
+  def best_provider(self, t, input_providers):
+    if isinstance(t, dict):
+      spec = t
+    else:
+      spec = self._transforms_by_uuid[self.get_transform_id(t)]
+    possible_providers = [
+        p for p in self.providers[spec['type']] if p.available()
+    ]
+    if not possible_providers:
+      raise ValueError(
+          'No available provider for type %r at %s' %
+          (spec['type'], identify_object(spec)))
+
+    # Only one choice, no need to rank further.
+    if len(possible_providers) == 1:
+      return possible_providers[0]
+
+    def best_matches(
+        possible_providers: Iterable[yaml_provider.Provider],
+        adjacent_provider_options: Iterable[Iterable[yaml_provider.Provider]]

Review Comment:
   Can we reduce the number of dimensions somehow and avoid passing nested lists?
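One way to reduce the nesting is to have the ranking helper take a flat list of candidates plus a scoring function. Each caller then builds its own score. This is a sketch under assumptions: `top_scoring`, `input_score`, and `downstream_score` are hypothetical names, and `affinity` stands in for `Provider.affinity`. With this shape, the input case no longer needs the `[[p] for p in input_providers]` wrapping:

```python
from typing import Callable, List, Sequence, TypeVar

P = TypeVar("P")


def top_scoring(candidates: Sequence[P], score: Callable[[P], int]) -> List[P]:
    """Keeps every candidate tied for the highest score (hypothetical helper)."""
    scores = [score(c) for c in candidates]
    if not scores:
        return []
    best = max(scores)
    return [c for c, s in zip(candidates, scores) if s == best]


def input_score(
    inputs: Sequence[P], affinity: Callable[[P, P], int]) -> Callable[[P], int]:
    # Each input has exactly one provider, so a flat sequence is enough.
    return lambda c: sum(affinity(c, p) for p in inputs)


def downstream_score(
    options_per_transform: Sequence[Sequence[P]],
    affinity: Callable[[P, P], int],
) -> Callable[[P], int]:
    # Each downstream transform may have several provider options;
    # it contributes the affinity of its best-matching option.
    return lambda c: sum(
        max((affinity(c, o) for o in opts), default=0)
        for opts in options_per_transform)
```

The helper that breaks ties stays one-dimensional. The nesting survives only inside `downstream_score`, where each downstream transform genuinely has several options.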



##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -225,19 +287,7 @@ def create_ptransform(self, spec, input_pcolls):
         providers_by_input[pcoll] for pcoll in input_pcolls
         if pcoll in providers_by_input
     ]

Review Comment:
   Perhaps the code in lines 283-289
   ```
   # TODO(yaml): Perhaps we can do better than a greedy choice here.
   # TODO(yaml): Figure out why this is needed.
   providers_by_input = {k: v for k, v in self.input_providers.items()}
   input_providers = [
       providers_by_input[pcoll] for pcoll in input_pcolls
       if pcoll in providers_by_input
   ]
   ```
   could be moved into the `best_provider()` function? `input_providers` and `providers_by_input` are not needed in the scope of `create_ptransform`.
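Here is a minimal sketch of that refactor. It assumes `best_provider` could take the input PCollections directly instead of a precomputed `input_providers` list, which would be a hypothetical signature change. `ToyScope`, `_providers_for`, and the string providers are illustrative only, and the ranking is reduced to a trivial rule. The sketch reads `self.input_providers` directly; whether the dictionary copy in the original is required is the open TODO and is not settled here.

```python
from typing import Dict, Hashable, List, Sequence


class ToyScope:
    """Hypothetical stand-in for the real Scope class; it models only the
    pcoll -> provider mapping used by the quoted snippet."""

    def __init__(self, input_providers: Dict[Hashable, str]):
        self.input_providers = input_providers

    def _providers_for(self, input_pcolls: Sequence[Hashable]) -> List[str]:
        # The lookup from the quoted snippet, now private to the ranking code.
        return [
            self.input_providers[pcoll] for pcoll in input_pcolls
            if pcoll in self.input_providers
        ]

    def best_provider(
        self, candidates: Sequence[str], input_pcolls: Sequence[Hashable]) -> str:
        input_providers = self._providers_for(input_pcolls)
        # Toy ranking only: prefer a candidate that already produced an input.
        for c in candidates:
            if c in input_providers:
                return c
        return candidates[0]
```

With this shape, the caller would only pass `input_pcolls` through, and the intermediate lists would no longer live in its scope.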



##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -208,6 +220,56 @@ def get_outputs(self, transform_name):
   def compute_outputs(self, transform_id):
     return expand_transform(self._transforms_by_uuid[transform_id], self)
 
+  def best_provider(self, t, input_providers):

Review Comment:
   This function is fairly complicated and relies on a lot of nested lists and dicts. It's hard to follow what is happening without types and well-defined structures. Would you agree? Could we plan some follow-up work to refactor it a bit?



##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -208,6 +220,56 @@ def get_outputs(self, transform_name):
   def compute_outputs(self, transform_id):
     return expand_transform(self._transforms_by_uuid[transform_id], self)
 
+  def best_provider(self, t, input_providers):
+    if isinstance(t, dict):
+      spec = t
+    else:
+      spec = self._transforms_by_uuid[self.get_transform_id(t)]
+    possible_providers = [
+        p for p in self.providers[spec['type']] if p.available()
+    ]
+    if not possible_providers:
+      raise ValueError(
+          'No available provider for type %r at %s' %
+          (spec['type'], identify_object(spec)))
+
+    # Only one choice, no need to rank further.
+    if len(possible_providers) == 1:
+      return possible_providers[0]
+
+    def best_matches(
+        possible_providers: Iterable[yaml_provider.Provider],
+        adjacent_provider_options: Iterable[Iterable[yaml_provider.Provider]]
+    ) -> Iterable[yaml_provider.Provider]:
+      providers_by_score = collections.defaultdict(list)
+      for p in possible_providers:
+        providers_by_score[sum(
+            max(p.affinity(ap) for ap in apo)
+            for apo in adjacent_provider_options)].append(p)
+      return providers_by_score[max(providers_by_score.keys())]
+
+    # If there are any inputs, prefer to match them.
+    if input_providers:
+      possible_providers = best_matches(
+          possible_providers, [[p] for p in input_providers])
+
+    # Try to match downstream operations, continuing until there is no tie.
+    adjacent_transforms = [spec['__uuid__']]
+    while len(possible_providers) > 1:
+      # Go downstream one step. (This is why we start with spec itself.)
+      adjacent_transforms = sum(
+          [list(self.followers(t)) for t in adjacent_transforms], [])

Review Comment:
   `adjacent_transforms` is always a list of length 1; why do we need to keep it as a list and iterate over it?
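The loop above starts with a single id. If `self.followers()` can return more than one transform, for example when one output is consumed by two downstream transforms, then `adjacent_transforms` acts as the frontier of a breadth-first walk after the first step and can hold several ids. Below is a toy sketch of one such step. The `followers` graph is hypothetical. It uses `itertools.chain.from_iterable` instead of `sum(..., [])`, because the latter builds a new list on every addition:

```python
import itertools
from typing import Dict, List


def step_downstream(frontier: List[str], followers: Dict[str, List[str]]) -> List[str]:
    """Moves a BFS frontier one step downstream (toy stand-in for self.followers)."""
    return list(itertools.chain.from_iterable(followers.get(t, []) for t in frontier))


# Hypothetical pipeline: "read" fans out to two consumers that both feed "write".
graph = {"read": ["filter", "map"], "filter": ["write"], "map": ["write"]}
```

Starting from `["read"]`, one step yields `["filter", "map"]`. The next step yields `["write", "write"]`, because, like the original `sum(...)` expression, this does not deduplicate.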



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
