Repository: incubator-reef
Updated Branches:
  refs/heads/master 74282695a -> 9c820fd55


[REEF-312] Fail_Alarm fails with assumed state RUNNING but reported state FAILED

This addressed the issue by

 * Created ResourceManagerStartHandler that the resource manager implements for 
initialization.
 * Created ResourceManagerStopHandler that the resource manager implements for 
shutdown.
 * Made all runtime packages implement the interfaces.
 * Changed DriverRuntimeStartHandler and DriverRuntimeStopHandler to use 
ResourceManagerStartHandler
and ResourceManagerStopHandler.

JIRA:
  [REEF-312] (https://issues.apache.org/jira/browse/REEF-312)

Pull Request:
  This closes #326


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/9c820fd5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/9c820fd5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/9c820fd5

Branch: refs/heads/master
Commit: 9c820fd55b6c6547dd787b165008d8b366bcf79a
Parents: 7428269
Author: Kim_Geon_Woo <[email protected]>
Authored: Fri Jul 31 14:49:48 2015 +0900
Committer: Brian Cho <[email protected]>
Committed: Tue Aug 4 10:47:30 2015 +0900

----------------------------------------------------------------------
 .../driver/DriverRuntimeStartHandler.java       |  6 +++
 .../common/driver/DriverRuntimeStopHandler.java |  5 +++
 .../driver/api/ResourceManagerStartHandler.java | 30 +++++++++++++++
 .../driver/api/ResourceManagerStopHandler.java  | 30 +++++++++++++++
 .../client/HDInsightDriverConfiguration.java    |  9 ++---
 .../runtime/local/driver/ContainerManager.java  | 28 ++------------
 .../local/driver/LocalDriverConfiguration.java  |  6 +--
 .../LocalResourceManagerStartHandler.java       | 39 ++++++++++++++++++++
 .../driver/LocalResourceManagerStopHandler.java | 39 ++++++++++++++++++++
 .../mesos/driver/MesosDriverConfiguration.java  |  9 ++---
 .../mesos/driver/MesosRuntimeStartHandler.java  |  4 +-
 .../mesos/driver/MesosRuntimeStopHandler.java   |  4 +-
 .../yarn/driver/YARNRuntimeStartHandler.java    |  4 +-
 .../yarn/driver/YARNRuntimeStopHandler.java     |  4 +-
 .../yarn/driver/YarnDriverConfiguration.java    |  9 ++---
 15 files changed, 173 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c820fd5/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java
index a5e7232..3421a22 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStartHandler.java
@@ -20,6 +20,7 @@ package org.apache.reef.runtime.common.driver;
 
 import org.apache.reef.proto.EvaluatorRuntimeProtocol;
 import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.api.ResourceManagerStartHandler;
 import 
org.apache.reef.runtime.common.driver.evaluator.EvaluatorHeartbeatHandler;
 import 
org.apache.reef.runtime.common.driver.evaluator.EvaluatorResourceManagerErrorHandler;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceManagerStatus;
@@ -43,6 +44,7 @@ final class DriverRuntimeStartHandler implements 
EventHandler<RuntimeStart> {
   private final EvaluatorResourceManagerErrorHandler 
evaluatorResourceManagerErrorHandler;
   private final EvaluatorHeartbeatHandler evaluatorHeartbeatHandler;
   private final ResourceManagerStatus resourceManagerStatus;
+  private final ResourceManagerStartHandler resourceManagerStartHandler;
   private final DriverStatusManager driverStatusManager;
 
   /**
@@ -50,6 +52,7 @@ final class DriverRuntimeStartHandler implements 
EventHandler<RuntimeStart> {
    * @param remoteManager                        the remoteManager in the 
Driver.
    * @param evaluatorResourceManagerErrorHandler This will be wired up to the 
remoteManager on onNext()
    * @param evaluatorHeartbeatHandler            This will be wired up to the 
remoteManager on onNext()
+   * @param resourceManagerStartHandler          This will initialize the 
resource manager
    * @param resourceManagerStatus                will be set to RUNNING in 
onNext()
    * @param driverStatusManager                  will be set to RUNNING in 
onNext()
    */
@@ -59,11 +62,13 @@ final class DriverRuntimeStartHandler implements 
EventHandler<RuntimeStart> {
                             final EvaluatorResourceManagerErrorHandler 
evaluatorResourceManagerErrorHandler,
                             final EvaluatorHeartbeatHandler 
evaluatorHeartbeatHandler,
                             final ResourceManagerStatus resourceManagerStatus,
+                            final ResourceManagerStartHandler 
resourceManagerStartHandler,
                             final DriverStatusManager driverStatusManager) {
     this.remoteManager = remoteManager;
     this.evaluatorResourceManagerErrorHandler = 
evaluatorResourceManagerErrorHandler;
     this.evaluatorHeartbeatHandler = evaluatorHeartbeatHandler;
     this.resourceManagerStatus = resourceManagerStatus;
+    this.resourceManagerStartHandler = resourceManagerStartHandler;
     this.driverStatusManager = driverStatusManager;
   }
 
@@ -76,6 +81,7 @@ final class DriverRuntimeStartHandler implements 
EventHandler<RuntimeStart> {
     
this.remoteManager.registerHandler(ReefServiceProtos.RuntimeErrorProto.class, 
evaluatorResourceManagerErrorHandler);
     this.resourceManagerStatus.setRunning();
     this.driverStatusManager.onRunning();
+    this.resourceManagerStartHandler.onNext(runtimeStart);
     LOG.log(Level.FINEST, "DriverRuntimeStartHandler complete.");
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c820fd5/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
index fc43d14..f580ed9 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverRuntimeStopHandler.java
@@ -20,6 +20,7 @@ package org.apache.reef.runtime.common.driver;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.driver.api.ResourceManagerStopHandler;
 import org.apache.reef.runtime.common.driver.evaluator.Evaluators;
 import org.apache.reef.runtime.common.utils.RemoteManager;
 import org.apache.reef.util.Optional;
@@ -40,14 +41,17 @@ final class DriverRuntimeStopHandler implements 
EventHandler<RuntimeStop> {
   private static final Logger LOG = 
Logger.getLogger(DriverRuntimeStopHandler.class.getName());
 
   private final DriverStatusManager driverStatusManager;
+  private final ResourceManagerStopHandler resourceManagerStopHandler;
   private final RemoteManager remoteManager;
   private final Evaluators evaluators;
 
   @Inject
   DriverRuntimeStopHandler(final DriverStatusManager driverStatusManager,
+                           final ResourceManagerStopHandler 
resourceManagerStopHandler,
                            final RemoteManager remoteManager,
                            final Evaluators evaluators) {
     this.driverStatusManager = driverStatusManager;
+    this.resourceManagerStopHandler = resourceManagerStopHandler;
     this.remoteManager = remoteManager;
     this.evaluators = evaluators;
   }
@@ -57,6 +61,7 @@ final class DriverRuntimeStopHandler implements 
EventHandler<RuntimeStop> {
     LOG.log(Level.FINEST, "RuntimeStop: {0}", runtimeStop);
     // Shutdown the Evaluators.
     this.evaluators.close();
+    this.resourceManagerStopHandler.onNext(runtimeStop);
     // Inform the client of the shutdown.
     final Optional<Throwable> exception = 
Optional.<Throwable>ofNullable(runtimeStop.getException());
     this.driverStatusManager.sendJobEndingMessageToClient(exception);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c820fd5/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceManagerStartHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceManagerStartHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceManagerStartHandler.java
new file mode 100644
index 0000000..0744ded
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceManagerStartHandler.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.api;
+
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStart;
+
+/**
+ * Initialize the resource manager.
+ */
+@RuntimeAuthor
+public interface ResourceManagerStartHandler extends 
EventHandler<RuntimeStart> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c820fd5/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceManagerStopHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceManagerStopHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceManagerStopHandler.java
new file mode 100644
index 0000000..896e904
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceManagerStopHandler.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.api;
+
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStop;
+
+/**
+ * Shutdown the resource manager.
+ */
+@RuntimeAuthor
+public interface ResourceManagerStopHandler extends EventHandler<RuntimeStop> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c820fd5/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfiguration.java
 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfiguration.java
index d991dce..1a79505 100644
--- 
a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfiguration.java
+++ 
b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightDriverConfiguration.java
@@ -23,9 +23,7 @@ import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.io.TempFileCreator;
 import org.apache.reef.io.WorkingDirectoryTempFileCreator;
-import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
-import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
-import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
+import org.apache.reef.runtime.common.driver.api.*;
 import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
 import org.apache.reef.runtime.common.driver.parameters.EvaluatorTimeout;
 import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
@@ -44,7 +42,6 @@ import org.apache.reef.tang.formats.ConfigurationModule;
 import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
 import org.apache.reef.tang.formats.OptionalParameter;
 import org.apache.reef.tang.formats.RequiredParameter;
-import org.apache.reef.wake.time.Clock;
 
 /**
  * ConfigurationModule to create a Driver configuration.
@@ -88,9 +85,9 @@ public final class HDInsightDriverConfiguration extends 
ConfigurationModuleBuild
       .bindImplementation(ResourceLaunchHandler.class, 
YARNResourceLaunchHandler.class)
       .bindImplementation(ResourceReleaseHandler.class, 
YARNResourceReleaseHandler.class)
       .bindImplementation(ResourceRequestHandler.class, 
YarnResourceRequestHandler.class)
+      .bindImplementation(ResourceManagerStartHandler.class, 
YARNRuntimeStartHandler.class)
+      .bindImplementation(ResourceManagerStopHandler.class, 
YARNRuntimeStopHandler.class)
       .bindConstructor(YarnConfiguration.class, 
YarnConfigurationConstructor.class)
-      .bindSetEntry(Clock.RuntimeStartHandler.class, 
YARNRuntimeStartHandler.class)
-      .bindSetEntry(Clock.RuntimeStopHandler.class, 
YARNRuntimeStopHandler.class)
       .bindImplementation(TempFileCreator.class, 
WorkingDirectoryTempFileCreator.class)
 
           // Bind the YARN Configuration parameters

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c820fd5/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
index 34b1146..a31c0e0 100644
--- 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
+++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
@@ -41,10 +41,6 @@ import org.apache.reef.util.Optional;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.remote.RemoteMessage;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
-import org.apache.reef.wake.time.Time;
-import org.apache.reef.wake.time.runtime.RuntimeClock;
-import org.apache.reef.wake.time.runtime.event.RuntimeStart;
-import org.apache.reef.wake.time.runtime.event.RuntimeStop;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -109,7 +105,6 @@ final class ContainerManager implements AutoCloseable {
   @Inject
   ContainerManager(
       final RemoteManager remoteManager,
-      final RuntimeClock clock,
       final REEFFileNames fileNames,
       @Parameter(MaxNumberOfEvaluators.class) final int capacity,
       @Parameter(RootFolder.class) final String rootFolderName,
@@ -142,24 +137,6 @@ final class ContainerManager implements AutoCloseable {
             release(error.getId());
           }
         });
-    clock.registerEventHandler(RuntimeStart.class, new EventHandler<Time>() {
-      @Override
-      public void onNext(final Time value) {
-        synchronized (ContainerManager.this) {
-          ContainerManager.this.sendNodeDescriptors();
-        }
-      }
-    });
-
-    clock.registerEventHandler(RuntimeStop.class, new EventHandler<Time>() {
-      @Override
-      public void onNext(final Time value) {
-        synchronized (ContainerManager.this) {
-          LOG.log(Level.FINEST, "RuntimeStop: close the container manager");
-          ContainerManager.this.close();
-        }
-      }
-    });
 
     init();
 
@@ -218,6 +195,9 @@ final class ContainerManager implements AutoCloseable {
     }
   }
 
+  synchronized void start() {
+    sendNodeDescriptors();
+  }
 
   private void sendNodeDescriptors() {
     final IDMaker idmaker = new IDMaker("Node-");
@@ -412,7 +392,7 @@ final class ContainerManager implements AutoCloseable {
   }
 
   @Override
-  public void close() {
+  public synchronized void close() {
     synchronized (this.containers) {
       if (this.containers.isEmpty()) {
         LOG.log(Level.FINEST, "Clean shutdown with no outstanding 
containers.");

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c820fd5/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalDriverConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalDriverConfiguration.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalDriverConfiguration.java
index 308e316..82c3560 100644
--- 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalDriverConfiguration.java
+++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalDriverConfiguration.java
@@ -18,9 +18,7 @@
  */
 package org.apache.reef.runtime.local.driver;
 
-import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
-import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
-import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
+import org.apache.reef.runtime.common.driver.api.*;
 import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
 import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
 import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
@@ -73,6 +71,8 @@ public class LocalDriverConfiguration extends 
ConfigurationModuleBuilder {
       .bindImplementation(ResourceLaunchHandler.class, 
LocalResourceLaunchHandler.class)
       .bindImplementation(ResourceRequestHandler.class, 
LocalResourceRequestHandler.class)
       .bindImplementation(ResourceReleaseHandler.class, 
LocalResourceReleaseHandler.class)
+      .bindImplementation(ResourceManagerStartHandler.class, 
LocalResourceManagerStartHandler.class)
+      .bindImplementation(ResourceManagerStopHandler.class, 
LocalResourceManagerStopHandler.class)
       .bindNamedParameter(ClientRemoteIdentifier.class, 
CLIENT_REMOTE_IDENTIFIER)
       .bindNamedParameter(ErrorHandlerRID.class, CLIENT_REMOTE_IDENTIFIER)
       .bindNamedParameter(JobIdentifier.class, JOB_IDENTIFIER)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c820fd5/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceManagerStartHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceManagerStartHandler.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceManagerStartHandler.java
new file mode 100644
index 0000000..ace2736
--- /dev/null
+++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceManagerStartHandler.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver;
+
+import org.apache.reef.runtime.common.driver.api.ResourceManagerStartHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStart;
+
+import javax.inject.Inject;
+
+final class LocalResourceManagerStartHandler implements 
ResourceManagerStartHandler {
+
+  private final ContainerManager containerManager;
+
+  @Inject
+  private LocalResourceManagerStartHandler(final ContainerManager 
containerManager) {
+    this.containerManager = containerManager;
+  }
+
+  @Override
+  public void onNext(final RuntimeStart value) {
+    this.containerManager.start();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c820fd5/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceManagerStopHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceManagerStopHandler.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceManagerStopHandler.java
new file mode 100644
index 0000000..a30eaa2
--- /dev/null
+++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceManagerStopHandler.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver;
+
+import org.apache.reef.runtime.common.driver.api.ResourceManagerStopHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStop;
+
+import javax.inject.Inject;
+
+final class LocalResourceManagerStopHandler implements 
ResourceManagerStopHandler {
+
+  private final ContainerManager containerManager;
+
+  @Inject
+  private LocalResourceManagerStopHandler(final ContainerManager 
containerManager) {
+    this.containerManager = containerManager;
+  }
+
+  @Override
+  public void onNext(final RuntimeStop value) {
+    this.containerManager.close();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c820fd5/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosDriverConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosDriverConfiguration.java
 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosDriverConfiguration.java
index 4593567..eadfe98 100644
--- 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosDriverConfiguration.java
+++ 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosDriverConfiguration.java
@@ -21,9 +21,7 @@ package org.apache.reef.runtime.mesos.driver;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.reef.io.TempFileCreator;
 import org.apache.reef.io.WorkingDirectoryTempFileCreator;
-import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
-import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
-import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
+import org.apache.reef.runtime.common.driver.api.*;
 import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
 import org.apache.reef.runtime.common.driver.parameters.EvaluatorTimeout;
 import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
@@ -41,7 +39,6 @@ import org.apache.reef.tang.formats.RequiredParameter;
 import org.apache.reef.wake.EStage;
 import org.apache.reef.wake.StageConfiguration;
 import org.apache.reef.wake.impl.SingleThreadStage;
-import org.apache.reef.wake.time.Clock;
 
 /**
  * Binds Driver's runtime event handlers.
@@ -81,8 +78,8 @@ public final class MesosDriverConfiguration extends 
ConfigurationModuleBuilder {
       .bindImplementation(ResourceLaunchHandler.class, 
MesosResourceLaunchHandler.class)
       .bindImplementation(ResourceReleaseHandler.class, 
MesosResourceReleaseHandler.class)
       .bindImplementation(ResourceRequestHandler.class, 
MesosResourceRequestHandler.class)
-      .bindSetEntry(Clock.RuntimeStartHandler.class, 
MesosRuntimeStartHandler.class)
-      .bindSetEntry(Clock.RuntimeStopHandler.class, 
MesosRuntimeStopHandler.class)
+      .bindImplementation(ResourceManagerStartHandler.class, 
MesosRuntimeStartHandler.class)
+      .bindImplementation(ResourceManagerStopHandler.class, 
MesosRuntimeStopHandler.class)
       .bindImplementation(TempFileCreator.class, 
WorkingDirectoryTempFileCreator.class)
 
       .bindNamedParameter(MesosMasterIp.class, MESOS_MASTER_IP)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c820fd5/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStartHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStartHandler.java
 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStartHandler.java
index d442a4d..268f7c9 100644
--- 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStartHandler.java
+++ 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStartHandler.java
@@ -18,12 +18,12 @@
  */
 package org.apache.reef.runtime.mesos.driver;
 
-import org.apache.reef.wake.EventHandler;
+import org.apache.reef.runtime.common.driver.api.ResourceManagerStartHandler;
 import org.apache.reef.wake.time.runtime.event.RuntimeStart;
 
 import javax.inject.Inject;
 
-final class MesosRuntimeStartHandler implements EventHandler<RuntimeStart> {
+final class MesosRuntimeStartHandler implements ResourceManagerStartHandler {
   private final REEFScheduler reefScheduler;
 
   @Inject

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c820fd5/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStopHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStopHandler.java
 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStopHandler.java
index 80648d0..aa58c26 100644
--- 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStopHandler.java
+++ 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosRuntimeStopHandler.java
@@ -18,12 +18,12 @@
  */
 package org.apache.reef.runtime.mesos.driver;
 
-import org.apache.reef.wake.EventHandler;
+import org.apache.reef.runtime.common.driver.api.ResourceManagerStopHandler;
 import org.apache.reef.wake.time.runtime.event.RuntimeStop;
 
 import javax.inject.Inject;
 
-final class MesosRuntimeStopHandler implements EventHandler<RuntimeStop> {
+final class MesosRuntimeStopHandler implements ResourceManagerStopHandler {
   private final REEFScheduler reefScheduler;
 
   @Inject

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c820fd5/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStartHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStartHandler.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStartHandler.java
index 697b4de..de53f79 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStartHandler.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStartHandler.java
@@ -18,7 +18,7 @@
  */
 package org.apache.reef.runtime.yarn.driver;
 
-import org.apache.reef.wake.EventHandler;
+import org.apache.reef.runtime.common.driver.api.ResourceManagerStartHandler;
 import org.apache.reef.wake.time.runtime.event.RuntimeStart;
 
 import javax.inject.Inject;
@@ -26,7 +26,7 @@ import javax.inject.Inject;
 /**
  * Handler of RuntimeStart for the YARN Runtime.
  */
-public final class YARNRuntimeStartHandler implements 
EventHandler<RuntimeStart> {
+public final class YARNRuntimeStartHandler implements 
ResourceManagerStartHandler {
 
   private final YarnContainerManager yarnContainerManager;
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c820fd5/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java
index 82e6f2f..3d4492e 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNRuntimeStopHandler.java
@@ -18,7 +18,7 @@
  */
 package org.apache.reef.runtime.yarn.driver;
 
-import org.apache.reef.wake.EventHandler;
+import org.apache.reef.runtime.common.driver.api.ResourceManagerStopHandler;
 import org.apache.reef.wake.time.runtime.event.RuntimeStop;
 
 import javax.inject.Inject;
@@ -26,7 +26,7 @@ import javax.inject.Inject;
 /**
  * Shuts down the YARN resource manager.
  */
-public final class YARNRuntimeStopHandler implements EventHandler<RuntimeStop> 
{
+public final class YARNRuntimeStopHandler implements 
ResourceManagerStopHandler {
 
   private final YarnContainerManager yarnContainerManager;
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9c820fd5/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverConfiguration.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverConfiguration.java
index 89545b3..abe5c7d 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverConfiguration.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverConfiguration.java
@@ -21,9 +21,7 @@ package org.apache.reef.runtime.yarn.driver;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.reef.io.TempFileCreator;
 import org.apache.reef.io.WorkingDirectoryTempFileCreator;
-import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
-import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
-import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
+import org.apache.reef.runtime.common.driver.api.*;
 import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
 import org.apache.reef.runtime.common.driver.parameters.EvaluatorTimeout;
 import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
@@ -39,7 +37,6 @@ import org.apache.reef.tang.formats.ConfigurationModule;
 import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
 import org.apache.reef.tang.formats.OptionalParameter;
 import org.apache.reef.tang.formats.RequiredParameter;
-import org.apache.reef.wake.time.Clock;
 
 /**
  * Created by marku_000 on 2014-07-07.
@@ -80,9 +77,9 @@ public class YarnDriverConfiguration extends 
ConfigurationModuleBuilder {
       .bindImplementation(ResourceLaunchHandler.class, 
YARNResourceLaunchHandler.class)
       .bindImplementation(ResourceReleaseHandler.class, 
YARNResourceReleaseHandler.class)
       .bindImplementation(ResourceRequestHandler.class, 
YarnResourceRequestHandler.class)
+      .bindImplementation(ResourceManagerStartHandler.class, 
YARNRuntimeStartHandler.class)
+      .bindImplementation(ResourceManagerStopHandler.class, 
YARNRuntimeStopHandler.class)
       .bindConstructor(YarnConfiguration.class, 
YarnConfigurationConstructor.class)
-      .bindSetEntry(Clock.RuntimeStartHandler.class, 
YARNRuntimeStartHandler.class)
-      .bindSetEntry(Clock.RuntimeStopHandler.class, 
YARNRuntimeStopHandler.class)
       .bindImplementation(TempFileCreator.class, 
WorkingDirectoryTempFileCreator.class)
 
           // Bind the YARN Configuration parameters

Reply via email to