Repository: tez Updated Branches: refs/heads/master 1a5317578 -> 9f090279d
TEZ-2394. Issues when there is an error in VertexManager callbacks (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9f090279 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9f090279 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9f090279 Branch: refs/heads/master Commit: 9f090279d269fbcd63b357781318eb2163c82762 Parents: 1a53175 Author: Bikas Saha <[email protected]> Authored: Fri May 1 16:04:21 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Fri May 1 16:04:21 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/dag/app/dag/StateChangeNotifier.java | 4 +- .../tez/dag/app/dag/impl/VertexManager.java | 77 ++++++++++++++------ 3 files changed, 58 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/9f090279/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7c718ed..609db3c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly ALL CHANGES: + TEZ-2394. Issues when there is an error in VertexManager callbacks TEZ-2386. Tez UI: Inconsistent usage of icon colors TEZ-2395. Tez UI: Minimum/Maximum Duration show a empty bracket next to 0 secs when you purposefully failed a job. TEZ-2360. 
per-io counters flag should generate both overall and per-edge counters http://git-wip-us.apache.org/repos/asf/tez/blob/9f090279/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java index 260cbf3..990bdea 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java @@ -71,7 +71,7 @@ public class StateChangeNotifier { this.listener = listener; } - void sentUpdate() { + void sendUpdate() { listener.onStateUpdated(update); } @@ -105,7 +105,7 @@ public class StateChangeNotifier { continue; } try { - event.sentUpdate(); + event.sendUpdate(); processedEventFromQueue(); } catch (Exception e) { // TODO send user code exception - TEZ-2332 http://git-wip-us.apache.org/repos/asf/tez/blob/9f090279/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java index 1ed42fc..945d9ba 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java @@ -58,6 +58,8 @@ import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.event.CallableEvent; +import org.apache.tez.dag.app.dag.event.DAGEvent; +import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation; import 
org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError; import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source; @@ -432,10 +434,22 @@ public class VertexManager { } if (eventInFlight.compareAndSet(false, true)) { // no event was in flight + // ensures only 1 event is in flight VertexManagerEvent e = eventQueue.poll(); - Preconditions.checkState(e != null); - ListenableFuture<Void> future = execService.submit(e); - Futures.addCallback(future, e.getCallback()); + if (e != null) { + ListenableFuture<Void> future = execService.submit(e); + Futures.addCallback(future, e.getCallback()); + } else { + // This may happen. Let's say a callback succeeded on threadA. It set eventInFlight to false + // and called tryScheduleNextEvent() and found queue not empty but got paused before it + // could check eventInFlight.compareAndSet(). Another thread managed to dequeue the event + // and schedule a callback. That callback succeeded and set eventInFlight to false, found + // the queue empty and completed. Now threadA woke up and successfully did compareAndSet() + // tried to dequeue an event and got null.
+ // This could also happen if there is a bug and we manage to schedule more than 1 callback + // verify that this is not the case + Preconditions.checkState(eventInFlight.compareAndSet(true, false)); + } } } @@ -484,36 +498,55 @@ public class VertexManager { @Override public void onFailure(Throwable t) { - // stop further event processing - pluginFailed.set(true); - eventQueue.clear(); - // catch real root cause of failure, it would throw UndeclaredThrowableException - // if using UGI.doAs - if (t instanceof UndeclaredThrowableException) { - t = t.getCause(); + try { + Preconditions.checkState(eventInFlight.get()); + // stop further event processing + pluginFailed.set(true); + eventQueue.clear(); + // catch real root cause of failure, it would throw UndeclaredThrowableException + // if using UGI.doAs + if (t instanceof UndeclaredThrowableException) { + t = t.getCause(); + } + Preconditions.checkState(appContext != null); + Preconditions.checkState(managedVertex != null); + // state change must be triggered via an event transition + appContext.getEventHandler().handle( + new VertexEventManagerUserCodeError(managedVertex.getVertexId(), + new AMUserCodeException(Source.VertexManager, t))); + // enqueue no further events due to user code error + } catch (Exception e) { + sendInternalError(e); } - Preconditions.checkState(appContext != null); - Preconditions.checkState(managedVertex != null); - // state change must be triggered via an event transition - appContext.getEventHandler().handle( - new VertexEventManagerUserCodeError(managedVertex.getVertexId(), - new AMUserCodeException(Source.VertexManager, t))); - // enqueue no further events due to user code error } @Override public void onSuccess(Void result) { - Preconditions.checkState(eventInFlight.get()); - eventInFlight.set(false); - tryScheduleNextEvent(); + try { + onSuccessDerived(result); + Preconditions.checkState(eventInFlight.compareAndSet(true, false)); + tryScheduleNextEvent(); + } catch (Exception e) { +
sendInternalError(e); + } + } + + protected void onSuccessDerived(Void result) { + } + + private void sendInternalError(Exception e) { + // fail the DAG so that we don't hang + // state change must be triggered via an event transition + LOG.error("Error after vertex manager callback " + managedVertex.getLogIdentifier(), e); + appContext.getEventHandler().handle( + (new DAGEvent(managedVertex.getVertexId().getDAGId(), DAGEventType.INTERNAL_ERROR))); } } private class VertexManagerRootInputInitializedCallback extends VertexManagerCallback { @Override - public void onSuccess(Void result) { - super.onSuccess(result); + protected void onSuccessDerived(Void result) { if (LOG.isDebugEnabled()) { LOG.debug("vertex:" + managedVertex.getLogIdentifier() + "; after call of VertexManagerPlugin.onRootVertexInitialized" + " on input:"
