[
https://issues.apache.org/jira/browse/TEZ-776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14335991#comment-14335991
]
Siddharth Seth commented on TEZ-776:
------------------------------------
The patch moves away from an MxN memory overhead to an MxN CPU overhead (route()
invocations instead of MxN event storage).
The CPU usage numbers would be useful in evaluating this, along with the memory
numbers. We're essentially moving away from bulk routing and storage to
on-demand routing, which of course saves memory - but will hit CPU.
IMO, a middle ground which could attain a balance between CPU and memory would
be better. Any thoughts on Option 4 mentioned previously -
https://issues.apache.org/jira/browse/TEZ-776?focusedCommentId=14326315&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14326315
?
That should help with attaining this middle ground.
Elaborating on that option further, here are some example APIs for it.
{code}
void handleDataMovementEventsFromSource(CompositeDataMovementEvent event, int sourceTaskIndex);
void handleDataMovementEventFromSource(DataMovementEvent event, int sourceTaskIndex, int sourceOutputIndex);
// Returns up to maxEvents pending events for the destination task.
// Possibly accept a start range, though tracking it internally is better.
List<DataMovementEvent> getEventsForTask(int targetTaskIndex, int maxEvents);
{code}
The EdgePlugins implement the storage of events. Along with this, they likely
maintain a counter per destination task.
e.g. ScatterGather would maintain a list of events in the
ScatterGatherEdgePlugin, and populate that list whenever any of the
handle*Event methods is invoked.
An invocation of getEventsForTask would 1) look up the previously known position
for the destination task, 2) look up maxEvents from the list starting at that
index, 3) create the DMEvents and send them back (same as storing the events in
tasks) - roughly as in the sketch below.
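A minimal sketch of what such plugin-side storage could look like, assuming the
APIs above (the class and field names are hypothetical, not from the patch;
synchronization, failure handling and target-index assignment are elided):
{code}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.tez.runtime.api.events.DataMovementEvent;

public class ScatterGatherEdgePlugin {

  // Source events in arrival order; every destination task consumes all of them.
  private final List<DataMovementEvent> storedEvents = new ArrayList<>();
  // Next position in storedEvents to hand out, per destination task.
  private final Map<Integer, Integer> nextEventIndex = new HashMap<>();

  public void handleDataMovementEventFromSource(DataMovementEvent event,
      int sourceTaskIndex, int sourceOutputIndex) {
    storedEvents.add(event);
  }

  public List<DataMovementEvent> getEventsForTask(int targetTaskIndex, int maxEvents) {
    // 1) Look up the previously known position for the destination task.
    int start = nextEventIndex.getOrDefault(targetTaskIndex, 0);
    // 2) Take up to maxEvents from the list starting at that index.
    int end = Math.min(storedEvents.size(), start + maxEvents);
    List<DataMovementEvent> routed = new ArrayList<>(storedEvents.subList(start, end));
    // 3) Remember the new position and hand the events back.
    nextEventIndex.put(targetTaskIndex, end);
    return routed;
  }
}
{code}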
Bulk routing still exists - and this should lower the number of route
invocations significantly. We give plugins an opportunity to handle this as
efficiently as possible.
There's an added advantage here - plugins can munge the events as required.
e.g. an InputFailedEvent can OBSOLETE the previous DataMovementEvent - and avoid
sending it to new downstream tasks. Another case for this is the handling of
TransientDataMovementEvents for pipelining - where the plugin can consolidate
events.
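As a rough illustration of the obsoleting case, continuing the sketch above (the
method and field names are hypothetical; the bookkeeping tying stored events back
to their source task, and delivery of the failure to destinations that already
fetched the event, are not shown):
{code}
// Hypothetical: remember which source tasks have been reported failed.
// getEventsForTask can then drop (or rewrite) stored events from these source
// tasks before handing them out, so destinations that haven't pulled their
// events yet never see the stale ones.
private final Set<Integer> obsoletedSourceTasks = new HashSet<>();

public void handleInputFailedEventFromSource(InputFailedEvent event, int sourceTaskIndex) {
  obsoletedSourceTasks.add(sourceTaskIndex);
}
{code}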
Beyond the CPU overhead, I'm not sure the last two options are possible without
assistance from user code?
On the patch itself, some questions:
- {code}
+ public abstract int routeDataMovementEventToDestination(DataMovementEvent event,
+     int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex) throws Exception;
{code}
This is less expressive than the existing API in terms of routing? The
existing API allows a single event to be sent to multiple target indices of
the same task. While that's likely not used - it is currently an option.
- DataMovementEvent.create(..., targetIndex) - do we want to expose this? This
allows the source to populate the target; I could write Inputs / Outputs which
work with empty routing tables, and just have the Inputs / Outputs do the
routing. The DM + a target index would be better.
- readLock removed from VertexImpl.getTotalTasks - this isn't thread safe.
- response.setNextFromEventId. Don't think this is required, and can easily be
tracked in the AM. The heartbeat mechanism ensures we don't have duplicate
requests coming through.
- Can routed events go over the requested maxEvents ? (If a single event
explodes for instance)
> Reduce AM mem usage caused by storing TezEvents
> -----------------------------------------------
>
> Key: TEZ-776
> URL: https://issues.apache.org/jira/browse/TEZ-776
> Project: Apache Tez
> Issue Type: Sub-task
> Reporter: Siddharth Seth
> Assignee: Bikas Saha
> Attachments: TEZ-776.ondemand.1.patch, TEZ-776.ondemand.patch,
> events-problem-solutions.txt
>
>
> This is open ended at the moment.
> A fair chunk of the AM heap is taken up by TezEvents (specifically
> DataMovementEvents - 64 bytes per event).
> Depending on the connection pattern - this puts limits on the number of tasks
> that can be processed.