bzablocki commented on code in PR #28310:
URL: https://github.com/apache/beam/pull/28310#discussion_r1319892050


##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -208,6 +220,56 @@ def get_outputs(self, transform_name):
   def compute_outputs(self, transform_id):
     return expand_transform(self._transforms_by_uuid[transform_id], self)
 
+  def best_provider(self, t, input_providers):
+    if isinstance(t, dict):
+      spec = t
+    else:
+      spec = self._transforms_by_uuid[self.get_transform_id(t)]
+    possible_providers = [
+        p for p in self.providers[spec['type']] if p.available()
+    ]
+    if not possible_providers:
+      raise ValueError(
+          'No available provider for type %r at %s' %
+          (spec['type'], identify_object(spec)))
+
+    # Only one choice, no need to rank further.
+    if len(possible_providers) == 1:
+      return possible_providers[0]
+
+    def best_matches(
+        possible_providers: Iterable[yaml_provider.Provider],
+        adjacent_provider_options: Iterable[Iterable[yaml_provider.Provider]]
+    ) -> Iterable[yaml_provider.Provider]:
+      providers_by_score = collections.defaultdict(list)
+      for p in possible_providers:
+        providers_by_score[sum(
+            max(p.affinity(ap) for ap in apo)
+            for apo in adjacent_provider_options)].append(p)
+      return providers_by_score[max(providers_by_score.keys())]
+
+    # If there are any inputs, prefer to match them.
+    if input_providers:
+      possible_providers = best_matches(
+          possible_providers, [[p] for p in input_providers])
+
+    # Try to match downstream operations, continuing until there is no tie.
+    adjacent_transforms = [spec['__uuid__']]
+    while len(possible_providers) > 1:
+      # Go downstream one step. (This is why we start with spec itself.)
+      adjacent_transforms = sum(
+          [list(self.followers(t)) for t in adjacent_transforms], [])

Review Comment:
   Got it,  it is more readable now, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to