[ 
https://issues.apache.org/jira/browse/PIG-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rohini Palaniswamy updated PIG-3775:
------------------------------------

    Description: 
When implementing Pig union, we need to gather data from two or more upstream 
vertexes without sorting. The vertex itself might consist of several tasks. 
The same can be done for the partitioner vertex in orderby and skewed join 
instead of a 1-1 edge for some cases of parallelism.

TEZ-661 has been created to add custom output and input for that in Tez. It is 
currently not in the Tez team's priorities, but it is important for us as it 
will give good performance gains. We can write the custom input/output and 
contribute it to Tez and make the corresponding changes in Pig. 

This is a candidate project for Google Summer of Code 2014. More information 
about the program can be found at 
https://cwiki.apache.org/confluence/display/PIG/GSoc2014

  was:
When implementing Pig union, we need to gather data from two or more upstream 
vertexes without sorting. The vertex itself might consist of several tasks. 
The same can be done for the partitioner vertex in orderby and skewed join 
instead of a 1-1 edge for some cases of parallelism.

TEZ-661 has been created to add custom output and input for that in Tez. It is 
currently not in the Tez team's priorities, but it is important for us as it 
will give good performance gains. We can write the custom input/output and 
contribute it to Tez and make the corresponding changes in Pig. Marking this as 
a candidate for GSOC 2014. 

         Labels: GSOC2014  (was: gsoc2014)

To give more background and detail, I am copying information I exchanged with 
one of the interested students.

Pig on Tez:
   Tez is a DAG execution framework and is intended as the replacement 
execution engine for Pig instead of the traditional MapReduce. 
You can read about Tez 
(https://github.com/apache/incubator-tez/blob/master/README.md) and Pig on Tez 
(https://issues.apache.org/jira/browse/PIG-3446), and watch the presentations 
from HUG (Hadoop User Group) Feb 2014: 
http://www.youtube.com/results?search_query=pig%20on%20tez%20hug&sm=3. Slides 
on Tez are available as well: 
http://qconsf.com/system/files/presentation-slides/Apache-Tez-Accelerating-Hadoop-Query-Processing.pdf.

Union:
    The case of union is simple. Consider the case
{code}
A = LOAD 'a' ...;
B = LOAD 'b' ...;
C = UNION A, B PARALLEL 10;
D = GROUP C BY ...;
{code}

In Tez, the loading of A will be in Vertex 1 and the loading of B in Vertex 2. 
The output of A and B will be sent to Vertex 3 (the group by) as a composite 
input using an Alias Vertex (PIG-3743) over a scatter-gather edge. A 
scatter-gather edge sorts and partitions the output (OnFileSortedOutput) on 
the predecessor vertex, then shuffles and merge-sorts the input 
(ShuffledMergedInput) on the successor vertex, similar to what happens between 
map tasks and reduce tasks in MR. But since union requires neither sorting nor 
grouping, we can go with OnFileUnorderedPartitionedOutput and 
ShuffledUnorderedKVInput (TEZ-910). OnFileUnorderedPartitionedOutput needs to 
be implemented for this problem and contributed back to Tez (TEZ-661), as it 
will be a generic output that can be reused by other projects like Hive. 
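
To make the contrast concrete, here is a minimal Python sketch of what an 
unordered partitioned output does; the function name and hash partitioning 
are illustrative assumptions, not Tez's actual API:

{code}
from collections import defaultdict

# Sketch of an unordered partitioned shuffle output. Unlike a sorted
# output (OnFileSortedOutput), each record is only routed to a partition;
# nothing is ever sorted or merge-sorted, which is all a union needs.
def unordered_partitioned_output(records, num_partitions):
    buckets = defaultdict(list)
    for key, value in records:
        part = hash(key) % num_partitions   # route only; no sort, no merge
        buckets[part].append((key, value))
    return buckets

out = unordered_partitioned_output([("a", 1), ("b", 2), ("a", 3)], 10)
assert sum(len(v) for v in out.values()) == 3   # every record lands somewhere
{code}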

Distributed Orderby and Skewed Join:
    These are more complex and somewhat similar in concept, so I will just go 
into detail on one of them: distributed orderby. In MR it takes three jobs to 
do an orderby.
Job 1 - A map-only job that loads the HDFS data, processes any statements like 
filter before the order by, and stores the result in HDFS in Pig's 
intermediate file format.
Job 2 (Sampler job) - A map-reduce job where the map tasks load the 
intermediate data from Job 1 and output 100 samples per task, and a single 
reducer aggregates the samples, creates a quantile map taking the skew of the 
data into account, and stores it in HDFS.
Job 3 (Orderby job) - A map-reduce job where the map tasks load the 
intermediate data from Job 1 and the quantile map from Job 2 (using the 
Distributed Cache) and partition the data using the WeightedRangePartitioner, 
which uses the quantile map to do range partitioning. The data produced by 
each reducer is sorted. It is a distributed order by because, if there are 10 
reducers generating part files part-r-00000 to part-r-00009, the data is 
totally ordered when the files are read in order, thanks to the range 
partitioning.
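
The range-partitioning idea behind the quantile map can be sketched as 
follows; the names and boundary values are illustrative, not Pig's actual 
WeightedRangePartitioner (which also weights boundaries to split skewed keys 
across reducers):

{code}
import bisect

# Sketch of range partitioning with quantile boundaries produced by a
# sampler: keys <= boundaries[i] go to partition i, larger keys go to
# the last partition.
def range_partition(key, boundaries):
    return bisect.bisect_left(boundaries, key)

boundaries = [10, 20, 30]                     # 4 partitions: part-r-00000..3
records = [25, 3, 99, 10, 17, 30, 31]
parts = [sorted(k for k in records if range_partition(k, boundaries) == p)
         for p in range(len(boundaries) + 1)]
# Reading the sorted part files in partition order yields a total order:
assert [k for part in parts for k in part] == sorted(records)
{code}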
 
In Tez this is currently implemented as:
Vertex 1 - Reads the HDFS data once, computes the samples (100 or a 
configurable number of samples per task, using reservoir sampling), and also 
arranges the data based on the order by key. The samples are sent to Vertex 2 
and the data is sent to Vertex 3.
Vertex 2 - Has only 1 task, where the samples from the Vertex 1 tasks are 
aggregated and the quantile map is constructed.
Vertex 3 - The tasks in Vertex 3 take the data from Vertex 1 over a 1-1 edge. 
The WeightedRangePartitioner in the Vertex 3 tasks takes the quantile map from 
Vertex 2 through a broadcast input. The data is partitioned by the 
WeightedRangePartitioner and the outputs are sent to Vertex 4. This is the 
equivalent of the map phase of Job 3 in MR.
Vertex 4 - This is the equivalent of the reduce phase of Job 3 in MR.
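
Vertex 1's per-task sampling can be done in a single pass with standard 
reservoir sampling (Algorithm R); this is a hedged sketch of the technique, 
not Pig's exact implementation:

{code}
import random

# One-pass reservoir sampling of k items from a stream of unknown
# length; every item ends up in the sample with equal probability k/n.
def reservoir_sample(stream, k, rng=random):
    reservoir = []
    for i, item in enumerate(stream):
        if i < k:
            reservoir.append(item)
        else:
            j = rng.randint(0, i)       # uniform in [0, i]
            if j < k:
                reservoir[j] = item     # keep item with probability k/(i+1)
    return reservoir

sample = reservoir_sample(range(100_000), 100)
assert len(sample) == 100
{code}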

Vertex 1 -> Vertex 2 is Scatter Gather edge.
Vertex 2 -> Vertex 3 is Broadcast edge. 
Vertex 1 -> Vertex 3 is 1-1 edge. 
Vertex 3 -> Vertex 4 is Scatter Gather edge.

Parallelism (no. of tasks):
Vertex 1 - Parallelism equals the number of input splits in the HDFS data.
Vertex 2 - Parallelism is 1, as we need one task to aggregate all the samples.
Vertex 3 - Same as Vertex 1, as it is a 1-1 edge.
Vertex 4 - Parallelism set on the order by using the PARALLEL clause, or the 
default parallelism set for the Pig script. 

Initially Vertex 1 -> Vertex 3 was a scatter-gather edge with a 
RoundRobinPartitioner, and Vertex 3 had the same parallelism as Vertex 4. But 
performance was worse due to the overhead of shuffle and sort. The 
RoundRobinPartitioner was also bad for performance: if the data was already 
slightly sorted, it could not take advantage of that. So we moved to the 1-1 
edge, as it gave much better performance. But we want to move to a 
scatter-gather edge with unordered, non-grouped partitioned output.
Reasons being:
  - The idea of the 1-1 edge is that the corresponding tasks of Vertex 1 and 
Vertex 3 will run on the same node (or reuse the same JVM) and the data will 
be read from local disk. But that is not the case now; 1-1 scheduling needs to 
be fixed in Tez (TEZ-800) and is currently not on the Tez team's priority 
list.
  - If the input data size is big, Vertex 1 launches a lot of tasks because 
there are a lot of input splits (let's say 10K). If there was a filter clause 
before the order by, Vertex 1 will reduce the size of the data a lot. But 
because of the 1-1 edge, Vertex 3 launches the same number of tasks as Vertex 
1 (10K), which is bad; by now the data has probably been reduced to 1K tasks' 
worth. This is a good candidate for the Automatic Reducer Parallelism (ARP) 
feature in Tez. This is not a problem with MR, as Job 1 does the filter and 
stores the intermediate output to disk, so the number of map tasks in Job 2 
and Job 3 depends on the input splits of the intermediate data.
  - If there are 10K tasks in Vertex 3 and Vertex 4's parallelism is set to 
500, each reducer has to merge 10K map outputs from Vertex 3, which is very 
bad for performance.
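
The ARP idea of choosing Vertex 3's task count from the bytes Vertex 1 
actually produced, rather than from its split count, can be sketched like 
this (bytes_per_task is an assumed tuning knob, not a real Tez configuration 
name):

{code}
import math

# Pick downstream parallelism from the actual upstream output size,
# capped by the split-count-based upper bound. Purely illustrative
# of the ARP idea, not Tez's implementation.
def auto_parallelism(upstream_output_bytes, bytes_per_task, max_tasks):
    return max(1, min(max_tasks,
                      math.ceil(upstream_output_bytes / bytes_per_task)))

MB = 2**20
# A filter shrinks 10K splits' worth of input to ~1K tasks' worth of output:
assert auto_parallelism(1_000 * 256 * MB, 256 * MB, 10_000) == 1_000
{code}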
  
To summarize:
  For distributed orderby, we need a scatter-gather edge with unordered, 
non-grouped partitioned output from Vertex 1 to Vertex 3, and we need to 
determine the parallelism of Vertex 3 using Automatic Reducer Parallelism in 
Tez. We also need to come up with a good algorithm for a partitioner that 
distributes the skewed data evenly to Vertex 3 efficiently. A similar 
implementation needs to be done for skewed join as well.
  If time permits, we also need to determine the parallelism of Vertex 4 
dynamically when there is no PARALLEL clause or default parallelism set for 
the Pig script. That is very tricky, as it has to be determined during the 
execution of Vertex 1, because the sampler needs to know the parallelism of 
Vertex 4 to construct the quantile map.

 Hopefully, by the time the GSoC project starts, the committers working on Pig 
on Tez will have the basic framework for ARP in Pig usable for this case.

> Use unsorted shuffle in Union, Orderby, Skewed Join to improve performance in 
> Tez
> ---------------------------------------------------------------------------------
>
>                 Key: PIG-3775
>                 URL: https://issues.apache.org/jira/browse/PIG-3775
>             Project: Pig
>          Issue Type: Sub-task
>          Components: tez
>            Reporter: Rohini Palaniswamy
>              Labels: GSOC2014
>             Fix For: tez-branch
>
>
> When implementing Pig union, we need to gather data from two or more upstream 
> vertexes without sorting. The vertex itself might consist of several tasks. 
> The same can be done for the partitioner vertex in orderby and skewed join 
> instead of a 1-1 edge for some cases of parallelism.
> TEZ-661 has been created to add custom output and input for that in Tez. It 
> is currently not in the Tez team's priorities, but it is important for us as 
> it will give good performance gains. We can write the custom input/output and 
> contribute it to Tez and make the corresponding changes in Pig. 
> This is a candidate project for Google Summer of Code 2014. More information 
> about the program can be found at 
> https://cwiki.apache.org/confluence/display/PIG/GSoc2014



--
This message was sent by Atlassian JIRA
(v6.2#6252)
