TEZ-2441. Add tests for TezTaskRunner2. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/eb82ca2c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/eb82ca2c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/eb82ca2c Branch: refs/heads/TEZ-2003 Commit: eb82ca2c6af9c3e1b10eb372398fd6f2530508c1 Parents: 80e7053 Author: Siddharth Seth <[email protected]> Authored: Wed Jul 29 18:25:18 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Thu Aug 6 01:26:58 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../org/apache/tez/runtime/task/TezChild.java | 5 +- .../apache/tez/runtime/task/TezTaskRunner.java | 2 +- .../apache/tez/runtime/task/TezTaskRunner2.java | 42 +- .../runtime/task/TaskExecutionTestHelpers.java | 451 +++++++++++++ .../runtime/task/TestContainerExecution.java | 59 ++ .../tez/runtime/task/TestTaskExecution.java | 400 +----------- .../tez/runtime/task/TestTaskExecution2.java | 638 +++++++++++++++++++ .../src/test/resources/log4j.properties | 19 + 9 files changed, 1213 insertions(+), 404 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/eb82ca2c/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index b88044b..9d72d92 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -39,5 +39,6 @@ ALL CHANGES: TEZ-2651. Pluggable services should not extend AbstractService. TEZ-2652. Cleanup the way services are specified for an AM and vertices. TEZ-2653. Change service contexts to expose a user specified payload instead of the AM configuration. + TEZ-2441. Add tests for TezTaskRunner2. 
INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/eb82ca2c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 353fe23..b64ec37 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -49,7 +49,6 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; -import org.apache.log4j.LogManager; import org.apache.tez.common.ContainerContext; import org.apache.tez.common.ContainerTask; import org.apache.tez.common.TezCommonUtils; @@ -68,7 +67,6 @@ import org.apache.tez.dag.utils.RelocalizationUtils; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.api.impl.TaskSpec; -import org.apache.tez.runtime.api.impl.TezUmbilical; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; import org.apache.tez.runtime.internals.api.TaskReporterInterface; import org.slf4j.Logger; @@ -256,6 +254,7 @@ public class TezChild { boolean shouldDie; try { TaskRunner2Result result = taskRunner.run(); + LOG.info("TaskRunner2Result: {}", result); shouldDie = result.isContainerShutdownRequested(); if (shouldDie) { LOG.info("Got a shouldDie notification via heartbeats for container {}. Shutting down", containerIdString); @@ -377,8 +376,6 @@ public class TezChild { } if (ownUmbilical) { RPC.stopProxy(umbilical); - // TODO Temporary change. Revert. 
Ideally, move this over to the main method in TezChild if possible. -// LogManager.shutdown(); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/eb82ca2c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java index a82d87b..aebf6a9 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java @@ -250,7 +250,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { cause = ((UndeclaredThrowableException) cause).getCause(); } maybeRegisterFirstException(cause); - LOG.info("Encounted an error while executing task: " + task.getTaskAttemptID(), + LOG.info("Encountered an error while executing task: " + task.getTaskAttemptID(), cause); try { sendFailure(cause, "Failure while running task"); http://git-wip-us.apache.org/repos/asf/tez/blob/eb82ca2c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java index a5fabb5..1a8828d 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java @@ -24,6 +24,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import 
com.google.common.collect.Multimap; import com.google.common.util.concurrent.ListenableFuture; @@ -48,6 +49,9 @@ import org.slf4j.LoggerFactory; public class TezTaskRunner2 { + // Behaviour changes as compared to TezTaskRunner + // - Exception not thrown. Instead returned in the result. + // - The actual exception is part of the result, instead of requiring a getCause(). private static final Logger LOG = LoggerFactory.getLogger(TezTaskRunner2.class); @@ -156,19 +160,7 @@ public class TezTaskRunner2 { } } } - if (executionResult != null) { - synchronized (this) { - if (isRunningState()) { - if (executionResult.error != null) { - trySettingEndReason(EndReason.TASK_ERROR); - registerFirstException(executionResult.error, null); - } else { - trySettingEndReason(EndReason.SUCCESS); - taskComplete.set(true); - } - } - } - } + processCallableResult(executionResult); switch (firstEndReason) { case SUCCESS: @@ -249,6 +241,26 @@ public class TezTaskRunner2 { } } + // It's possible for the task to actually complete, and an alternate signal such as killTask/killContainer + // comes in before the future has been processed by this thread. That condition is not handled - and + // the result of the execution will be determined by the thread order. + @VisibleForTesting + void processCallableResult(TaskRunner2CallableResult executionResult) { + if (executionResult != null) { + synchronized (this) { + if (isRunningState()) { + if (executionResult.error != null) { + trySettingEndReason(EndReason.TASK_ERROR); + registerFirstException(executionResult.error, null); + } else { + trySettingEndReason(EndReason.SUCCESS); + taskComplete.set(true); + } + } + } + } + } + /** * Attempt to kill the running task, if it hasn't already completed for some other reason.
* @return true if the task kill was honored, false otherwise @@ -438,12 +450,12 @@ public class TezTaskRunner2 { private String getTaskDiagnosticsString(Throwable t, String message) { String diagnostics; if (t != null && message != null) { - diagnostics = "exceptionThrown=" + ExceptionUtils.getStackTrace(t) + ", errorMessage=" + diagnostics = "Failure while running task: " + ExceptionUtils.getStackTrace(t) + ", errorMessage=" + message; } else if (t == null && message == null) { diagnostics = "Unknown error"; } else { - diagnostics = t != null ? "exceptionThrown=" + ExceptionUtils.getStackTrace(t) + diagnostics = t != null ? "Failure while running task: " + ExceptionUtils.getStackTrace(t) : " errorMessage=" + message; } return diagnostics; http://git-wip-us.apache.org/repos/asf/tez/blob/eb82ca2c/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java new file mode 100644 index 0000000..fc42da3 --- /dev/null +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java @@ -0,0 +1,451 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.runtime.task; + +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.common.ContainerContext; +import org.apache.tez.common.ContainerTask; +import org.apache.tez.common.TezTaskUmbilicalProtocol; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.LogicalInput; +import org.apache.tez.runtime.api.LogicalOutput; +import org.apache.tez.runtime.api.ProcessorContext; +import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; +import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; +import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TaskExecutionTestHelpers { + + public static final String HEARTBEAT_EXCEPTION_STRING = "HeartbeatException"; + + // Uses static fields for signaling. Ensure only used by one test at a time. 
+ public static class TestProcessor extends AbstractLogicalIOProcessor { + + public static final byte[] CONF_EMPTY = new byte[] { 0 }; + public static final byte[] CONF_THROW_IO_EXCEPTION = new byte[] { 1 }; + public static final byte[] CONF_THROW_TEZ_EXCEPTION = new byte[] { 2 }; + public static final byte[] CONF_SIGNAL_FATAL_AND_THROW = new byte[] { 4 }; + public static final byte[] CONF_SIGNAL_FATAL_AND_LOOP = new byte[] { 8 }; + public static final byte[] CONF_SIGNAL_FATAL_AND_COMPLETE = new byte[] { 16 }; + + private static final Logger LOG = LoggerFactory.getLogger(TestProcessor.class); + + private static final ReentrantLock processorLock = new ReentrantLock(); + private static final Condition processorCondition = processorLock.newCondition(); + private static final Condition loopCondition = processorLock.newCondition(); + private static final Condition completionCondition = processorLock.newCondition(); + private static final Condition runningCondition = processorLock.newCondition(); + private static volatile boolean completed = false; + private static volatile boolean running = false; + private static volatile boolean looping = false; + private static volatile boolean signalled = false; + + private static boolean receivedInterrupt = false; + private static volatile boolean wasAborted = false; + + private boolean throwIOException = false; + private boolean throwTezException = false; + private boolean signalFatalAndThrow = false; + private boolean signalFatalAndLoop = false; + private boolean signalFatalAndComplete = false; + + public TestProcessor(ProcessorContext context) { + super(context); + } + + @Override + public void initialize() throws Exception { + parseConf(getContext().getUserPayload().deepCopyAsArray()); + } + + @Override + public void handleEvents(List<Event> processorEvents) { + + } + + @Override + public void close() throws Exception { + + } + + private void parseConf(byte[] bytes) { + byte b = bytes[0]; + throwIOException = (b & 1) > 0; + 
throwTezException = (b & 2) > 0; + signalFatalAndThrow = (b & 4) > 0; + signalFatalAndLoop = (b & 8) > 0; + signalFatalAndComplete = (b & 16) > 0; + } + + public static void reset() { + signalled = false; + receivedInterrupt = false; + completed = false; + running = false; + wasAborted = false; + } + + public static void signal() { + LOG.info("Signalled"); + processorLock.lock(); + try { + signalled = true; + processorCondition.signal(); + } finally { + processorLock.unlock(); + } + } + + public static void awaitStart() throws InterruptedException { + LOG.info("Awaiting Process run"); + processorLock.lock(); + try { + if (running) { + return; + } + runningCondition.await(); + } finally { + processorLock.unlock(); + } + } + + public static void awaitLoop() throws InterruptedException { + LOG.info("Awaiting loop after signalling error"); + processorLock.lock(); + try { + if (looping) { + return; + } + loopCondition.await(); + } finally { + processorLock.unlock(); + } + } + + public static void awaitCompletion() throws InterruptedException { + LOG.info("Await completion"); + processorLock.lock(); + try { + if (completed) { + return; + } else { + completionCondition.await(); + } + } finally { + processorLock.unlock(); + } + } + + public static boolean wasInterrupted() { + processorLock.lock(); + try { + return receivedInterrupt; + } finally { + processorLock.unlock(); + } + } + + public static boolean wasAborted() { + processorLock.lock(); + try { + return wasAborted; + } finally { + processorLock.unlock(); + } + } + + @Override + public void abort() { + wasAborted = true; + } + + @Override + public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws + Exception { + processorLock.lock(); + running = true; + runningCondition.signal(); + try { + try { + LOG.info("Signal is: " + signalled); + if (!signalled) { + LOG.info("Waiting for processor signal"); + processorCondition.await(); + } + if (Thread.currentThread().isInterrupted()) { + 
throw new InterruptedException(); + } + LOG.info("Received processor signal"); + if (throwIOException) { + throw createProcessorIOException(); + } else if (throwTezException) { + throw createProcessorTezException(); + } else if (signalFatalAndThrow) { + IOException io = new IOException("FATALERROR"); + getContext().fatalError(io, "FATALERROR"); + throw io; + } else if (signalFatalAndComplete) { + IOException io = new IOException("FATALERROR"); + getContext().fatalError(io, "FATALERROR"); + return; + } else if (signalFatalAndLoop) { + IOException io = createProcessorIOException(); + getContext().fatalError(io, "FATALERROR"); + LOG.info("looping"); + looping = true; + loopCondition.signal(); + LOG.info("Waiting for Processor signal again"); + processorCondition.await(); + LOG.info("Received second processor signal"); + } + } catch (InterruptedException e) { + receivedInterrupt = true; + } + } finally { + completed = true; + completionCondition.signal(); + processorLock.unlock(); + } + } + } + + public static TezException createProcessorTezException() { + return new TezException("TezException"); + } + + public static IOException createProcessorIOException() { + return new IOException("IOException"); + } + + public static class TezTaskUmbilicalForTest implements TezTaskUmbilicalProtocol { + + private static final Logger LOG = LoggerFactory.getLogger(TezTaskUmbilicalForTest.class); + + private final List<TezEvent> requestEvents = new LinkedList<TezEvent>(); + + private final ReentrantLock umbilicalLock = new ReentrantLock(); + private final Condition eventCondition = umbilicalLock.newCondition(); + private boolean pendingEvent = false; + private boolean eventEnacted = false; + + volatile int getTaskInvocations = 0; + + private boolean shouldThrowException = false; + private boolean shouldSendDieSignal = false; + + public void signalThrowException() { + umbilicalLock.lock(); + try { + shouldThrowException = true; + pendingEvent = true; + } finally { + 
umbilicalLock.unlock(); + } + } + + public void signalSendShouldDie() { + umbilicalLock.lock(); + try { + shouldSendDieSignal = true; + pendingEvent = true; + } finally { + umbilicalLock.unlock(); + } + } + + public void awaitRegisteredEvent() throws InterruptedException { + umbilicalLock.lock(); + try { + if (eventEnacted) { + return; + } + LOG.info("Awaiting event"); + eventCondition.await(); + } finally { + umbilicalLock.unlock(); + } + } + + public void resetTrackedEvents() { + umbilicalLock.lock(); + try { + requestEvents.clear(); + } finally { + umbilicalLock.unlock(); + } + } + + public void verifyNoCompletionEvents() { + umbilicalLock.lock(); + try { + for (TezEvent event : requestEvents) { + if (event.getEvent() instanceof TaskAttemptFailedEvent) { + fail("Found a TaskAttemptFailedEvent when not expected"); + } + if (event.getEvent() instanceof TaskAttemptCompletedEvent) { + fail("Found a TaskAttemptCompletedvent when not expected"); + } + } + } finally { + umbilicalLock.unlock(); + } + } + + public void verifyTaskFailedEvent(String diagnostics) { + umbilicalLock.lock(); + try { + for (TezEvent event : requestEvents) { + if (event.getEvent() instanceof TaskAttemptFailedEvent) { + TaskAttemptFailedEvent failedEvent = (TaskAttemptFailedEvent) event.getEvent(); + if (failedEvent.getDiagnostics().startsWith(diagnostics)) { + return; + } else { + fail("Diagnostic message does not match expected message. 
Found [" + + failedEvent.getDiagnostics() + "], Expected: [" + diagnostics + "]"); + } + } + } + fail("No TaskAttemptFailedEvents sent over umbilical"); + } finally { + umbilicalLock.unlock(); + } + } + + public void verifyTaskFailedEvent(String diagStart, String diagContains) { + umbilicalLock.lock(); + try { + for (TezEvent event : requestEvents) { + if (event.getEvent() instanceof TaskAttemptFailedEvent) { + TaskAttemptFailedEvent failedEvent = (TaskAttemptFailedEvent) event.getEvent(); + if (failedEvent.getDiagnostics().startsWith(diagStart)) { + if (diagContains != null) { + if (failedEvent.getDiagnostics().contains(diagContains)) { + return; + } else { + fail("Diagnostic message does not contain expected message. Found [" + + failedEvent.getDiagnostics() + "], Expected: [" + diagContains + "]"); + } + } + } else { + fail("Diagnostic message does not start with expected message. Found [" + + failedEvent.getDiagnostics() + "], Expected: [" + diagStart + "]"); + } + } + } + fail("No TaskAttemptFailedEvents sent over umbilical"); + } finally { + umbilicalLock.unlock(); + } + } + + public void verifyTaskSuccessEvent() { + umbilicalLock.lock(); + try { + for (TezEvent event : requestEvents) { + if (event.getEvent() instanceof TaskAttemptCompletedEvent) { + return; + } + } + fail("No TaskAttemptFailedEvents sent over umbilical"); + } finally { + umbilicalLock.unlock(); + } + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) throws IOException { + return 0; + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, + int clientMethodsHash) throws IOException { + return null; + } + + @Override + public ContainerTask getTask(ContainerContext containerContext) throws IOException { + // Return shouldDie = true + getTaskInvocations++; + return new ContainerTask(null, true, null, null, false); + } + + @Override + public boolean canCommit(TezTaskAttemptID taskid) throws IOException { + return 
true; + } + + @Override + public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException, + TezException { + umbilicalLock.lock(); + if (request.getEvents() != null) { + requestEvents.addAll(request.getEvents()); + } + try { + if (shouldThrowException) { + LOG.info("TestUmbilical throwing Exception"); + throw new IOException(HEARTBEAT_EXCEPTION_STRING); + } + TezHeartbeatResponse response = new TezHeartbeatResponse(); + response.setLastRequestId(request.getRequestId()); + if (shouldSendDieSignal) { + LOG.info("TestUmbilical returning shouldDie=true"); + response.setShouldDie(); + } + return response; + } finally { + if (pendingEvent) { + eventEnacted = true; + LOG.info("Signalling Event"); + eventCondition.signal(); + } + umbilicalLock.unlock(); + } + } + } + + public static ContainerId createContainerId(ApplicationId appId) { + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + ContainerId containerId = ContainerId.newInstance(appAttemptId, 1); + return containerId; + } + + public static TaskReporter createTaskReporter(ApplicationId appId, TezTaskUmbilicalForTest umbilical) { + TaskReporter taskReporter = new TaskReporter(umbilical, 100, 1000, 100, new AtomicLong(0), + createContainerId(appId).toString()); + return taskReporter; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/eb82ca2c/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java new file mode 100644 index 0000000..c1616af --- /dev/null +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this 
file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.task; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.common.ContainerContext; +import org.apache.tez.common.ContainerTask; +import org.junit.Test; + +public class TestContainerExecution { + + @Test(timeout = 5000) + public void testGetTaskShouldDie() throws InterruptedException, ExecutionException { + ListeningExecutorService executor = null; + try { + ExecutorService rawExecutor = Executors.newFixedThreadPool(1); + executor = MoreExecutors.listeningDecorator(rawExecutor); + ApplicationId appId = ApplicationId.newInstance(10000, 1); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + ContainerId containerId = ContainerId.newInstance(appAttemptId, 1); + + TaskExecutionTestHelpers.TezTaskUmbilicalForTest + umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest(); + ContainerContext containerContext = new ContainerContext(containerId.toString()); + + ContainerReporter containerReporter = new 
ContainerReporter(umbilical, containerContext, 100); + ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter); + + getTaskFuture.get(); + assertEquals(1, umbilical.getTaskInvocations); + + } finally { + executor.shutdownNow(); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/eb82ca2c/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java index 1bcb337..a99416a 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java @@ -18,8 +18,8 @@ package org.apache.tez.runtime.task; +import static org.apache.tez.runtime.task.TaskExecutionTestHelpers.createTaskReporter; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -28,30 +28,18 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.ipc.ProtocolSignature; import 
org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.tez.common.ContainerContext; -import org.apache.tez.common.ContainerTask; -import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; @@ -60,21 +48,13 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; -import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; -import org.apache.tez.runtime.api.Event; -import org.apache.tez.runtime.api.LogicalInput; -import org.apache.tez.runtime.api.LogicalOutput; -import org.apache.tez.runtime.api.ProcessorContext; -import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; -import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.api.impl.InputSpec; import org.apache.tez.runtime.api.impl.OutputSpec; import org.apache.tez.runtime.api.impl.TaskSpec; -import org.apache.tez.runtime.api.impl.TezEvent; -import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; -import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; import org.apache.tez.runtime.common.resources.ScalingAllocator; +import org.apache.tez.runtime.task.TaskExecutionTestHelpers.TestProcessor; +import org.apache.tez.runtime.task.TaskExecutionTestHelpers.TezTaskUmbilicalForTest; import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; @@ -82,7 +62,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.HashMultimap; -import com.google.common.util.concurrent.ListenableFuture; import 
com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -91,7 +70,7 @@ public class TestTaskExecution { private static final Logger LOG = LoggerFactory.getLogger(TestTaskExecution.class); - private static final String HEARTBEAT_EXCEPTION_STRING = "HeartbeatException"; + private static final Configuration defaultConf = new Configuration(); private static final FileSystem localFs; @@ -137,7 +116,7 @@ public class TestTaskExecution { TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, TestProcessor.CONF_EMPTY); // Setup the executor - Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner)); + Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable1ForTest(taskRunner)); // Signal the processor to go through TestProcessor.signal(); boolean result = taskRunnerFuture.get(); @@ -164,7 +143,7 @@ public class TestTaskExecution { TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, TestProcessor.CONF_EMPTY); // Setup the executor - Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner)); + Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable1ForTest(taskRunner)); // Signal the processor to go through TestProcessor.signal(); boolean result = taskRunnerFuture.get(); @@ -176,7 +155,7 @@ public class TestTaskExecution { taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, TestProcessor.CONF_EMPTY); // Setup the executor - taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner)); + taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable1ForTest(taskRunner)); // Signal the processor to go through TestProcessor.signal(); result = taskRunnerFuture.get(); @@ -188,7 +167,7 @@ public class TestTaskExecution { } } - // test tasked failed due to exception in Processor + // test task failed due to 
exception in Processor @Test(timeout = 5000) public void testFailedTask() throws IOException, InterruptedException, TezException { @@ -203,7 +182,7 @@ public class TestTaskExecution { TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, TestProcessor.CONF_THROW_TEZ_EXCEPTION); // Setup the executor - Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner)); + Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable1ForTest(taskRunner)); // Signal the processor to go through TestProcessor.awaitStart(); TestProcessor.signal(); @@ -238,7 +217,7 @@ public class TestTaskExecution { TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, "NotExitedProcessor", TestProcessor.CONF_THROW_TEZ_EXCEPTION); // Setup the executor - Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner)); + Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable1ForTest(taskRunner)); try { taskRunnerFuture.get(); } catch (ExecutionException e) { @@ -268,7 +247,7 @@ public class TestTaskExecution { TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, TestProcessor.CONF_EMPTY); // Setup the executor - Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner)); + Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable1ForTest(taskRunner)); // Signal the processor to go through TestProcessor.awaitStart(); umbilical.signalThrowException(); @@ -280,7 +259,7 @@ public class TestTaskExecution { } catch (ExecutionException e) { Throwable cause = e.getCause(); assertTrue(cause instanceof IOException); - assertTrue(cause.getMessage().contains(HEARTBEAT_EXCEPTION_STRING)); + assertTrue(cause.getMessage().contains(TaskExecutionTestHelpers.HEARTBEAT_EXCEPTION_STRING)); } TestProcessor.awaitCompletion(); assertTrue(TestProcessor.wasInterrupted()); @@ 
-307,7 +286,7 @@ public class TestTaskExecution { TezTaskRunner taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, TestProcessor.CONF_EMPTY); // Setup the executor - Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable(taskRunner)); + Future<Boolean> taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable1ForTest(taskRunner)); // Signal the processor to go through TestProcessor.awaitStart(); umbilical.signalSendShouldDie(); @@ -329,38 +308,14 @@ public class TestTaskExecution { } } - @Test(timeout = 5000) - public void testGetTaskShouldDie() throws InterruptedException, ExecutionException { - ListeningExecutorService executor = null; - try { - ExecutorService rawExecutor = Executors.newFixedThreadPool(1); - executor = MoreExecutors.listeningDecorator(rawExecutor); - ApplicationId appId = ApplicationId.newInstance(10000, 1); - ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); - ContainerId containerId = ContainerId.newInstance(appAttemptId, 1); - - TezTaskUmbilicalForTest umbilical = new TezTaskUmbilicalForTest(); - ContainerContext containerContext = new ContainerContext(containerId.toString()); - - ContainerReporter containerReporter = new ContainerReporter(umbilical, containerContext, 100); - ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter); - - getTaskFuture.get(); - assertEquals(1, umbilical.getTaskInvocations); - - } finally { - executor.shutdownNow(); - } - } - // Potential new tests // Different states - initialization failure, close failure // getTask states - private static class TaskRunnerCallable implements Callable<Boolean> { + private static class TaskRunnerCallable1ForTest implements Callable<Boolean> { private final TezTaskRunner taskRunner; - public TaskRunnerCallable(TezTaskRunner taskRunner) { + public TaskRunnerCallable1ForTest(TezTaskRunner taskRunner) { this.taskRunner = taskRunner; } @@ -370,328 +325,9 @@ public class 
TestTaskExecution { } } - // Uses static fields for signaling. Ensure only used by one test at a time. - public static class TestProcessor extends AbstractLogicalIOProcessor { - - public static final byte[] CONF_EMPTY = new byte[] { 0 }; - public static final byte[] CONF_THROW_IO_EXCEPTION = new byte[] { 1 }; - public static final byte[] CONF_THROW_TEZ_EXCEPTION = new byte[] { 2 }; - public static final byte[] CONF_SIGNAL_FATAL_AND_THROW = new byte[] { 4 }; - public static final byte[] CONF_SIGNAL_FATAL_AND_LOOP = new byte[] { 8 }; - public static final byte[] CONF_SIGNAL_FATAL_AND_COMPLETE = new byte[] { 16 }; - - private static final Logger LOG = LoggerFactory.getLogger(TestProcessor.class); - - private static final ReentrantLock processorLock = new ReentrantLock(); - private static final Condition processorCondition = processorLock.newCondition(); - private static final Condition completionCondition = processorLock.newCondition(); - private static final Condition runningCondition = processorLock.newCondition(); - private static boolean completed = false; - private static boolean running = false; - private static boolean signalled = false; - - public static boolean receivedInterrupt = false; - - private boolean throwIOException = false; - private boolean throwTezException = false; - private boolean signalFatalAndThrow = false; - private boolean signalFatalAndLoop = false; - private boolean signalFatalAndComplete = false; - - public TestProcessor(ProcessorContext context) { - super(context); - } - - @Override - public void initialize() throws Exception { - parseConf(getContext().getUserPayload().deepCopyAsArray()); - } - - @Override - public void handleEvents(List<Event> processorEvents) { - - } - - @Override - public void close() throws Exception { - - } - private void parseConf(byte[] bytes) { - byte b = bytes[0]; - throwIOException = (b & 1) > 1; - throwTezException = (b & 2) > 1; - signalFatalAndThrow = (b & 4) > 1; - signalFatalAndLoop = (b & 8) > 1; - 
signalFatalAndComplete = (b & 16) > 1; - } - public static void reset() { - signalled = false; - receivedInterrupt = false; - completed = false; - running = false; - } - public static void signal() { - LOG.info("Signalled"); - processorLock.lock(); - try { - signalled = true; - processorCondition.signal(); - } finally { - processorLock.unlock(); - } - } - - public static void awaitStart() throws InterruptedException { - LOG.info("Awaiting Process run"); - processorLock.lock(); - try { - if (running) { - return; - } - runningCondition.await(); - } finally { - processorLock.unlock(); - } - } - - public static void awaitCompletion() throws InterruptedException { - LOG.info("Await completion"); - processorLock.lock(); - try { - if (completed) { - return; - } else { - completionCondition.await(); - } - } finally { - processorLock.unlock(); - } - } - - public static boolean wasInterrupted() { - processorLock.lock(); - try { - return receivedInterrupt; - } finally { - processorLock.unlock(); - } - } - - @Override - public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws - Exception { - processorLock.lock(); - running = true; - runningCondition.signal(); - try { - try { - LOG.info("Signal is: " + signalled); - if (!signalled) { - LOG.info("Waiting for processor signal"); - processorCondition.await(); - } - if (Thread.currentThread().isInterrupted()) { - throw new InterruptedException(); - } - LOG.info("Received processor signal"); - if (throwIOException) { - throw new IOException(); - } else if (throwTezException) { - throw new TezException("TezException"); - } else if (signalFatalAndThrow) { - IOException io = new IOException("FATALERROR"); - getContext().fatalError(io, "FATALERROR"); - throw io; - } else if (signalFatalAndComplete) { - IOException io = new IOException("FATALERROR"); - getContext().fatalError(io, "FATALERROR"); - return; - } else if (signalFatalAndLoop) { - IOException io = new IOException("FATALERROR"); - 
getContext().fatalError(io, "FATALERROR"); - LOG.info("Waiting for Processor signal again"); - processorCondition.await(); - LOG.info("Received second processor signal"); - } - } catch (InterruptedException e) { - receivedInterrupt = true; - } - } finally { - completed = true; - completionCondition.signal(); - processorLock.unlock(); - } - } - } - - private static class TezTaskUmbilicalForTest implements TezTaskUmbilicalProtocol { - - private static final Logger LOG = LoggerFactory.getLogger(TezTaskUmbilicalForTest.class); - - private final List<TezEvent> requestEvents = new LinkedList<TezEvent>(); - - private final ReentrantLock umbilicalLock = new ReentrantLock(); - private final Condition eventCondition = umbilicalLock.newCondition(); - private boolean pendingEvent = false; - private boolean eventEnacted = false; - - volatile int getTaskInvocations = 0; - - private boolean shouldThrowException = false; - private boolean shouldSendDieSignal = false; - - public void signalThrowException() { - umbilicalLock.lock(); - try { - shouldThrowException = true; - pendingEvent = true; - } finally { - umbilicalLock.unlock(); - } - } - - public void signalSendShouldDie() { - umbilicalLock.lock(); - try { - shouldSendDieSignal = true; - pendingEvent = true; - } finally { - umbilicalLock.unlock(); - } - } - - public void awaitRegisteredEvent() throws InterruptedException { - umbilicalLock.lock(); - try { - if (eventEnacted) { - return; - } - LOG.info("Awaiting event"); - eventCondition.await(); - } finally { - umbilicalLock.unlock(); - } - } - - public void resetTrackedEvents() { - umbilicalLock.lock(); - try { - requestEvents.clear(); - } finally { - umbilicalLock.unlock(); - } - } - - public void verifyNoCompletionEvents() { - umbilicalLock.lock(); - try { - for (TezEvent event : requestEvents) { - if (event.getEvent() instanceof TaskAttemptFailedEvent) { - fail("Found a TaskAttemptFailedEvent when not expected"); - } - if (event.getEvent() instanceof 
TaskAttemptCompletedEvent) { - fail("Found a TaskAttemptCompletedvent when not expected"); - } - } - } finally { - umbilicalLock.unlock(); - } - } - - public void verifyTaskFailedEvent(String diagnostics) { - umbilicalLock.lock(); - try { - for (TezEvent event : requestEvents) { - if (event.getEvent() instanceof TaskAttemptFailedEvent) { - TaskAttemptFailedEvent failedEvent = (TaskAttemptFailedEvent)event.getEvent(); - if(failedEvent.getDiagnostics().startsWith(diagnostics)){ - return ; - } else { - fail("No detailed diagnostics message in TaskAttemptFailedEvent"); - } - } - } - fail("No TaskAttemptFailedEvents sent over umbilical"); - } finally { - umbilicalLock.unlock(); - } - } - - public void verifyTaskSuccessEvent() { - umbilicalLock.lock(); - try { - for (TezEvent event : requestEvents) { - if (event.getEvent() instanceof TaskAttemptCompletedEvent) { - return; - } - } - fail("No TaskAttemptFailedEvents sent over umbilical"); - } finally { - umbilicalLock.unlock(); - } - } - - @Override - public long getProtocolVersion(String protocol, long clientVersion) throws IOException { - return 0; - } - - @Override - public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, - int clientMethodsHash) throws IOException { - return null; - } - - @Override - public ContainerTask getTask(ContainerContext containerContext) throws IOException { - // Return shouldDie = true - getTaskInvocations++; - return new ContainerTask(null, true, null, null, false); - } - - @Override - public boolean canCommit(TezTaskAttemptID taskid) throws IOException { - return true; - } - - @Override - public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException, - TezException { - umbilicalLock.lock(); - if (request.getEvents() != null) { - requestEvents.addAll(request.getEvents()); - } - try { - if (shouldThrowException) { - LOG.info("TestUmbilical throwing Exception"); - throw new IOException(HEARTBEAT_EXCEPTION_STRING); - } - TezHeartbeatResponse 
response = new TezHeartbeatResponse(); - response.setLastRequestId(request.getRequestId()); - if (shouldSendDieSignal) { - LOG.info("TestUmbilical returning shouldDie=true"); - response.setShouldDie(); - } - return response; - } finally { - if (pendingEvent) { - eventEnacted = true; - LOG.info("Signalling Event"); - eventCondition.signal(); - } - umbilicalLock.unlock(); - } - } - } - - private TaskReporter createTaskReporter(ApplicationId appId, TezTaskUmbilicalForTest umbilical) { - TaskReporter taskReporter = new TaskReporter(umbilical, 100, 1000, 100, new AtomicLong(0), - createContainerId(appId).toString()); - return taskReporter; - } private TezTaskRunner createTaskRunner(ApplicationId appId, TezTaskUmbilicalForTest umbilical, TaskReporter taskReporter, ListeningExecutorService executor, byte[] processorConf) @@ -722,9 +358,5 @@ public class TestTaskExecution { return taskRunner; } - private ContainerId createContainerId(ApplicationId appId) { - ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); - ContainerId containerId = ContainerId.newInstance(appAttemptId, 1); - return containerId; - } + } http://git-wip-us.apache.org/repos/asf/tez/blob/eb82ca2c/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java new file mode 100644 index 0000000..12d9d3f --- /dev/null +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java @@ -0,0 +1,638 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.task; + +import static org.apache.tez.runtime.task.TaskExecutionTestHelpers.createProcessorIOException; +import static org.apache.tez.runtime.task.TaskExecutionTestHelpers.createProcessorTezException; +import static org.apache.tez.runtime.task.TaskExecutionTestHelpers.createTaskReporter; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import
org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.ExecutionContext; +import org.apache.tez.runtime.api.ObjectRegistry; +import org.apache.tez.runtime.api.impl.ExecutionContextImpl; +import org.apache.tez.runtime.api.impl.InputSpec; +import org.apache.tez.runtime.api.impl.OutputSpec; +import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.runtime.common.resources.ScalingAllocator; +import org.apache.tez.runtime.internals.api.TaskReporterInterface; +import org.apache.tez.runtime.task.TaskExecutionTestHelpers.TestProcessor; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** Tests the TaskRunner2Result end states produced by TezTaskRunner2. */ +public class TestTaskExecution2 { + + private static final Logger LOG = LoggerFactory.getLogger(TestTaskExecution2.class); + + private static final Configuration defaultConf = new Configuration(); + private static final FileSystem localFs; + private static final Path workDir; + + private static final ExecutorService taskExecutor = Executors.newFixedThreadPool(1); // runs TezTaskRunner2.run() off the test thread + + static { + defaultConf.set("fs.defaultFS", "file:///"); + defaultConf.set(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS, + ScalingAllocator.class.getName()); + try { + localFs = FileSystem.getLocal(defaultConf); + Path wd = new Path(System.getProperty("test.build.data", "/tmp"), + TestTaskExecution2.class.getSimpleName()); // per-class scratch dir, distinct from TestTaskExecution + workDir = localFs.makeQualified(wd); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Before + public void
reset() { + TestProcessor.reset(); + } + + @AfterClass + public static void shutdown() { + taskExecutor.shutdownNow(); + } + + @Test(timeout = 5000) + public void testSingleSuccessfulTask() throws IOException, InterruptedException, TezException, + ExecutionException { + ListeningExecutorService executor = null; + try { + ExecutorService rawExecutor = Executors.newFixedThreadPool(1); + executor = MoreExecutors.listeningDecorator(rawExecutor); + ApplicationId appId = ApplicationId.newInstance(10000, 1); + TaskExecutionTestHelpers.TezTaskUmbilicalForTest + umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest(); + TaskReporter taskReporter = createTaskReporter(appId, umbilical); + + TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, + TestProcessor.CONF_EMPTY); + // Submit the task runner to the test executor + Future<TaskRunner2Result> taskRunnerFuture = taskExecutor.submit( + new TaskRunnerCallable2ForTest(taskRunner)); + // Signal the processor to go through + TestProcessor.signal(); + TaskRunner2Result result = taskRunnerFuture.get(); + verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false); + assertNull(taskReporter.currentCallable); + umbilical.verifyTaskSuccessEvent(); + assertFalse(TestProcessor.wasAborted()); + } finally { + executor.shutdownNow(); + } + } + + @Test(timeout = 5000) + public void testMultipleSuccessfulTasks() throws IOException, InterruptedException, TezException, + ExecutionException { + + ListeningExecutorService executor = null; + try { + ExecutorService rawExecutor = Executors.newFixedThreadPool(1); + executor = MoreExecutors.listeningDecorator(rawExecutor); + ApplicationId appId = ApplicationId.newInstance(10000, 1); + TaskExecutionTestHelpers.TezTaskUmbilicalForTest + umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest(); + TaskReporter taskReporter = createTaskReporter(appId, umbilical); + + TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, +
TestProcessor.CONF_EMPTY); + // Submit the task runner to the test executor + Future<TaskRunner2Result> taskRunnerFuture = taskExecutor.submit( + new TaskRunnerCallable2ForTest(taskRunner)); + // Signal the processor to go through + TestProcessor.signal(); + TaskRunner2Result result = taskRunnerFuture.get(); + verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false); + assertNull(taskReporter.currentCallable); + umbilical.verifyTaskSuccessEvent(); + assertFalse(TestProcessor.wasAborted()); + umbilical.resetTrackedEvents(); + + taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, + TestProcessor.CONF_EMPTY); + // Submit the second task runner to the test executor + taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner)); + // Signal the processor to go through + TestProcessor.signal(); + result = taskRunnerFuture.get(); + verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false); + assertNull(taskReporter.currentCallable); + umbilical.verifyTaskSuccessEvent(); + assertFalse(TestProcessor.wasAborted()); + } finally { + executor.shutdownNow(); + } + } + + // Test task failed due to a TezException thrown by the Processor + @Test(timeout = 5000) + public void testFailedTaskTezException() throws IOException, InterruptedException, TezException, + ExecutionException { + + ListeningExecutorService executor = null; + try { + ExecutorService rawExecutor = Executors.newFixedThreadPool(1); + executor = MoreExecutors.listeningDecorator(rawExecutor); + ApplicationId appId = ApplicationId.newInstance(10000, 1); + TaskExecutionTestHelpers.TezTaskUmbilicalForTest + umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest(); + TaskReporter taskReporter = createTaskReporter(appId, umbilical); + + TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, + TestProcessor.CONF_THROW_TEZ_EXCEPTION); + // Submit the task runner to the test executor + Future<TaskRunner2Result> taskRunnerFuture = + taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner)); + // Signal the processor to go
through + TestProcessor.awaitStart(); + TestProcessor.signal(); + TaskRunner2Result result = taskRunnerFuture.get(); + verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorTezException(), false); + + assertNull(taskReporter.currentCallable); + umbilical.verifyTaskFailedEvent( + "Failure while running task", + TezException.class.getName() + ": " + TezException.class.getSimpleName()); + // Failure detected as a result of fall off from the run method. abort isn't required. + assertFalse(TestProcessor.wasAborted()); + } finally { + executor.shutdownNow(); + } + } + + + // Test task failed due to Processor class not found + @Test(timeout = 5000) + public void testFailedTask2() throws IOException, InterruptedException, TezException, + ExecutionException { + + ListeningExecutorService executor = null; + try { + ExecutorService rawExecutor = Executors.newFixedThreadPool(1); + executor = MoreExecutors.listeningDecorator(rawExecutor); + ApplicationId appId = ApplicationId.newInstance(10000, 1); + TaskExecutionTestHelpers.TezTaskUmbilicalForTest + umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest(); + TaskReporter taskReporter = createTaskReporter(appId, umbilical); + + TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, + "NotExitedProcessor", TestProcessor.CONF_EMPTY, false); + // Submit the task runner to the test executor + Future<TaskRunner2Result> taskRunnerFuture = + taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner)); + + TaskRunner2Result result = taskRunnerFuture.get(); + verifyTaskRunnerResult(result, EndReason.TASK_ERROR, + new TezUncheckedException("Unchecked exception"), false); + + assertNull(taskReporter.currentCallable); + umbilical.verifyTaskFailedEvent("Failure while running task", + ":org.apache.tez.dag.api.TezUncheckedException: " + + "Unable to load class: NotExitedProcessor"); + // Failure detected as a result of fall off from the run method. abort isn't required.
+ assertFalse(TestProcessor.wasAborted()); + } finally { + executor.shutdownNow(); + } + } + + // Test task failed due to an IOException thrown by the Processor + @Test(timeout = 5000) + public void testFailedTaskIOException() throws IOException, InterruptedException, TezException, + ExecutionException { + + ListeningExecutorService executor = null; + try { + ExecutorService rawExecutor = Executors.newFixedThreadPool(1); + executor = MoreExecutors.listeningDecorator(rawExecutor); + ApplicationId appId = ApplicationId.newInstance(10000, 1); + TaskExecutionTestHelpers.TezTaskUmbilicalForTest + umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest(); + TaskReporter taskReporter = createTaskReporter(appId, umbilical); + + TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, + TestProcessor.CONF_THROW_IO_EXCEPTION); + // Submit the task runner to the test executor + Future<TaskRunner2Result> taskRunnerFuture = + taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner)); + // Signal the processor to go through + TestProcessor.awaitStart(); + TestProcessor.signal(); + TaskRunner2Result result = taskRunnerFuture.get(); + verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorIOException(), false); + + + assertNull(taskReporter.currentCallable); + umbilical.verifyTaskFailedEvent( + "Failure while running task", + IOException.class.getName() + ": " + IOException.class.getSimpleName()); + // Failure detected as a result of fall off from the run method. abort isn't required.
+ assertFalse(TestProcessor.wasAborted()); + } finally { + executor.shutdownNow(); + } + } + + @Test(timeout = 5000) + public void testHeartbeatException() throws IOException, InterruptedException, TezException, + ExecutionException { + + ListeningExecutorService executor = null; + try { + ExecutorService rawExecutor = Executors.newFixedThreadPool(1); + executor = MoreExecutors.listeningDecorator(rawExecutor); + ApplicationId appId = ApplicationId.newInstance(10000, 1); + TaskExecutionTestHelpers.TezTaskUmbilicalForTest + umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest(); + TaskReporter taskReporter = createTaskReporter(appId, umbilical); + + TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, + TestProcessor.CONF_EMPTY); + // Submit the task runner to the test executor + Future<TaskRunner2Result> taskRunnerFuture = + taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner)); + // Wait for the processor to start; it is never signalled in this test + TestProcessor.awaitStart(); + umbilical.signalThrowException(); + umbilical.awaitRegisteredEvent(); + // Not signaling an actual start to verify task interruption + + TaskRunner2Result result = taskRunnerFuture.get(); + verifyTaskRunnerResult(result, EndReason.COMMUNICATION_FAILURE, + new IOException("IOException"), + TaskExecutionTestHelpers.HEARTBEAT_EXCEPTION_STRING, false); + + TestProcessor.awaitCompletion(); + assertTrue(TestProcessor.wasInterrupted()); + assertNull(taskReporter.currentCallable); + // No completion events since umbilical communication already failed.
+ umbilical.verifyNoCompletionEvents(); + assertTrue(TestProcessor.wasAborted()); + } finally { + executor.shutdownNow(); + } + } + + @Test(timeout = 5000) + public void testHeartbeatShouldDie() throws IOException, InterruptedException, TezException, + ExecutionException { + + ListeningExecutorService executor = null; + try { + ExecutorService rawExecutor = Executors.newFixedThreadPool(1); + executor = MoreExecutors.listeningDecorator(rawExecutor); + ApplicationId appId = ApplicationId.newInstance(10000, 1); + TaskExecutionTestHelpers.TezTaskUmbilicalForTest + umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest(); + TaskReporter taskReporter = createTaskReporter(appId, umbilical); + + TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, + TestProcessor.CONF_EMPTY); + // Submit the task runner to the test executor + Future<TaskRunner2Result> taskRunnerFuture = + taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner)); + // Wait for the processor to start; it is never signalled in this test + TestProcessor.awaitStart(); + umbilical.signalSendShouldDie(); + umbilical.awaitRegisteredEvent(); + // Not signaling an actual start to verify task interruption + + TaskRunner2Result result = taskRunnerFuture.get(); + verifyTaskRunnerResult(result, EndReason.CONTAINER_STOP_REQUESTED, null, true); + + + TestProcessor.awaitCompletion(); + assertTrue(TestProcessor.wasInterrupted()); + assertNull(taskReporter.currentCallable); + // TODO Is this statement correct ? + // No completion events since shouldDie was requested by the AM, which should have killed the + // task.
+ umbilical.verifyNoCompletionEvents(); + assertTrue(TestProcessor.wasAborted()); + } finally { + executor.shutdownNow(); + } + } + + @Test(timeout = 5000) + public void testSignalFatalErrorAndLoop() throws IOException, InterruptedException, TezException, + ExecutionException { + + ListeningExecutorService executor = null; + try { + ExecutorService rawExecutor = Executors.newFixedThreadPool(1); + executor = MoreExecutors.listeningDecorator(rawExecutor); + ApplicationId appId = ApplicationId.newInstance(10000, 1); + TaskExecutionTestHelpers.TezTaskUmbilicalForTest + umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest(); + TaskReporter taskReporter = createTaskReporter(appId, umbilical); + + TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, + TestProcessor.CONF_SIGNAL_FATAL_AND_LOOP); + // Submit the task runner to the test executor + Future<TaskRunner2Result> taskRunnerFuture = + taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner)); + // Signal the processor to go through + TestProcessor.awaitStart(); + TestProcessor.signal(); + + TestProcessor.awaitLoop(); + // The fatal error should have caused an interrupt. + + TaskRunner2Result result = taskRunnerFuture.get(); + verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorIOException(), false); + + TestProcessor.awaitCompletion(); + assertTrue(TestProcessor.wasInterrupted()); + assertNull(taskReporter.currentCallable); + umbilical.verifyTaskFailedEvent( + "Failure while running task", + IOException.class.getName() + ": " + IOException.class.getSimpleName()); + // Signal fatal error should cause the processor to fail.
+ assertTrue(TestProcessor.wasAborted()); + } finally { + executor.shutdownNow(); + } + } + + @Test(timeout = 5000) + public void testTaskKilled() throws IOException, InterruptedException, TezException, + ExecutionException { + + ListeningExecutorService executor = null; + try { + ExecutorService rawExecutor = Executors.newFixedThreadPool(1); + executor = MoreExecutors.listeningDecorator(rawExecutor); + ApplicationId appId = ApplicationId.newInstance(10000, 1); + TaskExecutionTestHelpers.TezTaskUmbilicalForTest + umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest(); + TaskReporter taskReporter = createTaskReporter(appId, umbilical); + + TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, + TestProcessor.CONF_EMPTY); + // Submit the task runner to the test executor + Future<TaskRunner2Result> taskRunnerFuture = + taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner)); + // Wait for the processor to start; it is killed instead of signalled + TestProcessor.awaitStart(); + + taskRunner.killTask(); + + TaskRunner2Result result = taskRunnerFuture.get(); + verifyTaskRunnerResult(result, EndReason.KILL_REQUESTED, null, false); + + TestProcessor.awaitCompletion(); + assertTrue(TestProcessor.wasInterrupted()); + assertNull(taskReporter.currentCallable); + // Kill events are not sent over the umbilical at the moment.
+ umbilical.verifyNoCompletionEvents(); + } finally { + executor.shutdownNow(); + } + } + + @Test(timeout = 5000) + public void testKilledAfterComplete() throws IOException, InterruptedException, TezException, + ExecutionException { + + ListeningExecutorService executor = null; + try { + ExecutorService rawExecutor = Executors.newFixedThreadPool(1); + executor = MoreExecutors.listeningDecorator(rawExecutor); + ApplicationId appId = ApplicationId.newInstance(10000, 1); + TaskExecutionTestHelpers.TezTaskUmbilicalForTest + umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest(); + TaskReporter taskReporter = createTaskReporter(appId, umbilical); + + TezTaskRunner2ForTest taskRunner = + createTaskRunnerForTest(appId, umbilical, taskReporter, executor, + TestProcessor.CONF_EMPTY); + // Submit the task runner to the test executor + Future<TaskRunner2Result> taskRunnerFuture = + taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner)); + // Signal the processor to go through + TestProcessor.awaitStart(); + TestProcessor.signal(); + TestProcessor.awaitCompletion(); + + taskRunner.awaitCallableCompletion(); // ensure the success result is processed before the kill + + taskRunner.killTask(); // a kill after completion must not change the result + TaskRunner2Result result = taskRunnerFuture.get(); + verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false); + + assertFalse(TestProcessor.wasInterrupted()); + assertNull(taskReporter.currentCallable); + umbilical.verifyTaskSuccessEvent(); + } finally { + executor.shutdownNow(); + } + } + + // Asserts the end reason, error type (exact or subclass of expected) and shutdown flag. + private void verifyTaskRunnerResult(TaskRunner2Result taskRunner2Result, + EndReason expectedEndReason, Throwable expectedThrowable, + boolean wasShutdownRequested) { + verifyTaskRunnerResult(taskRunner2Result, expectedEndReason, expectedThrowable, null, + wasShutdownRequested); + } + + private void verifyTaskRunnerResult(TaskRunner2Result taskRunner2Result, + EndReason expectedEndReason, Throwable expectedThrowable, + String expectedExceptionMessage, + boolean wasShutdownRequested) { + assertEquals(expectedEndReason, taskRunner2Result.getEndReason());
+ if (expectedThrowable == null) { + assertNull(taskRunner2Result.getError()); + } else { + assertNotNull(taskRunner2Result.getError()); + Throwable cause = taskRunner2Result.getError(); + LOG.info(cause.getClass().getName()); + assertTrue("Unexpected error type: " + cause.getClass().getName(), expectedThrowable.getClass().isAssignableFrom(cause.getClass())); + + if (expectedExceptionMessage != null) { + assertTrue("Unexpected error message: " + cause.getMessage(), cause.getMessage() != null && cause.getMessage().contains(expectedExceptionMessage)); + } + + } + assertEquals(wasShutdownRequested, taskRunner2Result.isContainerShutdownRequested()); + } + + // Adapts TezTaskRunner2.run() to a Callable so it can be submitted to taskExecutor. + private static class TaskRunnerCallable2ForTest implements Callable<TaskRunner2Result> { + private final TezTaskRunner2 taskRunner; + + public TaskRunnerCallable2ForTest(TezTaskRunner2 taskRunner) { + this.taskRunner = taskRunner; + } + + @Override + public TaskRunner2Result call() throws Exception { + return taskRunner.run(); + } + } + + private TezTaskRunner2 createTaskRunner(ApplicationId appId, + TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical, + TaskReporter taskReporter, + ListeningExecutorService executor, byte[] processorConf) + throws IOException { + return createTaskRunner(appId, umbilical, taskReporter, executor, TestProcessor.class.getName(), + processorConf, false); + } + + private TezTaskRunner2ForTest createTaskRunnerForTest(ApplicationId appId, + TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical, + TaskReporter taskReporter, + ListeningExecutorService executor, + byte[] processorConf) + throws IOException { + return (TezTaskRunner2ForTest) createTaskRunner(appId, umbilical, taskReporter, executor, + TestProcessor.class.getName(), + processorConf, true); + } + + private TezTaskRunner2 createTaskRunner(ApplicationId appId, + TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical, + TaskReporter taskReporter, + ListeningExecutorService executor, String processorClass, + byte[] processorConf, boolean testRunner) throws + IOException { + TezConfiguration tezConf = new TezConfiguration(defaultConf); +
UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + Path testDir = new Path(workDir, UUID.randomUUID().toString()); // unique local dir per runner + String[] localDirs = new String[]{testDir.toString()}; + + TezDAGID dagId = TezDAGID.getInstance(appId, 1); + TezVertexID vertexId = TezVertexID.getInstance(dagId, 1); + TezTaskID taskId = TezTaskID.getInstance(vertexId, 1); + TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 1); + ProcessorDescriptor processorDescriptor = ProcessorDescriptor.create(processorClass) + .setUserPayload(UserPayload.create(ByteBuffer.wrap(processorConf))); + TaskSpec taskSpec = + new TaskSpec(taskAttemptId, "dagName", "vertexName", -1, processorDescriptor, + new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null); + + TezTaskRunner2 taskRunner; + if (testRunner) { + taskRunner = new TezTaskRunner2ForTest(tezConf, ugi, localDirs, taskSpec, 1, + new HashMap<String, ByteBuffer>(), new HashMap<String, String>(), + HashMultimap.<String, String>create(), taskReporter, + executor, null, "", new ExecutionContextImpl("localhost"), + Runtime.getRuntime().maxMemory()); + } else { + taskRunner = new TezTaskRunner2(tezConf, ugi, localDirs, taskSpec, 1, + new HashMap<String, ByteBuffer>(), new HashMap<String, String>(), + HashMultimap.<String, String>create(), taskReporter, + executor, null, "", new ExecutionContextImpl("localhost"), + Runtime.getRuntime().maxMemory()); + } + + return taskRunner; + } + + public static class TezTaskRunner2ForTest extends TezTaskRunner2 { // lets a test wait until processCallableResult() has run + + private final ReentrantLock testLock = new ReentrantLock(); + private final Condition callableCompletionCondition = testLock.newCondition(); + + private final AtomicBoolean isCallableComplete = new AtomicBoolean(false); + + public TezTaskRunner2ForTest(Configuration tezConf, UserGroupInformation ugi, + String[] localDirs, + TaskSpec taskSpec, int appAttemptNumber, + Map<String, ByteBuffer> serviceConsumerMetadata, + Map<String, String> serviceProviderEnvMap, +
Multimap<String, String> startedInputsMap, + TaskReporterInterface taskReporter, + ListeningExecutorService executor, + ObjectRegistry objectRegistry, + String pid, + ExecutionContext executionContext, + long memAvailable) throws IOException { + super(tezConf, ugi, localDirs, taskSpec, appAttemptNumber, serviceConsumerMetadata, + serviceProviderEnvMap, startedInputsMap, taskReporter, executor, objectRegistry, pid, + executionContext, memAvailable); + } + + // Delegates to the real implementation, then records completion and wakes any waiter. + @Override + @VisibleForTesting + void processCallableResult(TaskRunner2Callable.TaskRunner2CallableResult executionResult) { + testLock.lock(); + try { + super.processCallableResult(executionResult); + isCallableComplete.set(true); + callableCompletionCondition.signal(); + } finally { + testLock.unlock(); + } + } + + void awaitCallableCompletion() throws InterruptedException { // blocks until processCallableResult() has finished + testLock.lock(); + try { + while (!isCallableComplete.get()) { + callableCompletionCondition.await(); + } + } finally { + testLock.unlock(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/eb82ca2c/tez-runtime-internals/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/resources/log4j.properties b/tez-runtime-internals/src/test/resources/log4j.properties new file mode 100644 index 0000000..531b68b --- /dev/null +++ b/tez-runtime-internals/src/test/resources/log4j.properties @@ -0,0 +1,19 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n
