Yes, I think that bug is what I want.  Thank you.

So I guess the current reason is that we don't want to buffer up numMapper
incoming streams. So we just iterate through each and transfer it over in
full because that is more network efficient?

I'm not sure I understand why you wouldn't want to sort on the composite
(parition_id, hash).  I think using the partitionKeyComparator should be
ok, because the other case of merging with spilled files uses it and that
works out ok.

The aggregation I am doing basically has as many output rows as input rows
so I am seeing a lot of memory pressure on the reduce side but it doesn't
have the same ability to spill like map does.

-jc



On Tue, Jun 2, 2015 at 2:08 PM, Josh Rosen <rosenvi...@gmail.com> wrote:

> The relevant JIRA that springs to mind is
> https://issues.apache.org/jira/browse/SPARK-2926
>
> If an aggregator and ordering are both defined, then the map side of
> sort-based shuffle will sort based on the key ordering so that map-side
> spills can be efficiently merged.  We do not currently do a sort-based
> merge on the reduce side; implementing this is a little tricky because it
> will require more map partitions' output to be buffered on the reduce
> side.  I think that SPARK-2926 has some proposals of how to deal with this,
> including hierarchical merging of reduce outputs.
>
> RE: ExternalSorter#partitionedIterator, I don't think it's safe to do 
> !ordering.isDefined
> && !aggregator.isDefined.  If an aggregator is defined but we don't have
> an ordering, then I don't think it makes sense to sort the keys based on
> their hashcodes or some default ordering, since hashcode collisions would
> lead to incorrect results for sort-based aggregation.
>
> On Tue, Jun 2, 2015 at 1:50 PM, John Carrino <john.carr...@gmail.com>
> wrote:
>
>> One thing I have noticed with ExternalSorter is that if an ordering is
>> not defined, it does the sort using only the partition_id, instead of
>> (parition_id, hash).  This means that on the reduce side you need to pull
>> the entire dataset into memory before you can begin iterating over the
>> results.
>>
>> I figure since we are doing a sort of the data anyway it doesn't seem
>> more expensive to sort by (parition, hash).  That way the reducer can do a
>> merge and only has the hold in memory the data for a single int hashCode
>> before it can combine then and start returning results form the iterator.
>>
>> Has this already been discussed?  If so, can someone point me in the
>> right direction to find out more?
>>
>> Thanks for any help!
>> -jc
>>
>> p.s. I am using spark version 1.3.1.  The code I am looking at below is
>> from ExternalSorter#partitionedIterator.  I think maybe
>> !ordering.isDefined should also include "&& !aggregator.isDefined"
>>
>>    if (spills.isEmpty && partitionWriters == null) {
>>       // Special case: if we have only in-memory data, we don't need to
>> merge streams, and perhaps
>>       // we don't even need to sort by anything other than partition ID
>>       if (!ordering.isDefined) {
>>         // The user hasn't requested sorted keys, so only sort by
>> partition ID, not key
>>
>> groupByPartition(collection.destructiveSortedIterator(partitionComparator))
>>       } else {
>>         // We do need to sort by both partition ID and key
>>
>> groupByPartition(collection.destructiveSortedIterator(partitionKeyComparator))
>>       }
>>
>
>

Reply via email to