[Trivial] Add Sergiy's test cases for the bridge. Failure tests added for the bridge
Pull Request: Closes #1465 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/8f3dafa9 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/8f3dafa9 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/8f3dafa9 Branch: refs/heads/REEF-335 Commit: 8f3dafa9fd39f5c172a0957a9935ab2d91c0d542 Parents: 3b16519 Author: Tyson Condie <tcon...@apache.org> Authored: Mon May 21 14:29:02 2018 -0700 Committer: Tyson Condie <tcon...@apache.org> Committed: Wed May 23 13:24:08 2018 -0700 ---------------------------------------------------------------------- .../proto/bridge/DriverClientProtocol.proto | 2 +- .../proto/bridge/DriverCommonProtocol.proto | 11 +- .../proto/bridge/DriverServiceProtocol.proto | 14 +- .../bridge/client/DriverServiceLauncher.java | 86 +++-- .../bridge/driver/client/DriverClientClock.java | 3 + .../driver/client/DriverClientDispatcher.java | 33 +- .../client/DriverClientEvaluatorRequestor.java | 8 +- .../client/DriverClientExceptionHandler.java | 8 +- .../driver/client/IDriverClientService.java | 7 + .../driver/client/IDriverServiceClient.java | 9 + .../driver/client/JavaDriverClientLauncher.java | 22 +- .../client/events/FailedContextBridge.java | 10 +- .../driver/client/events/RunningTaskBridge.java | 4 +- .../client/events/SuspendedTaskBridge.java | 56 +++ .../driver/client/grpc/DriverClientService.java | 107 ++++-- .../driver/client/grpc/DriverServiceClient.java | 65 +++- .../bridge/driver/launch/IDriverLauncher.java | 3 +- .../launch/azbatch/AzureBatchLauncher.java | 28 +- .../driver/launch/local/LocalLauncher.java | 5 +- .../bridge/driver/launch/yarn/YarnLauncher.java | 3 +- .../service/DriverServiceConfiguration.java | 1 + .../driver/service/grpc/GRPCDriverService.java | 353 +++++++++++++------ lang/java/reef-tests/pom.xml | 11 + .../tests/fail/driver/FailBridgeClient.java | 103 ++++++ .../reef/tests/fail/driver/FailDriver.java | 1 + .../reef/tests/fail/task/BridgeClient.java | 92 
+++++ .../tests/fail/util/FailBridgeClientUtils.java | 103 ++++++ .../reef/tests/fail/util/package-info.java | 22 ++ .../reef/tests/fail/FailBridgeDriverTest.java | 141 ++++++++ .../reef/tests/fail/FailBridgeTaskTest.java | 95 +++++ .../apache/reef/tests/fail/FailTestSuite.java | 4 +- 31 files changed, 1166 insertions(+), 244 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/common/proto/bridge/DriverClientProtocol.proto ---------------------------------------------------------------------- diff --git a/lang/common/proto/bridge/DriverClientProtocol.proto b/lang/common/proto/bridge/DriverClientProtocol.proto index 6d0d08b..86d8836 100644 --- a/lang/common/proto/bridge/DriverClientProtocol.proto +++ b/lang/common/proto/bridge/DriverClientProtocol.proto @@ -37,7 +37,7 @@ service DriverClient { // Request for resources rpc StartHandler (StartTimeInfo) returns (Void) {} - rpc StopHandler (StopTimeInfo) returns (Void) {} + rpc StopHandler (StopTimeInfo) returns (ExceptionInfo) {} rpc AlarmTrigger (AlarmTriggerInfo) returns (Void) {} http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/common/proto/bridge/DriverCommonProtocol.proto ---------------------------------------------------------------------- diff --git a/lang/common/proto/bridge/DriverCommonProtocol.proto b/lang/common/proto/bridge/DriverCommonProtocol.proto index 7ec8905..e67ca73 100644 --- a/lang/common/proto/bridge/DriverCommonProtocol.proto +++ b/lang/common/proto/bridge/DriverCommonProtocol.proto @@ -30,15 +30,18 @@ package driverbridge; message Void {} message ExceptionInfo { + // no error present if true + bool no_error = 1; + // Exception name/type - string name = 1; + string name = 2; // Exception message - string message = 2; + string message = 3; // Stack trace - repeated string stack_trace = 3; + repeated string stack_trace = 4; // Data associated with exception - bytes data = 4; + bytes data = 
5; } http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/common/proto/bridge/DriverServiceProtocol.proto ---------------------------------------------------------------------- diff --git a/lang/common/proto/bridge/DriverServiceProtocol.proto b/lang/common/proto/bridge/DriverServiceProtocol.proto index 7f6da24..0bc72ef 100644 --- a/lang/common/proto/bridge/DriverServiceProtocol.proto +++ b/lang/common/proto/bridge/DriverServiceProtocol.proto @@ -59,6 +59,9 @@ message DriverClientRegistration { // The client's server port int32 port = 2; + + // Error during initialization + ExceptionInfo exception = 5; } // The request message containing resource request. @@ -147,9 +150,12 @@ message ActiveContextRequest { message RunningTaskRequest { string task_id = 1; - // close the task - bool close_task = 2; + bytes message = 2; - // send task a message - bytes message = 3; + enum Operation { + CLOSE = 0; + SUSPEND = 1; + SEND_MESSAGE = 2; + } + Operation operation = 5; } http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java index 5a7342c..23a73f2 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java @@ -18,6 +18,7 @@ */ package org.apache.reef.bridge.client; +import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; import org.apache.commons.lang.StringUtils; import org.apache.reef.bridge.driver.launch.IDriverLauncher; @@ -28,6 +29,7 @@ import 
org.apache.reef.bridge.driver.service.IDriverServiceConfigurationProvider import org.apache.reef.bridge.driver.service.grpc.GRPCDriverServiceConfigurationProvider; import org.apache.reef.bridge.driver.client.JavaDriverClientLauncher; import org.apache.reef.bridge.proto.ClientProtocol; +import org.apache.reef.client.LauncherStatus; import org.apache.reef.runtime.azbatch.AzureBatchClasspathProvider; import org.apache.reef.runtime.common.files.*; import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder; @@ -43,7 +45,6 @@ import org.apache.reef.tang.formats.ConfigurationSerializer; import java.io.File; import java.io.IOException; -import java.io.PrintWriter; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Collections; @@ -70,7 +71,7 @@ public final class DriverServiceLauncher { throw new RuntimeException("Do not instantiate this class!"); } - public static void submit( + public static LauncherStatus submit( final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto, final Configuration driverClientConfiguration) throws InjectionException, IOException { @@ -118,22 +119,13 @@ public final class DriverServiceLauncher { .setClassPath(driverClientConfigurationProto.getOperatingSystem() == ClientProtocol.DriverClientConfiguration.OS.WINDOWS ? 
StringUtils.join(classpathProvider.getDriverClasspath(), ";") : - StringUtils.join(classpathProvider.getDriverClasspath(), ":")) + StringUtils.join(classpathProvider.getDriverClasspath(), ":")) .build(); final String cmd = StringUtils.join(launchCommand, ' '); builder.setDriverClientLaunchCommand(cmd); builder.addLocalFiles(driverClientConfigurationFile.getAbsolutePath()); - // call main() - final File driverClientConfFile = File.createTempFile("driverclient", ".json"); - try { - try (PrintWriter out = new PrintWriter(driverClientConfFile)) { - out.println(JsonFormat.printer().print(builder.build())); - } - main(new String[]{driverClientConfFile.getAbsolutePath()}); - } finally { - driverClientConfFile.deleteOnExit(); - } + return launch(driverClientConfigurationProto); } finally { driverClientConfigurationFile.deleteOnExit(); } @@ -160,7 +152,7 @@ public final class DriverServiceLauncher { .newInjector(yarnJobSubmissionClientConfig).getInstance(YarnLauncher.class); } - private static IDriverLauncher getAzureBatchDriverServiceLauncher() throws InjectionException { + private static IDriverLauncher getAzureBatchDriverServiceLauncher() throws InjectionException { final Configuration azbatchJobSubmissionClientConfig = Tang.Factory.getTang().newConfigurationBuilder() .bindImplementation(IDriverLauncher.class, AzureBatchLauncher.class) .bindImplementation(IDriverServiceConfigurationProvider.class, @@ -169,48 +161,52 @@ public final class DriverServiceLauncher { return Tang.Factory.getTang().newInjector(azbatchJobSubmissionClientConfig).getInstance(AzureBatchLauncher.class); } - /** - * Main method that launches the REEF job. - * - * @param args command line parameters. 
- */ - public static void main(final String[] args) { + private static LauncherStatus launch( + final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto) { try { - if (args.length != 1) { - LOG.log(Level.SEVERE, DriverServiceLauncher.class.getName() + - " accepts single argument referencing a file that contains a client protocol buffer driver configuration"); - } - final String content; - try { - content = new String(Files.readAllBytes(Paths.get(args[0]))); - } catch (IOException e) { - throw new RuntimeException(e); - } - final ClientProtocol.DriverClientConfiguration.Builder driverClientConfigurationProtoBuilder = - ClientProtocol.DriverClientConfiguration.newBuilder(); - JsonFormat.parser() - .usingTypeRegistry(JsonFormat.TypeRegistry.getEmptyTypeRegistry()) - .merge(content, driverClientConfigurationProtoBuilder); - final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto = - driverClientConfigurationProtoBuilder.build(); switch (driverClientConfigurationProto.getRuntimeCase()) { case YARN_RUNTIME: final IDriverLauncher yarnDriverServiceLauncher = getYarnDriverServiceLauncher(); - yarnDriverServiceLauncher.launch(driverClientConfigurationProto); - break; + return yarnDriverServiceLauncher.launch(driverClientConfigurationProto); case LOCAL_RUNTIME: final IDriverLauncher localDriverServiceLauncher = getLocalDriverServiceLauncher(); - localDriverServiceLauncher.launch(driverClientConfigurationProto); - break; + return localDriverServiceLauncher.launch(driverClientConfigurationProto); case AZBATCH_RUNTIME: final IDriverLauncher azureBatchDriverServiceLauncher = getAzureBatchDriverServiceLauncher(); - azureBatchDriverServiceLauncher.launch(driverClientConfigurationProto); - break; + return azureBatchDriverServiceLauncher.launch(driverClientConfigurationProto); default: + throw new RuntimeException("Unknown runtime"); } - LOG.log(Level.INFO, "JavaBridge: Stop Client {0}", driverClientConfigurationProto.getJobid()); - } catch 
(final BindException | InjectionException | IOException ex) { + } catch (final BindException | InjectionException ex) { LOG.log(Level.SEVERE, "Job configuration error", ex); + throw new RuntimeException(ex); + } + } + + /** + * Main method that launches the REEF job. + * + * @param args command line parameters. + */ + public static void main(final String[] args) throws InvalidProtocolBufferException { + if (args.length != 1) { + LOG.log(Level.SEVERE, DriverServiceLauncher.class.getName() + + " accepts single argument referencing a file that contains a client protocol buffer driver configuration"); + } + final String content; + try { + content = new String(Files.readAllBytes(Paths.get(args[0]))); + } catch (IOException e) { + throw new RuntimeException(e); } + final ClientProtocol.DriverClientConfiguration.Builder driverClientConfigurationProtoBuilder = + ClientProtocol.DriverClientConfiguration.newBuilder(); + JsonFormat.parser() + .usingTypeRegistry(JsonFormat.TypeRegistry.getEmptyTypeRegistry()) + .merge(content, driverClientConfigurationProtoBuilder); + final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto = + driverClientConfigurationProtoBuilder.build(); + final LauncherStatus status = launch(driverClientConfigurationProto); + LOG.log(Level.INFO, "Status: " + status.toString()); } } http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientClock.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientClock.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientClock.java index ca3817b..9857ab9 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientClock.java +++ 
b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientClock.java @@ -63,10 +63,12 @@ public final class DriverClientClock implements Clock, IAlarmDispatchHandler { @Override public Time scheduleAlarm(final int offset, final EventHandler<Alarm> handler) { + LOG.log(Level.INFO, "Schedule alarm offset {0}", offset); final ClientAlarm alarm = new ClientAlarm(this.timer.getCurrent() + offset, handler); final String alarmId = UUID.randomUUID().toString(); this.alarmMap.put(alarmId, alarm); this.driverServiceClient.onSetAlarm(alarmId, offset); + LOG.log(Level.INFO, "Alarm {0} scheduled at offset {1}", new Object[]{alarmId, offset}); return alarm; } @@ -117,6 +119,7 @@ public final class DriverClientClock implements Clock, IAlarmDispatchHandler { */ @Override public void onNext(final String alarmId) { + LOG.log(Level.INFO, "Alarm {0} triggered", alarmId); if (this.alarmMap.containsKey(alarmId)) { final ClientAlarm clientAlarm = this.alarmMap.remove(alarmId); clientAlarm.run(); http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientDispatcher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientDispatcher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientDispatcher.java index 8c4eb28..ce66692 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientDispatcher.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientDispatcher.java @@ -50,6 +50,11 @@ import java.util.Set; public final class DriverClientDispatcher { /** + * Exception handler. 
+ */ + private final DriverClientExceptionHandler exceptionHandler; + + /** * Dispatcher used for application provided event handlers. */ private final DispatchingEStage applicationDispatcher; @@ -79,6 +84,12 @@ public final class DriverClientDispatcher { */ private final DispatchingEStage driverRestartDispatcher; + + /** + * Synchronous set of stop handlers. + */ + private final Set<EventHandler<StopTime>> stopHandlers; + @Inject private DriverClientDispatcher( final DriverClientExceptionHandler driverExceptionHandler, @@ -124,12 +135,12 @@ public final class DriverClientDispatcher { final Set<EventHandler<byte[]>> clientCloseWithMessageHandlers, @Parameter(ClientMessageHandlers.class) final Set<EventHandler<byte[]>> clientMessageHandlers) { - + this.exceptionHandler = driverExceptionHandler; this.applicationDispatcher = new DispatchingEStage( driverExceptionHandler, numberOfThreads, "ClientDriverDispatcher"); // Application start and stop handlers this.applicationDispatcher.register(StartTime.class, startHandlers); - this.applicationDispatcher.register(StopTime.class, stopHandlers); + this.stopHandlers = stopHandlers; // must be called synchronously // Application Context event handlers this.applicationDispatcher.register(ActiveContext.class, contextActiveHandlers); this.applicationDispatcher.register(ClosedContext.class, contextClosedHandlers); @@ -276,8 +287,22 @@ public final class DriverClientDispatcher { this.applicationDispatcher.onNext(StartTime.class, startTime); } - public void dispatch(final StopTime stopTime) { - this.applicationDispatcher.onNext(StopTime.class, stopTime); + /** + * We must implement this synchronously in order to catch exceptions and + * forward them back via the bridge before the server shuts down, after + * this method returns. 
+ * @param stopTime stop time + */ + @SuppressWarnings("checkstyle:illegalCatch") + public Throwable dispatch(final StopTime stopTime) { + try { + for (final EventHandler<StopTime> handler : stopHandlers) { + handler.onNext(stopTime); + } + return null; + } catch (Throwable t) { + return t; + } } public void dispatch(final ActiveContext context) { http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java index 54692ec..e1c01d6 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java @@ -31,13 +31,19 @@ public final class DriverClientEvaluatorRequestor implements EvaluatorRequestor private final IDriverServiceClient driverServiceClient; + private final IDriverClientService driverClientService; + @Inject - private DriverClientEvaluatorRequestor(final IDriverServiceClient driverServiceClient) { + private DriverClientEvaluatorRequestor( + final IDriverServiceClient driverServiceClient, + final IDriverClientService driverClientService) { this.driverServiceClient = driverServiceClient; + this.driverClientService = driverClientService; } @Override public void submit(final EvaluatorRequest req) { + this.driverClientService.notifyEvaluatorRequest(req.getNumber()); this.driverServiceClient.onEvaluatorRequest(req); } 
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientExceptionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientExceptionHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientExceptionHandler.java index 9bd99d6..9e31b99 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientExceptionHandler.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientExceptionHandler.java @@ -20,6 +20,7 @@ package org.apache.reef.bridge.driver.client; import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.Clock; import javax.inject.Inject; import java.util.logging.Level; @@ -31,13 +32,18 @@ import java.util.logging.Logger; public final class DriverClientExceptionHandler implements EventHandler<Throwable> { private static final Logger LOG = Logger.getLogger(DriverClientExceptionHandler.class.getName()); + private final Clock clock; + @Inject - private DriverClientExceptionHandler() { + private DriverClientExceptionHandler(final Clock clock) { LOG.log(Level.FINE, "Instantiated 'DriverExceptionHandler'"); + this.clock = clock; } @Override public void onNext(final Throwable throwable) { + LOG.log(Level.SEVERE, throwable.toString()); + this.clock.stop(throwable); } } http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverClientService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverClientService.java 
b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverClientService.java index 38758bd..3fd9cd0 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverClientService.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverClientService.java @@ -38,6 +38,13 @@ public interface IDriverClientService { /** + * Notify that the count number of evaluators have been + * requested by the application. + * @param count of the number of evaluators + */ + void notifyEvaluatorRequest(final int count); + + /** * Wait for termination of driver client service. */ void awaitTermination() throws InterruptedException; http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverServiceClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverServiceClient.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverServiceClient.java index 7cc1346..1421666 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverServiceClient.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/IDriverServiceClient.java @@ -34,6 +34,8 @@ import java.util.List; @DefaultImplementation(DriverServiceClient.class) public interface IDriverServiceClient { + void onInitializationException(final Throwable ex); + /** * Initiate shutdown. */ @@ -129,4 +131,11 @@ public interface IDriverServiceClient { * @param message to send */ void onTaskMessage(final String taskId, final byte[] message); + + /** + * Suspend a running task. 
+ * @param taskId task identifier + * @param message optional message + */ + void onSuspendTask(final String taskId, final Optional<byte[]> message); } http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JavaDriverClientLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JavaDriverClientLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JavaDriverClientLauncher.java index 3b675ea..964036f 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JavaDriverClientLauncher.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/JavaDriverClientLauncher.java @@ -190,10 +190,24 @@ public final class JavaDriverClientLauncher { Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.envConfig)); final Injector injector = Tang.Factory.getTang().newInjector(launcher.envConfig); - try (final Clock reef = injector.getInstance(Clock.class)) { - reef.run(); - } catch (final Throwable ex) { - throw fatal("Unable to configure and start Clock.", ex); + try { + final IDriverServiceClient driverServiceClient = injector.getInstance(IDriverServiceClient.class); + try (final Clock reef = injector.getInstance(Clock.class)) { + reef.run(); + } catch (final InjectionException ex) { + LOG.log(Level.SEVERE, "Unable to configure driver client."); + driverServiceClient.onInitializationException(ex.getCause() != null ? 
ex.getCause() : ex); + } catch (final Throwable t) { + if (t.getCause() != null && t.getCause() instanceof InjectionException) { + LOG.log(Level.SEVERE, "Unable to configure driver client."); + final InjectionException ex = (InjectionException) t.getCause(); + driverServiceClient.onInitializationException(ex.getCause() != null ? ex.getCause() : ex); + } else { + throw fatal("Unable run clock.", t); + } + } + } catch (InjectionException e) { + throw fatal("Unable initialize driver service client.", e); } ThreadLogger.logThreads(LOG, Level.FINEST, "Threads running after Clock.close():"); http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedContextBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedContextBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedContextBridge.java index 1c315bb..084d584 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedContextBridge.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/FailedContextBridge.java @@ -40,7 +40,7 @@ public final class FailedContextBridge implements FailedContext { private final Optional<ActiveContext> parentContext; - private final Optional<byte[]> data; + private final Optional<Throwable> reason; public FailedContextBridge( final String contextId, @@ -48,13 +48,13 @@ public final class FailedContextBridge implements FailedContext { final String message, final EvaluatorDescriptor evaluatorDescriptor, final Optional<ActiveContext> parentContext, - final Optional<byte[]> data) { + final Optional<Throwable> reason) { this.contextId = contextId; this.evaluatorId = evaluatorId; this.message = message; 
this.evaluatorDescriptor = evaluatorDescriptor; this.parentContext = parentContext; - this.data = data; + this.reason = reason; } @Override @@ -74,12 +74,12 @@ public final class FailedContextBridge implements FailedContext { @Override public Optional<Throwable> getReason() { - return Optional.<Throwable>of(new EvaluatorException(this.evaluatorId, this.message)); + return this.reason; } @Override public Optional<byte[]> getData() { - return this.data; + return Optional.empty(); } @Override http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/RunningTaskBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/RunningTaskBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/RunningTaskBridge.java index d6d3f5e..b537b71 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/RunningTaskBridge.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/RunningTaskBridge.java @@ -60,12 +60,12 @@ public final class RunningTaskBridge implements RunningTask { @Override public void suspend(final byte[] message) { - throw new UnsupportedOperationException("Suspend task not supported"); + this.driverServiceClient.onSuspendTask(this.taskId, Optional.of(message)); } @Override public void suspend() { - throw new UnsupportedOperationException("Suspend task not supported"); + this.driverServiceClient.onSuspendTask(this.taskId, Optional.<byte[]>empty()); } @Override http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/SuspendedTaskBridge.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/SuspendedTaskBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/SuspendedTaskBridge.java new file mode 100644 index 0000000..92b7993 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/events/SuspendedTaskBridge.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.driver.client.events; + +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.task.SuspendedTask; + +/** + * Suspended task bridge. 
+ */ +public final class SuspendedTaskBridge implements SuspendedTask { + + private final String taskId; + + private final ActiveContext context; + + private final byte[] result; + + public SuspendedTaskBridge(final String taskId, final ActiveContext context, final byte[] result) { + this.taskId = taskId; + this.context = context; + this.result = result; + } + + @Override + public ActiveContext getActiveContext() { + return this.context; + } + + @Override + public byte[] get() { + return this.result; + } + + @Override + public String getId() { + return this.taskId; + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java index b646d6e..05fcd40 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java @@ -20,6 +20,7 @@ package org.apache.reef.bridge.driver.client.grpc; import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.Status; @@ -39,6 +40,7 @@ import org.apache.reef.driver.restart.DriverRestarted; import org.apache.reef.driver.task.FailedTask; import org.apache.reef.exception.EvaluatorException; import org.apache.reef.runtime.common.driver.evaluator.EvaluatorDescriptorImpl; +import org.apache.reef.runtime.common.utils.ExceptionCodec; import org.apache.reef.tang.InjectionFuture; import org.apache.reef.util.Optional; import 
org.apache.reef.wake.remote.ports.TcpPortProvider; @@ -64,8 +66,12 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl private Server server; + private final Object lock = new Object(); + private final InjectionFuture<Clock> clock; + private final ExceptionCodec exceptionCodec; + private final DriverServiceClient driverServiceClient; private final TcpPortProvider tcpPortProvider; @@ -76,22 +82,28 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl private final Map<String, ActiveContextBridge> activeContextBridgeMap = new HashMap<>(); - private boolean isIdle = false; + private int outstandingEvaluatorCount = 0; @Inject private DriverClientService( + final ExceptionCodec exceptionCodec, final DriverServiceClient driverServiceClient, final TcpPortProvider tcpPortProvider, final InjectionFuture<Clock> clock, final InjectionFuture<DriverClientDispatcher> clientDriverDispatcher) { + this.exceptionCodec = exceptionCodec; this.driverServiceClient = driverServiceClient; this.tcpPortProvider = tcpPortProvider; this.clock = clock; this.clientDriverDispatcher = clientDriverDispatcher; } - void setNotIdle() { - this.isIdle = false; + @Override + public void notifyEvaluatorRequest(final int count) { + synchronized (this.lock) { + this.outstandingEvaluatorCount += count; + this.lock.notify(); + } } @Override @@ -123,21 +135,19 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl @Override public void idlenessCheckHandler(final Void request, final StreamObserver<IdleStatus> responseObserver) { - if (clock.get().isIdle() && this.evaluatorBridgeMap.isEmpty()) { + if (isIdle()) { LOG.log(Level.INFO, "possibly idle. 
waiting for some action."); - this.isIdle = true; try { - Thread.sleep(120000); // a couple of minutes + synchronized (this.lock) { + this.lock.wait(1000); // wait a second + } } catch (InterruptedException e) { LOG.log(Level.WARNING, e.getMessage()); } - } else { - LOG.log(Level.INFO, "not idle"); - this.isIdle = false; } responseObserver.onNext(IdleStatus.newBuilder() .setReason("DriverClient checking idleness") - .setIsIdle(this.isIdle) + .setIsIdle(this.isIdle()) .build()); responseObserver.onCompleted(); } @@ -155,13 +165,22 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl } @Override - public void stopHandler(final StopTimeInfo request, final StreamObserver<Void> responseObserver) { + public void stopHandler(final StopTimeInfo request, final StreamObserver<ExceptionInfo> responseObserver) { try { LOG.log(Level.INFO, "StopHandler at time {0}", request.getStopTime()); final StopTime stopTime = new StopTime(request.getStopTime()); - this.clientDriverDispatcher.get().dispatch(stopTime); + final Throwable error = this.clientDriverDispatcher.get().dispatch(stopTime); + if (error != null) { + responseObserver.onNext( + ExceptionInfo.newBuilder() + .setName(error.getCause() != null ? error.getCause().toString() : error.toString()) + .setMessage(error.getMessage() == null ? 
error.toString() : error.getMessage()) + .setData(ByteString.copyFrom(exceptionCodec.toBytes(error))) + .build()); + } else { + responseObserver.onNext(ExceptionInfo.newBuilder().setNoError(true).build()); + } } finally { - responseObserver.onNext(null); responseObserver.onCompleted(); this.server.shutdown(); } @@ -181,7 +200,9 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl @Override public void allocatedEvaluatorHandler(final EvaluatorInfo request, final StreamObserver<Void> responseObserver) { try { - this.isIdle = false; + synchronized (this.lock) { + this.outstandingEvaluatorCount--; + } LOG.log(Level.INFO, "Allocated evaluator id {0}", request.getEvaluatorId()); final AllocatedEvaluatorBridge eval = new AllocatedEvaluatorBridge( request.getEvaluatorId(), @@ -210,6 +231,15 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl @Override public void failedEvaluatorHandler(final EvaluatorInfo request, final StreamObserver<Void> responseObserver) { try { + if (!this.evaluatorBridgeMap.containsKey(request.getEvaluatorId())) { + LOG.log(Level.INFO, "Failed evalautor that we were not allocated"); + synchronized (this.lock) { + if (this.outstandingEvaluatorCount > 0) { + this.outstandingEvaluatorCount--; + } + } + return; + } LOG.log(Level.INFO, "Failed Evaluator id {0}", request.getEvaluatorId()); final AllocatedEvaluatorBridge eval = this.evaluatorBridgeMap.remove(request.getEvaluatorId()); List<FailedContext> failedContextList = new ArrayList<>(); @@ -224,7 +254,7 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl context.getParentId().isPresent() ? 
Optional.<ActiveContext>of(this.activeContextBridgeMap.get(context.getParentId().get())) : Optional.<ActiveContext>empty(), - Optional.<byte[]>empty())); + Optional.<Throwable>empty())); } for (final String failedContextId : request.getFailure().getFailedContextsList()) { this.activeContextBridgeMap.remove(failedContextId); @@ -253,7 +283,6 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl @Override public void activeContextHandler(final ContextInfo request, final StreamObserver<Void> responseObserver) { try { - this.isIdle = false; LOG.log(Level.INFO, "Active context id {0}", request.getContextId()); final AllocatedEvaluatorBridge eval = this.evaluatorBridgeMap.get(request.getEvaluatorId()); final ActiveContextBridge context = new ActiveContextBridge( @@ -302,8 +331,9 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl final Optional<ActiveContext> parent = context.getParentId().isPresent() ? Optional.<ActiveContext>of(this.activeContextBridgeMap.get(context.getParentId().get())) : Optional.<ActiveContext>empty(); - final Optional<byte[]> data = request.getException().getData() != null ? - Optional.of(request.getException().getData().toByteArray()) : Optional.<byte[]>empty(); + final Optional<Throwable> reason = !request.getException().getData().isEmpty() ? 
+ this.exceptionCodec.fromBytes(request.getException().getData().toByteArray()) : + Optional.<Throwable>empty(); this.clientDriverDispatcher.get().dispatch( new FailedContextBridge( context.getId(), @@ -311,7 +341,7 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl request.getException().getMessage(), context.getEvaluatorDescriptor(), parent, - data)); + reason)); } finally { responseObserver.onNext(null); responseObserver.onCompleted(); @@ -374,15 +404,15 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl this.activeContextBridgeMap.containsKey(request.getContext().getContextId()) ? Optional.<ActiveContext>of(this.activeContextBridgeMap.get(request.getContext().getContextId())) : Optional.<ActiveContext>empty(); - final Optional<byte[]> data = request.getException().getData() != null ? - Optional.of(request.getException().getData().toByteArray()) : Optional.<byte[]>empty(); this.clientDriverDispatcher.get().dispatch( new FailedTask( request.getTaskId(), request.getException().getMessage(), Optional.of(request.getException().getName()), - Optional.<Throwable>of(new EvaluatorException(request.getException().getMessage())), - data, + request.getException().getData().isEmpty() ? + Optional.<Throwable>of(new EvaluatorException(request.getException().getMessage())) : + this.exceptionCodec.fromBytes(request.getException().getData().toByteArray()), + Optional.<byte[]>empty(), context)); } finally { responseObserver.onNext(null); @@ -403,7 +433,8 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl new CompletedTaskBridge( request.getTaskId(), context, - request.getResult() != null ? request.getResult().toByteArray() : null)); + request.getResult() != null && !request.getResult().isEmpty() ? 
+ request.getResult().toByteArray() : null)); } finally { responseObserver.onNext(null); responseObserver.onCompleted(); @@ -412,7 +443,23 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl @Override public void suspendedTaskHandler(final TaskInfo request, final StreamObserver<Void> responseObserver) { - responseObserver.onError(Status.INTERNAL.withDescription("Not supported").asRuntimeException()); + final ContextInfo contextInfo = request.getContext(); + if (!this.activeContextBridgeMap.containsKey(contextInfo.getContextId())) { + this.activeContextBridgeMap.put(contextInfo.getContextId(), toActiveContext(contextInfo)); + } + LOG.log(Level.INFO, "Suspended task id {0}", request.getTaskId()); + try { + final ActiveContextBridge context = this.activeContextBridgeMap.get(request.getContext().getContextId()); + this.clientDriverDispatcher.get().dispatch( + new SuspendedTaskBridge( + request.getTaskId(), + context, + request.getResult() != null && !request.getResult().isEmpty() ? 
+ request.getResult().toByteArray() : null)); + } finally { + responseObserver.onNext(null); + responseObserver.onCompleted(); + } } @Override @@ -589,6 +636,16 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl } // Helper methods + private boolean isIdle() { + LOG.log(Level.INFO, "Clock idle {0}, outstanding evaluators {1}, current evaluators {2}", + new Object[] { + this.clock.get().isIdle(), + this.outstandingEvaluatorCount, + this.evaluatorBridgeMap.isEmpty()}); + return clock.get().isIdle() && + this.outstandingEvaluatorCount == 0 && + this.evaluatorBridgeMap.isEmpty(); + } private EvaluatorDescriptor toEvaluatorDescriptor(final EvaluatorDescriptorInfo info) { return new EvaluatorDescriptorImpl( http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverServiceClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverServiceClient.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverServiceClient.java index 81fb290..e89a5a5 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverServiceClient.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverServiceClient.java @@ -27,10 +27,11 @@ import org.apache.reef.bridge.driver.client.IDriverServiceClient; import org.apache.reef.bridge.driver.client.JVMClientProcess; import org.apache.reef.bridge.driver.client.grpc.parameters.DriverServicePort; import org.apache.reef.bridge.proto.*; +import org.apache.reef.bridge.proto.Void; import org.apache.reef.driver.context.ContextConfiguration; import org.apache.reef.driver.evaluator.EvaluatorRequest; +import org.apache.reef.runtime.common.utils.ExceptionCodec; import 
org.apache.reef.tang.Configuration; -import org.apache.reef.tang.InjectionFuture; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.tang.formats.ConfigurationSerializer; import org.apache.reef.util.Optional; @@ -38,6 +39,10 @@ import org.apache.reef.util.Optional; import javax.inject.Inject; import java.io.File; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * The client that exposes methods for communicating back to the @@ -46,7 +51,7 @@ import java.util.List; @Private public final class DriverServiceClient implements IDriverServiceClient { - private final InjectionFuture<DriverClientService> driverClientService; + private final ExceptionCodec exceptionCodec; private final ConfigurationSerializer configurationSerializer; @@ -54,11 +59,11 @@ public final class DriverServiceClient implements IDriverServiceClient { @Inject private DriverServiceClient( - final InjectionFuture<DriverClientService> driverClientService, final ConfigurationSerializer configurationSerializer, + final ExceptionCodec exceptionCodec, @Parameter(DriverServicePort.class) final Integer driverServicePort) { - this.driverClientService = driverClientService; this.configurationSerializer = configurationSerializer; + this.exceptionCodec = exceptionCodec; final ManagedChannel channel = ManagedChannelBuilder .forAddress("localhost", driverServicePort) .usePlaintext(true) @@ -75,6 +80,23 @@ public final class DriverServiceClient implements IDriverServiceClient { } @Override + public void onInitializationException(final Throwable ex) { + final Future<Void> callComplete = this.serviceStub.registerDriverClient( + DriverClientRegistration.newBuilder() + .setException(ExceptionInfo.newBuilder() + .setName(ex.getCause() != null ? ex.getCause().toString() : ex.toString()) + .setMessage(ex.getMessage() == null ? 
ex.toString() : ex.getMessage()) + .setData(ByteString.copyFrom(exceptionCodec.toBytes(ex))) + .build()) + .build()); + try { + callComplete.get(5, TimeUnit.SECONDS); + } catch (ExecutionException | TimeoutException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override public void onShutdown() { this.serviceStub.shutdown(ShutdownRequest.newBuilder().build()); } @@ -84,14 +106,14 @@ public final class DriverServiceClient implements IDriverServiceClient { this.serviceStub.shutdown(ShutdownRequest.newBuilder() .setException(ExceptionInfo.newBuilder() .setName(ex.getCause() != null ? ex.getCause().toString() : ex.toString()) - .setMessage(ex.getMessage()) + .setMessage(ex.getMessage() == null ? ex.toString() : ex.getMessage()) + .setData(ByteString.copyFrom(exceptionCodec.toBytes(ex))) .build()) .build()); } @Override public void onSetAlarm(final String alarmId, final int timeoutMS) { - this.driverClientService.get().setNotIdle(); this.serviceStub.setAlarm( AlarmRequest.newBuilder() .setAlarmId(alarmId) @@ -101,7 +123,6 @@ public final class DriverServiceClient implements IDriverServiceClient { @Override public void onEvaluatorRequest(final EvaluatorRequest evaluatorRequest) { - this.driverClientService.get().setNotIdle(); this.serviceStub.requestResources( ResourceRequest.newBuilder() .setCores(evaluatorRequest.getNumberOfCores()) @@ -215,18 +236,38 @@ public final class DriverServiceClient implements IDriverServiceClient { @Override public void onTaskClose(final String taskId, final Optional<byte[]> message) { - this.serviceStub.runningTaskOp(RunningTaskRequest.newBuilder() - .setTaskId(taskId) - .setCloseTask(true) - .setMessage(message.isPresent() ? ByteString.copyFrom(message.get()) : null) - .build()); + this.serviceStub.runningTaskOp(message.isPresent() ? 
+ RunningTaskRequest.newBuilder() + .setTaskId(taskId) + .setOperation(RunningTaskRequest.Operation.CLOSE) + .setMessage(ByteString.copyFrom(message.get())) + .build() : + RunningTaskRequest.newBuilder() + .setTaskId(taskId) + .setOperation(RunningTaskRequest.Operation.CLOSE) + .build()); } @Override public void onTaskMessage(final String taskId, final byte[] message) { this.serviceStub.runningTaskOp(RunningTaskRequest.newBuilder() .setTaskId(taskId) + .setOperation(RunningTaskRequest.Operation.SEND_MESSAGE) .setMessage(ByteString.copyFrom(message)) .build()); } + + @Override + public void onSuspendTask(final String taskId, final Optional<byte[]> message) { + this.serviceStub.runningTaskOp(message.isPresent() ? + RunningTaskRequest.newBuilder() + .setTaskId(taskId) + .setOperation(RunningTaskRequest.Operation.SUSPEND) + .setMessage(ByteString.copyFrom(message.get())) + .build() : + RunningTaskRequest.newBuilder() + .setTaskId(taskId) + .setOperation(RunningTaskRequest.Operation.SUSPEND) + .build()); + } } http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/IDriverLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/IDriverLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/IDriverLauncher.java index c122552..9ccf1d8 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/IDriverLauncher.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/IDriverLauncher.java @@ -19,6 +19,7 @@ package org.apache.reef.bridge.driver.launch; import org.apache.reef.bridge.proto.ClientProtocol; +import org.apache.reef.client.LauncherStatus; /** * All driver launchers implement this method. 
@@ -29,5 +30,5 @@ public interface IDriverLauncher { * Launch the driver with the dynamic {@link ClientProtocol.DriverClientConfiguration}. * @param driverClientConfiguration dynamic driver configuration parameters */ - void launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration); + LauncherStatus launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration); } http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/AzureBatchLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/AzureBatchLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/AzureBatchLauncher.java index d1b4557..e1235b8 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/AzureBatchLauncher.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/AzureBatchLauncher.java @@ -21,16 +21,14 @@ package org.apache.reef.bridge.driver.launch.azbatch; import org.apache.reef.bridge.driver.launch.IDriverLauncher; import org.apache.reef.bridge.driver.service.IDriverServiceConfigurationProvider; import org.apache.reef.bridge.proto.ClientProtocol; -import org.apache.reef.client.REEF; +import org.apache.reef.client.DriverLauncher; +import org.apache.reef.client.LauncherStatus; import org.apache.reef.runtime.azbatch.client.AzureBatchRuntimeConfiguration; import org.apache.reef.runtime.azbatch.client.AzureBatchRuntimeConfigurationCreator; import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.Tang; import org.apache.reef.tang.exceptions.InjectionException; import javax.inject.Inject; -import java.util.logging.Level; -import java.util.logging.Logger; /** * This is a bootstrap 
launcher for Azure Batch for submission from C#. It allows for Java Driver @@ -39,9 +37,6 @@ import java.util.logging.Logger; */ public final class AzureBatchLauncher implements IDriverLauncher { - private static final Logger LOG = Logger.getLogger(AzureBatchLauncher.class.getName()); - private static final Tang TANG = Tang.Factory.getTang(); - private final IDriverServiceConfigurationProvider driverServiceConfigurationProvider; @Inject @@ -49,17 +44,13 @@ public final class AzureBatchLauncher implements IDriverLauncher { this.driverServiceConfigurationProvider = driverServiceConfigurationProvider; } - public void launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration) { - try (final REEF reef = TANG.newInjector( - generateConfigurationFromJobSubmissionParameters(driverClientConfiguration)).getInstance(REEF.class)) { - LOG.log(Level.INFO, "Submitting job"); - reef.submit(driverServiceConfigurationProvider.getDriverServiceConfiguration(driverClientConfiguration)); + public LauncherStatus launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration) { + try { + return DriverLauncher.getLauncher(generateConfigurationFromJobSubmissionParameters(driverClientConfiguration)) + .run(driverServiceConfigurationProvider.getDriverServiceConfiguration(driverClientConfiguration)); } catch (InjectionException e) { - fatal("unable to launch", e); + throw new RuntimeException(e); } - LOG.log(Level.INFO, "Exiting BootstrapLauncher.main()"); - - System.exit(0); // TODO[REEF-1715]: Should be able to exit cleanly at the end of main() } private static Configuration generateConfigurationFromJobSubmissionParameters( @@ -83,9 +74,4 @@ public final class AzureBatchLauncher implements IDriverLauncher { driverClientConfiguration.getAzbatchRuntime().getAzureStorageContainerName()) .build(); } - - private static RuntimeException fatal(final String msg, final Throwable t) { - LOG.log(Level.SEVERE, msg, t); - return new RuntimeException(msg, t); - } } 
http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/local/LocalLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/local/LocalLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/local/LocalLauncher.java index 153693c..4cdbfcb 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/local/LocalLauncher.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/local/LocalLauncher.java @@ -23,6 +23,7 @@ import org.apache.reef.bridge.driver.launch.IDriverLauncher; import org.apache.reef.bridge.driver.service.IDriverServiceConfigurationProvider; import org.apache.reef.bridge.proto.ClientProtocol; import org.apache.reef.client.DriverLauncher; +import org.apache.reef.client.LauncherStatus; import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.tang.formats.ConfigurationModule; @@ -45,7 +46,7 @@ public final class LocalLauncher implements IDriverLauncher { this.driverServiceConfigurationProvider = driverServiceConfigurationProvider; } - public void launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration) { + public LauncherStatus launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration) { ConfigurationModule localRuntimeCM = LocalRuntimeConfiguration.CONF; if (driverClientConfiguration.getLocalRuntime().getMaxNumberOfEvaluators() > 0) { localRuntimeCM = localRuntimeCM.set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, @@ -64,7 +65,7 @@ public final class LocalLauncher implements IDriverLauncher { driverClientConfiguration.getDriverJobSubmissionDirectory()); } try { - DriverLauncher + 
return DriverLauncher .getLauncher(localRuntimeCM.build()) .run(driverServiceConfigurationProvider.getDriverServiceConfiguration(driverClientConfiguration)); } catch (InjectionException e) { http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/yarn/YarnLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/yarn/YarnLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/yarn/YarnLauncher.java index 21f3989..810fad6 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/yarn/YarnLauncher.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/yarn/YarnLauncher.java @@ -50,7 +50,7 @@ public final class YarnLauncher implements IDriverLauncher { private YarnLauncher(){ } - public void launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration) { + public LauncherStatus launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration) { try { try { final IDriverServiceConfigurationProvider driverConfigurationProvider = @@ -77,6 +77,7 @@ public final class YarnLauncher implements IDriverLauncher { LOG.log(Level.SEVERE, status.getError().get().getMessage()); status.getError().get().printStackTrace(); } + return status; } catch (InjectionException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceConfiguration.java 
b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceConfiguration.java index eebc51c..b9690cc 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceConfiguration.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceConfiguration.java @@ -67,6 +67,7 @@ public final class DriverServiceConfiguration extends ConfigurationModuleBuilder .set(DriverConfiguration.ON_TASK_RUNNING, DriverServiceHandlers.RunningTaskHandler.class) .set(DriverConfiguration.ON_TASK_COMPLETED, DriverServiceHandlers.CompletedTaskHandler.class) .set(DriverConfiguration.ON_TASK_FAILED, DriverServiceHandlers.FailedTaskHandler.class) + .set(DriverConfiguration.ON_TASK_SUSPENDED, DriverServiceHandlers.SuspendedTaskHandler.class) .set(DriverConfiguration.ON_TASK_MESSAGE, DriverServiceHandlers.TaskMessageHandler.class) .set(DriverConfiguration.ON_CLIENT_MESSAGE, DriverServiceHandlers.ClientMessageHandler.class) .set(DriverConfiguration.ON_CLIENT_CLOSED, DriverServiceHandlers.ClientCloseHandler.class) http://git-wip-us.apache.org/repos/asf/reef/blob/8f3dafa9/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java index 7666695..5c2476b 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java @@ -22,7 +22,6 @@ import com.google.protobuf.ByteString; import io.grpc.*; import io.grpc.stub.StreamObserver; 
import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang.Validate; import org.apache.reef.bridge.driver.service.DriverClientException; import org.apache.reef.bridge.driver.service.IDriverService; import org.apache.reef.bridge.service.parameters.DriverClientCommand; @@ -39,9 +38,11 @@ import org.apache.reef.driver.task.*; import org.apache.reef.runtime.common.driver.context.EvaluatorContext; import org.apache.reef.runtime.common.driver.evaluator.AllocatedEvaluatorImpl; import org.apache.reef.runtime.common.driver.idle.IdleMessage; +import org.apache.reef.runtime.common.utils.ExceptionCodec; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.tang.formats.ConfigurationSerializer; import org.apache.reef.util.OSUtils; +import org.apache.reef.util.Optional; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.remote.ports.TcpPortProvider; import org.apache.reef.wake.time.Clock; @@ -53,9 +54,14 @@ import javax.inject.Inject; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; @@ -67,14 +73,18 @@ public final class GRPCDriverService implements IDriverService { private static final Void VOID = Void.newBuilder().build(); - private Server server; - private Process driverProcess; + private enum StreamType { STDOUT, STDERR } + + private Server server; + private DriverClientGrpc.DriverClientFutureStub clientStub; private final Clock clock; + private final ExceptionCodec exceptionCodec; + private final ConfigurationSerializer configurationSerializer; private final EvaluatorRequestor evaluatorRequestor; @@ -103,8 +113,10 @@ public final class GRPCDriverService implements 
IDriverService { final JVMProcessFactory jvmProcessFactory, final CLRProcessFactory clrProcessFactory, final TcpPortProvider tcpPortProvider, + final ExceptionCodec exceptionCodec, @Parameter(DriverClientCommand.class) final String driverClientCommand) { this.clock = clock; + this.exceptionCodec = exceptionCodec; this.configurationSerializer = configurationSerializer; this.jvmProcessFactory = jvmProcessFactory; this.clrProcessFactory = clrProcessFactory; @@ -130,12 +142,19 @@ public final class GRPCDriverService implements IDriverService { throw new IOException("Unable to start gRPC server"); } else { final String cmd = this.driverClientCommand + " " + this.server.getPort(); - final String cmdOs = OSUtils.isWindows() ? "cmd.exe /c \"" + cmd + "\"" : cmd; + final List<String> cmdOs = OSUtils.isWindows() ? + Arrays.asList("cmd.exe", "/c", cmd) : Arrays.asList("/bin/sh", "-c", cmd); LOG.log(Level.INFO, "CMD: " + cmdOs); - this.driverProcess = Runtime.getRuntime().exec(cmdOs, null, new File(System.getProperty("user.dir"))); + this.driverProcess = new ProcessBuilder() + .command(cmdOs) + .redirectError(new File("driverclient.stderr")) + .redirectOutput(new File("driverclient.stdout")) + .directory(new File(System.getProperty("user.dir"))) + .start(); synchronized (this) { + int attempts = 10; // give some time // wait for driver client process to register - while (this.clientStub == null && driverProcessIsAlive()) { + while (attempts-- > 0 && this.clientStub == null && driverProcessIsAlive()) { LOG.log(Level.INFO, "waiting for driver process to register"); this.wait(1000); // a second } @@ -164,10 +183,12 @@ public final class GRPCDriverService implements IDriverService { LOG.log(Level.INFO, "STOP: gRPC Driver Service", t); if (!stopped) { try { - if (t != null) { - clock.stop(t); - } else { - clock.stop(); + if (!clock.isClosed()) { + if (t != null) { + clock.stop(t); + } else { + clock.stop(); + } } if (server != null) { LOG.log(Level.INFO, "Shutdown gRPC"); @@ 
-191,35 +212,40 @@ public final class GRPCDriverService implements IDriverService { if (!driverProcessIsAlive()) { LOG.log(Level.INFO, "Exit code: " + this.driverProcess.exitValue()); } - LOG.log(Level.INFO, "capturing driver process stderr"); - StringBuffer outputBuffer = new StringBuffer(); + dumpStream(StreamType.STDOUT); + dumpStream(StreamType.STDERR); + } + + private void dumpStream(final StreamType type) { + StringBuffer buffer = new StringBuffer(); + + String name = ""; + InputStream stream = null; + switch(type) { + case STDOUT: + name = "stdout"; + stream = this.driverProcess.getInputStream(); + break; + case STDERR: + name = "stderr"; + stream = this.driverProcess.getErrorStream(); + break; + default: + LOG.log(Level.WARNING, "Invalid stream type value"); + } + + LOG.log(Level.INFO, "capturing driver process " + name); try { int nextChar; - final InputStream errStream = this.driverProcess.getErrorStream(); - outputBuffer.append("\nSTDERR =======================================\n"); - while ((nextChar = errStream.read()) != -1) { - outputBuffer.append((char) nextChar); - } - outputBuffer.append("\n==============================================\n"); - final InputStream outStream = this.driverProcess.getInputStream(); - outputBuffer.append("\nSTDOUT =======================================\n"); - while ((nextChar = outStream.read()) != -1) { - outputBuffer.append((char) nextChar); - } - outputBuffer.append("\n==============================================\n"); + buffer.append("\n==============================================\n"); + while ((nextChar = stream.read()) != -1) { + buffer.append((char) nextChar); + } + buffer.append("\n==============================================\n"); } catch (IOException e) { LOG.log(Level.WARNING, "Error while capturing output stream: " + e.getMessage()); } - LOG.log(Level.INFO, outputBuffer.toString()); - } - - /** - * Await termination on the main thread since the grpc library uses daemon threads. 
- */ - private void blockUntilShutdown() throws InterruptedException { - if (server != null) { - server.awaitTermination(); - } + LOG.log(Level.INFO, buffer.toString()); } /** @@ -292,16 +318,31 @@ public final class GRPCDriverService implements IDriverService { @Override public void stopHandler(final StopTime stopTime) { synchronized (this) { - try { - if (clientStub != null) { - this.clientStub.stopHandler( - StopTimeInfo.newBuilder().setStopTime(stopTime.getTimestamp()).build()); + if (clientStub != null) { + final Future<ExceptionInfo> callCompletion = this.clientStub.stopHandler( + StopTimeInfo.newBuilder().setStopTime(stopTime.getTimestamp()).build()); + try { + try { + final ExceptionInfo error = callCompletion.get(5L, TimeUnit.MINUTES); + if (!error.getNoError()) { + final Optional<Throwable> t = parseException(error); + if (t.isPresent()) { + throw new RuntimeException("driver stop exception", + t.get().getCause() != null ? t.get().getCause() : t.get()); + } else { + throw new RuntimeException(error.getMessage() != null ? error.getMessage() : error.getName()); + } + } + } catch (TimeoutException e) { + throw new RuntimeException("stop handler timed out", e); + } + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } finally { + stop(); } - } finally { - stop(); } } - } @Override @@ -344,7 +385,7 @@ public final class GRPCDriverService implements IDriverService { .setEvaluatorId(context.getEvaluatorId()) .setParentId( context.getParentId().isPresent() ? 
- context.getParentId().get() : null) + context.getParentId().get() : "") .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo( context.getEvaluatorDescriptor())) .build()); @@ -369,17 +410,41 @@ public final class GRPCDriverService implements IDriverService { @Override public void failedContextHandler(final FailedContext context) { synchronized (this) { - this.activeContextMap.remove(context.getId()); - this.clientStub.closedContextHandler( + final ContextInfo.Builder contextInfoBuilder = ContextInfo.newBuilder() .setContextId(context.getId()) .setEvaluatorId(context.getEvaluatorId()) .setParentId( context.getParentContext().isPresent() ? - context.getParentContext().get().getId() : null) + context.getParentContext().get().getId() : "") .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo( - context.getEvaluatorDescriptor())) - .build()); + context.getEvaluatorDescriptor())); + if (context.getReason().isPresent()) { + final Throwable reason = context.getReason().get(); + contextInfoBuilder.setException(ExceptionInfo.newBuilder() + .setName(reason.toString()) + .setMessage(context.getMessage() != null ? context.getMessage() : "") + .setData(ByteString.copyFrom(exceptionCodec.toBytes(reason))) + .build()); + } else if (context.getData().isPresent()) { + contextInfoBuilder.setException(ExceptionInfo.newBuilder() + .setName(context.toString()) + .setMessage(context.getDescription().isPresent() ? + context.getDescription().get() : + context.getMessage() != null ? context.getMessage() : "") + .setData(ByteString.copyFrom(context.getData().get())) + .build()); + } else { + final Throwable reason = context.asError(); + contextInfoBuilder.setException(ExceptionInfo.newBuilder() + .setName(reason.toString()) + .setMessage(context.getMessage() != null ? 
context.getMessage() : "") + .setData(ByteString.copyFrom(exceptionCodec.toBytes(reason))) + .build()); + + } + this.activeContextMap.remove(context.getId()); + this.clientStub.failedContextHandler(contextInfoBuilder.build()); } } @@ -425,18 +490,40 @@ public final class GRPCDriverService implements IDriverService { !this.activeContextMap.containsKey(task.getActiveContext().get().getId())) { this.activeContextMap.put(task.getActiveContext().get().getId(), task.getActiveContext().get()); } + final TaskInfo.Builder taskInfoBuilder = TaskInfo.newBuilder() + .setTaskId(task.getId()); + if (task.getActiveContext().isPresent()) { + taskInfoBuilder.setContext(ContextInfo.newBuilder() + .setContextId(task.getActiveContext().get().getId()) + .setEvaluatorId(task.getActiveContext().get().getEvaluatorId()) + .setParentId(task.getActiveContext().get().getParentId().isPresent() ? + task.getActiveContext().get().getParentId().get() : "") + .build()); + } + if (task.getReason().isPresent()) { + final Throwable reason = task.getReason().get(); + taskInfoBuilder.setException(ExceptionInfo.newBuilder() + .setName(reason.toString()) + .setMessage(task.getMessage() != null ? task.getMessage() : "") + .setData(ByteString.copyFrom(exceptionCodec.toBytes(reason))) + .build()); + } else if (task.getData().isPresent()) { + final Throwable reason = task.asError(); + taskInfoBuilder.setException(ExceptionInfo.newBuilder() + .setName(reason.toString()) + .setMessage(task.getMessage() != null ? task.getMessage() : "") + .setData(ByteString.copyFrom(task.getData().get())) + .build()); + } else { + final Throwable reason = task.asError(); + taskInfoBuilder.setException(ExceptionInfo.newBuilder() + .setName(reason.toString()) + .setMessage(task.getMessage() != null ? 
task.getMessage() : "") + .setData(ByteString.copyFrom(exceptionCodec.toBytes(reason))) + .build()); + } this.runningTaskMap.remove(task.getId()); - this.clientStub.failedTaskHandler( - TaskInfo.newBuilder() - .setTaskId(task.getId()) - .setContext(task.getActiveContext().isPresent() ? - ContextInfo.newBuilder() - .setContextId(task.getActiveContext().get().getId()) - .setEvaluatorId(task.getActiveContext().get().getEvaluatorId()) - .setParentId(task.getActiveContext().get().getParentId().isPresent() ? - task.getActiveContext().get().getParentId().get() : "") - .build() : null) - .build()); + this.clientStub.failedTaskHandler(taskInfoBuilder.build()); } } @@ -478,6 +565,8 @@ public final class GRPCDriverService implements IDriverService { .setParentId(task.getActiveContext().getParentId().isPresent() ? task.getActiveContext().getParentId().get() : "") .build()) + .setResult(task.get() == null || task.get().length == 0 ? + null : ByteString.copyFrom(task.get())) .build()); } } @@ -606,6 +695,14 @@ public final class GRPCDriverService implements IDriverService { } } + private Optional<Throwable> parseException(final ExceptionInfo info) { + if (info.getData() == null || info.getData().isEmpty()) { + return Optional.empty(); + } else { + return exceptionCodec.fromBytes(info.getData().toByteArray()); + } + } + private final class DriverBridgeServiceImpl extends DriverServiceGrpc.DriverServiceImplBase { @@ -613,16 +710,31 @@ public final class GRPCDriverService implements IDriverService { public void registerDriverClient( final DriverClientRegistration request, final StreamObserver<Void> responseObserver) { + LOG.log(Level.INFO, "driver client register"); try { - final ManagedChannel channel = ManagedChannelBuilder - .forAddress(request.getHost(), request.getPort()) - .usePlaintext(true) - .build(); - synchronized (GRPCDriverService.this) { - GRPCDriverService.this.clientStub = DriverClientGrpc.newFutureStub(channel); - GRPCDriverService.this.notifyAll(); + if 
(request.hasException()) { + LOG.log(Level.SEVERE, "Driver client initialization exception"); + final Optional<Throwable> ex = parseException(request.getException()); + if (ex.isPresent()) { + GRPCDriverService.this.clock.stop(ex.get()); + } else { + GRPCDriverService.this.clock.stop(new RuntimeException( + request.getException().getMessage() == null ? + request.getException().getName() : + request.getException().getMessage() + )); + } + } else { + final ManagedChannel channel = ManagedChannelBuilder + .forAddress(request.getHost(), request.getPort()) + .usePlaintext(true) + .build(); + synchronized (GRPCDriverService.this) { + GRPCDriverService.this.clientStub = DriverClientGrpc.newFutureStub(channel); + GRPCDriverService.this.notifyAll(); + } + LOG.log(Level.INFO, "Driver has registered on port " + request.getPort()); } - LOG.log(Level.INFO, "Driver has registered on port " + request.getPort()); } finally { responseObserver.onNext(null); responseObserver.onCompleted(); @@ -662,13 +774,20 @@ public final class GRPCDriverService implements IDriverService { final ShutdownRequest request, final StreamObserver<Void> responseObserver) { try { - synchronized (GRPCDriverService.this) { - if (request.getException() != null) { + LOG.log(Level.INFO, "driver shutdown"); + if (request.hasException()) { + final Optional<Throwable> exception = parseException(request.getException()); + if (exception.isPresent()) { + LOG.log(Level.INFO, "driver exception: " + exception.get().toString()); + GRPCDriverService.this.clock.stop(exception.get()); + } else { + // exception that cannot be parsed in java GRPCDriverService.this.clock.stop( new DriverClientException(request.getException().getMessage())); - } else { - GRPCDriverService.this.clock.stop(); } + } else { + LOG.log(Level.INFO, "clean shutdown"); + GRPCDriverService.this.clock.stop(); } } finally { responseObserver.onNext(null); @@ -681,17 +800,23 @@ public final class GRPCDriverService implements IDriverService { final 
AlarmRequest request, final StreamObserver<Void> responseObserver) { try { - synchronized (GRPCDriverService.this) { - GRPCDriverService.this.clock.scheduleAlarm(request.getTimeoutMs(), new EventHandler<Alarm>() { - @Override - public void onNext(final Alarm value) { - synchronized (GRPCDriverService.this) { - GRPCDriverService.this.clientStub.alarmTrigger( - AlarmTriggerInfo.newBuilder().setAlarmId(request.getAlarmId()).build()); - } + // do not synchronize when scheduling an alarm (or deadlock) + LOG.log(Level.INFO, "Set alarm {0} offset {1}", + new Object[] {request.getAlarmId(), request.getTimeoutMs()}); + LOG.log(Level.INFO, "Alarm class " + GRPCDriverService.this.clock.getClass()); + GRPCDriverService.this.clock.scheduleAlarm(request.getTimeoutMs(), new EventHandler<Alarm>() { + @Override + public void onNext(final Alarm value) { + LOG.log(Level.INFO, "Trigger alarm {0}", request.getAlarmId()); + synchronized (GRPCDriverService.this) { + GRPCDriverService.this.clientStub.alarmTrigger( + AlarmTriggerInfo.newBuilder().setAlarmId(request.getAlarmId()).build()); + LOG.log(Level.INFO, "DONE: trigger alarm {0}", request.getAlarmId()); } - }); - } + } + }); + LOG.log(Level.INFO, "Alarm {0} scheduled is idle? {1}", + new Object[] {request.getAlarmId(), clock.isIdle()}); } finally { responseObserver.onNext(null); responseObserver.onCompleted(); @@ -750,10 +875,9 @@ public final class GRPCDriverService implements IDriverService { if (StringUtils.isEmpty(request.getEvaluatorConfiguration())) { // Assume that we are running Java driver client, but this assumption could be a bug so log a warning LOG.log(Level.WARNING, "No evaluator configuration detected. 
Assuming a Java driver client."); - if (request.getContextConfiguration() != null && request.getTaskConfiguration() != null) { + if (StringUtils.isNotEmpty(request.getContextConfiguration()) && + StringUtils.isNotEmpty(request.getTaskConfiguration())) { // submit context and task - Validate.notEmpty(request.getContextConfiguration(), "Context configuration not set"); - Validate.notEmpty(request.getTaskConfiguration(), "Task configuration not set"); try { evaluator.submitContextAndTask( configurationSerializer.fromString(request.getContextConfiguration()), @@ -761,17 +885,15 @@ public final class GRPCDriverService implements IDriverService { } catch (IOException e) { throw new RuntimeException(e); } - } else if (request.getContextConfiguration() != null) { + } else if (StringUtils.isNotEmpty(request.getContextConfiguration())) { // submit context - Validate.notEmpty(request.getContextConfiguration(), "Context configuration not set"); try { evaluator.submitContext(configurationSerializer.fromString(request.getContextConfiguration())); } catch (IOException e) { throw new RuntimeException(e); } - } else if (request.getTaskConfiguration() != null) { + } else if (StringUtils.isNotEmpty(request.getTaskConfiguration())) { // submit task - Validate.notEmpty(request.getTaskConfiguration(), "Task configuration not set"); try { evaluator.submitTask(configurationSerializer.fromString(request.getTaskConfiguration())); } catch (IOException e) { @@ -781,26 +903,20 @@ public final class GRPCDriverService implements IDriverService { throw new RuntimeException("Missing check for required evaluator configurations"); } } else { - if (request.getContextConfiguration() != null && request.getTaskConfiguration() != null) { + if (StringUtils.isNotEmpty(request.getContextConfiguration()) && + StringUtils.isNotEmpty(request.getTaskConfiguration())) { // submit context and task - Validate.notEmpty(request.getEvaluatorConfiguration(), "Evaluator configuration not set"); - 
Validate.notEmpty(request.getContextConfiguration(), "Context configuration not set"); - Validate.notEmpty(request.getTaskConfiguration(), "Task configuration not set"); ((AllocatedEvaluatorImpl) evaluator).submitContextAndTask( request.getEvaluatorConfiguration(), request.getContextConfiguration(), request.getTaskConfiguration()); - } else if (request.getContextConfiguration() != null) { + } else if (StringUtils.isNotEmpty(request.getContextConfiguration())) { // submit context - Validate.notEmpty(request.getEvaluatorConfiguration(), "Evaluator configuration not set"); - Validate.notEmpty(request.getContextConfiguration(), "Context configuration not set"); ((AllocatedEvaluatorImpl) evaluator).submitContext( request.getEvaluatorConfiguration(), request.getContextConfiguration()); - } else if (request.getTaskConfiguration() != null) { + } else if (StringUtils.isNotEmpty(request.getTaskConfiguration())) { // submit task - Validate.notEmpty(request.getEvaluatorConfiguration(), "Evaluator configuration not set"); - Validate.notEmpty(request.getTaskConfiguration(), "Task configuration not set"); ((AllocatedEvaluatorImpl) evaluator).submitTask( request.getEvaluatorConfiguration(), request.getTaskConfiguration()); @@ -884,23 +1000,40 @@ public final class GRPCDriverService implements IDriverService { final StreamObserver<Void> responseObserver) { synchronized (GRPCDriverService.this) { if (!GRPCDriverService.this.runningTaskMap.containsKey(request.getTaskId())) { + LOG.log(Level.WARNING, "Unknown task id {0}", request.getTaskId()); responseObserver.onError(Status.INTERNAL .withDescription("Task does not exist with id " + request.getTaskId()).asRuntimeException()); - } - try { - final RunningTask task = GRPCDriverService.this.runningTaskMap.get(request.getTaskId()); - if (request.getCloseTask()) { - if (request.getMessage() != null) { - task.close(request.getMessage().toByteArray()); - } else { - task.close(); + } else { + try { + final RunningTask task = 
GRPCDriverService.this.runningTaskMap.get(request.getTaskId()); + switch (request.getOperation()) { + case CLOSE: + LOG.log(Level.INFO, "close task {0}", task.getId()); + if (request.getMessage().isEmpty()) { + task.close(); + } else { + task.close(request.getMessage().toByteArray()); + } + break; + case SUSPEND: + LOG.log(Level.INFO, "suspend task {0}", task.getId()); + if (request.getMessage().isEmpty()) { + task.suspend(); + } else { + task.suspend(request.getMessage().toByteArray()); + } + break; + case SEND_MESSAGE: + LOG.log(Level.INFO, "send message to task {0}", task.getId()); + task.send(request.getMessage().toByteArray()); + break; + default: + throw new RuntimeException("Unknown operation " + request.getOperation()); } - } else if (request.getMessage() != null) { - task.send(request.getMessage().toByteArray()); + responseObserver.onNext(null); + } finally { + responseObserver.onCompleted(); } - } finally { - responseObserver.onNext(null); - responseObserver.onCompleted(); } } }