This is an automated email from the ASF dual-hosted git repository.

rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new f14baf9  TEZ-4146: Register RUNNING state in DAG's state change 
callback (Rajesh Balamohan, reviewed by Gopal V)
f14baf9 is described below

commit f14baf95f35a66566652a17e592bb02b26beba7d
Author: Rajesh Balamohan <[email protected]>
AuthorDate: Wed Apr 15 09:25:21 2020 +0530

    TEZ-4146: Register RUNNING state in DAG's state change callback (Rajesh 
Balamohan, reviewed by Gopal V)
---
 .../main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java   | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 3cde8e7..b8bdcc9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -164,7 +164,7 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
   // TODO Recovery
   //private final List<AMInfo> amInfos;
   private final Lock dagStatusLock = new ReentrantLock();
-  private final Condition dagCompletionCondition = 
dagStatusLock.newCondition();
+  private final Condition dagStateChangedCondition = 
dagStatusLock.newCondition();
   private final AtomicBoolean isFinalState = new AtomicBoolean(false);
   private final Lock readLock;
   private final Lock writeLock;
@@ -569,6 +569,8 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
 
   private void augmentStateMachine() {
     stateMachine
+        .registerStateEnteredCallback(DAGState.RUNNING,
+            STATE_CHANGED_CALLBACK)
         .registerStateEnteredCallback(DAGState.SUCCEEDED,
             STATE_CHANGED_CALLBACK)
         .registerStateEnteredCallback(DAGState.FAILED,
@@ -583,10 +585,12 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
       implements OnStateChangedCallback<DAGState, DAGImpl> {
     @Override
     public void onStateChanged(DAGImpl dag, DAGState dagState) {
-      dag.isFinalState.set(true);
+      if (dagState != DAGState.RUNNING) {
+        dag.isFinalState.set(true);
+      }
       dag.dagStatusLock.lock();
       try {
-        dag.dagCompletionCondition.signal();
+        dag.dagStateChangedCondition.signal();
       } finally {
         dag.dagStatusLock.unlock();
       }
@@ -946,7 +950,7 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
         if (isFinalState.get()) {
           break;
         }
-        nanosLeft = dagCompletionCondition.awaitNanos(timeoutNanos);
+        nanosLeft = dagStateChangedCondition.awaitNanos(timeoutNanos);
       } catch (InterruptedException e) {
         throw new TezException("Interrupted while waiting for dag to 
complete", e);
       } finally {

Reply via email to