TheNeuralBit commented on a change in pull request #16101:
URL: https://github.com/apache/beam/pull/16101#discussion_r769036114
##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -345,29 +357,47 @@ def is_scalar(expr):
@_memoize
def expr_to_stages(expr):
- assert expr not in inputs
+ if expr in inputs:
+ # Don't create a stage for each input, but it is still useful to record
+ # what which stages inputs are available from.
+ return []
+
# First attempt to compute this expression as part of an existing stage,
# if possible.
- #
- # If expr does not require partitioning, just grab any stage, else grab
- # the first stage where all of expr's inputs are partitioned as required.
- # In either case, use the first such stage because earlier stages are
- # closer to the inputs (have fewer intermediate stages).
- required_partitioning = expr.requires_partition_by()
- for stage in common_stages([expr_to_stages(arg) for arg in expr.args()
- if arg not in inputs]):
- if is_computable_in_stage(expr, stage):
- break
+ if all(arg in inputs for arg in expr.args()):
+ # All input arguments; try to pick a stage that already has as many
+ # of the inputs, correctly partitioned, as possible.
+ inputs_by_stage = collections.defaultdict(int)
+ for arg in expr.args():
+ for stage in expr_to_stages(arg):
+ if is_computable_in_stage(expr, stage):
+ inputs_by_stage[stage] += 1 + 100 * (
+ expr.requires_partition_by() == stage.partitioning)
+ if inputs_by_stage:
+ stage = sorted(inputs_by_stage.items(), key=lambda kv: kv[1])[-1][0]
Review comment:
`max` does accept a key:
https://docs.python.org/3.6/library/functions.html#max
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]