Repository: tez
Updated Branches:
  refs/heads/master 2735280c5 -> 9109645e5


TEZ-2968. Counter limits exception causes AM to crash. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9109645e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9109645e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9109645e

Branch: refs/heads/master
Commit: 9109645e5b7b630601a016e62e39f20678c63dde
Parents: 2735280
Author: Hitesh Shah <[email protected]>
Authored: Fri Dec 4 09:32:09 2015 -0800
Committer: Hitesh Shah <[email protected]>
Committed: Fri Dec 4 09:32:09 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../tez/dag/app/dag/VertexTerminationCause.java |  5 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 50 +++++++++++-----
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 14 +++++
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 30 +++++++++-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 60 +++++++++++++++++---
 .../apache/tez/dag/app/web/AMWebController.java | 58 +++++++++++++------
 .../tez/dag/app/dag/impl/TestDAGImpl.java       | 39 +++++++++++++
 .../tez/dag/app/dag/impl/TestDAGRecovery.java   |  5 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 53 ++++++++++++++++-
 10 files changed, 270 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/9109645e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b6c6034..a5db06d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES:
+  TEZ-2968. Counter limits exception causes AM to crash.
   TEZ-2960. Tez UI: Move hardcoded url namespace to the configuration file
   TEZ-2581. Umbrella for Tez Recovery Redesign
   TEZ-2956. Handle auto-reduce parallelism when the
@@ -270,6 +271,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES
+  TEZ-2968. Counter limits exception causes AM to crash.
   TEZ-2947. Tez UI: Timeline, RM & AM requests gets into a consecutive loop in counters page without any delay
   TEZ-2949. Allow duplicate dag names within session for Tez.
   TEZ-2923. Tez Live UI counters view empty for vertices, tasks, attempts

http://git-wip-us.apache.org/repos/asf/tez/blob/9109645e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
index 28712ad..816f85a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
@@ -58,7 +58,10 @@ public enum VertexTerminationCause {
   INTERNAL_ERROR(VertexState.ERROR),
   
   /** error when writing recovery log */ 
-  RECOVERY_ERROR(VertexState.FAILED);
+  RECOVERY_ERROR(VertexState.FAILED),
+
+  /** This vertex failed due to counter limits exceeded. */
+  COUNTER_LIMITS_EXCEEDED(VertexState.FAILED);
 
   private VertexState finishedState;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/9109645e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index f395e62..f3b95f6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -41,6 +41,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.common.counters.LimitExceededException;
 import org.apache.tez.state.OnStateChangedCallback;
 import org.apache.tez.state.StateMachineTez;
 import org.slf4j.Logger;
@@ -197,7 +198,9 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
 
   private final Configuration dagConf;
   private final DAGPlan jobPlan;
-  
+
+  private final AtomicBoolean internalErrorTriggered = new 
AtomicBoolean(false);
+
   Map<String, LocalResource> localResources;
   
   long startDAGCpuTime = 0;
@@ -1133,11 +1136,22 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
       try {
          getStateMachine().doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state", e);
-        addDiagnostic("Invalid event " + event.getType() +
-            " on Job " + this.dagId);
+        String message = "Invalid event " + event.getType() + " on Dag " + 
this.dagId
+            + " at currentState=" + oldState;
+        LOG.error("Can't handle " + message, e);
+        addDiagnostic(message);
         eventHandler.handle(new DAGEvent(this.dagId,
             DAGEventType.INTERNAL_ERROR));
+      } catch (RuntimeException e) {
+        String message = "Uncaught Exception when handling event " + 
event.getType()
+            + " on Dag " + this.dagId + " at currentState=" + oldState;
+        LOG.error(message, e);
+        addDiagnostic(message);
+        if (!internalErrorTriggered.getAndSet(true)) {
+          // to prevent a recursive loop
+          eventHandler.handle(new DAGEvent(this.dagId,
+              DAGEventType.INTERNAL_ERROR));
+        }
       }
       //notify the eventhandler of state change
       if (oldState != getInternalState()) {
@@ -1202,26 +1216,27 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
     }
   }
 
-  void logJobHistoryFinishedEvent() throws IOException {
+  void logJobHistoryFinishedEvent(TezCounters counters) throws IOException {
     if (recoveryData == null
         || recoveryData.getDAGFinishedEvent() == null) {
       Map<String, Integer> taskStats = constructTaskStats(getDAGProgress());
       DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, clock.getTime(),
-          finishTime, DAGState.SUCCEEDED, "", getAllCounters(),
+          finishTime, DAGState.SUCCEEDED, "", counters,
           this.userName, this.dagName, taskStats, 
this.appContext.getApplicationAttemptId());
       this.appContext.getHistoryHandler().handleCriticalEvent(
           new DAGHistoryEvent(dagId, finishEvt));
     }
   }
 
-  void logJobHistoryUnsuccesfulEvent(DAGState state) throws IOException {
+  void logJobHistoryUnsuccesfulEvent(DAGState state, TezCounters counters) 
throws IOException {
     if (recoveryData == null
         || recoveryData.getDAGFinishedEvent() == null) {
       Map<String, Integer> taskStats = constructTaskStats(getDAGProgress());
+
       DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, 0L,
           clock.getTime(), state,
           StringUtils.join(getDiagnostics(), LINE_SEPARATOR),
-          getAllCounters(), this.userName, this.dagName, taskStats,
+          counters, this.userName, this.dagName, taskStats,
           this.appContext.getApplicationAttemptId());
       this.appContext.getHistoryHandler().handleCriticalEvent(
           new DAGHistoryEvent(dagId, finishEvt));
@@ -1303,7 +1318,7 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
       }
     } else {
       Preconditions.checkState(dag.getState() == DAGState.TERMINATING
-          || dag.getState() == DAGState.COMMITTING,
+              || dag.getState() == DAGState.COMMITTING,
           "DAG should be in COMMITTING/TERMINATING state, but in " + 
dag.getState());
       if (!dag.commitFutures.isEmpty() || dag.numCompletedVertices != 
dag.numVertices) {
         // pending commits are running or still some vertices are not completed
@@ -1343,12 +1358,19 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
 
     // update cpu time counters before finishing the dag
     updateCpuCounters();
-    
+    TezCounters counters = null;
+    try {
+      counters = getAllCounters();
+    } catch (LimitExceededException e) {
+      addDiagnostic("Counters limit exceeded: " + e.getMessage());
+      finalState = DAGState.FAILED;
+    }
+
     try {
       if (finalState == DAGState.SUCCEEDED) {
-        logJobHistoryFinishedEvent();
+        logJobHistoryFinishedEvent(counters);
       } else {
-        logJobHistoryUnsuccesfulEvent(finalState);
+        logJobHistoryUnsuccesfulEvent(finalState, counters);
       }
     } catch (IOException e) {
       LOG.warn("Failed to persist recovery event for DAG completion"
@@ -2226,11 +2248,9 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
       SingleArcTransition<DAGImpl, DAGEvent> {
     @Override
     public void transition(DAGImpl job, DAGEvent event) {
-      //TODO Is this JH event required.
       LOG.info(job.getID() + " terminating due to internal error");
       // terminate all vertices
-      job.enactKill(DAGTerminationCause.INTERNAL_ERROR,
-          VertexTerminationCause.INTERNAL_ERROR);
+      job.enactKill(DAGTerminationCause.INTERNAL_ERROR, 
VertexTerminationCause.INTERNAL_ERROR);
       job.setFinishTime();
       job.cancelCommits();
       job.finished(DAGState.ERROR);

http://git-wip-us.apache.org/repos/asf/tez/blob/9109645e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 957abcf..bd65b8d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -805,6 +805,20 @@ public class TaskAttemptImpl implements TaskAttempt,
                 this.attemptId.getTaskID().getVertexID().getDAGId(),
                 DAGEventType.INTERNAL_ERROR)
             );
+      } catch (RuntimeException e) {
+        LOG.error("Uncaught exception when handling event " + event.getType()
+            + " at current state " + oldState + " for "
+            + this.attemptId, e);
+        eventHandler.handle(new DAGEventDiagnosticsUpdate(
+            this.attemptId.getTaskID().getVertexID().getDAGId(),
+            "Uncaught exception when handling event " + event.getType()
+                + " on TaskAttempt " + this.attemptId
+                + " at state " + oldState + ", error=" + e.getMessage()));
+        eventHandler.handle(
+            new DAGEvent(
+                this.attemptId.getTaskID().getVertexID().getDAGId(),
+                DAGEventType.INTERNAL_ERROR)
+        );
       }
       if (oldState != getInternalState()) {
         if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/9109645e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 55dd518..0f76a63 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -35,6 +35,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Maps;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -776,9 +777,13 @@ public class TaskImpl implements Task, 
EventHandler<TaskEvent> {
       try {
         stateMachine.doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state for "
-            + this.taskId, e);
+        LOG.error("Can't handle this event" + event.getType()
+            + " at current state " + oldState + " for task " + this.taskId, e);
         internalError(event.getType());
+      } catch (RuntimeException e) {
+        LOG.error("Uncaught exception when trying handle event " + 
event.getType()
+            + " at current state " + oldState + " for task " + this.taskId, e);
+        internalErrorUncaughtException(event.getType(), e);
       }
       if (oldState != getInternalState()) {
         if (LOG.isDebugEnabled()) {
@@ -802,6 +807,15 @@ public class TaskImpl implements Task, 
EventHandler<TaskEvent> {
         DAGEventType.INTERNAL_ERROR));
   }
 
+  protected void internalErrorUncaughtException(TaskEventType type, Exception 
e) {
+    eventHandler.handle(new DAGEventDiagnosticsUpdate(
+        this.taskId.getVertexID().getDAGId(), "Uncaught exception when 
handling  event " + type +
+        " on Task " + this.taskId + ", error=" + e.getMessage()));
+    eventHandler.handle(new DAGEvent(this.taskId.getVertexID().getDAGId(),
+        DAGEventType.INTERNAL_ERROR));
+  }
+
+
   private void sendTaskAttemptCompletionEvent(TezTaskAttemptID attemptId,
       TaskAttemptStateInternal attemptState) {
     eventHandler.handle(new VertexEventTaskAttemptCompleted(attemptId, 
attemptState));
@@ -1403,4 +1417,16 @@ public class TaskImpl implements Task, 
EventHandler<TaskEvent> {
       */
     }
   }
+
+  @Private
+  @VisibleForTesting
+  void setCounters(TezCounters counters) {
+    try {
+      writeLock.lock();
+      this.counters = counters;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/9109645e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 758c637..f3cfb58 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.common.ATSConstants;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.counters.LimitExceededException;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
@@ -683,6 +684,8 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
   final AtomicBoolean vmIsInitialized = new AtomicBoolean(false);
   final AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false);
 
+  private final AtomicBoolean internalErrorTriggered = new 
AtomicBoolean(false);
+
   @VisibleForTesting
   Map<Vertex, Edge> sourceVertices;
   private Map<Vertex, Edge> targetVertices;
@@ -1796,6 +1799,17 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
         addDiagnostic(message);
         eventHandler.handle(new VertexEvent(this.vertexId,
             VertexEventType.V_INTERNAL_ERROR));
+      } catch (RuntimeException e) {
+        String message = "Uncaught Exception when handling event " + 
event.getType() +
+            " on vertex " + this.vertexName +
+            " with vertexId " + this.vertexId +
+            " at current state " + oldState;
+        LOG.error(message, e);
+        addDiagnostic(message);
+        if (!internalErrorTriggered.getAndSet(true)) {
+          eventHandler.handle(new VertexEvent(this.vertexId,
+              VertexEventType.V_INTERNAL_ERROR));
+        }
       }
 
       if (oldState != getInternalState()) {
@@ -1876,20 +1890,29 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
   void logJobHistoryVertexFinishedEvent() throws IOException {
     if (recoveryData == null
         || !recoveryData.isVertexSucceeded()) {
-      logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime, 
"");
+      logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime, "",
+          getAllCounters());
     }
   }
 
   void logJobHistoryVertexFailedEvent(VertexState state) throws IOException {
     if (recoveryData == null
         || !recoveryData.isVertexFinished()) {
+      TezCounters counters = null;
+      try {
+        counters = getAllCounters();
+      } catch (LimitExceededException e) {
+        // Ignore as failed vertex
+        addDiagnostic("Counters limit exceeded: " + e.getMessage());
+      }
+
       logJobHistoryVertexCompletedHelper(state, clock.getTime(),
-          StringUtils.join(getDiagnostics(), LINE_SEPARATOR));
+          StringUtils.join(getDiagnostics(), LINE_SEPARATOR), counters);
     }
   }
 
   private void logJobHistoryVertexCompletedHelper(VertexState finalState, long 
finishTime,
-                                                  String diagnostics) throws 
IOException {
+                                                  String diagnostics, 
TezCounters counters) throws IOException {
     Map<String, Integer> taskStats = new HashMap<String, Integer>();
     taskStats.put(ATSConstants.NUM_COMPLETED_TASKS, completedTaskCount);
     taskStats.put(ATSConstants.NUM_SUCCEEDED_TASKS, succeededTaskCount);
@@ -1900,7 +1923,7 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
 
     VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId, 
vertexName, numTasks, initTimeRequested,
         initedTime, startTimeRequested, startedTime, finishTime, finalState, 
diagnostics,
-        getAllCounters(), getVertexStats(), taskStats);
+        counters, getVertexStats(), taskStats);
     this.appContext.getHistoryHandler().handleCriticalEvent(
         new DAGHistoryEvent(getDAGId(), finishEvt));
   }
@@ -2041,7 +2064,7 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
   }
 
   private static VertexState finishWithTerminationCause(VertexImpl vertex) {
-    Preconditions.checkArgument(vertex.getTerminationCause()!= null, 
"TerminationCause is not set");
+    Preconditions.checkArgument(vertex.getTerminationCause() != null, 
"TerminationCause is not set");
     String diagnosticMsg = "Vertex did not succeed due to " + 
vertex.getTerminationCause()
         + ", failedTasks:" + vertex.failedTaskCount
         + " killedTasks:" + vertex.killedTaskCount;
@@ -2114,9 +2137,19 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
         break;
       case SUCCEEDED:
         try {
-          logJobHistoryVertexFinishedEvent();
-          eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
-              finalState));
+          try {
+            logJobHistoryVertexFinishedEvent();
+            eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
+                finalState));
+          } catch (LimitExceededException e) {
+            LOG.error("Counter limits exceeded for vertex: " + 
getLogIdentifier(), e);
+            finalState = VertexState.FAILED;
+            addDiagnostic("Counters limit exceeded: " + e.getMessage());
+            
trySetTerminationCause(VertexTerminationCause.COUNTER_LIMITS_EXCEEDED);
+            logJobHistoryVertexFailedEvent(finalState);
+            eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
+                finalState));
+          }
         } catch (IOException e) {
           LOG.error("Failed to send vertex finished event to recovery", e);
           finalState = VertexState.FAILED;
@@ -4354,4 +4387,15 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
           configurationDoneEvent.getRootInputSpecUpdates());
     }
   }
+
+  @Private
+  @VisibleForTesting
+  void setCounters(TezCounters counters) {
+    try {
+      writeLock.lock();
+      this.fullCounters = counters;
+    } finally {
+      writeLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/9109645e/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
index f91bc42..c70fdae 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
@@ -42,6 +42,7 @@ import com.google.inject.Inject;
 import com.google.inject.name.Named;
 
 import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.LimitExceededException;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.client.ProgressBuilder;
@@ -511,12 +512,17 @@ public class AMWebController extends Controller {
     dagInfo.put("progress", Float.toString(dag.getCompletedTaskProgress()));
     dagInfo.put("status", dag.getState().toString());
 
-    if (counterNames != null && !counterNames.isEmpty()) {
-      TezCounters counters = dag.getCachedCounters();
-      Map<String, Map<String, Long>> counterMap = 
constructCounterMapInfo(counters, counterNames);
-      if (counterMap != null && !counterMap.isEmpty()) {
-        dagInfo.put("counters", counterMap);
+    try {
+      if (counterNames != null && !counterNames.isEmpty()) {
+        TezCounters counters = dag.getCachedCounters();
+        Map<String, Map<String, Long>> counterMap = 
constructCounterMapInfo(counters, counterNames);
+        if (counterMap != null && !counterMap.isEmpty()) {
+          dagInfo.put("counters", counterMap);
+        }
       }
+    } catch (LimitExceededException e) {
+      // Ignore
+      // TODO: add an error message instead for counter key
     }
     renderJSON(ImmutableMap.of(
         "dag", dagInfo
@@ -576,12 +582,17 @@ public class AMWebController extends Controller {
     vertexInfo.put("killedTaskAttempts",
         Integer.toString(vertexProgress.getKilledTaskAttemptCount()));
 
-    if (counterNames != null && !counterNames.isEmpty()) {
-      TezCounters counters = vertex.getCachedCounters();
-      Map<String, Map<String, Long>> counterMap = 
constructCounterMapInfo(counters, counterNames);
-      if (counterMap != null && !counterMap.isEmpty()) {
-        vertexInfo.put("counters", counterMap);
+    try {
+      if (counterNames != null && !counterNames.isEmpty()) {
+        TezCounters counters = vertex.getCachedCounters();
+        Map<String, Map<String, Long>> counterMap = 
constructCounterMapInfo(counters, counterNames);
+        if (counterMap != null && !counterMap.isEmpty()) {
+          vertexInfo.put("counters", counterMap);
+        }
       }
+    } catch (LimitExceededException e) {
+      // Ignore
+      // TODO: add an error message instead for counter key
     }
 
     return vertexInfo;
@@ -733,11 +744,17 @@ public class AMWebController extends Controller {
       taskInfo.put("progress", Float.toString(t.getProgress()));
       taskInfo.put("status", t.getState().toString());
 
-      TezCounters counters = t.getCounters();
-      Map<String, Map<String, Long>> counterMap = 
constructCounterMapInfo(counters, counterNames);
-      if (counterMap != null && !counterMap.isEmpty()) {
-        taskInfo.put("counters", counterMap);
+      try {
+        TezCounters counters = t.getCounters();
+        Map<String, Map<String, Long>> counterMap = 
constructCounterMapInfo(counters, counterNames);
+        if (counterMap != null && !counterMap.isEmpty()) {
+          taskInfo.put("counters", counterMap);
+        }
+      } catch (LimitExceededException e) {
+        // Ignore
+        // TODO: add an error message instead for counter key
       }
+
       tasksInfo.add(taskInfo);
     }
 
@@ -825,10 +842,15 @@ public class AMWebController extends Controller {
       attemptInfo.put("progress", Float.toString(a.getProgress()));
       attemptInfo.put("status", a.getState().toString());
 
-      TezCounters counters = a.getCounters();
-      Map<String, Map<String, Long>> counterMap = 
constructCounterMapInfo(counters, counterNames);
-      if (counterMap != null && !counterMap.isEmpty()) {
-        attemptInfo.put("counters", counterMap);
+      try {
+        TezCounters counters = a.getCounters();
+        Map<String, Map<String, Long>> counterMap = 
constructCounterMapInfo(counters, counterNames);
+        if (counterMap != null && !counterMap.isEmpty()) {
+          attemptInfo.put("counters", counterMap);
+        }
+      } catch (LimitExceededException e) {
+        // Ignore
+        // TODO: add an error message instead for counter key
       }
       attemptsInfo.add(attemptInfo);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/9109645e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index ccff6b0..31b4f76 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -38,6 +38,8 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.tez.common.counters.Limits;
+import org.apache.tez.common.counters.TezCounters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -185,6 +187,14 @@ public class TestDAGImpl {
   private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
   private ClusterInfo clusterInfo = new 
ClusterInfo(Resource.newInstance(8192,10));
 
+  static {
+    Limits.reset();
+    Configuration conf = new Configuration(false);
+    conf.setInt(TezConfiguration.TEZ_COUNTERS_MAX, 100);
+    conf.setInt(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS, 100);
+    Limits.setConfiguration(conf);
+  }
+
   private DAGImpl chooseDAG(TezDAGID curDAGId) {
     if (curDAGId.equals(dagId)) {
       return dag;
@@ -2210,4 +2220,33 @@ public class TestDAGImpl {
       }
     }
   }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testCounterLimits() {
+    initDAG(mrrDag);
+    dispatcher.await();
+    startDAG(mrrDag);
+    dispatcher.await();
+    for (int i=0; i<3; ++i) {
+      Vertex v = mrrDag.getVertex("vertex"+(i+1));
+      dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
+          TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
+      TezCounters ctrs = new TezCounters();
+      for (int j = 0; j < 50; ++j) {
+        ctrs.findCounter("g", "c" + i + "_" + j).increment(1);
+      }
+      ((VertexImpl) v).setCounters(ctrs);
+      dispatcher.await();
+      Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
+      Assert.assertEquals(i+1, mrrDag.getSuccessfulVertices());
+    }
+
+    Assert.assertEquals(3, mrrDag.getSuccessfulVertices());
+    Assert.assertEquals(DAGState.FAILED, mrrDag.getState());
+    Assert.assertTrue("Diagnostics should contain counter limits error 
message",
+        StringUtils.join(mrrDag.getDiagnostics(), ",").contains("Counters 
limit exceeded"));
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/9109645e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index 803edf7..3eed717 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -1021,7 +1021,10 @@ public class TestDAGRecovery {
         ta1t1v1Id, "v1", ta1LaunchTime, mock(ContainerId.class), 
         mock(NodeId.class), "", "", "");
     List<TezEvent> taGeneratedEvents = new ArrayList<TezEvent>();
-    taGeneratedEvents.add(new 
TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), null));
+    EventMetaData sourceInfo = new 
EventMetaData(EventProducerConsumerType.INPUT, "vertex1",
+        "vertex3", ta1t1v1Id);
+    taGeneratedEvents.add(new 
TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])),
+        sourceInfo));
     TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(
         ta1t1v1Id, "v1", ta1LaunchTime, ta1FinishedTime, 
         TaskAttemptState.SUCCEEDED, null, "", null, 

http://git-wip-us.apache.org/repos/asf/tez/blob/9109645e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 11c2bf1..9453df8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -51,6 +51,8 @@ import com.google.protobuf.ByteString;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.counters.Limits;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.runtime.api.VertexStatistics;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
@@ -250,6 +252,14 @@ public class TestVertexImpl {
   private StateChangeNotifierForTest updateTracker;
   private static TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption;
 
+  static {
+    Limits.reset();
+    Configuration conf = new Configuration(false);
+    conf.setInt(TezConfiguration.TEZ_COUNTERS_MAX, 100);
+    conf.setInt(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS, 100);
+    Limits.setConfiguration(conf);
+  }
+
   public static class CountingOutputCommitter extends OutputCommitter {
 
     public int initCounter = 0;
@@ -5861,7 +5871,11 @@ public class TestVertexImpl {
     //Send VertexManagerEvent
     long[] sizes = new long[]{(100 * 1000l * 1000l)};
     Event vmEvent = getVertexManagerEvent(sizes, 1060000000, "C");
-    TezEvent tezEvent = new TezEvent(vmEvent, null);
+
+    TezTaskAttemptID taId = TezTaskAttemptID.getInstance(
+        TezTaskID.getInstance(vC.getVertexId(), 1), 1);
+    EventMetaData sourceInfo = new 
EventMetaData(EventProducerConsumerType.INPUT, "C", "C", taId);
+    TezEvent tezEvent = new TezEvent(vmEvent, sourceInfo);
     dispatcher.getEventHandler().handle(new 
VertexEventRouteEvent(vC.getVertexId(),
         Lists.newArrayList(tezEvent)));
     dispatcher.await();
@@ -6677,5 +6691,42 @@ public class TestVertexImpl {
     void setContext(InputInitializerContext context);
   }
 
+  @Test(timeout = 5000)
+  public void testCounterLimits() {
+    initAllVertices(VertexState.INITED);
+
+    VertexImpl v = vertices.get("vertex2");
+    startVertex(v);
+
+    TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+    TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
+
+    for (int i = 0; i < 2; ++i) {
+      TezCounters ctrs = new TezCounters();
+      for (int j = 0; j < 75; ++j) {
+        ctrs.findCounter("g", "c" + i + "_" + j).increment(1);
+      }
+      Task t = v.getTask(i);
+      ((TaskImpl) t).setCounters(ctrs);
+    }
+
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, v.getState());
+    Assert.assertEquals(1, v.getCompletedTasks());
+    Assert.assertTrue((0.5f) == v.getCompletedTaskProgress());
+
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.FAILED, v.getState());
+    Assert.assertEquals(2, v.getCompletedTasks());
+
+    System.out.println(v.getDiagnostics());
+    Assert.assertTrue("Diagnostics should contain counter limits error 
message",
+        StringUtils.join(v.getDiagnostics(), ",").contains("Counters limit 
exceeded"));
+
+  }
 
 }

Reply via email to