[ https://issues.apache.org/jira/browse/PIG-1875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13001192#comment-13001192 ]

Alan Gates commented on PIG-1875:
---------------------------------

Thoughts so far on a possible implementation:
As mentioned above, the tuple would stay in serialized form when read as part 
of the reduce iterator.  It would later deserialize itself the first time one 
of its methods was invoked.  If the serialized tuple was of sufficient size, 
it could even be compressed on the map side before being written out.

This can be done using a new implementation of tuple that wraps an existing 
implementation.  If we call the new tuple MToRTuple and assume that the 
existing one is DefaultTuple, then MToRTuple.write() would call 
DefaultTuple.write(), passing it an output stream connected to a bytearray.  It 
would then write itself to disk.  MToRTuple.readFields would just read the data 
from disk into a bytearray.  It would not decompress it.  Then the first time a 
method was called to access the tuple, it would call DefaultTuple.readFields() 
on the bytearray to deserialize the data.  From that point on it would pass 
calls through to the underlying default tuple.
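A rough sketch of this wrapper (hypothetical code, not Pig's actual Tuple API; 
I'm using a List<String> stand-in for DefaultTuple and an ad hoc serialization 
format purely for illustration):

```java
import java.io.*;
import java.util.*;

// Hypothetical sketch of the lazy wrapper described above.  readFields()
// keeps the raw bytes; the first accessor call deserializes them.
class LazyTuple {
    private byte[] raw;          // serialized form, kept after readFields()
    private List<String> fields; // deserialized lazily on first access

    // Serialize the fields into a bytearray, then write the length-prefixed
    // bytes out (mirrors MToRTuple.write() calling DefaultTuple.write()).
    void write(DataOutput out, List<String> data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);
        dos.writeInt(data.size());
        for (String s : data) dos.writeUTF(s);
        byte[] bytes = bos.toByteArray();
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    // Read the raw bytes only; do NOT deserialize yet.
    void readFields(DataInput in) throws IOException {
        raw = new byte[in.readInt()];
        in.readFully(raw);
        fields = null;
    }

    // First access triggers the real deserialization; later calls
    // pass straight through to the materialized fields.
    Object get(int i) throws IOException {
        if (fields == null) {
            DataInputStream dis =
                new DataInputStream(new ByteArrayInputStream(raw));
            int n = dis.readInt();
            fields = new ArrayList<>(n);
            for (int j = 0; j < n; j++) fields.add(dis.readUTF());
        }
        return fields.get(i);
    }
}
```

A real MToRTuple would pass every Tuple method through this way, but the 
write/readFields/first-access shape is the core of the idea.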

There are a few open questions to be answered here:  

# Will raw comparators work ok with compressed data?  We may have to leave the 
keys uncompressed, but that's probably ok.  In my experiments using gzip, the 
break-even point for compression was 150 bytes, and it didn't become useful 
until 200 bytes or so.  I'm guessing key lengths rarely exceed 200 bytes.
# What is the CPU overhead of compressing data?  Will we be slowing down jobs 
that never need to spill?
# Are there any significant advantages to this in cases where the combiner or 
Accumulator is already in use?  Would we want to turn it on only if neither of 
these applies?
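On the break-even point in question 1: gzip's fixed header and trailer 
overhead is easy to demonstrate directly (a standalone check, not Pig code; 
the exact break-even depends on how compressible the data is):

```java
import java.io.*;
import java.util.Arrays;
import java.util.zip.GZIPOutputStream;

// Small payloads get LARGER under gzip (roughly 10 bytes of header plus
// 8 bytes of trailer, before any deflate savings), which is why compressing
// short keys doesn't pay off.
class GzipBreakEven {
    static int gzippedSize(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        }
        return bos.size();
    }
}
```

Running this on a 20-byte key versus a kilobyte of repetitive data shows the 
small input growing and the large one shrinking dramatically, consistent with 
the 150-200 byte break-even observed above.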

To really make use of a feature like this, we would need to integrate it with a 
couple of other changes:

# One place I see this being a huge win is joins, since we have to keep n - 1 
streams from the join in memory.  If we could change join to deserialize only 
the join key and keep the values serialized/compressed, we may be able to 
significantly reduce the cases where join spills.
# To really make use of this we need to throw away the deserialized tuple as 
soon as we're done with it.  We could do this by forcing the tuple to 
deserialize every time (way too slow), or by creating a destructive iterator 
where the element is thrown away once it's read.  This would work for the 
leftmost stream in a join (though not any other streams in an n-way join with 
n > 2).  If we wanted to use this for aggregate UDFs, we could also allow them to 
tell us whether they could use a destructive iterator.
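The destructive-iterator idea in point 2 could be as simple as this (a generic 
sketch over a List, not an actual Pig class):

```java
import java.util.*;

// Sketch of a destructive iterator: once an element is handed out, its slot
// in the backing list is nulled, so the (potentially large) tuple becomes
// garbage-collectible immediately instead of living as long as the list does.
class DestructiveIterator<T> implements Iterator<T> {
    private final List<T> backing;
    private int pos = 0;

    DestructiveIterator(List<T> backing) { this.backing = backing; }

    public boolean hasNext() { return pos < backing.size(); }

    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        T elem = backing.get(pos);
        backing.set(pos, null); // free the element as soon as it is read
        pos++;
        return elem;
    }
}
```

The obvious caveat, as noted above, is that the caller can only walk the 
stream once, which is exactly why a UDF would need to declare that a single 
pass is enough.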


> Keep tuples serialized to limit spilling and speed it when it happens
> ---------------------------------------------------------------------
>
>                 Key: PIG-1875
>                 URL: https://issues.apache.org/jira/browse/PIG-1875
>             Project: Pig
>          Issue Type: Improvement
>          Components: impl
>            Reporter: Alan Gates
>            Priority: Minor
>
> Currently Pig reads records off of the reduce iterator and immediately 
> deserializes them into Java objects.  This takes up much more memory than 
> serialized versions, thus Pig spills sooner than if it stored them in 
> serialized form.  Also, if it does have to spill, it has to serialize them 
> again, and then again deserialize them after reading from the spill file.
> We should explore storing them in memory serialized when they are read off of 
> the reduce iterator.

-- 
This message is automatically generated by JIRA.
-
For more information on JIRA, see: http://www.atlassian.com/software/jira
