[ https://issues.apache.org/jira/browse/TEZ-776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14520815#comment-14520815 ]

Siddharth Seth commented on TEZ-776:
------------------------------------

From another offline discussion to resolve the issues here - the plan is to 
get this committed in some form, since it's obviously useful for the 
ScatterGather case, and get it into a 0.7 release. TEZ-2255 will be taken up 
at a later point, with enhancements as proposed earlier in this jira.


Comments on the patch (776.6.A.patch)
- Can the fields in DataMovementEvent be made final now that the new create 
methods are in place?
- The InputFailedEvent.makeCopy javadoc is incomplete.
- BroadcastEdgeManager - the 
AtomicReference<ArrayList<ArrayList<Collection<DataMovementEvent>>>> 
cachedEvents structure would be good to document. It could probably be 
replaced by a Multimap or Table from Guava (see the sketch after this list).
- The entire caching mechanism in BroadcastEdgeManager - will it break 
pipelined shuffle? It looks like a single event is cached for a specific 
version, but multiple events are possible with pipelining (from the current 
set of Inputs in the runtime-library).
- I'm assuming routeComposite and routeInputFailed don't have the cache 
structure since Composite is not invoked and InputFailed is expected to be 
invoked less often.
- This caching effectively moves storage into the Edge for the Broadcast case, 
except it doesn't gain any CPU efficiency from the edge being aware of the 
routing pattern.
- I'm not sure the caching is necessary. I thought tests showed that the extra 
events on the 30 or so handlers don't cause issues.
- Unrelated to this jira, but a minor enhancement: logging the type of edge 
during setup would be useful information in the logs. I don't think that 
exists right now.
- {code}edgeManager.getClass().getDeclaredMethod("routeCompositeDataMovementEventToDestination",{code}
 This is not necessarily sufficient to determine whether ODR should be enabled 
for the edge. Since there are concrete methods in the base class, nothing 
forces users to implement all of the methods. A plugin can decide that it 
doesn't care about CompositeEvents, since it'll never see them, and not 
implement the method. More on this later.
- printStackTrace in Edge.
- {code}-          if (isFirstEvent) {
-            isFirstEvent = false;
-            // this is the first item - reuse the event object
-            if (isDataMovementEvent) {
-              ((DataMovementEvent) event).setTargetIndex(inputIndex);
-            } else {{code} Does this change the caching of events which made 
Broadcast and OneToOne efficient earlier?
- I think Hitesh already pointed this out, but a single event can explode into 
multiple events - more than the maxEvents limit. A simple fix would be to just 
accept the explosion and ignore the maxEvents limit in this case.
- In Edge, CompositeDataMovementEvent has its own try/catch that throws 
AMUserCodeException, even though there's an outer block doing the same.
- VertexImpl.getTaskAttemptTezEvents - are taskEvents.size(), and any other 
access to taskEvents, thread safe? I don't see any lock under which these are 
accessed. The list can grow within the Vertex writeLock when new events are 
seen, which changes internal data structures in the ArrayList (see the sketch 
after this list).
- ROOT_INPUT_DATA_INFORMATION_EVENTS - sending these directly can cause 
maxEventsPerHeartbeat to be exceeded. MultiMRInput already handles multiple 
RootInput events; MRInput probably will in the future. More on this later.
- {code}builder.append("Sending ").append(attemptID).append(" numEvents: 
").append(events.size()){code} - move this to DEBUG/TRACE level. It seems like 
quite a bit of logging.
- {code}if (!srcEdge.maybeAddTezEventForDestinationTask(tezEvent, taskIndex, 
srcTaskIndex ... earlyExit{code} This can cause the number of events returned 
to the task to be lower than maxEventsToGet. There's an optimization in the 
way events are fetched - if maxEvents are received, we ask for more events 
immediately instead of waiting for the heartbeat. The possibility of fewer 
events can prevent that from being triggered.
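
As a sketch of the Guava suggestion for the BroadcastEdgeManager cache above - 
assuming the nesting levels correspond to source task index and event version, 
which is exactly the kind of thing the structure's documentation should pin 
down - a Table makes both dimensions explicit:
{code}
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;

// Hypothetical replacement for the nested-ArrayList cache. The row/column
// meanings (source task index, event version) are assumptions, not taken
// from the patch.
class BroadcastEventCacheSketch<E> {
  private final Table<Integer, Integer, Collection<E>> cachedEvents =
      HashBasedTable.create();

  Collection<E> get(int srcTaskIndex, int version) {
    Collection<E> events = cachedEvents.get(srcTaskIndex, version);
    return events == null ? Collections.<E>emptyList() : events;
  }

  void put(int srcTaskIndex, int version, Collection<E> events) {
    cachedEvents.put(srcTaskIndex, version, new ArrayList<E>(events));
  }
}
{code}
Either way, documenting what each index means matters more than the container 
choice.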
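
And for the VertexImpl.getTaskAttemptTezEvents thread-safety point, a sketch 
of readLock-guarded access - the lock field and list type are illustrative, 
but since writes already happen under the vertex writeLock, reads presumably 
need the matching readLock:
{code}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch only; VertexImpl's actual lock and event-list fields may differ.
class TaskEventsSketch<E> {
  private final ReadWriteLock lock = new ReentrantReadWriteLock();
  private final List<E> taskEvents = new ArrayList<E>();

  // ArrayList.size()/get() are not safe against a concurrent add() that
  // resizes the backing array, so reads must hold the readLock.
  List<E> getEvents(int fromIndex, int max) {
    lock.readLock().lock();
    try {
      int to = Math.min(taskEvents.size(), fromIndex + max);
      return new ArrayList<E>(taskEvents.subList(fromIndex, to));
    } finally {
      lock.readLock().unlock();
    }
  }

  void addEvent(E event) {
    lock.writeLock().lock();
    try {
      taskEvents.add(event);
    } finally {
      lock.writeLock().unlock();
    }
  }
}
{code}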

For the items mentioned as "more on this later": putting the new methods into 
an interface, instead of directly on EdgeManagerPlugin, should help with 
these. That has several advantages.
- Simpler checks, and users are forced to implement the methods if they're 
using ODR (see the sketch after this list).
- A simpler transition to TEZ-2255 for user code, if they choose to use it - 
they won't have to implement both ODR and storage.
- Gives us more time to formalize the APIs for efficient routing.
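
For illustration, a minimal sketch of the interface idea - the interface name 
is made up here, and the signature is simplified (the real methods would 
return routing metadata):
{code}
// Hypothetical opt-in interface for on-demand routing (ODR). Declaring the
// methods here, rather than as concrete no-ops on EdgeManagerPlugin, forces
// implementors to provide them.
public interface OnDemandRouting {
  void routeCompositeDataMovementEventToDestination(int sourceTaskIndex,
      int destinationTaskIndex) throws Exception;
}
{code}
Edge can then test support with a plain instanceof check instead of reflecting 
on declared methods.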

Beyond this, using a mix of ODR and regular routing within an edge should be 
very simple. We just need to track which edges support ODR and which don't: 
add events for the edges which support ODR to the vertex event list, and hand 
the rest off to the task. The getEvents call then invokes the task and fills 
the rest from the edge (a sketch follows below).
RootInputInformationEvents can be handled via Tasks, which would take care of 
limiting the events to maxEventsToGet.
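
A sketch of that getEvents flow - every type and method name below is a 
stand-in, not the actual AM code:
{code}
import java.util.ArrayList;
import java.util.List;

class MixedRoutingSketch {
  interface TezEvent {}
  // Stand-in for the task side, holding pre-routed events.
  interface StoredEventSource {
    List<TezEvent> getStoredEvents(int fromIndex, int max);
  }
  // Stand-in for an ODR-capable Edge that routes on demand.
  interface OnDemandEdge {
    void routePendingEventsTo(List<TezEvent> sink, int max);
  }

  List<TezEvent> getEvents(StoredEventSource task, List<OnDemandEdge> odrEdges,
      int fromIndex, int maxEvents) {
    List<TezEvent> result = new ArrayList<TezEvent>(maxEvents);
    // Pre-routed events (non-ODR edges, RootInputDataInformationEvents) come
    // from the task's stored list, which naturally caps them at maxEvents.
    result.addAll(task.getStoredEvents(fromIndex, maxEvents));
    // Fill the remainder on demand from the edges that support ODR.
    for (OnDemandEdge edge : odrEdges) {
      if (result.size() >= maxEvents) {
        break;
      }
      edge.routePendingEventsTo(result, maxEvents - result.size());
    }
    return result;
  }
}
{code}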

Also, I don't think ODR needs to be added to the OneToOne and Broadcast edges. 
They're already memory efficient, and this would add unnecessary CPU cost.

Have some numbers which I'll post in the next comment.

> Reduce AM mem usage caused by storing TezEvents
> -----------------------------------------------
>
>                 Key: TEZ-776
>                 URL: https://issues.apache.org/jira/browse/TEZ-776
>             Project: Apache Tez
>          Issue Type: Sub-task
>            Reporter: Siddharth Seth
>            Assignee: Bikas Saha
>         Attachments: TEZ-776.1.patch, TEZ-776.2.patch, TEZ-776.3.patch, 
> TEZ-776.4.patch, TEZ-776.5.patch, TEZ-776.6.A.patch, TEZ-776.6.B.patch, 
> TEZ-776.7.patch, TEZ-776.ondemand.1.patch, TEZ-776.ondemand.2.patch, 
> TEZ-776.ondemand.3.patch, TEZ-776.ondemand.4.patch, TEZ-776.ondemand.5.patch, 
> TEZ-776.ondemand.6.patch, TEZ-776.ondemand.7.patch, TEZ-776.ondemand.patch, 
> With_Patch_AM_hotspots.png, With_Patch_AM_profile.png, 
> Without_patch_AM_CPU_Usage.png, events-problem-solutions.txt, 
> with_patch_jmc_output_of_AM.png, without_patch_jmc_output_of_AM.png
>
>
> This is open ended at the moment.
> A fair chunk of the AM heap is taken up by TezEvents (specifically 
> DataMovementEvents - 64 bytes per event).
> Depending on the connection pattern - this puts limits on the number of tasks 
> that can be processed.


