[
https://issues.apache.org/jira/browse/TEZ-1273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591400#comment-14591400
]
Jeff Zhang edited comment on TEZ-1273 at 6/18/15 7:44 AM:
----------------------------------------------------------
bq. writeLock is used in the statemachine handle() call but readLock is not
used anywhere. Doesn't all read access to data that is updated within the state
transitions need to be protected by a read lock?
Currently DAGAppMaster exposes itself through RunningAppContext so that other
components can use it. Ideally RunningAppContext would be protected by the read
lock, but that would easily lead to deadlocks. One example is
DAGAppMaster#getAMState: I use a volatile field, state, in DAGAppMaster instead
of reading it from the state machine object every time, to avoid deadlock.
Given that most of the fields accessed through RunningAppContext won't change
after the AM has started properly, it should be safe without the read lock. In
the future, though, we do need to consider better ways to access
RunningAppContext.
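To illustrate the volatile-field approach described above, here is a minimal sketch. It is not the patch's code: the class name VolatileStateSketch, the reduced AmState enum, and the handle(next, transitionBody) signature are all assumptions made for the example. It shows a transition holding the write lock while another thread reads the state. If getAMState() took the read lock, the join() inside the transition would never return.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for DAGAppMaster; the names are illustrative, not the Tez API.
class VolatileStateSketch {
  enum AmState { NEW, IDLE, RUNNING }

  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  // State owned by the state machine; only touched under the write lock.
  private AmState machineState = AmState.NEW;
  // Mirror for readers; volatile makes the last published value visible without a lock.
  private volatile AmState state = AmState.NEW;

  // Runs a transition under the write lock, then publishes the new state.
  void handle(AmState next, Runnable transitionBody) {
    lock.writeLock().lock();
    try {
      transitionBody.run(); // may wait on other components
      machineState = next;
      state = next;
    } finally {
      lock.writeLock().unlock();
    }
  }

  // Lock-free read: returns the last published state even during a transition.
  AmState getAMState() {
    return state;
  }

  public static void main(String[] args) {
    VolatileStateSketch am = new VolatileStateSketch();
    am.handle(AmState.IDLE, () -> { });
    am.handle(AmState.RUNNING, () -> {
      // Another component's thread reads the state in the middle of the transition.
      Thread reader = new Thread(
          () -> System.out.println("reader sees " + am.getAMState()));
      reader.start();
      try {
        reader.join(); // would hang if getAMState() needed the read lock
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    System.out.println("final state " + am.getAMState());
  }
}
```

The trade-off is that a reader may observe the state from just before an in-flight transition, which is acceptable only when callers tolerate slightly stale values.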
bq. would it make the code less complex if we had diff transitions for
dag_succeded, dag_failed, dag_internal_error, dag_killed?
I'm afraid not. They share a lot of common code in DAGAppMaster#dagComplete and
differ only in the following part. Keeping a single transition also means a
smaller code change, which is easier to review.
{code}
switch(dagFinishedEvent.getDAGState()) {
case SUCCEEDED:
  if (!currentDAG.getName().startsWith(
      TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
    successfulDAGs.incrementAndGet();
  }
  break;
case FAILED:
  if (!currentDAG.getName().startsWith(
      TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
    failedDAGs.incrementAndGet();
  }
  break;
case KILLED:
  if (!currentDAG.getName().startsWith(
      TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
    killedDAGs.incrementAndGet();
  }
  break;
case ERROR:
  if (!currentDAG.getName().startsWith(
      TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
    failedDAGs.incrementAndGet();
  }
{code}
bq. this transition seems a bit confusing. We should make it more clear as to
what it is trying to do. Also the comment on submit dag in non-session mode
does not seem right.
bq. Related to the above, we have the rpc/api layer and the state machine
layer. Some of the calls need to be synchronous and others are handed off the
state machine. This leads a possibility of race conditions such as 2 dags being
submitted at the same time while the AM is in an idle state. Sending an event
to the state machine here would likely not work. We may need to explicitly
state to RUNNING from IDLE to prevent such races. Any thoughts on handling this?
I introduced a new event type, DAG_SUBMITTED, to represent a new DAG being
submitted through RPC, and renamed NEW_DAG_SUBMITTED to DAG_PREPARE, which
notifies components that a new DAG has been submitted.
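One way to picture how a synchronously handled submission closes the race raised above (two DAGs submitted while the AM is idle) is the sketch below. This is an assumption about the general technique, not the patch itself: the class SubmitRaceSketch and its methods are hypothetical, and only the names IDLE and RUNNING come from the discussion. The check and the IDLE to RUNNING move happen under one write lock, so a second concurrent submitter sees RUNNING and is rejected.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch; only the state names IDLE and RUNNING come from the discussion.
class SubmitRaceSketch {
  enum State { IDLE, RUNNING }

  private final Lock writeLock = new ReentrantReadWriteLock().writeLock();
  private State state = State.IDLE;
  private String currentDagName;

  // Called on the RPC thread. The state check and the transition are atomic.
  boolean submitDag(String dagName) {
    writeLock.lock();
    try {
      if (state != State.IDLE) {
        return false; // another DAG already moved the AM to RUNNING
      }
      state = State.RUNNING;
      currentDagName = dagName;
      return true;
    } finally {
      writeLock.unlock();
    }
  }

  // Returns the AM to IDLE so that the next DAG can be accepted.
  void dagCompleted() {
    writeLock.lock();
    try {
      state = State.IDLE;
      currentDagName = null;
    } finally {
      writeLock.unlock();
    }
  }

  State getState() {
    writeLock.lock();
    try {
      return state;
    } finally {
      writeLock.unlock();
    }
  }

  public static void main(String[] args) {
    SubmitRaceSketch am = new SubmitRaceSketch();
    System.out.println("first submit accepted: " + am.submitDag("dag-1"));
    System.out.println("second submit accepted: " + am.submitDag("dag-2"));
    am.dagCompleted();
    System.out.println("submit after completion accepted: " + am.submitDag("dag-3"));
  }
}
```

Handing the submission to an asynchronous dispatcher instead would leave a window in which both callers still observe IDLE, which is the race the reviewer describes.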
bq. Why is an internal error ignorable? Can this cause the AM to remain
infinitely in a terminating state?
Fixed.
bq. catch alls hide bugs. We should remove the above.
Fixed.
bq. does work within serviceInit/serviceStart/serviceStop need to be within a
write lock?
Previously they were in synchronized blocks. I think this was to handle the
case where DAGAppMasterShutdownHook is called while the AM is initializing or
starting: it prevents these 3 methods from being invoked at the same time in
different threads.
It also prevents shutdownTezAM & submitDAGToAppMaster from being invoked at the
same time (they are also both in synchronized blocks). The write lock here just
replaces the synchronized keyword.
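As a rough illustration of the write lock standing in for the synchronized keyword, the sketch below guards two lifecycle methods with the same write lock. The class LifecycleLockSketch and its log are hypothetical and only mimic the shape of serviceStart/serviceStop. A shutdown hook thread calling serviceStop() has to wait until a concurrent serviceStart() has finished, so the two bodies never interleave.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of lifecycle methods guarded by one write lock.
class LifecycleLockSketch {
  private final Lock writeLock = new ReentrantReadWriteLock().writeLock();
  private final List<String> log = new ArrayList<>();

  // Formerly "synchronized"; the explicit write lock gives the same mutual exclusion.
  void serviceStart() {
    writeLock.lock();
    try {
      log.add("start:begin");
      log.add("start:end");
    } finally {
      writeLock.unlock();
    }
  }

  // May be called from a shutdown hook thread while start is still running.
  void serviceStop() {
    writeLock.lock();
    try {
      log.add("stop:begin");
      log.add("stop:end");
    } finally {
      writeLock.unlock();
    }
  }

  // Returns a copy of the log, taken under the same lock.
  List<String> log() {
    writeLock.lock();
    try {
      return new ArrayList<>(log);
    } finally {
      writeLock.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    LifecycleLockSketch am = new LifecycleLockSketch();
    Thread starter = new Thread(am::serviceStart);
    Thread hook = new Thread(am::serviceStop);
    starter.start();
    hook.start();
    starter.join();
    hook.join();
    // Each "begin" is directly followed by its own "end", whichever thread ran first.
    System.out.println(am.log());
  }
}
```

Like synchronized, the write lock of ReentrantReadWriteLock is reentrant, so a guarded method can call another guarded method on the same thread without blocking itself.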
bq. can serviceStop be triggered by something outside of the state machine?
Yes, it is possible: DAGAppMasterShutdownHook is invoked when the AM receives a
terminate signal.
bq. precondition checks are not useful for a state machine. This will throw an
exception from the handle function and lead to an internal error and the AM
shutting down.
Fixed.
bq. For ShutdownWhenRunningTransition, shouldn't the sessionStopped flag be
set? What will happen after the dag completion ( after kill ) comes back? In
this case, won't the AM go back to idle state for a session?
bq. confusing comment. Not matching code.
Fixed.
bq. Also, if the version mismatch error occurs and causes AM to go into ERROR
state - how does the AM unregister from RM and update diagnostics?
The diagnostics have already been updated when the versions are compared, and
the AM will unregister from the RM:
{code}
versionMismatchDiagnostics = "Incompatible versions found"
    + ", clientVersion=" + clientVersion
    + ", AMVersion=" + dagVersionInfo.getVersion();
addDiagnostic(versionMismatchDiagnostics);
{code}
{code}
if (versionMismatch) {
  // Short-circuit and return as no DAG should be run
  LOG.info("Version Mismatch, shutting down AM");
  this.taskSchedulerEventHandler.setShouldUnregisterFlag();
  shutdownHandler.shutdown();
  return DAGAppMasterState.ERROR;
}
{code}
{code}
sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.SESSION_TIMEOUT));
{code}
bq. does this introduce a race with submitDag() ?
Fixed by handling the session timeout event synchronously.
bq. DrainDispatcher.java in tez-dag/src/test/ - any reason why we need this?
It is only for unit tests; it is used in TestMockDAGAppMaster.
bq. LocalClient changes?
A minor change to LocalClient, because we introduce a new DAGAppMasterState,
RECOVERING.
bq. TestMemoryWithEvents
This is due to the change in MockTezClient to simulate recovering.
> Refactor DAGAppMaster to state machine based
> --------------------------------------------
>
> Key: TEZ-1273
> URL: https://issues.apache.org/jira/browse/TEZ-1273
> Project: Apache Tez
> Issue Type: Improvement
> Reporter: Jeff Zhang
> Assignee: Jeff Zhang
> Attachments: DAGAppMaster_3.pdf, DAGAppMaster_4.pdf,
> DAGAppMaster_5.pdf, TEZ-1273-3.patch, TEZ-1273-4.patch, TEZ-1273-5.patch,
> TEZ-1273-6.patch, TEZ-1273-7.patch, TEZ-1273-8.patch, TEZ-1273-9.patch,
> Tez-1273-2.patch, Tez-1273.patch, dag_app_master.pdf, dag_app_master2.pdf
>
>
> Almost all our entities (Vertex, Task, etc.) are state machine based and
> written using a formal state machine. DAGAppMaster, however, is not written
> on a formal state machine even though its behavior is state machine based.
> This JIRA is for refactoring it to be state machine based.