Repository: tez Updated Branches: refs/heads/master 2735280c5 -> 9109645e5
TEZ-2968. Counter limits exception causes AM to crash. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9109645e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9109645e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9109645e Branch: refs/heads/master Commit: 9109645e5b7b630601a016e62e39f20678c63dde Parents: 2735280 Author: Hitesh Shah <[email protected]> Authored: Fri Dec 4 09:32:09 2015 -0800 Committer: Hitesh Shah <[email protected]> Committed: Fri Dec 4 09:32:09 2015 -0800 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../tez/dag/app/dag/VertexTerminationCause.java | 5 +- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 50 +++++++++++----- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 14 +++++ .../apache/tez/dag/app/dag/impl/TaskImpl.java | 30 +++++++++- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 60 +++++++++++++++++--- .../apache/tez/dag/app/web/AMWebController.java | 58 +++++++++++++------ .../tez/dag/app/dag/impl/TestDAGImpl.java | 39 +++++++++++++ .../tez/dag/app/dag/impl/TestDAGRecovery.java | 5 +- .../tez/dag/app/dag/impl/TestVertexImpl.java | 53 ++++++++++++++++- 10 files changed, 270 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/9109645e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b6c6034..a5db06d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES: + TEZ-2968. Counter limits exception causes AM to crash. TEZ-2960. Tez UI: Move hardcoded url namespace to the configuration file TEZ-2581. Umbrella for Tez Recovery Redesign TEZ-2956. Handle auto-reduce parallelism when the @@ -270,6 +271,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES + TEZ-2968. Counter limits exception causes AM to crash. TEZ-2947. Tez UI: Timeline, RM & AM requests gets into a consecutive loop in counters page without any delay TEZ-2949. Allow duplicate dag names within session for Tez. TEZ-2923. Tez Live UI counters view empty for vertices, tasks, attempts http://git-wip-us.apache.org/repos/asf/tez/blob/9109645e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java index 28712ad..816f85a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java @@ -58,7 +58,10 @@ public enum VertexTerminationCause { INTERNAL_ERROR(VertexState.ERROR), /** error when writing recovery log */ - RECOVERY_ERROR(VertexState.FAILED); + RECOVERY_ERROR(VertexState.FAILED), + + /** This vertex failed due to counter limits exceeded. */ + COUNTER_LIMITS_EXCEEDED(VertexState.FAILED); private VertexState finishedState; http://git-wip-us.apache.org/repos/asf/tez/blob/9109645e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index f395e62..f3b95f6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -41,6 +41,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.tez.common.counters.LimitExceededException; import org.apache.tez.state.OnStateChangedCallback; import org.apache.tez.state.StateMachineTez; import org.slf4j.Logger; @@ -197,7 +198,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private final Configuration dagConf; private final DAGPlan jobPlan; - + + private final AtomicBoolean internalErrorTriggered = new AtomicBoolean(false); + Map<String, LocalResource> localResources; long startDAGCpuTime = 0; @@ -1133,11 +1136,22 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, try { getStateMachine().doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state", e); - addDiagnostic("Invalid event " + event.getType() + - " on Job " + this.dagId); + String message = "Invalid event " + event.getType() + " on Dag " + this.dagId + + " at currentState=" + oldState; + LOG.error("Can't handle " + message, e); + addDiagnostic(message); eventHandler.handle(new DAGEvent(this.dagId, DAGEventType.INTERNAL_ERROR)); + } catch (RuntimeException e) { + String message = "Uncaught Exception when handling event " + event.getType() + + " on Dag " + this.dagId + " at currentState=" + oldState; + LOG.error(message, e); + addDiagnostic(message); + if (!internalErrorTriggered.getAndSet(true)) { + // to prevent a recursive loop + eventHandler.handle(new DAGEvent(this.dagId, + DAGEventType.INTERNAL_ERROR)); + } } //notify the eventhandler of state change if (oldState != getInternalState()) { @@ -1202,26 +1216,27 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } } - void logJobHistoryFinishedEvent() throws IOException { + void logJobHistoryFinishedEvent(TezCounters counters) throws IOException { if (recoveryData == null || recoveryData.getDAGFinishedEvent() == null) { Map<String, Integer> taskStats = constructTaskStats(getDAGProgress()); DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, clock.getTime(), - finishTime, DAGState.SUCCEEDED, "", getAllCounters(), + finishTime, DAGState.SUCCEEDED, "", counters, this.userName, this.dagName, taskStats, this.appContext.getApplicationAttemptId()); this.appContext.getHistoryHandler().handleCriticalEvent( new DAGHistoryEvent(dagId, finishEvt)); } } - void logJobHistoryUnsuccesfulEvent(DAGState state) throws IOException { + void logJobHistoryUnsuccesfulEvent(DAGState state, TezCounters counters) throws IOException { if (recoveryData == null || recoveryData.getDAGFinishedEvent() == null) { Map<String, Integer> taskStats = constructTaskStats(getDAGProgress()); + DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, 0L, clock.getTime(), state, StringUtils.join(getDiagnostics(), LINE_SEPARATOR), - getAllCounters(), this.userName, this.dagName, taskStats, + counters, this.userName, this.dagName, taskStats, this.appContext.getApplicationAttemptId()); this.appContext.getHistoryHandler().handleCriticalEvent( new DAGHistoryEvent(dagId, finishEvt)); @@ -1303,7 +1318,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } } else { Preconditions.checkState(dag.getState() == DAGState.TERMINATING - || dag.getState() == DAGState.COMMITTING, + || dag.getState() == DAGState.COMMITTING, "DAG should be in COMMITTING/TERMINATING state, but in " + dag.getState()); if (!dag.commitFutures.isEmpty() || dag.numCompletedVertices != dag.numVertices) { // pending commits are running or still some vertices are not completed @@ -1343,12 +1358,19 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, // update cpu time counters before finishing the dag updateCpuCounters(); - + TezCounters counters = null; + try { + counters = getAllCounters(); + } catch (LimitExceededException e) { + addDiagnostic("Counters limit exceeded: " + e.getMessage()); + finalState = DAGState.FAILED; + } + try { if (finalState == DAGState.SUCCEEDED) { - logJobHistoryFinishedEvent(); + logJobHistoryFinishedEvent(counters); } else { - logJobHistoryUnsuccesfulEvent(finalState); + logJobHistoryUnsuccesfulEvent(finalState, counters); } } catch (IOException e) { LOG.warn("Failed to persist recovery event for DAG completion" @@ -2226,11 +2248,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, SingleArcTransition<DAGImpl, DAGEvent> { @Override public void transition(DAGImpl job, DAGEvent event) { - //TODO Is this JH event required. LOG.info(job.getID() + " terminating due to internal error"); // terminate all vertices - job.enactKill(DAGTerminationCause.INTERNAL_ERROR, - VertexTerminationCause.INTERNAL_ERROR); + job.enactKill(DAGTerminationCause.INTERNAL_ERROR, VertexTerminationCause.INTERNAL_ERROR); job.setFinishTime(); job.cancelCommits(); job.finished(DAGState.ERROR); http://git-wip-us.apache.org/repos/asf/tez/blob/9109645e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 957abcf..bd65b8d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -805,6 +805,20 @@ public class TaskAttemptImpl implements TaskAttempt, this.attemptId.getTaskID().getVertexID().getDAGId(), DAGEventType.INTERNAL_ERROR) ); + } catch (RuntimeException e) { + LOG.error("Uncaught exception when handling event " + event.getType() + + " at current state " + oldState + " for " + + this.attemptId, e); + eventHandler.handle(new DAGEventDiagnosticsUpdate( + this.attemptId.getTaskID().getVertexID().getDAGId(), + "Uncaught exception when handling event " + event.getType() + + " on TaskAttempt " + this.attemptId + + " at state " + oldState + ", error=" + e.getMessage())); + eventHandler.handle( + new DAGEvent( + this.attemptId.getTaskID().getVertexID().getDAGId(), + DAGEventType.INTERNAL_ERROR) + ); } if (oldState != getInternalState()) { if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/tez/blob/9109645e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 55dd518..0f76a63 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -35,6 +35,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -776,9 +777,13 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { try { stateMachine.doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state for " - + this.taskId, e); + LOG.error("Can't handle this event" + event.getType() + + " at current state " + oldState + " for task " + this.taskId, e); internalError(event.getType()); + } catch (RuntimeException e) { + LOG.error("Uncaught exception when trying handle event " + event.getType() + + " at current state " + oldState + " for task " + this.taskId, e); + internalErrorUncaughtException(event.getType(), e); } if (oldState != getInternalState()) { if (LOG.isDebugEnabled()) { @@ -802,6 +807,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { DAGEventType.INTERNAL_ERROR)); } + protected void internalErrorUncaughtException(TaskEventType type, Exception e) { + eventHandler.handle(new DAGEventDiagnosticsUpdate( + this.taskId.getVertexID().getDAGId(), "Uncaught exception when handling event " + type + + " on Task " + this.taskId + ", error=" + e.getMessage())); + eventHandler.handle(new DAGEvent(this.taskId.getVertexID().getDAGId(), + DAGEventType.INTERNAL_ERROR)); + } + + private void sendTaskAttemptCompletionEvent(TezTaskAttemptID attemptId, TaskAttemptStateInternal attemptState) { eventHandler.handle(new VertexEventTaskAttemptCompleted(attemptId, attemptState)); @@ -1403,4 +1417,16 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { */ } } + + @Private + @VisibleForTesting + void setCounters(TezCounters counters) { + try { + writeLock.lock(); + this.counters = counters; + } finally { + writeLock.unlock(); + } + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/9109645e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 758c637..f3cfb58 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.util.Clock; import org.apache.tez.client.TezClientUtils; import org.apache.tez.common.ATSConstants; import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.common.counters.LimitExceededException; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; @@ -683,6 +684,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl final AtomicBoolean vmIsInitialized = new AtomicBoolean(false); final AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false); + private final AtomicBoolean internalErrorTriggered = new AtomicBoolean(false); + @VisibleForTesting Map<Vertex, Edge> sourceVertices; private Map<Vertex, Edge> targetVertices; @@ -1796,6 +1799,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl addDiagnostic(message); eventHandler.handle(new VertexEvent(this.vertexId, VertexEventType.V_INTERNAL_ERROR)); + } catch (RuntimeException e) { + String message = "Uncaught Exception when handling event " + event.getType() + + " on vertex " + this.vertexName + + " with vertexId " + this.vertexId + + " at current state " + oldState; + LOG.error(message, e); + addDiagnostic(message); + if (!internalErrorTriggered.getAndSet(true)) { + eventHandler.handle(new VertexEvent(this.vertexId, + VertexEventType.V_INTERNAL_ERROR)); + } } if (oldState != getInternalState()) { @@ -1876,20 +1890,29 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl void logJobHistoryVertexFinishedEvent() throws IOException { if (recoveryData == null || !recoveryData.isVertexSucceeded()) { - logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime, ""); + logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime, "", + getAllCounters()); } } void logJobHistoryVertexFailedEvent(VertexState state) throws IOException { if (recoveryData == null || !recoveryData.isVertexFinished()) { + TezCounters counters = null; + try { + counters = getAllCounters(); + } catch (LimitExceededException e) { + // Ignore as failed vertex + addDiagnostic("Counters limit exceeded: " + e.getMessage()); + } + logJobHistoryVertexCompletedHelper(state, clock.getTime(), - StringUtils.join(getDiagnostics(), LINE_SEPARATOR)); + StringUtils.join(getDiagnostics(), LINE_SEPARATOR), counters); } } private void logJobHistoryVertexCompletedHelper(VertexState finalState, long finishTime, - String diagnostics) throws IOException { + String diagnostics, TezCounters counters) throws IOException { Map<String, Integer> taskStats = new HashMap<String, Integer>(); taskStats.put(ATSConstants.NUM_COMPLETED_TASKS, completedTaskCount); taskStats.put(ATSConstants.NUM_SUCCEEDED_TASKS, succeededTaskCount); @@ -1900,7 +1923,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId, vertexName, numTasks, initTimeRequested, initedTime, startTimeRequested, startedTime, finishTime, finalState, diagnostics, - getAllCounters(), getVertexStats(), taskStats); + counters, getVertexStats(), taskStats); this.appContext.getHistoryHandler().handleCriticalEvent( new DAGHistoryEvent(getDAGId(), finishEvt)); } @@ -2041,7 +2064,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } private static VertexState finishWithTerminationCause(VertexImpl vertex) { - Preconditions.checkArgument(vertex.getTerminationCause()!= null, "TerminationCause is not set"); + Preconditions.checkArgument(vertex.getTerminationCause() != null, "TerminationCause is not set"); String diagnosticMsg = "Vertex did not succeed due to " + vertex.getTerminationCause() + ", failedTasks:" + vertex.failedTaskCount + " killedTasks:" + vertex.killedTaskCount; @@ -2114,9 +2137,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl break; case SUCCEEDED: try { - logJobHistoryVertexFinishedEvent(); - eventHandler.handle(new DAGEventVertexCompleted(getVertexId(), - finalState)); + try { + logJobHistoryVertexFinishedEvent(); + eventHandler.handle(new DAGEventVertexCompleted(getVertexId(), + finalState)); + } catch (LimitExceededException e) { + LOG.error("Counter limits exceeded for vertex: " + getLogIdentifier(), e); + finalState = VertexState.FAILED; + addDiagnostic("Counters limit exceeded: " + e.getMessage()); + trySetTerminationCause(VertexTerminationCause.COUNTER_LIMITS_EXCEEDED); + logJobHistoryVertexFailedEvent(finalState); + eventHandler.handle(new DAGEventVertexCompleted(getVertexId(), + finalState)); + } } catch (IOException e) { LOG.error("Failed to send vertex finished event to recovery", e); finalState = VertexState.FAILED; @@ -4354,4 +4387,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl configurationDoneEvent.getRootInputSpecUpdates()); } } + + @Private + @VisibleForTesting + void setCounters(TezCounters counters) { + try { + writeLock.lock(); + this.fullCounters = counters; + } finally { + writeLock.unlock(); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/9109645e/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java index f91bc42..c70fdae 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java @@ -42,6 +42,7 @@ import com.google.inject.Inject; import com.google.inject.name.Named; import org.apache.tez.common.counters.CounterGroup; +import org.apache.tez.common.counters.LimitExceededException; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.client.ProgressBuilder; @@ -511,12 +512,17 @@ public class AMWebController extends Controller { dagInfo.put("progress", Float.toString(dag.getCompletedTaskProgress())); dagInfo.put("status", dag.getState().toString()); - if (counterNames != null && !counterNames.isEmpty()) { - TezCounters counters = dag.getCachedCounters(); - Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters, counterNames); - if (counterMap != null && !counterMap.isEmpty()) { - dagInfo.put("counters", counterMap); + try { + if (counterNames != null && !counterNames.isEmpty()) { + TezCounters counters = dag.getCachedCounters(); + Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters, counterNames); + if (counterMap != null && !counterMap.isEmpty()) { + dagInfo.put("counters", counterMap); + } } + } catch (LimitExceededException e) { + // Ignore + // TODO: add an error message instead for counter key } renderJSON(ImmutableMap.of( "dag", dagInfo @@ -576,12 +582,17 @@ public class AMWebController extends Controller { vertexInfo.put("killedTaskAttempts", Integer.toString(vertexProgress.getKilledTaskAttemptCount())); - if (counterNames != null && !counterNames.isEmpty()) { - TezCounters counters = vertex.getCachedCounters(); - Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters, counterNames); - if (counterMap != null && !counterMap.isEmpty()) { - vertexInfo.put("counters", counterMap); + try { + if (counterNames != null && !counterNames.isEmpty()) { + TezCounters counters = vertex.getCachedCounters(); + Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters, counterNames); + if (counterMap != null && !counterMap.isEmpty()) { + vertexInfo.put("counters", counterMap); + } } + } catch (LimitExceededException e) { + // Ignore + // TODO: add an error message instead for counter key } return vertexInfo; @@ -733,11 +744,17 @@ public class AMWebController extends Controller { taskInfo.put("progress", Float.toString(t.getProgress())); taskInfo.put("status", t.getState().toString()); - TezCounters counters = t.getCounters(); - Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters, counterNames); - if (counterMap != null && !counterMap.isEmpty()) { - taskInfo.put("counters", counterMap); + try { + TezCounters counters = t.getCounters(); + Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters, counterNames); + if (counterMap != null && !counterMap.isEmpty()) { + taskInfo.put("counters", counterMap); + } + } catch (LimitExceededException e) { + // Ignore + // TODO: add an error message instead for counter key } + tasksInfo.add(taskInfo); } @@ -825,10 +842,15 @@ public class AMWebController extends Controller { attemptInfo.put("progress", Float.toString(a.getProgress())); attemptInfo.put("status", a.getState().toString()); - TezCounters counters = a.getCounters(); - Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters, counterNames); - if (counterMap != null && !counterMap.isEmpty()) { - attemptInfo.put("counters", counterMap); + try { + TezCounters counters = a.getCounters(); + Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters, counterNames); + if (counterMap != null && !counterMap.isEmpty()) { + attemptInfo.put("counters", counterMap); + } + } catch (LimitExceededException e) { + // Ignore + // TODO: add an error message instead for counter key } attemptsInfo.add(attemptInfo); } http://git-wip-us.apache.org/repos/asf/tez/blob/9109645e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index ccff6b0..31b4f76 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -38,6 +38,8 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang.StringUtils; +import org.apache.tez.common.counters.Limits; +import org.apache.tez.common.counters.TezCounters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -185,6 +187,14 @@ public class TestDAGImpl { private TaskAttemptEventDispatcher taskAttemptEventDispatcher; private ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10)); + static { + Limits.reset(); + Configuration conf = new Configuration(false); + conf.setInt(TezConfiguration.TEZ_COUNTERS_MAX, 100); + conf.setInt(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS, 100); + Limits.setConfiguration(conf); + } + private DAGImpl chooseDAG(TezDAGID curDAGId) { if (curDAGId.equals(dagId)) { return dag; @@ -2210,4 +2220,33 @@ public class TestDAGImpl { } } } + + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testCounterLimits() { + initDAG(mrrDag); + dispatcher.await(); + startDAG(mrrDag); + dispatcher.await(); + for (int i=0; i<3; ++i) { + Vertex v = mrrDag.getVertex("vertex"+(i+1)); + dispatcher.getEventHandler().handle(new VertexEventTaskCompleted( + TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED)); + TezCounters ctrs = new TezCounters(); + for (int j = 0; j < 50; ++j) { + ctrs.findCounter("g", "c" + i + "_" + j).increment(1); + } + ((VertexImpl) v).setCounters(ctrs); + dispatcher.await(); + Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); + Assert.assertEquals(i+1, mrrDag.getSuccessfulVertices()); + } + + Assert.assertEquals(3, mrrDag.getSuccessfulVertices()); + Assert.assertEquals(DAGState.FAILED, mrrDag.getState()); + Assert.assertTrue("Diagnostics should contain counter limits error message", + StringUtils.join(mrrDag.getDiagnostics(), ",").contains("Counters limit exceeded")); + + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/9109645e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java index 803edf7..3eed717 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java @@ -1021,7 +1021,10 @@ public class TestDAGRecovery { ta1t1v1Id, "v1", ta1LaunchTime, mock(ContainerId.class), mock(NodeId.class), "", "", ""); List<TezEvent> taGeneratedEvents = new ArrayList<TezEvent>(); - taGeneratedEvents.add(new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), null)); + EventMetaData sourceInfo = new EventMetaData(EventProducerConsumerType.INPUT, "vertex1", + "vertex3", ta1t1v1Id); + taGeneratedEvents.add(new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), + sourceInfo)); TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent( ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, TaskAttemptState.SUCCEEDED, null, "", null, http://git-wip-us.apache.org/repos/asf/tez/blob/9109645e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 11c2bf1..9453df8 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -51,6 +51,8 @@ import com.google.protobuf.ByteString; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.counters.Limits; +import org.apache.tez.common.counters.TezCounters; import org.apache.tez.runtime.api.VertexStatistics; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; @@ -250,6 +252,14 @@ public class TestVertexImpl { private StateChangeNotifierForTest updateTracker; private static TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption; + static { + Limits.reset(); + Configuration conf = new Configuration(false); + conf.setInt(TezConfiguration.TEZ_COUNTERS_MAX, 100); + conf.setInt(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS, 100); + Limits.setConfiguration(conf); + } + public static class CountingOutputCommitter extends OutputCommitter { public int initCounter = 0; @@ -5861,7 +5871,11 @@ public class TestVertexImpl { //Send VertexManagerEvent long[] sizes = new long[]{(100 * 1000l * 1000l)}; Event vmEvent = getVertexManagerEvent(sizes, 1060000000, "C"); - TezEvent tezEvent = new TezEvent(vmEvent, null); + + TezTaskAttemptID taId = TezTaskAttemptID.getInstance( + TezTaskID.getInstance(vC.getVertexId(), 1), 1); + EventMetaData sourceInfo = new EventMetaData(EventProducerConsumerType.INPUT, "C", "C", taId); + TezEvent tezEvent = new TezEvent(vmEvent, sourceInfo); dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vC.getVertexId(), Lists.newArrayList(tezEvent))); dispatcher.await(); @@ -6677,5 +6691,42 @@ public class TestVertexImpl { void setContext(InputInitializerContext context); } + @Test(timeout = 5000) + public void testCounterLimits() { + initAllVertices(VertexState.INITED); + + VertexImpl v = vertices.get("vertex2"); + startVertex(v); + + TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0); + TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1); + + for (int i = 0; i < 2; ++i) { + TezCounters ctrs = new TezCounters(); + for (int j = 0; j < 75; ++j) { + ctrs.findCounter("g", "c" + i + "_" + j).increment(1); + } + Task t = v.getTask(i); + ((TaskImpl) t).setCounters(ctrs); + } + + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED)); + dispatcher.await(); + Assert.assertEquals(VertexState.RUNNING, v.getState()); + Assert.assertEquals(1, v.getCompletedTasks()); + Assert.assertTrue((0.5f) == v.getCompletedTaskProgress()); + + dispatcher.getEventHandler().handle( + new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED)); + dispatcher.await(); + Assert.assertEquals(VertexState.FAILED, v.getState()); + Assert.assertEquals(2, v.getCompletedTasks()); + + System.out.println(v.getDiagnostics()); + Assert.assertTrue("Diagnostics should contain counter limits error message", + StringUtils.join(v.getDiagnostics(), ",").contains("Counters limit exceeded")); + + } }
