Koji Noguchi commented on PIG-5041:

Rohini, you seem to have left a debug statement in the patch.

As for using hashcode, I'm not sure it will work for a Tuple containing a bag.
Looking at DefaultAbstractBag, I see that the hashcode is computed without sorting.
If the same non-deterministic ordering of bags applies here, I'm afraid the 
hashcode may differ between retries.

Outside of this jira, I'm worried about the implementation of DefaultAbstractBag, 
where hashCode() does NOT sort but compareTo() does.
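To illustrate the concern: a hash that is accumulated with the usual 31*hash + element pattern is order-sensitive, so two bags holding the same tuples in different iteration orders hash differently. This is a simplified standalone sketch (hashing ints rather than actual Pig Tuples), not the real DefaultAbstractBag code:

```java
import java.util.Arrays;
import java.util.List;

public class BagHashDemo {
    // Order-sensitive hash accumulation, in the style of a bag that
    // hashes its elements in iteration order without sorting first.
    static int bagHash(List<Integer> elements) {
        int hash = 1;
        for (int e : elements) {
            hash = 31 * hash + e;
        }
        return hash;
    }

    public static void main(String[] args) {
        // Same elements, different order -> different hashcodes. If a
        // retried map materializes a bag in a different order, the
        // containing tuple's hashcode (and its partition) can change.
        System.out.println(bagHash(Arrays.asList(1, 2, 3))
                == bagHash(Arrays.asList(3, 2, 1))); // prints false
    }
}
```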

> RoundRobinPartitioner is not deterministic when order of input records change
> -----------------------------------------------------------------------------
>                 Key: PIG-5041
>                 URL: https://issues.apache.org/jira/browse/PIG-5041
>             Project: Pig
>          Issue Type: Bug
>            Reporter: Rohini Palaniswamy
>            Assignee: Rohini Palaniswamy
>            Priority: Critical
>             Fix For: 0.16.1
>         Attachments: PIG-5041-1.patch
> Maps can be rerun due to shuffle fetch failures. Half of the reducers can end 
> up successfully pulling partitions from first run of the map while other half 
> could pull from the rerun after shuffle fetch failures. If the data is not 
> partitioned by the Partitioner exactly the same way every time then it could 
> lead to incorrect results (loss of records and duplicated records). 
> There is a good probability of order of input records changing
>     - With OrderedGroupedMergedKVInput (shuffle input), the keys are sorted 
> but values can be in any order as the shuffle and merge depends on the order 
> in which inputs are fetched. Anything involving FLATTEN can produce different 
> order of output records.
>     - With UnorderedKVInput, the records could be in any order depending on 
> order of shuffle fetch. 
> RoundRobinPartitioner can partition records differently every time the order 
> of input records changes, which is very bad. We need to get rid of 
> RoundRobinPartitioner. Since the key is empty whenever we use 
> RoundRobinPartitioner, we need to partition based on the hashcode of values 
> to produce consistent partitioning.
> Partitioning based on hashcode is required for correctness, but the 
> disadvantage is that it
>     - adds a lot of performance overhead with hashcode computation
>     - with the random distribution due to hashcode (as opposed to batched 
> round robin) input records sorted on some column could get distributed to 
> different reducers and if union is followed by a store, the output can have 
> bad compression.
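
The value-hashcode scheme described above can be sketched as follows. This is a hypothetical standalone illustration (not the attached patch, and without the Hadoop/Tez Partitioner interface): the partition depends only on the value itself, so a record lands in the same partition no matter what order records arrive in.

```java
public class ValueHashPartitionerDemo {
    // Partition on the value's hashcode, since the key is empty wherever
    // RoundRobinPartitioner was used. Masking off the sign bit keeps the
    // modulo result non-negative for any hashcode.
    static int getPartition(Object value, int numPartitions) {
        return (value.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        // Unlike round robin, the assignment is a pure function of the
        // value, so a rerun map partitions its output identically.
        System.out.println(getPartition("record-a", 4)
                == getPartition("record-a", 4)); // prints true
    }
}
```

The trade-off noted in the description follows directly: identical values always collide into one partition and every record pays the hashcode computation, whereas round robin spread records evenly for free but depended on arrival order.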

This message was sent by Atlassian JIRA
