[REEF-2002] Create Java project for gRPC two process bridge This addressed the issue by * Adding gRPC bridge implementation * Implemeting event handlers and objects that pass between application and core driver.
JIRA: [REEF-2002](https://issues.apache.org/jira/browse/REEF-2002) Pull request: This closes #1447 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/ea249f7f Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/ea249f7f Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/ea249f7f Branch: refs/heads/REEF-335 Commit: ea249f7f4572748556a285e6828405f9a0f7107f Parents: fed6fb7 Author: Tyson Condie <tcon...@apache.org> Authored: Tue Apr 3 15:11:18 2018 -0700 Committer: Doug Service <dougserv...@apache.org> Committed: Mon Apr 16 23:24:39 2018 +0000 ---------------------------------------------------------------------- lang/common/proto/bridge/ClientProtocol.proto | 108 +++ .../proto/bridge/DriverClientProtocol.proto | 168 +++++ .../proto/bridge/DriverCommonProtocol.proto | 44 ++ .../proto/bridge/DriverServiceProtocol.proto | 155 ++++ lang/java/reef-bridge-proto-java/pom.xml | 255 +++++++ .../client/DefaultDriverClientStopHandler.java | 43 ++ .../reef/bridge/client/DriverClientClock.java | 127 ++++ .../client/DriverClientConfiguration.java | 202 ++++++ .../bridge/client/DriverClientDispatcher.java | 231 ++++++ .../client/DriverClientEvaluatorRequestor.java | 59 ++ .../client/DriverClientExceptionHandler.java | 43 ++ .../bridge/client/IAlarmDispatchHandler.java | 30 + .../bridge/client/IDriverClientService.java | 45 ++ .../bridge/client/IDriverServiceClient.java | 132 ++++ .../reef/bridge/client/JVMClientProcess.java | 121 ++++ .../bridge/client/JavaDriverClientLauncher.java | 217 ++++++ .../client/events/ActiveContextBridge.java | 102 +++ .../client/events/AllocatedEvaluatorBridge.java | 166 +++++ .../client/events/ClosedContextBridge.java | 77 ++ .../client/events/CompletedEvaluatorBridge.java | 39 + .../client/events/CompletedTaskBridge.java | 61 ++ .../client/events/ContextMessageBridge.java | 69 ++ .../client/events/FailedContextBridge.java | 110 +++ .../client/events/FailedEvaluatorBridge.java | 75 ++ .../bridge/client/events/RunningTaskBridge.java | 90 +++ .../bridge/client/events/TaskMessageBridge.java | 78 ++ .../reef/bridge/client/events/package-info.java | 22 + .../grpc/DriverClientGrpcConfiguration.java | 42 ++ .../bridge/client/grpc/DriverClientService.java | 458 ++++++++++++ .../bridge/client/grpc/DriverServiceClient.java | 225 ++++++ .../reef/bridge/client/grpc/package-info.java | 22 + .../grpc/parameters/DriverServicePort.java | 29 + .../client/grpc/parameters/package-info.java | 22 + .../apache/reef/bridge/client/package-info.java | 22 + .../parameters/ClientDriverStopHandler.java | 36 + .../DriverClientDispatchThreadCount.java | 30 + .../bridge/client/parameters/package-info.java | 22 + .../examples/WindowsRuntimePathProvider.java | 43 ++ .../reef/bridge/examples/hello/HelloDriver.java | 83 +++ .../reef/bridge/examples/hello/HelloREEF.java | 75 ++ .../reef/bridge/examples/hello/HelloTask.java | 39 + .../bridge/examples/hello/package-info.java | 22 + .../reef/bridge/examples/package-info.java | 22 + .../bridge/service/DriverClientException.java | 30 + .../service/DriverServiceConfiguration.java | 47 ++ .../bridge/service/DriverServiceHandlers.java | 236 +++++++ .../bridge/service/DriverServiceLauncher.java | 328 +++++++++ .../reef/bridge/service/IDriverService.java | 138 ++++ .../reef/bridge/service/RuntimeNames.java | 36 + .../bridge/service/grpc/GRPCDriverService.java | 706 +++++++++++++++++++ .../reef/bridge/service/grpc/package-info.java | 22 + .../reef/bridge/service/package-info.java | 22 + .../service/parameters/DriverClientCommand.java | 31 + .../bridge/service/parameters/package-info.java | 22 + .../evaluator/EvaluatorDescriptorImpl.java | 4 +- .../common/launch/JavaLaunchCommandBuilder.java | 2 +- .../ports/parameters/TcpPortRangeBegin.java | 3 +- .../ports/parameters/TcpPortRangeCount.java | 3 +- .../ports/parameters/TcpPortRangeTryCount.java | 3 +- pom.xml | 4 + 60 files changed, 5692 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/common/proto/bridge/ClientProtocol.proto ---------------------------------------------------------------------- diff --git a/lang/common/proto/bridge/ClientProtocol.proto b/lang/common/proto/bridge/ClientProtocol.proto new file mode 100644 index 0000000..68bdcaa --- /dev/null +++ b/lang/common/proto/bridge/ClientProtocol.proto @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +syntax = "proto3"; + +// option java_generic_services = true; +// option java_multiple_files = true; +option java_package = "org.apache.reef.bridge.proto"; +option java_outer_classname = "ClientProtocol"; +option csharp_namespace = "Org.Apache.REEF.Bridge.Proto"; + +package driverbridge; + +message LocalRuntimeParameters { + uint32 max_number_of_evaluators = 1; + string runtime_root_folder = 2; + string jvm_heap_slack = 3; + repeated string rack_names = 4; +} + +message YarnRuntimeParameters { + string queue = 1; + string job_submission_directory_prefix = 2; +} + +message AzureBatchRuntimeParameters { + +} + +message MesosRuntimeParameters { + +} + +message DriverClientConfiguration { + string jobid = 1; + + // driver machine resources + uint32 cpu_cores = 2; + uint32 memory_mb = 3; + + // the runtime on which to launch + oneof runtime { + LocalRuntimeParameters local_runtime = 4; + YarnRuntimeParameters yarn_runtime = 5; + AzureBatchRuntimeParameters azbatch_runtime = 6; + MesosRuntimeParameters mesos_runtime = 7; + } + + // The command to launch the driver client + string driver_client_launch_command = 10; + + enum Handlers { + // control events + START = 0; + STOP = 1; + + // evaluator events + EVALUATOR_ALLOCATED = 5; + EVALUATOR_COMPLETED = 6; + EVALUATOR_FAILED = 7; + + // context events + CONTEXT_ACTIVE = 10; + CONTEXT_CLOSED = 11; + CONTEXT_FAILED = 12; + CONTEXT_MESSAGE = 13; + + // task events + TASK_RUNNING = 15; + TASK_FAILED = 16; + TASK_COMPLETED = 17; + TASK_MESSAGE = 18; + + // client events + CLIENT_MESSAGE = 20; + CLIENT_CLOSE = 21; + CLIENT_CLOSE_WITH_MESSAGE = 22; + } + repeated Handlers handler = 11; + + // TCP port range + uint32 tcp_port_range_begin = 15; + uint32 tcp_port_range_count = 16; + uint32 tcp_port_range_try_count = 17; + + // file dependencies + repeated string global_files = 20; + repeated string local_files = 21; + repeated string global_libraries = 22; + repeated string local_libraries = 23; + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/common/proto/bridge/DriverClientProtocol.proto ---------------------------------------------------------------------- diff --git a/lang/common/proto/bridge/DriverClientProtocol.proto b/lang/common/proto/bridge/DriverClientProtocol.proto new file mode 100644 index 0000000..f80cff2 --- /dev/null +++ b/lang/common/proto/bridge/DriverClientProtocol.proto @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +syntax = "proto3"; + +// option java_generic_services = true; +option java_multiple_files = true; +option java_package = "org.apache.reef.bridge.proto"; +option java_outer_classname = "DriverClientProtocol"; +option csharp_namespace = "Org.Apache.REEF.Bridge.Proto"; + +package driverbridge; + +import "DriverCommonProtocol.proto"; + +// The java driver service definition. +service DriverClient { + // Inquire if idle + rpc IdlenessCheckHandler (Void) returns (IdleStatus) {} + + // Request for resources + rpc StartHandler (StartTimeInfo) returns (Void) {} + + rpc StopHandler (StopTimeInfo) returns (Void) {} + + rpc AlarmTrigger (AlarmTriggerInfo) returns (Void) {} + + // Evaluator handlers + rpc AllocatedEvaluatorHandler (EvaluatorInfo) returns (Void) {} + + rpc CompletedEvaluatorHandler (EvaluatorInfo) returns (Void) {} + + rpc FailedEvaluatorHandler (EvaluatorInfo) returns (Void) {} + + // Context handlers + rpc ActiveContextHandler (ContextInfo) returns (Void) {} + + rpc ClosedContextHandler (ContextInfo) returns (Void) {} + + rpc FailedContextHandler (ContextInfo) returns (Void) {} + + rpc ContextMessageHandler (ContextMessageInfo) returns (Void) {} + + // Task handlers + rpc RunningTaskHandler (TaskInfo) returns (Void) {} + + rpc FailedTaskHandler (TaskInfo) returns (Void) {} + + rpc CompletedTaskHandler (TaskInfo) returns (Void) {} + + rpc SuspendedTaskHandler (TaskInfo) returns (Void) {} + + rpc TaskMessageHandler (TaskMessageInfo) returns (Void) {} + + // Client Handlers + rpc ClientMessageHandler (ClientMessageInfo) returns (Void) {} + + rpc ClientCloseHandler (Void) returns (Void) {} + + rpc ClientCloseWithMessageHandler (ClientMessageInfo) returns (Void) {} +} + +// IdleStatus response to idleness inquiry +message IdleStatus { + bool is_idle = 1; + string reason = 2; +} + +// The request message containing resource request. +message StartTimeInfo { + int64 start_time = 1; +} + +message StopTimeInfo { + int64 stop_time = 1; +} + +// Information associated with an alarm that was set. +message AlarmTriggerInfo { + string alarm_id = 1; +} + +message EvaluatorDescriptorInfo { + // the amount of memory allocated + int32 memory = 1; + + // the number of virtual cores allocated + int32 cores = 2; + + // name of the runtime + string runtime_name = 3; +} + +message EvaluatorInfo { + string evaluator_id = 1; + + message FailureInfo { + string message = 1; + repeated string failedContexts = 2; + string failedTaskId = 3; + } + FailureInfo failure = 2; + + EvaluatorDescriptorInfo descriptor_info = 3; +} + +message ContextInfo { + string context_id = 1; + + string evaluator_id = 2; + + string parent_id = 3; + + // Optional exception information + ExceptionInfo exception = 5; +} + +message ContextMessageInfo { + string context_id = 1; + + bytes payload = 2; + + int64 sequence_number = 3; + + string message_source_id = 4; +} + +message TaskInfo { + string task_id = 1; + + string context_id = 2; + + bytes result = 3; + + ExceptionInfo exception = 5; +} + +message TaskMessageInfo { + string task_id = 1; + + bytes payload = 2; + + int64 sequence_number = 3; + + string context_id = 4; + + string message_source_id = 5; +} + +message ClientMessageInfo { + bytes payload = 1; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/common/proto/bridge/DriverCommonProtocol.proto ---------------------------------------------------------------------- diff --git a/lang/common/proto/bridge/DriverCommonProtocol.proto b/lang/common/proto/bridge/DriverCommonProtocol.proto new file mode 100644 index 0000000..7ec8905 --- /dev/null +++ b/lang/common/proto/bridge/DriverCommonProtocol.proto @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +syntax = "proto3"; + +// option java_generic_services = true; +option java_multiple_files = true; +option java_package = "org.apache.reef.bridge.proto"; +option csharp_namespace = "Org.Apache.REEF.Bridge.Proto"; + +package driverbridge; + +// Void message type +message Void {} + +message ExceptionInfo { + // Exception name/type + string name = 1; + + // Exception message + string message = 2; + + // Stack trace + repeated string stack_trace = 3; + + // Data associated with exception + bytes data = 4; +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/common/proto/bridge/DriverServiceProtocol.proto ---------------------------------------------------------------------- diff --git a/lang/common/proto/bridge/DriverServiceProtocol.proto b/lang/common/proto/bridge/DriverServiceProtocol.proto new file mode 100644 index 0000000..7f6da24 --- /dev/null +++ b/lang/common/proto/bridge/DriverServiceProtocol.proto @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +syntax = "proto3"; + +// option java_generic_services = true; +option java_multiple_files = true; +option java_package = "org.apache.reef.bridge.proto"; +option java_outer_classname = "DriverBridgeProtocol"; +option csharp_namespace = "Org.Apache.REEF.Bridge.Proto"; + +package driverbridge; + +import "DriverCommonProtocol.proto"; + +// The java driver service definition. +service DriverService { + // Driver client registration + rpc RegisterDriverClient (DriverClientRegistration) returns (Void) {} + + // Request for resources + rpc RequestResources (ResourceRequest) returns (Void) {} + + // Request system shutdown + rpc Shutdown (ShutdownRequest) returns (Void) {} + + // Request for setting an alarm + rpc SetAlarm (AlarmRequest) returns (Void) {} + + // Request operation on an allocated evaluator + rpc AllocatedEvaluatorOp (AllocatedEvaluatorRequest) returns (Void) {} + + // Request operation on an active context + rpc ActiveContextOp (ActiveContextRequest) returns (Void) {} + + // Request operation on a running task + rpc RunningTaskOp (RunningTaskRequest) returns (Void) {} +} + +message DriverClientRegistration { + // The client's host + string host = 1; + + // The client's server port + int32 port = 2; +} + +// The request message containing resource request. +message ResourceRequest { + repeated string node_name_list = 1; + + repeated string rack_name_list = 2; + + int32 resource_count = 3; + + int32 memory_size = 4; + + int32 priority = 5; + + int32 cores = 6; + + bool relax_locality = 7; + + string runtime_name = 8; +} + +// Request for an alarm to be set +message AlarmRequest { + // used to uniquely identify the alarm + string alarm_id = 1; + + // timeout in milliseconds + int32 timeout_ms = 2; +} + +message ShutdownRequest { + ExceptionInfo exception = 1; +} + +message AllocatedEvaluatorRequest { + // The evaluator used to submit + string evaluator_id = 1; + + bool close_evaluator = 2; + + repeated string add_files = 3; + + repeated string add_libraries = 4; + + // Evaluator configuration + string evaluator_configuration = 5; + + // Context configuration + string context_configuration = 6; + + // Task configuration + string task_configuration = 7; + + message EvaluatorProcessRequest { + int32 memory_mb = 1; + + string configuration_file_name = 2; + + string standard_out = 3; + + string standard_err = 4; + + repeated string options = 5; + } + EvaluatorProcessRequest set_process = 8; +} + +message ActiveContextRequest { + string context_id = 1; + + oneof operation { + // close the context + bool close_context = 2; + + // send message to the context + bytes message = 3; + + // create a child context + string new_context_request = 4; + + // launch a task + string new_task_request = 5; + } +} + +message RunningTaskRequest { + string task_id = 1; + + // close the task + bool close_task = 2; + + // send task a message + bytes message = 3; +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/pom.xml b/lang/java/reef-bridge-proto-java/pom.xml new file mode 100644 index 0000000..f177b7c --- /dev/null +++ b/lang/java/reef-bridge-proto-java/pom.xml @@ -0,0 +1,255 @@ +<?xml version="1.0"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <artifactId>reef-bridge-proto-java</artifactId> + <name>REEF Bridge Protobuf Java</name> + <description>Protocol Buffer Bridge between JVM and CLR.</description> + + <parent> + <groupId>org.apache.reef</groupId> + <artifactId>reef-project</artifactId> + <version>0.17.0-SNAPSHOT</version> + <relativePath>../../..</relativePath> + </parent> + + <properties> + <rootPath>${basedir}/../../..</rootPath> + <protoPath>${rootPath}/lang/common/proto/bridge</protoPath> + <!-- protobuf paths --> + <protobuf.input.directory>${protoPath}</protobuf.input.directory> + <protobuf.output.directory>${project.build.directory}/generated-sources</protobuf.output.directory> + + <!-- library versions --> + <maven.assembly>3.1.0</maven.assembly> + <grpc.version>1.10.0</grpc.version><!-- CURRENT_GRPC_VERSION --> + <netty.version>4.1.17.Final</netty.version> + <build-helper-maven-plugin.version>1.9.1</build-helper-maven-plugin.version> + <maven-antrun-plugin.version>1.8</maven-antrun-plugin.version> + <maven-dependency-plugin.version>2.10</maven-dependency-plugin.version> + <maven-shade-plugin.version>2.4.2</maven-shade-plugin.version> + <os-maven-plugin.version>1.4.1.Final</os-maven-plugin.version> + <protobuf.version>3.5.1</protobuf.version> + </properties> + + <dependencies> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-runtime-local</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-webserver</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <version>1.9</version> + </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <version>${protobuf.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-all</artifactId> + <version>${grpc.version}</version> + </dependency> + <!-- netty version used by grpc --> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + <version>${netty.version}</version> + </dependency> + <dependency> + <groupId>org.apache.reef</groupId> + <artifactId>reef-runtime-yarn</artifactId> + <version>0.17.0-SNAPSHOT</version> + </dependency> + </dependencies> + + <build> + <extensions> + <!-- provides os.detected.classifier (i.e. linux-x86_64, osx-x86_64) property --> + <extension> + <groupId>kr.motd.maven</groupId> + <artifactId>os-maven-plugin</artifactId> + <version>${os-maven-plugin.version}</version> + </extension> + </extensions> + + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <configuration> + <configLocation>lang/java/reef-common/src/main/resources/checkstyle-strict.xml</configLocation> + </configuration> + </plugin> + <!-- Unfortunately, we need to run this plugin twice (seperately) to generate gRPC service definitions + and protocol buffer message definitions. Combining them in one call has a bug in <clearOutputDirectory> + which we do only on the first run. + --> + <!-- Generate gRPC definitions --> + <plugin> + <groupId>org.xolstice.maven.plugins</groupId> + <artifactId>protobuf-maven-plugin</artifactId> + <version>0.5.1</version> + <configuration> + <pluginId>grpc-java</pluginId> + <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact> + <protoSourceRoot>${protobuf.input.directory}</protoSourceRoot> + <outputDirectory>${protobuf.output.directory}</outputDirectory> + <clearOutputDirectory>true</clearOutputDirectory> + </configuration> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>compile-custom</goal> + </goals> + </execution> + </executions> + </plugin> + <!-- Generate protocol buffer message definitions --> + <plugin> + <groupId>org.xolstice.maven.plugins</groupId> + <artifactId>protobuf-maven-plugin</artifactId> + <version>0.5.1</version> + <configuration> + <pluginId>grpc-java</pluginId> + <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact> + <protoSourceRoot>${protobuf.input.directory}</protoSourceRoot> + <outputDirectory>${protobuf.output.directory}</outputDirectory> + <clearOutputDirectory>false</clearOutputDirectory> + </configuration> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + </executions> + </plugin> + <!-- add generated grpc classes into the package --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>${build-helper-maven-plugin.version}</version> + <executions> + <execution> + <id>add-classes</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>${protobuf.output.directory}</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + <!-- shade protobuf to avoid version conflicts --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>${maven-shade-plugin.version}</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <relocations> + <relocation> + <pattern>com.google.protobuf</pattern> + <shadedPattern>${project.groupId}.${project.artifactId}.shaded.protobuf</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>${maven-shade-plugin.version}</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <relocations> + <relocation> + <pattern>io.netty</pattern> + <shadedPattern>${project.groupId}.${project.artifactId}.shaded.netty</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>${maven.assembly}</version> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> <!-- this is used for inheritance merges --> + <phase>package</phase> <!-- bind to the packaging phase --> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DefaultDriverClientStopHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DefaultDriverClientStopHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DefaultDriverClientStopHandler.java new file mode 100644 index 0000000..8636f7a --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DefaultDriverClientStopHandler.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client; + +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.event.StopTime; + +import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Default java client driver stop handler. + */ +public final class DefaultDriverClientStopHandler implements EventHandler<StopTime> { + + private static final Logger LOG = Logger.getLogger(DefaultDriverClientStopHandler.class.getName()); + + @Inject + private DefaultDriverClientStopHandler() {} + + @Override + public void onNext(final StopTime value) { + LOG.log(Level.FINEST, "Stop time {0}", value); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientClock.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientClock.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientClock.java new file mode 100644 index 0000000..162cbe5 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientClock.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client; + +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.Clock; +import org.apache.reef.wake.time.Time; +import org.apache.reef.wake.time.event.Alarm; +import org.apache.reef.wake.time.runtime.Timer; +import org.apache.reef.wake.time.runtime.event.ClientAlarm; + +import javax.inject.Inject; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * The bridge driver client clock. + */ +public final class DriverClientClock implements Clock, IAlarmDispatchHandler { + + private static final Logger LOG = Logger.getLogger(DriverClientClock.class.getName()); + + private final IDriverClientService driverClientService; + + private final IDriverServiceClient driverServiceClient; + + private final Timer timer; + + private final Map<String, ClientAlarm> alarmMap = new HashMap<>(); + + private boolean closed = false; + + @Inject + private DriverClientClock( + final Timer timer, + final IDriverClientService driverClientService, + final IDriverServiceClient driverServiceClient) { + this.timer = timer; + this.driverClientService = driverClientService; + this.driverServiceClient = driverServiceClient; + } + + @Override + public Time scheduleAlarm(final int offset, final EventHandler<Alarm> handler) { + final ClientAlarm alarm = new ClientAlarm(this.timer.getCurrent() + offset, handler); + final String alarmId = UUID.randomUUID().toString(); + this.alarmMap.put(alarmId, alarm); + this.driverServiceClient.onSetAlarm(alarmId, offset); + return alarm; + } + + @Override + public void close() { + stop(); + } + + @Override + public void stop() { + if (!closed) { + this.closed = true; + this.driverServiceClient.onShutdown(); + } + } + + @Override + public void stop(final Throwable exception) { + if (!closed) { + this.closed = true; + this.driverServiceClient.onShutdown(exception); + } + } + + @Override + public boolean isIdle() { + return this.closed && this.alarmMap.isEmpty(); + } + + @Override + public boolean isClosed() { + return this.closed; + } + + @Override + public void run() { + try { + this.driverClientService.start(); + this.driverClientService.awaitTermination(); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + /** + * Alarm clock event handler. + * @param alarmId alarm identifier + */ + @Override + public void onNext(final String alarmId) { + if (this.alarmMap.containsKey(alarmId)) { + final ClientAlarm clientAlarm = this.alarmMap.remove(alarmId); + clientAlarm.run(); + } else { + LOG.log(Level.SEVERE, "Unknown alarm id {0}", alarmId); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientConfiguration.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientConfiguration.java new file mode 100644 index 0000000..50da3ce --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientConfiguration.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client; + +import org.apache.reef.bridge.client.parameters.DriverClientDispatchThreadCount; +import org.apache.reef.bridge.client.parameters.ClientDriverStopHandler; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.ClosedContext; +import org.apache.reef.driver.context.ContextMessage; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.evaluator.CompletedEvaluator; +import org.apache.reef.driver.evaluator.EvaluatorRequestor; +import org.apache.reef.driver.evaluator.FailedEvaluator; +import org.apache.reef.driver.parameters.*; +import org.apache.reef.driver.task.*; +import org.apache.reef.tang.formats.ConfigurationModule; +import org.apache.reef.tang.formats.ConfigurationModuleBuilder; +import org.apache.reef.tang.formats.OptionalImpl; +import org.apache.reef.tang.formats.RequiredImpl; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.Clock; +import org.apache.reef.wake.time.event.StartTime; +import org.apache.reef.wake.time.event.StopTime; + +/** + * Driver client configuration. + */ +public final class DriverClientConfiguration extends ConfigurationModuleBuilder { + + /** + * The event handler invoked right after the driver boots up. + */ + public static final RequiredImpl<EventHandler<StartTime>> ON_DRIVER_STARTED = new RequiredImpl<>(); + + /** + * The event handler invoked right before the driver shuts down. Defaults to ignore. + */ + public static final OptionalImpl<EventHandler<StopTime>> ON_DRIVER_STOP = new OptionalImpl<>(); + + // ***** EVALUATOR HANDLER BINDINGS: + + /** + * Event handler for allocated evaluators. Defaults to returning the evaluator if not bound. + */ + public static final OptionalImpl<EventHandler<AllocatedEvaluator>> ON_EVALUATOR_ALLOCATED = new OptionalImpl<>(); + + /** + * Event handler for completed evaluators. Defaults to logging if not bound. + */ + public static final OptionalImpl<EventHandler<CompletedEvaluator>> ON_EVALUATOR_COMPLETED = new OptionalImpl<>(); + + /** + * Event handler for failed evaluators. Defaults to job failure if not bound. + */ + public static final OptionalImpl<EventHandler<FailedEvaluator>> ON_EVALUATOR_FAILED = new OptionalImpl<>(); + + // ***** TASK HANDLER BINDINGS: + + /** + * Event handler for task messages. Defaults to logging if not bound. + */ + public static final OptionalImpl<EventHandler<TaskMessage>> ON_TASK_MESSAGE = new OptionalImpl<>(); + + /** + * Event handler for completed tasks. Defaults to closing the context the task ran on if not bound. + */ + public static final OptionalImpl<EventHandler<CompletedTask>> ON_TASK_COMPLETED = new OptionalImpl<>(); + + /** + * Event handler for failed tasks. Defaults to job failure if not bound. + */ + public static final OptionalImpl<EventHandler<FailedTask>> ON_TASK_FAILED = new OptionalImpl<>(); + + /** + * Event handler for running tasks. Defaults to logging if not bound. + */ + public static final OptionalImpl<EventHandler<RunningTask>> ON_TASK_RUNNING = new OptionalImpl<>(); + + /** + * Event handler for suspended tasks. Defaults to job failure if not bound. Rationale: many jobs don't support + * task suspension. Hence, this parameter should be optional. The only sane default is to crash the job, then. + */ + public static final OptionalImpl<EventHandler<SuspendedTask>> ON_TASK_SUSPENDED = new OptionalImpl<>(); + + // ***** CLIENT HANDLER BINDINGS: + + /** + * Event handler for client messages. Defaults to logging if not bound. + */ + public static final OptionalImpl<EventHandler<byte[]>> ON_CLIENT_MESSAGE = new OptionalImpl<>(); + + /** + * Event handler for close messages sent by the client. Defaults to job failure if not bound. + */ + public static final OptionalImpl<EventHandler<Void>> ON_CLIENT_CLOSED = new OptionalImpl<>(); + + /** + * Event handler for close messages sent by the client. Defaults to job failure if not bound. + */ + public static final OptionalImpl<EventHandler<byte[]>> ON_CLIENT_CLOSED_MESSAGE = new OptionalImpl<>(); + + // ***** CONTEXT HANDLER BINDINGS: + + /** + * Event handler for active context. Defaults to closing the context if not bound. + */ + public static final OptionalImpl<EventHandler<ActiveContext>> ON_CONTEXT_ACTIVE = new OptionalImpl<>(); + + /** + * Event handler for closed context. Defaults to logging if not bound. + */ + public static final OptionalImpl<EventHandler<ClosedContext>> ON_CONTEXT_CLOSED = new OptionalImpl<>(); + + /** + * Event handler for closed context. Defaults to job failure if not bound. + */ + public static final OptionalImpl<EventHandler<FailedContext>> ON_CONTEXT_FAILED = new OptionalImpl<>(); + + /** + * Event handler for context messages. Defaults to logging if not bound. + */ + public static final OptionalImpl<EventHandler<ContextMessage>> ON_CONTEXT_MESSAGE = new OptionalImpl<>(); + + /** + * Number of dispatch threads to use. + */ + public static final OptionalImpl<Integer> CLIENT_DRIVER_DISPATCH_THREAD_COUNT = new OptionalImpl<>(); + + /** + * Alarm dispatch handler. + */ + public static final OptionalImpl<IAlarmDispatchHandler> ALARM_DISPATCH_HANDLER = new OptionalImpl<>(); + + /** + * Default to gRPC Driver Client Service. + */ + public static final OptionalImpl<IDriverClientService> DRIVER_CLIENT_SERVICE = new OptionalImpl<>(); + + /** + * Default to gRPC Driver Service Client. + */ + public static final OptionalImpl<IDriverServiceClient> DRIVER_SERVICE_CLIENT = new OptionalImpl<>(); + + /** + * ConfigurationModule to fill out to get a legal Driver Configuration. + */ + public static final ConfigurationModule CONF = new DriverClientConfiguration() + .bindImplementation(Clock.class, DriverClientClock.class) + .bindImplementation(EvaluatorRequestor.class, DriverClientEvaluatorRequestor.class) + .bindImplementation(IAlarmDispatchHandler.class, ALARM_DISPATCH_HANDLER) + .bindImplementation(IDriverClientService.class, DRIVER_CLIENT_SERVICE) + .bindImplementation(IDriverServiceClient.class, DRIVER_SERVICE_CLIENT) + + .bindNamedParameter(DriverClientDispatchThreadCount.class, CLIENT_DRIVER_DISPATCH_THREAD_COUNT) + + // Driver start/stop handlers + .bindSetEntry(DriverStartHandler.class, ON_DRIVER_STARTED) + .bindSetEntry(ClientDriverStopHandler.class, ON_DRIVER_STOP) + + // Evaluator handlers + .bindSetEntry(EvaluatorAllocatedHandlers.class, ON_EVALUATOR_ALLOCATED) + .bindSetEntry(EvaluatorCompletedHandlers.class, ON_EVALUATOR_COMPLETED) + .bindSetEntry(EvaluatorFailedHandlers.class, ON_EVALUATOR_FAILED) + + // Task handlers + .bindSetEntry(TaskRunningHandlers.class, ON_TASK_RUNNING) + .bindSetEntry(TaskFailedHandlers.class, ON_TASK_FAILED) + .bindSetEntry(TaskMessageHandlers.class, ON_TASK_MESSAGE) + .bindSetEntry(TaskCompletedHandlers.class, ON_TASK_COMPLETED) + .bindSetEntry(TaskSuspendedHandlers.class, ON_TASK_SUSPENDED) + + // Context handlers + .bindSetEntry(ContextActiveHandlers.class, ON_CONTEXT_ACTIVE) + .bindSetEntry(ContextClosedHandlers.class, ON_CONTEXT_CLOSED) + .bindSetEntry(ContextMessageHandlers.class, ON_CONTEXT_MESSAGE) + .bindSetEntry(ContextFailedHandlers.class, ON_CONTEXT_FAILED) + + // Client handlers + .bindSetEntry(ClientMessageHandlers.class, ON_CLIENT_MESSAGE) + .bindSetEntry(ClientCloseHandlers.class, ON_CLIENT_CLOSED) + .bindSetEntry(ClientCloseWithMessageHandlers.class, ON_CLIENT_CLOSED_MESSAGE) + + .build(); +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientDispatcher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientDispatcher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientDispatcher.java new file mode 100644 index 0000000..3dd9b88 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientDispatcher.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client; + +import com.google.common.collect.Sets; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.bridge.client.parameters.DriverClientDispatchThreadCount; +import org.apache.reef.bridge.client.parameters.ClientDriverStopHandler; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.context.ClosedContext; +import org.apache.reef.driver.context.ContextMessage; +import org.apache.reef.driver.context.FailedContext; +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.evaluator.CompletedEvaluator; +import org.apache.reef.driver.evaluator.FailedEvaluator; +import org.apache.reef.driver.parameters.*; +import org.apache.reef.driver.task.*; +import org.apache.reef.runtime.common.utils.DispatchingEStage; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.event.StartTime; +import org.apache.reef.wake.time.event.StopTime; + +import javax.inject.Inject; +import java.util.Set; + +/** + * Async dispatch of client driver events. + */ +@Private +public final class DriverClientDispatcher { + + /** + * Dispatcher used for application provided event handlers. + */ + private final DispatchingEStage applicationDispatcher; + + /** + * Dispatcher for client close events. + */ + private final DispatchingEStage clientCloseDispatcher; + + /** + * Dispatcher for client close with message events. + */ + private final DispatchingEStage clientCloseWithMessageDispatcher; + + /** + * Dispatcher for client messages. + */ + private final DispatchingEStage clientMessageDispatcher; + + /** + * The alarm dispatcher. + */ + private final DispatchingEStage alarmDispatcher; + + @Inject + private DriverClientDispatcher( + final DriverClientExceptionHandler driverExceptionHandler, + final IAlarmDispatchHandler alarmDispatchHandler, + @Parameter(DriverClientDispatchThreadCount.class) + final Integer numberOfThreads, + // Application-provided start and stop handlers + @Parameter(DriverStartHandler.class) + final Set<EventHandler<StartTime>> startHandlers, + @Parameter(ClientDriverStopHandler.class) + final Set<EventHandler<StopTime>> stopHandlers, + // Application-provided Context event handlers + @Parameter(ContextActiveHandlers.class) + final Set<EventHandler<ActiveContext>> contextActiveHandlers, + @Parameter(ContextClosedHandlers.class) + final Set<EventHandler<ClosedContext>> contextClosedHandlers, + @Parameter(ContextFailedHandlers.class) + final Set<EventHandler<FailedContext>> contextFailedHandlers, + @Parameter(ContextMessageHandlers.class) + final Set<EventHandler<ContextMessage>> contextMessageHandlers, + // Application-provided Task event handlers + @Parameter(TaskRunningHandlers.class) + final Set<EventHandler<RunningTask>> taskRunningHandlers, + @Parameter(TaskCompletedHandlers.class) + final Set<EventHandler<CompletedTask>> taskCompletedHandlers, + @Parameter(TaskSuspendedHandlers.class) + final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers, + @Parameter(TaskMessageHandlers.class) + final Set<EventHandler<TaskMessage>> taskMessageEventHandlers, + @Parameter(TaskFailedHandlers.class) + final Set<EventHandler<FailedTask>> taskExceptionEventHandlers, + // Application-provided Evaluator event handlers + @Parameter(EvaluatorAllocatedHandlers.class) + final Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers, + @Parameter(EvaluatorFailedHandlers.class) + final Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers, + @Parameter(EvaluatorCompletedHandlers.class) + final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers, + // Client handlers + @Parameter(ClientCloseHandlers.class) + final Set<EventHandler<Void>> clientCloseHandlers, + @Parameter(ClientCloseWithMessageHandlers.class) + final Set<EventHandler<byte[]>> clientCloseWithMessageHandlers, + @Parameter(ClientMessageHandlers.class) + final Set<EventHandler<byte[]>> clientMessageHandlers) { + + this.applicationDispatcher = new DispatchingEStage( + driverExceptionHandler, numberOfThreads, "ClientDriverDispatcher"); + // Application start and stop handlers + this.applicationDispatcher.register(StartTime.class, startHandlers); + this.applicationDispatcher.register(StopTime.class, stopHandlers); + // Application Context event handlers + this.applicationDispatcher.register(ActiveContext.class, contextActiveHandlers); + this.applicationDispatcher.register(ClosedContext.class, contextClosedHandlers); + this.applicationDispatcher.register(FailedContext.class, contextFailedHandlers); + this.applicationDispatcher.register(ContextMessage.class, contextMessageHandlers); + + // Application Task event handlers. + this.applicationDispatcher.register(RunningTask.class, taskRunningHandlers); + this.applicationDispatcher.register(CompletedTask.class, taskCompletedHandlers); + this.applicationDispatcher.register(SuspendedTask.class, taskSuspendedHandlers); + this.applicationDispatcher.register(TaskMessage.class, taskMessageEventHandlers); + this.applicationDispatcher.register(FailedTask.class, taskExceptionEventHandlers); + + // Application Evaluator event handlers + this.applicationDispatcher.register(AllocatedEvaluator.class, evaluatorAllocatedHandlers); + this.applicationDispatcher.register(CompletedEvaluator.class, evaluatorCompletedHandlers); + this.applicationDispatcher.register(FailedEvaluator.class, evaluatorFailedHandlers); + + // Client event handlers; + this.clientCloseDispatcher = new DispatchingEStage(this.applicationDispatcher); + this.clientCloseDispatcher.register(Void.class, clientCloseHandlers); + + this.clientCloseWithMessageDispatcher = new DispatchingEStage(this.applicationDispatcher); + this.clientCloseWithMessageDispatcher.register(byte[].class, clientCloseWithMessageHandlers); + + this.clientMessageDispatcher = new DispatchingEStage(this.applicationDispatcher); + this.clientMessageDispatcher.register(byte[].class, clientMessageHandlers); + + // Alarm event handlers + this.alarmDispatcher = new DispatchingEStage(this.applicationDispatcher); + this.alarmDispatcher.register(String.class, + Sets.newHashSet((EventHandler<String>)alarmDispatchHandler)); + } + + public void dispatch(final StartTime startTime) { + this.applicationDispatcher.onNext(StartTime.class, startTime); + } + + public void dispatch(final StopTime stopTime) { + this.applicationDispatcher.onNext(StopTime.class, stopTime); + } + + public void dispatch(final ActiveContext context) { + this.applicationDispatcher.onNext(ActiveContext.class, context); + } + + public void dispatch(final ClosedContext context) { + this.applicationDispatcher.onNext(ClosedContext.class, context); + } + + public void dispatch(final FailedContext context) { + this.applicationDispatcher.onNext(FailedContext.class, context); + } + + public void dispatch(final ContextMessage message) { + this.applicationDispatcher.onNext(ContextMessage.class, message); + } + + public void dispatch(final AllocatedEvaluator evaluator) { + this.applicationDispatcher.onNext(AllocatedEvaluator.class, evaluator); + } + + public void dispatch(final FailedEvaluator evaluator) { + this.applicationDispatcher.onNext(FailedEvaluator.class, evaluator); + } + + public void dispatch(final CompletedEvaluator evaluator) { + this.applicationDispatcher.onNext(CompletedEvaluator.class, evaluator); + } + + public void dispatch(final RunningTask task) { + this.applicationDispatcher.onNext(RunningTask.class, task); + } + + public void dispatch(final CompletedTask task) { + this.applicationDispatcher.onNext(CompletedTask.class, task); + } + + public void dispatch(final FailedTask task) { + this.applicationDispatcher.onNext(FailedTask.class, task); + } + + public void dispatch(final SuspendedTask task) { + this.applicationDispatcher.onNext(SuspendedTask.class, task); + } + + public void dispatch(final TaskMessage message) { + this.applicationDispatcher.onNext(TaskMessage.class, message); + } + + public void clientCloseDispatch() { + this.clientCloseDispatcher.onNext(Void.class, null); + } + + public void clientCloseWithMessageDispatch(final byte[] message) { + this.clientCloseWithMessageDispatcher.onNext(byte[].class, message); + } + + public void clientMessageDispatch(final byte[] message) { + this.clientMessageDispatcher.onNext(byte[].class, message); + } + + public void dispatchAlarm(final String alarmId) { + this.alarmDispatcher.onNext(String.class, alarmId); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientEvaluatorRequestor.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientEvaluatorRequestor.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientEvaluatorRequestor.java new file mode 100644 index 0000000..a774b2f --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientEvaluatorRequestor.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client; + +import org.apache.reef.driver.evaluator.EvaluatorRequest; +import org.apache.reef.driver.evaluator.EvaluatorRequestor; + +import javax.inject.Inject; + +/** + * Driver Client evaluator requestor. + */ +public final class DriverClientEvaluatorRequestor implements EvaluatorRequestor { + + private final IDriverServiceClient driverServiceClient; + + @Inject + private DriverClientEvaluatorRequestor(final IDriverServiceClient driverServiceClient) { + this.driverServiceClient = driverServiceClient; + } + + @Override + public void submit(final EvaluatorRequest req) { + this.driverServiceClient.onEvaluatorRequest(req); + } + + @Override + public EvaluatorRequest.Builder newRequest() { + return new DriverClientEvaluatorRequestor.Builder(); + } + + /** + * {@link DriverClientEvaluatorRequestor.Builder} extended with a new submit method. + * {@link EvaluatorRequest}s are built using this builder. + */ + public final class Builder extends EvaluatorRequest.Builder<DriverClientEvaluatorRequestor.Builder> { + @Override + public synchronized void submit() { + DriverClientEvaluatorRequestor.this.submit(this.build()); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientExceptionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientExceptionHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientExceptionHandler.java new file mode 100644 index 0000000..d09ce41 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverClientExceptionHandler.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client; + +import org.apache.reef.wake.EventHandler; + +import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Driver client exception handler. + */ +public final class DriverClientExceptionHandler implements EventHandler<Throwable> { + private static final Logger LOG = Logger.getLogger(DriverClientExceptionHandler.class.getName()); + + @Inject + private DriverClientExceptionHandler() { + LOG.log(Level.FINE, "Instantiated 'DriverExceptionHandler'"); + } + + + @Override + public void onNext(final Throwable throwable) { + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IAlarmDispatchHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IAlarmDispatchHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IAlarmDispatchHandler.java new file mode 100644 index 0000000..a9fee48 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IAlarmDispatchHandler.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client; + +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.wake.EventHandler; + +/** + * Alarm dispatch handler. + */ +@DefaultImplementation(DriverClientClock.class) +public interface IAlarmDispatchHandler extends EventHandler<String> { +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverClientService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverClientService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverClientService.java new file mode 100644 index 0000000..c71b554 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverClientService.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client; + +import org.apache.reef.bridge.client.grpc.DriverClientService; +import org.apache.reef.tang.annotations.DefaultImplementation; + +import java.io.IOException; + +/** + * Interface that driver client services implement. + */ +@DefaultImplementation(DriverClientService.class) +public interface IDriverClientService { + + /** + * Start the DriverClient service. + * @throws IOException when unable to start service + */ + void start() throws IOException; + + + /** + * Wait for termination of driver client service. + */ + void awaitTermination() throws InterruptedException; + +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceClient.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceClient.java new file mode 100644 index 0000000..e1f8cb7 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/IDriverServiceClient.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client; + +import org.apache.reef.bridge.client.grpc.DriverServiceClient; +import org.apache.reef.driver.evaluator.EvaluatorRequest; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.util.Optional; + +import java.io.File; +import java.util.List; + +/** + * Forwards application requests to driver server. + */ +@DefaultImplementation(DriverServiceClient.class) +public interface IDriverServiceClient { + + /** + * Initiate shutdown. + */ + void onShutdown(); + + /** + * Initiate shutdown with error. + * @param ex exception error + */ + void onShutdown(final Throwable ex); + + /** + * Set alarm. + * @param alarmId alarm identifier + * @param timeoutMS timeout in milliseconds + */ + void onSetAlarm(final String alarmId, final int timeoutMS); + + /** + * Request evaluators. + * @param evaluatorRequest event + */ + void onEvaluatorRequest(final EvaluatorRequest evaluatorRequest); + + /** + * Close evaluator. + * @param evalautorId to close + */ + void onEvaluatorClose(final String evalautorId); + + /** + * Submit context and/or task. + * @param evaluatorId to submit against + * @param contextConfiguration context configuration + * @param taskConfiguration task configuration + * @param evaluatorProcess evaluator process + * @param addFileList to include + * @param addLibraryList to include + */ + void onEvaluatorSubmit( + final String evaluatorId, + final Optional<Configuration> contextConfiguration, + final Optional<Configuration> taskConfiguration, + final Optional<JVMClientProcess> evaluatorProcess, + final Optional<List<File>> addFileList, + final Optional<List<File>> addLibraryList); + + // Context Operations + + /** + * Close context. + * @param contextId to close + */ + void onContextClose(final String contextId); + + /** + * Submit child context. + * @param contextId to submit against + * @param contextConfiguration for child context + */ + void onContextSubmitContext( + final String contextId, + final Configuration contextConfiguration); + + /** + * Submit task. + * @param contextId to submit against + * @param taskConfiguration for task + */ + void onContextSubmitTask( + final String contextId, + final Configuration taskConfiguration); + + /** + * Send message to context. + * @param contextId to destination context + * @param message to send + */ + void onContextMessage(final String contextId, final byte[] message); + + // Task operations + + /** + * Close the task. + * @param taskId to close + * @param message optional message to include + */ + void onTaskClose(final String taskId, final Optional<byte[]> message); + + /** + * Send task a message. + * @param taskId of destination task + * @param message to send + */ + void onTaskMessage(final String taskId, final byte[] message); +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JVMClientProcess.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JVMClientProcess.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JVMClientProcess.java new file mode 100644 index 0000000..cdcb9b5 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JVMClientProcess.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client; + +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.evaluator.EvaluatorProcess; +import org.apache.reef.driver.evaluator.EvaluatorType; + +import java.util.ArrayList; +import java.util.List; + +/** + * Stub class for Evaluator Process on driver client. + */ +@Private +public final class JVMClientProcess implements EvaluatorProcess { + + private boolean optionSet = false; + + private int megaBytes = 0; + + private String configurationFileName = null; + + private String standardOut = null; + + private String standardErr = null; + + private final List<String> optionList = new ArrayList<>(); + + public JVMClientProcess() { + } + + @Override + public List<String> getCommandLine() { + throw new UnsupportedOperationException(); + } + + @Override + public EvaluatorType getType() { + return EvaluatorType.JVM; + } + + @Override + public JVMClientProcess setMemory(final int mb) { + this.megaBytes = mb; + this.optionSet = true; + return this; + } + + public int getMemory() { + return this.megaBytes; + } + + @Override + public boolean isOptionSet() { + return optionSet; + } + + @Override + public JVMClientProcess setConfigurationFileName(final String configurationFileName) { + this.configurationFileName = configurationFileName; + return this; + } + + public String getConfigurationFileName() { + return this.configurationFileName; + } + + @Override + public JVMClientProcess setStandardOut(final String standardOut) { + this.standardOut = standardOut; + return this; + } + + public String getStandardOut() { + return this.standardOut; + } + + @Override + public JVMClientProcess setStandardErr(final String standardErr) { + this.standardErr = standardErr; + return this; + } + + public String getStandardErr() { + return this.standardErr; + } + + /** + * Add a JVM option. + * @param option The full option, e.g. "-XX:+PrintGCDetails", "-Xms500m" + * @return this + */ + public JVMClientProcess addOption(final String option) { + this.optionList.add(option); + optionSet = true; + return this; + } + + public List<String> getOptions() { + return this.optionList; + } + +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JavaDriverClientLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JavaDriverClientLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JavaDriverClientLauncher.java new file mode 100644 index 0000000..a7bf37c --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/JavaDriverClientLauncher.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client; + +import org.apache.reef.bridge.client.grpc.DriverClientGrpcConfiguration; +import org.apache.reef.bridge.client.grpc.parameters.DriverServicePort; +import org.apache.reef.runtime.common.REEFLauncher; +import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler; +import org.apache.reef.runtime.common.launch.REEFErrorHandler; +import org.apache.reef.runtime.common.launch.REEFMessageCodec; +import org.apache.reef.runtime.common.launch.REEFUncaughtExceptionHandler; +import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Configurations; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.exceptions.BindException; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.tang.formats.ConfigurationSerializer; +import org.apache.reef.util.EnvironmentUtils; +import org.apache.reef.util.ThreadLogger; +import org.apache.reef.util.logging.LoggingSetup; +import org.apache.reef.wake.remote.RemoteConfiguration; +import org.apache.reef.wake.time.Clock; + +import javax.inject.Inject; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Driver client launcher. + */ +public final class JavaDriverClientLauncher { + + private static final Logger LOG = Logger.getLogger(REEFLauncher.class.getName()); + + private static final Tang TANG = Tang.Factory.getTang(); + + private static final Configuration LAUNCHER_STATIC_CONFIG = + TANG.newConfigurationBuilder() + .bindNamedParameter(RemoteConfiguration.ManagerName.class, "DRIVER_CLIENT_LAUNCHER") + .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class) + .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class) + .bindSetEntry(Clock.RuntimeStartHandler.class, PIDStoreStartHandler.class) + .build(); + + static { + LoggingSetup.setupCommonsLogging(); + } + + /** + * Main configuration object of the REEF component we are launching here. + */ + private final Configuration envConfig; + + /** + * REEFLauncher is instantiated in the main() method below using + * Tang configuration file provided as a command line argument. + * @param configurationPath Path to the serialized Tang configuration file. + * (The file must be in the local file system). + * @param configurationSerializer Serializer used to read the configuration file. + * We currently use Avro to serialize Tang configs. + */ + @Inject + private JavaDriverClientLauncher( + @Parameter(DriverServicePort.class) final Integer driverServicePort, + @Parameter(ClockConfigurationPath.class) final String configurationPath, + final ConfigurationSerializer configurationSerializer) { + + this.envConfig = Configurations.merge( + LAUNCHER_STATIC_CONFIG, + DriverClientGrpcConfiguration.CONF + .set(DriverClientGrpcConfiguration.DRIVER_SERVICE_PORT, driverServicePort) + .build(), + readConfigurationFromDisk(configurationPath, configurationSerializer)); + } + + /** + * Instantiate REEF DriverServiceLauncher. This method is called from REEFLauncher.main(). + * @param clockConfigPath Path to the local file that contains serialized configuration + * for the driver client. + * @return An instance of the configured REEFLauncher object. + */ + private static JavaDriverClientLauncher getLauncher(final String clockConfigPath, final int driverServicePort) { + + try { + + final Configuration clockArgConfig = Configurations.merge( + LAUNCHER_STATIC_CONFIG, + DriverClientGrpcConfiguration.CONF + .set(DriverClientGrpcConfiguration.DRIVER_SERVICE_PORT, driverServicePort) + .build(), + TANG.newConfigurationBuilder() + .bindNamedParameter(ClockConfigurationPath.class, clockConfigPath) + .build()); + + return TANG.newInjector(clockArgConfig).getInstance(JavaDriverClientLauncher.class); + + } catch (final BindException ex) { + throw fatal("Error in parsing the command line", ex); + } catch (final InjectionException ex) { + throw fatal("Unable to instantiate REEFLauncher.", ex); + } + } + + /** + * Read configuration from a given file and deserialize it + * into Tang configuration object that can be used for injection. + * Configuration is currently serialized using Avro. + * This method also prints full deserialized configuration into log. + * @param configPath Path to the local file that contains serialized configuration + * of a REEF component to launch (can be either Driver or Evaluator). + * @param serializer An object to deserialize the configuration file. + * @return Tang configuration read and deserialized from a given file. + */ + private static Configuration readConfigurationFromDisk( + final String configPath, final ConfigurationSerializer serializer) { + + LOG.log(Level.FINER, "Loading configuration file: {0}", configPath); + + final File evaluatorConfigFile = new File(configPath); + + if (!evaluatorConfigFile.exists()) { + throw fatal( + "Configuration file " + configPath + " does not exist. Can be an issue in job submission.", + new FileNotFoundException(configPath)); + } + + if (!evaluatorConfigFile.canRead()) { + throw fatal( + "Configuration file " + configPath + " exists, but can't be read.", + new IOException(configPath)); + } + + try { + + final Configuration config = serializer.fromFile(evaluatorConfigFile); + LOG.log(Level.FINEST, "The configuration file loaded: {0}", configPath); + + return config; + + } catch (final IOException e) { + throw fatal("Unable to parse the configuration file: " + configPath, e); + } + } + + /** + * Launches a REEF client process (Driver or Evaluator). + * @param args Command-line arguments. + * Must be a single element containing local path to the configuration file. + */ + @SuppressWarnings("checkstyle:illegalcatch") + public static void main(final String[] args) { + + LOG.log(Level.INFO, "Entering JavaDriverClientLauncher.main()."); + + LOG.log(Level.FINE, "JavaDriverClientLauncher started with user name [{0}]", System.getProperty("user.name")); + LOG.log(Level.FINE, "JavaDriverClientLauncher started. Assertions are {0} in this process.", + EnvironmentUtils.areAssertionsEnabled() ? "ENABLED" : "DISABLED"); + + if (args.length != 2) { + final String message = "JavaDriverClientLauncher have two and only two arguments to specify the runtime clock " + + "configuration path and driver service port"; + + throw fatal(message, new IllegalArgumentException(message)); + } + + final JavaDriverClientLauncher launcher = getLauncher(args[0], Integer.parseInt(args[1])); + + Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.envConfig)); + final Injector injector = Tang.Factory.getTang().newInjector(launcher.envConfig); + try (final Clock reef = injector.getInstance(Clock.class)) { + reef.run(); + } catch (final Throwable ex) { + throw fatal("Unable to configure and start Clock.", ex); + } + + ThreadLogger.logThreads(LOG, Level.FINEST, "Threads running after Clock.close():"); + + LOG.log(Level.INFO, "Exiting REEFLauncher.main()"); + + System.exit(0); // TODO[REEF-1715]: Should be able to exit cleanly at the end of main() + } + + /** + * Wrap an exception into RuntimeException with a given message, + * and write the same message and exception to the log. + * @param msg an error message to log and pass into the RuntimeException. + * @param t A Throwable exception to log and wrap. + * @return a new Runtime exception wrapping a Throwable. + */ + private static RuntimeException fatal(final String msg, final Throwable t) { + LOG.log(Level.SEVERE, msg, t); + return new RuntimeException(msg, t); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ea249f7f/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ActiveContextBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ActiveContextBridge.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ActiveContextBridge.java new file mode 100644 index 0000000..54645a0 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/events/ActiveContextBridge.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.bridge.client.events; + +import org.apache.reef.bridge.client.IDriverServiceClient; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.tang.Configuration; +import org.apache.reef.util.Optional; + +/** + * Active context bridge. + */ +public final class ActiveContextBridge implements ActiveContext { + + private final IDriverServiceClient driverServiceClient; + + private final String contextId; + + private final Optional<String> parentId; + + private final String evaluatorId; + + private final EvaluatorDescriptor evaluatorDescriptor; + + public ActiveContextBridge( + final IDriverServiceClient driverServiceClient, + final String contextId, + final Optional<String> parentId, + final String evaluatorId, + final EvaluatorDescriptor evaluatorDescriptor) { + this.driverServiceClient = driverServiceClient; + this.contextId = contextId; + this.parentId = parentId; + this.evaluatorId = evaluatorId; + this.evaluatorDescriptor = evaluatorDescriptor; + } + + @Override + public void close() { + this.driverServiceClient.onContextClose(this.contextId); + } + + @Override + public void submitTask(final Configuration taskConf) { + this.driverServiceClient.onContextSubmitTask(this.contextId, taskConf); + } + + @Override + public void submitContext(final Configuration contextConfiguration) { + this.driverServiceClient.onContextSubmitContext(this.contextId, contextConfiguration); + } + + @Override + public void submitContextAndService( + final Configuration contextConfiguration, + final Configuration serviceConfiguration) { + throw new UnsupportedOperationException("Service not supported"); + } + + @Override + public void sendMessage(final byte[] message) { + this.driverServiceClient.onContextMessage(this.contextId, message); + } + + @Override + public String getEvaluatorId() { + return this.evaluatorId; + } + + @Override + public Optional<String> getParentId() { + return this.parentId; + } + + @Override + public EvaluatorDescriptor getEvaluatorDescriptor() { + return this.evaluatorDescriptor; + } + + @Override + public String getId() { + return this.contextId; + } +}