This is an automated email from the ASF dual-hosted git repository.
rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new f14baf9 TEZ-4146: Register RUNNING state in DAG's state change
callback (Rajesh Balamohan, reviewed by Gopal V)
f14baf9 is described below
commit f14baf95f35a66566652a17e592bb02b26beba7d
Author: Rajesh Balamohan <[email protected]>
AuthorDate: Wed Apr 15 09:25:21 2020 +0530
TEZ-4146: Register RUNNING state in DAG's state change callback (Rajesh
Balamohan, reviewed by Gopal V)
---
.../main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 3cde8e7..b8bdcc9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -164,7 +164,7 @@ public class DAGImpl implements
org.apache.tez.dag.app.dag.DAG,
// TODO Recovery
//private final List<AMInfo> amInfos;
private final Lock dagStatusLock = new ReentrantLock();
- private final Condition dagCompletionCondition =
dagStatusLock.newCondition();
+ private final Condition dagStateChangedCondition =
dagStatusLock.newCondition();
private final AtomicBoolean isFinalState = new AtomicBoolean(false);
private final Lock readLock;
private final Lock writeLock;
@@ -569,6 +569,8 @@ public class DAGImpl implements
org.apache.tez.dag.app.dag.DAG,
private void augmentStateMachine() {
stateMachine
+ .registerStateEnteredCallback(DAGState.RUNNING,
+ STATE_CHANGED_CALLBACK)
.registerStateEnteredCallback(DAGState.SUCCEEDED,
STATE_CHANGED_CALLBACK)
.registerStateEnteredCallback(DAGState.FAILED,
@@ -583,10 +585,12 @@ public class DAGImpl implements
org.apache.tez.dag.app.dag.DAG,
implements OnStateChangedCallback<DAGState, DAGImpl> {
@Override
public void onStateChanged(DAGImpl dag, DAGState dagState) {
- dag.isFinalState.set(true);
+ if (dagState != DAGState.RUNNING) {
+ dag.isFinalState.set(true);
+ }
dag.dagStatusLock.lock();
try {
- dag.dagCompletionCondition.signal();
+ dag.dagStateChangedCondition.signal();
} finally {
dag.dagStatusLock.unlock();
}
@@ -946,7 +950,7 @@ public class DAGImpl implements
org.apache.tez.dag.app.dag.DAG,
if (isFinalState.get()) {
break;
}
- nanosLeft = dagCompletionCondition.awaitNanos(timeoutNanos);
+ nanosLeft = dagStateChangedCondition.awaitNanos(timeoutNanos);
} catch (InterruptedException e) {
throw new TezException("Interrupted while waiting for dag to
complete", e);
} finally {