[ 
https://issues.apache.org/jira/browse/TEZ-3209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15385449#comment-15385449
 ] 

Ming Ma commented on TEZ-3209:
------------------------------

Thanks [~sseth]!

bq. primarily targeted towards Unordered Data? 
While the fair routing is motivated by unordered data scenario, it might be 
useful for Hive or others. For example, in the case of skew shuffle join(some 
partition of unbucketed lhs table has too much data), it might be useful to 
have multiple reducers each of which join part of lhs table's partition with 
the complete partition of unbucketed rhs table. Feel free to correct me if 
there are other better ways to address this.

bq. At some point, would we want to use a different slow-start / scheduling 
policy in this VertexManager.
Very good point. I did for sake of convenience. 
{{FairShuffleVertexManager#getToBeScheduledTasks}} has already scheduled tasks 
based on the completed source task id, but using global slow start as 
additional constraint might not be appropriate.

bq. Should a different strategy be employed to determine when to trigger 
parallelism determination in this case?
So far, the condition to trigger parallelism determination between 
{{FairShuffleVertexManager}} and {{ShuffleVertexManager}} look the same, e.g. 
after enough source tasks have finished.

bq. My vote would be for separate config parameter names.
ok. Then we can define abstract method like {{getDesiredTaskInputDataSize}} in 
{{ShuffleVertexManagerBase}}.

bq. If the plan is to eventually move to a different set of scheduling 
strategies - I suspect a lot of the code in ShuffleVMBase will go away.
Here are the functionalities provided by {{ShuffleVertexManagerBase}}.

* Parse partition stats in VME.
* Async event handling and interaction between {{VertexManager}} and 
{{VertexManagerPlugin}}. For example, use {{pendingVMEvents}} to queue pending 
VME when the destination vertex hasn't started.
* Handle multiple source vertexes for things like reconfigureVertex.
* Determine when to trigger routing computation.
* Global slow start scheduling policy ( to be removed )

IMO, the main benefit of refactor is the "Async event handling" aspect and thus 
the refactor seems worthy. What do you think?

> Support for fair custom data routing
> ------------------------------------
>
>                 Key: TEZ-3209
>                 URL: https://issues.apache.org/jira/browse/TEZ-3209
>             Project: Apache Tez
>          Issue Type: New Feature
>            Reporter: Ming Ma
>            Assignee: Ming Ma
>         Attachments: TEZ-3209.patch, Tez-based demuxer for highly skewed 
> category data.pdf
>
>
> This is based on offline discussion with [~gopalv], [~hitesh], 
> [~jrottinghuis] and [~lohit] w.r.t. the support for efficient processing of 
> highly skewed unordered partitioned mapper output. Our use case is to demux 
> highly skewed unordered category data partitioned by category name. Gopal and 
> Hitesh mentioned dynamically shuffled join scenario.
> One option we discussed is to leverage auto-parallelism feature with upfront 
> over-partitioning. That means possible overhead to support large number 
> partitions and unnecessary data movement as each reducer needs to get data 
> from all mappers. 
> Another alternative is to use custom {{DataMovementType}} which doesn't 
> require each reducer to fetch data from all mappers. In that way, a large 
> partition will be processed by several reducers, each of which will fetch 
> data from a portion of mappers.
> For example, say there are 100 mappers each of which has 10 partitions (P1, 
> ..., P10). Each mapper generates 100MB for its P10 and 1MB for each of its 
> (P1, ... P9). The default SCATTER_GATHER routing means the reducer for P10 
> has to process 10GB of input and becomes the bottleneck of the job. With the 
> fair custom data routing, The P10 belonging to the first 10 mappers will be 
> processed by one reducer with 1GB input data. The P10 belonging to the second 
> 10 mappers will be processed by another reducer, etc.
> For further optimization, we can allocate the reducer on the same nodes as 
> the mappers that it fetches data from.
> To support this, we need TEZ-3206 as well as customized data routing based on 
> {{VertexManagerPlugin}} and {{EdgeManagerPluginOnDemand}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to