Rohini Palaniswamy commented on PIG-5041:

Currently UnorderedPartitionedKVOutput+UnorderedKVInput+RoundRobinPartitioner 
comes into play only when the union optimizer is turned off or could not be 
applied due to some condition. We hit the issue with a script that had two 
levels of union followed by a replicated join, which does not get union 
optimization with vertex groups due to PIG-3856.

a = load 'file:///tmp/input' as (x:int, y:chararray);
b = load 'file:///tmp/input' as (y:chararray, x:int);
c = union onschema a, b;
d = load 'file:///tmp/input1' as (x:int, z:chararray);
e = join c by x, d by x using 'replicated';
f = load 'file:///tmp/input' as (y:chararray, x:int);
g = union onschema e, f;
h = join g by y, d by y using 'replicated';
i = group h by x;
store i into 'file:///tmp/pigoutput';

The vertex processing c,e had an unordered input and produced unordered output 
with RoundRobinPartitioner. A rerun of one of the tasks in this vertex due to 
a shuffle fetch failure caused incorrect results to be sent to the vertex 
processing g,h. This is because the first run processed records in the order 
a,b and the second run processed them in the order b,a, depending on which 
shuffle file was fetched first. The map output of the vertices processing a 
and b could also run into the non-deterministic partitioning issue if they 
had any operation that changed the order of records. In this case, however, 
they read records from the input file and output them in the same order 
without any intermediate processing, so they will not have the issue on rerun 
even with RoundRobinPartitioner.
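
To make the failure mode concrete, here is a minimal Python sketch of 
position-based (round-robin) partitioning under the two fetch orders. It is 
not the Tez or Pig implementation; the function name, the record values and 
the two-partition setup are all assumptions made for illustration.

```python
from collections import Counter

def round_robin_partition(records, num_partitions):
    """Assign the i-th record to partition i % num_partitions (position-based)."""
    partitions = [[] for _ in range(num_partitions)]
    for i, record in enumerate(records):
        partitions[i % num_partitions].append(record)
    return partitions

# Hypothetical records arriving from the two union branches.
from_a = ["a1", "a2", "a3"]
from_b = ["b1", "b2", "b3"]

# First run fetched a's shuffle output first; the rerun fetched b's first.
first_run = round_robin_partition(from_a + from_b, 2)
rerun = round_robin_partition(from_b + from_a, 2)

# Downstream task 0 pulled its partition from the first run,
# downstream task 1 pulled its partition from the rerun.
seen = Counter(first_run[0] + rerun[1])
lost = sorted(set(from_a + from_b) - set(seen))
duplicated = sorted(r for r, n in seen.items() if n > 1)

print("lost:", lost)
print("duplicated:", duplicated)
```

In this toy setup half of the records never reach a downstream task and the 
other half arrive twice, purely because the arrival order differed between 
the two runs.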

> RoundRobinPartitioner is not deterministic when order of input records change
> -----------------------------------------------------------------------------
>                 Key: PIG-5041
>                 URL: https://issues.apache.org/jira/browse/PIG-5041
>             Project: Pig
>          Issue Type: Bug
>            Reporter: Rohini Palaniswamy
>            Assignee: Rohini Palaniswamy
>            Priority: Critical
>             Fix For: 0.16.1
> Maps can be rerun due to shuffle fetch failures. Half of the reducers can end 
> up successfully pulling partitions from first run of the map while other half 
> could pull from the rerun after shuffle fetch failures. If the data is not 
> partitioned by the Partitioner exactly the same way every time then it could 
> lead to incorrect results (loss of records and duplicated records). 
> There is a good probability of the order of input records changing:
>     - With OrderedGroupedMergedKVInput (shuffle input), the keys are sorted 
> but values can be in any order, as the shuffle and merge depend on the order 
> in which inputs are fetched. Anything involving FLATTEN can produce a 
> different order of output records.
>     - With UnorderedKVInput, the records could be in any order depending on 
> the order of shuffle fetch. 
> RoundRobinPartitioner can partition records differently every time the order 
> of input records changes, which is very bad. We need to get rid of 
> RoundRobinPartitioner. Since the key is empty whenever we use 
> RoundRobinPartitioner, we need to partition based on the hashcode of values 
> to produce consistent partitioning. It adds a lot of performance overhead, 
> but is required for correctness.
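
As a rough illustration of the proposed direction, the Python sketch below 
partitions on a hash of the record value instead of its position. It is not 
the actual patch: `zlib.crc32` merely stands in for a stable value hashcode, 
and the function name and sample records are hypothetical.

```python
import zlib

def hash_partition(records, num_partitions):
    """Assign each record by a stable hash of its value, ignoring position."""
    partitions = [[] for _ in range(num_partitions)]
    for record in records:
        # crc32 is an assumed stand-in for a deterministic hashcode of the value.
        idx = zlib.crc32(record.encode("utf-8")) % num_partitions
        partitions[idx].append(record)
    return partitions

records = ["a1", "a2", "a3", "b1", "b2", "b3"]

first_run = hash_partition(records, 2)
rerun = hash_partition(list(reversed(records)), 2)  # different arrival order

# Each partition holds the same records in both runs, whatever the order.
same_assignment = [sorted(p) for p in first_run] == [sorted(p) for p in rerun]

# Mixing partitions from the two runs still yields every record exactly once.
mixed = sorted(first_run[0] + rerun[1])

print(same_assignment, mixed == sorted(records))
```

The cost is that every value has to be hashed, which is the performance 
overhead the description refers to.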
