[ https://issues.apache.org/jira/browse/TEZ-776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14520815#comment-14520815 ]
Siddharth Seth commented on TEZ-776:
------------------------------------
From another offline discussion to resolve the issues here - the plan is to get this committed in some form, since it's obviously useful for the ScatterGather case, and into a 0.7 release. TEZ-2255 will be taken up at a later point with enhancements as proposed earlier in this jira.
Comments on the patch (776.6.A.patch)
- Can the fields in DataMovementEvent be made final now that the new create methods exist?
- InputFailedEvent.makeCopy javadoc incomplete.
- BroadcastEdgeManager -
AtomicReference<ArrayList<ArrayList<Collection<DataMovementEvent>>>>
cachedEvents - would be good to document this structure. It could probably be
replaced by a Multimap or Table from Guava.
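A sketch of what the Guava-based replacement could look like (assuming the nested lists are keyed by source task index and event version - that's my guess at the semantics, not something taken from the patch):
{code}
import java.util.Collection;

import org.apache.tez.runtime.api.events.DataMovementEvent;

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;

// Illustrative only. Note HashBasedTable is not thread-safe, so whatever the
// AtomicReference was guarding would still need separate handling.
class CachedEventsSketch {
  // row = source task index, column = event version, value = cached events
  private final Table<Integer, Integer, Collection<DataMovementEvent>> cachedEvents =
      HashBasedTable.create();

  void cache(int srcTaskIndex, int version, Collection<DataMovementEvent> events) {
    cachedEvents.put(srcTaskIndex, version, events);
  }

  Collection<DataMovementEvent> lookup(int srcTaskIndex, int version) {
    return cachedEvents.get(srcTaskIndex, version);
  }
}
{code}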
- The entire caching mechanism in BroadcastEdgeManager - will it break pipelined shuffle? It looks like a single event is cached for a specific version, but multiple events are possible with pipelining (from the current set of Inputs in the runtime-library).
- Assuming routeComposite and routeInputFailed don't have the cache structure
since Composite is not invoked and InputFailed is expected to be invoked less
often.
- This caching effectively moves storage into the Edge for the Broadcast case, except it doesn't gain any CPU efficiency from the edge being aware of the routing pattern.
- I'm not sure the caching is necessary. I thought tests showed that extra events on the 30 or so handlers don't cause issues.
- Unrelated to this jira, but as a minor enhancement, logging the type of edge during setup would be useful information to have in the logs. I don't think it exists right now.
-
{code}edgeManager.getClass().getDeclaredMethod("routeCompositeDataMovementEventToDestination",{code}
This is not necessarily sufficient to determine whether ODR should be enabled for the edge. Since there are concrete methods in the base class, nothing forces users to implement all of the methods. A plugin can say that it doesn't care about CompositeEvents, since it'll never see them, and not implement that method. More on this later.
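A small, self-contained illustration of the problem (simplified no-arg signatures, not the actual Tez classes):
{code}
// The base class provides concrete implementations, so a plugin is free to override
// only the methods it cares about.
abstract class BasePlugin {
  void routeDataMovementEventToDestination() { }
  void routeCompositeDataMovementEventToDestination() { }
}

// A legitimate plugin that never expects CompositeEvents and therefore doesn't
// override the composite route.
class MyPlugin extends BasePlugin {
  @Override
  void routeDataMovementEventToDestination() { }
}

public class OdrCheckDemo {
  public static void main(String[] args) {
    try {
      // getDeclaredMethod only looks at MyPlugin itself, not the base class...
      MyPlugin.class.getDeclaredMethod("routeCompositeDataMovementEventToDestination");
      System.out.println("ODR enabled");
    } catch (NoSuchMethodException e) {
      // ... so this prints "ODR disabled" even though the plugin is perfectly valid.
      System.out.println("ODR disabled");
    }
  }
}
{code}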
- printStackTrace in Edge.
- {code}- if (isFirstEvent) {
- isFirstEvent = false;
- // this is the first item - reuse the event object
- if (isDataMovementEvent) {
- ((DataMovementEvent) event).setTargetIndex(inputIndex);
- } else {{code} Does this change the caching of events which made Broadcast and OneToOne efficient earlier?
- I think Hitesh already pointed this out, but a single event can explode into multiple events - more than the maxEvents limit. A simple fix would be to just accept the explosion and ignore the maxEvents limitation in this case.
- In Edge, CompositeDataMovementEvent has its own try/catch that throws AMUserCodeException, even though there's an outer block doing the same.
- VertexImpl.getTaskAttemptTezEvents - are taskEvents.size(), and any other access to taskEvents, thread safe? I don't see any lock under which these are accessed. The list can grow under the Vertex writeLock when new events are seen, which changes internal data structures in the ArrayList.
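A minimal sketch of reading the list under the existing lock (assuming the ReentrantReadWriteLock that VertexImpl already uses; field and parameter names are illustrative):
{code}
List<TezEvent> readTaskEventsSafely(int fromEventId, int maxEvents) {
  readLock.lock();
  try {
    int count = taskEvents.size();
    int from = Math.min(fromEventId, count);
    int to = Math.min(count, fromEventId + maxEvents);
    // copy while still holding the lock so the ArrayList can't grow underneath us
    return new ArrayList<TezEvent>(taskEvents.subList(from, to));
  } finally {
    readLock.unlock();
  }
}
{code}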
- ROOT_INPUT_DATA_INFORMATION_EVENTS - Sending these directly can cause the
maxEventsPerHeartbeat to be exceeded. MultiMRInput already handles multiple
RootInput events. MRInput probably will in the future. More on this later.
- {code}builder.append("Sending ").append(attemptID).append(" numEvents: ").append(events.size()){code} - Move to DEBUG/TRACE level. Seems like quite a bit of logging.
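For example (assuming LOG is the existing logger in that class):
{code}
if (LOG.isDebugEnabled()) {
  LOG.debug("Sending " + attemptID + " numEvents: " + events.size());
}
{code}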
- {code}if (!srcEdge.maybeAddTezEventForDestinationTask(tezEvent, taskIndex,
srcTaskIndex ... earlyExit{code} This can cause the number of events returned
to the task to be lower than maxEventsToGet. There's an optimization in the way
events are fetched - where if maxEvents are received, we ask for more events
immediately instead of waiting for the heartbeat. The potential for fewer events can cause that optimization not to be triggered.
For the items mentioned as "more on this later":
Putting the new methods into an interface, instead of directly on EdgeManagerPlugin, should help with these (a rough sketch follows the list below). That has several advantages.
- Simpler checks and forcing users to implement the methods if they're using
ODR.
- A simpler transition to TEZ-2255 for user code if they choose to use that.
They won't have to implement both ODR and storage.
- Gives us more time to formalize the APIs for efficient routing.
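A rough sketch of the interface idea (the name and signatures are illustrative only - the real methods would carry more routing parameters and return routing metadata, which I've elided):
{code}
// Not an actual Tez API - just to show the shape of the proposal.
public interface OnDemandRouting {
  void routeDataMovementEventToDestination(int srcTaskIndex, int srcOutputIndex,
      int destTaskIndex) throws Exception;
  void routeCompositeDataMovementEventToDestination(int srcTaskIndex,
      int destTaskIndex) throws Exception;
  void routeInputSourceTaskFailedEventToDestination(int srcTaskIndex,
      int destTaskIndex) throws Exception;
}
{code}
The Edge could then decide whether ODR applies with a plain {{instanceof}} check instead of reflecting over declared methods, and a plugin that opts in is forced by the compiler to implement all of the methods.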
Beyond this, using a mix of ODR and regular routing across edges should be very simple. We just need to track which edges support ODR and which don't. Add events for the ones which support ODR to the vertex event list, and hand off the rest to the task. The getEvents call invokes the task and fills the rest from the edge.
RootInputInformationEvents can be handled via Tasks - which would take care of
limiting the events to maxEventsToGet.
Also, I don't think ODR needs to be added to the OneToOne and Broadcast edges.
They're already memory efficient, and this adds unnecessary CPU.
I have some numbers which I'll post in the next comment.
> Reduce AM mem usage caused by storing TezEvents
> -----------------------------------------------
>
> Key: TEZ-776
> URL: https://issues.apache.org/jira/browse/TEZ-776
> Project: Apache Tez
> Issue Type: Sub-task
> Reporter: Siddharth Seth
> Assignee: Bikas Saha
> Attachments: TEZ-776.1.patch, TEZ-776.2.patch, TEZ-776.3.patch,
> TEZ-776.4.patch, TEZ-776.5.patch, TEZ-776.6.A.patch, TEZ-776.6.B.patch,
> TEZ-776.7.patch, TEZ-776.ondemand.1.patch, TEZ-776.ondemand.2.patch,
> TEZ-776.ondemand.3.patch, TEZ-776.ondemand.4.patch, TEZ-776.ondemand.5.patch,
> TEZ-776.ondemand.6.patch, TEZ-776.ondemand.7.patch, TEZ-776.ondemand.patch,
> With_Patch_AM_hotspots.png, With_Patch_AM_profile.png,
> Without_patch_AM_CPU_Usage.png, events-problem-solutions.txt,
> with_patch_jmc_output_of_AM.png, without_patch_jmc_output_of_AM.png
>
>
> This is open ended at the moment.
> A fair chunk of the AM heap is taken up by TezEvents (specifically
> DataMovementEvents - 64 bytes per event).
> Depending on the connection pattern - this puts limits on the number of tasks
> that can be processed.