Repository: tez
Updated Branches:
refs/heads/branch-0.7 0a1a29b15 -> ae3329f87
TEZ-2968. Counter limits exception causes AM to crash. (hitesh)
(cherry picked from commit 9109645e5b7b630601a016e62e39f20678c63dde)
Conflicts:
CHANGES.txt
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ae3329f8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ae3329f8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ae3329f8
Branch: refs/heads/branch-0.7
Commit: ae3329f873a3293cae82610ec1aa8189152d614b
Parents: 0a1a29b
Author: Hitesh Shah <[email protected]>
Authored: Fri Dec 4 09:32:09 2015 -0800
Committer: Hitesh Shah <[email protected]>
Committed: Fri Dec 4 11:52:55 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/app/dag/VertexTerminationCause.java | 5 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 59 ++++++++++++++-----
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 14 +++++
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 30 +++++++++-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 60 +++++++++++++++++---
.../apache/tez/dag/app/web/AMWebController.java | 58 +++++++++++++------
.../tez/dag/app/dag/impl/TestDAGImpl.java | 39 +++++++++++++
.../tez/dag/app/dag/impl/TestVertexImpl.java | 47 +++++++++++++++
9 files changed, 269 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/ae3329f8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 80df681..4cc48b1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES
+ TEZ-2968. Counter limits exception causes AM to crash.
TEZ-2960. Tez UI: Move hardcoded url namespace to the configuration file
TEZ-2947. Tez UI: Timeline, RM & AM requests gets into a consecutive loop in
counters page without any delay
TEZ-2946. Tez UI: At times RM return a huge error message making the yellow
error bar to fill the whole screen
http://git-wip-us.apache.org/repos/asf/tez/blob/ae3329f8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
index 28712ad..816f85a 100644
---
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
+++
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
@@ -58,7 +58,10 @@ public enum VertexTerminationCause {
INTERNAL_ERROR(VertexState.ERROR),
/** error when writing recovery log */
- RECOVERY_ERROR(VertexState.FAILED);
+ RECOVERY_ERROR(VertexState.FAILED),
+
+ /** This vertex failed due to counter limits exceeded. */
+ COUNTER_LIMITS_EXCEEDED(VertexState.FAILED);
private VertexState finishedState;
http://git-wip-us.apache.org/repos/asf/tez/blob/ae3329f8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 0553a8d..e6841ad 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -42,6 +42,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.common.counters.LimitExceededException;
import org.apache.tez.state.OnStateChangedCallback;
import org.apache.tez.state.StateMachineTez;
import org.slf4j.Logger;
@@ -196,7 +197,9 @@ public class DAGImpl implements
org.apache.tez.dag.app.dag.DAG,
private final Configuration dagConf;
private final DAGPlan jobPlan;
-
+
+ private final AtomicBoolean internalErrorTriggered = new
AtomicBoolean(false);
+
Map<String, LocalResource> localResources;
long startDAGCpuTime = 0;
@@ -1167,11 +1170,22 @@ public class DAGImpl implements
org.apache.tez.dag.app.dag.DAG,
try {
getStateMachine().doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) {
- LOG.error("Can't handle this event at current state", e);
- addDiagnostic("Invalid event " + event.getType() +
- " on Job " + this.dagId);
+ String message = "Invalid event " + event.getType() + " on Dag " +
this.dagId
+ + " at currentState=" + oldState;
+ LOG.error("Can't handle " + message, e);
+ addDiagnostic(message);
eventHandler.handle(new DAGEvent(this.dagId,
DAGEventType.INTERNAL_ERROR));
+ } catch (RuntimeException e) {
+ String message = "Uncaught Exception when handling event " +
event.getType()
+ + " on Dag " + this.dagId + " at currentState=" + oldState;
+ LOG.error(message, e);
+ addDiagnostic(message);
+ if (!internalErrorTriggered.getAndSet(true)) {
+ // to prevent a recursive loop
+ eventHandler.handle(new DAGEvent(this.dagId,
+ DAGEventType.INTERNAL_ERROR));
+ }
}
//notify the eventhandler of state change
if (oldState != getInternalState()) {
@@ -1240,12 +1254,22 @@ public class DAGImpl implements
org.apache.tez.dag.app.dag.DAG,
new DAGHistoryEvent(dagId, startEvt));
}
- void logJobHistoryUnsuccesfulEvent(DAGState state) throws IOException {
+ /**
+ * Logs a SUCCEEDED DAGFinishedEvent to the history handler as a critical event.
+ *
+ * @param counters aggregated DAG counters to record with the event
+ * @throws IOException propagated from the history handler
+ */
+ void logJobHistoryFinishedEvent(TezCounters counters) throws IOException {
Map<String, Integer> taskStats = constructTaskStats(getDAGProgress());
+ // Report the DAG's recorded startTime, not the current clock, as the start time.
DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
+ finishTime, DAGState.SUCCEEDED, "", counters,
+ this.userName, this.dagName, taskStats,
this.appContext.getApplicationAttemptId());
+ this.appContext.getHistoryHandler().handleCriticalEvent(
+ new DAGHistoryEvent(dagId, finishEvt));
+ }
+
+ /**
+ * Logs a DAGFinishedEvent for a DAG that ended in a non-SUCCEEDED state.
+ *
+ * @param state the final DAG state to record
+ * @param counters aggregated DAG counters; may be null when aggregating the
+ * counters exceeded the configured counter limits
+ * @throws IOException propagated from the history handler
+ */
+ void logJobHistoryUnsuccesfulEvent(DAGState state, TezCounters counters) throws IOException {
+ Map<String, Integer> taskStats = constructTaskStats(getDAGProgress());
+
+ // The start time is the DAG's recorded startTime (not 0); finish time is "now".
+ DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
clock.getTime(), state,
StringUtils.join(getDiagnostics(), LINE_SEPARATOR),
- getAllCounters(), this.userName, this.dagName, taskStats,
+ counters, this.userName, this.dagName, taskStats,
this.appContext.getApplicationAttemptId());
this.appContext.getHistoryHandler().handleCriticalEvent(
new DAGHistoryEvent(dagId, finishEvt));
@@ -1326,7 +1350,7 @@ public class DAGImpl implements
org.apache.tez.dag.app.dag.DAG,
}
} else {
Preconditions.checkState(dag.getState() == DAGState.TERMINATING
- || dag.getState() == DAGState.COMMITTING,
+ || dag.getState() == DAGState.COMMITTING,
"DAG should be in COMMITTING/TERMINATING state, but in " +
dag.getState());
if (!dag.commitFutures.isEmpty() || dag.numCompletedVertices !=
dag.numVertices) {
// pending commits are running or still some vertices are not completed
@@ -1366,12 +1390,19 @@ public class DAGImpl implements
org.apache.tez.dag.app.dag.DAG,
// update cpu time counters before finishing the dag
updateCpuCounters();
-
+ TezCounters counters = null;
+ try {
+ counters = getAllCounters();
+ } catch (LimitExceededException e) {
+ addDiagnostic("Counters limit exceeded: " + e.getMessage());
+ finalState = DAGState.FAILED;
+ }
+
try {
if (finalState == DAGState.SUCCEEDED) {
- logJobHistoryFinishedEvent();
+ logJobHistoryFinishedEvent(counters);
} else {
- logJobHistoryUnsuccesfulEvent(finalState);
+ logJobHistoryUnsuccesfulEvent(finalState, counters);
}
} catch (IOException e) {
LOG.warn("Failed to persist recovery event for DAG completion"
@@ -1714,7 +1745,7 @@ public class DAGImpl implements
org.apache.tez.dag.app.dag.DAG,
}
DAGState endState = DAGState.FAILED;
try {
- dag.logJobHistoryUnsuccesfulEvent(endState);
+ dag.logJobHistoryUnsuccesfulEvent(endState,
dag.getAllCounters());
} catch (IOException e) {
LOG.warn("Failed to persist recovery event for DAG completion"
+ ", dagId=" + dag.dagId
@@ -2299,11 +2330,9 @@ public class DAGImpl implements
org.apache.tez.dag.app.dag.DAG,
SingleArcTransition<DAGImpl, DAGEvent> {
@Override
public void transition(DAGImpl job, DAGEvent event) {
- //TODO Is this JH event required.
LOG.info(job.getID() + " terminating due to internal error");
// terminate all vertices
- job.enactKill(DAGTerminationCause.INTERNAL_ERROR,
- VertexTerminationCause.INTERNAL_ERROR);
+ job.enactKill(DAGTerminationCause.INTERNAL_ERROR,
VertexTerminationCause.INTERNAL_ERROR);
job.setFinishTime();
job.cancelCommits();
job.finished(DAGState.ERROR);
http://git-wip-us.apache.org/repos/asf/tez/blob/ae3329f8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 5357063..b8f6459 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -788,6 +788,20 @@ public class TaskAttemptImpl implements TaskAttempt,
this.attemptId.getTaskID().getVertexID().getDAGId(),
DAGEventType.INTERNAL_ERROR)
);
+ } catch (RuntimeException e) {
+ LOG.error("Uncaught exception when handling event " + event.getType()
+ + " at current state " + oldState + " for "
+ + this.attemptId, e);
+ eventHandler.handle(new DAGEventDiagnosticsUpdate(
+ this.attemptId.getTaskID().getVertexID().getDAGId(),
+ "Uncaught exception when handling event " + event.getType()
+ + " on TaskAttempt " + this.attemptId
+ + " at state " + oldState + ", error=" + e.getMessage()));
+ eventHandler.handle(
+ new DAGEvent(
+ this.attemptId.getTaskID().getVertexID().getDAGId(),
+ DAGEventType.INTERNAL_ERROR)
+ );
}
if (oldState != getInternalState()) {
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/ae3329f8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 46215d0..cc4f046 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -35,6 +35,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -900,9 +901,13 @@ public class TaskImpl implements Task,
EventHandler<TaskEvent> {
try {
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) {
- LOG.error("Can't handle this event at current state for "
- + this.taskId, e);
+ LOG.error("Can't handle this event " + event.getType()
+ + " at current state " + oldState + " for task " + this.taskId, e);
internalError(event.getType());
+ } catch (RuntimeException e) {
+ // Any other failure inside a transition is reported to the DAG as an
+ // internal error instead of propagating out of handle().
+ LOG.error("Uncaught exception when trying to handle event " + event.getType()
+ + " at current state " + oldState + " for task " + this.taskId, e);
+ internalErrorUncaughtException(event.getType(), e);
}
if (oldState != getInternalState()) {
if (LOG.isDebugEnabled()) {
@@ -926,6 +931,15 @@ public class TaskImpl implements Task,
EventHandler<TaskEvent> {
DAGEventType.INTERNAL_ERROR));
}
+ protected void internalErrorUncaughtException(TaskEventType type, Exception
e) {
+ eventHandler.handle(new DAGEventDiagnosticsUpdate(
+ this.taskId.getVertexID().getDAGId(), "Uncaught exception when
handling event " + type +
+ " on Task " + this.taskId + ", error=" + e.getMessage()));
+ eventHandler.handle(new DAGEvent(this.taskId.getVertexID().getDAGId(),
+ DAGEventType.INTERNAL_ERROR));
+ }
+
+
private void sendTaskAttemptCompletionEvent(TezTaskAttemptID attemptId,
TaskAttemptStateInternal attemptState) {
eventHandler.handle(new VertexEventTaskAttemptCompleted(attemptId,
attemptState));
@@ -1563,4 +1577,16 @@ public class TaskImpl implements Task,
EventHandler<TaskEvent> {
*/
}
}
+
+ /**
+ * Test-only hook that replaces this task's counters while holding the write lock.
+ *
+ * @param counters the counters to install
+ */
+ @Private
+ @VisibleForTesting
+ void setCounters(TezCounters counters) {
+ // Acquire before entering try: if lock() itself failed inside the try, the
+ // finally block would call unlock() on a lock this thread does not hold.
+ writeLock.lock();
+ try {
+ this.counters = counters;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ae3329f8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 1419b06..cc1f489 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.ATSConstants;
import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.counters.LimitExceededException;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
@@ -716,6 +717,8 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
final AtomicBoolean vmIsInitialized = new AtomicBoolean(false);
final AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false);
+ private final AtomicBoolean internalErrorTriggered = new
AtomicBoolean(false);
+
@VisibleForTesting
Map<Vertex, Edge> sourceVertices;
private Map<Vertex, Edge> targetVertices;
@@ -1868,6 +1871,17 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
addDiagnostic(message);
eventHandler.handle(new VertexEvent(this.vertexId,
VertexEventType.V_INTERNAL_ERROR));
+ } catch (RuntimeException e) {
+ String message = "Uncaught Exception when handling event " +
event.getType() +
+ " on vertex " + this.vertexName +
+ " with vertexId " + this.vertexId +
+ " at current state " + oldState;
+ LOG.error(message, e);
+ addDiagnostic(message);
+ if (!internalErrorTriggered.getAndSet(true)) {
+ eventHandler.handle(new VertexEvent(this.vertexId,
+ VertexEventType.V_INTERNAL_ERROR));
+ }
}
if (oldState != getInternalState()) {
@@ -1928,16 +1942,24 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
void logJobHistoryVertexFinishedEvent() throws IOException {
this.setFinishTime();
- logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime, "");
+ logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime, "",
+ getAllCounters());
}
void logJobHistoryVertexFailedEvent(VertexState state) throws IOException {
+ TezCounters counters = null;
+ try {
+ counters = getAllCounters();
+ } catch (LimitExceededException e) {
+ // Ignore as failed vertex
+ addDiagnostic("Counters limit exceeded: " + e.getMessage());
+ }
logJobHistoryVertexCompletedHelper(state, clock.getTime(),
- StringUtils.join(getDiagnostics(), LINE_SEPARATOR));
+ StringUtils.join(getDiagnostics(), LINE_SEPARATOR), counters);
}
private void logJobHistoryVertexCompletedHelper(VertexState finalState, long
finishTime,
- String diagnostics) throws
IOException {
+ String diagnostics,
TezCounters counters) throws IOException {
Map<String, Integer> taskStats = new HashMap<String, Integer>();
taskStats.put(ATSConstants.NUM_COMPLETED_TASKS, completedTaskCount);
taskStats.put(ATSConstants.NUM_SUCCEEDED_TASKS, succeededTaskCount);
@@ -1948,7 +1970,7 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
vertexName, numTasks, initTimeRequested,
initedTime, startTimeRequested, startedTime, finishTime, finalState,
diagnostics,
- getAllCounters(), getVertexStats(), taskStats);
+ counters, getVertexStats(), taskStats);
this.appContext.getHistoryHandler().handleCriticalEvent(
new DAGHistoryEvent(getDAGId(), finishEvt));
}
@@ -2083,7 +2105,7 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
}
private static VertexState finishWithTerminationCause(VertexImpl vertex) {
- Preconditions.checkArgument(vertex.getTerminationCause()!= null,
"TerminationCause is not set");
+ Preconditions.checkArgument(vertex.getTerminationCause() != null,
"TerminationCause is not set");
String diagnosticMsg = "Vertex did not succeed due to " +
vertex.getTerminationCause()
+ ", failedTasks:" + vertex.failedTaskCount
+ " killedTasks:" + vertex.killedTaskCount;
@@ -2156,9 +2178,19 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
break;
case SUCCEEDED:
try {
- logJobHistoryVertexFinishedEvent();
- eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
- finalState));
+ try {
+ logJobHistoryVertexFinishedEvent();
+ eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
+ finalState));
+ } catch (LimitExceededException e) {
+ LOG.error("Counter limits exceeded for vertex: " +
getLogIdentifier(), e);
+ finalState = VertexState.FAILED;
+ addDiagnostic("Counters limit exceeded: " + e.getMessage());
+
trySetTerminationCause(VertexTerminationCause.COUNTER_LIMITS_EXCEEDED);
+ logJobHistoryVertexFailedEvent(finalState);
+ eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
+ finalState));
+ }
} catch (IOException e) {
LOG.error("Failed to send vertex finished event to recovery", e);
finalState = VertexState.FAILED;
@@ -4921,4 +4953,16 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
LOG.debug("Vertex: " + vertexName + ", rack: " + rack.toString());
}
}
+
+ /**
+ * Test-only hook that replaces this vertex's {@code fullCounters} while holding
+ * the write lock.
+ *
+ * @param counters the counters to install
+ */
+ @Private
+ @VisibleForTesting
+ void setCounters(TezCounters counters) {
+ // Acquire before entering try: if lock() itself failed inside the try, the
+ // finally block would call unlock() on a lock this thread does not hold.
+ writeLock.lock();
+ try {
+ this.fullCounters = counters;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ae3329f8/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
index f91bc42..c70fdae 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
@@ -42,6 +42,7 @@ import com.google.inject.Inject;
import com.google.inject.name.Named;
import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.LimitExceededException;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.client.ProgressBuilder;
@@ -511,12 +512,17 @@ public class AMWebController extends Controller {
dagInfo.put("progress", Float.toString(dag.getCompletedTaskProgress()));
dagInfo.put("status", dag.getState().toString());
- if (counterNames != null && !counterNames.isEmpty()) {
- TezCounters counters = dag.getCachedCounters();
- Map<String, Map<String, Long>> counterMap =
constructCounterMapInfo(counters, counterNames);
- if (counterMap != null && !counterMap.isEmpty()) {
- dagInfo.put("counters", counterMap);
+ try {
+ if (counterNames != null && !counterNames.isEmpty()) {
+ TezCounters counters = dag.getCachedCounters();
+ Map<String, Map<String, Long>> counterMap =
constructCounterMapInfo(counters, counterNames);
+ if (counterMap != null && !counterMap.isEmpty()) {
+ dagInfo.put("counters", counterMap);
+ }
}
+ } catch (LimitExceededException e) {
+ // Ignore
+ // TODO: add an error message instead for counter key
}
renderJSON(ImmutableMap.of(
"dag", dagInfo
@@ -576,12 +582,17 @@ public class AMWebController extends Controller {
vertexInfo.put("killedTaskAttempts",
Integer.toString(vertexProgress.getKilledTaskAttemptCount()));
- if (counterNames != null && !counterNames.isEmpty()) {
- TezCounters counters = vertex.getCachedCounters();
- Map<String, Map<String, Long>> counterMap =
constructCounterMapInfo(counters, counterNames);
- if (counterMap != null && !counterMap.isEmpty()) {
- vertexInfo.put("counters", counterMap);
+ try {
+ if (counterNames != null && !counterNames.isEmpty()) {
+ TezCounters counters = vertex.getCachedCounters();
+ Map<String, Map<String, Long>> counterMap =
constructCounterMapInfo(counters, counterNames);
+ if (counterMap != null && !counterMap.isEmpty()) {
+ vertexInfo.put("counters", counterMap);
+ }
}
+ } catch (LimitExceededException e) {
+ // Ignore
+ // TODO: add an error message instead for counter key
}
return vertexInfo;
@@ -733,11 +744,17 @@ public class AMWebController extends Controller {
taskInfo.put("progress", Float.toString(t.getProgress()));
taskInfo.put("status", t.getState().toString());
- TezCounters counters = t.getCounters();
- Map<String, Map<String, Long>> counterMap =
constructCounterMapInfo(counters, counterNames);
- if (counterMap != null && !counterMap.isEmpty()) {
- taskInfo.put("counters", counterMap);
+ try {
+ TezCounters counters = t.getCounters();
+ Map<String, Map<String, Long>> counterMap =
constructCounterMapInfo(counters, counterNames);
+ if (counterMap != null && !counterMap.isEmpty()) {
+ taskInfo.put("counters", counterMap);
+ }
+ } catch (LimitExceededException e) {
+ // Ignore
+ // TODO: add an error message instead for counter key
}
+
tasksInfo.add(taskInfo);
}
@@ -825,10 +842,15 @@ public class AMWebController extends Controller {
attemptInfo.put("progress", Float.toString(a.getProgress()));
attemptInfo.put("status", a.getState().toString());
- TezCounters counters = a.getCounters();
- Map<String, Map<String, Long>> counterMap =
constructCounterMapInfo(counters, counterNames);
- if (counterMap != null && !counterMap.isEmpty()) {
- attemptInfo.put("counters", counterMap);
+ try {
+ TezCounters counters = a.getCounters();
+ Map<String, Map<String, Long>> counterMap =
constructCounterMapInfo(counters, counterNames);
+ if (counterMap != null && !counterMap.isEmpty()) {
+ attemptInfo.put("counters", counterMap);
+ }
+ } catch (LimitExceededException e) {
+ // Ignore
+ // TODO: add an error message instead for counter key
}
attemptsInfo.add(attemptInfo);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ae3329f8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 49f534b..c7dec36 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -38,6 +38,8 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.StringUtils;
+import org.apache.tez.common.counters.Limits;
+import org.apache.tez.common.counters.TezCounters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -185,6 +187,14 @@ public class TestDAGImpl {
private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
private ClusterInfo clusterInfo = new
ClusterInfo(Resource.newInstance(8192,10));
+ static {
+ Limits.reset();
+ Configuration conf = new Configuration(false);
+ conf.setInt(TezConfiguration.TEZ_COUNTERS_MAX, 100);
+ conf.setInt(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS, 100);
+ Limits.setConfiguration(conf);
+ }
+
private DAGImpl chooseDAG(TezDAGID curDAGId) {
if (curDAGId.equals(dagId)) {
return dag;
@@ -2209,4 +2219,33 @@ public class TestDAGImpl {
}
}
}
+
+ /**
+ * Each vertex reports 50 counters, which fits within TEZ_COUNTERS_MAX (100, set in
+ * the static block), so every vertex succeeds. Aggregating all three vertices (150)
+ * exceeds the limit, so the DAG itself must end FAILED with a diagnostic.
+ */
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testCounterLimits() {
+ initDAG(mrrDag);
+ dispatcher.await();
+ startDAG(mrrDag);
+ dispatcher.await();
+ for (int i=0; i<3; ++i) {
+ Vertex v = mrrDag.getVertex("vertex"+(i+1));
+ dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
+ TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
+ TezCounters ctrs = new TezCounters();
+ for (int j = 0; j < 50; ++j) {
+ ctrs.findCounter("g", "c" + i + "_" + j).increment(1);
+ }
+ ((VertexImpl) v).setCounters(ctrs);
+ dispatcher.await();
+ Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
+ Assert.assertEquals(i+1, mrrDag.getSuccessfulVertices());
+ }
+
+ Assert.assertEquals(3, mrrDag.getSuccessfulVertices());
+ Assert.assertEquals(DAGState.FAILED, mrrDag.getState());
+ Assert.assertTrue("Diagnostics should contain counter limits error message",
+ StringUtils.join(mrrDag.getDiagnostics(), ",").contains("Counters limit exceeded"));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ae3329f8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index e6387b3..e49f97d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -49,6 +49,8 @@ import java.util.concurrent.locks.ReentrantLock;
import com.google.protobuf.ByteString;
import org.apache.commons.lang.StringUtils;
+import org.apache.tez.common.counters.Limits;
+import org.apache.tez.common.counters.TezCounters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -239,6 +241,14 @@ public class TestVertexImpl {
private StateChangeNotifierForTest updateTracker;
private static TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption;
+ static {
+ Limits.reset();
+ Configuration conf = new Configuration(false);
+ conf.setInt(TezConfiguration.TEZ_COUNTERS_MAX, 100);
+ conf.setInt(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS, 100);
+ Limits.setConfiguration(conf);
+ }
+
public static class CountingOutputCommitter extends OutputCommitter {
public int initCounter = 0;
@@ -6446,5 +6456,42 @@ public class TestVertexImpl {
void setContext(InputInitializerContext context);
}
+ /**
+ * Each of the two tasks reports 75 counters, which fits within TEZ_COUNTERS_MAX
+ * (100, set in the static block). Aggregating both tasks (150) exceeds the limit,
+ * so the vertex must end FAILED with a counter-limits diagnostic.
+ */
+ @Test(timeout = 5000)
+ public void testCounterLimits() {
+ initAllVertices(VertexState.INITED);
+
+ VertexImpl v = vertices.get("vertex2");
+ startVertex(v);
+
+ TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+ TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
+
+ for (int i = 0; i < 2; ++i) {
+ TezCounters ctrs = new TezCounters();
+ for (int j = 0; j < 75; ++j) {
+ ctrs.findCounter("g", "c" + i + "_" + j).increment(1);
+ }
+ Task t = v.getTask(i);
+ ((TaskImpl) t).setCounters(ctrs);
+ }
+
+ dispatcher.getEventHandler().handle(
+ new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.RUNNING, v.getState());
+ Assert.assertEquals(1, v.getCompletedTasks());
+ // assertEquals reports both values on failure, unlike assertTrue(a == b).
+ Assert.assertEquals(0.5f, v.getCompletedTaskProgress(), 0.0f);
+
+ dispatcher.getEventHandler().handle(
+ new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.FAILED, v.getState());
+ Assert.assertEquals(2, v.getCompletedTasks());
+
+ Assert.assertTrue("Diagnostics should contain counter limits error message",
+ StringUtils.join(v.getDiagnostics(), ",").contains("Counters limit exceeded"));
+ }
}