Repository: reef Updated Branches: refs/heads/master e6ba12a08 -> 30b57ae45
[REEF-1137] REEF Standalone Runtime This addressed the issue by * Is based on REEF Runtime Local and YARN, and uses some of its classes for Standalone Runtime. * Implements ssh connections and remote submission to make HelloREEFStandalone work, using the JSch library. * Assumes that the user has the `authorized_keys` set up with `id_dsa` on the remote machine. * Requires `node_list_file_path` parameter specifying the file containing the list of remote machines. * Saves the results under the REEF_LOCAL_RUNTIME folder. * Credits to Dongjin(@LastOne817) for starting off the job. JIRA: [REEF-1137](https://issues.apache.org/jira/browse/REEF-1137) Pull Request: Closes #1054 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/30b57ae4 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/30b57ae4 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/30b57ae4 Branch: refs/heads/master Commit: 30b57ae452c6262f8c950da4f96bd42549fac313 Parents: e6ba12a Author: Won Wook SONG <[email protected]> Authored: Fri Jun 24 16:52:22 2016 +0900 Committer: John Yang <[email protected]> Committed: Fri Aug 19 13:10:31 2016 +0900 ---------------------------------------------------------------------- lang/java/reef-examples/pom.xml | 5 + .../examples/hello/HelloREEFStandalone.java | 122 ++++++++ .../local/client/LocalJobSubmissionHandler.java | 4 +- .../local/client/parameters/RootFolder.java | 2 +- .../reef/runtime/local/driver/Container.java | 2 +- .../runtime/local/driver/ProcessContainer.java | 2 +- .../runtime/local/process/RunnableProcess.java | 17 + lang/java/reef-runtime-standalone/pom.xml | 20 ++ ...andaloneDriverConfigurationProviderImpl.java | 108 +++++++ .../client/StandaloneRuntimeConfiguration.java | 48 ++- .../client/parameters/NodeFolder.java | 37 +++ .../client/parameters/NodeInfoSet.java | 31 ++ .../client/parameters/NodeListFilePath.java | 4 +- .../client/parameters/RootFolder.java | 4 +- 
.../client/parameters/SshPortNum.java | 34 ++ .../standalone/driver/RemoteNodeManager.java | 309 +++++++++++++++++++ .../standalone/driver/SshProcessContainer.java | 261 ++++++++++++++++ .../driver/StandaloneDriverConfiguration.java | 100 ++++++ .../driver/StandaloneResourceLaunchHandler.java | 42 +++ .../StandaloneResourceManagerStartHandler.java | 44 +++ .../StandaloneResourceManagerStopHandler.java | 39 +++ .../StandaloneResourceReleaseHandler.java | 46 +++ .../StandaloneResourceRequestHandler.java | 43 +++ .../runtime/standalone/driver/package-info.java | 22 ++ .../runtime/yarn/driver/REEFEventHandlers.java | 8 +- 25 files changed, 1328 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-examples/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/pom.xml b/lang/java/reef-examples/pom.xml index 54042b7..2fce95a 100644 --- a/lang/java/reef-examples/pom.xml +++ b/lang/java/reef-examples/pom.xml @@ -48,6 +48,11 @@ under the License. 
</dependency> <dependency> <groupId>${project.groupId}</groupId> + <artifactId>reef-runtime-standalone</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> <artifactId>reef-runtime-yarn</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFStandalone.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFStandalone.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFStandalone.java new file mode 100644 index 0000000..c07394d --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hello/HelloREEFStandalone.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.examples.hello; + +import org.apache.reef.client.DriverConfiguration; +import org.apache.reef.client.DriverLauncher; +import org.apache.reef.client.LauncherStatus; +import org.apache.reef.runtime.standalone.client.StandaloneRuntimeConfiguration; +import org.apache.reef.runtime.standalone.client.parameters.NodeListFilePath; +import org.apache.reef.runtime.standalone.client.parameters.SshPortNum; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.JavaConfigurationBuilder; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.BindException; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.tang.formats.CommandLine; +import org.apache.reef.util.EnvironmentUtils; + +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * The Client for Hello REEF example on standalone environment. + * This can be run with the command: `java -cp lang/java/reef-examples/target/reef-examples-*-SNAPSHOT-shaded.jar + * org.apache.reef.examples.hello.HelloREEFStandalone -nodelist ../NodeList.txt -port 22` + * Here, we assume that the list of nodes is saved in the ../NodeList.txt file, with each line containing ssh addresses + * (e.g. `[email protected]`), and `~/.ssh/id_dsa` is set up on your local machine, with `~/.ssh/authorized_keys` + * on each remote node containing the contents of your `~/.ssh/id_dsa.pub`. + * The port parameter is optional. + */ +public final class HelloREEFStandalone { + private static final Logger LOG = Logger.getLogger(HelloREEFStandalone.class.getName()); + + /** + * Number of milliseconds to wait for the job to complete. + */ + private static final int JOB_TIMEOUT = 10000; // 10 sec. 
+ + + /** + * @return the configuration of the runtime + */ + private static Configuration getRuntimeConfiguration(final String nodeListFileName, final int sshPortNum) { + return StandaloneRuntimeConfiguration.CONF + .set(StandaloneRuntimeConfiguration.NODE_LIST_FILE_PATH, nodeListFileName) + .set(StandaloneRuntimeConfiguration.SSH_PORT_NUM, sshPortNum) + .build(); + } + + /** + * @return the configuration of the HelloREEF driver. + */ + private static Configuration getDriverConfiguration() { + return DriverConfiguration.CONF + .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(HelloDriver.class)) + .set(DriverConfiguration.DRIVER_IDENTIFIER, "HelloREEFStandalone") + .set(DriverConfiguration.ON_DRIVER_STARTED, HelloDriver.StartHandler.class) + .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, HelloDriver.EvaluatorAllocatedHandler.class) + .build(); + } + + /** + * Start Hello REEF job. + * + * @param args command line parameters. + * @throws BindException configuration error. + * @throws InjectionException configuration error. 
+ */ + public static void main(final String[] args) throws BindException, InjectionException { + + final Tang tang = Tang.Factory.getTang(); + + final JavaConfigurationBuilder cb = tang.newConfigurationBuilder(); + + try{ + new CommandLine(cb) + .registerShortNameOfClass(NodeListFilePath.class) + .registerShortNameOfClass(SshPortNum.class) + .processCommandLine(args); + } catch(final IOException ex) { + LOG.log(Level.SEVERE, "Missing parameter 'nodelist' or wrong parameter input."); + throw new RuntimeException("Missing parameter 'nodelist' or wrong parameter input: ", ex); + } + + final Injector injector = tang.newInjector(cb.build()); + + final String nodeListFilePath = injector.getNamedInstance(NodeListFilePath.class); + final int sshPortNum = injector.getNamedInstance(SshPortNum.class); + + final Configuration runtimeConf = getRuntimeConfiguration(nodeListFilePath, sshPortNum); + final Configuration driverConf = getDriverConfiguration(); + + final LauncherStatus status = DriverLauncher + .getLauncher(runtimeConf) + .run(driverConf, JOB_TIMEOUT); + LOG.log(Level.INFO, "REEF job completed: {0}", status); + } + + /** + * Empty private constructor to prohibit instantiation of utility class. 
+ */ + private HelloREEFStandalone() { + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java index 3951c73..be41c94 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java @@ -38,11 +38,11 @@ import java.util.logging.Level; import java.util.logging.Logger; /** - * Handles Job Submissions for the Local Runtime. + * Handles Job Submissions for the Local and the Standalone Runtime. */ @Private @ClientSide -final class LocalJobSubmissionHandler implements JobSubmissionHandler { +public final class LocalJobSubmissionHandler implements JobSubmissionHandler { private static final Logger LOG = Logger.getLogger(LocalJobSubmissionHandler.class.getName()); http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/parameters/RootFolder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/parameters/RootFolder.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/parameters/RootFolder.java index 9dcdf35..052cb59 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/parameters/RootFolder.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/parameters/RootFolder.java @@ -24,7 
+24,7 @@ import org.apache.reef.tang.annotations.NamedParameter; /** * The folder where logs etc. shall be stored. */ -@NamedParameter(default_value = RootFolder.DEFAULT_VALUE, doc = "The folder where logs etc. shall be stored.") +@NamedParameter(default_value = RootFolder.DEFAULT_VALUE, doc = "The folder where the logs and results are stored.") public final class RootFolder implements Name<String> { public static final String DEFAULT_VALUE = "REEF_LOCAL_RUNTIME"; http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/Container.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/Container.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/Container.java index 11a5ae8..84c93de 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/Container.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/Container.java @@ -29,7 +29,7 @@ import java.util.List; * In the case of the local resourcemanager, this slice is always the one of the machine where the job was submitted. */ @Private -interface Container extends AutoCloseable { +public interface Container extends AutoCloseable { /** * Run the given commandLine in the container. 
http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java index 0b2ccc1..ec70502 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ProcessContainer.java @@ -39,7 +39,7 @@ import java.util.logging.Logger; */ @Private @TaskSide -final class ProcessContainer implements Container { +public final class ProcessContainer implements Container { private static final Logger LOG = Logger.getLogger(ProcessContainer.class.getName()); http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java index df27f98..12b4488 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/RunnableProcess.java @@ -243,6 +243,9 @@ public final class RunnableProcess implements Runnable { } } + /** + * @return a boolean that indicates if the process is running. 
+ */ private boolean processIsRunning() { return this.getState() == State.RUNNING; } @@ -255,6 +258,20 @@ public final class RunnableProcess implements Runnable { } /** + * @return the ID of the process. + */ + public String getId() { + return this.id; + } + + /** + * @return the command given to the process. + */ + public List<String> getCommand() { + return this.command; + } + + /** * Sets a new state for the process. * * @param newState http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-standalone/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-standalone/pom.xml b/lang/java/reef-runtime-standalone/pom.xml index afb4a56..8c00059 100644 --- a/lang/java/reef-runtime-standalone/pom.xml +++ b/lang/java/reef-runtime-standalone/pom.xml @@ -38,6 +38,26 @@ under the License. <artifactId>reef-common</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-runtime-local</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-runtime-yarn</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.jcraft</groupId> + <artifactId>jsch</artifactId> + <version>0.1.53</version> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>2.0.1</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/StandaloneDriverConfigurationProviderImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/StandaloneDriverConfigurationProviderImpl.java 
b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/StandaloneDriverConfigurationProviderImpl.java new file mode 100644 index 0000000..7a0a694 --- /dev/null +++ b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/StandaloneDriverConfigurationProviderImpl.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.runtime.standalone.client; + +import org.apache.commons.io.IOUtils; +import org.apache.reef.runtime.common.client.DriverConfigurationProvider; +import org.apache.reef.runtime.common.parameters.JVMHeapSlack; +import org.apache.reef.runtime.standalone.client.parameters.NodeFolder; +import org.apache.reef.runtime.standalone.client.parameters.NodeListFilePath; +import org.apache.reef.runtime.standalone.client.parameters.SshPortNum; +import org.apache.reef.runtime.standalone.driver.StandaloneDriverConfiguration; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Configurations; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.formats.ConfigurationModule; + +import javax.inject.Inject; +import java.io.*; +import java.net.URI; +import java.util.HashSet; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Helper class that assembles the driver configuration when run on the standalone runtime. 
+ */ +final class StandaloneDriverConfigurationProviderImpl implements DriverConfigurationProvider { + + private static final Logger LOG = Logger.getLogger(StandaloneDriverConfigurationProviderImpl.class.getName()); + private final double jvmHeapSlack; + private final String nodeListFilePath; + private final String nodeFolder; + private final int sshPortNum; + private final Set<String> nodeInfoSet; + + @Inject + StandaloneDriverConfigurationProviderImpl(@Parameter(JVMHeapSlack.class) final double jvmHeapSlack, + @Parameter(NodeListFilePath.class) final String nodeListFilePath, + @Parameter(NodeFolder.class) final String nodeFolder, + @Parameter(SshPortNum.class) final int sshPortNum) { + this.jvmHeapSlack = jvmHeapSlack; + this.nodeListFilePath = nodeListFilePath; + this.nodeFolder = nodeFolder; + this.sshPortNum = sshPortNum; + this.nodeInfoSet = new HashSet<>(); + + LOG.log(Level.FINEST, "Reading NodeListFilePath"); + final InputStream in; + try { + in = new FileInputStream(this.nodeListFilePath); + this.nodeInfoSet.add(IOUtils.toString(in)); + IOUtils.closeQuietly(in); + } catch(Exception e) { + throw new RuntimeException("File not found exception in StandaloneDriverConfigurationProviderImpl", e); + } + } + + private Configuration getDriverConfiguration(final URI jobFolder, + final String clientRemoteId, + final String jobId) { + ConfigurationModule configModule = StandaloneDriverConfiguration.CONF + .set(StandaloneDriverConfiguration.ROOT_FOLDER, jobFolder.getPath()) + .set(StandaloneDriverConfiguration.NODE_FOLDER, this.nodeFolder) + .set(StandaloneDriverConfiguration.NODE_LIST_FILE_PATH, this.nodeListFilePath) + .set(StandaloneDriverConfiguration.SSH_PORT_NUM, this.sshPortNum) + .set(StandaloneDriverConfiguration.JVM_HEAP_SLACK, this.jvmHeapSlack) + .set(StandaloneDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, clientRemoteId) + .set(StandaloneDriverConfiguration.JOB_IDENTIFIER, jobId); + for (final String nodeInfo : nodeInfoSet) { + configModule = 
configModule.set(StandaloneDriverConfiguration.NODE_INFO_SET, nodeInfo); + } + return configModule.build(); + } + + /** + * Assembles the driver configuration. + * + * @param jobFolder The folder in which the local runtime will execute this job. + * @param clientRemoteId the remote identifier of the client. It is used by the Driver to establish a + * connection back to the client. + * @param jobId The identifier of the job. + * @param applicationConfiguration The configuration of the application, e.g. a filled out DriverConfiguration + * @return The Driver configuration to be used to instantiate the Driver. + */ + public Configuration getDriverConfiguration(final URI jobFolder, + final String clientRemoteId, + final String jobId, + final Configuration applicationConfiguration) { + return Configurations.merge(getDriverConfiguration(jobFolder, clientRemoteId, jobId), applicationConfiguration); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/StandaloneRuntimeConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/StandaloneRuntimeConfiguration.java b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/StandaloneRuntimeConfiguration.java index 5c64539..018e6ef 100644 --- a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/StandaloneRuntimeConfiguration.java +++ b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/StandaloneRuntimeConfiguration.java @@ -18,47 +18,71 @@ */ package org.apache.reef.runtime.standalone.client; -import org.apache.reef.annotations.Unstable; +import org.apache.reef.runtime.local.LocalClasspathProvider; +import org.apache.reef.runtime.local.client.LocalJobSubmissionHandler; +import 
org.apache.reef.runtime.local.client.ExecutorServiceConstructor; import org.apache.reef.client.parameters.DriverConfigurationProviders; import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration; +import org.apache.reef.runtime.common.client.DriverConfigurationProvider; +import org.apache.reef.runtime.common.client.api.JobSubmissionHandler; +import org.apache.reef.runtime.common.files.RuntimeClasspathProvider; +import org.apache.reef.runtime.standalone.client.parameters.NodeFolder; import org.apache.reef.runtime.standalone.client.parameters.NodeListFilePath; import org.apache.reef.runtime.standalone.client.parameters.RootFolder; +import org.apache.reef.runtime.standalone.client.parameters.SshPortNum; import org.apache.reef.tang.ConfigurationProvider; import org.apache.reef.tang.formats.*; +import java.util.concurrent.ExecutorService; + /** * A ConfigurationModule to configure the standalone resourcemanager. */ -@Unstable public final class StandaloneRuntimeConfiguration extends ConfigurationModuleBuilder { /** - * The folder in which the sub-folders, one per Node, will be created. Those will contain one folder per - * Evaluator instantiated on the virtual node. Those inner folders will be named by the time when the Evaluator was - * launched. - * <p> - * If none is given, a folder "REEF_STANDALONE_RUNTIME" will be created in the local directory. + * The file containing a list of remote ssh nodes (i.e. `[email protected]`), separated by newlines. + * The standalone runtime assumes the Driver to run on the same node as Client, thus a local file path is expected. + * Currently, we expect that $JAVA_HOME is specified to the same directory in each of the nodes. + */ + public static final RequiredParameter<String> NODE_LIST_FILE_PATH = new RequiredParameter<>(); + /** + * Folder to save the shaded jar for the remote nodes. 
+ */ + public static final OptionalParameter<String> NODE_FOLDER = new OptionalParameter<>(); + + /** + * Folder to save the files generated by REEF. The corresponding sub-folders will be created for each node. */ public static final OptionalParameter<String> RUNTIME_ROOT_FOLDER = new OptionalParameter<>(); /** - * Configuration provides whose Configuration will be merged into all Driver Configuration. + * The port number to attempt the ssh connection. */ - public static final OptionalImpl<ConfigurationProvider> DRIVER_CONFIGURATION_PROVIDERS = new OptionalImpl<>(); + public static final OptionalParameter<Integer> SSH_PORT_NUM = new OptionalParameter<>(); /** - * The file which will contain information of remote nodes. + * Configuration provides whose Configuration will be merged into all Driver Configuration. */ - public static final RequiredParameter<String> NODE_LIST_FILE_PATH = new RequiredParameter<>(); + public static final OptionalImpl<ConfigurationProvider> DRIVER_CONFIGURATION_PROVIDERS = new OptionalImpl<>(); /** * The ConfigurationModule for the standalone resourcemanager. 
*/ public static final ConfigurationModule CONF = new StandaloneRuntimeConfiguration() .merge(CommonRuntimeConfiguration.CONF) + // Bind the standalone runtime + .bindImplementation(JobSubmissionHandler.class, LocalJobSubmissionHandler.class) + .bindImplementation(DriverConfigurationProvider.class, StandaloneDriverConfigurationProviderImpl.class) + .bindConstructor(ExecutorService.class, ExecutorServiceConstructor.class) + .bindImplementation(RuntimeClasspathProvider.class, LocalClasspathProvider.class) // Bind parameters of the standalone runtime - .bindNamedParameter(RootFolder.class, RUNTIME_ROOT_FOLDER) .bindNamedParameter(NodeListFilePath.class, NODE_LIST_FILE_PATH) + .bindNamedParameter(NodeFolder.class, NODE_FOLDER) + .bindNamedParameter(RootFolder.class, RUNTIME_ROOT_FOLDER) + .bindNamedParameter(SshPortNum.class, SSH_PORT_NUM) .bindSetEntry(DriverConfigurationProviders.class, DRIVER_CONFIGURATION_PROVIDERS) .build(); + + } http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/NodeFolder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/NodeFolder.java b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/NodeFolder.java new file mode 100644 index 0000000..65ace5a --- /dev/null +++ b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/NodeFolder.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.standalone.client.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * The folder where required jar files shall be stored. + */ +@NamedParameter(default_value = NodeFolder.DEFAULT_VALUE, + doc = "The folder where the shaded JAR file is stored in remote nodes.") +public final class NodeFolder implements Name<String> { + public static final String DEFAULT_VALUE = "REEF_STANDALONE_RUNTIME"; + + /** + * Empty private constructor to prohibit instantiation of utility class. 
+ */ + private NodeFolder() { + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/NodeInfoSet.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/NodeInfoSet.java b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/NodeInfoSet.java new file mode 100644 index 0000000..8da523b --- /dev/null +++ b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/NodeInfoSet.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.standalone.client.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +import java.util.Set; + +/** + * Information of remote nodes available in the standalone runtime. 
+ */ +@NamedParameter(short_name = "nodes", doc = "Information of remote nodes available in the standalone runtime.") +public final class NodeInfoSet implements Name<Set<String>> { +} http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/NodeListFilePath.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/NodeListFilePath.java b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/NodeListFilePath.java index b3809f0..f21c2b0 100644 --- a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/NodeListFilePath.java +++ b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/NodeListFilePath.java @@ -18,15 +18,13 @@ */ package org.apache.reef.runtime.standalone.client.parameters; -import org.apache.reef.annotations.Unstable; import org.apache.reef.tang.annotations.Name; import org.apache.reef.tang.annotations.NamedParameter; /** * The file which will contain information of remote nodes. */ -@Unstable -@NamedParameter(doc = "The file contains lines of ssh info of remote nodes", short_name = "node_list_file_path") +@NamedParameter(doc = "The file contains lines of ssh info of remote nodes", short_name = "nodelist") public final class NodeListFilePath implements Name<String> { /** * Empty private constructor to prohibit instantiation of utility class. 
http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/RootFolder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/RootFolder.java b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/RootFolder.java index 85a349a..15077be 100644 --- a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/RootFolder.java +++ b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/RootFolder.java @@ -18,15 +18,13 @@ */ package org.apache.reef.runtime.standalone.client.parameters; -import org.apache.reef.annotations.Unstable; import org.apache.reef.tang.annotations.Name; import org.apache.reef.tang.annotations.NamedParameter; /** * The folder where logs etc. shall be stored. */ -@Unstable -@NamedParameter(default_value = RootFolder.DEFAULT_VALUE, doc = "The folder where logs etc. 
shall be stored.") +@NamedParameter(default_value = RootFolder.DEFAULT_VALUE, doc = "The folder where the logs and results are stored.") public final class RootFolder implements Name<String> { public static final String DEFAULT_VALUE = "REEF_STANDALONE_RUNTIME"; http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/SshPortNum.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/SshPortNum.java b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/SshPortNum.java new file mode 100644 index 0000000..0439428 --- /dev/null +++ b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/client/parameters/SshPortNum.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.standalone.client.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * The port number to access the remote nodes. 
+ */ +@NamedParameter(doc = "The port number to access remote nodes with ssh.", short_name = "port", default_value = "22") +public final class SshPortNum implements Name<Integer> { + /** + * Private constructor: this class is only used as a named-parameter key and is never instantiated. + */ + private SshPortNum() { + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/RemoteNodeManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/RemoteNodeManager.java b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/RemoteNodeManager.java new file mode 100644 index 0000000..a66353c --- /dev/null +++ b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/RemoteNodeManager.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.runtime.standalone.driver; + +import com.jcraft.jsch.*; +import org.apache.reef.driver.evaluator.EvaluatorProcess; +import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent; +import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent; +import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent; +import org.apache.reef.runtime.common.driver.evaluator.pojos.State; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl; +import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl; +import org.apache.reef.runtime.common.files.FileResource; +import org.apache.reef.runtime.common.files.REEFFileNames; +import org.apache.reef.runtime.common.parameters.JVMHeapSlack; +import org.apache.reef.runtime.common.utils.RemoteManager; +import org.apache.reef.runtime.standalone.client.parameters.RootFolder; +import org.apache.reef.runtime.local.process.ReefRunnableProcessObserver; +import org.apache.reef.runtime.standalone.client.parameters.SshPortNum; +import org.apache.reef.runtime.yarn.driver.REEFEventHandlers; +import org.apache.reef.runtime.standalone.client.parameters.NodeFolder; +import org.apache.reef.runtime.standalone.client.parameters.NodeInfoSet; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.exceptions.BindException; +import org.apache.reef.tang.formats.ConfigurationSerializer; +import org.apache.reef.util.CollectionUtils; +import org.apache.reef.util.Optional; + +import javax.inject.Inject; +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Management module for remote nodes in standalone runtime. 
+ */ +public final class RemoteNodeManager { + private static final Logger LOG = Logger.getLogger(RemoteNodeManager.class.getName()); + + /** + * Map from containerID -> SshProcessContainer. + */ + private final Map<String, SshProcessContainer> containers = new HashMap<>(); + + private final ConfigurationSerializer configurationSerializer; + private final REEFFileNames fileNames; + private final double jvmHeapFactor; + private final REEFEventHandlers reefEventHandlers; + private final String errorHandlerRID; + private final Set<String> nodeInfoSet; + private Iterator<String> nodeSetIterator; + private final ReefRunnableProcessObserver processObserver; + private final String rootFolder; + private final String nodeFolder; + private final int sshPortNum; + + @Inject + RemoteNodeManager(final ConfigurationSerializer configurationSerializer, + final REEFFileNames fileNames, + final RemoteManager remoteManager, + final REEFEventHandlers reefEventHandlers, + final ReefRunnableProcessObserver processObserver, + @Parameter(JVMHeapSlack.class) final double jvmHeapSlack, + @Parameter(NodeInfoSet.class) final Set<String> nodeInfoSet, + @Parameter(RootFolder.class) final String rootFolder, + @Parameter(NodeFolder.class) final String nodeFolder, + @Parameter(SshPortNum.class) final int sshPortNum) { + this.configurationSerializer = configurationSerializer; + this.fileNames = fileNames; + this.processObserver = processObserver; + this.errorHandlerRID = remoteManager.getMyIdentifier(); + this.reefEventHandlers = reefEventHandlers; + this.jvmHeapFactor = 1.0 - jvmHeapSlack; + this.nodeInfoSet = nodeInfoSet; + this.rootFolder = rootFolder; + this.nodeFolder = nodeFolder; + this.sshPortNum = sshPortNum; + + this.nodeSetIterator = this.nodeInfoSet.iterator(); + + LOG.log(Level.FINEST, "Initialized RemoteNodeManager."); + } + + private void release(final String containerID) { + synchronized (this.containers) { + final SshProcessContainer sshProcessContainer = 
this.containers.get(containerID); + if (null != sshProcessContainer) { + LOG.log(Level.INFO, "Releasing Container with containerId [{0}]", sshProcessContainer); + if (sshProcessContainer.isRunning()) { + sshProcessContainer.close(); + } + this.containers.remove(containerID); + } else { + LOG.log(Level.INFO, "Ignoring release request for unknown containerID [{0}]", containerID); + } + } + } + + void onResourceLaunchRequest(final ResourceLaunchEvent resourceLaunchEvent) { + LOG.log(Level.INFO, "RemoteNodeManager:onResourceLaunchRequest"); + + // connect to the remote node. + final String remoteNode; + try { + synchronized (this.nodeSetIterator) { + remoteNode = this.getNode(); + } + } catch (Exception e) { + throw new RuntimeException("Unable to get remote node", e); + } + final String username; + final String hostname; + if (remoteNode.indexOf('@') < 0) { + username = System.getProperty("user.name"); + hostname = remoteNode; + } else { + username = remoteNode.substring(0, remoteNode.indexOf('@')); + hostname = remoteNode.substring(remoteNode.indexOf('@') + 1, remoteNode.length()); + } + final String userHomeDir = System.getProperty("user.home"); + final String privatekey = userHomeDir + "/.ssh/id_dsa"; + + synchronized (this.containers) { + try { + final JSch remoteConnection = new JSch(); + remoteConnection.addIdentity(privatekey); + final Session sshSession = remoteConnection.getSession(username, hostname, sshPortNum); + + final Properties jschConfig = new Properties(); + jschConfig.put("StrictHostKeyChecking", "no"); + sshSession.setConfig(jschConfig); + + try { + sshSession.connect(); + } catch (JSchException ex) { + throw new RuntimeException("Unable to connect to " + remoteNode + ". " + + "Check your authorized_keys settings. 
It should contain the public key of " + privatekey, ex); + } + + LOG.log(Level.FINEST, "Established connection with {0}", hostname); + + final SshProcessContainer sshProcessContainer = this.containers.get(resourceLaunchEvent.getIdentifier()) + .withRemoteConnection(sshSession, remoteNode); + + // Add the global files and libraries. + sshProcessContainer.addGlobalFiles(this.fileNames.getGlobalFolder()); + sshProcessContainer.addLocalFiles(getLocalFiles(resourceLaunchEvent)); + + // Make the configuration file of the evaluator. + final File evaluatorConfigurationFile = + new File(sshProcessContainer.getFolder(), fileNames.getEvaluatorConfigurationPath()); + + try { + this.configurationSerializer.toFile(resourceLaunchEvent.getEvaluatorConf(), evaluatorConfigurationFile); + } catch (final IOException | BindException e) { + throw new RuntimeException("Unable to write configuration.", e); + } + + // Copy files to remote node + final Channel channel = sshSession.openChannel("exec"); + final String mkdirCommand = "mkdir " + nodeFolder; + ((ChannelExec) channel).setCommand(mkdirCommand); + channel.connect(); + + final List<String> copyCommand = + new ArrayList<>(Arrays.asList("scp", "-r", + sshProcessContainer.getFolder().toString(), + remoteNode + ":~/" + nodeFolder + "/" + sshProcessContainer.getContainerID())); + LOG.log(Level.INFO, "Copying files: {0}", copyCommand); + final Process copyProcess = new ProcessBuilder(copyCommand).start(); + try { + copyProcess.waitFor(); + } catch (final InterruptedException ex) { + throw new RuntimeException("Copying Interrupted: ", ex); + } + + final List<String> command = getLaunchCommand(resourceLaunchEvent, sshProcessContainer.getMemory()); + LOG.log(Level.FINEST, "Launching container: {0}", sshProcessContainer); + sshProcessContainer.run(command); + } catch (final JSchException | IOException ex) { + LOG.log(Level.WARNING, "Failed to establish connection with {0}@{1}:\n Exception:{2}", + new Object[]{username, hostname, ex}); + } + } 
+ } + + private String getNode() { + if (!nodeSetIterator.hasNext()) { + nodeSetIterator = this.nodeInfoSet.iterator(); + } + return nodeSetIterator.next().trim(); + } + + private static List<File> getLocalFiles(final ResourceLaunchEvent launchRequest) { + final List<File> files = new ArrayList<>(); // Libraries local to this evaluator + for (final FileResource frp : launchRequest.getFileSet()) { + files.add(new File(frp.getPath()).getAbsoluteFile()); + } + return files; + } + + private List<String> getLaunchCommand(final ResourceLaunchEvent launchRequest, + final int containerMemory) { + final EvaluatorProcess process = launchRequest.getProcess() + .setConfigurationFileName(this.fileNames.getEvaluatorConfigurationPath()); + + if (process.isOptionSet()) { + return process.getCommandLine(); + } else { + return process + .setMemory((int) (this.jvmHeapFactor * containerMemory)) + .getCommandLine(); + } + } + + void onResourceRequest(final ResourceRequestEvent resourceRequestEvent) { + final Optional<String> node = selectNode(resourceRequestEvent); + final String nodeId; + + if (node.isPresent()) { + nodeId = node.get(); + } else { + // Allocate new container + nodeId = this.getNode() + ":" + String.valueOf(sshPortNum); + } + + final String processID = nodeId + "-" + String.valueOf(System.currentTimeMillis()); + final File processFolder = new File(this.rootFolder, processID); + + final SshProcessContainer sshProcessContainer = new SshProcessContainer(errorHandlerRID, nodeId, processID, + processFolder, resourceRequestEvent.getMemorySize().get(), resourceRequestEvent.getVirtualCores().get(), + null, this.fileNames, nodeFolder, processObserver); + this.containers.put(processID, sshProcessContainer); + + final ResourceAllocationEvent alloc = ResourceEventImpl.newAllocationBuilder() + .setIdentifier(processID) + .setNodeId(nodeId) + .setResourceMemory(resourceRequestEvent.getMemorySize().get()) + .setVirtualCores(resourceRequestEvent.getVirtualCores().get()) + 
.setRuntimeName("STANDALONE") + .build(); + reefEventHandlers.onResourceAllocation(alloc); + + // set the status as RUNNING. + updateRuntimeStatus(); + } + + void onResourceReleaseRequest(final ResourceReleaseEvent releaseRequest) { + synchronized (this.containers) { + LOG.log(Level.FINEST, "Release container: {0}", releaseRequest.getIdentifier()); + this.release(releaseRequest.getIdentifier()); + } + } + + public synchronized void close() { + synchronized (this.containers) { + if (this.containers.isEmpty()) { + LOG.log(Level.FINEST, "Clean shutdown with no outstanding containers."); + } else { + LOG.log(Level.WARNING, "Dirty shutdown with outstanding containers."); + for (final SshProcessContainer c : this.containers.values()) { + LOG.log(Level.WARNING, "Force shutdown of: {0}", c); + c.close(); + } + } + } + } + + private Optional<String> selectNode(final ResourceRequestEvent resourceRequestEvent) { + if (CollectionUtils.isNotEmpty(resourceRequestEvent.getNodeNameList())) { + for (final String nodeName : resourceRequestEvent.getNodeNameList()) { + return Optional.of(nodeName); + } + } + if (CollectionUtils.isNotEmpty(resourceRequestEvent.getRackNameList())) { + for (final String nodeName : resourceRequestEvent.getRackNameList()) { + return Optional.of(nodeName); + } + } + return Optional.empty(); + } + + private synchronized void updateRuntimeStatus() { + final RuntimeStatusEventImpl.Builder builder = RuntimeStatusEventImpl.newBuilder() + .setName("STANDALONE") + .setState(State.RUNNING); + + this.reefEventHandlers.onRuntimeStatus(builder.build()); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/SshProcessContainer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/SshProcessContainer.java 
b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/SshProcessContainer.java new file mode 100644 index 0000000..c2c5501 --- /dev/null +++ b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/SshProcessContainer.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.runtime.standalone.driver; + +import com.jcraft.jsch.Channel; +import com.jcraft.jsch.ChannelExec; +import com.jcraft.jsch.JSchException; +import com.jcraft.jsch.Session; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.annotations.audience.TaskSide; +import org.apache.reef.runtime.common.files.REEFFileNames; +import org.apache.reef.runtime.local.driver.*; +import org.apache.reef.runtime.local.process.ReefRunnableProcessObserver; +import org.apache.reef.runtime.local.process.RunnableProcess; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A Container that runs an Evaluator in a SshProcess. + */ +@Private +@TaskSide +final class SshProcessContainer implements Container { + private static final Logger LOG = Logger.getLogger(SshProcessContainer.class.getName()); + + private final String errorHandlerRID; + private final String nodeID; + private final File folder; + private final String containedID; + private final int megaBytes; + private final int numberOfCores; + private final String rackName; + private final REEFFileNames fileNames; + private final File reefFolder; + private final File localFolder; + private final File globalFolder; + private Thread theThread; + private final ReefRunnableProcessObserver processObserver; + private RunnableProcess process; + private Session remoteSession; + private String remoteHostName; + private final String nodeFolder; + + /** + * @param errorHandlerRID the remoteID of the error handler. + * @param nodeID the ID of the (fake) node this Container is instantiated on + * @param containedID the ID used to identify this container uniquely + * @param folder the folder in which logs etc. 
will be deposited. + * @param nodeFolder the folder in which the shaded jar file should be stored. + */ + SshProcessContainer(final String errorHandlerRID, + final String nodeID, + final String containedID, + final File folder, + final int megaBytes, + final int numberOfCores, + final String rackName, + final REEFFileNames fileNames, + final String nodeFolder, + final ReefRunnableProcessObserver processObserver) { + this.errorHandlerRID = errorHandlerRID; + this.processObserver = processObserver; + this.nodeID = nodeID; + this.containedID = containedID; + this.folder = folder; + this.megaBytes = megaBytes; + this.numberOfCores = numberOfCores; + this.rackName = rackName; + this.fileNames = fileNames; + this.nodeFolder = nodeFolder; + this.reefFolder = new File(folder, fileNames.getREEFFolderName()); + this.localFolder = new File(reefFolder, fileNames.getLocalFolderName()); + if (!this.localFolder.exists() && !this.localFolder.mkdirs()) { + LOG.log(Level.WARNING, "Failed to create [{0}]", this.localFolder.getAbsolutePath()); + } + this.globalFolder = new File(reefFolder, fileNames.getGlobalFolderName()); + if (!this.globalFolder.exists() && !this.globalFolder.mkdirs()) { + LOG.log(Level.WARNING, "Failed to create [{0}]", this.globalFolder.getAbsolutePath()); + } + } + + @Override + public void run(final List<String> commandLine) { + this.process = new RunnableProcess(getRemoteCommand(commandLine), + this.containedID, + this.folder, + this.processObserver, + this.fileNames.getEvaluatorStdoutFileName(), + this.fileNames.getEvaluatorStderrFileName()); + this.theThread = new Thread(this.process); + this.theThread.start(); + } + + @Override + public void addLocalFiles(final Iterable<File> files) { + try { + copy(files, this.localFolder); + } catch (final IOException e) { + throw new RuntimeException("Unable to copy files to the evaluator folder.", e); + } + } + + @Override + @SuppressWarnings("checkstyle:hiddenfield") + public void addGlobalFiles(final File globalFolder) 
{ + try { + final File[] files = globalFolder.listFiles(); + if (files != null) { + copy(Arrays.asList(files), this.globalFolder); + } + } catch (final IOException e) { + throw new RuntimeException("Unable to copy files to the evaluator folder.", e); + } + } + + private static void copy(final Iterable<File> files, final File folder) throws IOException { + for (final File sourceFile : files) { + final File destinationFile = new File(folder, sourceFile.getName()); + if (Files.isSymbolicLink(sourceFile.toPath())) { + final Path linkTargetPath = Files.readSymbolicLink(sourceFile.toPath()); + Files.createSymbolicLink(destinationFile.toPath(), linkTargetPath); + } else { + Files.copy(sourceFile.toPath(), destinationFile.toPath(), java.nio.file.StandardCopyOption.REPLACE_EXISTING); + } + } + } + + @Override + public boolean isRunning() { + return null != this.theThread && this.theThread.isAlive(); + } + + @Override + public String getNodeID() { + return this.nodeID; + } + + @Override + public String getContainerID() { + return this.containedID; + } + + @Override + public int getMemory() { + return this.megaBytes; + } + + @Override + public int getNumberOfCores() { + return this.numberOfCores; + } + + @Override + public File getFolder() { + return this.folder; + } + + @Override + public String getRackName() { + return this.rackName; + } + + @Override + public void close() { + if (isRunning()) { + LOG.log(Level.WARNING, "Force-closing a container that is still running: {0}", this); + this.process.cancel(); + } + } + + @Override + public String toString() { + return "SshProcessContainer{" + + "containedID='" + containedID + "'" + + ", nodeID='" + nodeID + '\'' + + ", errorHandlerRID='" + errorHandlerRID + '\'' + + ", folder=" + folder + '\'' + + ", rack=" + rackName + + "}"; + + } + + SshProcessContainer withRemoteConnection(final Session newRemoteSession, final String newRemoteHostName) { + this.remoteSession = newRemoteSession; + this.remoteHostName = newRemoteHostName; + 
return this; + } + + private List<String> getRemoteCommand(final List<String> commandLine) { + final List<String> commandPrefix = new ArrayList<>(Arrays.asList("ssh", this.remoteHostName, + "cd", this.getRemoteAbsolutePath(), "&&")); + commandPrefix.addAll(commandLine); + return commandPrefix; + } + + private String getRemoteAbsolutePath() { + return getRemoteHomePath() + "/" + this.nodeFolder + "/" + this.containedID; + } + + private String getRemoteHomePath() { + final String getHomeCommand = "pwd"; + try { + final Channel channel = this.remoteSession.openChannel("exec"); + ((ChannelExec) channel).setCommand(getHomeCommand); + channel.setInputStream(null); + final InputStream stdout = channel.getInputStream(); + channel.connect(); + + byte[] tmp = new byte[1024]; + StringBuilder homePath = new StringBuilder(); + while (true) { + while (stdout.available() > 0) { + final int len = stdout.read(tmp, 0, 1024); + if (len < 0) { + break; + } + homePath = homePath.append(new String(tmp, 0, len, StandardCharsets.UTF_8)); + } + if (channel.isClosed()) { + if (stdout.available() > 0) { + continue; + } + break; + } + } + return homePath.toString().trim(); + } catch (final JSchException | IOException ex) { + throw new RuntimeException("Unable to retrieve home directory from " + + this.remoteHostName + " with the pwd command", ex); + } + } + + + +} http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneDriverConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneDriverConfiguration.java b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneDriverConfiguration.java new file mode 100644 index 0000000..856c93e --- /dev/null +++ 
b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneDriverConfiguration.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.standalone.driver; + +import org.apache.reef.runtime.common.driver.api.*; +import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier; +import org.apache.reef.runtime.common.driver.parameters.JobIdentifier; +import org.apache.reef.runtime.common.files.RuntimeClasspathProvider; +import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID; +import org.apache.reef.runtime.common.launch.parameters.LaunchID; +import org.apache.reef.runtime.common.parameters.JVMHeapSlack; +import org.apache.reef.runtime.local.LocalClasspathProvider; +import org.apache.reef.runtime.standalone.client.parameters.RootFolder; +import org.apache.reef.runtime.standalone.client.parameters.NodeFolder; +import org.apache.reef.runtime.standalone.client.parameters.NodeListFilePath; +import org.apache.reef.runtime.standalone.client.parameters.NodeInfoSet; +import org.apache.reef.runtime.standalone.client.parameters.SshPortNum; +import org.apache.reef.tang.formats.ConfigurationModule; 
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder; +import org.apache.reef.tang.formats.OptionalParameter; +import org.apache.reef.tang.formats.RequiredParameter; + +/** + * ConfigurationModule for the Driver executed in the standalone resourcemanager. This is meant to eventually replace + * StandaloneDriverRuntimeConfiguration. + */ +public class StandaloneDriverConfiguration extends ConfigurationModuleBuilder { + /** + * The root folder of the job. Assumed to be an absolute path. + */ + public static final RequiredParameter<String> ROOT_FOLDER = new RequiredParameter<>(); + /** + * The fraction of the container memory NOT to use for the Java Heap. + */ + public static final OptionalParameter<Double> JVM_HEAP_SLACK = new OptionalParameter<>(); + + /** + * The remote identifier to use for communications back to the client. + * It is bound both as the client remote identifier and as the error handler remote identifier. + */ + public static final OptionalParameter<String> CLIENT_REMOTE_IDENTIFIER = new OptionalParameter<>(); + + /** + * The identifier of the Job submitted. It is bound both as the job identifier and as the launch ID. + */ + public static final RequiredParameter<String> JOB_IDENTIFIER = new RequiredParameter<>(); + + /** + * The file containing a list of remote ssh nodes (e.g. `username@hostname`), separated by newlines. + */ + public static final RequiredParameter<String> NODE_LIST_FILE_PATH = new RequiredParameter<>(); + /** + * Information about one remote node. Each value is added as an entry of the NodeInfoSet set. + */ + public static final RequiredParameter<String> NODE_INFO_SET = new RequiredParameter<>(); + + /** + * Folder to save the shaded jar for the remote nodes. + */ + public static final OptionalParameter<String> NODE_FOLDER = new OptionalParameter<>(); + + /** + * SSH port number used to access the remote nodes. 
+ */ + public static final OptionalParameter<Integer> SSH_PORT_NUM = new OptionalParameter<>(); + + /** + * The module itself: binds the standalone resource handlers, the named parameters declared above + * and the local classpath provider. + */ + public static final ConfigurationModule CONF = new StandaloneDriverConfiguration() + .bindImplementation(ResourceLaunchHandler.class, StandaloneResourceLaunchHandler.class) + .bindImplementation(ResourceRequestHandler.class, StandaloneResourceRequestHandler.class) + .bindImplementation(ResourceReleaseHandler.class, StandaloneResourceReleaseHandler.class) + .bindImplementation(ResourceManagerStartHandler.class, StandaloneResourceManagerStartHandler.class) + .bindImplementation(ResourceManagerStopHandler.class, StandaloneResourceManagerStopHandler.class) + .bindNamedParameter(ClientRemoteIdentifier.class, CLIENT_REMOTE_IDENTIFIER) + .bindNamedParameter(ErrorHandlerRID.class, CLIENT_REMOTE_IDENTIFIER) + .bindNamedParameter(JobIdentifier.class, JOB_IDENTIFIER) + .bindNamedParameter(LaunchID.class, JOB_IDENTIFIER) + .bindNamedParameter(RootFolder.class, ROOT_FOLDER) + .bindNamedParameter(NodeListFilePath.class, NODE_LIST_FILE_PATH) + .bindNamedParameter(NodeFolder.class, NODE_FOLDER) + .bindNamedParameter(SshPortNum.class, SSH_PORT_NUM) + .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK) + .bindSetEntry(NodeInfoSet.class, NODE_INFO_SET) + .bindImplementation(RuntimeClasspathProvider.class, LocalClasspathProvider.class) + .build(); +} http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneResourceLaunchHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneResourceLaunchHandler.java 
b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneResourceLaunchHandler.java new file mode 100644 index 0000000..f82a94a --- /dev/null +++ 
b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneResourceLaunchHandler.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.standalone.driver; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent; +import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler; + +import javax.inject.Inject; + +@Private +@DriverSide +final class StandaloneResourceLaunchHandler implements ResourceLaunchHandler { + private final RemoteNodeManager nodeListManager; + + @Inject + StandaloneResourceLaunchHandler(final RemoteNodeManager nodeListManager) { + this.nodeListManager = nodeListManager; + } + + @Override + public void onNext(final ResourceLaunchEvent t) { + this.nodeListManager.onResourceLaunchRequest(t); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneResourceManagerStartHandler.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneResourceManagerStartHandler.java b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneResourceManagerStartHandler.java new file mode 100644 index 0000000..433ae72 --- /dev/null +++ b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneResourceManagerStartHandler.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.standalone.driver; + +import org.apache.reef.runtime.common.driver.api.ResourceManagerStartHandler; +import org.apache.reef.wake.time.runtime.event.RuntimeStart; + +import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Dummy class for the common REEF driver configuration. Its onNext only writes a FINEST-level log message. 
+ */ +final class StandaloneResourceManagerStartHandler implements ResourceManagerStartHandler { + private static final Logger LOG = Logger.getLogger(StandaloneResourceManagerStartHandler.class.getName()); + private final RemoteNodeManager nodeListManager; +/** Injection constructor. NOTE(review): nodeListManager is assigned here but not read anywhere in this class. */ + @Inject + private StandaloneResourceManagerStartHandler(final RemoteNodeManager nodeListManager) { + this.nodeListManager = nodeListManager; + } +/** Logs "Standalone ResourceManager Start" at FINEST level; the runtimeStart event itself is not inspected. */ + @Override + public void onNext(final RuntimeStart runtimeStart) { + LOG.log(Level.FINEST, "Standalone ResourceManager Start"); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneResourceManagerStopHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneResourceManagerStopHandler.java b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneResourceManagerStopHandler.java new file mode 100644 index 0000000..7a617bb --- /dev/null +++ b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneResourceManagerStopHandler.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.standalone.driver; + +import org.apache.reef.runtime.common.driver.api.ResourceManagerStopHandler; +import org.apache.reef.wake.time.runtime.event.RuntimeStop; + +import javax.inject.Inject; +/** Handler that closes the injected RemoteNodeManager when a RuntimeStop event is delivered. */ +final class StandaloneResourceManagerStopHandler implements ResourceManagerStopHandler { + + private final RemoteNodeManager nodeListManager; +/** Keeps the RemoteNodeManager that is closed on stop. */ + @Inject + StandaloneResourceManagerStopHandler(final RemoteNodeManager nodeListManager) { + this.nodeListManager = nodeListManager; + } +/** Calls close() on the RemoteNodeManager; the RuntimeStop value itself is not inspected. */ + @Override + public void onNext(final RuntimeStop value) { + this.nodeListManager.close(); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneResourceReleaseHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneResourceReleaseHandler.java b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneResourceReleaseHandler.java new file mode 100644 index 0000000..e9aba2e --- /dev/null +++ b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneResourceReleaseHandler.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.standalone.driver; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent; +import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler; + +import javax.inject.Inject; + +/** + * Takes Resource Release requests and closes process managers. This class itself only forwards each request to RemoteNodeManager.onResourceReleaseRequest. 
+ */ +@Private +@DriverSide +public final class StandaloneResourceReleaseHandler implements ResourceReleaseHandler { + + private final RemoteNodeManager nodeListManager; +/** Keeps the RemoteNodeManager that release requests are handed to. */ + @Inject + StandaloneResourceReleaseHandler(final RemoteNodeManager nodeListManager) { + this.nodeListManager = nodeListManager; + } +/** Forwards the event unchanged to RemoteNodeManager.onResourceReleaseRequest. */ + @Override + public void onNext(final ResourceReleaseEvent t) { + this.nodeListManager.onResourceReleaseRequest(t); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneResourceRequestHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneResourceRequestHandler.java b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneResourceRequestHandler.java new file mode 100644 index 0000000..c0d9dea --- /dev/null +++ b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/StandaloneResourceRequestHandler.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.runtime.standalone.driver; + +import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent; +import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; + +import javax.inject.Inject; +/** Driver-side handler that passes every ResourceRequestEvent on to the injected RemoteNodeManager. */ +@Private +@DriverSide +final class StandaloneResourceRequestHandler implements ResourceRequestHandler { + private final RemoteNodeManager nodeListManager; +/** Keeps the RemoteNodeManager that resource requests are handed to. */ + @Inject + StandaloneResourceRequestHandler(final RemoteNodeManager nodeListManager) { + this.nodeListManager = nodeListManager; + } +/** Forwards the event unchanged to RemoteNodeManager.onResourceRequest. */ + @Override + public void onNext(final ResourceRequestEvent resourceRequestEvent) { + this.nodeListManager.onResourceRequest(resourceRequestEvent); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/package-info.java b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/package-info.java new file mode 100644 index 0000000..f8ab95b --- /dev/null +++ b/lang/java/reef-runtime-standalone/src/main/java/org/apache/reef/runtime/standalone/driver/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * The driver-side resource manager for the standalone runtime. + */ +package org.apache.reef.runtime.standalone.driver; http://git-wip-us.apache.org/repos/asf/reef/blob/30b57ae4/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java index 35bb826..be19227 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java @@ -34,7 +34,7 @@ import javax.inject.Inject; */ // This is a great place to add a thread boundary, should that need arise. 
@Private -final class REEFEventHandlers implements AutoCloseable { +public final class REEFEventHandlers implements AutoCloseable { private final EventHandler<ResourceAllocationEvent> resourceAllocationHandler; private final EventHandler<ResourceStatusEvent> resourceStatusHandler; private final EventHandler<RuntimeStatusEvent> runtimeStatusHandler; @@ -69,7 +69,8 @@ final class REEFEventHandlers implements AutoCloseable { * * @param runtimeStatusEvent */ - void onRuntimeStatus(final RuntimeStatusEvent runtimeStatusEvent) { + @Private + public void onRuntimeStatus(final RuntimeStatusEvent runtimeStatusEvent) { this.runtimeStatusHandler.onNext(runtimeStatusEvent); } @@ -78,7 +79,8 @@ final class REEFEventHandlers implements AutoCloseable { * * @param resourceAllocationEvent */ - void onResourceAllocation(final ResourceAllocationEvent resourceAllocationEvent) { + @Private + public void onResourceAllocation(final ResourceAllocationEvent resourceAllocationEvent) { this.resourceAllocationHandler.onNext(resourceAllocationEvent); }
