bzablocki commented on code in PR #28310:
URL: https://github.com/apache/beam/pull/28310#discussion_r1319812013
##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -225,19 +287,7 @@ def create_ptransform(self, spec, input_pcolls):
providers_by_input[pcoll] for pcoll in input_pcolls
if pcoll in providers_by_input
]
Review Comment:
ok
##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -208,6 +220,56 @@ def get_outputs(self, transform_name):
def compute_outputs(self, transform_id):
return expand_transform(self._transforms_by_uuid[transform_id], self)
+ def best_provider(self, t, input_providers):
+ if isinstance(t, dict):
+ spec = t
+ else:
+ spec = self._transforms_by_uuid[self.get_transform_id(t)]
+ possible_providers = [
+ p for p in self.providers[spec['type']] if p.available()
+ ]
+ if not possible_providers:
+ raise ValueError(
+ 'No available provider for type %r at %s' %
+ (spec['type'], identify_object(spec)))
+
+ # Only one choice, no need to rank further.
+ if len(possible_providers) == 1:
+ return possible_providers[0]
+
+ def best_matches(
+ possible_providers: Iterable[yaml_provider.Provider],
+ adjacent_provider_options: Iterable[Iterable[yaml_provider.Provider]]
+ ) -> Iterable[yaml_provider.Provider]:
+ providers_by_score = collections.defaultdict(list)
+ for p in possible_providers:
+ providers_by_score[sum(
+ max(p.affinity(ap) for ap in apo)
+ for apo in adjacent_provider_options)].append(p)
+ return providers_by_score[max(providers_by_score.keys())]
Review Comment:
Got it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org