TheNeuralBit commented on a change in pull request #12971:
URL: https://github.com/apache/beam/pull/12971#discussion_r502557533

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -166,16 +173,40 @@ def expand(self, pcolls):
           partitioned_pcoll = next(pcolls.values()).pipeline | beam.Create([{}])
 
         elif self.stage.partitioning != partitionings.Nothing():
+          # Partitioning required for these operations.
+          # Compute the number of partitions to use based on estimated size.
+          if self.stage.partitioning == partitionings.Singleton():
+            # Always a single partition, don't waste time computing sizes.
+            num_partitions = 1
+          else:
+            # Estimate the sizes from the outputs of a *previous* stage such
+            # that using these estimates will not cause a fusion break.
+            input_sizes = [
+                estimate_size(input, same_stage_ok=False)
+                for input in tabular_inputs
+            ]
+            if None in input_sizes:
+              # We were unable to (cheaply) compute the size of one or more
+              # inputs.
+              num_partitions = DEFAULT_PARTITIONS
+            else:
+              num_partitions = beam.pvalue.AsSingleton(
+                  input_sizes
+                  | 'FlattenSizes' >> beam.Flatten()
+                  | 'SumSizes' >> beam.CombineGlobally(sum)

Review comment:
The reason I mentioned the outputs of this stage is that it looks like the objective is for the output partition sizes to equal `TARGET_PARTITION_SIZE`, which holds iff the size of each output equals the sum of the sizes of the inputs. It looks like you do have a comment to that effect in `estimate_sizes`.
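
To make the point concrete, here is a plain-Python sketch of the partition-count logic in the hunk above. In the real code the size estimates are PCollections that are flattened, summed with `CombineGlobally(sum)` and passed as a side input via `AsSingleton`; this sketch uses plain integers instead. The values of `TARGET_PARTITION_SIZE` and `DEFAULT_PARTITIONS`, the helper names, and the "total size divided by target, rounded up" formula are assumptions for illustration, since the hunk ends before the division is shown.

```python
import math
from typing import Optional, Sequence

# Hypothetical values for illustration only; the real constants live
# elsewhere in transforms.py and are not shown in this hunk.
TARGET_PARTITION_SIZE = 1 << 23  # assumed: 8 MiB per partition
DEFAULT_PARTITIONS = 100         # assumed fallback when sizes are unknown


def choose_num_partitions(input_sizes: Sequence[Optional[int]],
                          singleton: bool = False) -> int:
  """Hypothetical helper mirroring the branches in the diff."""
  if singleton:
    # Singleton partitioning: always one partition, skip size estimation.
    return 1
  if any(size is None for size in input_sizes):
    # At least one input size could not be estimated cheaply.
    return DEFAULT_PARTITIONS
  total = sum(input_sizes)
  # Assumed formula: enough partitions that each holds about
  # TARGET_PARTITION_SIZE bytes, and never fewer than one.
  return max(1, math.ceil(total / TARGET_PARTITION_SIZE))


def output_partition_size(output_size: int, num_partitions: int) -> float:
  """Hypothetical helper: average output bytes per partition."""
  return output_size / num_partitions
```

With these assumed values, inputs of 16 MiB and 8 MiB give 3 partitions. If the stage's output is also 24 MiB, each output partition averages 8 MiB, i.e. `TARGET_PARTITION_SIZE`; if the stage filters out half its rows, each partition averages only 4 MiB. The estimate is only on target when output size tracks the sum of the input sizes.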

----------------------------------------------------------------
This is an automated message from the Apache Git Service.