http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java new file mode 100644 index 0000000..ad7eaa8 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java @@ -0,0 +1,919 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.bridge.driver.service.grpc;

import com.google.protobuf.ByteString;
import io.grpc.*;
import io.grpc.stub.StreamObserver;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.apache.reef.bridge.driver.service.DriverClientException;
import org.apache.reef.bridge.driver.service.IDriverService;
import org.apache.reef.bridge.service.parameters.DriverClientCommand;
import org.apache.reef.bridge.proto.*;
import org.apache.reef.bridge.proto.Void;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ClosedContext;
import org.apache.reef.driver.context.ContextMessage;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.*;
import org.apache.reef.driver.restart.DriverRestartCompleted;
import org.apache.reef.driver.restart.DriverRestarted;
import org.apache.reef.driver.task.*;
import org.apache.reef.runtime.common.driver.context.EvaluatorContext;
import org.apache.reef.runtime.common.driver.evaluator.AllocatedEvaluatorImpl;
import org.apache.reef.runtime.common.driver.idle.IdleMessage;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.OSUtils;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.ports.TcpPortProvider;
import org.apache.reef.wake.time.Clock;
import org.apache.reef.wake.time.event.Alarm;
import org.apache.reef.wake.time.event.StartTime;
import org.apache.reef.wake.time.event.StopTime;

import javax.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * GRPC DriverBridgeService that interacts with higher-level languages.
 *
 * <p>On start it opens a gRPC server on a port taken from the {@link TcpPortProvider},
 * launches the driver client process with that port appended to its command line and
 * waits for the client to register. Driver events are then forwarded to the client
 * through its future stub, and requests coming back from the client are served by the
 * nested {@code DriverBridgeServiceImpl}.
 *
 * <p>The evaluator, context and task maps and the client stub are accessed while holding
 * the monitor of this instance.
 */
public final class GRPCDriverService implements IDriverService {
  private static final Logger LOG = Logger.getLogger(GRPCDriverService.class.getName());

  private static final Void VOID = Void.newBuilder().build();

  private Server server;

  private Process driverProcess;

  private DriverClientGrpc.DriverClientFutureStub clientStub;

  private final Clock clock;

  private final ConfigurationSerializer configurationSerializer;

  private final EvaluatorRequestor evaluatorRequestor;

  private final JVMProcessFactory jvmProcessFactory;

  private final CLRProcessFactory clrProcessFactory;

  private final TcpPortProvider tcpPortProvider;

  private final String driverClientCommand;

  private final Map<String, AllocatedEvaluator> allocatedEvaluatorMap = new HashMap<>();

  private final Map<String, ActiveContext> activeContextMap = new HashMap<>();

  private final Map<String, RunningTask> runningTaskMap = new HashMap<>();

  private boolean stopped = false;

  @Inject
  private GRPCDriverService(
      final Clock clock,
      final EvaluatorRequestor evaluatorRequestor,
      final ConfigurationSerializer configurationSerializer,
      final JVMProcessFactory jvmProcessFactory,
      final CLRProcessFactory clrProcessFactory,
      final TcpPortProvider tcpPortProvider,
      @Parameter(DriverClientCommand.class) final String driverClientCommand) {
    this.clock = clock;
    this.configurationSerializer = configurationSerializer;
    this.jvmProcessFactory = jvmProcessFactory;
    this.clrProcessFactory = clrProcessFactory;
    this.evaluatorRequestor = evaluatorRequestor;
    this.driverClientCommand = driverClientCommand;
    this.tcpPortProvider = tcpPortProvider;
  }

  /**
   * Starts the gRPC server on the first port that can be bound, launches the driver
   * client process and waits until the client registers or the process dies.
   *
   * @throws IOException if no port could be bound or the client process cannot be launched
   * @throws InterruptedException if interrupted while waiting for the client to register
   */
  private void start() throws IOException, InterruptedException {
    for (final Integer port : this.tcpPortProvider) {
      try {
        this.server = ServerBuilder.forPort(port)
            .addService(new DriverBridgeServiceImpl())
            .build()
            .start();
        LOG.info("Server started, listening on " + port);
        break;
      } catch (IOException e) {
        LOG.log(Level.WARNING, "Unable to bind to port [{0}]", port);
      }
    }
    if (this.server == null || this.server.isTerminated()) {
      throw new IOException("Unable to start gRPC server");
    }
    final String cmd = this.driverClientCommand + " " + this.server.getPort();
    // Shell-style redirections ("1> file") are not interpreted when a command string is
    // executed directly, so the output files are wired up through ProcessBuilder instead.
    final ProcessBuilder processBuilder = OSUtils.isWindows()
        ? new ProcessBuilder("cmd.exe", "/c", cmd)
        : new ProcessBuilder("/bin/sh", "-c", cmd);
    this.driverProcess = processBuilder
        .redirectOutput(new File("driverclient.stdout"))
        .redirectError(new File("driverclient.stderr"))
        .start();
    synchronized (this) {
      // wait for driver client process to register
      while (this.clientStub == null && driverProcessIsAlive()) {
        this.wait(1000); // a second
      }
    }
    if (driverProcessIsAlive()) {
      // Make sure the child process does not outlive this JVM.
      Thread closeChildThread = new Thread() {
        public void run() {
          synchronized (GRPCDriverService.this) {
            if (GRPCDriverService.this.driverProcess != null) {
              GRPCDriverService.this.driverProcess.destroy();
              GRPCDriverService.this.driverProcess = null;
            }
          }
        }
      };
      Runtime.getRuntime().addShutdownHook(closeChildThread);
    }
  }

  /**
   * Stops the clock, the gRPC server and the driver client process without an error.
   */
  private void stop() {
    stop(null);
  }

  /**
   * Stops the clock (with the given cause, if any), shuts down the gRPC server and
   * destroys the driver client process. Only the first call has an effect.
   *
   * @param t the failure that caused the stop, or null for a regular stop
   */
  private void stop(final Throwable t) {
    if (!stopped) {
      try {
        if (t != null) {
          clock.stop(t);
        } else {
          clock.stop();
        }
        if (server != null) {
          LOG.log(Level.INFO, "Shutdown gRPC");
          this.server.shutdown();
          this.server = null;
        }
        if (this.driverProcess != null) {
          LOG.log(Level.INFO, "Shutdown driver process");
          this.driverProcess.destroy();
          this.driverProcess = null;
        }
      } finally {
        stopped = true;
      }
    }
  }

  /**
   * Await termination on the main thread since the grpc library uses daemon threads.
   */
  private void blockUntilShutdown() throws InterruptedException {
    if (server != null) {
      server.awaitTermination();
    }
  }

  /**
   * Determines if the driver process is still alive by
   * testing for its exit value, which throws {@link IllegalThreadStateException}
   * if process is still running.
   * @return true if driver process is alive, false otherwise
   */
  private boolean driverProcessIsAlive() {
    if (this.driverProcess != null) {
      try {
        this.driverProcess.exitValue();
      } catch (IllegalThreadStateException e) {
        return true;
      }
    }
    return false;
  }

  /**
   * Converts an evaluator descriptor to its protocol representation.
   *
   * @param descriptor the descriptor to convert, may be null
   * @return the protocol message, or null if the descriptor is null
   */
  private EvaluatorDescriptorInfo toEvaluatorDescriptorInfo(final EvaluatorDescriptor descriptor) {
    if (descriptor == null) {
      return null;
    } else {
      return EvaluatorDescriptorInfo.newBuilder()
          .setCores(descriptor.getNumberOfCores())
          .setMemory(descriptor.getMemory())
          .setRuntimeName(descriptor.getRuntimeName())
          .build();
    }
  }

  /**
   * Asks the driver client whether it is idle. If the client has not registered, or the
   * check fails (in which case the service is stopped), the service reports itself idle.
   */
  @Override
  public IdleMessage getIdleStatus() {
    final String componentName = "Java Bridge DriverService";
    if (this.clientStub != null) {
      try {
        final IdleStatus idleStatus = this.clientStub.idlenessCheckHandler(VOID).get();
        LOG.log(Level.INFO, "is idle: " + idleStatus.getIsIdle());
        return new IdleMessage(
            componentName,
            idleStatus.getReason(),
            idleStatus.getIsIdle());
      } catch (ExecutionException e) {
        stop(e);
      } catch (InterruptedException e) {
        // Preserve the interrupt for callers further up the stack.
        Thread.currentThread().interrupt();
        stop(e);
      }
    }
    return new IdleMessage(
        componentName,
        "stub not initialized",
        true);
  }

  /** Starts the bridge and forwards the start time to the driver client. */
  @Override
  public void startHandler(final StartTime startTime) {
    try {
      start();
      synchronized (this) {
        if (this.clientStub != null) {
          this.clientStub.startHandler(
              StartTimeInfo.newBuilder().setStartTime(startTime.getTimestamp()).build());
        } else {
          stop(new IllegalStateException("Unable to start driver client"));
        }
      }
    } catch (IOException | InterruptedException e) {
      stop(e);
    }
  }

  /** Forwards the stop time to the driver client (if registered) and stops the bridge. */
  @Override
  public void stopHandler(final StopTime stopTime) {
    synchronized (this) {
      try {
        if (clientStub != null) {
          this.clientStub.stopHandler(
              StopTimeInfo.newBuilder().setStopTime(stopTime.getTimestamp()).build());
        }
      } finally {
        stop();
      }
    }
  }

  /** Records the allocated evaluator and notifies the driver client. */
  @Override
  public void allocatedEvaluatorHandler(final AllocatedEvaluator eval) {
    synchronized (this) {
      this.allocatedEvaluatorMap.put(eval.getId(), eval);
      this.clientStub.allocatedEvaluatorHandler(
          EvaluatorInfo.newBuilder()
              .setEvaluatorId(eval.getId())
              .setDescriptorInfo(toEvaluatorDescriptorInfo(eval.getEvaluatorDescriptor()))
              .build());
    }
  }

  /** Forgets the completed evaluator and notifies the driver client. */
  @Override
  public void completedEvaluatorHandler(final CompletedEvaluator eval) {
    synchronized (this) {
      this.allocatedEvaluatorMap.remove(eval.getId());
      this.clientStub.completedEvaluatorHandler(
          EvaluatorInfo.newBuilder().setEvaluatorId(eval.getId()).build());
    }
  }

  /** Forgets the failed evaluator and notifies the driver client. */
  @Override
  public void failedEvaluatorHandler(final FailedEvaluator eval) {
    synchronized (this) {
      this.allocatedEvaluatorMap.remove(eval.getId());
      this.clientStub.failedEvaluatorHandler(
          EvaluatorInfo.newBuilder().setEvaluatorId(eval.getId()).build());
    }
  }

  /** Records the active context and notifies the driver client. */
  @Override
  public void activeContextHandler(final ActiveContext context) {
    synchronized (this) {
      this.activeContextMap.put(context.getId(), context);
      this.clientStub.activeContextHandler(
          ContextInfo.newBuilder()
              .setContextId(context.getId())
              .setEvaluatorId(context.getEvaluatorId())
              // protobuf setters reject null; an absent parent is sent as the empty string
              .setParentId(
                  context.getParentId().isPresent() ?
                      context.getParentId().get() : "")
              .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
                  context.getEvaluatorDescriptor()))
              .build());
    }
  }

  /** Forgets the closed context and notifies the driver client. */
  @Override
  public void closedContextHandler(final ClosedContext context) {
    synchronized (this) {
      this.activeContextMap.remove(context.getId());
      this.clientStub.closedContextHandler(
          ContextInfo.newBuilder()
              .setContextId(context.getId())
              .setEvaluatorId(context.getEvaluatorId())
              .setParentId(context.getParentContext().getId())
              .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
                  context.getEvaluatorDescriptor()))
              .build());
    }
  }

  /** Forgets the failed context and notifies the driver client of the failure. */
  @Override
  public void failedContextHandler(final FailedContext context) {
    synchronized (this) {
      this.activeContextMap.remove(context.getId());
      // NOTE(review): assumes the client stub exposes failedContextHandler like the other
      // event handlers; previously this was reported through closedContextHandler.
      this.clientStub.failedContextHandler(
          ContextInfo.newBuilder()
              .setContextId(context.getId())
              .setEvaluatorId(context.getEvaluatorId())
              // protobuf setters reject null; an absent parent is sent as the empty string
              .setParentId(
                  context.getParentContext().isPresent() ?
                      context.getParentContext().get().getId() : "")
              .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
                  context.getEvaluatorDescriptor()))
              .build());
    }
  }

  /** Forwards a context message to the driver client. */
  @Override
  public void contextMessageHandler(final ContextMessage message) {
    synchronized (this) {
      this.clientStub.contextMessageHandler(
          ContextMessageInfo.newBuilder()
              .setContextId(message.getId())
              .setMessageSourceId(message.getMessageSourceID())
              .setSequenceNumber(message.getSequenceNumber())
              .setPayload(ByteString.copyFrom(message.get()))
              .build());
    }
  }

  /** Records the running task (and its context, if unknown) and notifies the driver client. */
  @Override
  public void runningTaskHandler(final RunningTask task) {
    synchronized (this) {
      final ActiveContext context = task.getActiveContext();
      if (!this.activeContextMap.containsKey(context.getId())) {
        this.activeContextMap.put(context.getId(), context);
      }
      this.runningTaskMap.put(task.getId(), task);
      this.clientStub.runningTaskHandler(
          TaskInfo.newBuilder()
              .setTaskId(task.getId())
              .setContext(ContextInfo.newBuilder()
                  .setContextId(context.getId())
                  .setEvaluatorId(context.getEvaluatorId())
                  .setParentId(context.getParentId().isPresent() ? context.getParentId().get() : "")
                  .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
                      task.getActiveContext().getEvaluatorDescriptor()))
                  .build())
              .build());
    }
  }

  /**
   * Forgets the failed task and notifies the driver client. The context is only attached
   * to the notification when the failed task still has an active context.
   */
  @Override
  public void failedTaskHandler(final FailedTask task) {
    synchronized (this) {
      final TaskInfo.Builder taskInfo = TaskInfo.newBuilder().setTaskId(task.getId());
      if (task.getActiveContext().isPresent()) {
        final ActiveContext context = task.getActiveContext().get();
        if (!this.activeContextMap.containsKey(context.getId())) {
          this.activeContextMap.put(context.getId(), context);
        }
        // protobuf setters reject null, so the context is set only when there is one
        taskInfo.setContext(ContextInfo.newBuilder()
            .setContextId(context.getId())
            .setEvaluatorId(context.getEvaluatorId())
            .setParentId(context.getParentId().isPresent() ? context.getParentId().get() : "")
            .build());
      }
      this.runningTaskMap.remove(task.getId());
      this.clientStub.failedTaskHandler(taskInfo.build());
    }
  }

  /** Forgets the completed task and notifies the driver client. */
  @Override
  public void completedTaskHandler(final CompletedTask task) {
    synchronized (this) {
      if (!this.activeContextMap.containsKey(task.getActiveContext().getId())) {
        this.activeContextMap.put(task.getActiveContext().getId(), task.getActiveContext());
      }
      this.runningTaskMap.remove(task.getId());
      this.clientStub.completedTaskHandler(
          TaskInfo.newBuilder()
              .setTaskId(task.getId())
              .setContext(ContextInfo.newBuilder()
                  .setContextId(task.getActiveContext().getId())
                  .setEvaluatorId(task.getActiveContext().getEvaluatorId())
                  .setParentId(task.getActiveContext().getParentId().isPresent() ?
                      task.getActiveContext().getParentId().get() : "")
                  .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
                      task.getActiveContext().getEvaluatorDescriptor()))
                  .build())
              .build());
    }
  }

  /** Forgets the suspended task and notifies the driver client. */
  @Override
  public void suspendedTaskHandler(final SuspendedTask task) {
    synchronized (this) {
      if (!this.activeContextMap.containsKey(task.getActiveContext().getId())) {
        this.activeContextMap.put(task.getActiveContext().getId(), task.getActiveContext());
      }
      this.runningTaskMap.remove(task.getId());
      this.clientStub.suspendedTaskHandler(
          TaskInfo.newBuilder()
              .setTaskId(task.getId())
              .setContext(ContextInfo.newBuilder()
                  .setContextId(task.getActiveContext().getId())
                  .setEvaluatorId(task.getActiveContext().getEvaluatorId())
                  .setParentId(task.getActiveContext().getParentId().isPresent() ?
                      task.getActiveContext().getParentId().get() : "")
                  .build())
              .build());
    }
  }

  /** Forwards a task message to the driver client. */
  @Override
  public void taskMessageHandler(final TaskMessage message) {
    synchronized (this) {
      this.clientStub.taskMessageHandler(
          TaskMessageInfo.newBuilder()
              .setTaskId(message.getId())
              .setContextId(message.getContextId())
              .setMessageSourceId(message.getMessageSourceID())
              .setSequenceNumber(message.getSequenceNumber())
              .setPayload(ByteString.copyFrom(message.get()))
              .build());
    }
  }

  /** Forwards a client message to the driver client. */
  @Override
  public void clientMessageHandler(final byte[] message) {
    synchronized (this) {
      this.clientStub.clientMessageHandler(
          ClientMessageInfo.newBuilder()
              .setPayload(ByteString.copyFrom(message))
              .build());
    }
  }

  /** Forwards a client close request to the driver client. */
  @Override
  public void clientCloseHandler() {
    synchronized (this) {
      this.clientStub.clientCloseHandler(VOID);
    }
  }

  /** Forwards a client close request carrying a message to the driver client. */
  @Override
  public void clientCloseWithMessageHandler(final byte[] message) {
    synchronized (this) {
      this.clientStub.clientCloseWithMessageHandler(
          ClientMessageInfo.newBuilder()
              .setPayload(ByteString.copyFrom(message))
              .build());
    }
  }

  /** Starts the bridge after a driver restart and forwards the restart information. */
  @Override
  public void driverRestarted(final DriverRestarted restart) {
    try {
      start();
      synchronized (this) {
        if (this.clientStub != null) {
          this.clientStub.driverRestartHandler(DriverRestartInfo.newBuilder()
              .setResubmissionAttempts(restart.getResubmissionAttempts())
              .setStartTime(StartTimeInfo.newBuilder()
                  .setStartTime(restart.getStartTime().getTimestamp()).build())
              .addAllExpectedEvaluatorIds(restart.getExpectedEvaluatorIds())
              .build());
        } else {
          stop(new DriverClientException("Failed to restart driver client"));
        }
      }
    } catch (InterruptedException | IOException e) {
      stop(e);
    }
  }

  /** Records a task that survived the driver restart and notifies the driver client. */
  @Override
  public void restartRunningTask(final RunningTask task) {
    synchronized (this) {
      final ActiveContext context = task.getActiveContext();
      if (!this.activeContextMap.containsKey(context.getId())) {
        this.activeContextMap.put(context.getId(), context);
      }
      this.runningTaskMap.put(task.getId(), task);
      this.clientStub.driverRestartRunningTaskHandler(
          TaskInfo.newBuilder()
              .setTaskId(task.getId())
              .setContext(ContextInfo.newBuilder()
                  .setContextId(context.getId())
                  .setEvaluatorId(context.getEvaluatorId())
                  .setParentId(context.getParentId().isPresent() ? context.getParentId().get() : "")
                  .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(context.getEvaluatorDescriptor()))
                  .build())
              .build());
    }
  }

  /** Records a context that survived the driver restart and notifies the driver client. */
  @Override
  public void restartActiveContext(final ActiveContext context) {
    synchronized (this) {
      this.activeContextMap.put(context.getId(), context);
      this.clientStub.driverRestartActiveContextHandler(
          ContextInfo.newBuilder()
              .setContextId(context.getId())
              .setEvaluatorId(context.getEvaluatorId())
              // protobuf setters reject null; an absent parent is sent as the empty string
              .setParentId(
                  context.getParentId().isPresent() ?
                      context.getParentId().get() : "")
              .setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
                  context.getEvaluatorDescriptor()))
              .build());
    }
  }

  /** Notifies the driver client that the driver restart has completed. */
  @Override
  public void driverRestartCompleted(final DriverRestartCompleted restartCompleted) {
    synchronized (this) {
      this.clientStub.driverRestartCompletedHandler(DriverRestartCompletedInfo.newBuilder()
          .setCompletionTime(StopTimeInfo.newBuilder()
              .setStopTime(restartCompleted.getCompletedTime().getTimestamp()).build())
          .setIsTimedOut(restartCompleted.isTimedOut())
          .build());
    }
  }

  /** Notifies the driver client of an evaluator that failed during the driver restart. */
  @Override
  public void restartFailedEvalautor(final FailedEvaluator evaluator) {
    synchronized (this) {
      final Throwable cause = evaluator.getEvaluatorException();
      // Fall back to a fixed text when there is no exception or it carries no message.
      final String failureMessage = cause != null && cause.getMessage() != null
          ? cause.getMessage() : "unknown failure during restart";
      this.clientStub.driverRestartFailedEvaluatorHandler(EvaluatorInfo.newBuilder()
          .setEvaluatorId(evaluator.getId())
          .setFailure(EvaluatorInfo.FailureInfo.newBuilder()
              .setMessage(failureMessage)
              .build())
          .build());
    }
  }

  /**
   * gRPC service invoked by the driver client to register itself and to request
   * operations on the driver, its evaluators, contexts and tasks.
   */
  private final class DriverBridgeServiceImpl
      extends DriverServiceGrpc.DriverServiceImplBase {

    /** Completes a call successfully with an empty response. */
    private void respondOk(final StreamObserver<Void> responseObserver) {
      responseObserver.onNext(VOID);
      responseObserver.onCompleted();
    }

    /** Fails a call with an INTERNAL status carrying the given description. */
    private void respondError(final StreamObserver<Void> responseObserver, final String description) {
      responseObserver.onError(Status.INTERNAL
          .withDescription(description)
          .asRuntimeException());
    }

    /** Creates the stub used to talk back to the client and wakes up the waiting starter. */
    @Override
    public void registerDriverClient(
        final DriverClientRegistration request,
        final StreamObserver<Void> responseObserver) {
      try {
        final ManagedChannel channel = ManagedChannelBuilder
            .forAddress(request.getHost(), request.getPort())
            .usePlaintext(true)
            .build();
        synchronized (GRPCDriverService.this) {
          GRPCDriverService.this.clientStub = DriverClientGrpc.newFutureStub(channel);
          GRPCDriverService.this.notifyAll();
        }
      } finally {
        respondOk(responseObserver);
      }
    }

    /** Translates the resource request into an evaluator request and submits it. */
    @Override
    public void requestResources(
        final ResourceRequest request,
        final StreamObserver<Void> responseObserver) {
      try {
        synchronized (GRPCDriverService.this) {
          EvaluatorRequest.Builder requestBuilder = GRPCDriverService.this.evaluatorRequestor.newRequest();
          requestBuilder.setNumber(request.getResourceCount());
          requestBuilder.setNumberOfCores(request.getCores());
          requestBuilder.setMemory(request.getMemorySize());
          requestBuilder.setRelaxLocality(request.getRelaxLocality());
          requestBuilder.setRuntimeName(request.getRuntimeName());
          if (request.getNodeNameListCount() > 0) {
            requestBuilder.addNodeNames(request.getNodeNameListList());
          }
          if (request.getRackNameListCount() > 0) {
            for (final String rackName : request.getRackNameListList()) {
              requestBuilder.addRackName(rackName);
            }
          }
          GRPCDriverService.this.evaluatorRequestor.submit(requestBuilder.build());
        }
      } finally {
        respondOk(responseObserver);
      }
    }

    /** Stops the clock, with a failure only if the request actually carries an exception. */
    @Override
    public void shutdown(
        final ShutdownRequest request,
        final StreamObserver<Void> responseObserver) {
      try {
        synchronized (GRPCDriverService.this) {
          // The getter never returns null for a message field; presence must be tested.
          if (request.hasException()) {
            GRPCDriverService.this.clock.stop(
                new DriverClientException(request.getException().getMessage()));
          } else {
            GRPCDriverService.this.clock.stop();
          }
        }
      } finally {
        respondOk(responseObserver);
      }
    }

    /** Schedules an alarm on the clock that triggers the client with the requested alarm id. */
    @Override
    public void setAlarm(
        final AlarmRequest request,
        final StreamObserver<Void> responseObserver) {
      try {
        synchronized (GRPCDriverService.this) {
          GRPCDriverService.this.clock.scheduleAlarm(request.getTimeoutMs(), new EventHandler<Alarm>() {
            @Override
            public void onNext(final Alarm value) {
              synchronized (GRPCDriverService.this) {
                GRPCDriverService.this.clientStub.alarmTrigger(
                    AlarmTriggerInfo.newBuilder().setAlarmId(request.getAlarmId()).build());
              }
            }
          });
        }
      } finally {
        respondOk(responseObserver);
      }
    }

    /**
     * Closes an allocated evaluator, or configures it (files, libraries, process) and
     * submits a context and/or task to it. The call fails with an INTERNAL status if the
     * evaluator is unknown, no context or task configuration is given, or the operation
     * throws.
     */
    @Override
    public void allocatedEvaluatorOp(
        final AllocatedEvaluatorRequest request,
        final StreamObserver<Void> responseObserver) {
      try {
        synchronized (GRPCDriverService.this) {
          applyAllocatedEvaluatorRequest(request);
        }
      } catch (final RuntimeException e) {
        LOG.log(Level.SEVERE, "Allocated evaluator operation failed", e);
        responseObserver.onError(Status.INTERNAL
            .withDescription(e.getMessage())
            .withCause(e)
            .asRuntimeException());
        return;
      }
      respondOk(responseObserver);
    }

    /**
     * Performs the operation described by the request on the referenced evaluator.
     * Must be called while holding the monitor of the enclosing service.
     *
     * @throws IllegalArgumentException if the evaluator is unknown or neither a context
     *     nor a task configuration is supplied for a submission
     */
    private void applyAllocatedEvaluatorRequest(final AllocatedEvaluatorRequest request) {
      final AllocatedEvaluator evaluator =
          GRPCDriverService.this.allocatedEvaluatorMap.get(request.getEvaluatorId());
      if (evaluator == null) {
        throw new IllegalArgumentException("Unknown allocated evaluator " + request.getEvaluatorId());
      }
      if (request.getCloseEvaluator()) {
        evaluator.close();
        return;
      }
      // Validate before touching the evaluator so a bad request has no side effects.
      if (StringUtils.isEmpty(request.getContextConfiguration())
          && StringUtils.isEmpty(request.getTaskConfiguration())) {
        throw new IllegalArgumentException("Context and/or Task configuration required");
      }
      for (final String file : request.getAddFilesList()) {
        evaluator.addFile(new File(file));
      }
      for (final String library : request.getAddLibrariesList()) {
        evaluator.addLibrary(new File(library));
      }
      if (request.hasSetProcess()) {
        final AllocatedEvaluatorRequest.EvaluatorProcessRequest processRequest =
            request.getSetProcess();
        switch (evaluator.getEvaluatorDescriptor().getProcess().getType()) {
        case JVM:
          setJVMProcess(evaluator, processRequest);
          break;
        case CLR:
          setCLRProcess(evaluator, processRequest);
          break;
        default:
          throw new RuntimeException("Unknown evaluator process type");
        }
      }
      submitConfigurations(evaluator, request);
    }

    /**
     * Submits the context and/or task configuration of the request to the evaluator.
     * The caller guarantees that at least one of the two is non-empty.
     */
    private void submitConfigurations(
        final AllocatedEvaluator evaluator,
        final AllocatedEvaluatorRequest request) {
      final String evaluatorConf = request.getEvaluatorConfiguration();
      final String contextConf = request.getContextConfiguration();
      final String taskConf = request.getTaskConfiguration();
      final boolean hasContext = StringUtils.isNotEmpty(contextConf);
      final boolean hasTask = StringUtils.isNotEmpty(taskConf);
      if (StringUtils.isEmpty(evaluatorConf)) {
        // Assume that we are running Java driver client, but this assumption could be a bug so log a warning
        LOG.log(Level.WARNING, "No evaluator configuration detected. Assuming a Java driver client.");
        try {
          if (hasContext && hasTask) {
            evaluator.submitContextAndTask(
                configurationSerializer.fromString(contextConf),
                configurationSerializer.fromString(taskConf));
          } else if (hasContext) {
            evaluator.submitContext(configurationSerializer.fromString(contextConf));
          } else {
            evaluator.submitTask(configurationSerializer.fromString(taskConf));
          }
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      } else {
        // Pre-serialized configurations are passed through as strings.
        final AllocatedEvaluatorImpl evaluatorImpl = (AllocatedEvaluatorImpl) evaluator;
        if (hasContext && hasTask) {
          evaluatorImpl.submitContextAndTask(evaluatorConf, contextConf, taskConf);
        } else if (hasContext) {
          evaluatorImpl.submitContext(evaluatorConf, contextConf);
        } else {
          evaluatorImpl.submitTask(evaluatorConf, taskConf);
        }
      }
    }

    /**
     * Closes a context, sends it a message, or submits a child context or a task to it,
     * depending on which operation is set in the request.
     */
    @Override
    public void activeContextOp(
        final ActiveContextRequest request,
        final StreamObserver<Void> responseObserver) {
      synchronized (GRPCDriverService.this) {
        final ActiveContext context = GRPCDriverService.this.activeContextMap.get(request.getContextId());
        if (context == null) {
          LOG.log(Level.SEVERE, "Context does not exist with id " + request.getContextId());
          respondError(responseObserver, "Context does not exist with id " + request.getContextId());
          return;
        }
        switch (request.getOperationCase()) {
        case CLOSE_CONTEXT:
          if (!request.getCloseContext()) {
            LOG.log(Level.SEVERE, "Close context operation not set to true");
            respondError(responseObserver, "Close context operation not set to true");
            return;
          }
          try {
            LOG.log(Level.INFO, "closing context " + context.getId());
            context.close();
          } finally {
            respondOk(responseObserver);
          }
          break;
        case MESSAGE:
          try {
            LOG.log(Level.INFO, "send message to context " + context.getId());
            context.sendMessage(request.getMessage().toByteArray());
          } finally {
            respondOk(responseObserver);
          }
          break;
        case NEW_CONTEXT_REQUEST:
          try {
            LOG.log(Level.INFO, "submitting child context to context " + context.getId());
            ((EvaluatorContext) context).submitContext(request.getNewContextRequest());
          } finally {
            respondOk(responseObserver);
          }
          break;
        case NEW_TASK_REQUEST:
          try {
            LOG.log(Level.INFO, "submitting task to context " + context.getId());
            ((EvaluatorContext) context).submitTask(request.getNewTaskRequest());
          } finally {
            respondOk(responseObserver);
          }
          break;
        default:
          // Without a response the caller would wait on this call indefinitely.
          LOG.log(Level.SEVERE, "No operation set for context " + context.getId());
          respondError(responseObserver, "No operation set for context " + context.getId());
          break;
        }
      }
    }

    /**
     * Closes a running task (optionally with a message) or sends it a message.
     * The call fails with an INTERNAL status if the task is unknown.
     */
    @Override
    public void runningTaskOp(
        final RunningTaskRequest request,
        final StreamObserver<Void> responseObserver) {
      synchronized (GRPCDriverService.this) {
        final RunningTask task = GRPCDriverService.this.runningTaskMap.get(request.getTaskId());
        if (task == null) {
          respondError(responseObserver, "Task does not exist with id " + request.getTaskId());
          return;
        }
        try {
          // The getter never returns null; an unset message is empty.
          final boolean hasMessage = !request.getMessage().isEmpty();
          if (request.getCloseTask()) {
            if (hasMessage) {
              task.close(request.getMessage().toByteArray());
            } else {
              task.close();
            }
          } else if (hasMessage) {
            task.send(request.getMessage().toByteArray());
          }
        } finally {
          respondOk(responseObserver);
        }
      }
    }

    /** Creates a CLR process, applies the settings present in the request and sets it on the evaluator. */
    private void setCLRProcess(
        final AllocatedEvaluator evaluator,
        final AllocatedEvaluatorRequest.EvaluatorProcessRequest processRequest) {
      final CLRProcess process = GRPCDriverService.this.clrProcessFactory.newEvaluatorProcess();
      if (processRequest.getMemoryMb() > 0) {
        process.setMemory(processRequest.getMemoryMb());
      }
      // String getters return "" when unset, so only non-empty values are applied.
      if (StringUtils.isNotEmpty(processRequest.getConfigurationFileName())) {
        process.setConfigurationFileName(processRequest.getConfigurationFileName());
      }
      if (StringUtils.isNotEmpty(processRequest.getStandardOut())) {
        process.setStandardOut(processRequest.getStandardOut());
      }
      if (StringUtils.isNotEmpty(processRequest.getStandardErr())) {
        process.setStandardErr(processRequest.getStandardErr());
      }
      evaluator.setProcess(process);
    }

    /** Creates a JVM process, applies the settings present in the request and sets it on the evaluator. */
    private void setJVMProcess(
        final AllocatedEvaluator evaluator,
        final AllocatedEvaluatorRequest.EvaluatorProcessRequest processRequest) {
      final JVMProcess process = GRPCDriverService.this.jvmProcessFactory.newEvaluatorProcess();
      if (processRequest.getMemoryMb() > 0) {
        process.setMemory(processRequest.getMemoryMb());
      }
      // String getters return "" when unset, so only non-empty values are applied.
      if (StringUtils.isNotEmpty(processRequest.getConfigurationFileName())) {
        process.setConfigurationFileName(processRequest.getConfigurationFileName());
      }
      if (StringUtils.isNotEmpty(processRequest.getStandardOut())) {
        process.setStandardOut(processRequest.getStandardOut());
      }
      if (StringUtils.isNotEmpty(processRequest.getStandardErr())) {
        process.setStandardErr(processRequest.getStandardErr());
      }
      for (final String option : processRequest.getOptionsList()) {
        process.addOption(option);
      }
      evaluator.setProcess(process);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverServiceConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverServiceConfigurationProvider.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverServiceConfigurationProvider.java new file mode 100644 index 0000000..f0334c5 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverServiceConfigurationProvider.java @@ -0,0 +1,53 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.reef.bridge.driver.service.grpc;

import org.apache.reef.bridge.driver.service.DriverServiceConfiguration;
import org.apache.reef.bridge.driver.service.DriverServiceConfigurationProviderBase;
import org.apache.reef.bridge.proto.ClientProtocol;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Configurations;

import javax.inject.Inject;
import java.util.logging.Logger;

/**
 * Configuration provider that wires {@link GRPCDriverService} in as the driver service
 * implementation.
 */
public final class GRPCDriverServiceConfigurationProvider extends DriverServiceConfigurationProviderBase {

  private static final Logger LOG = Logger.getLogger(GRPCDriverServiceConfigurationProvider.class.getName());

  @Inject
  private GRPCDriverServiceConfigurationProvider() {
  }

  /**
   * Builds the driver service configuration: the gRPC service implementation and the
   * driver client launch command, merged with the driver configuration and the TCP port
   * range configuration derived from the same client configuration.
   *
   * @param driverConfiguration the driver client configuration
   * @return the merged configuration
   */
  @Override
  public Configuration getConfiguration(final ClientProtocol.DriverClientConfiguration driverConfiguration) {
    return Configurations.merge(
        DriverServiceConfiguration.CONF
            .set(DriverServiceConfiguration.DRIVER_SERVICE_IMPL, GRPCDriverService.class)
            .set(DriverServiceConfiguration.DRIVER_CLIENT_COMMAND, driverConfiguration.getDriverClientLaunchCommand())
            .build(),
        getDriverConfiguration(driverConfiguration),
        getTcpPortRangeConfiguration(driverConfiguration));
  }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/package-info.java new file mode 100644 index 0000000..a94328d --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/package-info.java @@ -0,0 
+1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * GRPC implementation for driver bridge service. + */ +package org.apache.reef.bridge.service.grpc; http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/package-info.java new file mode 100644 index 0000000..9b58cb0 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * The Java-side of the CLR/Java bridge interop via gRPC/Protocol Buffers. + */ +package org.apache.reef.bridge.driver.service; http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/DriverClientCommand.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/DriverClientCommand.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/DriverClientCommand.java new file mode 100644 index 0000000..255f60d --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/DriverClientCommand.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.service.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * What command to use when starting bridge process. + */ +@NamedParameter(doc = "The command to launch bridge driver process", + short_name = "command") +public final class DriverClientCommand implements Name<String> { +} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/package-info.java new file mode 100644 index 0000000..6a3b956 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/** + * Driver bridge service parameters. + */ +package org.apache.reef.bridge.service.parameters; http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/WindowsRuntimePathProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/WindowsRuntimePathProvider.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/WindowsRuntimePathProvider.java deleted file mode 100644 index dac1200..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/WindowsRuntimePathProvider.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.examples; - -import org.apache.reef.runtime.common.files.RuntimePathProvider; - -import javax.inject.Inject; -/** - * Supplies the java binary's path for HDInsight. 
- */ -public final class WindowsRuntimePathProvider implements RuntimePathProvider { - - @Inject - public WindowsRuntimePathProvider() { - } - - @Override - public String getPath() { - return "java"; - } - - @Override - public String toString() { - return getPath(); - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloDriver.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloDriver.java index 020a0eb..79b2ee0 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloDriver.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloDriver.java @@ -20,6 +20,8 @@ package org.apache.reef.bridge.examples.hello; import org.apache.reef.driver.evaluator.AllocatedEvaluator; import org.apache.reef.driver.evaluator.EvaluatorRequestor; +import org.apache.reef.driver.task.CompletedTask; +import org.apache.reef.driver.task.FailedTask; import org.apache.reef.driver.task.TaskConfiguration; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.annotations.Unit; @@ -80,4 +82,26 @@ public final class HelloDriver { allocatedEvaluator.submitTask(taskConfiguration); } } + + /** + * bla bla. + */ + public final class CompletedTaskHandler implements EventHandler<CompletedTask> { + @Override + public void onNext(final CompletedTask value) { + LOG.log(Level.INFO, "Completed task {0}", value.getId()); + value.getActiveContext().close(); + } + } + + /** + * bla bla. 
+ */ + public final class FailedTaskHandler implements EventHandler<FailedTask> { + @Override + public void onNext(final FailedTask value) { + LOG.log(Level.INFO, "Failed task {0}", value.getId()); + value.getActiveContext().get().close(); + } + } } http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloREEF.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloREEF.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloREEF.java index 928828c..e80cf1e 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloREEF.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/examples/hello/HelloREEF.java @@ -18,10 +18,9 @@ */ package org.apache.reef.bridge.examples.hello; -import org.apache.reef.bridge.client.DriverClientConfiguration; +import org.apache.reef.bridge.driver.client.DriverClientConfiguration; import org.apache.reef.bridge.proto.ClientProtocol; -import org.apache.reef.bridge.service.DriverServiceLauncher; -import org.apache.reef.client.LauncherStatus; +import org.apache.reef.bridge.client.DriverServiceLauncher; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.util.EnvironmentUtils; @@ -43,6 +42,8 @@ public final class HelloREEF { DriverClientConfiguration.CONF .set(DriverClientConfiguration.ON_DRIVER_STARTED, HelloDriver.StartHandler.class) .set(DriverClientConfiguration.ON_EVALUATOR_ALLOCATED, HelloDriver.EvaluatorAllocatedHandler.class) + .set(DriverClientConfiguration.ON_TASK_COMPLETED, HelloDriver.CompletedTaskHandler.class) + .set(DriverClientConfiguration.ON_TASK_FAILED, HelloDriver.FailedTaskHandler.class) .build(); /** @@ -58,15 +59,10 @@ public final 
class HelloREEF { builder.setLocalRuntime(ClientProtocol.LocalRuntimeParameters.newBuilder() .setMaxNumberOfEvaluators(1) .build()); - builder.addHandler(ClientProtocol.DriverClientConfiguration.Handlers.START); - builder.addHandler(ClientProtocol.DriverClientConfiguration.Handlers.EVALUATOR_ALLOCATED); builder.addGlobalLibraries(EnvironmentUtils.getClassLocation(HelloDriver.class)); - final LauncherStatus status = - DriverServiceLauncher.submit(builder.build(), DRIVER_CONFIG); - - LOG.log(Level.INFO, "REEF job completed: {0}", status); - + DriverServiceLauncher.submit(builder.build(), DRIVER_CONFIG); + LOG.log(Level.INFO, "REEF job completed"); ThreadLogger.logThreads(LOG, Level.FINE, "Threads running at the end of HelloREEF:"); } http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverClientException.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverClientException.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverClientException.java deleted file mode 100644 index 4f83fdb..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverClientException.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.reef.bridge.service; - -/** - * An exception thrown by the driver client. - */ -public final class DriverClientException extends Exception { - - public DriverClientException(final String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceConfiguration.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceConfiguration.java deleted file mode 100644 index e3f74c8..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceConfiguration.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.bridge.service; - -import org.apache.reef.annotations.audience.Private; -import org.apache.reef.bridge.service.parameters.DriverClientCommand; -import org.apache.reef.client.DriverConfiguration; -import org.apache.reef.driver.parameters.DriverIdleSources; -import org.apache.reef.tang.formats.ConfigurationModule; -import org.apache.reef.tang.formats.ConfigurationModuleBuilder; -import org.apache.reef.tang.formats.RequiredImpl; -import org.apache.reef.tang.formats.RequiredParameter; - -/** - * Binds all driver bridge service handlers to the driver. - */ -@Private -public final class DriverServiceConfiguration extends ConfigurationModuleBuilder { - - public static final RequiredImpl<IDriverService> DRIVER_SERVICE_IMPL = new RequiredImpl<>(); - - public static final RequiredParameter<String> DRIVER_CLIENT_COMMAND = new RequiredParameter<>(); - - /** Configuration module that binds all driver handlers. 
*/ - public static final ConfigurationModule CONF = new DriverServiceConfiguration() - .merge(DriverConfiguration.CONF) - .bindImplementation(IDriverService.class, DRIVER_SERVICE_IMPL) - .bindNamedParameter(DriverClientCommand.class, DRIVER_CLIENT_COMMAND) - .bindSetEntry(DriverIdleSources.class, IDriverService.class) - .build(); -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceHandlers.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceHandlers.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceHandlers.java deleted file mode 100644 index cca2436..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceHandlers.java +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.reef.bridge.service; - -import org.apache.reef.driver.context.ActiveContext; -import org.apache.reef.driver.context.ClosedContext; -import org.apache.reef.driver.context.ContextMessage; -import org.apache.reef.driver.context.FailedContext; -import org.apache.reef.driver.evaluator.AllocatedEvaluator; -import org.apache.reef.driver.evaluator.CompletedEvaluator; -import org.apache.reef.driver.evaluator.FailedEvaluator; -import org.apache.reef.driver.task.*; -import org.apache.reef.tang.annotations.Unit; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.time.event.StartTime; -import org.apache.reef.wake.time.event.StopTime; - -import javax.inject.Inject; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Contains Java side event handlers that perform - * hand-off with the driver client side. - */ -@Unit -public final class DriverServiceHandlers { - - private static final Logger LOG = Logger.getLogger(DriverServiceHandlers.class.getName()); - - private final IDriverService driverBridgeService; - - @Inject - private DriverServiceHandlers( - final IDriverService driverBridgeService) { - this.driverBridgeService = driverBridgeService; - } - - /** - * Job Driver is ready and the clock is set up: request the evaluators. - */ - final class StartHandler implements EventHandler<StartTime> { - @Override - public void onNext(final StartTime startTime) { - LOG.log(Level.INFO, "JavaBridge: Start Driver"); - DriverServiceHandlers.this.driverBridgeService.startHandler(startTime); - } - } - - /** - * Job Driver is is shutting down: write to the log. 
- */ - final class StopHandler implements EventHandler<StopTime> { - @Override - public void onNext(final StopTime stopTime) { - LOG.log(Level.INFO, "JavaBridge: Stop Driver"); - DriverServiceHandlers.this.driverBridgeService.stopHandler(stopTime); - } - } - - /** - * Receive notification that an Evaluator had been allocated, - * and submitTask a new Task in that Evaluator. - */ - final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> { - @Override - public void onNext(final AllocatedEvaluator eval) { - LOG.log(Level.INFO, "JavaBridge: Allocated Evaluator {0}", eval.getId()); - DriverServiceHandlers.this.driverBridgeService.allocatedEvaluatorHandler(eval); - } - } - - final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> { - @Override - public void onNext(final CompletedEvaluator eval) { - LOG.log(Level.INFO, "JavaBridge: Completed Evaluator {0}", eval.getId()); - DriverServiceHandlers.this.driverBridgeService.completedEvaluatorHandler(eval); - } - } - - final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> { - @Override - public void onNext(final FailedEvaluator eval) { - LOG.log(Level.INFO, "JavaBridge: Failed Evaluator {0}", eval.getId()); - DriverServiceHandlers.this.driverBridgeService.failedEvaluatorHandler(eval); - } - } - - /** - * Receive notification that the Context is active. - */ - final class ActiveContextHandler implements EventHandler<ActiveContext> { - @Override - public void onNext(final ActiveContext context) { - LOG.log(Level.INFO, "JavaBridge: Active Context {0}", context.getId()); - DriverServiceHandlers.this.driverBridgeService.activeContextHandler(context); - } - } - - /** - * Received notification that the Context is closed. 
- */ - final class ClosedContextHandler implements EventHandler<ClosedContext> { - @Override - public void onNext(final ClosedContext context) { - LOG.log(Level.INFO, "JavaBridge: Closed Context {0}", context.getId()); - DriverServiceHandlers.this.driverBridgeService.closedContextHandler(context); - } - } - - /** - * Received a message from the context. - */ - final class ContextMessageHandler implements EventHandler<ContextMessage> { - @Override - public void onNext(final ContextMessage message) { - LOG.log(Level.INFO, "JavaBridge: Context Message id {0}", message.getId()); - DriverServiceHandlers.this.driverBridgeService.contextMessageHandler(message); - } - } - - /** - * Received notification that the Context failed. - */ - final class ContextFailedHandler implements EventHandler<FailedContext> { - @Override - public void onNext(final FailedContext context) { - LOG.log(Level.INFO, "JavaBridge: Context Failed {0}", context.getId()); - DriverServiceHandlers.this.driverBridgeService.failedContextHandler(context); - } - } - - /** - * Receive notification that the Task is running. - */ - final class RunningTaskHandler implements EventHandler<RunningTask> { - @Override - public void onNext(final RunningTask task) { - LOG.log(Level.INFO, "JavaBridge: Running Task {0}", task.getId()); - DriverServiceHandlers.this.driverBridgeService.runningTaskHandler(task); - } - } - - /** - * Received notification that the Task failed. - */ - final class FailedTaskHandler implements EventHandler<FailedTask> { - @Override - public void onNext(final FailedTask task) { - LOG.log(Level.INFO, "JavaBridge: Failed Task {0}", task.getId()); - DriverServiceHandlers.this.driverBridgeService.failedTaskHandler(task); - } - } - - /** - * Receive notification that the Task has completed successfully. 
- */ - final class CompletedTaskHandler implements EventHandler<CompletedTask> { - @Override - public void onNext(final CompletedTask task) { - LOG.log(Level.INFO, "JavaBridge: Completed Task {0}", task.getId()); - DriverServiceHandlers.this.driverBridgeService.completedTaskHandler(task); - } - } - - /** - * Received notification that the Task was suspended. - */ - final class SuspendedTaskHandler implements EventHandler<SuspendedTask> { - @Override - public void onNext(final SuspendedTask task) { - LOG.log(Level.INFO, "JavaBridge: Suspended Task {0}", task.getId()); - DriverServiceHandlers.this.driverBridgeService.suspendedTaskHandler(task); - } - } - - /** - * Received a message from the task. - */ - final class TaskMessageHandler implements EventHandler<TaskMessage> { - @Override - public void onNext(final TaskMessage message) { - LOG.log(Level.INFO, "JavaBridge: Message from Task {0}", message.getId()); - DriverServiceHandlers.this.driverBridgeService.taskMessageHandler(message); - } - } - - /** - * Received a message from the client. - */ - final class ClientMessageHandler implements EventHandler<byte[]> { - @Override - public void onNext(final byte[] message) { - LOG.log(Level.INFO, "JavaBridge: Message from Client"); - DriverServiceHandlers.this.driverBridgeService.clientMessageHandler(message); - } - } - - /** - * Received a close event from the client. - */ - final class ClientCloseHandler implements EventHandler<Void> { - @Override - public void onNext(final Void value) { - LOG.log(Level.INFO, "JavaBridge: Close event from Client"); - DriverServiceHandlers.this.driverBridgeService.clientCloseHandler(); - } - } - - /** - * Received a close event with message. 
- */ - final class ClientCloseWithMessageHandler implements EventHandler<byte[]> { - @Override - public void onNext(final byte[] message) { - LOG.log(Level.INFO, "JavaBridge: Close event with messages from Client"); - DriverServiceHandlers.this.driverBridgeService.clientCloseWithMessageHandler(message); - } - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/d243aa2a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceLauncher.java deleted file mode 100644 index fbafff9..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/service/DriverServiceLauncher.java +++ /dev/null @@ -1,328 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
*/
package org.apache.reef.bridge.service;

import com.google.protobuf.util.JsonFormat;
import org.apache.commons.lang.StringUtils;
import org.apache.reef.bridge.client.JavaDriverClientLauncher;
import org.apache.reef.bridge.examples.WindowsRuntimePathProvider;
import org.apache.reef.bridge.proto.ClientProtocol;
import org.apache.reef.bridge.service.grpc.GRPCDriverService;
import org.apache.reef.client.DriverConfiguration;
import org.apache.reef.client.DriverLauncher;
import org.apache.reef.client.LauncherStatus;
import org.apache.reef.client.parameters.DriverConfigurationProviders;
import org.apache.reef.io.TcpPortConfigurationProvider;
import org.apache.reef.runtime.common.files.ClasspathProvider;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.files.RuntimePathProvider;
import org.apache.reef.runtime.common.files.UnixJVMPathProvider;
import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
import org.apache.reef.tang.*;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.ConfigurationModule;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.EnvironmentUtils;
import org.apache.reef.util.OSUtils;
import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin;
import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount;
import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Driver Service Launcher - main class.
 *
 * <p>Translates a {@link ClientProtocol.DriverClientConfiguration} protocol buffer into
 * TANG configurations (runtime + driver service) and hands them to {@link DriverLauncher}.
 */
public final class DriverServiceLauncher {

  /**
   * Standard Java logger.
   */
  private static final Logger LOG = Logger.getLogger(DriverServiceLauncher.class.getName());

  /**
   * This class should not be instantiated.
   */
  private DriverServiceLauncher() {
    throw new RuntimeException("Do not instantiate this class!");
  }

  /**
   * Select the TANG runtime configuration that matches the runtime case set in the proto.
   *
   * @param driverClientConfigurationProto containing which runtime to configure; only the
   *                                       local and YARN runtimes are handled here.
   * @return (immutable) TANG Configuration object.
   * @throws BindException if building the runtime configuration fails.
   * @throws IllegalArgumentException if the proto names any other runtime case.
   */
  private static Configuration getRuntimeConfiguration(
      final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto)
      throws BindException {
    switch (driverClientConfigurationProto.getRuntimeCase()) {
    case LOCAL_RUNTIME:
      return getLocalRuntimeConfiguration(driverClientConfigurationProto);
    case YARN_RUNTIME:
      return getYarnRuntimeConfiguration(driverClientConfigurationProto);
    default:
      throw new IllegalArgumentException("Unsupported runtime " + driverClientConfigurationProto.getRuntimeCase());
    }
  }

  /**
   * Build the local runtime configuration.
   *
   * @param driverClientConfigurationProto currently unused; no local runtime options are read from it.
   * @return the default {@link LocalRuntimeConfiguration}.
   * @throws BindException if the configuration module cannot be built.
   */
  private static Configuration getLocalRuntimeConfiguration(
      final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto)
      throws BindException {
    LOG.log(Level.FINE, "JavaBridge: Running on the local runtime");
    return LocalRuntimeConfiguration.CONF
        .build();
  }

  /**
   * Build the YARN runtime configuration.
   *
   * @param driverClientConfigurationProto currently unused; no YARN options are read from it.
   * @return the default {@link YarnClientConfiguration}.
   * @throws BindException if the configuration module cannot be built.
   */
  private static Configuration getYarnRuntimeConfiguration(
      final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto)
      throws BindException {
    LOG.log(Level.FINE, "JavaBridge: Running on YARN");
    return YarnClientConfiguration.CONF.build();
  }

  /**
   * Build the driver service configuration from the proto: driver client command, job id,
   * file/library dependencies, driver resources, event handlers and TCP port constraints.
   *
   * @param driverClientConfigurationProto the client-supplied driver configuration.
   * @return the driver service TANG configuration.
   * @throws IllegalArgumentException if the proto does not register the START handler.
   */
  private static Configuration getDriverServiceConfiguration(
      final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto) {
    // Set required parameters
    ConfigurationModule driverServiceConfigurationModule = DriverServiceConfiguration.CONF
        .set(DriverServiceConfiguration.DRIVER_SERVICE_IMPL, GRPCDriverService.class)
        .set(DriverServiceConfiguration.DRIVER_CLIENT_COMMAND,
            driverClientConfigurationProto.getDriverClientLaunchCommand())
        .set(DriverConfiguration.DRIVER_IDENTIFIER, driverClientConfigurationProto.getJobid());

    // Set file dependencies; the jar/directory holding the gRPC driver service is always shipped.
    final List<String> localLibraries = new ArrayList<>();
    localLibraries.add(EnvironmentUtils.getClassLocation(GRPCDriverService.class));
    if (driverClientConfigurationProto.getLocalLibrariesCount() > 0) {
      localLibraries.addAll(driverClientConfigurationProto.getLocalLibrariesList());
    }
    driverServiceConfigurationModule = driverServiceConfigurationModule
        .setMultiple(DriverConfiguration.LOCAL_LIBRARIES, localLibraries);
    if (driverClientConfigurationProto.getGlobalLibrariesCount() > 0) {
      driverServiceConfigurationModule = driverServiceConfigurationModule
          .setMultiple(DriverConfiguration.GLOBAL_LIBRARIES,
              driverClientConfigurationProto.getGlobalLibrariesList());
    }
    if (driverClientConfigurationProto.getLocalFilesCount() > 0) {
      driverServiceConfigurationModule = driverServiceConfigurationModule
          .setMultiple(DriverConfiguration.LOCAL_FILES,
              driverClientConfigurationProto.getLocalFilesList());
    }
    if (driverClientConfigurationProto.getGlobalFilesCount() > 0) {
      driverServiceConfigurationModule = driverServiceConfigurationModule
          .setMultiple(DriverConfiguration.GLOBAL_FILES,
              driverClientConfigurationProto.getGlobalFilesList());
    }

    // Setup driver resources; non-positive values leave the parameter unset.
    if (driverClientConfigurationProto.getCpuCores() > 0) {
      driverServiceConfigurationModule = driverServiceConfigurationModule
          .set(DriverConfiguration.DRIVER_CPU_CORES, driverClientConfigurationProto.getCpuCores());
    }
    if (driverClientConfigurationProto.getMemoryMb() > 0) {
      driverServiceConfigurationModule = driverServiceConfigurationModule
          .set(DriverConfiguration.DRIVER_MEMORY, driverClientConfigurationProto.getMemoryMb());
    }

    // Setup handlers
    final Set<ClientProtocol.DriverClientConfiguration.Handlers> handlerLabelSet =
        new HashSet<>(driverClientConfigurationProto.getHandlerList());
    if (!handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.START)) {
      throw new IllegalArgumentException("Start handler required");
    }
    // The stop handler is always registered together with the (mandatory) start handler.
    driverServiceConfigurationModule = driverServiceConfigurationModule
        .set(DriverConfiguration.ON_DRIVER_STARTED, DriverServiceHandlers.StartHandler.class)
        .set(DriverConfiguration.ON_DRIVER_STOP, DriverServiceHandlers.StopHandler.class);

    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.EVALUATOR_ALLOCATED)) {
      driverServiceConfigurationModule = driverServiceConfigurationModule
          .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DriverServiceHandlers.AllocatedEvaluatorHandler.class);
    }
    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.EVALUATOR_COMPLETED)) {
      driverServiceConfigurationModule = driverServiceConfigurationModule
          .set(DriverConfiguration.ON_EVALUATOR_COMPLETED, DriverServiceHandlers.CompletedEvaluatorHandler.class);
    }
    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.EVALUATOR_FAILED)) {
      driverServiceConfigurationModule = driverServiceConfigurationModule
          .set(DriverConfiguration.ON_EVALUATOR_FAILED, DriverServiceHandlers.FailedEvaluatorHandler.class);
    }
    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CONTEXT_ACTIVE)) {
      driverServiceConfigurationModule = driverServiceConfigurationModule
          .set(DriverConfiguration.ON_CONTEXT_ACTIVE, DriverServiceHandlers.ActiveContextHandler.class);
    }
    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CONTEXT_CLOSED)) {
      driverServiceConfigurationModule = driverServiceConfigurationModule
          .set(DriverConfiguration.ON_CONTEXT_CLOSED, DriverServiceHandlers.ClosedContextHandler.class);
    }
    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CONTEXT_FAILED)) {
      driverServiceConfigurationModule = driverServiceConfigurationModule
          .set(DriverConfiguration.ON_CONTEXT_FAILED, DriverServiceHandlers.ContextFailedHandler.class);
    }
    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CONTEXT_MESSAGE)) {
      driverServiceConfigurationModule = driverServiceConfigurationModule
          .set(DriverConfiguration.ON_CONTEXT_MESSAGE, DriverServiceHandlers.ContextMessageHandler.class);
    }
    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.TASK_RUNNING)) {
      driverServiceConfigurationModule = driverServiceConfigurationModule
          .set(DriverConfiguration.ON_TASK_RUNNING, DriverServiceHandlers.RunningTaskHandler.class);
    }
    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.TASK_COMPLETED)) {
      driverServiceConfigurationModule = driverServiceConfigurationModule
          .set(DriverConfiguration.ON_TASK_COMPLETED, DriverServiceHandlers.CompletedTaskHandler.class);
    }
    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.TASK_FAILED)) {
      driverServiceConfigurationModule = driverServiceConfigurationModule
          .set(DriverConfiguration.ON_TASK_FAILED, DriverServiceHandlers.FailedTaskHandler.class);
    }
    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.TASK_MESSAGE)) {
      driverServiceConfigurationModule = driverServiceConfigurationModule
          .set(DriverConfiguration.ON_TASK_MESSAGE, DriverServiceHandlers.TaskMessageHandler.class);
    }
    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CLIENT_MESSAGE)) {
      driverServiceConfigurationModule = driverServiceConfigurationModule
          .set(DriverConfiguration.ON_CLIENT_MESSAGE, DriverServiceHandlers.ClientMessageHandler.class);
    }
    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CLIENT_CLOSE)) {
      driverServiceConfigurationModule = driverServiceConfigurationModule
          .set(DriverConfiguration.ON_CLIENT_CLOSED, DriverServiceHandlers.ClientCloseHandler.class);
    }
    if (handlerLabelSet.contains(ClientProtocol.DriverClientConfiguration.Handlers.CLIENT_CLOSE_WITH_MESSAGE)) {
      driverServiceConfigurationModule = driverServiceConfigurationModule
          .set(DriverConfiguration.ON_CLIENT_CLOSED_MESSAGE, DriverServiceHandlers.ClientCloseWithMessageHandler.class);
    }

    return setTcpPortRange(driverClientConfigurationProto, driverServiceConfigurationModule.build());
  }

  /**
   * Extend the driver service configuration with the TCP port configuration provider and
   * whichever TCP port range constraints the proto sets to a positive value.
   *
   * @param driverClientConfigurationProto source of the TCP port range begin / count / try count.
   * @param driverServiceConfiguration configuration to extend.
   * @return a new configuration containing the original bindings plus the TCP port bindings.
   */
  private static Configuration setTcpPortRange(
      final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto,
      final Configuration driverServiceConfiguration) {
    JavaConfigurationBuilder configurationModuleBuilder =
        Tang.Factory.getTang().newConfigurationBuilder(driverServiceConfiguration)
            .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class);
    // Setup TCP constraints
    if (driverClientConfigurationProto.getTcpPortRangeBegin() > 0) {
      configurationModuleBuilder = configurationModuleBuilder
          .bindNamedParameter(TcpPortRangeBegin.class,
              Integer.toString(driverClientConfigurationProto.getTcpPortRangeBegin()));
    }
    if (driverClientConfigurationProto.getTcpPortRangeCount() > 0) {
      configurationModuleBuilder = configurationModuleBuilder
          .bindNamedParameter(TcpPortRangeCount.class,
              Integer.toString(driverClientConfigurationProto.getTcpPortRangeCount()));
    }
    if (driverClientConfigurationProto.getTcpPortRangeTryCount() > 0) {
      // The try count has its own named parameter; it must not be bound to TcpPortRangeCount.
      configurationModuleBuilder = configurationModuleBuilder
          .bindNamedParameter(TcpPortRangeTryCount.class,
              Integer.toString(driverClientConfigurationProto.getTcpPortRangeTryCount()));
    }
    return configurationModuleBuilder.build();
  }

  /**
   * Submit a job whose driver client is a Java process configured by the given TANG configuration.
   *
   * <p>The driver client configuration is serialized to {@code driverclient.conf} in the current
   * working directory, registered as a local file of the job, and a launch command for
   * {@link JavaDriverClientLauncher} pointing at that file is stored in the proto before the
   * driver service is configured and launched. The file is deleted once the launcher returns.
   *
   * @param driverClientConfigurationProto client protocol configuration (runtime, handlers, resources).
   * @param driverClientConfiguration TANG configuration of the driver client to ship with the job.
   * @return the status returned by {@link DriverLauncher}.
   * @throws InjectionException if an injector cannot provide a required instance.
   * @throws IOException if the driver client configuration cannot be written.
   */
  public static LauncherStatus submit(
      final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto,
      final Configuration driverClientConfiguration)
      throws InjectionException, IOException {
    final ClientProtocol.DriverClientConfiguration.Builder builder =
        ClientProtocol.DriverClientConfiguration.newBuilder(driverClientConfigurationProto);
    final File driverClientConfigurationFile = new File("driverclient.conf");
    try {
      // Write driver client configuration to a file
      final Injector driverClientInjector = Tang.Factory.getTang().newInjector(driverClientConfiguration);
      final ConfigurationSerializer configurationSerializer =
          driverClientInjector.getInstance(ConfigurationSerializer.class);
      configurationSerializer.toFile(driverClientConfiguration, driverClientConfigurationFile);

      // Get runtime injector and piece together the launch command based on its classpath info
      final Configuration runtimeConfiguration = getRuntimeConfiguration(driverClientConfigurationProto);
      // Resolve OS Runtime Path Provider
      final Configuration runtimeOSConfiguration = Configurations.merge(
          Tang.Factory.getTang().newConfigurationBuilder()
              .bind(RuntimePathProvider.class,
                  OSUtils.isWindows() ? WindowsRuntimePathProvider.class : UnixJVMPathProvider.class)
              .build(),
          runtimeConfiguration);
      final Injector runtimeInjector = Tang.Factory.getTang().newInjector(runtimeOSConfiguration);
      final REEFFileNames fileNames = runtimeInjector.getInstance(REEFFileNames.class);
      final ClasspathProvider classpathProvider = runtimeInjector.getInstance(ClasspathProvider.class);
      final RuntimePathProvider runtimePathProvider = runtimeInjector.getInstance(RuntimePathProvider.class);
      final List<String> launchCommand = new JavaLaunchCommandBuilder(JavaDriverClientLauncher.class, null)
          .setConfigurationFilePaths(
              Collections.singletonList("./" + fileNames.getLocalFolderPath() + "/" +
                  driverClientConfigurationFile.getName()))
          .setJavaPath(runtimePathProvider.getPath())
          .setClassPath(classpathProvider.getEvaluatorClasspath())
          .build();
      final String cmd = StringUtils.join(launchCommand, ' ');
      builder.setDriverClientLaunchCommand(cmd);
      builder.addLocalFiles(driverClientConfigurationFile.getAbsolutePath());

      // Configure driver service and launch the job
      final Configuration driverServiceConfiguration = getDriverServiceConfiguration(builder.build());
      return DriverLauncher.getLauncher(runtimeOSConfiguration).run(driverServiceConfiguration);
    } finally {
      // Best-effort cleanup: report, but never fail the submission, if the file lingers.
      if (!driverClientConfigurationFile.delete() && driverClientConfigurationFile.exists()) {
        LOG.log(Level.WARNING, "Unable to delete temporary driver client configuration file {0}",
            driverClientConfigurationFile.getAbsolutePath());
      }
    }
  }

  /**
   * Main method that launches the REEF job.
   *
   * @param args command line parameters: exactly one argument, the path of a file containing a
   *             JSON-encoded client protocol buffer driver configuration.
   */
  public static void main(final String[] args) {
    try {
      if (args.length != 1) {
        LOG.log(Level.SEVERE, DriverServiceLauncher.class.getName() +
            " accepts single argument referencing a file that contains a client protocol buffer driver configuration");
        // Nothing sensible can be launched without the configuration file argument.
        return;
      }
      final String content;
      try {
        // Decode explicitly as UTF-8 rather than with the platform default charset.
        content = new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      final ClientProtocol.DriverClientConfiguration.Builder driverClientConfigurationProtoBuilder =
          ClientProtocol.DriverClientConfiguration.newBuilder();
      JsonFormat.parser()
          .usingTypeRegistry(JsonFormat.TypeRegistry.getEmptyTypeRegistry())
          .merge(content, driverClientConfigurationProtoBuilder);
      final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto =
          driverClientConfigurationProtoBuilder.build();

      final Configuration runtimeConfig = getRuntimeConfiguration(driverClientConfigurationProto);
      final Configuration driverConfig = getDriverServiceConfiguration(driverClientConfigurationProto);
      DriverLauncher.getLauncher(runtimeConfig).run(driverConfig);
      LOG.log(Level.INFO, "JavaBridge: Stop Client {0}", driverClientConfigurationProto.getJobid());
    } catch (final BindException | InjectionException | IOException ex) {
      LOG.log(Level.SEVERE, "Job configuration error", ex);
    }
  }
}
