[ 
https://issues.apache.org/jira/browse/TEZ-1273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591400#comment-14591400
 ] 

Jeff Zhang edited comment on TEZ-1273 at 6/18/15 7:44 AM:
----------------------------------------------------------

bq. writeLock is used in the statemachine handle() call but readLock is not 
used anywhere. Doesn't all read access to data that is updated within the state 
transitions need to be protected by a read lock?
Currently DAGAppMaster exposes itself to other components through 
RunningAppContext. Ideally RunningAppContext would be protected by the read 
lock, but that could easily introduce deadlocks. One example is 
DAGAppMaster#getAMState: to avoid deadlock, I read a volatile state field in 
DAGAppMaster instead of getting the state from the state machine object every 
time. Given that most of the fields accessed through RunningAppContext do not 
change after the AM has started properly, it should be safe without the read 
lock. In the future, though, we do need to consider better ways to access 
RunningAppContext.
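
A minimal sketch of the volatile-field approach described above. The class, 
enum, and field names are hypothetical stand-ins and are not taken from the 
patch; the only point is that transitions run under the write lock while 
readers see a published volatile copy without taking any lock.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for DAGAppMaster; all names here are illustrative.
class AmStateSketch {
  enum AmState { NEW, INITED, IDLE, RUNNING, SHUTDOWN }

  private final Lock writeLock = new ReentrantReadWriteLock().writeLock();

  // Owned by the "state machine"; only touched while holding writeLock.
  private AmState machineState = AmState.NEW;

  // Published copy for readers; volatile gives visibility without locking.
  private volatile AmState state = AmState.NEW;

  // Simulates handle(): apply a transition under the write lock, then publish.
  void handle(AmState next) {
    writeLock.lock();
    try {
      machineState = next;
      state = machineState;
    } finally {
      writeLock.unlock();
    }
  }

  // Lock-free read: a caller that already holds other locks cannot block here.
  AmState getAMState() {
    return state;
  }
}
```

A reader may briefly observe the previous state while a transition is in 
progress, which is the trade-off for never blocking in getAMState().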

bq. would it make the code less complex if we had diff transitions for 
dag_succeded, dag_failed, dag_internal_error, dag_killed?
I'm afraid not. They share a lot of common code in DAGAppMaster#dagComplete 
and differ only in the following part. Keeping them together also means a 
smaller code change that is easier to review. 
{code}
switch (dagFinishedEvent.getDAGState()) {
  case SUCCEEDED:
    if (!currentDAG.getName().startsWith(
        TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
      successfulDAGs.incrementAndGet();
    }
    break;
  case FAILED:
    if (!currentDAG.getName().startsWith(
        TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
      failedDAGs.incrementAndGet();
    }
    break;
  case KILLED:
    if (!currentDAG.getName().startsWith(
        TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
      killedDAGs.incrementAndGet();
    }
    break;
  case ERROR:
    if (!currentDAG.getName().startsWith(
        TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
      failedDAGs.incrementAndGet();
    }
{code}

bq. this transition seems a bit confusing. We should make it more clear as to 
what it is trying to do. Also the comment on submit dag in non-session mode 
does not seem right.
bq. Related to the above, we have the rpc/api layer and the state machine 
layer. Some of the calls need to be synchronous and others are handed off the 
state machine. This leads a possibility of race conditions such as 2 dags being 
submitted at the same time while the AM is in an idle state. Sending an event 
to the state machine here would likely not work. We may need to explicitly 
state to RUNNING from IDLE to prevent such races. Any thoughts on handling this?
Introduced a new event type, DAG_SUBMITTED, to represent that a new DAG has 
been submitted through RPC, and renamed NEW_DAG_SUBMITTED to DAG_PREPARE, 
which notifies components that a new DAG has been submitted. 
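
To illustrate the race raised in the question, here is a small sketch of an 
explicit IDLE to RUNNING transition performed synchronously on the RPC thread. 
It is not the code from the patch: the class, method, and field names are 
assumptions made for illustration, and the real state machine has more states 
and events.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch; not the DAGAppMaster implementation.
class SubmitRaceSketch {
  enum AmState { IDLE, RUNNING }

  private final Lock writeLock = new ReentrantReadWriteLock().writeLock();
  private AmState state = AmState.IDLE;
  private String currentDagName;

  // Called from an RPC thread. The check and the transition happen under one
  // lock, so only one of two concurrent submissions can win while IDLE.
  boolean submitDag(String dagName) {
    writeLock.lock();
    try {
      if (state != AmState.IDLE) {
        return false; // another DAG is already running
      }
      state = AmState.RUNNING;
      currentDagName = dagName;
      return true;
    } finally {
      writeLock.unlock();
    }
  }

  AmState getState() {
    writeLock.lock();
    try {
      return state;
    } finally {
      writeLock.unlock();
    }
  }

  String getCurrentDagName() {
    writeLock.lock();
    try {
      return currentDagName;
    } finally {
      writeLock.unlock();
    }
  }
}
```

If the transition were instead queued as an asynchronous event, both callers 
could observe IDLE before either event was processed, which is the race the 
reviewer describes.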

bq. Why is an internal error ignorable? Can this cause the AM to remain 
infinitely in a terminating state?
Fixed.

bq. catch alls hide bugs. We should remove the above.
Fixed.

bq. does work within serviceInit/serviceStart/serviceStop need to be within a 
write lock?
Previously they were in synchronized blocks. I think this is to handle the 
case where DAGAppMasterShutdownHook is called while the AM is still 
initializing or starting; it prevents these 3 methods from being invoked at 
the same time in different threads. 
It also prevents shutdownTezAM and submitDAGToAppMaster from being invoked at 
the same time (they were also both in synchronized blocks). The write lock 
here just replaces the synchronized keyword. 
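
As a rough sketch of what replacing synchronized with the write lock looks 
like, assuming a single ReentrantReadWriteLock shared by the lifecycle 
methods (the class below is hypothetical and only records the call order):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch; the lifecycle bodies only record which step ran.
class LifecycleLockSketch {
  private final Lock writeLock = new ReentrantReadWriteLock().writeLock();
  private final List<String> calls = new ArrayList<>();

  void serviceInit() { runExclusively("init"); }

  void serviceStart() { runExclusively("start"); }

  // May also be called from a shutdown hook thread; it then waits until any
  // in-progress init or start has released the write lock.
  void serviceStop() { runExclusively("stop"); }

  private void runExclusively(String step) {
    writeLock.lock();
    try {
      calls.add(step);
    } finally {
      writeLock.unlock();
    }
  }

  List<String> getCalls() {
    writeLock.lock();
    try {
      return new ArrayList<>(calls);
    } finally {
      writeLock.unlock();
    }
  }
}
```

Like synchronized, the write lock of a ReentrantReadWriteLock is reentrant, so 
a lifecycle method that calls another locked method on the same thread does 
not block itself.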

bq. can serviceStop be triggered by something outside of the state machine?
Yes, it is possible: DAGAppMasterShutdownHook is invoked when the AM receives 
a terminate signal. 

bq. precondition checks are not useful for a state machine. This will throw an 
exception from the handle function and lead to an internal error and the AM 
shutting down.
Fixed.

bq. For ShutdownWhenRunningTransition, shouldn't the sessionStopped flag be 
set? What will happen after the dag completion ( after kill ) comes back? In 
this case, won't the AM go back to idle state for a session?

bq. confusing comment. Not matching code.
Fixed.

bq. Also, if the version mismatch error occurs and causes AM to go into ERROR 
state - how does the AM unregister from RM and update diagnostics?
The diagnostics have already been updated when the versions are compared, and 
the AM will unregister from the RM:
{code}
versionMismatchDiagnostics = "Incompatible versions found"
    + ", clientVersion=" + clientVersion
    + ", AMVersion=" + dagVersionInfo.getVersion();
addDiagnostic(versionMismatchDiagnostics);
{code}

{code}
if (versionMismatch) {
  // Short-circuit and return, as no DAG should be run
  LOG.info("Version Mismatch, shutting down AM");
  this.taskSchedulerEventHandler.setShouldUnregisterFlag();
  shutdownHandler.shutdown();
  return DAGAppMasterState.ERROR;
}
{code}

{code}
sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.SESSION_TIMEOUT));
{code}
bq. does this introduce a race with submitDag() ?
Fixed by handling the session timeout event synchronously. 
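
A hedged sketch of why synchronous handling removes the race: if the timeout 
check and the DAG submission take the same lock and both test for IDLE, 
whichever arrives first wins and the other is rejected. The names below are 
hypothetical and the states are simplified; this is not the patch itself.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch; not the DAGAppMaster implementation.
class SessionTimeoutSketch {
  enum AmState { IDLE, RUNNING, TERMINATING }

  private final Lock writeLock = new ReentrantReadWriteLock().writeLock();
  private AmState state = AmState.IDLE;

  // RPC path: accepted only while the session is idle.
  boolean submitDag() {
    return transitionFromIdle(AmState.RUNNING);
  }

  // Timer path, handled synchronously: shuts down only if still idle.
  boolean onSessionTimeout() {
    return transitionFromIdle(AmState.TERMINATING);
  }

  private boolean transitionFromIdle(AmState next) {
    writeLock.lock();
    try {
      if (state != AmState.IDLE) {
        return false;
      }
      state = next;
      return true;
    } finally {
      writeLock.unlock();
    }
  }

  AmState getState() {
    writeLock.lock();
    try {
      return state;
    } finally {
      writeLock.unlock();
    }
  }
}
```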


bq. DrainDispatcher.java in tez-dag/src/test/ - any reason why we need this?
It is only for unit tests; it is used in TestMockDAGAppMaster.

bq. LocalClient changes?
A minor change to LocalClient, because we introduced a new DAGAppMasterState, 
RECOVERING.

bq. TestMemoryWithEvents
Due to a change in MockTezClient to simulate recovering. 


was (Author: zjffdu):
bq. writeLock is used in the statemachine handle() call but readLock is not 
used anywhere. Doesn't all read access to data that is updated within the state 
transitions need to be protected by a read lock?
I think currently DAGAppMaster expose itself through RunningAppContext to be 
used by other components. Ideally RunningAppContext should be protected by read 
lock, but it would easily bring dead lock issue. One example is that 
DAGAppMaster#getAMState, I use a volatile field state in DAGAppMaster but not 
get it from the state machine object every time to avoid deadlock. Given that 
most of fields accessed in RunningAppContext won't change after AM is started 
properly, it should be safe without read lock. But in the future, we do need to 
consider other better ways to access RunningAppContext.

bq. would it make the code less complex if we had diff transitions for 
dag_succeded, dag_failed, dag_internal_error, dag_killed?
I'm afraid not. They share lots of common code in DAGAppMaster#dagComplete, 
only different on the following part. And this lead less code change, easy for 
review. 
{code}
 switch(dagFinishedEvent.getDAGState()) {
      case SUCCEEDED:
        if (!currentDAG.getName().startsWith(
            TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
          successfulDAGs.incrementAndGet();
        }
        break;
      case FAILED:
        if (!currentDAG.getName().startsWith(
            TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
          failedDAGs.incrementAndGet();
        }
        
        break;
      case KILLED:
        if (!currentDAG.getName().startsWith(
            TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
          killedDAGs.incrementAndGet();
        }
        break;
      case ERROR:
        if (!currentDAG.getName().startsWith(
            TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
          failedDAGs.incrementAndGet();
        }
{code}

bq. this transition seems a bit confusing. We should make it more clear as to 
what it is trying to do. Also the comment on submit dag in non-session mode 
does not seem right.
bq. Related to the above, we have the rpc/api layer and the state machine 
layer. Some of the calls need to be synchronous and others are handed off the 
state machine. This leads a possibility of race conditions such as 2 dags being 
submitted at the same time while the AM is in an idle state. Sending an event 
to the state machine here would likely not work. We may need to explicitly 
state to RUNNING from IDLE to prevent such races. Any thoughts on handling this?
Introduce a new event type DAG_SUBMITTED to represent that a new DAG is 
submitted through rpc and rename NEW_DAG_SUBMITTED to DAG_PREPARE to notify 
components that new dag is submitted. 

bq. Why is an internal error ignorable? Can this cause the AM to remain 
infinitely in a terminating state?
Fixed.

bq. catch alls hide bugs. We should remove the above.
Fixed.

bq. does work within serviceInit/serviceStart/serviceStop need to be within a 
write lock?
originally they are in synchronized block. I think this is to avoid that 
multiple thread access serviceStop which may cause conflicts. Also prevent 
shutdownTezAM & submitDAGToAppMaster invoked at the same time (they are both in 
synchronized block) write lock here is just to replace the synchronized 
keyword. 

bq. can serviceStop be triggered by something outside of the state machine?
Yes, it is possible when DAGAppMasterShutdownHook is invoked when AM receive 
terminate signal. 

bq. precondition checks are not useful for a state machine. This will throw an 
exception from the handle function and lead to an internal error and the AM 
shutting down.
Fixed.

bq. For ShutdownWhenRunningTransition, shouldn't the sessionStopped flag be 
set? What will happen after the dag completion ( after kill ) comes back? In 
this case, won't the AM go back to idle state for a session?

bq. confusing comment. Not matching code.
Fixed.

bq. Also, if the version mismatch error occurs and causes AM to go into ERROR 
state - how does the AM unregister from RM and update diagnostics?
diagnostics has already been updated when version compares.  And it will 
unregister from RM
{code}
     versionMismatchDiagnostics = "Incompatible versions found"
          + ", clientVersion=" + clientVersion
          + ", AMVersion=" + dagVersionInfo.getVersion();
      addDiagnostic(versionMismatchDiagnostics);
{code}

{code}
if (versionMismatch) {
      // Short-circuit and return as no DAG should not be run
      LOG.info("Version Mismatch, shutting down AM");
      this.taskSchedulerEventHandler.setShouldUnregisterFlag();
      shutdownHandler.shutdown();
      return DAGAppMasterState.ERROR;
    }
{code}

{code}
sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.SESSION_TIMEOUT));
{code}
bq. does this introduce a race with submitDag() ?
Fix it by handle session time event synchronously. 


bq. DrainDispatcher.java in tez-dag/src/test/ - any reason why we need this?
It is just for unit test. Used in TestMockDAGAppMaster

bq. LocalClient changes?
Minor change on LocalClient because we introduce a new DAGAppMasterState 
RECOVERING.

bq. TestMemoryWithEvents
Due to change on MockTezClient to simulate recovering. 

> Refactor DAGAppMaster to state machine based
> --------------------------------------------
>
>                 Key: TEZ-1273
>                 URL: https://issues.apache.org/jira/browse/TEZ-1273
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Jeff Zhang
>            Assignee: Jeff Zhang
>         Attachments: DAGAppMaster_3.pdf, DAGAppMaster_4.pdf, 
> DAGAppMaster_5.pdf, TEZ-1273-3.patch, TEZ-1273-4.patch, TEZ-1273-5.patch, 
> TEZ-1273-6.patch, TEZ-1273-7.patch, TEZ-1273-8.patch, TEZ-1273-9.patch, 
> Tez-1273-2.patch, Tez-1273.patch, dag_app_master.pdf, dag_app_master2.pdf
>
>
> Almost all our entities (Vertex, Task etc) are state machine based and 
> written using a formal state machine. But DAGAppMaster is not written on a 
> formal state machine even though it has state machine based behavior. This 
> jira is for refactoring it to be state machine based.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
