Repository: twill Updated Branches: refs/heads/master 5cacb8eb6 -> 3c32e1e45
Add started, containerLaunched, containerStopped, completed, killed, aborted to EventHandler This closes #58 on Github Signed-off-by: Poorna Chandra <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/twill/repo Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/3c32e1e4 Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/3c32e1e4 Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/3c32e1e4 Branch: refs/heads/master Commit: 3c32e1e457cdb168cc65e104e86cf3fba8d2c372 Parents: 5cacb8e Author: Chengfeng <[email protected]> Authored: Tue Jul 25 12:53:26 2017 -0700 Committer: Poorna Chandra <[email protected]> Committed: Wed Aug 9 18:52:52 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/twill/api/EventHandler.java | 60 +++- .../apache/twill/api/EventHandlerContext.java | 13 + .../appmaster/ApplicationMasterService.java | 147 ++++++++-- .../appmaster/BasicEventHandlerContext.java | 20 +- .../internal/appmaster/RunningContainers.java | 33 ++- .../apache/twill/yarn/EventHandlerTestRun.java | 290 +++++++++++++++++++ .../twill/yarn/ProvisionTimeoutTestRun.java | 46 ++- .../org/apache/twill/yarn/YarnTestSuite.java | 1 + 8 files changed, 575 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/twill/blob/3c32e1e4/twill-api/src/main/java/org/apache/twill/api/EventHandler.java ---------------------------------------------------------------------- diff --git a/twill-api/src/main/java/org/apache/twill/api/EventHandler.java b/twill-api/src/main/java/org/apache/twill/api/EventHandler.java index a19c0b3..45a353e 100644 --- a/twill-api/src/main/java/org/apache/twill/api/EventHandler.java +++ b/twill-api/src/main/java/org/apache/twill/api/EventHandler.java @@ -17,6 +17,8 @@ */ package org.apache.twill.api; +import org.apache.twill.internal.Constants; + import java.util.Collections; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -124,6 +126,58 @@ public abstract class EventHandler { } /** + * Invoked by the application when it starts. + */ + public void started() { + // No-op + } + + /** + * Invoked by the application when new container is launched for a {@link TwillRunnable}. + * + * @param runnableName name of the runnable to be run in the new container + * @param instanceId the instance ID of the runnable instance to be run in the new container + * @param containerId the ID of the newly launched container + */ + public void containerLaunched(String runnableName, int instanceId, String containerId) { + // No-op + } + + /** + * Invoked by the application when the container allocated for a {@link TwillRunnable} is stopped. + * + * @param runnableName name of the runnable in the stopped container + * @param instanceId the instance ID of the runnable instance run in the stopped container + * @param containerId the ID of the stopped container + * @param exitStatus the exit status of the container + */ + public void containerStopped(String runnableName, int instanceId, String containerId, int exitStatus) { + // No-op + } + + /** + * Invoked by the application when all containers complete. + */ + public void completed() { + // No-op + } + + /** + * Invoked by the application when stop command is received to kill the current application. + * + */ + public void killed() { + // No-op + } + + /** + * Invoked by the application when the application is aborted because of timeout. + */ + public void aborted() { + // No-op + } + + /** * Invoked by the application when shutting down. */ public void destroy() { @@ -133,9 +187,11 @@ public abstract class EventHandler { /** * Invoked when the number of expected instances doesn't match with number of actual instances. * @param timeoutEvents An Iterable of {@link TimeoutEvent} that contains information about runnable launch timeout. - * @return A {@link TimeoutAction} to govern action to act. + * @return A {@link TimeoutAction} to govern action to act. By default it is to recheck after 60 seconds. */ - public abstract TimeoutAction launchTimeout(Iterable<TimeoutEvent> timeoutEvents); + public TimeoutAction launchTimeout(Iterable<TimeoutEvent> timeoutEvents) { + return new TimeoutAction(Constants.PROVISION_TIMEOUT); + } /** * Returns set of configurations available at runtime for access. http://git-wip-us.apache.org/repos/asf/twill/blob/3c32e1e4/twill-api/src/main/java/org/apache/twill/api/EventHandlerContext.java ---------------------------------------------------------------------- diff --git a/twill-api/src/main/java/org/apache/twill/api/EventHandlerContext.java b/twill-api/src/main/java/org/apache/twill/api/EventHandlerContext.java index 8e58af6..09429de 100644 --- a/twill-api/src/main/java/org/apache/twill/api/EventHandlerContext.java +++ b/twill-api/src/main/java/org/apache/twill/api/EventHandlerContext.java @@ -22,5 +22,18 @@ package org.apache.twill.api; */ public interface EventHandlerContext { + /** + * @return Name of the {@link TwillApplication} containing the {@link EventHandler} + */ + String getApplicationName(); + + /** + * @return the unique ID of the current run of the {@link TwillApplication} containing the {@link EventHandler} + */ + RunId getRunId(); + + /** + * @return {@link EventHandlerSpecification} of the {@link EventHandler} + */ EventHandlerSpecification getSpecification(); } http://git-wip-us.apache.org/repos/asf/twill/blob/3c32e1e4/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java index 4917f4d..6fc31f5 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.Records; import org.apache.twill.api.Command; import org.apache.twill.api.EventHandler; +import org.apache.twill.api.EventHandlerContext; import org.apache.twill.api.EventHandlerSpecification; import org.apache.twill.api.LocalFile; import org.apache.twill.api.ResourceReport; @@ -107,6 +108,13 @@ import javax.annotation.Nullable; * The class that acts as {@code ApplicationMaster} for Twill applications. */ public final class ApplicationMasterService extends AbstractYarnTwillService implements Supplier<ResourceReport> { + /** + * Final status of this service when it stops. + */ + private enum StopStatus { + COMPLETED, // All containers complete + ABORTED // Aborted because of timeout + } private static final Logger LOG = LoggerFactory.getLogger(ApplicationMasterService.class); private static final Gson GSON = new GsonBuilder() @@ -131,6 +139,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp private final Map<String, Map<String, String>> environments; private final TwillRuntimeSpecification twillRuntimeSpec; + private volatile StopStatus stopStatus; private volatile boolean stopped; private Queue<RunnableContainerRequest> runnableContainerRequests; private ExecutorService instanceChangeExecutor; @@ -157,8 +166,8 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp twillRuntimeSpec.getKafkaZKConnect()); this.expectedContainers = new ExpectedContainers(twillSpec); - this.runningContainers = createRunningContainers(amClient.getContainerId(), amClient.getHost()); this.eventHandler = createEventHandler(twillSpec); + this.runningContainers = createRunningContainers(amClient.getContainerId(), amClient.getHost()); } private JvmOptions loadJvmOptions() throws IOException { @@ -172,19 +181,104 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp } @SuppressWarnings("unchecked") - @Nullable private EventHandler createEventHandler(TwillSpecification twillSpec) throws ClassNotFoundException { // Should be able to load by this class ClassLoader, as they packaged in the same jar. EventHandlerSpecification handlerSpec = twillSpec.getEventHandler(); if (handlerSpec == null) { - return null; + // if no handler is specified, return an EventHandler with no-op + return new EventHandler() {}; } Class<?> handlerClass = getClass().getClassLoader().loadClass(handlerSpec.getClassName()); Preconditions.checkArgument(EventHandler.class.isAssignableFrom(handlerClass), "Class {} does not implements {}", handlerClass, EventHandler.class.getName()); - return Instances.newInstance((Class<? extends EventHandler>) handlerClass); + final EventHandler delegate = Instances.newInstance((Class<? extends EventHandler>) handlerClass); + // wrap all calls to the delegate EventHandler methods except initialize so that all errors will be caught + return new EventHandler() { + + @Override + public void initialize(EventHandlerContext context) { + delegate.initialize(context); + } + + @Override + public void started() { + try { + delegate.started(); + } catch (Throwable t) { + LOG.warn("Exception raised when calling {}.started()", delegate.getClass().getName(), t); + } + } + + @Override + public void containerLaunched(String runnableName, int instanceId, String containerId) { + try { + delegate.containerLaunched(runnableName, instanceId, containerId); + } catch (Throwable t) { + LOG.warn("Exception raised when calling {}.containerLaunched(String, int, String)", + delegate.getClass().getName(), t); + } + } + + @Override + public void containerStopped(String runnableName, int instanceId, String containerId, int exitStatus) { + try { + delegate.containerStopped(runnableName, instanceId, containerId, exitStatus); + } catch (Throwable t) { + LOG.warn("Exception raised when calling {}.containerStopped(String, int, String, int)", + delegate.getClass().getName(), t); + } + } + + @Override + public void completed() { + try { + delegate.completed(); + } catch (Throwable t) { + LOG.warn("Exception raised when calling {}.completed()", delegate.getClass().getName(), t); + } + } + + @Override + public void killed() { + try { + delegate.killed(); + } catch (Throwable t) { + LOG.warn("Exception raised when calling {}.killed()", delegate.getClass().getName(), t); + } + } + + @Override + public void aborted() { + try { + delegate.aborted(); + } catch (Throwable t) { + LOG.warn("Exception raised when calling {}.aborted()", delegate.getClass().getName(), t); + } + } + + @Override + public void destroy() { + try { + delegate.destroy(); + } catch (Throwable t) { + LOG.warn("Exception raised when calling {}.destroy()", delegate.getClass().getName(), t); + } + } + + @Override + public TimeoutAction launchTimeout(Iterable<TimeoutEvent> timeoutEvents) { + try { + return delegate.launchTimeout(timeoutEvents); + } catch (Throwable t) { + LOG.warn("Exception raised when calling {}.launchTimeout(Iterable<TimeoutEvent>)", + delegate.getClass().getName(), t); + } + // call super.launchTimeout in case of any errors from the delegate + return super.launchTimeout(timeoutEvents); + } + }; } private RunningContainers createRunningContainers(ContainerId appMasterContainerId, @@ -203,8 +297,8 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp maxHeapMemoryMB, appMasterHost, null); String appId = appMasterContainerId.getApplicationAttemptId().getApplicationId().toString(); - return new RunningContainers(appId, appMasterResources, zkClient, applicationLocation, - twillSpec.getRunnables(), twillRuntimeSpec.getMaxRetries()); + return new RunningContainers(twillRuntimeSpec, appId, appMasterResources, zkClient, applicationLocation, + twillSpec.getRunnables(), eventHandler); } @Override @@ -218,9 +312,9 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp TwillRuntimeSpecificationAdapter.create().toJson(twillRuntimeSpec)); // initialize the event handler, if it fails, it will fail the application. - if (eventHandler != null) { - eventHandler.initialize(new BasicEventHandlerContext(twillSpec.getEventHandler())); - } + eventHandler.initialize(new BasicEventHandlerContext(twillRuntimeSpec)); + // call event handler started. + eventHandler.started(); instanceChangeExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("instanceChanger")); @@ -237,15 +331,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp LOG.info("Stop application master with spec: {}", TwillRuntimeSpecificationAdapter.create().toJson(twillRuntimeSpec)); - if (eventHandler != null) { - try { - // call event handler destroy. If there is error, only log and not affected stop sequence. - eventHandler.destroy(); - } catch (Throwable t) { - LOG.warn("Exception when calling {}.destroy()", eventHandler.getClass().getName(), t); - } - } - instanceChangeExecutor.shutdownNow(); // For checking if all containers are stopped. @@ -289,6 +374,24 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp // Since all the runnables are now stopped, it is okay to stop the poller. stopPoller.shutdownNow(); cleanupDir(); + if (stopStatus == null) { + // if finalStatus is not set, the application must be stopped by a SystemMessages#STOP_COMMAND + eventHandler.killed(); + } else { + switch (stopStatus) { + case COMPLETED: + eventHandler.completed(); + break; + case ABORTED: + eventHandler.aborted(); + break; + default: + // should never reach here + LOG.error("Unsupported FinalStatus '{}'", stopStatus.name()); + } + } + // call event handler destroy + eventHandler.destroy(); } @Override @@ -392,6 +495,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp // Looks for containers requests. if (provisioning.isEmpty() && runnableContainerRequests.isEmpty() && runningContainers.isEmpty()) { LOG.info("All containers completed. Shutting down application master."); + stopStatus = StopStatus.COMPLETED; break; } @@ -520,17 +624,18 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp } } - if (!timeoutEvents.isEmpty() && eventHandler != null) { + if (!timeoutEvents.isEmpty()) { + EventHandler.TimeoutAction action = eventHandler.launchTimeout(timeoutEvents); try { - EventHandler.TimeoutAction action = eventHandler.launchTimeout(timeoutEvents); if (action.getTimeout() < 0) { // Abort application + stopStatus = StopStatus.ABORTED; stop(); } else { return nextTimeoutCheck + action.getTimeout(); } } catch (Throwable t) { - LOG.warn("Exception when calling EventHandler {}. Ignore the result.", t); + LOG.warn("Exception when handling TimeoutAction.", t); } } return nextTimeoutCheck + Constants.PROVISION_TIMEOUT; http://git-wip-us.apache.org/repos/asf/twill/blob/3c32e1e4/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java index 1769910..1d5c4e7 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java @@ -19,16 +19,32 @@ package org.apache.twill.internal.appmaster; import org.apache.twill.api.EventHandlerContext; import org.apache.twill.api.EventHandlerSpecification; +import org.apache.twill.api.RunId; +import org.apache.twill.internal.TwillRuntimeSpecification; /** * */ final class BasicEventHandlerContext implements EventHandlerContext { + private final String applicationName; + private final RunId runId; private final EventHandlerSpecification specification; - BasicEventHandlerContext(EventHandlerSpecification specification) { - this.specification = specification; + public BasicEventHandlerContext(TwillRuntimeSpecification twillRuntimeSpec) { + this.applicationName = twillRuntimeSpec.getTwillAppName(); + this.runId = twillRuntimeSpec.getTwillAppRunId(); + this.specification = twillRuntimeSpec.getTwillSpecification().getEventHandler(); + } + + @Override + public String getApplicationName() { + return applicationName; + } + + @Override + public RunId getRunId() { + return runId; } @Override http://git-wip-us.apache.org/repos/asf/twill/blob/3c32e1e4/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java index c85c372..f869515 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java @@ -38,6 +38,7 @@ import com.google.common.util.concurrent.Service; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.twill.api.EventHandler; import org.apache.twill.api.ResourceReport; import org.apache.twill.api.RunId; import org.apache.twill.api.RuntimeSpecification; @@ -53,6 +54,7 @@ import org.apache.twill.internal.DefaultTwillRunResources; import org.apache.twill.internal.RunIds; import org.apache.twill.internal.TwillContainerController; import org.apache.twill.internal.TwillContainerLauncher; +import org.apache.twill.internal.TwillRuntimeSpecification; import org.apache.twill.internal.container.TwillContainerMain; import org.apache.twill.internal.state.Message; import org.apache.twill.internal.state.SystemMessages; @@ -116,9 +118,12 @@ final class RunningContainers { private final Map<String, Map<String, String>> logLevels; private final Map<String, Integer> maxRetries; private final Map<String, Map<Integer, AtomicInteger>> numRetries; + private final EventHandler eventHandler; - RunningContainers(String appId, TwillRunResources appMasterResources, ZKClient zookeeperClient, - Location applicationLocation, Map<String, RuntimeSpecification> runnables, Map<String, Integer> maxRetriesMap) { + RunningContainers(TwillRuntimeSpecification twillRuntimeSpec, String appId, TwillRunResources appMasterResources, + ZKClient zookeeperClient, Location applicationLocation, + Map<String, RuntimeSpecification> runnables, + EventHandler eventHandler) { containers = HashBasedTable.create(); runnableInstances = Maps.newHashMap(); completedContainerCount = Maps.newHashMap(); @@ -131,8 +136,9 @@ final class RunningContainers { this.applicationLocation = applicationLocation; this.runnableNames = runnables.keySet(); this.logLevels = new TreeMap<>(); - this.maxRetries = Maps.newHashMap(maxRetriesMap); + this.maxRetries = Maps.newHashMap(twillRuntimeSpec.getMaxRetries()); this.numRetries = Maps.newHashMap(); + this.eventHandler = eventHandler; } /** @@ -172,8 +178,9 @@ final class RunningContainers { if (startSequence.isEmpty() || !runnableName.equals(startSequence.peekLast())) { startSequence.addLast(runnableName); } - containerChange.signalAll(); - + containerChange.signalAll(); + // call event handler containerLaunched. + eventHandler.containerLaunched(runnableName, instanceId, containerInfo.getId()); } finally { containerLock.unlock(); } @@ -267,6 +274,7 @@ final class RunningContainers { resourceReport.removeRunnableResources(runnableName, containerId); containerChange.signalAll(); + eventHandler.containerStopped(runnableName, instanceId, containerId, ContainerExitCodes.ABORTED); } } finally { containerLock.unlock(); @@ -401,6 +409,20 @@ final class RunningContainers { // When we acquire this lock, all stopped runnables should have been cleaned up by handleCompleted() method containerLock.lock(); try { + for (Map.Entry<String, Map<String, TwillContainerController>> entry : containers.rowMap().entrySet()) { + String runnableName = entry.getKey(); + Collection<ContainerInfo> containerInfos = containerStats.get(runnableName); + for (Map.Entry<String, TwillContainerController> containerControllerEntry : entry.getValue().entrySet()) { + for (ContainerInfo containerInfo : containerInfos) { + if (containerInfo.getId().equals(containerControllerEntry.getKey())) { + // Only call eventHandler.containerStopped if container is not removed by handleCompleted + eventHandler.containerStopped(runnableName, containerControllerEntry.getValue().getInstanceId(), + containerControllerEntry.getKey(), ContainerExitCodes.ABORTED); + break; + } + } + } + } containers.clear(); runnableInstances.clear(); numRetries.clear(); @@ -468,6 +490,7 @@ final class RunningContainers { // TODO: won't they get added back when the container is re-requested? removeInstanceId(runnableName, controller.getInstanceId()); resourceReport.removeRunnableResources(runnableName, containerId); + eventHandler.containerStopped(runnableName, instanceId, containerId, exitStatus); } http://git-wip-us.apache.org/repos/asf/twill/blob/3c32e1e4/twill-yarn/src/test/java/org/apache/twill/yarn/EventHandlerTestRun.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EventHandlerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EventHandlerTestRun.java new file mode 100644 index 0000000..bd2b245 --- /dev/null +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EventHandlerTestRun.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.twill.yarn; + +import com.google.common.base.Joiner; +import com.google.common.base.Stopwatch; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import org.apache.twill.api.AbstractTwillRunnable; +import org.apache.twill.api.EventHandler; +import org.apache.twill.api.TwillApplication; +import org.apache.twill.api.TwillController; +import org.apache.twill.api.TwillSpecification; +import org.apache.twill.api.logging.PrinterLogHandler; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Tests {@link EventHandler} methods + */ +public final class EventHandlerTestRun extends BaseYarnTest { + private static final Logger LOG = LoggerFactory.getLogger(EventHandlerTestRun.class); + + @ClassRule + public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); + public static final String PARENT_FOLDER = "parent_folder"; + public static final String STARTED_FILE = "started_file"; + public static final String RUN_FILE = "run_file"; + public static final String CONTAINER_LAUNCHED_FOLDER = "launched_folder"; + public static final String CONTAINER_STOPPED_FOLDER = "stopped_folder"; + public static final String COMPLETED_FILE = "completed_file"; + public static final String KILLED_FILE = "killed_file"; + public static final String ABORTED_FILE = "aborted_file"; + + @Test + public void testComplete() throws InterruptedException, ExecutionException, TimeoutException, IOException { + // Create a parent folder to be written by EventHandler + File parentFolder = TMP_FOLDER.newFolder(); + parentFolder.setWritable(true, false); + TwillController controller = getTwillRunner().prepare(new CompleteApplication(parentFolder.getAbsolutePath())) + .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true))) + .withApplicationArguments(parentFolder.getAbsolutePath()) + .start(); + + // Wait for the app to complete within 120 seconds. + controller.awaitTerminated(120, TimeUnit.SECONDS); + // EventHandler#started() method should be called to create a file + Assert.assertTrue(new File(parentFolder.getAbsolutePath(), STARTED_FILE).exists()); + // CompleteRunnable#run() method should be called to create a file after EventHandler#started() method is called + Assert.assertTrue(new File(parentFolder.getAbsolutePath(), RUN_FILE).exists()); + // EventHandler#containerLaunched(String, int, String) method should be called to create a folder + Assert.assertTrue(new File(parentFolder.getAbsolutePath(), CONTAINER_LAUNCHED_FOLDER).exists()); + // EventHandler#containerStopped(String, int, String, int) method should be called to create a folder + Assert.assertTrue(new File(parentFolder.getAbsolutePath(), CONTAINER_STOPPED_FOLDER).exists()); + // EventHandler#completed() method should be called to create a file + Assert.assertTrue(new File(parentFolder.getAbsolutePath(), COMPLETED_FILE).exists()); + // EventHandler#killed() method should not be called + Assert.assertFalse(new File(parentFolder.getAbsolutePath(), KILLED_FILE).exists()); + // EventHandler#aborted() method should not be called + Assert.assertFalse(new File(parentFolder.getAbsolutePath(), ABORTED_FILE).exists()); + // Assert that containerLaunched and containerStopped are called for the same containers + // for the same number of times + String[] containerLaunchedFiles = new File(parentFolder.getAbsolutePath(), CONTAINER_LAUNCHED_FOLDER).list(); + String[] containerStoppedFiles = new File(parentFolder.getAbsolutePath(), CONTAINER_STOPPED_FOLDER).list(); + Assert.assertEquals(containerLaunchedFiles.length, containerStoppedFiles.length); + Assert.assertTrue(Arrays.asList(containerLaunchedFiles).containsAll(Arrays.asList(containerStoppedFiles))); + } + + @Test + public void testKilled() throws IOException, InterruptedException, TimeoutException, ExecutionException { + // Create a parent folder to be written by EventHandler + File parentFolder = TMP_FOLDER.newFolder(); + parentFolder.setWritable(true, false); + TwillController controller = getTwillRunner().prepare(new SleepApplication(parentFolder.getAbsolutePath())) + .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true))) + .start(); + // Wait for the runnable to run and create runFile within 120 secs + File runFile = new File(parentFolder, RUN_FILE); + Stopwatch stopwatch = new Stopwatch().start(); + while (!runFile.exists() && stopwatch.elapsedTime(TimeUnit.SECONDS) < 120) { + TimeUnit.SECONDS.sleep(1); + } + Assert.assertTrue(runFile.exists()); + // Terminate the app once the runnable runs + controller.terminate(); + controller.awaitTerminated(120, TimeUnit.SECONDS); + // EventHandler#killed() method should be called to create a file + Assert.assertTrue(new File(parentFolder.getAbsolutePath(), KILLED_FILE).exists()); + // EventHandler#completed() method should not be called + Assert.assertFalse(new File(parentFolder.getAbsolutePath(), COMPLETED_FILE).exists()); + // EventHandler#aborted() method should not be called + Assert.assertFalse(new File(parentFolder.getAbsolutePath(), ABORTED_FILE).exists()); + } + + private static void createFile(String parentPath, String childPath) { + try { + new File(parentPath, childPath).createNewFile(); + } catch (IOException e) { + Throwables.propagate(e); + } + } + + /** + * The handler for testing timeout handling. + */ + public static final class Handler extends EventHandler { + + private String parentFolderPath; + + public Handler(String parentFolderPath) { + this.parentFolderPath = parentFolderPath; + } + + @Override + protected Map<String, String> getConfigs() { + return ImmutableMap.of(PARENT_FOLDER, parentFolderPath); + } + + @Override + public void started() { + createFile(context.getSpecification().getConfigs().get(PARENT_FOLDER), STARTED_FILE); + } + + @Override + public void containerLaunched(String runnableName, int instanceId, String containerId) { + LOG.info("Launched {}#{} in container {}", runnableName, instanceId, containerId); + createContainerFile(runnableName, instanceId, containerId, CONTAINER_LAUNCHED_FOLDER); + } + + @Override + public void containerStopped(String runnableName, int instanceId, String containerId, int exitStatus) { + LOG.info("Stopped {}#{} in container {} with status {}", runnableName, instanceId, containerId, exitStatus); + createContainerFile(runnableName, instanceId, containerId, CONTAINER_STOPPED_FOLDER); + } + + private void createContainerFile(String runnableName, int instanceId, String containerId, String childFolder) { + File launchedFolder = new File(context.getSpecification().getConfigs().get(PARENT_FOLDER), childFolder); + if (!launchedFolder.exists()) { + launchedFolder.mkdirs(); + launchedFolder.setReadable(true, false); + } + createFile(launchedFolder.getAbsolutePath(), Joiner.on(":").join(runnableName, instanceId, containerId)); + } + + @Override + public void completed() { + createFile(context.getSpecification().getConfigs().get(PARENT_FOLDER), COMPLETED_FILE); + } + + @Override + public void killed() { + createFile(context.getSpecification().getConfigs().get(PARENT_FOLDER), KILLED_FILE); + } + + @Override + public void aborted() { + createFile(context.getSpecification().getConfigs().get(PARENT_FOLDER), ABORTED_FILE); + } + } + + /** + * Testing application with completed run. + */ + public static final class CompleteApplication implements TwillApplication { + + private final String parentFolderPath; + + public CompleteApplication(String parentFolderPath) { + this.parentFolderPath = parentFolderPath; + } + + @Override + public TwillSpecification configure() { + return TwillSpecification.Builder.with() + .setName("CompleteApplication") + .withRunnable() + .add(new CompleteRunnable()) + .noLocalFiles() + .anyOrder() + .withEventHandler(new Handler(parentFolderPath)) + .build(); + } + } + + /** + * A runnable that creates a file in run method only if the file created by {@link Handler#started()} exists. + */ + public static final class CompleteRunnable extends AbstractTwillRunnable { + + @Override + public void run() { + try { + File startedFile = new File(getContext().getApplicationArguments()[0], STARTED_FILE); + // CompleteRunnable#run() method should be called after EventHandler#started() method is called + if (startedFile.exists()) { + new File(startedFile.getParent(), RUN_FILE).createNewFile(); + } + } catch (IOException e) { + throw Throwables.propagate(e); + } + } + } + + /** + * Application with a runnable that sleeps. + */ + public static final class SleepApplication implements TwillApplication { + + private final String parentFolderPath; + + public SleepApplication(String parentFolderPath) { + this.parentFolderPath = parentFolderPath; + } + + @Override + public TwillSpecification configure() { + return TwillSpecification.Builder.with() + .setName("SleepApplication") + .withRunnable() + .add(new SleepRunnable(parentFolderPath)) + .noLocalFiles() + .anyOrder() + .withEventHandler(new Handler(parentFolderPath)) + .build(); + } + } + + /** + * A runnable that sleeps in run method. + */ + public static final class SleepRunnable extends AbstractTwillRunnable { + + private CountDownLatch stopLatch; + + public SleepRunnable() { + this.stopLatch = new CountDownLatch(1); + } + + public SleepRunnable(String parentFolderPath) { + super(ImmutableMap.of(PARENT_FOLDER, parentFolderPath, "startedFile", STARTED_FILE, + "runFile", RUN_FILE)); + } + + @Override + public void run() { + try { + createFile(getContext().getSpecification().getConfigs().get(PARENT_FOLDER), + getContext().getSpecification().getConfigs().get("runFile")); + LOG.info("runFile created"); + stopLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void stop() { + stopLatch.countDown(); + } + } +} http://git-wip-us.apache.org/repos/asf/twill/blob/3c32e1e4/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java index 7eb475e..6afa092 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java @@ -28,8 +28,13 @@ import org.apache.twill.api.TwillController; import org.apache.twill.api.TwillRunner; import org.apache.twill.api.TwillSpecification; import org.apache.twill.api.logging.PrinterLogHandler; +import org.junit.Assert; +import org.junit.ClassRule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import java.io.File; +import java.io.IOException; import java.io.PrintWriter; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -41,12 +46,17 @@ import java.util.concurrent.TimeoutException; * */ public final class ProvisionTimeoutTestRun extends BaseYarnTest { + @ClassRule + public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); + public static final String ABORTED_FILE = "aborted_file"; @Test - public void testProvisionTimeout() throws InterruptedException, ExecutionException, TimeoutException { + public void testProvisionTimeout() throws InterruptedException, ExecutionException, TimeoutException, IOException { TwillRunner runner = getTwillRunner(); - - TwillController controller = runner.prepare(new TimeoutApplication()) + // Create a parent folder to be written by EventHandler#aborted() + File parentFolder = TMP_FOLDER.newFolder(); + parentFolder.setWritable(true, false); + TwillController controller = runner.prepare(new TimeoutApplication(parentFolder.getAbsolutePath())) .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true))) .start(); @@ -54,6 +64,11 @@ public final class ProvisionTimeoutTestRun extends BaseYarnTest { // Hence we give 90 seconds max time here. try { controller.awaitTerminated(90, TimeUnit.SECONDS); + // EventHandler#aborted() method should be called to create a file + Assert.assertTrue(new File(parentFolder.getAbsolutePath(), ABORTED_FILE).exists()); + String[] abortedFiles = parentFolder.list(); + Assert.assertNotNull(abortedFiles); + Assert.assertEquals(1, abortedFiles.length); } finally { // If it timeout, kill the app as cleanup. controller.kill(); @@ -65,15 +80,21 @@ public final class ProvisionTimeoutTestRun extends BaseYarnTest { */ public static final class Handler extends EventHandler { + private final String parentFolderPath; private boolean abort; + public Handler(String parentFolderPath) { + this.parentFolderPath = parentFolderPath; + } + @Override protected Map<String, String> getConfigs() { - return ImmutableMap.of("abort", "true"); + return ImmutableMap.of("abort", "true", "parentFolderPath", parentFolderPath); } @Override public void initialize(EventHandlerContext context) { + super.initialize(context); this.abort = Boolean.parseBoolean(context.getSpecification().getConfigs().get("abort")); } @@ -85,6 +106,15 @@ public final class ProvisionTimeoutTestRun extends BaseYarnTest { return TimeoutAction.recheck(10, TimeUnit.SECONDS); } } + + @Override + public void aborted() { + try { + new File(context.getSpecification().getConfigs().get("parentFolderPath"), ABORTED_FILE).createNewFile(); + } catch (IOException e) { + Throwables.propagate(e); + } + } } /** @@ -92,6 +122,12 @@ public final class ProvisionTimeoutTestRun extends BaseYarnTest { */ public static final class TimeoutApplication implements TwillApplication { + private final String parentFolderPath; + + public TimeoutApplication(String parentFolderPath) { + this.parentFolderPath = parentFolderPath; + } + @Override public TwillSpecification configure() { return TwillSpecification.Builder.with() @@ -103,7 +139,7 @@ public final class ProvisionTimeoutTestRun extends BaseYarnTest { .setMemory(8, ResourceSpecification.SizeUnit.GIGA).build()) .noLocalFiles() .anyOrder() - .withEventHandler(new Handler()) + .withEventHandler(new Handler(parentFolderPath)) .build(); } } http://git-wip-us.apache.org/repos/asf/twill/blob/3c32e1e4/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java index 0bb7fce..9f9bb06 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java @@ -31,6 +31,7 @@ import org.junit.runners.Suite; DistributeShellTestRun.class, EchoServerTestRun.class, EnvironmentTestRun.class, + EventHandlerTestRun.class, FailureRestartTestRun.class, InitializeFailTestRun.class, JvmOptionsTestRun.class,
