Repository: twill
Updated Branches:
  refs/heads/master 5cacb8eb6 -> 3c32e1e45


Add started, containerLaunched, containerStopped, completed, killed, aborted to 
EventHandler

This closes #58 on Github

Signed-off-by: Poorna Chandra <poo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/3c32e1e4
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/3c32e1e4
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/3c32e1e4

Branch: refs/heads/master
Commit: 3c32e1e457cdb168cc65e104e86cf3fba8d2c372
Parents: 5cacb8e
Author: Chengfeng <m...@cask.co>
Authored: Tue Jul 25 12:53:26 2017 -0700
Committer: Poorna Chandra <poo...@apache.org>
Committed: Wed Aug 9 18:52:52 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/twill/api/EventHandler.java |  60 +++-
 .../apache/twill/api/EventHandlerContext.java   |  13 +
 .../appmaster/ApplicationMasterService.java     | 147 ++++++++--
 .../appmaster/BasicEventHandlerContext.java     |  20 +-
 .../internal/appmaster/RunningContainers.java   |  33 ++-
 .../apache/twill/yarn/EventHandlerTestRun.java  | 290 +++++++++++++++++++
 .../twill/yarn/ProvisionTimeoutTestRun.java     |  46 ++-
 .../org/apache/twill/yarn/YarnTestSuite.java    |   1 +
 8 files changed, 575 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/3c32e1e4/twill-api/src/main/java/org/apache/twill/api/EventHandler.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/EventHandler.java 
b/twill-api/src/main/java/org/apache/twill/api/EventHandler.java
index a19c0b3..45a353e 100644
--- a/twill-api/src/main/java/org/apache/twill/api/EventHandler.java
+++ b/twill-api/src/main/java/org/apache/twill/api/EventHandler.java
@@ -17,6 +17,8 @@
  */
 package org.apache.twill.api;
 
+import org.apache.twill.internal.Constants;
+
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -124,6 +126,58 @@ public abstract class EventHandler {
   }
 
   /**
+   * Invoked by the application when it starts.
+   */
+  public void started() {
+    // No-op
+  }
+
+  /**
+   * Invoked by the application when new container is launched for a {@link 
TwillRunnable}.
+   *
+   * @param runnableName name of the runnable to be run in the new container
+   * @param instanceId the instance ID of the runnable instance to be run in 
the new container
+   * @param containerId the ID of the newly launched container
+   */
+  public void containerLaunched(String runnableName, int instanceId, String 
containerId) {
+    // No-op
+  }
+
+  /**
+   * Invoked by the application when the container allocated for a {@link 
TwillRunnable} is stopped.
+   *
+   * @param runnableName name of the runnable in the stopped container
+   * @param instanceId the instance ID of the runnable instance run in the 
stopped container
+   * @param containerId the ID of the stopped container
+   * @param exitStatus the exit status of the container
+   */
+  public void containerStopped(String runnableName, int instanceId, String 
containerId, int exitStatus) {
+    // No-op
+  }
+
+  /**
+   * Invoked by the application when all containers complete.
+   */
+  public void completed() {
+    // No-op
+  }
+
+  /**
+   * Invoked by the application when stop command is received to kill the 
current application.
+   *
+   */
+  public void killed() {
+    // No-op
+  }
+
+  /**
+   * Invoked by the application when the application is aborted because of 
timeout.
+   */
+  public void aborted() {
+    // No-op
+  }
+
+  /**
    * Invoked by the application when shutting down.
    */
   public void destroy() {
@@ -133,9 +187,11 @@ public abstract class EventHandler {
   /**
    * Invoked when the number of expected instances doesn't match with number 
of actual instances.
    * @param timeoutEvents An Iterable of {@link TimeoutEvent} that contains 
information about runnable launch timeout.
-   * @return A {@link TimeoutAction} to govern action to act.
+   * @return A {@link TimeoutAction} to govern action to act. By default it is 
to recheck after 60 seconds.
    */
-  public abstract TimeoutAction launchTimeout(Iterable<TimeoutEvent> 
timeoutEvents);
+  public TimeoutAction launchTimeout(Iterable<TimeoutEvent> timeoutEvents) {
+    return new TimeoutAction(Constants.PROVISION_TIMEOUT);
+  }
 
   /**
    * Returns set of configurations available at runtime for access.

http://git-wip-us.apache.org/repos/asf/twill/blob/3c32e1e4/twill-api/src/main/java/org/apache/twill/api/EventHandlerContext.java
----------------------------------------------------------------------
diff --git 
a/twill-api/src/main/java/org/apache/twill/api/EventHandlerContext.java 
b/twill-api/src/main/java/org/apache/twill/api/EventHandlerContext.java
index 8e58af6..09429de 100644
--- a/twill-api/src/main/java/org/apache/twill/api/EventHandlerContext.java
+++ b/twill-api/src/main/java/org/apache/twill/api/EventHandlerContext.java
@@ -22,5 +22,18 @@ package org.apache.twill.api;
  */
 public interface EventHandlerContext {
 
+  /**
+   * @return Name of the {@link TwillApplication} containing the {@link 
EventHandler}
+   */
+  String getApplicationName();
+
+  /**
+   * @return the unique ID of the current run of the {@link TwillApplication} 
containing the {@link EventHandler}
+   */
+  RunId getRunId();
+
+  /**
+   * @return {@link EventHandlerSpecification} of the {@link EventHandler}
+   */
   EventHandlerSpecification getSpecification();
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/3c32e1e4/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 4917f4d..6fc31f5 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.twill.api.Command;
 import org.apache.twill.api.EventHandler;
+import org.apache.twill.api.EventHandlerContext;
 import org.apache.twill.api.EventHandlerSpecification;
 import org.apache.twill.api.LocalFile;
 import org.apache.twill.api.ResourceReport;
@@ -107,6 +108,13 @@ import javax.annotation.Nullable;
  * The class that acts as {@code ApplicationMaster} for Twill applications.
  */
 public final class ApplicationMasterService extends AbstractYarnTwillService 
implements Supplier<ResourceReport> {
+  /**
+   * Final status of this service when it stops.
+   */
+  private enum StopStatus {
+    COMPLETED, // All containers complete
+    ABORTED // Aborted because of timeout
+  }
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ApplicationMasterService.class);
   private static final Gson GSON = new GsonBuilder()
@@ -131,6 +139,7 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
   private final Map<String, Map<String, String>> environments;
   private final TwillRuntimeSpecification twillRuntimeSpec;
 
+  private volatile StopStatus stopStatus;
   private volatile boolean stopped;
   private Queue<RunnableContainerRequest> runnableContainerRequests;
   private ExecutorService instanceChangeExecutor;
@@ -157,8 +166,8 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
                                                         
twillRuntimeSpec.getKafkaZKConnect());
 
     this.expectedContainers = new ExpectedContainers(twillSpec);
-    this.runningContainers = 
createRunningContainers(amClient.getContainerId(), amClient.getHost());
     this.eventHandler = createEventHandler(twillSpec);
+    this.runningContainers = 
createRunningContainers(amClient.getContainerId(), amClient.getHost());
   }
 
   private JvmOptions loadJvmOptions() throws IOException {
@@ -172,19 +181,104 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
   }
 
   @SuppressWarnings("unchecked")
-  @Nullable
   private EventHandler createEventHandler(TwillSpecification twillSpec) throws 
ClassNotFoundException {
     // Should be able to load by this class ClassLoader, as they packaged in 
the same jar.
     EventHandlerSpecification handlerSpec = twillSpec.getEventHandler();
     if (handlerSpec == null) {
-      return null;
+      // if no handler is specified, return an EventHandler with no-op
+      return new EventHandler() {};
     }
 
     Class<?> handlerClass = 
getClass().getClassLoader().loadClass(handlerSpec.getClassName());
     
Preconditions.checkArgument(EventHandler.class.isAssignableFrom(handlerClass),
                                 "Class {} does not implements {}",
                                 handlerClass, EventHandler.class.getName());
-    return Instances.newInstance((Class<? extends EventHandler>) handlerClass);
+    final EventHandler delegate = Instances.newInstance((Class<? extends 
EventHandler>) handlerClass);
+    // wrap all calls to the delegate EventHandler methods except initialize 
so that all errors will be caught
+    return new EventHandler() {
+
+      @Override
+      public void initialize(EventHandlerContext context) {
+        delegate.initialize(context);
+      }
+
+      @Override
+      public void started() {
+        try {
+          delegate.started();
+        } catch (Throwable t) {
+          LOG.warn("Exception raised when calling {}.started()", 
delegate.getClass().getName(), t);
+        }
+      }
+
+      @Override
+      public void containerLaunched(String runnableName, int instanceId, 
String containerId) {
+        try {
+          delegate.containerLaunched(runnableName, instanceId, containerId);
+        } catch (Throwable t) {
+          LOG.warn("Exception raised when calling {}.containerLaunched(String, 
int, String)",
+                   delegate.getClass().getName(), t);
+        }
+      }
+
+      @Override
+      public void containerStopped(String runnableName, int instanceId, String 
containerId, int exitStatus) {
+        try {
+          delegate.containerStopped(runnableName, instanceId, containerId, 
exitStatus);
+        } catch (Throwable t) {
+          LOG.warn("Exception raised when calling {}.containerStopped(String, 
int, String, int)",
+                   delegate.getClass().getName(), t);
+        }
+      }
+
+      @Override
+      public void completed() {
+        try {
+          delegate.completed();
+        } catch (Throwable t) {
+          LOG.warn("Exception raised when calling {}.completed()", 
delegate.getClass().getName(), t);
+        }
+      }
+
+      @Override
+      public void killed() {
+        try {
+          delegate.killed();
+        } catch (Throwable t) {
+          LOG.warn("Exception raised when calling {}.killed()", 
delegate.getClass().getName(), t);
+        }
+      }
+
+      @Override
+      public void aborted() {
+        try {
+          delegate.aborted();
+        } catch (Throwable t) {
+          LOG.warn("Exception raised when calling {}.aborted()", 
delegate.getClass().getName(), t);
+        }
+      }
+
+      @Override
+      public void destroy() {
+        try {
+          delegate.destroy();
+        } catch (Throwable t) {
+          LOG.warn("Exception raised when calling {}.destroy()", 
delegate.getClass().getName(), t);
+        }
+      }
+
+      @Override
+      public TimeoutAction launchTimeout(Iterable<TimeoutEvent> timeoutEvents) 
{
+        try {
+          return delegate.launchTimeout(timeoutEvents);
+        } catch (Throwable t) {
+          LOG.warn("Exception raised when calling 
{}.launchTimeout(Iterable<TimeoutEvent>)",
+                   delegate.getClass().getName(), t);
+        }
+        // call super.launchTimeout in case of any errors from the delegate
+        return super.launchTimeout(timeoutEvents);
+      }
+    };
   }
 
   private RunningContainers createRunningContainers(ContainerId 
appMasterContainerId,
@@ -203,8 +297,8 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
       maxHeapMemoryMB,
       appMasterHost, null);
     String appId = 
appMasterContainerId.getApplicationAttemptId().getApplicationId().toString();
-    return new RunningContainers(appId, appMasterResources, zkClient, 
applicationLocation,
-      twillSpec.getRunnables(), twillRuntimeSpec.getMaxRetries());
+    return new RunningContainers(twillRuntimeSpec, appId, appMasterResources, 
zkClient, applicationLocation,
+                                 twillSpec.getRunnables(), eventHandler);
   }
 
   @Override
@@ -218,9 +312,9 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
       TwillRuntimeSpecificationAdapter.create().toJson(twillRuntimeSpec));
 
     // initialize the event handler, if it fails, it will fail the application.
-    if (eventHandler != null) {
-      eventHandler.initialize(new 
BasicEventHandlerContext(twillSpec.getEventHandler()));
-    }
+    eventHandler.initialize(new BasicEventHandlerContext(twillRuntimeSpec));
+    // call event handler started.
+    eventHandler.started();
 
     instanceChangeExecutor = 
Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("instanceChanger"));
 
@@ -237,15 +331,6 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
     LOG.info("Stop application master with spec: {}",
              
TwillRuntimeSpecificationAdapter.create().toJson(twillRuntimeSpec));
 
-    if (eventHandler != null) {
-      try {
-        // call event handler destroy. If there is error, only log and not 
affected stop sequence.
-        eventHandler.destroy();
-      } catch (Throwable t) {
-        LOG.warn("Exception when calling {}.destroy()", 
eventHandler.getClass().getName(), t);
-      }
-    }
-
     instanceChangeExecutor.shutdownNow();
 
     // For checking if all containers are stopped.
@@ -289,6 +374,24 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
     // Since all the runnables are now stopped, it is okay to stop the poller.
     stopPoller.shutdownNow();
     cleanupDir();
+    if (stopStatus == null) {
+      // if finalStatus is not set, the application must be stopped by a 
SystemMessages#STOP_COMMAND
+      eventHandler.killed();
+    } else {
+      switch (stopStatus) {
+        case COMPLETED:
+          eventHandler.completed();
+          break;
+        case ABORTED:
+          eventHandler.aborted();
+          break;
+        default:
+          // should never reach here
+          LOG.error("Unsupported FinalStatus '{}'", stopStatus.name());
+      }
+    }
+    // call event handler destroy
+    eventHandler.destroy();
   }
 
   @Override
@@ -392,6 +495,7 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
       // Looks for containers requests.
       if (provisioning.isEmpty() && runnableContainerRequests.isEmpty() && 
runningContainers.isEmpty()) {
         LOG.info("All containers completed. Shutting down application 
master.");
+        stopStatus = StopStatus.COMPLETED;
         break;
       }
 
@@ -520,17 +624,18 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
       }
     }
 
-    if (!timeoutEvents.isEmpty() && eventHandler != null) {
+    if (!timeoutEvents.isEmpty()) {
+      EventHandler.TimeoutAction action = 
eventHandler.launchTimeout(timeoutEvents);
       try {
-        EventHandler.TimeoutAction action = 
eventHandler.launchTimeout(timeoutEvents);
         if (action.getTimeout() < 0) {
           // Abort application
+          stopStatus = StopStatus.ABORTED;
           stop();
         } else {
           return nextTimeoutCheck + action.getTimeout();
         }
       } catch (Throwable t) {
-        LOG.warn("Exception when calling EventHandler {}. Ignore the result.", 
t);
+        LOG.warn("Exception when handling TimeoutAction.", t);
       }
     }
     return nextTimeoutCheck + Constants.PROVISION_TIMEOUT;

http://git-wip-us.apache.org/repos/asf/twill/blob/3c32e1e4/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java
index 1769910..1d5c4e7 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java
@@ -19,16 +19,32 @@ package org.apache.twill.internal.appmaster;
 
 import org.apache.twill.api.EventHandlerContext;
 import org.apache.twill.api.EventHandlerSpecification;
+import org.apache.twill.api.RunId;
+import org.apache.twill.internal.TwillRuntimeSpecification;
 
 /**
  *
  */
 final class BasicEventHandlerContext implements EventHandlerContext {
 
+  private final String applicationName;
+  private final RunId runId;
   private final EventHandlerSpecification specification;
 
-  BasicEventHandlerContext(EventHandlerSpecification specification) {
-    this.specification = specification;
+  public BasicEventHandlerContext(TwillRuntimeSpecification twillRuntimeSpec) {
+    this.applicationName = twillRuntimeSpec.getTwillAppName();
+    this.runId = twillRuntimeSpec.getTwillAppRunId();
+    this.specification = 
twillRuntimeSpec.getTwillSpecification().getEventHandler();
+  }
+
+  @Override
+  public String getApplicationName() {
+    return applicationName;
+  }
+
+  @Override
+  public RunId getRunId() {
+    return runId;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/twill/blob/3c32e1e4/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
index c85c372..f869515 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
@@ -38,6 +38,7 @@ import com.google.common.util.concurrent.Service;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.twill.api.EventHandler;
 import org.apache.twill.api.ResourceReport;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.RuntimeSpecification;
@@ -53,6 +54,7 @@ import org.apache.twill.internal.DefaultTwillRunResources;
 import org.apache.twill.internal.RunIds;
 import org.apache.twill.internal.TwillContainerController;
 import org.apache.twill.internal.TwillContainerLauncher;
+import org.apache.twill.internal.TwillRuntimeSpecification;
 import org.apache.twill.internal.container.TwillContainerMain;
 import org.apache.twill.internal.state.Message;
 import org.apache.twill.internal.state.SystemMessages;
@@ -116,9 +118,12 @@ final class RunningContainers {
   private final Map<String, Map<String, String>> logLevels;
   private final Map<String, Integer> maxRetries;
   private final Map<String, Map<Integer, AtomicInteger>> numRetries;
+  private final EventHandler eventHandler;
 
-  RunningContainers(String appId, TwillRunResources appMasterResources, 
ZKClient zookeeperClient,
-    Location applicationLocation, Map<String, RuntimeSpecification> runnables, 
Map<String, Integer> maxRetriesMap) {
+  RunningContainers(TwillRuntimeSpecification twillRuntimeSpec, String appId, 
TwillRunResources appMasterResources,
+                    ZKClient zookeeperClient, Location applicationLocation,
+                    Map<String, RuntimeSpecification> runnables,
+                    EventHandler eventHandler) {
     containers = HashBasedTable.create();
     runnableInstances = Maps.newHashMap();
     completedContainerCount = Maps.newHashMap();
@@ -131,8 +136,9 @@ final class RunningContainers {
     this.applicationLocation = applicationLocation;
     this.runnableNames = runnables.keySet();
     this.logLevels = new TreeMap<>();
-    this.maxRetries = Maps.newHashMap(maxRetriesMap);
+    this.maxRetries = Maps.newHashMap(twillRuntimeSpec.getMaxRetries());
     this.numRetries = Maps.newHashMap();
+    this.eventHandler = eventHandler;
   }
 
   /**
@@ -172,8 +178,9 @@ final class RunningContainers {
       if (startSequence.isEmpty() || 
!runnableName.equals(startSequence.peekLast())) {
         startSequence.addLast(runnableName);
       }
-      containerChange.signalAll();
-
+      containerChange.signalAll();       
+      // call event handler containerLaunched. 
+      eventHandler.containerLaunched(runnableName, instanceId, 
containerInfo.getId());
     } finally {
       containerLock.unlock();
     }
@@ -267,6 +274,7 @@ final class RunningContainers {
 
         resourceReport.removeRunnableResources(runnableName, containerId);
         containerChange.signalAll();
+        eventHandler.containerStopped(runnableName, instanceId, containerId, 
ContainerExitCodes.ABORTED);
       }
     } finally {
       containerLock.unlock();
@@ -401,6 +409,20 @@ final class RunningContainers {
     // When we acquire this lock, all stopped runnables should have been 
cleaned up by handleCompleted() method
     containerLock.lock();
     try {
+      for (Map.Entry<String, Map<String, TwillContainerController>> entry : 
containers.rowMap().entrySet()) {
+        String runnableName = entry.getKey();
+        Collection<ContainerInfo> containerInfos = 
containerStats.get(runnableName);
+        for (Map.Entry<String, TwillContainerController> 
containerControllerEntry : entry.getValue().entrySet()) {
+          for (ContainerInfo containerInfo : containerInfos) {
+            if 
(containerInfo.getId().equals(containerControllerEntry.getKey())) {
+              // Only call eventHandler.containerStopped if container is not 
removed by handleCompleted
+              eventHandler.containerStopped(runnableName, 
containerControllerEntry.getValue().getInstanceId(),
+                                            containerControllerEntry.getKey(), 
ContainerExitCodes.ABORTED);
+              break;
+            }
+          }
+        }
+      }
       containers.clear();
       runnableInstances.clear();
       numRetries.clear();
@@ -468,6 +490,7 @@ final class RunningContainers {
         // TODO: won't they get added back when the container is re-requested?
         removeInstanceId(runnableName, controller.getInstanceId());
         resourceReport.removeRunnableResources(runnableName, containerId);
+        eventHandler.containerStopped(runnableName, instanceId, containerId, 
exitStatus);
       }
       
 

http://git-wip-us.apache.org/repos/asf/twill/blob/3c32e1e4/twill-yarn/src/test/java/org/apache/twill/yarn/EventHandlerTestRun.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/test/java/org/apache/twill/yarn/EventHandlerTestRun.java 
b/twill-yarn/src/test/java/org/apache/twill/yarn/EventHandlerTestRun.java
new file mode 100644
index 0000000..bd2b245
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EventHandlerTestRun.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.EventHandler;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Tests {@link EventHandler} methods
+ */
+public final class EventHandlerTestRun extends BaseYarnTest {
+  private static final Logger LOG = 
LoggerFactory.getLogger(EventHandlerTestRun.class);
+
+  @ClassRule
+  public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+  public static final String PARENT_FOLDER = "parent_folder";
+  public static final String STARTED_FILE = "started_file";
+  public static final String RUN_FILE = "run_file";
+  public static final String CONTAINER_LAUNCHED_FOLDER = "launched_folder";
+  public static final String CONTAINER_STOPPED_FOLDER = "stopped_folder";
+  public static final String COMPLETED_FILE = "completed_file";
+  public static final String KILLED_FILE = "killed_file";
+  public static final String ABORTED_FILE = "aborted_file";
+
+  @Test
+  public void testComplete() throws InterruptedException, ExecutionException, 
TimeoutException, IOException {
+    // Create a parent folder to be written by EventHandler
+    File parentFolder = TMP_FOLDER.newFolder();
+    parentFolder.setWritable(true, false);
+    TwillController controller = getTwillRunner().prepare(new 
CompleteApplication(parentFolder.getAbsolutePath()))
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+      .withApplicationArguments(parentFolder.getAbsolutePath())
+      .start();
+
+    // Wait for the app to complete within 120 seconds.
+    controller.awaitTerminated(120, TimeUnit.SECONDS);
+    // EventHandler#started() method should be called to create a file
+    Assert.assertTrue(new File(parentFolder.getAbsolutePath(), 
STARTED_FILE).exists());
+    // CompleteRunnable#run() method should be called to create a file after 
EventHandler#started() method is called
+    Assert.assertTrue(new File(parentFolder.getAbsolutePath(), 
RUN_FILE).exists());
+    // EventHandler#containerLaunched(String, int, String) method should be 
called to create a folder
+    Assert.assertTrue(new File(parentFolder.getAbsolutePath(), 
CONTAINER_LAUNCHED_FOLDER).exists());
+    // EventHandler#containerStopped(String, int, String, int) method should 
be called to create a folder
+    Assert.assertTrue(new File(parentFolder.getAbsolutePath(), 
CONTAINER_STOPPED_FOLDER).exists());
+    // EventHandler#completed() method should be called to create a file
+    Assert.assertTrue(new File(parentFolder.getAbsolutePath(), 
COMPLETED_FILE).exists());
+    // EventHandler#killed() method should not be called
+    Assert.assertFalse(new File(parentFolder.getAbsolutePath(), 
KILLED_FILE).exists());
+    // EventHandler#aborted() method should not be called
+    Assert.assertFalse(new File(parentFolder.getAbsolutePath(), 
ABORTED_FILE).exists());
+    // Assert that containerLaunched and containerStopped are called for the 
same containers
+    // for the same number of times
+    String[] containerLaunchedFiles = new File(parentFolder.getAbsolutePath(), 
CONTAINER_LAUNCHED_FOLDER).list();
+    String[] containerStoppedFiles = new File(parentFolder.getAbsolutePath(), 
CONTAINER_STOPPED_FOLDER).list();
+    Assert.assertEquals(containerLaunchedFiles.length, 
containerStoppedFiles.length);
+    
Assert.assertTrue(Arrays.asList(containerLaunchedFiles).containsAll(Arrays.asList(containerStoppedFiles)));
+  }
+
+  @Test
+  public void testKilled() throws IOException, InterruptedException, 
TimeoutException, ExecutionException {
+    // Create a parent folder to be written by EventHandler
+    File parentFolder = TMP_FOLDER.newFolder();
+    parentFolder.setWritable(true, false);
+    TwillController controller = getTwillRunner().prepare(new 
SleepApplication(parentFolder.getAbsolutePath()))
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+      .start();
+    // Wait for the runnable to run and create runFile within 120 secs
+    File runFile = new File(parentFolder, RUN_FILE);
+    Stopwatch stopwatch = new Stopwatch().start();
+    while (!runFile.exists() && stopwatch.elapsedTime(TimeUnit.SECONDS) < 120) 
{
+      TimeUnit.SECONDS.sleep(1);
+    }
+    Assert.assertTrue(runFile.exists());
+    // Terminate the app once the runnable runs
+    controller.terminate();
+    controller.awaitTerminated(120, TimeUnit.SECONDS);
+    // EventHandler#killed() method should be called to create a file
+    Assert.assertTrue(new File(parentFolder.getAbsolutePath(), 
KILLED_FILE).exists());
+    // EventHandler#completed() method should not be called
+    Assert.assertFalse(new File(parentFolder.getAbsolutePath(), 
COMPLETED_FILE).exists());
+    // EventHandler#aborted() method should not be called
+    Assert.assertFalse(new File(parentFolder.getAbsolutePath(), 
ABORTED_FILE).exists());
+  }
+
+  private static void createFile(String parentPath, String childPath) {
+    try {
+      new File(parentPath, childPath).createNewFile();
+    } catch (IOException e) {
+      Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * The handler for testing timeout handling.
+   */
+  public static final class Handler extends EventHandler {
+
+    private String parentFolderPath;
+
+    public Handler(String parentFolderPath) {
+      this.parentFolderPath = parentFolderPath;
+    }
+
+    @Override
+    protected Map<String, String> getConfigs() {
+      return ImmutableMap.of(PARENT_FOLDER, parentFolderPath);
+    }
+
+    @Override
+    public void started() {
+      createFile(context.getSpecification().getConfigs().get(PARENT_FOLDER), 
STARTED_FILE);
+    }
+
+    @Override
+    public void containerLaunched(String runnableName, int instanceId, String 
containerId) {
+      LOG.info("Launched {}#{} in container {}", runnableName, instanceId, 
containerId);
+      createContainerFile(runnableName, instanceId, containerId, 
CONTAINER_LAUNCHED_FOLDER);
+    }
+
+    @Override
+    public void containerStopped(String runnableName, int instanceId, String 
containerId, int exitStatus) {
+      LOG.info("Stopped {}#{} in container {} with status {}", runnableName, 
instanceId, containerId, exitStatus);
+      createContainerFile(runnableName, instanceId, containerId, 
CONTAINER_STOPPED_FOLDER);
+    }
+
+    private void createContainerFile(String runnableName, int instanceId, 
String containerId, String childFolder) {
+      File launchedFolder = new 
File(context.getSpecification().getConfigs().get(PARENT_FOLDER), childFolder);
+      if (!launchedFolder.exists()) {
+        launchedFolder.mkdirs();
+        launchedFolder.setReadable(true, false);
+      }
+      createFile(launchedFolder.getAbsolutePath(), 
Joiner.on(":").join(runnableName, instanceId, containerId));
+    }
+
+    @Override
+    public void completed() {
+      createFile(context.getSpecification().getConfigs().get(PARENT_FOLDER), 
COMPLETED_FILE);
+    }
+
+    @Override
+    public void killed() {
+      createFile(context.getSpecification().getConfigs().get(PARENT_FOLDER), 
KILLED_FILE);
+    }
+
+    @Override
+    public void aborted() {
+      createFile(context.getSpecification().getConfigs().get(PARENT_FOLDER), 
ABORTED_FILE);
+    }
+  }
+
+  /**
+   * Testing application with completed run.
+   */
+  public static final class CompleteApplication implements TwillApplication {
+
+    private final String parentFolderPath;
+
+    public CompleteApplication(String parentFolderPath) {
+      this.parentFolderPath = parentFolderPath;
+    }
+
+    @Override
+    public TwillSpecification configure() {
+      return TwillSpecification.Builder.with()
+        .setName("CompleteApplication")
+        .withRunnable()
+        .add(new CompleteRunnable())
+        .noLocalFiles()
+        .anyOrder()
+        .withEventHandler(new Handler(parentFolderPath))
+        .build();
+    }
+  }
+
+  /**
+   * A runnable that creates a file in run method only if the file created by 
{@link Handler#started()} exists.
+   */
+  public static final class CompleteRunnable extends AbstractTwillRunnable {
+
+    @Override
+    public void run() {
+      try {
+        File startedFile = new File(getContext().getApplicationArguments()[0], 
STARTED_FILE);
+        // CompleteRunnable#run() method should be called after 
EventHandler#started() method is called
+        if (startedFile.exists()) {
+          new File(startedFile.getParent(), RUN_FILE).createNewFile();
+        }
+      } catch (IOException e) {
+        throw Throwables.propagate(e);
+      }
+    }
+  }
+
+  /**
+   * Application with a runnable that sleeps.
+   */
+  public static final class SleepApplication implements TwillApplication {
+
+    private final String parentFolderPath;
+
+    public SleepApplication(String parentFolderPath) {
+      this.parentFolderPath = parentFolderPath;
+    }
+
+    @Override
+    public TwillSpecification configure() {
+      return TwillSpecification.Builder.with()
+        .setName("SleepApplication")
+        .withRunnable()
+        .add(new SleepRunnable(parentFolderPath))
+        .noLocalFiles()
+        .anyOrder()
+        .withEventHandler(new Handler(parentFolderPath))
+        .build();
+    }
+  }
+
+  /**
+   * A runnable that sleeps in run method.
+   */
+  public static final class SleepRunnable extends AbstractTwillRunnable {
+
+    private CountDownLatch stopLatch;
+
+    public SleepRunnable() {
+      this.stopLatch = new CountDownLatch(1);
+    }
+
+    public SleepRunnable(String parentFolderPath) {
+      super(ImmutableMap.of(PARENT_FOLDER, parentFolderPath, "startedFile", 
STARTED_FILE,
+                            "runFile", RUN_FILE));
+    }
+
+    @Override
+    public void run() {
+      try {
+        
createFile(getContext().getSpecification().getConfigs().get(PARENT_FOLDER),
+                   
getContext().getSpecification().getConfigs().get("runFile"));
+        LOG.info("runFile created");
+        stopLatch.await();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    @Override
+    public void stop() {
+      stopLatch.countDown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/twill/blob/3c32e1e4/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java 
b/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java
index 7eb475e..6afa092 100644
--- 
a/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java
+++ 
b/twill-yarn/src/test/java/org/apache/twill/yarn/ProvisionTimeoutTestRun.java
@@ -28,8 +28,13 @@ import org.apache.twill.api.TwillController;
 import org.apache.twill.api.TwillRunner;
 import org.apache.twill.api.TwillSpecification;
 import org.apache.twill.api.logging.PrinterLogHandler;
+import org.junit.Assert;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -41,12 +46,17 @@ import java.util.concurrent.TimeoutException;
  *
  */
 public final class ProvisionTimeoutTestRun extends BaseYarnTest {
+  @ClassRule
+  public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+  public static final String ABORTED_FILE = "aborted_file";
 
   @Test
-  public void testProvisionTimeout() throws InterruptedException, 
ExecutionException, TimeoutException {
+  public void testProvisionTimeout() throws InterruptedException, 
ExecutionException, TimeoutException, IOException {
     TwillRunner runner = getTwillRunner();
-
-    TwillController controller = runner.prepare(new TimeoutApplication())
+    // Create a parent folder to be written by EventHandler#aborted()
+    File parentFolder = TMP_FOLDER.newFolder();
+    parentFolder.setWritable(true, false);
+    TwillController controller = runner.prepare(new 
TimeoutApplication(parentFolder.getAbsolutePath()))
                                        .addLogHandler(new 
PrinterLogHandler(new PrintWriter(System.out, true)))
                                        .start();
 
@@ -54,6 +64,11 @@ public final class ProvisionTimeoutTestRun extends 
BaseYarnTest {
     // Hence we give 90 seconds max time here.
     try {
       controller.awaitTerminated(90, TimeUnit.SECONDS);
+      // EventHandler#aborted() method should be called to create a file
+      Assert.assertTrue(new File(parentFolder.getAbsolutePath(), 
ABORTED_FILE).exists());
+      String[] abortedFiles = parentFolder.list();
+      Assert.assertNotNull(abortedFiles);
+      Assert.assertEquals(1, abortedFiles.length);
     } finally {
       // If it timeout, kill the app as cleanup.
       controller.kill();
@@ -65,15 +80,21 @@ public final class ProvisionTimeoutTestRun extends 
BaseYarnTest {
    */
   public static final class Handler extends EventHandler {
 
+    private final String parentFolderPath;
     private boolean abort;
 
+    public Handler(String parentFolderPath) {
+      this.parentFolderPath = parentFolderPath;
+    }
+
     @Override
     protected Map<String, String> getConfigs() {
-      return ImmutableMap.of("abort", "true");
+      return ImmutableMap.of("abort", "true", "parentFolderPath", 
parentFolderPath);
     }
 
     @Override
     public void initialize(EventHandlerContext context) {
+      super.initialize(context);
       this.abort = 
Boolean.parseBoolean(context.getSpecification().getConfigs().get("abort"));
     }
 
@@ -85,6 +106,15 @@ public final class ProvisionTimeoutTestRun extends 
BaseYarnTest {
         return TimeoutAction.recheck(10, TimeUnit.SECONDS);
       }
     }
+
+    @Override
+    public void aborted() {
+      try {
+        new 
File(context.getSpecification().getConfigs().get("parentFolderPath"), 
ABORTED_FILE).createNewFile();
+      } catch (IOException e) {
+        Throwables.propagate(e);
+      }
+    }
   }
 
   /**
@@ -92,6 +122,12 @@ public final class ProvisionTimeoutTestRun extends 
BaseYarnTest {
    */
   public static final class TimeoutApplication implements TwillApplication {
 
+    private final String parentFolderPath;
+
+    public TimeoutApplication(String parentFolderPath) {
+      this.parentFolderPath = parentFolderPath;
+    }
+
     @Override
     public TwillSpecification configure() {
       return TwillSpecification.Builder.with()
@@ -103,7 +139,7 @@ public final class ProvisionTimeoutTestRun extends 
BaseYarnTest {
                .setMemory(8, ResourceSpecification.SizeUnit.GIGA).build())
         .noLocalFiles()
         .anyOrder()
-        .withEventHandler(new Handler())
+        .withEventHandler(new Handler(parentFolderPath))
         .build();
     }
   }

http://git-wip-us.apache.org/repos/asf/twill/blob/3c32e1e4/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java 
b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
index 0bb7fce..9f9bb06 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
@@ -31,6 +31,7 @@ import org.junit.runners.Suite;
   DistributeShellTestRun.class,
   EchoServerTestRun.class,
   EnvironmentTestRun.class,
+  EventHandlerTestRun.class,
   FailureRestartTestRun.class,
   InitializeFailTestRun.class,
   JvmOptionsTestRun.class,

Reply via email to