[ 
https://issues.apache.org/jira/browse/TEZ-3865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16252543#comment-16252543
 ] 

Rohini Palaniswamy commented on TEZ-3865:
-----------------------------------------

Just read through the jira description and design doc. Thanks for catching the 
similarity [~gopalv].

{code}
The P10 belonging to the first 10 mappers will be processed by one reducer with 
1GB input data. The P10 belonging to the second 10 mappers will be processed by 
another reducer, etc.

● Each task B only processes one partition’s data. We might want to consider 
having one task B process several empty or small partitions together later, 
similar to how auto-parallelism works. But for now, the goal is to cut down 
the run time of the longest running tasks, less about reducing the total 
number of tasks.
● Each task B only processes up to a max input size defined by a config 
similar to tez.shuffle-vertex-manager.desired-task-input-size. For example, 
if it is defined to be 1G, and the output size of each of {A1P2, A2P2, A3P2, 
A4P2} is 0.5G, then it will have one task process {A1P2, A2P2} and another 
task process {A3P2, A4P2}.
{code}
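Not from the doc, but to make the second bullet concrete, here is a minimal standalone sketch of that grouping (names like {{desiredTaskInputSize}} are mine, not from FVM; real code would work off reported output statistics):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: greedily group one partition's per-source-task output sizes
// into destination tasks, capping each task at a desired input size.
public class PartitionGrouper {
    // sizes[i] = output size of this partition from source task Ai
    static List<List<Integer>> group(long[] sizes, long desiredTaskInputSize) {
        List<List<Integer>> tasks = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long currentSize = 0;
        for (int i = 0; i < sizes.length; i++) {
            if (!current.isEmpty() && currentSize + sizes[i] > desiredTaskInputSize) {
                tasks.add(current);            // close out this destination task
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(i);
            currentSize += sizes[i];
        }
        if (!current.isEmpty()) tasks.add(current);
        return tasks;
    }

    public static void main(String[] args) {
        long halfGB = 512L * 1024 * 1024;
        long oneGB = 2 * halfGB;
        // {A1P2..A4P2} each 0.5G with a 1G cap -> {A1, A2} and {A3, A4}
        System.out.println(group(new long[]{halfGB, halfGB, halfGB, halfGB}, oneGB));
    }
}
```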

The idea mentioned for distributing skewed partitions is exactly the same as 
what I have described for the case of partitions. So we can combine this into 
the FairVertexManager.

But there are additional things required for this jira. The items mentioned 
below will have to be added as enhancements (configurable options) to FVM.
1) Will need the ability for each task to process more than one partition, 
similar to auto-parallelism. It is currently listed as an open issue and 
future work in FVM.
2) For the case of no partitions, we can probably just treat it as if only P0 
has data and the other partitions are empty. But this will not be as efficient 
with the unordered writer as having no partitions at all (no spill). So we 
will need to add explicit support for the case of no partitions, i.e. the 
ability to keep the number of output partitions of the source vertex separate 
from the number of destination tasks. This can be used even in the case of 
multiple partitions. For eg: if it is known that there will be only 3 
partitions, we can configure that instead of the destination parallelism of 
20 (example script in jira description), which will avoid empty partitions.
3) FVM has a restriction of range bucketing, i.e. the outputs of maps are 
consumed in order by the tasks. This is not required here. We just need best 
fit based on desired input size.
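To illustrate what item 3 asks for, a minimal best-fit sketch (hypothetical, not FVM code): each segment goes into whichever open destination task leaves the least room under the desired input size, with no ordering restriction on source outputs:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of "best fit" placement without the range-bucketing restriction:
// segments may be taken in any source order; each lands in the open task
// that it fills most tightly, or opens a new task if none fits.
public class BestFitAssigner {
    static List<Long> assign(long[] segmentSizes, long desiredTaskInputSize) {
        List<Long> taskLoads = new ArrayList<>(); // current bytes per destination task
        for (long size : segmentSizes) {
            int best = -1;
            long bestLeftover = Long.MAX_VALUE;
            for (int t = 0; t < taskLoads.size(); t++) {
                long room = desiredTaskInputSize - taskLoads.get(t);
                if (size <= room && room - size < bestLeftover) {
                    best = t;
                    bestLeftover = room - size;
                }
            }
            if (best < 0) {
                taskLoads.add(size);      // no open task fits: start a new one
            } else {
                taskLoads.set(best, taskLoads.get(best) + size);
            }
        }
        return taskLoads;
    }

    public static void main(String[] args) {
        // segments 7,5,3,5 with a cap of 10 -> packed as [7+3, 5+5]
        System.out.println(assign(new long[]{7, 5, 3, 5}, 10));
    }
}
```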

Features not relevant:
1) Do not care about support for MultipleOutputs, as the output format 
implementation of HCatStorer takes care of writing to different ORC files in 
different partition folders in the case of dynamic partitioning. Same with 
MultiStorage.
2) There is no over-partitioning in this scenario, so do not care about 
volume-based partitioning. As long as the ability to configure different 
partitioners is there, that is good.


> A new vertex manager to partition data for STORE
> ------------------------------------------------
>
>                 Key: TEZ-3865
>                 URL: https://issues.apache.org/jira/browse/TEZ-3865
>             Project: Apache Tez
>          Issue Type: New Feature
>            Reporter: Rohini Palaniswamy
>
> Restricting number of files in output is a very common use case. In Pig, 
> currently users add a ORDER BY, GROUP BY or DISTINCT with the required 
> parallelism before STORE to achieve it. All of the above operations create 
> unnecessary overhead in processing. It would be ideal if STORE clause 
> supported the PARALLEL statement and the partitioning of data was handled in 
> a more simple and efficient manner.
> Partitioning of the data can be achieved using a very efficient vertex 
> manager as described below. Going to call it PartitionVertexManager (PVM) 
> for now till someone proposes a better name. Will be explaining using Pig 
> examples, but the logic is the same for Hive as well.
> There are multiple cases to consider when storing:
> 1) No partitions
>        - Data is stored into a single directory using FileOutputFormat 
> implementations
> 2) Partitions
>       - Data is stored into multiple partitions. Case of static or dynamic 
> partitioning with HCat
> 3) HBase
>     I have kind of forgotten what exactly my thoughts were on this when 
> storing to multiple regions. Will update once I remember.
> Let us consider below script with pig.exec.bytes.per.reducer (this setting is 
> usually translated to tez.shuffle-vertex-manager.desired-task-input-size with 
> ShuffleVertexManager) set to 1G.
> {code}
> A = LOAD 'data' ....;
> B = GROUP A BY $0 PARALLEL 1000;
> C = FOREACH B GENERATE group, COUNT(A.a), SUM(A.b), ..;
> D = STORE C into 'output' using SomeStoreFunc() PARALLEL 20;
> {code}
> The implementation will have 3 vertices.
> v1 - LOAD vertex
> v2 - GROUP BY vertex
> v3 - STORE vertex
> PVM will be used on v3. It is going to be similar to ShuffleVertexManager, 
> but with some differences. The main difference is that the source vertex 
> does not care about the parallelism of the destination vertex, and the 
> number of partitioned outputs it produces does not depend on it.
> 1) Case of no partitions
>    Each task in vertex v2 will produce a single partition output (no 
> Partitioner is required). The PVM will bucket this single partition's data 
> from 1000 source tasks into multiple destination tasks of v3, trying to 
> keep 1G per task with a max of 20 tasks (auto-parallelism).
>    
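A minimal sketch of the parallelism math for this case (method and parameter names are assumed; a real VM would use output statistics reported by the source tasks):

```java
// Sketch of the no-partition case: pick destination parallelism from total
// source output size, capped by the configured max (20 in the example script).
public class NoPartitionParallelism {
    static int computeNumTasks(long totalOutputBytes, long desiredTaskInputSize, int maxTasks) {
        if (totalOutputBytes == 0) return 1;
        // ceil(total / desired), in integer arithmetic
        long needed = (totalOutputBytes + desiredTaskInputSize - 1) / desiredTaskInputSize;
        return (int) Math.min(needed, maxTasks);
    }

    public static void main(String[] args) {
        long oneGB = 1L << 30;
        // 12G of single-partition output, 1G per task, max 20 -> 12 tasks
        System.out.println(computeNumTasks(12 * oneGB, oneGB, 20));
        // 50G would need 50 tasks, but PARALLEL 20 caps it at 20
        System.out.println(computeNumTasks(50 * oneGB, oneGB, 20));
    }
}
```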
> 2) Partitions
>    Let us say the table has 2 partition keys (dt and region). Since there 
> could be any number of regions for a given date, we will use the store 
> parallelism as the upper limit on the number of partitions, i.e. a 
> HashPartitioner with numReduceTasks as 20 and (dt, region) as the partition 
> key. If there are only 5 regions, then each task of v2 will produce 5 
> partitions (with the remaining 15 being empty) if there are no hash 
> collisions. If there are 30 regions, then each task of v2 will produce 20 
> partitions.
>    
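For illustration, the upper-limit partitioning could look like the usual hash partitioning on the composite key (a sketch only, not the actual Pig/HCat partitioner):

```java
import java.util.Objects;

// Sketch of the upper-limit hash partitioning: (dt, region) hashed into at
// most numReduceTasks partitions; with only 5 distinct regions for a date,
// at most 5 of the 20 partitions are non-empty (barring hash collisions).
public class DynamicPartitionKeyPartitioner {
    static int getPartition(String dt, String region, int numReduceTasks) {
        int hash = Objects.hash(dt, region);
        return (hash & Integer.MAX_VALUE) % numReduceTasks; // force non-negative
    }

    public static void main(String[] args) {
        // the same (dt, region) pair always lands in the same partition
        System.out.println(getPartition("2017-11-14", "us-west", 20));
    }
}
```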
>    When the PVM groups, it will try to group all Partition0 segments as much 
> as possible into one v3 task. Based on skew, it could end up in more tasks, 
> i.e. there is no restriction on one partition going to a single reducer 
> task. Doing this will avoid having to open multiple ORC files in one task 
> when doing dynamic partitioning, and will be very efficient, reducing 
> namespace usage even further while keeping file sizes more uniform.
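A rough, purely illustrative sketch of that grouping rule (all names assumed): each partition preferentially maps to a single v3 task, but a skewed partition whose total exceeds the desired input size is split across several tasks:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: each partition gets one destination task (so dynamic partitioning
// opens one ORC file per task); a skewed partition larger than the desired
// input size is split across ceil(size / desired) tasks instead.
public class PartitionTaskCount {
    static List<Integer> tasksPerPartition(long[] partitionTotals, long desiredTaskInputSize) {
        List<Integer> counts = new ArrayList<>();
        for (long total : partitionTotals) {
            if (total == 0) {
                counts.add(0); // an empty partition gets no task at all
            } else {
                counts.add((int) ((total + desiredTaskInputSize - 1) / desiredTaskInputSize));
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // partition totals 300, 2500 and 0 with a cap of 1000 -> 1, 3 and 0 tasks
        System.out.println(tasksPerPartition(new long[]{300, 2500, 0}, 1000));
    }
}
```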



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)