robertwb commented on code in PR #28310:
URL: https://github.com/apache/beam/pull/28310#discussion_r1318882103
##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -225,19 +287,7 @@ def create_ptransform(self, spec, input_pcolls):
providers_by_input[pcoll] for pcoll in input_pcolls
if pcoll in providers_by_input
]
Review Comment:
I prefer to keep the signature of best_provider in terms of providers
themselves.
##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -208,6 +220,56 @@ def get_outputs(self, transform_name):
def compute_outputs(self, transform_id):
return expand_transform(self._transforms_by_uuid[transform_id], self)
+ def best_provider(self, t, input_providers):
+ if isinstance(t, dict):
+ spec = t
+ else:
+ spec = self._transforms_by_uuid[self.get_transform_id(t)]
+ possible_providers = [
+ p for p in self.providers[spec['type']] if p.available()
+ ]
+ if not possible_providers:
+ raise ValueError(
+ 'No available provider for type %r at %s' %
+ (spec['type'], identify_object(spec)))
+
+ # Only one choice, no need to rank further.
+ if len(possible_providers) == 1:
+ return possible_providers[0]
+
+ def best_matches(
+ possible_providers: Iterable[yaml_provider.Provider],
+ adjacent_provider_options: Iterable[Iterable[yaml_provider.Provider]]
+ ) -> Iterable[yaml_provider.Provider]:
+ providers_by_score = collections.defaultdict(list)
+ for p in possible_providers:
+ providers_by_score[sum(
+ max(p.affinity(ap) for ap in apo)
+ for apo in adjacent_provider_options)].append(p)
+ return providers_by_score[max(providers_by_score.keys())]
+
+ # If there are any inputs, prefer to match them.
+ if input_providers:
+ possible_providers = best_matches(
+ possible_providers, [[p] for p in input_providers])
+
+ # Try to match downstream operations, continuing until there is no tie.
+ adjacent_transforms = [spec['__uuid__']]
+ while len(possible_providers) > 1:
+ # Go downstream one step. (This is why we start with spec itself.)
+ adjacent_transforms = sum(
+ [list(self.followers(t)) for t in adjacent_transforms], [])
Review Comment:
adjacent_transforms may have any number of transforms in it. I've
restructured the logic to be a bit more verbose, but hopefully clearer.
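
For illustration, one downstream step over a frontier of any size can be sketched as below. This is a minimal sketch, not the code in the PR: `FOLLOWERS` and `step_downstream` are hypothetical names, with a plain dict standing in for `self.followers(t)`.

```python
import itertools

# Hypothetical adjacency map standing in for self.followers(t).
FOLLOWERS = {
    'read': ['map_a', 'map_b'],
    'map_a': ['write'],
    'map_b': ['write', 'log'],
    'write': [],
    'log': [],
}


def step_downstream(adjacent_transforms):
    """Returns the direct followers of every given transform, flattened."""
    return list(
        itertools.chain.from_iterable(
            FOLLOWERS[t] for t in adjacent_transforms))


# The frontier starts with a single transform and can grow or shrink
# at each step; an empty frontier means there is nothing left downstream.
frontier = ['read']
frontier = step_downstream(frontier)
frontier = step_downstream(frontier)
```

`itertools.chain.from_iterable` does the same flattening as `sum(lists, [])` without repeatedly copying the accumulated list.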
##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -208,6 +220,56 @@ def get_outputs(self, transform_name):
def compute_outputs(self, transform_id):
return expand_transform(self._transforms_by_uuid[transform_id], self)
+ def best_provider(self, t, input_providers):
+ if isinstance(t, dict):
+ spec = t
+ else:
+ spec = self._transforms_by_uuid[self.get_transform_id(t)]
+ possible_providers = [
+ p for p in self.providers[spec['type']] if p.available()
+ ]
+ if not possible_providers:
+ raise ValueError(
+ 'No available provider for type %r at %s' %
+ (spec['type'], identify_object(spec)))
+
+ # Only one choice, no need to rank further.
+ if len(possible_providers) == 1:
+ return possible_providers[0]
+
+ def best_matches(
+ possible_providers: Iterable[yaml_provider.Provider],
+ adjacent_provider_options: Iterable[Iterable[yaml_provider.Provider]]
+ ) -> Iterable[yaml_provider.Provider]:
+ providers_by_score = collections.defaultdict(list)
+ for p in possible_providers:
+ providers_by_score[sum(
+ max(p.affinity(ap) for ap in apo)
+ for apo in adjacent_provider_options)].append(p)
+ return providers_by_score[max(providers_by_score.keys())]
Review Comment:
No. possible_providers is never empty here, so providers_by_score always
has at least one entry.
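
To make the invariant concrete, here is a small generic sketch of the same group-by-score pattern. `top_scoring` is a hypothetical helper, not part of Beam; it only shows that `max` over the keys is safe exactly when at least one item was grouped.

```python
import collections


def top_scoring(items, score):
    """Returns all items that share the highest score."""
    by_score = collections.defaultdict(list)
    for item in items:
        by_score[score(item)].append(item)
    # max() raises ValueError on an empty dict, so this relies on the
    # caller passing at least one item.
    return by_score[max(by_score)]


# Non-empty input: every item lands in some bucket, so max() is safe.
longest = top_scoring(['a', 'bb', 'cc'], len)
```

In `best_provider` the earlier `if not possible_providers: raise ValueError(...)` check is what guarantees the non-empty input.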
##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -208,6 +220,56 @@ def get_outputs(self, transform_name):
def compute_outputs(self, transform_id):
return expand_transform(self._transforms_by_uuid[transform_id], self)
+ def best_provider(self, t, input_providers):
+ if isinstance(t, dict):
+ spec = t
+ else:
+ spec = self._transforms_by_uuid[self.get_transform_id(t)]
+ possible_providers = [
+ p for p in self.providers[spec['type']] if p.available()
+ ]
+ if not possible_providers:
+ raise ValueError(
+ 'No available provider for type %r at %s' %
+ (spec['type'], identify_object(spec)))
+
+ # Only one choice, no need to rank further.
+ if len(possible_providers) == 1:
+ return possible_providers[0]
+
+ def best_matches(
+ possible_providers: Iterable[yaml_provider.Provider],
+ adjacent_provider_options: Iterable[Iterable[yaml_provider.Provider]]
Review Comment:
This is the dimensionality of the problem: we may have many downstream
operations, each of which may be satisfied by many providers.
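
A toy example of the nested shape may help. The scoring function below mirrors the hunk above, but `ToyProvider` and its `affinity` values are assumptions made up for illustration; they are not the real `yaml_provider.Provider` behavior.

```python
import collections


class ToyProvider:
    """Hypothetical stand-in for yaml_provider.Provider."""
    def __init__(self, name, language):
        self.name = name
        self.language = language

    def affinity(self, other):
        # Assumed scoring: same-language providers fuse better.
        return 10 if self.language == other.language else 1


def best_matches(possible_providers, adjacent_provider_options):
    providers_by_score = collections.defaultdict(list)
    for p in possible_providers:
        # Outer sum: one term per adjacent operation.
        # Inner max: the best provider that operation could use.
        score = sum(
            max(p.affinity(ap) for ap in apo)
            for apo in adjacent_provider_options)
        providers_by_score[score].append(p)
    return providers_by_score[max(providers_by_score)]


py = ToyProvider('py', 'python')
java = ToyProvider('java', 'java')

# Two downstream operations: the first can run on either provider,
# the second only on the Java one.
downstream = [[py, java], [java]]
winners = best_matches([py, java], downstream)
```

Under these assumed affinities `py` scores 10 + 1 and `java` scores 10 + 10, so only `java` survives the ranking.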
##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -208,6 +220,56 @@ def get_outputs(self, transform_name):
def compute_outputs(self, transform_id):
return expand_transform(self._transforms_by_uuid[transform_id], self)
+ def best_provider(self, t, input_providers):
Review Comment:
I refactored it a bit and added some more comments, but I agree it's not the
easiest to follow. Does this at least help?