[
https://issues.apache.org/jira/browse/TEZ-2581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14987841#comment-14987841
]
Bikas Saha commented on TEZ-2581:
---------------------------------
DAGAppMaster.java
Can we pass some diagnostics here that says failed to recover?
{code}
if (recoveredDAGData.nonRecoverable) {
  DAGEventRecoverEvent recoverDAGEvent =
      new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
          DAGState.FAILED, recoveredDAGData);
{code}
TaskAttemptFinishedEvent.java
Can all the serde code be moved into TezEvent.[to|From]Proto()? This may
prevent code duplication in VertexInitGeneratedEvent.java.
VertexInitializedEvent has an unnecessary diff.
Is getAppId() in TezExampleBase meant to be public?
VertexInitGeneratedEvent.java
Events should be dumb carriers of info. They should not have filtering logic so
that there is a clean separation of logic.
{code}
public VertexInitGeneratedEvent(TezVertexID vertexId, List<TezEvent> events) {
  this.vertexId = vertexId;
  this.events = Lists.newArrayListWithCapacity(events.size());
  for (TezEvent event : events) {
    if (EnumSet.of(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)
        .contains(event.getEventType())) {
      this.events.add(event);
    }
{code}
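A minimal, self-contained sketch of the separation suggested above - the event stays a dumb carrier and the filtering moves to the call site. All class names here are stand-ins, not the actual Tez types:

```java
import java.util.ArrayList;
import java.util.List;

class FilterAtCallSite {
  enum EventType { ROOT_INPUT_DATA_INFORMATION_EVENT, OTHER }

  // Stand-in for TezEvent: only carries its type.
  static class TezEvent {
    final EventType type;
    TezEvent(EventType type) { this.type = type; }
    EventType getEventType() { return type; }
  }

  // The event simply stores whatever list it is handed; no filtering logic.
  static class VertexInitGeneratedEvent {
    final List<TezEvent> events;
    VertexInitGeneratedEvent(List<TezEvent> events) { this.events = events; }
  }

  // The caller decides which events are relevant before creating the event.
  static VertexInitGeneratedEvent create(List<TezEvent> all) {
    List<TezEvent> filtered = new ArrayList<>();
    for (TezEvent e : all) {
      if (e.getEventType() == EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) {
        filtered.add(e);
      }
    }
    return new VertexInitGeneratedEvent(filtered);
  }
}
```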
TaskCommunicatorManager.java
Formatting is off for the case statements for finished events
RecoveryParser.java
Please add comments to help understand the logic: since the vertex is committed we
cannot rerun it, and since the vertex recovery events are not complete we cannot
run other vertices that may depend on this one. So we have to abort.
{code}
// DAG is not recoverable if vertex has committer and has completed the commit
// (based on summary recovery events) but its full recovery events are not
// seen (based on non-summary recovery events).
public void isRecoverableNonSummary() {
{code}
Is this called only if the dag is not complete? Presumably, if the dag is completed
then not recovering all events is ok.
Is dag completion a summary event? If so, we could find out recovery state
definitely from summary data.
Why are isRecoverableNonSummary() and isRecoverableSummary() inconsistent?
The non-summary version sets the reason itself, but the summary version returns a
reason string that also indicates that it is non-recoverable. From the patch, it
looks like both are determined independently and the nonRecoverable flag is checked
first. So a completed dag may be recovered as failed if its nonRecoverable flag is set.
Test only tag?
{code}
public static List<HistoryEvent> readRecoveryEvents(TezConfiguration tezConf,
    ApplicationId appId,
{code}
maybeCreateTaskAttemptRecoveryData() is defined but seems unused while reading
recovery events?
Created TEZ-2925 for sync issue with history events (e.g. for UI).
DAGImpl.java
The events sent to vertices are not synchronously handled. That's inconsistent
with the comments here and with the comments before the transition code.
{code}
// Initialize dag synchronously to generate the vertices and recover its
// vertices to the desired state.
dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_INIT));
for (Vertex v : dag.vertexMap.values()) {
  dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(),
      vertexDesiredState));
}
{code}
Also, the synchronous DAG init transition may send vertex_init events to the
vertices. Won't those execute vertex init logic while the vertex recover events
are still being processed on the dispatcher?
Can we add a precondition here - e.g. precondition(vertex.isRecovered) - because
this will only happen in recovery, right?
{code}
if (recoveryData != null
    && recoveryData.isVertexGroupCommitted(groupInfo.groupName)) {
  LOG.info("VertexGroup was already committed as per recovery"
      + " data, groupName=" + groupInfo.groupName);
  continue;
{code}
We are sending these transforms instead of lists with the vertex ids. These
will end up invoking dag.getVertex() when the recovery event is processed on
some other thread. That will cause locking contention on the DAGImpl object. Is
that right? Is it worth it?
{code}
Collection<TezVertexID> vertexIds =
    Collections2.transform(vertexGroup.groupMembers,
        new Function<String, TezVertexID>() {
          @Override
          public TezVertexID apply(String vertexName) {
            return getVertex(vertexName).getVertexId();
          }
        });
appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
    new VertexGroupCommitFinishedEvent(getID(),
        commitCompletedEvent.getOutputKey().getEntityName(),
        vertexIds, clock.getTime())));
{code}
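One way to avoid the deferred dag.getVertex() calls would be to materialize the
ids eagerly on the current thread (e.g. ImmutableList.copyOf of the transform, or
a plain loop) so the lookups happen while we already hold the DAGImpl lock. A
minimal sketch of the idea, using stand-in types (String names, Integer ids)
instead of the actual Tez classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class EagerIds {
  // Resolve names to ids now, on the calling thread, instead of handing out
  // a lazy view that resolves on another thread when it is iterated.
  static List<Integer> resolveNow(List<String> names,
      Function<String, Integer> lookup) {
    List<Integer> ids = new ArrayList<>(names.size());
    for (String name : names) {
      ids.add(lookup.apply(name));
    }
    return ids;
  }
}
```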
TaskImpl.java
Unresolved TODO.
Also, is the termination event correctly handled in the NEW state?
Should this event have the recovery flag set?
{code}
// TODO use tFinishedEvent.getTerminationCause after TaskTerminationCause to
// TaskFinishedEvent
task.eventHandler.handle(new TaskEventTermination(task.taskId,
    TaskAttemptTerminationCause.UNKNOWN_ERROR,
    tFinishedEvent.getDiagnostics()));
{code}
IMO, it's not quite clear what to do if taskRecovery is not supported. For this
patch, let's keep the current semantics and fail if taskRecovery is not
supported. We should follow up on this in a separate jira. What do you think?
TaskAttemptImpl.java
TaskAttemptEventStartedRemotely and other events sent through the recovery path
in ScheduleTaskattemptTransition don't seem to have the isRecovery flag set?
Comment inconsistent with code
{code}
// Send TaskAttemptEventContainerTerminated to move it to KILLED.
// In the future when we can support container recovery, it is not necessary
// to send this event.
Preconditions.checkArgument(ta.recoveryData.getTaskAttemptStartedEvent() != null,
    "No TaskAttemptStartedEvent and TaskAttemptFinishedEvent, but the"
        + " TaskAttemptRecoveryData is not null");
LOG.debug("TaskAttempt is still running in the last AM attempt, attemptId="
    + ta.attemptId);
ta.sendEvent(new TaskAttemptEventAttemptKilled(ta.attemptId,
{code}
Why are we calling logJobHistoryAttemptStarted() inside the recovery flow in
start transition? Clearly, the start event has been logged and recovered
already. We could just set the launch time from the started event at this
point. This mirrors the launchTime also being set in the normal flow in this
transition.
In SucceededTransition, we could set the finish time from recoveryData in the
recovery path and from the clock in the normal flow. IMO this is clearer than
setting the finish time as a side effect of calling logAttemptFinished.
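A sketch of what that explicit assignment could look like. The names are
illustrative, not the actual TaskAttemptImpl members:

```java
class FinishTimeSketch {
  // Recovery path: use the finish time recorded in the recovered finished
  // event. Normal path: read the clock. Either way the assignment is an
  // explicit step in the transition, not a side effect of logging.
  static long finishTime(Long recoveredFinishTime, long clockNow) {
    return recoveredFinishTime != null ? recoveredFinishTime : clockNow;
  }
}
```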
Should the AMSchedulerEvent be sent only in the normal code path, since the
recovery path did not go through any allocation in the scheduler? Of course
this would change in work-preserving recovery.
Is the change in ContainerCompletedWhileTerminating still valid? From where
would we get the fake event?
VertexImpl.java
This should be in setParallelism2 - where the actual logic is - and not in the
temporary wrapper method.
Also, could we keep setParallelism as is and rename the temporary wrapper to
setParallelismWrapper?
{code}
if (!vertexToBeReconfiguredByManager) {
  logVertexReconfigureDoneEvent(false);
}
{code}
Can we make logInitedEvent() side-effect free by setting the initTime in the
recover path of the transition where initTime is normally set?
We probably should not be calling initializeCommitters() if we are going to
recover a vertex which is fully completed.
{code}
private boolean initializeVertex() {
  try {
    initializeCommitters();
{code}
Why is this needed? For a recovered vertex, with dummy vertex manager,
setParallelism should not be invoked, right?
{code}
if (canInitVertex()
    && (recoveryData == null || !recoveryData.shouldSkipInit())) {
  getEventHandler().handle(new VertexEvent(getVertexId(),
      VertexEventType.V_READY_TO_INIT));
}
{code}
Also setParallelism could be called multiple times and we should not log
multiple times.
For these reasons, my suggestion would be to consider logging the
reconfigureDone event, if needed, when the vertex_configured notification is
sent. Then it would be free of vertex manager semantics and be guaranteed to be
sent once.
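A rough sketch of that suggestion, assuming a maybeSendConfiguredEvent()-style
guard. The class, the boolean flag, and the string log are stand-ins for the
real VertexImpl state and history events:

```java
import java.util.ArrayList;
import java.util.List;

class ConfiguredNotifier {
  private boolean configuredSent = false;
  final List<String> historyLog = new ArrayList<>();

  // Log the reconfigure-done event exactly once, at the point the
  // vertex_configured notification goes out, rather than from
  // setParallelism, which may run multiple times.
  void maybeSendConfiguredEvent(boolean reconfigured) {
    if (configuredSent) {
      return;
    }
    configuredSent = true;
    if (reconfigured) {
      historyLog.add("VertexReconfigureDoneEvent");
    }
    historyLog.add("vertex_configured");
  }
}
```

Because the guard flips on the first call, repeated setParallelism invocations can no longer produce duplicate log entries.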
Not clear about why we need the VertexRecoveryData.shouldSkipInit().
VertexRecoveryData should just hold the data recorded for recovery. It should
not understand whether recovery should be skipped or not. That method belongs
in VertexImpl for a clear separation of responsibility.
Why do we need to provide the reconfigure data to the NoOpManager and have it
call setParallelism instead of directly using that data to initialize the
fields in the vertex (e.g. in handleInitEvent())? Essentially, we want the
vertex manager to be no-op right? Is it because we want to ensure that the
vertex_parallelism_updated notification is sent to registered listeners?
In handleInitEvent we should not be calling setParallelism() if we are also
calling it from the NoOpManager?
What if we have come here via the recover transition because the dag is complete,
but we don't have the finished event (and there is no committer)? Then the if
stmt will fail and we will go through the normal code path.
{code}
public static class InitTransition implements
    MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
  @Override
  public VertexState transition(VertexImpl vertex, VertexEvent event) {
    // recover from recovery data (NEW->FAILED/KILLED)
    if (vertex.recoveryData != null
        && !vertex.recoveryData.isVertexInited()
        && vertex.recoveryData.isVertexFinished()) {
      VertexFinishedEvent finishedEvent =
          vertex.recoveryData.getVertexFinishedEvent();
      vertex.diagnostics.add(finishedEvent.getDiagnostics());
      return vertex.finished(finishedEvent.getState());
    }
{code}
Why is this new state check in the normal code path?
{code}
for (Vertex target : vertex.targetVertices.keySet()) {
  if (target.getState() == VertexState.NEW) {
    vertex.getEventHandler().handle(new VertexEvent(target.getVertexId(),
        VertexEventType.V_INIT));
  }
{code}
Uninitialized edge may be set by the manager of the downstream vertex. So we
cannot ignore that if we have seen reconfigured event.
{code}
if (!vertex.uninitializedEdges.isEmpty()
    && (vertex.recoveryData == null
        || !vertex.recoveryData.shouldSkipInit())) {
  LOG.info("Vertex has uninitialized edges. " + vertex.logIdentifier);
  return VertexState.INITIALIZING;
}
{code}
This can happen multiple times. Are multiple instances of
vertexInitGeneratedEvent handled?
{code}
try {
  if (inputInfoEvents != null && !inputInfoEvents.isEmpty()) {
    vertex.logJobHistoryVertexInitGeneratedEvent(inputInfoEvents);
    vertex.handleRoutedTezEvents(inputInfoEvents, false);
  }
{code}
Can we use the reconfigureVertex() variant that allows setting all these values
in a single call?
{code}
if (reconfigreDoneEvent.getRootInputSpecUpdates() != null
    && !reconfigreDoneEvent.getRootInputSpecUpdates().isEmpty()) {
  getContext().reconfigureVertex(reconfigreDoneEvent.getRootInputSpecUpdates(),
      reconfigreDoneEvent.getVertexLocationHint(),
      reconfigreDoneEvent.getNumTasks());
} else {
  getContext().reconfigureVertex(reconfigreDoneEvent.getNumTasks(),
      reconfigreDoneEvent.getVertexLocationHint(),
      reconfigreDoneEvent.getSourceEdgeProperties());
}
{code}
IMO, the ReconfigureDoneEvent.isReconfiguredByVM() logic is not correct. For
backwards compatibility, vertexReconfigurationPlanned() is not required to be
invoked by VMs. So guarding the reconfigureVertex() call in NoOpManager under
isReconfiguredByVM() is not correct, since that flag is saved when, in the normal
flow, the manager calls vertexReconfigureDone() - but that call is not required.
This is why I think we should de-link the recovery from the semantics of the
vertex manager flow by saving the reconfigureDoneEvent (with reconfigure data,
if any) in the maybeSendConfiguredEvent() method when we send the
vertex_configured notification. Upon recovery, if there is reconfigured data,
then we will use the recovered numTasks/etc data to overwrite the values from
the DAG plan using the NoOpManager via calling setParallelism, and without
bothering about reconfigurationPlanned(). Thoughts?
Avoidable info level logging in NoOpManager.
> Umbrella for Tez Recovery Redesign
> ----------------------------------
>
> Key: TEZ-2581
> URL: https://issues.apache.org/jira/browse/TEZ-2581
> Project: Apache Tez
> Issue Type: Improvement
> Reporter: Jeff Zhang
> Assignee: Jeff Zhang
> Attachments: TEZ-2581-WIP-1.patch, TEZ-2581-WIP-2.patch,
> TEZ-2581-WIP-3.patch, TEZ-2581-WIP-4.patch, TEZ-2581-WIP-5.patch,
> TEZ-2581-WIP-6.patch, TEZ-2581-WIP-7.patch, TEZ-2581-WIP-8.patch,
> TezRecoveryRedesignProposal.pdf, TezRecoveryRedesignV1.1.pdf
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)