[
https://issues.apache.org/jira/browse/BEAM-10988?focusedWorklogId=498656&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-498656
]
ASF GitHub Bot logged work on BEAM-10988:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 09/Oct/20 16:54
Start Date: 09/Oct/20 16:54
Worklog Time Spent: 10m
Work Description: TheNeuralBit commented on a change in pull request #12971:
URL: https://github.com/apache/beam/pull/12971#discussion_r502557533
##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -166,16 +173,40 @@ def expand(self, pcolls):
     partitioned_pcoll = next(pcolls.values()).pipeline | beam.Create([{}])
   elif self.stage.partitioning != partitionings.Nothing():
+    # Partitioning required for these operations.
+    # Compute the number of partitions to use based on estimated size.
+    if self.stage.partitioning == partitionings.Singleton():
+      # Always a single partition, don't waste time computing sizes.
+      num_partitions = 1
+    else:
+      # Estimate the sizes from the outputs of a *previous* stage such
+      # that using these estimates will not cause a fusion break.
+      input_sizes = [
+          estimate_size(input, same_stage_ok=False)
+          for input in tabular_inputs
+      ]
+      if None in input_sizes:
+        # We were unable to (cheaply) compute the size of one or more
+        # inputs.
+        num_partitions = DEFAULT_PARTITIONS
+      else:
+        num_partitions = beam.pvalue.AsSingleton(
+            input_sizes
+            | 'FlattenSizes' >> beam.Flatten()
+            | 'SumSizes' >> beam.CombineGlobally(sum)
Review comment:
The reason I mentioned the outputs of this stage is that the objective
appears to be for the output partition sizes to equal
`TARGET_PARTITION_SIZE`, which holds iff the size of each output is equal to
the sum of the sizes of the inputs. It looks like you do have a comment to
that effect in `estimate_size`.
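
To make the assumption concrete, here is a minimal sketch of the partition-count logic as I read it from the diff, written as plain Python rather than as a Beam side input. The helper name `choose_num_partitions`, the constant values, and the round-up rule are all assumptions for illustration; the hunk above is cut off before the actual division, so the PR may compute this differently.

```python
import math

# Hypothetical values: the real constants are defined in the PR and are not
# visible in the quoted hunk.
TARGET_PARTITION_SIZE = 1 << 23
DEFAULT_PARTITIONS = 10


def choose_num_partitions(input_sizes, singleton=False):
    """Pick a partition count from estimated input sizes (illustrative only)."""
    if singleton:
        # Singleton partitioning always uses exactly one partition.
        return 1
    if any(size is None for size in input_sizes):
        # At least one input could not be sized cheaply, so fall back.
        return DEFAULT_PARTITIONS
    total = sum(input_sizes)
    # Assumed rounding rule: round up, and never return fewer than one.
    return max(1, math.ceil(total / TARGET_PARTITION_SIZE))
```

With this rule, each output partition is close to `TARGET_PARTITION_SIZE` only when the stage's output is about as large as the sum of its inputs. A stage that shrinks or expands its data would end up with partitions that are proportionally smaller or larger than the target.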
Issue Time Tracking
-------------------
Worklog Id: (was: 498656)
Time Spent: 3h 10m (was: 3h)
> Intelligently partition dataframes
> ----------------------------------
>
> Key: BEAM-10988
> URL: https://issues.apache.org/jira/browse/BEAM-10988
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Robert Bradshaw
> Assignee: Robert Bradshaw
> Priority: P2
> Time Spent: 3h 10m
> Remaining Estimate: 0h
>
> Currently we hard-code exactly 10 partitions.