Repository: tez Updated Branches: refs/heads/master 62bbbdb78 -> 020872ae1
TEZ-1238. Display more clear diagnostics info on client side on task failures. (Jeff Zhang via hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/020872ae Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/020872ae Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/020872ae Branch: refs/heads/master Commit: 020872ae1840feac7040dcd95abee2802d40cacd Parents: 62bbbdb Author: Hitesh Shah <[email protected]> Authored: Thu Jul 31 16:06:08 2014 -0700 Committer: Hitesh Shah <[email protected]> Committed: Thu Jul 31 16:06:08 2014 -0700 ---------------------------------------------------------------------- .../apache/tez/runtime/task/TaskReporter.java | 2 + .../tez/runtime/task/TestTaskExecution.java | 54 ++++++++++++++++++-- 2 files changed, 51 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/020872ae/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java index b4be2ae..f6b20dc 100644 --- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java +++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java @@ -329,6 +329,8 @@ public class TaskReporter { task.getProgress()), updateEventMetadata); if (diagnostics == null) { diagnostics = StringUtils.stringifyException(t); + } else { + diagnostics = diagnostics + ":" + StringUtils.stringifyException(t); } TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics), srcMeta == null ? updateEventMetadata : srcMeta); http://git-wip-us.apache.org/repos/asf/tez/blob/020872ae/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java index 3fd6e2e..b733682 100644 --- a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java +++ b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java @@ -55,6 +55,7 @@ import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; @@ -178,6 +179,7 @@ public class TestTaskExecution { } } + // test tasked failed due to exception in Processor @Test public void testFailedTask() throws IOException, InterruptedException, TezException { @@ -206,7 +208,38 @@ public class TestTaskExecution { } assertNull(taskReporter.currentCallable); - umbilical.verifyTaskFailedEvent(); + umbilical.verifyTaskFailedEvent("Failure while running task:org.apache.tez.dag.api.TezException: TezException"); + } finally { + executor.shutdownNow(); + } + } + + // Test task failed due to Processor class not found + @Test + public void testFailedTask2() throws IOException, InterruptedException, TezException { + + ListeningExecutorService executor = null; + try { + ExecutorService rawExecutor = Executors.newFixedThreadPool(1); + executor = MoreExecutors.listeningDecorator(rawExecutor); + ApplicationId appId = ApplicationId.newInstance(10000, 1); + TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest(); + TaskReporter taskReporter = createTaskReporter(appId, umbilical); + + TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, + "NotExitedProcessor", TestProcessor.CONF_THROW_TEZ_EXCEPTION); + // Setup the executor + Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner)); + try { + taskRunnerFuture.get(); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + LOG.info(cause.getClass().getName()); + assertTrue(cause instanceof TezException); + } + assertNull(taskReporter.currentCallable); + umbilical.verifyTaskFailedEvent("Failure while running task:org.apache.tez.dag.api.TezUncheckedException: " + + "Unable to load class: NotExitedProcessor"); } finally { executor.shutdownNow(); } @@ -484,7 +517,7 @@ public class TestTaskExecution { private final Condition eventCondition = umbilicalLock.newCondition(); private boolean pendingEvent = false; private boolean eventEnacted = false; - + volatile int getTaskInvocations = 0; private boolean shouldThrowException = false; @@ -548,12 +581,17 @@ public class TestTaskExecution { } } - public void verifyTaskFailedEvent() { + public void verifyTaskFailedEvent(String diagnostics) { umbilicalLock.lock(); try { for (TezEvent event : requestEvents) { if (event.getEvent() instanceof TaskAttemptFailedEvent) { - return; + TaskAttemptFailedEvent failedEvent = (TaskAttemptFailedEvent)event.getEvent(); + if(failedEvent.getDiagnostics().startsWith(diagnostics)){ + return ; + } else { + fail("No detailed diagnostics message in TaskAttemptFailedEvent"); + } } } fail("No TaskAttemptFailedEvents sent over umbilical"); @@ -638,6 +676,12 @@ public class TestTaskExecution { private TezTaskRunner createTaskRunner(ApplicationId appId, TezTaskUmbilicalForTest umbilical, TaskReporter taskReporter, ListeningExecutorService executor, byte[] processorConf) throws IOException { + return createTaskRunner(appId, umbilical, taskReporter, executor, TestProcessor.class.getName(), + processorConf); + } + + private TezTaskRunner createTaskRunner(ApplicationId appId, TezTaskUmbilicalForTest umbilical, + TaskReporter taskReporter, ListeningExecutorService executor, String processorClass, byte[] processorConf) throws IOException{ TezConfiguration tezConf = new TezConfiguration(defaultConf); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); Path testDir = new Path(workDir, UUID.randomUUID().toString()); @@ -647,7 +691,7 @@ public class TestTaskExecution { TezVertexID vertexId = TezVertexID.getInstance(dagId, 1); TezTaskID taskId = TezTaskID.getInstance(vertexId, 1); TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 1); - ProcessorDescriptor processorDescriptor = new ProcessorDescriptor(TestProcessor.class.getName()) + ProcessorDescriptor processorDescriptor = new ProcessorDescriptor(processorClass) .setUserPayload(processorConf); TaskSpec taskSpec = new TaskSpec(taskAttemptId, "dagName", "vertexName", processorDescriptor, new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null);
