[ 
https://issues.apache.org/jira/browse/TEZ-776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14335991#comment-14335991
 ] 

Siddharth Seth commented on TEZ-776:
------------------------------------

The patch moves away from an MxN memory overhead to an MxN CPU overhead (route() 
invocations instead of MxN event storage).
The CPU usage numbers would be useful in evaluating this, along with the memory 
numbers. We're essentially moving away from bulk routing and storage to 
on-demand routing, which of course saves memory - but will hit CPU.

IMO, a middle ground which strikes a balance between CPU and memory would be 
better. Any thoughts on Option 4 mentioned previously - 
https://issues.apache.org/jira/browse/TEZ-776?focusedCommentId=14326315&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14326315 ?
That should help with attaining this middle ground.

Elaborating on that option further, here are some example APIs for it.
{code}
void handleDataMovementEventsFromSource(CompositeDataMovementEvent event, int sourceTaskIndex);
void handleDataMovementEventFromSource(DataMovementEvent event, int sourceTaskIndex, int sourceOutputIndex);
List<DataMovementEvent> getEventsForTask(int targetTaskIndex, int maxEvents); // Possibly accept a start range, though tracking it internally is better
{code}
The EdgePlugins implement the storage of events. Along with this, they likely 
maintain a counter per destination task.
e.g. ScatterGather would maintain a list of events in the 
ScatterGatherEdgePlugin. It populates these lists when any of the handle*Events 
methods are invoked.
An invocation of getEventsForTask would 1) look up the previously known position 
for the destination task, 2) look up maxEvents from the list starting at that index, 3) 
create the DMEvents and send them back. (Same as storing the events in tasks). 
Bulk routing still exists - and this should lower the number of route 
invocations significantly. We give plugins an opportunity to handle this as 
efficiently as possible.
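
To make the bookkeeping concrete, here's a very rough sketch of what that could 
look like (class layout, field names and the destination-side event construction 
are illustrative only; synchronization and error handling are skipped):
{code}
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.tez.runtime.api.events.DataMovementEvent;

public class ScatterGatherEdgePlugin {

  // Events relevant to each destination task, keyed by destination task index.
  // For scatter-gather the partition (sourceOutputIndex) identifies the consumer.
  private final Map<Integer, List<DataMovementEvent>> eventsPerDestTask =
      new HashMap<Integer, List<DataMovementEvent>>();
  // Position in the per-destination list up to which events have already been handed out.
  private final Map<Integer, Integer> nextPositionPerDestTask =
      new HashMap<Integer, Integer>();

  public void handleDataMovementEventFromSource(DataMovementEvent event,
      int sourceTaskIndex, int sourceOutputIndex) {
    List<DataMovementEvent> list = eventsPerDestTask.get(sourceOutputIndex);
    if (list == null) {
      list = new ArrayList<DataMovementEvent>();
      eventsPerDestTask.put(sourceOutputIndex, list);
    }
    list.add(event);
  }

  public List<DataMovementEvent> getEventsForTask(int targetTaskIndex, int maxEvents) {
    List<DataMovementEvent> stored = eventsPerDestTask.get(targetTaskIndex);
    if (stored == null) {
      return Collections.emptyList();
    }
    Integer prev = nextPositionPerDestTask.get(targetTaskIndex);
    int start = (prev == null) ? 0 : prev;                 // 1) previously known position
    int end = Math.min(stored.size(), start + maxEvents);  // 2) at most maxEvents from there
    List<DataMovementEvent> routed = new ArrayList<DataMovementEvent>(end - start);
    for (int i = start; i < end; i++) {
      // 3) hand back the destination-facing events; filling in the
      // destination-side target index is elided here.
      routed.add(stored.get(i));
    }
    nextPositionPerDestTask.put(targetTaskIndex, end);      // remember the new position
    return routed;
  }
}
{code}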

There's an added advantage here - plugins can munge the events as 
required. e.g. an InputFailedEvent can OBSOLETE the previous DataMovementEvent 
- and avoid sending it to new downstream tasks. Another case for this is the 
handling of TransientDataMovementEvents for pipelining - where the plugin can 
consolidate events together.
Beyond the CPU overhead, I'm not sure the last two options are possible without 
assistance from user code?
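
As a hypothetical sketch of the munging case (continuing the plugin sketch above; 
the handler name and the bookkeeping that ties a stored event back to its 
producer are simplified):
{code}
// Producer task indices whose pending DataMovementEvents should be obsoleted.
private final Set<Integer> obsoletedSourceTasks = new HashSet<Integer>();

// Hypothetical handler: when a source attempt fails, remember it so its pending
// events are never handed to tasks that have not fetched them yet. Tasks which
// already received the event would still need a regular InputFailedEvent - that
// path is not shown.
public void handleInputFailedEventFromSource(InputFailedEvent event, int sourceTaskIndex) {
  obsoletedSourceTasks.add(sourceTaskIndex);
}

// Inside getEventsForTask(), before adding a stored event to the result:
//   if (obsoletedSourceTasks.contains(producerOf(storedEvent))) { continue; }
// where producerOf() stands for whatever bookkeeping maps a stored event back
// to its source task.
{code}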

On the patch itself, some questions: 
- {code}
+  public abstract int routeDataMovementEventToDestination(DataMovementEvent event,
+      int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex) throws Exception;
{code}
This is less expressive than the existing API in terms of routing? The 
existing API allows a single event to be sent to multiple target indices of 
the same task. While that's likely not used - it is currently an option.
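For reference, the contrast I mean (signatures abbreviated / approximate):
{code}
// Existing API: the plugin fills in destination task -> list of target (input)
// indices, so one event can map to several target indices of the same
// destination task.
void routeDataMovementEventToDestination(DataMovementEvent event,
    int sourceTaskIndex, int sourceOutputIndex,
    Map<Integer, List<Integer>> destinationTaskAndInputIndices) throws Exception;

// Patch API: a single target index per destination task, so the
// one-event-to-many-indices-within-a-task case can no longer be expressed.
int routeDataMovementEventToDestination(DataMovementEvent event,
    int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex) throws Exception;
{code}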

- DataMovementEvent.create(..., targetIndex) - do we want to expose this? This 
allows the source to populate the target; I could write Inputs / Outputs which 
work with empty routing tables and just have the Inputs / Outputs do the 
routing. The DM + a target index would be better.

- readLock removed from VertexImpl.getTotalTasks - this isn't thread safe.

- response.setNextFromEventId. Don't think this is required; it can easily be 
tracked in the AM. The heartbeat mechanism ensures we don't have duplicate 
requests coming through.

- Can routed events go over the requested maxEvents? (If a single event 
explodes, for instance.)



> Reduce AM mem usage caused by storing TezEvents
> -----------------------------------------------
>
>                 Key: TEZ-776
>                 URL: https://issues.apache.org/jira/browse/TEZ-776
>             Project: Apache Tez
>          Issue Type: Sub-task
>            Reporter: Siddharth Seth
>            Assignee: Bikas Saha
>         Attachments: TEZ-776.ondemand.1.patch, TEZ-776.ondemand.patch, 
> events-problem-solutions.txt
>
>
> This is open ended at the moment.
> A fair chunk of the AM heap is taken up by TezEvents (specifically 
> DataMovementEvents - 64 bytes per event).
> Depending on the connection pattern - this puts limits on the number of tasks 
> that can be processed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
