Repository: reef Updated Branches: refs/heads/master f5107f0eb -> dc918bdc8
[REEF-1596] Add the ability to check the status of the REEF job launched directly via REEFEnvironment Driver. Summary of changes: * Add .getLastStatus() method to JobStatusHandler interface * Implement .getLastStatus() method in all classes that derive from JobStatusHandler * Set the default value for JobStatusHandler injectable parameter * Implement REEFEnvironment.getLastStatus() method * Use that method in unit tests * Minor cosmetic changes in the code, more javadocs JIRA: [REEF-1596](https://issues.apache.org/jira/browse/REEF-1596) Pull Request: This closes #1151 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/dc918bdc Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/dc918bdc Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/dc918bdc Branch: refs/heads/master Commit: dc918bdc8048606802696424d5652b0b13060d16 Parents: f5107f0 Author: Sergiy Matusevych <mo...@apache.org> Authored: Thu Oct 6 12:02:56 2016 -0700 Committer: Markus Weimer <wei...@apache.org> Committed: Mon Oct 10 20:13:14 2016 -0700 ---------------------------------------------------------------------- .../reef/runtime/common/REEFEnvironment.java | 26 +++++-- .../common/driver/client/ClientConnection.java | 19 ++--- .../common/driver/client/JobStatusHandler.java | 40 +++++++++++ .../driver/client/LoggingJobStatusHandler.java | 21 +++++- .../client/RemoteClientJobStatusHandler.java | 73 ++++++++++++++++++++ .../local/driver/LocalDriverConfiguration.java | 12 ++-- .../reef/tests/fail/driver/FailClient.java | 9 +-- .../tests/driver/REEFEnvironmentDriverTest.java | 15 +++- 8 files changed, 183 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/dc918bdc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java index c8a3d0e..b997347 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFEnvironment.java @@ -18,6 +18,8 @@ */ package org.apache.reef.runtime.common; +import org.apache.reef.proto.ReefServiceProtos; +import org.apache.reef.runtime.common.driver.client.JobStatusHandler; import org.apache.reef.runtime.common.launch.ProfilingStopHandler; import org.apache.reef.runtime.common.launch.REEFErrorHandler; import org.apache.reef.tang.Configuration; @@ -59,6 +61,8 @@ public final class REEFEnvironment implements Runnable, AutoCloseable { /** Error handler that processes all uncaught REEF exceptions. */ private final REEFErrorHandler errorHandler; + private final JobStatusHandler jobStatusHandler; + /** * Create a new REEF environment. * @param configurations REEF component (Driver or Evaluator) configuration. @@ -86,11 +90,12 @@ public final class REEFEnvironment implements Runnable, AutoCloseable { injector.getInstance(REEFVersion.class).logVersion(); final REEFErrorHandler errorHandler = injector.getInstance(REEFErrorHandler.class); + final JobStatusHandler jobStatusHandler = injector.getInstance(JobStatusHandler.class); try { final Clock clock = injector.getInstance(Clock.class); - return new REEFEnvironment(clock, errorHandler); + return new REEFEnvironment(clock, errorHandler, jobStatusHandler); } catch (final Throwable ex) { LOG.log(Level.SEVERE, "Error while instantiating the clock", ex); @@ -107,10 +112,15 @@ public final class REEFEnvironment implements Runnable, AutoCloseable { * Use .fromConfiguration() method to create new REEF environment. * @param clock main event loop. * @param errorHandler error handler. 
+ * @param jobStatusHandler an object that receives notifications on job status changes + * and can be queried for the last received job status. */ - private REEFEnvironment(final Clock clock, final REEFErrorHandler errorHandler) { + private REEFEnvironment( + final Clock clock, final REEFErrorHandler errorHandler, final JobStatusHandler jobStatusHandler) { + this.clock = clock; this.errorHandler = errorHandler; + this.jobStatusHandler = jobStatusHandler; } /** @@ -146,6 +156,7 @@ public final class REEFEnvironment implements Runnable, AutoCloseable { /** * Launch REEF component (Driver or Evaluator). * It is usually called from the static .run() method. + * Check the status of the run via .getLastStatus() method. */ @Override @SuppressWarnings("checkstyle:illegalcatch") // Catch throwable to feed it to error handler @@ -159,12 +170,19 @@ public final class REEFEnvironment implements Runnable, AutoCloseable { LOG.log(Level.FINEST, "Clock: start"); this.clock.run(); - LOG.log(Level.FINEST, "Clock: exit normally"); + LOG.log(Level.FINEST, "Clock: exit normally: {0}", this.getLastStatus()); } catch (final Throwable ex) { LOG.log(Level.SEVERE, "Clock: Error in main event loop", ex); this.errorHandler.onNext(ex); - throw ex; } } + + /** + * Get the last known status of REEF job. Can return null if job has not started yet. + * @return Status of the REEF job launched in this environment. 
+ */ + public ReefServiceProtos.JobStatusProto getLastStatus() { + return this.jobStatusHandler.getLastStatus(); + } } http://git-wip-us.apache.org/repos/asf/reef/blob/dc918bdc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java index 3f22e84..7be8e07 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java @@ -21,11 +21,8 @@ package org.apache.reef.runtime.common.driver.client; import com.google.protobuf.ByteString; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.proto.ReefServiceProtos; -import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier; import org.apache.reef.runtime.common.driver.parameters.JobIdentifier; -import org.apache.reef.runtime.common.utils.RemoteManager; import org.apache.reef.tang.annotations.Parameter; -import org.apache.reef.wake.EventHandler; import javax.inject.Inject; import java.util.logging.Level; @@ -39,22 +36,16 @@ public final class ClientConnection { private static final Logger LOG = Logger.getLogger(ClientConnection.class.getName()); - private final EventHandler<ReefServiceProtos.JobStatusProto> jobStatusHandler; + private final JobStatusHandler jobStatusHandler; private final String jobIdentifier; @Inject public ClientConnection( - final RemoteManager remoteManager, - @Parameter(ClientRemoteIdentifier.class) final String clientRID, - @Parameter(JobIdentifier.class) final String jobIdentifier) { + @Parameter(JobIdentifier.class) final String jobIdentifier, + final 
JobStatusHandler jobStatusHandler) { + this.jobIdentifier = jobIdentifier; - if (clientRID.equals(ClientRemoteIdentifier.NONE)) { - LOG.log(Level.FINE, "Instantiated 'ClientConnection' without an actual connection to the client."); - this.jobStatusHandler = new LoggingJobStatusHandler(); - } else { - this.jobStatusHandler = remoteManager.getHandler(clientRID, ReefServiceProtos.JobStatusProto.class); - LOG.log(Level.FINE, "Instantiated 'ClientConnection'"); - } + this.jobStatusHandler = jobStatusHandler; } /** http://git-wip-us.apache.org/repos/asf/reef/blob/dc918bdc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/JobStatusHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/JobStatusHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/JobStatusHandler.java new file mode 100644 index 0000000..1cc2034 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/JobStatusHandler.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.runtime.common.driver.client; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.proto.ReefServiceProtos; +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.wake.EventHandler; + +/** + * Generic interface for job status messages' handler. + * Receive JobStatusProto messages and keep the last message so it can be retrieved via getLastStatus() method. + */ +@DriverSide +@DefaultImplementation(RemoteClientJobStatusHandler.class) +public interface JobStatusHandler extends EventHandler<ReefServiceProtos.JobStatusProto> { + + /** + * Return the last known status of the REEF job. + * Can return null if the job has not been launched yet. + * @return Last status of the REEF job. + */ + ReefServiceProtos.JobStatusProto getLastStatus(); +} http://git-wip-us.apache.org/repos/asf/reef/blob/dc918bdc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/LoggingJobStatusHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/LoggingJobStatusHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/LoggingJobStatusHandler.java index f627c2b..0bf7ce5 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/LoggingJobStatusHandler.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/LoggingJobStatusHandler.java @@ -18,8 +18,8 @@ */ package org.apache.reef.runtime.common.driver.client; +import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.proto.ReefServiceProtos; -import org.apache.reef.wake.EventHandler; import javax.inject.Inject; import java.util.logging.Level; @@ -28,15 +28,30 @@ import java.util.logging.Logger; /** * A handler for job status messages that just logs them. 
*/ -final class LoggingJobStatusHandler implements EventHandler<ReefServiceProtos.JobStatusProto> { +@DriverSide +public class LoggingJobStatusHandler implements JobStatusHandler { + private static final Logger LOG = Logger.getLogger(LoggingJobStatusHandler.class.getName()); + private ReefServiceProtos.JobStatusProto lastStatus = null; + @Inject LoggingJobStatusHandler() { } @Override public void onNext(final ReefServiceProtos.JobStatusProto jobStatusProto) { - LOG.log(Level.INFO, "Received a JobStatus message that can't be sent:\n" + jobStatusProto.toString()); + this.lastStatus = jobStatusProto; + LOG.log(Level.INFO, "In-process JobStatus:\n{0}", jobStatusProto); + } + + /** + * Return the last known status of the REEF job. + * Can return null if the job has not been launched yet. + * @return Last status of the REEF job. + */ + @Override + public ReefServiceProtos.JobStatusProto getLastStatus() { + return this.lastStatus; } } http://git-wip-us.apache.org/repos/asf/reef/blob/dc918bdc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/RemoteClientJobStatusHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/RemoteClientJobStatusHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/RemoteClientJobStatusHandler.java new file mode 100644 index 0000000..001fca3 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/RemoteClientJobStatusHandler.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.driver.client; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.proto.ReefServiceProtos; +import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier; +import org.apache.reef.runtime.common.utils.RemoteManager; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.wake.EventHandler; + +import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Generic interface for job status messages' handler. 
+ */ +@DriverSide +final class RemoteClientJobStatusHandler implements JobStatusHandler { + + private static final Logger LOG = Logger.getLogger(RemoteClientJobStatusHandler.class.getName()); + + private final EventHandler<ReefServiceProtos.JobStatusProto> jobStatusHandler; + + private ReefServiceProtos.JobStatusProto lastStatus = null; + + @Inject + private RemoteClientJobStatusHandler( + final RemoteManager remoteManager, + @Parameter(ClientRemoteIdentifier.class) final String clientRID) { + + if (clientRID.equals(ClientRemoteIdentifier.NONE)) { + LOG.log(Level.FINE, "Instantiated 'RemoteClientJobStatusHandler' without an actual connection to the client."); + this.jobStatusHandler = new LoggingJobStatusHandler(); + } else { + this.jobStatusHandler = remoteManager.getHandler(clientRID, ReefServiceProtos.JobStatusProto.class); + LOG.log(Level.FINE, "Instantiated 'RemoteClientJobStatusHandler' for {0}", clientRID); + } + } + + @Override + public void onNext(final ReefServiceProtos.JobStatusProto jobStatus) { + this.lastStatus = jobStatus; + this.jobStatusHandler.onNext(jobStatus); + } + + /** + * Return the last known status of the REEF job. + * Can return null if the job has not been launched yet. + * @return Last status of the REEF job. 
+ */ + @Override + public ReefServiceProtos.JobStatusProto getLastStatus() { + return this.lastStatus; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/dc918bdc/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalDriverConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalDriverConfiguration.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalDriverConfiguration.java index 658b107..2b7ba09 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalDriverConfiguration.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalDriverConfiguration.java @@ -19,6 +19,7 @@ package org.apache.reef.runtime.local.driver; import org.apache.reef.runtime.common.driver.api.*; +import org.apache.reef.runtime.common.driver.client.JobStatusHandler; import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier; import org.apache.reef.runtime.common.driver.parameters.DefinedRuntimes; import org.apache.reef.runtime.common.driver.parameters.JobIdentifier; @@ -30,10 +31,7 @@ import org.apache.reef.runtime.local.LocalClasspathProvider; import org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators; import org.apache.reef.runtime.local.client.parameters.RackNames; import org.apache.reef.runtime.local.client.parameters.RootFolder; -import org.apache.reef.tang.formats.ConfigurationModule; -import org.apache.reef.tang.formats.ConfigurationModuleBuilder; -import org.apache.reef.tang.formats.OptionalParameter; -import org.apache.reef.tang.formats.RequiredParameter; +import org.apache.reef.tang.formats.*; /** * ConfigurationModule for the Driver executed in the local resourcemanager. 
This is meant to eventually replace @@ -68,6 +66,11 @@ public class LocalDriverConfiguration extends ConfigurationModuleBuilder { public static final OptionalParameter<String> CLIENT_REMOTE_IDENTIFIER = new OptionalParameter<>(); /** + * Interface to use for communications back to the client. + */ + public static final OptionalImpl<JobStatusHandler> JOB_STATUS_HANDLER = new OptionalImpl<>(); + + /** * The identifier of the Job submitted. */ public static final RequiredParameter<String> JOB_IDENTIFIER = new RequiredParameter<>(); @@ -78,6 +81,7 @@ public class LocalDriverConfiguration extends ConfigurationModuleBuilder { .bindImplementation(ResourceReleaseHandler.class, LocalResourceReleaseHandler.class) .bindImplementation(ResourceManagerStartHandler.class, LocalResourceManagerStartHandler.class) .bindImplementation(ResourceManagerStopHandler.class, LocalResourceManagerStopHandler.class) + .bindImplementation(JobStatusHandler.class, JOB_STATUS_HANDLER) .bindNamedParameter(ClientRemoteIdentifier.class, CLIENT_REMOTE_IDENTIFIER) .bindNamedParameter(ErrorHandlerRID.class, CLIENT_REMOTE_IDENTIFIER) .bindNamedParameter(JobIdentifier.class, JOB_IDENTIFIER) http://git-wip-us.apache.org/repos/asf/reef/blob/dc918bdc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java index 9d64500..3dd7180 100644 --- a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java +++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/driver/FailClient.java @@ -22,6 +22,7 @@ import org.apache.reef.annotations.audience.ClientSide; import org.apache.reef.annotations.audience.Private; import org.apache.reef.client.DriverConfiguration; import 
org.apache.reef.client.LauncherStatus; +import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.REEFEnvironment; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.Tang; @@ -81,18 +82,18 @@ public final class FailClient { * @param failMsgClass A class that should fail during the test. * @param runtimeConfig REEF runtime configuration. Can be e.g. Local or YARN. * @param timeOut REEF application timeout - not used yet. - * @return launcher status - usually FAIL. + * @return Final job status. Final status for tests is usually something + * with state = FAILED and exception like SimulatedDriverFailure. * @throws InjectionException configuration error. */ - public static LauncherStatus runInProcess(final Class<?> failMsgClass, + public static ReefServiceProtos.JobStatusProto runInProcess(final Class<?> failMsgClass, final Configuration runtimeConfig, final int timeOut) throws InjectionException { try (final REEFEnvironment reef = REEFEnvironment.fromConfiguration(runtimeConfig, buildDriverConfig(failMsgClass))) { reef.run(); + return reef.getLastStatus(); } - - return LauncherStatus.FORCE_CLOSED; // TODO[REEF-1596]: Use the actual status, when implemented. 
} /** http://git-wip-us.apache.org/repos/asf/reef/blob/dc918bdc/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java index 088aee3..9082a36 100644 --- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/driver/REEFEnvironmentDriverTest.java @@ -19,6 +19,7 @@ package org.apache.reef.tests.driver; import org.apache.reef.client.DriverConfiguration; +import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.REEFEnvironment; import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier; import org.apache.reef.runtime.local.driver.LocalDriverConfiguration; @@ -30,13 +31,13 @@ import org.junit.Assert; import org.junit.Test; /** - * This tests whether the noop driver gets shutdown properly. + * This tests whether the noop driver launched in-process gets shutdown properly. 
*/ public final class REEFEnvironmentDriverTest { private static final Configuration DRIVER_CONFIG = DriverConfiguration.CONF .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(DriverTestStartHandler.class)) - .set(DriverConfiguration.DRIVER_IDENTIFIER, "TEST_DriverTest") + .set(DriverConfiguration.DRIVER_IDENTIFIER, "TEST_REEFEnvironmentDriverTest") .set(DriverConfiguration.ON_DRIVER_STARTED, DriverTestStartHandler.class) .build(); @@ -45,14 +46,22 @@ public final class REEFEnvironmentDriverTest { .set(LocalDriverConfiguration.ROOT_FOLDER, ".") .set(LocalDriverConfiguration.JVM_HEAP_SLACK, 0.0) .set(LocalDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE) - .set(LocalDriverConfiguration.JOB_IDENTIFIER, "LOCAL_DRIVER_TEST") + .set(LocalDriverConfiguration.JOB_IDENTIFIER, "LOCAL_ENV_DRIVER_TEST") .set(LocalDriverConfiguration.RUNTIME_NAMES, org.apache.reef.runtime.local.driver.RuntimeIdentifier.RUNTIME_NAME) .build(); @Test public void testREEFEnvironmentDriver() throws BindException, InjectionException { + try (final REEFEnvironment reef = REEFEnvironment.fromConfiguration(LOCAL_DRIVER_MODULE, DRIVER_CONFIG)) { + reef.run(); + final ReefServiceProtos.JobStatusProto status = reef.getLastStatus(); + + Assert.assertNotNull("REEF job must report its status", status); + Assert.assertTrue("REEF job status must contain a state", status.hasState()); + Assert.assertEquals("Unexpected final job status", ReefServiceProtos.State.DONE, status.getState()); + } catch (final Throwable ex) { Assert.fail("Local driver execution failed: " + ex); }