Repository: tez Updated Branches: refs/heads/master 5ce07f89f -> 27a13fc97
http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java index 219cc2f..1d619a3 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java @@ -39,6 +39,7 @@ import org.apache.tez.hadoop.shim.HadoopShim; import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.ObjectRegistry; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezEvent; @@ -84,9 +85,10 @@ public class TezTaskRunner2 { // TaskRunnerCallable, a failure to heartbeat, or a signalFatalError on the context. private volatile Throwable firstException; private volatile EventMetaData exceptionSourceInfo; + private volatile TaskFailureType firstTaskFailureType; private final AtomicBoolean errorReporterToAm = new AtomicBoolean(false); - private boolean oobSignalErrorInProgress = false; + private volatile boolean oobSignalErrorInProgress = false; private final Lock oobSignalLock = new ReentrantLock(); private final Condition oobSignalCondition = oobSignalLock.newCondition(); @@ -147,7 +149,7 @@ public class TezTaskRunner2 { } if (future == null) { - return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get()); + return logAndReturnEndResult(firstEndReason, firstTaskFailureType, firstException, stopContainerRequested.get()); } TaskRunner2CallableResult executionResult = null; @@ -161,7 +163,7 @@ public class TezTaskRunner2 { synchronized (this) { if (isRunningState()) { trySettingEndReason(EndReason.TASK_ERROR); - registerFirstException(e, null); + registerFirstException(TaskFailureType.NON_FATAL, e, null); LOG.warn("Exception from RunnerCallable", e); } } @@ -172,30 +174,32 @@ public class TezTaskRunner2 { case SUCCESS: try { taskReporter.taskSucceeded(task.getTaskAttemptID()); - return logAndReturnEndResult(EndReason.SUCCESS, null, stopContainerRequested.get()); + return logAndReturnEndResult(EndReason.SUCCESS, null, null, stopContainerRequested.get()); } catch (IOException e) { // Comm failure. Task can't do much. - handleFinalStatusUpdateFailure(e, true); - return logAndReturnEndResult(EndReason.COMMUNICATION_FAILURE, e, stopContainerRequested.get()); + handleFinalStatusUpdateFailure(e, "success"); + return logAndReturnEndResult(EndReason.COMMUNICATION_FAILURE, firstTaskFailureType, e, stopContainerRequested.get()); } catch (TezException e) { // Failure from AM. Task can't do much. - handleFinalStatusUpdateFailure(e, true); - return logAndReturnEndResult(EndReason.COMMUNICATION_FAILURE, e, stopContainerRequested.get()); + handleFinalStatusUpdateFailure(e, "success"); + return logAndReturnEndResult(EndReason.COMMUNICATION_FAILURE, firstTaskFailureType, e, stopContainerRequested.get()); } case CONTAINER_STOP_REQUESTED: // Don't need to send any more communication updates to the AM. - return logAndReturnEndResult(firstEndReason, null, stopContainerRequested.get()); + return logAndReturnEndResult(firstEndReason, firstTaskFailureType, null, stopContainerRequested.get()); case KILL_REQUESTED: - // Kill is currently not reported to the AM via the TaskRunner. Fix this when the umbilical - // supports an indication of kill, if required. - return logAndReturnEndResult(firstEndReason, null, stopContainerRequested.get()); + // This was an external kill called directly on the task runner + return logAndReturnEndResult(firstEndReason, firstTaskFailureType, null, stopContainerRequested.get()); + case TASK_KILL_REQUEST: + // Task reported a self kill + return logAndReturnEndResult(firstEndReason, firstTaskFailureType, firstException, stopContainerRequested.get()); case COMMUNICATION_FAILURE: // Already seen a communication failure. There's no point trying to report another one. - return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get()); + return logAndReturnEndResult(firstEndReason, firstTaskFailureType, firstException, stopContainerRequested.get()); case TASK_ERROR: // Don't report an error again if it was reported via signalFatalError if (errorReporterToAm.get()) { - return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get()); + return logAndReturnEndResult(firstEndReason, firstTaskFailureType, firstException, stopContainerRequested.get()); } else { String message; if (firstException instanceof FSError) { @@ -203,24 +207,24 @@ public class TezTaskRunner2 { } else if (firstException instanceof Error) { message = "Encountered an Error while executing task: " + task.getTaskAttemptID(); } else { - message = "Failure while running task: " + task.getTaskAttemptID(); + message = "Error while running task ( failure ) : " + task.getTaskAttemptID(); } try { - taskReporter.taskFailed(task.getTaskAttemptID(), firstException, message, exceptionSourceInfo); - return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get()); + taskReporter.taskFailed(task.getTaskAttemptID(), firstTaskFailureType, firstException, message, exceptionSourceInfo); + return logAndReturnEndResult(firstEndReason, firstTaskFailureType, firstException, stopContainerRequested.get()); } catch (IOException e) { // Comm failure. Task can't do much. - handleFinalStatusUpdateFailure(e, true); - return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get()); + handleFinalStatusUpdateFailure(e, "failure"); + return logAndReturnEndResult(firstEndReason, firstTaskFailureType, firstException, stopContainerRequested.get()); } catch (TezException e) { // Failure from AM. Task can't do much. - handleFinalStatusUpdateFailure(e, true); - return logAndReturnEndResult(firstEndReason, firstException, stopContainerRequested.get()); + handleFinalStatusUpdateFailure(e, "failure"); + return logAndReturnEndResult(firstEndReason, firstTaskFailureType, firstException, stopContainerRequested.get()); } } default: LOG.error("Unexpected EndReason. File a bug"); - return logAndReturnEndResult(EndReason.TASK_ERROR, new RuntimeException("Unexpected EndReason"), stopContainerRequested.get()); + return logAndReturnEndResult(EndReason.TASK_ERROR, firstTaskFailureType, new RuntimeException("Unexpected EndReason"), stopContainerRequested.get()); } } finally { @@ -257,7 +261,7 @@ public class TezTaskRunner2 { if (isRunningState()) { if (executionResult.error != null) { trySettingEndReason(EndReason.TASK_ERROR); - registerFirstException(executionResult.error, null); + registerFirstException(TaskFailureType.NON_FATAL, executionResult.error, null); } else { trySettingEndReason(EndReason.SUCCESS); taskComplete.set(true); @@ -295,8 +299,19 @@ public class TezTaskRunner2 { } private void killTaskInternal() { + abortTaskInternal(); + interruptTaskInternal(); + } + + private void abortTaskInternal() { if (taskRunnerCallable != null) { taskKillStartTime = System.currentTimeMillis(); + taskRunnerCallable.abortTask(); + } + } + + private void interruptTaskInternal() { + if (taskRunnerCallable != null) { taskRunnerCallable.interruptTask(); } } @@ -320,57 +335,20 @@ public class TezTaskRunner2 { } @Override - public void signalFatalError(TezTaskAttemptID taskAttemptID, Throwable t, String message, - EventMetaData sourceInfo) { + public void signalFailure(TezTaskAttemptID taskAttemptID, TaskFailureType taskFailureType, Throwable t, String message, + EventMetaData sourceInfo) { // Fatal error reported by the task. - boolean isFirstError = false; - synchronized (TezTaskRunner2.this) { - if (isRunningState()) { - if (trySettingEndReason(EndReason.TASK_ERROR)) { - if (t == null) { - t = new RuntimeException( - message == null ? "FatalError: No user message or exception specified" : message); - } - registerFirstException(t, sourceInfo); - LOG.info("Received notification of a fatal error which will cause the task to die", t); - isFirstError = true; - errorReporterToAm.set(true); - oobSignalErrorInProgress = true; - } else { - logErrorIgnored("signalFatalError", message); - } - } else { - logErrorIgnored("signalFatalError", message); - } - } + signalTerminationInternal(taskAttemptID, EndReason.TASK_ERROR, taskFailureType, t, message, sourceInfo, false); + } + + @Override + public void signalKillSelf(TezTaskAttemptID taskAttemptID, Throwable t, String message, + EventMetaData sourceInfo) { + signalTerminationInternal(taskAttemptID, EndReason.TASK_KILL_REQUEST, null, t, message, sourceInfo, true); - // Informing the TaskReporter here because the running task may not be interruptable. - // Has to be outside the lock. - if (isFirstError) { - logAborting("signalFatalError"); - killTaskInternal(); - try { - taskReporter.taskFailed(taskAttemptID, t, getTaskDiagnosticsString(t, message), sourceInfo); - } catch (IOException e) { - // Comm failure. Task can't do much. The main exception is already registered. - handleFinalStatusUpdateFailure(e, true); - } catch (TezException e) { - // Failure from AM. Task can't do much. The main exception is already registered. - handleFinalStatusUpdateFailure(e, true); - } finally { - oobSignalLock.lock(); - try { - // This message is being sent outside of the main thread, which may end up completing before - // this thread runs. Make sure the main run thread does not end till this completes. - oobSignalErrorInProgress = false; - oobSignalCondition.signal(); - } finally { - oobSignalLock.unlock(); - } - } - } } + @Override public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException { // Task checking whether it can commit. @@ -397,7 +375,7 @@ public class TezTaskRunner2 { if (isRunningState()) { LOG.info("TaskReporter reporter error which will cause the task to fail", t); if (trySettingEndReason(EndReason.COMMUNICATION_FAILURE)) { - registerFirstException(t, null); + registerFirstException(TaskFailureType.NON_FATAL, t, null); isFirstError = true; } else { logErrorIgnored("umbilicalFatalError", null); @@ -436,6 +414,72 @@ public class TezTaskRunner2 { } } + + private void signalTerminationInternal(TezTaskAttemptID taskAttemptID, EndReason endReason, + TaskFailureType taskFailureType, Throwable t, String message, + EventMetaData sourceInfo, boolean isKill) { + boolean isFirstError = false; + String typeString = isKill ? " kill " : " failure "; + synchronized (TezTaskRunner2.this) { + if (isRunningState()) { + if (trySettingEndReason(endReason)) { + if (t == null) { + String errMessage = message; + if (errMessage == null) { + errMessage = typeString + " : No user message or exception specified"; + } + t = new RuntimeException(errMessage); + } + registerFirstException(taskFailureType, t, sourceInfo); + LOG.info("Received notification of a " + typeString + + " which will cause the task to die", t); + isFirstError = true; + errorReporterToAm.set(true); + oobSignalErrorInProgress = true; + } else { + logErrorIgnored(typeString, message); + } + } else { + logErrorIgnored(typeString, message); + } + } + + // Informing the TaskReporter here because the running task may not be interruptable. + // Has to be outside the lock. + if (isFirstError) { + logAborting(typeString); + abortTaskInternal(); + try { + if (isKill) { + taskReporter + .taskKilled(taskAttemptID, t, getTaskDiagnosticsString(t, message, typeString), sourceInfo); + } else { + taskReporter.taskFailed(taskAttemptID, taskFailureType, t, + getTaskDiagnosticsString(t, message, typeString), sourceInfo); + } + } catch (IOException e) { + // Comm failure. Task can't do much. The main exception is already registered. + handleFinalStatusUpdateFailure(e, typeString); + } catch (TezException e) { + // Failure from AM. Task can't do much. The main exception is already registered. + handleFinalStatusUpdateFailure(e, typeString); + } catch (Exception e) { + handleFinalStatusUpdateFailure(e, typeString); + } finally { + interruptTaskInternal(); + oobSignalLock.lock(); + try { + // This message is being sent outside of the main thread, which may end up completing before + // this thread runs. Make sure the main run thread does not end till this completes. + oobSignalErrorInProgress = false; + oobSignalCondition.signal(); + } finally { + oobSignalLock.unlock(); + } + } + } + } + private synchronized boolean trySettingEndReason(EndReason endReason) { if (isRunningState()) { firstEndReason = endReason; @@ -445,39 +489,43 @@ public class TezTaskRunner2 { } - private void registerFirstException(Throwable t, EventMetaData sourceInfo) { + private void registerFirstException(TaskFailureType taskFailureType, Throwable t, EventMetaData sourceInfo) { Preconditions.checkState(isRunningState()); errorSeen.set(true); firstException = t; + this.firstTaskFailureType = taskFailureType; this.exceptionSourceInfo = sourceInfo; } - private String getTaskDiagnosticsString(Throwable t, String message) { + private String getTaskDiagnosticsString(Throwable t, String message, String typeString) { String diagnostics; if (t != null && message != null) { - diagnostics = "Failure while running task: " + ExceptionUtils.getStackTrace(t) + ", errorMessage=" + diagnostics = "Error while running task (" + typeString + ") : " + ExceptionUtils.getStackTrace(t) + ", errorMessage=" + message; } else if (t == null && message == null) { diagnostics = "Unknown error"; } else { - diagnostics = t != null ? "Failure while running task: " + ExceptionUtils.getStackTrace(t) + diagnostics = t != null ? "Error while running task (" + typeString + ") : " + ExceptionUtils.getStackTrace(t) : " errorMessage=" + message; } return diagnostics; } - private TaskRunner2Result logAndReturnEndResult(EndReason endReason, Throwable firstError, + private TaskRunner2Result logAndReturnEndResult(EndReason endReason, + TaskFailureType taskFailureType, + Throwable firstError, boolean stopContainerRequested) { - TaskRunner2Result result = new TaskRunner2Result(endReason, firstError, stopContainerRequested); + TaskRunner2Result result = + new TaskRunner2Result(endReason, taskFailureType, firstError, stopContainerRequested); LOG.info("TaskRunnerResult for {} : {} ", task.getTaskAttemptID(), result); return result; } - private void handleFinalStatusUpdateFailure(Throwable t, boolean successReportAttempted) { + private void handleFinalStatusUpdateFailure(Throwable t, String stateString) { // TODO Ideally differentiate between FAILED/KILLED LOG.warn("Failure while reporting state= {} to AM", - (successReportAttempted ? "success" : "failure/killed"), t); + stateString, t); } private void logErrorIgnored(String ignoredEndReason, String errorMessage) { http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/proto/Events.proto ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/proto/Events.proto b/tez-runtime-internals/src/main/proto/Events.proto deleted file mode 100644 index 558a2b3..0000000 --- a/tez-runtime-internals/src/main/proto/Events.proto +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -option java_package = "org.apache.tez.runtime.internals.api.events"; -option java_outer_classname = "SystemEventProtos"; -option java_generate_equals_and_hash = true; - -message TaskAttemptFailedEventProto { - optional string diagnostics = 1; -} - -message TaskAttemptCompletedEventProto { -} http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/main/proto/RuntimeEvents.proto ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/proto/RuntimeEvents.proto b/tez-runtime-internals/src/main/proto/RuntimeEvents.proto new file mode 100644 index 0000000..660988c --- /dev/null +++ b/tez-runtime-internals/src/main/proto/RuntimeEvents.proto @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.tez.runtime.internals.api.events"; +option java_outer_classname = "SystemEventProtos"; +option java_generate_equals_and_hash = true; + +enum TaskFailureTypeProto { + FT_NON_FATAL = 0; + FT_FATAL = 1; +} + +message TaskAttemptFailedEventProto { + optional string diagnostics = 1; + optional TaskFailureTypeProto task_failure_type = 2; +} + +message TaskAttemptKilledEventProto { + optional string diagnostics = 1; +} + +message TaskAttemptCompletedEventProto { +} http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java index 7502c41..626d178 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java @@ -14,6 +14,7 @@ package org.apache.tez.runtime.task; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import java.io.IOException; @@ -35,11 +36,13 @@ import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.ProcessorContext; import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; +import org.apache.tez.runtime.api.events.TaskAttemptKilledEvent; import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; @@ -53,12 +56,29 @@ public class TaskExecutionTestHelpers { // Uses static fields for signaling. Ensure only used by one test at a time. public static class TestProcessor extends AbstractLogicalIOProcessor { - public static final byte[] CONF_EMPTY = new byte[] { 0 }; - public static final byte[] CONF_THROW_IO_EXCEPTION = new byte[] { 1 }; - public static final byte[] CONF_THROW_TEZ_EXCEPTION = new byte[] { 2 }; - public static final byte[] CONF_SIGNAL_FATAL_AND_THROW = new byte[] { 4 }; - public static final byte[] CONF_SIGNAL_FATAL_AND_LOOP = new byte[] { 8 }; - public static final byte[] CONF_SIGNAL_FATAL_AND_COMPLETE = new byte[] { 16 }; + private static final int EMPTY = 0; + private static final int THROW_IO_EXCEPTION = 1; + private static final int THROW_TEZ_EXCEPTION = 2; + private static final int SIGNAL_DEPRECATEDFATAL_AND_THROW = 3; + private static final int SIGNAL_DEPRECATEDFATAL_AND_LOOP = 4; + private static final int SIGNAL_DEPRECATEDFATAL_AND_COMPLETE = 5; + private static final int SIGNAL_FATAL_AND_THROW = 6; + private static final int SIGNAL_NON_FATAL_AND_THROW = 7; + private static final int SELF_KILL_AND_COMPLETE = 8; + + public static final byte[] CONF_EMPTY = new byte[]{EMPTY}; + public static final byte[] CONF_THROW_IO_EXCEPTION = new byte[]{THROW_IO_EXCEPTION}; + public static final byte[] CONF_THROW_TEZ_EXCEPTION = new byte[]{THROW_TEZ_EXCEPTION}; + public static final byte[] CONF_SIGNAL_DEPRECATEDFATAL_AND_THROW = + new byte[]{SIGNAL_DEPRECATEDFATAL_AND_THROW}; + public static final byte[] CONF_SIGNAL_DEPRECATEDFATAL_AND_LOOP = + new byte[]{SIGNAL_DEPRECATEDFATAL_AND_LOOP}; + public static final byte[] CONF_SIGNAL_DEPRECATEDFATAL_AND_COMPLETE = + new byte[]{SIGNAL_DEPRECATEDFATAL_AND_COMPLETE}; + public static final byte[] CONF_SIGNAL_FATAL_AND_THROW = new byte[]{SIGNAL_FATAL_AND_THROW}; + public static final byte[] CONF_SIGNAL_NON_FATAL_AND_THROW = + new byte[]{SIGNAL_NON_FATAL_AND_THROW}; + public static final byte[] CONF_SELF_KILL_AND_COMPLETE = new byte[]{SELF_KILL_AND_COMPLETE}; private static final Logger LOG = LoggerFactory.getLogger(TestProcessor.class); @@ -77,9 +97,12 @@ public class TaskExecutionTestHelpers { private boolean throwIOException = false; private boolean throwTezException = false; + private boolean signalDeprecatedFatalAndThrow = false; + private boolean signalDeprecatedFatalAndLoop = false; + private boolean signalDeprecatedFatalAndComplete = false; private boolean signalFatalAndThrow = false; - private boolean signalFatalAndLoop = false; - private boolean signalFatalAndComplete = false; + private boolean signalNonFatalAndThrow = false; + private boolean selfKillAndComplete = false; public TestProcessor(ProcessorContext context) { super(context); @@ -102,11 +125,14 @@ public class TaskExecutionTestHelpers { private void parseConf(byte[] bytes) { byte b = bytes[0]; - throwIOException = (b & 1) > 0; - throwTezException = (b & 2) > 0; - signalFatalAndThrow = (b & 4) > 0; - signalFatalAndLoop = (b & 8) > 0; - signalFatalAndComplete = (b & 16) > 0; + throwIOException = (b == THROW_IO_EXCEPTION); + throwTezException = (b == THROW_TEZ_EXCEPTION); + signalDeprecatedFatalAndThrow = (b == SIGNAL_DEPRECATEDFATAL_AND_THROW); + signalDeprecatedFatalAndLoop = (b == SIGNAL_DEPRECATEDFATAL_AND_LOOP); + signalDeprecatedFatalAndComplete = (b == SIGNAL_DEPRECATEDFATAL_AND_COMPLETE); + signalFatalAndThrow = (b == SIGNAL_FATAL_AND_THROW); + signalNonFatalAndThrow = (b == SIGNAL_NON_FATAL_AND_THROW); + selfKillAndComplete = (b == SELF_KILL_AND_COMPLETE); } public static void reset() { @@ -191,6 +217,7 @@ public class TaskExecutionTestHelpers { wasAborted = true; } + @SuppressWarnings("deprecation") @Override public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception { @@ -212,23 +239,38 @@ public class TaskExecutionTestHelpers { throw createProcessorIOException(); } else if (throwTezException) { throw createProcessorTezException(); - } else if (signalFatalAndThrow) { - IOException io = new IOException("FATALERROR"); - getContext().fatalError(io, "FATALERROR"); + } else if (signalDeprecatedFatalAndThrow) { + IOException io = new IOException(IOException.class.getSimpleName()); + + getContext().fatalError(io, IOException.class.getSimpleName()); throw io; - } else if (signalFatalAndComplete) { - IOException io = new IOException("FATALERROR"); - getContext().fatalError(io, "FATALERROR"); + } else if (signalDeprecatedFatalAndComplete) { + IOException io = new IOException(IOException.class.getSimpleName()); + getContext().fatalError(io, IOException.class.getSimpleName()); return; - } else if (signalFatalAndLoop) { + } else if (signalDeprecatedFatalAndLoop) { IOException io = createProcessorIOException(); - getContext().fatalError(io, "FATALERROR"); + getContext().fatalError(io, IOException.class.getSimpleName()); LOG.info("looping"); looping = true; loopCondition.signal(); LOG.info("Waiting for Processor signal again"); processorCondition.await(); LOG.info("Received second processor signal"); + } else if (signalFatalAndThrow) { + IOException io = new IOException(IOException.class.getSimpleName()); + getContext().reportFailure(TaskFailureType.FATAL, io, IOException.class.getSimpleName()); + LOG.info("throwing"); + throw io; + } else if (signalNonFatalAndThrow) { + IOException io = new IOException(IOException.class.getSimpleName()); + getContext().reportFailure(TaskFailureType.NON_FATAL, io, IOException.class.getSimpleName()); + LOG.info("throwing"); + throw io; + } else if (selfKillAndComplete) { + LOG.info("Reporting kill self"); + getContext().killSelf(new IOException(IOException.class.getSimpleName()), "SELFKILL"); + LOG.info("Returning"); } } catch (InterruptedException e) { receivedInterrupt = true; @@ -344,6 +386,10 @@ public class TaskExecutionTestHelpers { } public void verifyTaskFailedEvent(String diagStart, String diagContains) { + verifyTaskFailedEvent(diagStart, diagContains, TaskFailureType.NON_FATAL); + } + + public void verifyTaskFailedEvent(String diagStart, String diagContains, TaskFailureType taskFailureType) { umbilicalLock.lock(); try { for (TezEvent event : requestEvents) { @@ -352,6 +398,7 @@ public class TaskExecutionTestHelpers { if (failedEvent.getDiagnostics().startsWith(diagStart)) { if (diagContains != null) { if (failedEvent.getDiagnostics().contains(diagContains)) { + assertEquals(taskFailureType, failedEvent.getTaskFailureType()); return; } else { fail("Diagnostic message does not contain expected message. Found [" + @@ -370,6 +417,35 @@ public class TaskExecutionTestHelpers { } } + public void verifyTaskKilledEvent(String diagStart, String diagContains) { + umbilicalLock.lock(); + try { + for (TezEvent event : requestEvents) { + if (event.getEvent() instanceof TaskAttemptKilledEvent) { + TaskAttemptKilledEvent killedEvent = + (TaskAttemptKilledEvent) event.getEvent(); + if (killedEvent.getDiagnostics().startsWith(diagStart)) { + if (diagContains != null) { + if (killedEvent.getDiagnostics().contains(diagContains)) { + return; + } else { + fail("Diagnostic message does not contain expected message. Found [" + + killedEvent.getDiagnostics() + "], Expected: [" + diagContains + "]"); + } + } + } else { + fail("Diagnostic message does not start with expected message. Found [" + + killedEvent.getDiagnostics() + "], Expected: [" + diagStart + "]"); + } + } + } + fail("No TaskAttemptKilledEvents sent over umbilical"); + } finally { + umbilicalLock.unlock(); + } + + } + public void verifyTaskSuccessEvent() { umbilicalLock.lock(); try { http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java index 18660f6..c3b9abd 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java @@ -67,6 +67,7 @@ import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; import org.apache.tez.runtime.api.ExecutionContext; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.ObjectRegistry; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.api.impl.InputSpec; @@ -89,6 +90,9 @@ public class TestTaskExecution2 { private static final FileSystem localFs; private static final Path workDir; + private static final String FAILURE_START_STRING = "Error while running task ( failure )"; + private static final String KILL_START_STRING = "Error while running task ( kill )"; + private static final ExecutorService taskExecutor = Executors.newFixedThreadPool(1); static { @@ -135,7 +139,7 @@ public class TestTaskExecution2 { // Signal the processor to go through TestProcessor.signal(); TaskRunner2Result result = taskRunnerFuture.get(); - verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false); + verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false, null); assertNull(taskReporter.currentCallable); umbilical.verifyTaskSuccessEvent(); assertFalse(TestProcessor.wasAborted()); @@ -166,7 +170,7 @@ public class TestTaskExecution2 { // Signal the processor to go through TestProcessor.signal(); TaskRunner2Result result = taskRunnerFuture.get(); - verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false); + verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false, null); assertNull(taskReporter.currentCallable); umbilical.verifyTaskSuccessEvent(); assertFalse(TestProcessor.wasAborted()); @@ -182,7 +186,7 @@ public class TestTaskExecution2 { // Signal the processor to go through TestProcessor.signal(); result = taskRunnerFuture.get(); - verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false); + verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false, null); assertNull(taskReporter.currentCallable); umbilical.verifyTaskSuccessEvent(); assertFalse(TestProcessor.wasAborted()); @@ -217,11 +221,11 @@ public class TestTaskExecution2 { TestProcessor.awaitStart(); TestProcessor.signal(); TaskRunner2Result result = taskRunnerFuture.get(); - verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorTezException(), false); + verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorTezException(), false, TaskFailureType.NON_FATAL); assertNull(taskReporter.currentCallable); umbilical.verifyTaskFailedEvent( - "Failure while running task", + FAILURE_START_STRING, TezException.class.getName() + ": " + TezException.class.getSimpleName()); // Failure detected as a result of fall off from the run method. abort isn't required. assertFalse(TestProcessor.wasAborted()); @@ -254,10 +258,10 @@ public class TestTaskExecution2 { TaskRunner2Result result = taskRunnerFuture.get(); verifyTaskRunnerResult(result, EndReason.TASK_ERROR, - new TezReflectionException("TezReflectionException"), false); + new TezReflectionException("TezReflectionException"), false, TaskFailureType.NON_FATAL); assertNull(taskReporter.currentCallable); - umbilical.verifyTaskFailedEvent("Failure while running task", + umbilical.verifyTaskFailedEvent(FAILURE_START_STRING, ":org.apache.tez.dag.api.TezReflectionException: " + "Unable to load class: NotExitedProcessor"); // Failure detected as a result of fall off from the run method. abort isn't required. @@ -291,12 +295,12 @@ public class TestTaskExecution2 { TestProcessor.awaitStart(); TestProcessor.signal(); TaskRunner2Result result = taskRunnerFuture.get(); - verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorIOException(), false); + verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorIOException(), false, TaskFailureType.NON_FATAL); assertNull(taskReporter.currentCallable); umbilical.verifyTaskFailedEvent( - "Failure while running task", + FAILURE_START_STRING, IOException.class.getName() + ": " + IOException.class.getSimpleName()); // Failure detected as a result of fall off from the run method. abort isn't required. assertFalse(TestProcessor.wasAborted()); @@ -333,7 +337,7 @@ public class TestTaskExecution2 { TaskRunner2Result result = taskRunnerFuture.get(); verifyTaskRunnerResult(result, EndReason.COMMUNICATION_FAILURE, new IOException("IOException"), - TaskExecutionTestHelpers.HEARTBEAT_EXCEPTION_STRING, false); + TaskExecutionTestHelpers.HEARTBEAT_EXCEPTION_STRING, false, TaskFailureType.NON_FATAL); TestProcessor.awaitCompletion(); assertTrue(TestProcessor.wasInterrupted()); @@ -371,7 +375,7 @@ public class TestTaskExecution2 { // Not signaling an actual start to verify task interruption TaskRunner2Result result = taskRunnerFuture.get(); - verifyTaskRunnerResult(result, EndReason.CONTAINER_STOP_REQUESTED, null, true); + verifyTaskRunnerResult(result, EndReason.CONTAINER_STOP_REQUESTED, null, true, null); TestProcessor.awaitCompletion(); @@ -388,7 +392,7 @@ public class TestTaskExecution2 { } @Test(timeout = 5000) - public void testSignalFatalErrorAndLoop() throws IOException, InterruptedException, TezException, + public void testSignalDeprecatedFatalErrorAndLoop() throws IOException, InterruptedException, TezException, ExecutionException { ListeningExecutorService executor = null; @@ -401,7 +405,7 @@ public class TestTaskExecution2 { TaskReporter taskReporter = createTaskReporter(appId, umbilical); TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, - TestProcessor.CONF_SIGNAL_FATAL_AND_LOOP); + TestProcessor.CONF_SIGNAL_DEPRECATEDFATAL_AND_LOOP); // Setup the executor Future<TaskRunner2Result> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner)); @@ -413,13 +417,13 @@ public class TestTaskExecution2 { // The fatal error should have caused an interrupt. TaskRunner2Result result = taskRunnerFuture.get(); - verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorIOException(), false); + verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorIOException(), false, TaskFailureType.NON_FATAL); TestProcessor.awaitCompletion(); assertTrue(TestProcessor.wasInterrupted()); assertNull(taskReporter.currentCallable); umbilical.verifyTaskFailedEvent( - "Failure while running task", + FAILURE_START_STRING, IOException.class.getName() + ": " + IOException.class.getSimpleName()); // Signal fatal error should cause the processor to fail. assertTrue(TestProcessor.wasAborted()); @@ -429,6 +433,115 @@ public class TestTaskExecution2 { } @Test(timeout = 5000) + public void testSignalFatalAndThrow() throws IOException, InterruptedException, TezException, + ExecutionException { + + ListeningExecutorService executor = null; + try { + ExecutorService rawExecutor = Executors.newFixedThreadPool(1); + executor = MoreExecutors.listeningDecorator(rawExecutor); + ApplicationId appId = ApplicationId.newInstance(10000, 1); + TaskExecutionTestHelpers.TezTaskUmbilicalForTest + umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest(); + TaskReporter taskReporter = createTaskReporter(appId, umbilical); + + TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, + TestProcessor.CONF_SIGNAL_FATAL_AND_THROW); + // Setup the executor + Future<TaskRunner2Result> taskRunnerFuture = + taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner)); + // Signal the processor to go through + TestProcessor.awaitStart(); + TestProcessor.signal(); + + TaskRunner2Result result = taskRunnerFuture.get(); + verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorIOException(), false, TaskFailureType.FATAL); + + TestProcessor.awaitCompletion(); + assertNull(taskReporter.currentCallable); + umbilical.verifyTaskFailedEvent( + FAILURE_START_STRING, + IOException.class.getName() + ": " + IOException.class.getSimpleName(), TaskFailureType.FATAL); + assertTrue(TestProcessor.wasAborted()); + } finally { + executor.shutdownNow(); + } + } + + @Test(timeout = 5000) + public void testSignalNonFatalAndThrow() throws IOException, InterruptedException, TezException, + ExecutionException { + + ListeningExecutorService executor = null; + try { + ExecutorService rawExecutor = Executors.newFixedThreadPool(1); + executor = MoreExecutors.listeningDecorator(rawExecutor); + ApplicationId appId = ApplicationId.newInstance(10000, 1); + TaskExecutionTestHelpers.TezTaskUmbilicalForTest + umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest(); + TaskReporter taskReporter = createTaskReporter(appId, umbilical); + + TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, + TestProcessor.CONF_SIGNAL_NON_FATAL_AND_THROW); + // Setup the executor + Future<TaskRunner2Result> taskRunnerFuture = + taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner)); + // Signal the processor to go through + TestProcessor.awaitStart(); + TestProcessor.signal(); + + TaskRunner2Result result = taskRunnerFuture.get(); + verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorIOException(), false, TaskFailureType.NON_FATAL); + + TestProcessor.awaitCompletion(); + assertNull(taskReporter.currentCallable); + umbilical.verifyTaskFailedEvent( + FAILURE_START_STRING, + IOException.class.getName() + ": " + IOException.class.getSimpleName(), TaskFailureType.NON_FATAL); + assertTrue(TestProcessor.wasAborted()); + } finally { + executor.shutdownNow(); + } + } + + @Test(timeout = 5000) + public void testTaskSelfKill() throws IOException, InterruptedException, TezException, + ExecutionException { + + ListeningExecutorService executor = null; + try { + ExecutorService rawExecutor = Executors.newFixedThreadPool(1); + executor = MoreExecutors.listeningDecorator(rawExecutor); + ApplicationId appId = ApplicationId.newInstance(10000, 1); + TaskExecutionTestHelpers.TezTaskUmbilicalForTest + umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest(); + TaskReporter taskReporter = createTaskReporter(appId, umbilical); + + TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, + TestProcessor.CONF_SELF_KILL_AND_COMPLETE); + // Setup the executor + Future<TaskRunner2Result> taskRunnerFuture = + taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner)); + // Signal the processor to go through + TestProcessor.awaitStart(); + TestProcessor.signal(); + + TaskRunner2Result result = taskRunnerFuture.get(); + verifyTaskRunnerResult(result, EndReason.TASK_KILL_REQUEST, createProcessorIOException(), false, + null); + + TestProcessor.awaitCompletion(); + assertNull(taskReporter.currentCallable); + umbilical.verifyTaskKilledEvent( + KILL_START_STRING, + IOException.class.getName() + ": " + IOException.class.getSimpleName()); + assertTrue(TestProcessor.wasAborted()); + } finally { + executor.shutdownNow(); + } + } + + @Test(timeout = 5000) public void testTaskKilled() throws IOException, InterruptedException, TezException, ExecutionException { @@ -452,7 +565,7 @@ public class TestTaskExecution2 { taskRunner.killTask(); TaskRunner2Result result = taskRunnerFuture.get(); - verifyTaskRunnerResult(result, EndReason.KILL_REQUESTED, null, false); + verifyTaskRunnerResult(result, EndReason.KILL_REQUESTED, null, false, null); TestProcessor.awaitCompletion(); assertTrue(TestProcessor.wasInterrupted()); @@ -492,7 +605,7 @@ public class TestTaskExecution2 { taskRunner.killTask(); TaskRunner2Result result = taskRunnerFuture.get(); - verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false); + verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false, null); assertFalse(TestProcessor.wasInterrupted()); assertNull(taskReporter.currentCallable); @@ -534,15 +647,17 @@ public class TestTaskExecution2 { private void verifyTaskRunnerResult(TaskRunner2Result taskRunner2Result, EndReason expectedEndReason, Throwable expectedThrowable, - boolean wasShutdownRequested) { + boolean wasShutdownRequested, + TaskFailureType taskFailureType) { verifyTaskRunnerResult(taskRunner2Result, expectedEndReason, expectedThrowable, null, - wasShutdownRequested); + wasShutdownRequested, taskFailureType); } private void verifyTaskRunnerResult(TaskRunner2Result taskRunner2Result, EndReason expectedEndReason, Throwable expectedThrowable, String expectedExceptionMessage, - boolean wasShutdownRequested) { + boolean wasShutdownRequested, + TaskFailureType taskFailureType) { assertEquals(expectedEndReason, taskRunner2Result.getEndReason()); if (expectedThrowable == null) { assertNull(taskRunner2Result.getError()); @@ -557,6 +672,7 @@ public class TestTaskExecution2 { } } + assertEquals(taskFailureType, taskRunner2Result.getTaskFailureType()); assertEquals(wasShutdownRequested, taskRunner2Result.isContainerShutdownRequested()); } http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 7f2054b..b82098e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -46,6 +46,7 @@ import javax.crypto.SecretKey; import com.google.common.annotations.VisibleForTesting; import org.apache.tez.http.HttpConnectionParams; +import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -748,7 +749,7 @@ public class ShuffleManager implements FetcherCallback { private void reportFatalError(Throwable exception, String message) { LOG.error(message); - inputContext.fatalError(exception, message); + inputContext.reportFailure(TaskFailureType.NON_FATAL, exception, message); } @Override @@ -931,7 +932,7 @@ public class ShuffleManager implements FetcherCallback { } } else { LOG.error(srcNameTrimmed + ": " + "Scheduler failed with error: ", t); - inputContext.fatalError(t, "Shuffle Scheduler Failed"); + inputContext.reportFailure(TaskFailureType.NON_FATAL, t, "Shuffle Scheduler Failed"); } } @@ -988,7 +989,7 @@ public class ShuffleManager implements FetcherCallback { } else { LOG.error(srcNameTrimmed + ": " + "Fetcher failed with error: ", t); shuffleError = t; - inputContext.fatalError(t, "Fetch failed"); + inputContext.reportFailure(TaskFailureType.NON_FATAL, t, "Fetch failed"); doBookKeepingForFetcherComplete(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java index f40c49a..37269ad 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference; import com.google.common.annotations.VisibleForTesting; +import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -423,7 +424,7 @@ public class Shuffle implements ExceptionReporter { } else { LOG.error(srcNameTrimmed + ": " + "ShuffleRunner failed with error", t); // In case of an abort / Interrupt - the runtime makes sure that this is ignored. - inputContext.fatalError(t, "Shuffle Runner Failed"); + inputContext.reportFailure(TaskFailureType.NON_FATAL, t, "Shuffle Runner Failed"); cleanupIgnoreErrors(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index ce410be..c7e3059 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -52,6 +52,7 @@ import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.library.api.IOInterruptedException; @@ -921,7 +922,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit outputContext.sendEvents(Collections.singletonList(compEvent)); } catch (IOException e) { LOG.error(destNameTrimmed + ": " + "Error in sending pipelined events", e); - outputContext.fatalError(e, "Error in sending pipelined events"); + outputContext.reportFailure(TaskFailureType.NON_FATAL, e, "Error in sending pipelined events"); } } @@ -950,7 +951,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } catch (Throwable e) { LOG.error(destNameTrimmed + ": " + "Failure while attempting to reset buffer after spill", e); - outputContext.fatalError(e, "Failure while attempting to reset buffer after spill"); + outputContext.reportFailure(TaskFailureType.NON_FATAL, e, "Failure while attempting to reset buffer after spill"); } if (!pipelinedShuffle) { @@ -976,7 +977,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit // Consider removing it in favor of having Tez kill the task LOG.error(destNameTrimmed + ": " + "Failure while spilling to disk", t); spillException = t; - outputContext.fatalError(t, "Failure while spilling to disk"); + outputContext.reportFailure(TaskFailureType.NON_FATAL, t, "Failure while spilling to disk"); spillLock.lock(); try { spillInProgress.signal(); http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java index 5bbf0fb..0294bd3 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java @@ -20,7 +20,6 @@ package org.apache.tez.runtime.library.common.shuffle.impl; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; @@ -29,7 +28,6 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; import java.util.BitSet; @@ -50,6 +48,7 @@ import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.ExecutionContext; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; @@ -229,7 +228,7 @@ public class TestShuffleInputEventHandlerImpl { //0--> 1 with spill id 1 (attemptNum 1). This should report exception dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 1); handler.handleEvents(Collections.singletonList(dme)); - verify(inputContext).fatalError(any(Throwable.class), anyString()); + verify(inputContext).reportFailure(any(TaskFailureType.class), any(Throwable.class), anyString()); } /** @@ -258,7 +257,7 @@ public class TestShuffleInputEventHandlerImpl { //Now send attemptNum 0. This should throw exception, because attempt #1 is already added dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 0); handler.handleEvents(Collections.singletonList(dme)); - verify(inputContext).fatalError(any(Throwable.class), anyString()); + verify(inputContext).reportFailure(any(TaskFailureType.class), any(Throwable.class), anyString()); } /** @@ -297,7 +296,7 @@ public class TestShuffleInputEventHandlerImpl { //Now send attemptNum 1. This should throw exception, because attempt #1 is already added dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 1); handler.handleEvents(Collections.singletonList(dme)); - verify(inputContext).fatalError(any(Throwable.class), anyString()); + verify(inputContext).reportFailure(any(TaskFailureType.class), any(Throwable.class), anyString()); } private Event createDataMovementEvent(boolean addSpillDetails, int srcIdx, int targetIdx, http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java index 28f813c..855aedf 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffle.java @@ -17,6 +17,7 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -35,6 +36,7 @@ import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.api.ExecutionContext; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.library.common.Constants; @@ -76,7 +78,7 @@ public class TestShuffle { ArgumentCaptor<Throwable> throwableArgumentCaptor = ArgumentCaptor.forClass(Throwable.class); ArgumentCaptor<String> stringArgumentCaptor = ArgumentCaptor.forClass(String.class); - verify(inputContext, times(1)).fatalError(throwableArgumentCaptor.capture(), + verify(inputContext, times(1)).reportFailure(eq(TaskFailureType.NON_FATAL), throwableArgumentCaptor.capture(), stringArgumentCaptor.capture()); Throwable t = throwableArgumentCaptor.getValue(); http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java index e7a2125..8c935eb 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java @@ -45,6 +45,7 @@ import java.util.Set; import java.util.UUID; import com.google.protobuf.ByteString; +import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configurable; @@ -325,7 +326,7 @@ public class TestUnorderedPartitionedKVWriter { } List<Event> events = kvWriter.close(); - verify(outputContext, never()).fatalError(any(Throwable.class), any(String.class)); + verify(outputContext, never()).reportFailure(any(TaskFailureType.class), any(Throwable.class), any(String.class)); TezCounter outputLargeRecordsCounter = counters.findCounter(TaskCounter.OUTPUT_LARGE_RECORDS); assertEquals(numLargeKeys + numLargevalues + numLargeKvPairs, @@ -493,7 +494,7 @@ public class TestUnorderedPartitionedKVWriter { assertTrue(events.size() == 1); //the last event which was sent out - verify(outputContext, never()).fatalError(any(Throwable.class), any(String.class)); + verify(outputContext, never()).reportFailure(any(TaskFailureType.class), any(Throwable.class), any(String.class)); // Verify the status of the buffers if (numExpectedSpills == 0) { @@ -651,7 +652,7 @@ public class TestUnorderedPartitionedKVWriter { int recordsPerBuffer = sizePerBuffer / sizePerRecordWithOverhead; int numExpectedSpills = numRecordsWritten / recordsPerBuffer; - verify(outputContext, never()).fatalError(any(Throwable.class), any(String.class)); + verify(outputContext, never()).reportFailure(any(TaskFailureType.class), any(Throwable.class), any(String.class)); // Verify the status of the buffers if (numExpectedSpills == 0) { http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java index 884808e..6103047 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java @@ -21,6 +21,7 @@ package org.apache.tez.mapreduce.examples.processor; import java.util.List; import java.util.Map; +import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -57,7 +58,7 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor { Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); filterWord = conf.get(FilterLinesByWord.FILTER_PARAM_NAME); if (filterWord == null) { - getContext().fatalError(null, "No filter word specified"); + getContext().reportFailure(TaskFailureType.NON_FATAL, null, "No filter word specified"); } } http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-tests/src/test/java/org/apache/tez/test/TestInput.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java index a4f3c27..811ca3c 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -284,7 +285,7 @@ public class TestInput extends AbstractLogicalInput { void throwException(String msg) { RuntimeException e = new RuntimeException(msg); - getContext().fatalError(e , msg); + getContext().reportFailure(TaskFailureType.NON_FATAL, e , msg); throw e; } http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java index 43777bc..b53dad9 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -119,7 +120,7 @@ public class TestProcessor extends AbstractLogicalIOProcessor { void throwException(String msg) { RuntimeException e = new RuntimeException(msg); - getContext().fatalError(e , msg); + getContext().reportFailure(TaskFailureType.NON_FATAL, e , msg); throw e; } http://git-wip-us.apache.org/repos/asf/tez/blob/27a13fc9/tez-tests/src/test/java/org/apache/tez/test/TestTaskErrorsUsingLocalMode.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTaskErrorsUsingLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestTaskErrorsUsingLocalMode.java new file mode 100644 index 0000000..f413bdd --- /dev/null +++ b/tez-tests/src/test/java/org/apache/tez/test/TestTaskErrorsUsingLocalMode.java @@ -0,0 +1,215 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.test; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TaskFailureType; +import org.apache.tez.runtime.api.LogicalInput; +import org.apache.tez.runtime.api.LogicalOutput; +import org.apache.tez.runtime.api.ProcessorContext; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestTaskErrorsUsingLocalMode { + + private static final Logger LOG = LoggerFactory.getLogger(TestTaskErrorsUsingLocalMode.class); + + private static final String VERTEX_NAME = "vertex1"; + + + @Test(timeout = 20000) + public void testFatalErrorReported() throws IOException, TezException, InterruptedException { + + TezClient tezClient = getTezClient("testFatalErrorReported"); + DAGClient dagClient = null; + + try { + FailingProcessor.configureForFatalFail(); + DAG dag = DAG.create("testFatalErrorReportedDag").addVertex( + Vertex + .create(VERTEX_NAME, ProcessorDescriptor.create(FailingProcessor.class.getName()), 1)); + + dagClient = tezClient.submitDAG(dag); + dagClient.waitForCompletion(); + assertEquals(DAGStatus.State.FAILED, dagClient.getDAGStatus(null).getState()); + assertEquals(1, dagClient.getVertexStatus(VERTEX_NAME, null).getProgress().getFailedTaskAttemptCount()); + } finally { + if (dagClient != null) { + dagClient.close(); + } + tezClient.stop(); + } + } + + @Test(timeout = 20000) + public void testNonFatalErrorReported() throws IOException, TezException, InterruptedException { + + TezClient tezClient = getTezClient("testNonFatalErrorReported"); + DAGClient dagClient = null; + + try { + FailingProcessor.configureForNonFatalFail(); + DAG dag = DAG.create("testNonFatalErrorReported").addVertex( + Vertex + .create(VERTEX_NAME, ProcessorDescriptor.create(FailingProcessor.class.getName()), 1)); + + dagClient = tezClient.submitDAG(dag); + dagClient.waitForCompletion(); + assertEquals(DAGStatus.State.FAILED, dagClient.getDAGStatus(null).getState()); + assertEquals(4, dagClient.getVertexStatus(VERTEX_NAME, null).getProgress().getFailedTaskAttemptCount()); + } finally { + if (dagClient != null) { + dagClient.close(); + } + tezClient.stop(); + } + } + + @Test(timeout = 20000) + public void testSelfKillReported() throws IOException, TezException, InterruptedException { + + TezClient tezClient = getTezClient("testSelfKillReported"); + DAGClient dagClient = null; + + try { + FailingProcessor.configureForKilled(10); + DAG dag = DAG.create("testSelfKillReported").addVertex( + Vertex + .create(VERTEX_NAME, ProcessorDescriptor.create(FailingProcessor.class.getName()), 1)); + + dagClient = tezClient.submitDAG(dag); + dagClient.waitForCompletion(); + assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState()); + assertEquals(10, dagClient.getVertexStatus(VERTEX_NAME, null).getProgress().getKilledTaskAttemptCount()); + } finally { + if (dagClient != null) { + dagClient.close(); + } + tezClient.stop(); + } + } + + + private TezClient getTezClient(String name) throws IOException, TezException { + TezConfiguration tezConf1 = new TezConfiguration(); + tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); + tezConf1.set("fs.defaultFS", "file:///"); + tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); + TezClient tezClient1 = TezClient.create("commonName", tezConf1, true); + tezClient1.start(); + return tezClient1; + } + + + public static class FailingProcessor extends AbstractLogicalIOProcessor { + + private static final String FAIL_STRING_NON_FATAL = "non-fatal-fail"; + private static final String FAIL_STRING_FATAL = "fatal-fail"; + private static final String KILL_STRING = "kill-self"; + + private static volatile boolean shouldFail; + private static volatile boolean fatalError; + + private static volatile boolean shouldKill; + private static volatile int killModeAttemptNumberToSucceed; + + + static { + reset(); + } + + static void reset() { + shouldFail = false; + fatalError = false; + + shouldKill = false; + killModeAttemptNumberToSucceed = -1; + } + + static void configureForNonFatalFail() { + reset(); + shouldFail = true; + } + + static void configureForFatalFail() { + reset(); + shouldFail = true; + fatalError = true; + } + + static void configureForKilled(int attemptNumber) { + reset(); + shouldKill = true; + killModeAttemptNumberToSucceed = attemptNumber; + } + + public FailingProcessor(ProcessorContext context) { + super(context); + } + + @Override + public void initialize() throws Exception { + + } + + @Override + public void handleEvents(List<Event> processorEvents) { + + } + + @Override + public void close() throws Exception { + + } + + @Override + public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws + Exception { + LOG.info("Running Failing processor"); + if (shouldFail) { + if (fatalError) { + LOG.info("Reporting fatal error"); + getContext().reportFailure(TaskFailureType.FATAL, null, FAIL_STRING_FATAL); + } else { + LOG.info("Reporting non-fatal error"); + getContext().reportFailure(TaskFailureType.NON_FATAL, null, FAIL_STRING_NON_FATAL); + } + } else if (shouldKill) { + if (getContext().getTaskAttemptNumber() != killModeAttemptNumberToSucceed) { + LOG.info("Reporting self-kill for attempt=" + getContext().getTaskAttemptNumber()); + getContext().killSelf(null, KILL_STRING); + } + } + } + } + +}
