[
https://issues.apache.org/jira/browse/TEZ-3865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16252182#comment-16252182
]
Gopal V edited comment on TEZ-3865 at 11/14/17 9:14 PM:
--------------------------------------------------------
[~rohini]: is there some overlap between this idea and TEZ-3209 ?
was (Author: gopalv):
[~rohini]: is there some overlap between this idea and TEZ-3269 ?
> A new vertex manager to partition data for STORE
> ------------------------------------------------
>
> Key: TEZ-3865
> URL: https://issues.apache.org/jira/browse/TEZ-3865
> Project: Apache Tez
> Issue Type: New Feature
> Reporter: Rohini Palaniswamy
>
> Restricting the number of files in output is a very common use case. In Pig,
> users currently add an ORDER BY, GROUP BY or DISTINCT with the required
> parallelism before STORE to achieve it. All of those operations add
> unnecessary processing overhead. It would be ideal if the STORE clause
> supported the PARALLEL statement and the partitioning of data was handled in
> a simpler and more efficient manner.
> Partitioning of the data can be achieved using a very efficient vertex
> manager as described below. Going to call it PartitionVertexManager (PVM) for
> now till someone proposes a better name. Will be explaining using Pig
> examples, but the logic is the same for Hive as well.
> There are multiple cases to consider when storing:
> 1) No partitions
> - Data is stored into a single directory using FileOutputFormat
> implementations
> 2) Partitions
> - Data is stored into multiple partitions, i.e. the case of static or
> dynamic partitioning with HCat
> 3) HBase
> I have kind of forgotten what exactly my thoughts were on this when
> storing to multiple regions. Will update once I remember.
> Let us consider below script with pig.exec.bytes.per.reducer (this setting is
> usually translated to tez.shuffle-vertex-manager.desired-task-input-size with
> ShuffleVertexManager) set to 1G.
> {code}
> A = LOAD 'data' ....;
> B = GROUP A BY $0 PARALLEL 1000;
> C = FOREACH B GENERATE group, COUNT(A.a), SUM(A.b), ..;
> D = STORE C into 'output' using SomeStoreFunc() PARALLEL 20;
> {code}
> The implementation will have 3 vertices.
> v1 - LOAD vertex
> v2 - GROUP BY vertex
> v3 - STORE vertex
> PVM will be used on v3. It is going to be similar to ShuffleVertexManager but
> with some differences. The main difference is that the source vertex does not
> care about the parallelism of the destination vertex, and the number of
> partitioned outputs it produces does not depend on it.
> 1) Case of no partitions
> Each task in vertex v2 will produce a single partition output (no
> Partitioner is required). The PVM will bucket this single-partition data from
> the 1000 source tasks into multiple destination tasks of v3, trying to keep
> 1G per task with a max of 20 tasks (auto parallelism).
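The bucketing for the no-partition case could be sketched roughly as below. This is a hypothetical first-fit grouping, not actual Tez code; the class and method names are illustrative, and the 1G target and 20-task cap come from the example above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: group the single-partition outputs of the v2 tasks
// into at most maxTasks v3 tasks, targeting ~1G of input per task.
public class GroupSinglePartition {
    static final long DESIRED_BYTES_PER_TASK = 1L << 30; // 1G

    // Returns, for each v3 task, the list of v2 source task indexes it reads.
    static List<List<Integer>> group(long[] outputSizes, int maxTasks) {
        List<List<Integer>> groups = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long currentBytes = 0;
        for (int src = 0; src < outputSizes.length; src++) {
            // Start a new destination task once the 1G target would be
            // exceeded, unless that would overshoot the max parallelism.
            if (!current.isEmpty()
                    && currentBytes + outputSizes[src] > DESIRED_BYTES_PER_TASK
                    && groups.size() + 1 < maxTasks) {
                groups.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(src);
            currentBytes += outputSizes[src];
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }

    public static void main(String[] args) {
        // 1000 source tasks, each producing ~50MB of single-partition output.
        long[] sizes = new long[1000];
        java.util.Arrays.fill(sizes, 50L << 20);
        List<List<Integer>> groups = group(sizes, 20);
        System.out.println(groups.size()); // ~50G total, so capped at 20 tasks
    }
}
```

A real implementation would work from the per-source output size statistics the vertex manager receives in completion events, but the grouping decision is essentially this.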
>
> 2) Partitions
> Let us say the table has 2 partition keys (dt and region). Since there
> could be any number of regions for a given date, we will use the store
> parallelism as the upper limit on the number of partitions, i.e. a
> HashPartitioner with numReduceTasks as 20 and (dt, region) as the partition
> key. If there are only 5 regions, then each task of v2 will produce 5
> partitions (with the other 15 empty), assuming no hash collisions. If there
> are 30 regions, then each task of v2 will produce 20 partitions.
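The partitioning step could look roughly like the following, mirroring Hadoop's HashPartitioner on the (dt, region) key; the class name and key separator are hypothetical, not actual Pig/Tez code.

```java
// Hypothetical sketch: hash the partition-key columns (dt, region) into
// numReduceTasks buckets, with the store parallelism (20 in the example)
// as the upper bound on distinct partitions each v2 task can produce.
public class PartitionKeyHash {
    static int getPartition(String dt, String region, int numReduceTasks) {
        int hash = (dt + "\u0001" + region).hashCode();
        // Mask off the sign bit so the modulo result is non-negative.
        return (hash & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // With only 5 regions for a date, at most 5 of the 20 partitions
        // are non-empty (barring hash collisions).
        String[] regions = {"us", "eu", "apac", "latam", "mea"};
        java.util.Set<Integer> used = new java.util.HashSet<>();
        for (String r : regions) {
            used.add(getPartition("20171114", r, 20));
        }
        System.out.println(used.size() <= 5);
    }
}
```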
>
> When the PVM groups, it will try to place all Partition0 segments into
> one v3 task as far as possible. Depending on skew, a partition could end up
> in more tasks, i.e. there is no restriction that one partition goes to a
> single reducer task. Doing this avoids having to open multiple ORC files in
> one task when doing dynamic partitioning, and is very efficient: it reduces
> namespace usage even further while keeping file sizes more uniform.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)