Github user fhueske commented on the pull request:
https://github.com/apache/flink/pull/1255#issuecomment-165453966
I executed the Flink WordCount example on 4 nodes with 8 parallel tasks and
roughly 17GB of input data once with hash partitioning and once with range
partitioning. Both times no combiner was used.
First of all, both programs compute the same result, and the result of the
range-partitioned WordCount is ordered, so the implementation is correct.
The hash-partitioned WC took 8:00 mins and the range-partitioned WC 13:17
mins.
The breakdown of the range partitioned WC looks as follows:
1. Source+FlatMap: 3:01 mins
2. LocalSample: 3:01 mins
3. GlobalSample: 0:15 mins
4. Histogram: 24 ms
5. PreparePartition: 8:49 mins
6. Partition: 8:48 mins
7. GroupReduce: 10:14 mins
8. Sink: 1:09 mins
The breakdown of the hash partitioned WC is:
1. Source + FlatMap: 6:26 mins
2. Partition: 6:25 mins
3. GroupReduce: 7:58 mins
4. Sink: 1:21 mins
So, the overhead of the range-partitioned WC comes from the additional IO of
reading the flatMapped words plus the additional 4-byte integer per record.
Also, the sorting for the group reduce does not overlap with the source IO.
Reducing (w/o sort) and sink take about the same amount of time in both runs.
I also checked the distribution of input and output records / bytes for the
GroupReduce.
| | min records-in | min bytes-in | max records-in | max bytes-in |
| --- | --- | --- | --- | --- |
| Hash | 197M | 1.82GB | 346M | 2.90GB |
| Range | 177M | 1.41GB | 352M | 2.90GB |

| | min records-out | min bytes-out | max records-out | max bytes-out |
| --- | --- | --- | --- | --- |
| Hash | 2.5M | 26.5MB | 2.5M | 26.5MB |
| Range | 2.3K | 28.2KB | 14M | 154MB |
We see that the range partitioner does not balance the load better (in fact, a
bit worse) than the hash partitioner (the differences in output records are
expected). Maybe increasing the sample size helps? The overhead of reading the
intermediate data set from disk is so high that a more fine-grained histogram
can be justified, IMO. How about increasing the sample size from
`parallelism * 20` to `parallelism * 1000`?
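For illustration, a minimal sketch of the idea (hypothetical code, not Flink's actual implementation — class and method names are made up): range boundaries are taken as equi-depth split points over a sorted sample of keys, so a larger sample per task gives a finer-grained histogram and, in expectation, more evenly sized partitions:

```java
import java.util.Arrays;
import java.util.Random;

public class RangeBoundariesSketch {

    /** Pick (numPartitions - 1) boundary keys as equi-depth split points of a sorted sample. */
    static int[] boundariesFromSample(int[] sample, int numPartitions) {
        int[] sorted = sample.clone();
        Arrays.sort(sorted);
        int[] boundaries = new int[numPartitions - 1];
        for (int i = 1; i < numPartitions; i++) {
            boundaries[i - 1] = sorted[i * sorted.length / numPartitions];
        }
        return boundaries;
    }

    /** Assign a key to a partition via binary search over the boundaries. */
    static int partitionOf(int key, int[] boundaries) {
        int pos = Arrays.binarySearch(boundaries, key);
        // exact hit -> right of that boundary; miss -> insertion point
        return pos >= 0 ? pos + 1 : -(pos + 1);
    }

    public static void main(String[] args) {
        int parallelism = 8;
        Random rnd = new Random(42);
        // sampleFactor = 20 today; the proposal is to raise it to 1000
        for (int sampleFactor : new int[] {20, 1000}) {
            int[] sample = new int[parallelism * sampleFactor];
            for (int i = 0; i < sample.length; i++) {
                sample[i] = rnd.nextInt(1_000_000);
            }
            int[] boundaries = boundariesFromSample(sample, parallelism);
            System.out.println(sampleFactor + " samples/task -> boundaries: "
                    + Arrays.toString(boundaries));
        }
    }
}
```

With only 20 samples per task, the boundary estimates have high variance, which matches the observed skew (2.3K vs. 14M output records); a 50x larger sample shrinks that variance at negligible cost compared to the disk IO.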
Other thoughts?