[REEF-1211] multi-runtime implementation. This changeset addresses the issue by:
1. Providing multi-runtime implementation 2. Unit test for multi runtime implementation 3. Yarn test for Yarn/Multi runtime 4. HelloREEF example for Yarn/Local multi runtime JIRA: [REEF-1211](https://issues.apache.org/jira/browse/REEF-1211) Pull Request: This closes #851 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/07874cbb Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/07874cbb Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/07874cbb Branch: refs/heads/master Commit: 07874cbb604efde4c05bbb44a8e5bb48d081ee49 Parents: 6f02cb7 Author: Boris Shulman <[email protected]> Authored: Mon Feb 22 21:45:11 2016 -0800 Committer: Markus Weimer <[email protected]> Committed: Thu Mar 3 10:12:54 2016 -0800 ---------------------------------------------------------------------- lang/java/reef-examples/pom.xml | 5 + .../HelloMultiRuntimeDriver.java | 96 +++++ .../hellomultiruntime/HelloREEFMultiYarn.java | 91 +++++ .../hellomultiruntime/package-info.java | 22 ++ lang/java/reef-runtime-multi/pom.xml | 34 +- .../src/main/avro/RuntimeDefinition.avsc | 65 ++++ .../MultiRuntimeConfigurationBuilder.java | 151 ++++++++ .../client/MultiRuntimeDefinitionBuilder.java | 90 +++++ .../client/MultiRuntimeDefinitionGenerator.java | 42 +++ .../MultiRuntimeDefinitionGeneratorImpl.java | 153 ++++++++ ...MultiRuntimeDriverConfigurationProvider.java | 70 ++++ .../client/MultiRuntimeHelperConfiguration.java | 28 ++ .../reef/runtime/multi/client/package-info.java | 22 ++ .../client/parameters/DefaultRuntimeName.java | 29 ++ .../multi/client/parameters/RuntimeNames.java | 31 ++ .../parameters/SerializedRuntimeDefinition.java | 29 ++ .../multi/client/parameters/package-info.java | 22 ++ .../driver/MultiRuntimeDriverConfiguration.java | 74 ++++ .../MultiRuntimeResourceLaunchHandler.java | 46 +++ ...MultiRuntimeResourceManagerStartHandler.java | 42 +++ .../MultiRuntimeResourceManagerStopHandler.java | 42 +++ 
.../MultiRuntimeResourceReleaseHandler.java | 45 +++ .../MultiRuntimeResourceRequestHandler.java | 46 +++ .../reef/runtime/multi/driver/Runtime.java | 37 ++ .../reef/runtime/multi/driver/RuntimeImpl.java | 85 +++++ .../reef/runtime/multi/driver/RuntimesHost.java | 173 +++++++++ .../reef/runtime/multi/driver/package-info.java | 22 ++ .../multi/driver/parameters/RuntimeName.java | 29 ++ .../multi/driver/parameters/package-info.java | 22 ++ .../utils/MultiRuntimeDefinitionSerializer.java | 73 ++++ .../reef/runtime/multi/utils/package-info.java | 22 ++ .../runtime/multi/driver/RuntimesHostTest.java | 359 +++++++++++++++++++ .../reef/runtime/multi/driver/package-info.java | 22 ++ .../MultiRuntimeDefinitionSerializerTests.java | 52 +++ .../reef/runtime/multi/utils/package-info.java | 22 ++ .../ExtensibleYarnClientConfiguration.java | 26 +- .../examples/TestHelloREEFMultiRuntime.java | 65 ++++ 37 files changed, 2268 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-examples/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/pom.xml b/lang/java/reef-examples/pom.xml index ee8c5fd..a596976 100644 --- a/lang/java/reef-examples/pom.xml +++ b/lang/java/reef-examples/pom.xml @@ -102,6 +102,11 @@ under the License. 
<artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.reef</groupId> + <artifactId>reef-runtime-multi</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellomultiruntime/HelloMultiRuntimeDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellomultiruntime/HelloMultiRuntimeDriver.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellomultiruntime/HelloMultiRuntimeDriver.java new file mode 100644 index 0000000..55313cd --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellomultiruntime/HelloMultiRuntimeDriver.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.examples.hellomultiruntime; + +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.evaluator.EvaluatorRequest; +import org.apache.reef.driver.evaluator.EvaluatorRequestor; +import org.apache.reef.driver.task.TaskConfiguration; +import org.apache.reef.examples.hello.HelloTask; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.annotations.Unit; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.event.StartTime; + +import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * The Driver code for the Hello REEF Application with Multi Runtime. + */ +@Unit +public final class HelloMultiRuntimeDriver { + + private static final Logger LOG = Logger.getLogger(HelloMultiRuntimeDriver.class.getName()); + + private final EvaluatorRequestor requestor; + + /** + * Job driver constructor - instantiated via TANG. + * + * @param requestor evaluator requestor object used to create new evaluator containers. + */ + @Inject + private HelloMultiRuntimeDriver(final EvaluatorRequestor requestor) { + this.requestor = requestor; + LOG.log(Level.FINE, "Instantiated 'HelloDriver'"); + } + + /** + * Handles the StartTime event: Request one Evaluator from the local runtime and one from YARN. 
+ */ + public final class StartHandler implements EventHandler<StartTime> { + @Override + public void onNext(final StartTime startTime) { + HelloMultiRuntimeDriver.this.requestor.submit(EvaluatorRequest.newBuilder() + .setNumber(1) + .setMemory(64) + .setNumberOfCores(1) + .setRuntimeName(org.apache.reef.runtime.local.driver.RuntimeIdentifier.RUNTIME_NAME) + .build()); + + LOG.log(Level.INFO, "Requested Local Evaluator ."); + + HelloMultiRuntimeDriver.this.requestor.submit(EvaluatorRequest.newBuilder() + .setNumber(1) + .setMemory(64) + .setNumberOfCores(1) + .setRuntimeName(org.apache.reef.runtime.yarn.driver.RuntimeIdentifier.RUNTIME_NAME) + .build()); + + LOG.log(Level.INFO, "Requested Yarn Evaluator."); + } + } + + /** + * Handles AllocatedEvaluator: Submit the HelloTask. + */ + public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> { + @Override + public void onNext(final AllocatedEvaluator allocatedEvaluator) { + LOG.log(Level.INFO, "Submitting HelloREEF task to AllocatedEvaluator: {0}", allocatedEvaluator); + final Configuration taskConfiguration = TaskConfiguration.CONF + .set(TaskConfiguration.IDENTIFIER, "HelloREEFTask") + .set(TaskConfiguration.TASK, HelloTask.class) + .build(); + allocatedEvaluator.submitTask(taskConfiguration); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellomultiruntime/HelloREEFMultiYarn.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellomultiruntime/HelloREEFMultiYarn.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellomultiruntime/HelloREEFMultiYarn.java new file mode 100644 index 0000000..5df7b25 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellomultiruntime/HelloREEFMultiYarn.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.examples.hellomultiruntime; + +import org.apache.reef.client.DriverConfiguration; +import org.apache.reef.client.DriverLauncher; +import org.apache.reef.client.LauncherStatus; +import org.apache.reef.runtime.multi.client.MultiRuntimeConfigurationBuilder; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.exceptions.BindException; +import org.apache.reef.tang.exceptions.InjectionException; + +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * The Client for running HelloREEFMulti on YARN. + */ +public final class HelloREEFMultiYarn { + + private static final Logger LOG = Logger.getLogger(HelloREEFMultiYarn.class.getName()); + + /** + * Number of milliseconds to wait for the job to complete. + * Setting to 100 sec because running on RM HA clusters takes around + * 50 seconds to set the job to running. + */ + private static final int JOB_TIMEOUT = 100000; // 100 sec. + + /** + * @return the configuration of the HelloREEF driver. 
+ */ + private static Configuration getDriverConfiguration() { + return DriverConfiguration.CONF + .set(DriverConfiguration.GLOBAL_LIBRARIES, + HelloREEFMultiYarn.class.getProtectionDomain().getCodeSource().getLocation().getFile()) + .set(DriverConfiguration.DRIVER_IDENTIFIER, "HelloREEF") + .set(DriverConfiguration.ON_DRIVER_STARTED, HelloMultiRuntimeDriver.StartHandler.class) + .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, HelloMultiRuntimeDriver.EvaluatorAllocatedHandler.class) + .build(); + } + + private static Configuration getHybridYarnSubmissionRuntimeConfiguration() { + return new MultiRuntimeConfigurationBuilder() + .setDefaultRuntime(org.apache.reef.runtime.yarn.driver.RuntimeIdentifier.RUNTIME_NAME) + .setSubmissionRuntime(org.apache.reef.runtime.yarn.driver.RuntimeIdentifier.RUNTIME_NAME) + .addRuntime(org.apache.reef.runtime.local.driver.RuntimeIdentifier.RUNTIME_NAME) + .addRuntime(org.apache.reef.runtime.yarn.driver.RuntimeIdentifier.RUNTIME_NAME) + .setMaxEvaluatorsNumberForLocalRuntime(1) + .build(); + } + + /** + * Start Hello REEF job. + * + * @param args command line parameters. + * @throws BindException configuration error. + * @throws InjectionException configuration error. + */ + public static void main(final String[] args) throws BindException, InjectionException { + final Configuration runtimeConf = getHybridYarnSubmissionRuntimeConfiguration(); + final Configuration driverConf = getDriverConfiguration(); + + final LauncherStatus status = DriverLauncher + .getLauncher(runtimeConf) + .run(driverConf, JOB_TIMEOUT); + LOG.log(Level.INFO, "REEF job completed: {0}", status); + } + + /** + * Empty private constructor to prohibit instantiation of utility class. 
+ */ + private HelloREEFMultiYarn() { + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellomultiruntime/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellomultiruntime/package-info.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellomultiruntime/package-info.java new file mode 100644 index 0000000..1c5d4d1 --- /dev/null +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/hellomultiruntime/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * The Hello REEF Multi Runtime example. + */ +package org.apache.reef.examples.hellomultiruntime; http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/pom.xml b/lang/java/reef-runtime-multi/pom.xml index 252e00d..95f6785 100644 --- a/lang/java/reef-runtime-multi/pom.xml +++ b/lang/java/reef-runtime-multi/pom.xml @@ -28,7 +28,6 @@ under the License. 
</parent> <name>REEF Runtime for multiple runtime scenarios</name> <artifactId>reef-runtime-multi</artifactId> - <properties> <rootPath>${basedir}/../../..</rootPath> </properties> @@ -45,6 +44,24 @@ under the License. </plugin> </plugins> </pluginManagement> + <plugins> + <plugin> + <groupId>org.apache.avro</groupId> + <artifactId>avro-maven-plugin</artifactId> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>schema</goal> + </goals> + <configuration> + <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory> + <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory> + </configuration> + </execution> + </executions> + </plugin> + </plugins> </build> <dependencies> @@ -59,6 +76,11 @@ under the License. <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.reef</groupId> + <artifactId>reef-runtime-local</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>com.microsoft.windowsazure.storage</groupId> <artifactId>microsoft-windowsazure-storage-sdk</artifactId> </dependency> @@ -100,6 +122,14 @@ under the License. 
<artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </dependency> + <dependency> + <groupId>org.apache.reef</groupId> + <artifactId>reef-runtime-hdinsight</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> - </project> http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/avro/RuntimeDefinition.avsc ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/avro/RuntimeDefinition.avsc b/lang/java/reef-runtime-multi/src/main/avro/RuntimeDefinition.avsc new file mode 100644 index 0000000..2e81ffa --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/avro/RuntimeDefinition.avsc @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + [ +/* + * Defines the schema for runtime definition. This avro object is used to pass runtimes definitions + * to the runtimes host + */ + { + "namespace":"org.apache.reef.runtime.multi.utils.avro", + "type":"record", + "name":"RuntimeDefinition", + "doc":"Defines the schema for runtime definition. 
This avro object is used to pass runtimes definitions to the runtimes host", + "fields":[ + { + "name":"runtimeName", + "type":"string", + "doc":"The name of the runtime, that will be provided with the resource allocation requests" + }, + { + "name":"serializedConfiguration", + "type":"string", + "doc":"Serialized Tang configuration" + } + ] + }, +/* + * Defines the schema for multi runtime definition. This avro object is used to pass multi runtime definition + * to the runtimes host + */ + { + "namespace":"org.apache.reef.runtime.multi.utils.avro", + "type":"record", + "name":"MultiRuntimeDefinition", + "doc":"Defines the schema for multi runtime definition. This avro object is used to pass multi runtime definition to the runtimes host", + "fields":[ + { + "name":"defaultRuntimeName", + "type":"string", + "doc":"The name of the default runtime" + }, + { + "name":"runtimes", + "type":{"type":"array", "items":"RuntimeDefinition"}, + "doc":"defined runtimes" + } + ] + } + +] http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeConfigurationBuilder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeConfigurationBuilder.java b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeConfigurationBuilder.java new file mode 100644 index 0000000..9851cc9 --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeConfigurationBuilder.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.multi.client; + +import org.apache.commons.lang.Validate; +import org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators; +import org.apache.reef.runtime.multi.client.parameters.DefaultRuntimeName; +import org.apache.reef.runtime.multi.client.parameters.RuntimeNames; +import org.apache.reef.runtime.yarn.client.ExtensibleYarnClientConfiguration; +import org.apache.reef.runtime.yarn.driver.RuntimeIdentifier; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Configurations; +import org.apache.reef.tang.formats.ConfigurationModuleBuilder; +import org.apache.reef.util.Optional; + +import java.util.*; + +/** + * A builder for Multi Runtime Configuration. 
+ */ +public final class MultiRuntimeConfigurationBuilder { + private static final Set<String> SUPPORTED_RUNTIMES = new HashSet<>(Arrays.asList( + org.apache.reef.runtime.yarn.driver.RuntimeIdentifier.RUNTIME_NAME, + org.apache.reef.runtime.local.driver.RuntimeIdentifier.RUNTIME_NAME)); + private static final Set<String> SUPPORTED_SUBMISSION_RUNTIMES = new HashSet<>(Arrays.asList( + org.apache.reef.runtime.yarn.driver.RuntimeIdentifier.RUNTIME_NAME)); + + private final HashMap<Class, Object> namedParameters = new HashMap<>(); + + private Set<String> runtimeNames = new HashSet<>(); + private Optional<String> defaultRuntime = Optional.empty(); + private String submissionRunitme; + + private void addNamedParameter(final Class namedParameter, + final Object namedParameterValue) { + Validate.notNull(namedParameterValue); + + this.namedParameters.put(namedParameter, namedParameterValue); + } + + /** + * Adds runtime name to the builder. + * @param runtimeName The name to add + * @return The builder instance + */ + public MultiRuntimeConfigurationBuilder addRuntime(final String runtimeName) { + Validate.isTrue(SUPPORTED_RUNTIMES.contains(runtimeName), "unsupported runtime " + runtimeName); + + this.runtimeNames.add(runtimeName); + return this; + } + + /** + * Sets default runtime. Default runtime is used when no runtime was specified for evaluator + * @param runtimeName the default runtime name + * @return The builder instance + */ + public MultiRuntimeConfigurationBuilder setDefaultRuntime(final String runtimeName) { + Validate.isTrue(SUPPORTED_RUNTIMES.contains(runtimeName), "Unsupported runtime " + runtimeName); + Validate.isTrue(!this.defaultRuntime.isPresent(), "Default runtime was already added"); + + this.defaultRuntime = Optional.of(runtimeName); + return this; + } + + /** + * Sets the submission runtime. Submission runtime is used for launching the job driver. 
+ * @param runtimeName the submission runtime name + * @return The builder instance + */ + public MultiRuntimeConfigurationBuilder setSubmissionRuntime(final String runtimeName) { + Validate.isTrue(SUPPORTED_SUBMISSION_RUNTIMES.contains(runtimeName), "Unsupported submission runtime " + + runtimeName); + Validate.isTrue(this.submissionRunitme == null, "Submission runtime was already added"); + + this.submissionRunitme = runtimeName; + return this; + } + + /** + * Sets the max number of local evaluators for local runtime. This parameter is ignored when local runtime is not used + * @param maxLocalEvaluators The max evaluators number + * @return The builder instance + */ + public MultiRuntimeConfigurationBuilder setMaxEvaluatorsNumberForLocalRuntime(final int maxLocalEvaluators) { + Validate.isTrue(maxLocalEvaluators > 0, "Max evaluators number should be greater then 0"); + + addNamedParameter(MaxNumberOfEvaluators.class, maxLocalEvaluators); + return this; + } + + /** + * Builds the configuration. 
+ * @return The built configuration + */ + public Configuration build() { + Validate.notNull(this.submissionRunitme, "Default Runtime was not defined"); + + if(!this.defaultRuntime.isPresent() || this.runtimeNames.size() == 1){ + this.defaultRuntime = Optional.of(this.runtimeNames.toArray(new String[0])[0]); + } + + Validate.isTrue(this.defaultRuntime.isPresent(), + "Default runtime was not defined, and multiple runtimes were specified"); + + if(!this.runtimeNames.contains(this.defaultRuntime.get())){ + this.runtimeNames.add(this.defaultRuntime.get()); + } + + ConfigurationModuleBuilder conf = new MultiRuntimeHelperConfiguration(); + + for(Map.Entry<Class, Object> entry: this.namedParameters.entrySet()){ + conf = conf.bindNamedParameter(entry.getKey(), entry.getValue().toString()); + } + + conf = conf.bindNamedParameter(DefaultRuntimeName.class, this.defaultRuntime.get()); + + for(final String runtimeName : this.runtimeNames){ + conf = conf.bindSetEntry(RuntimeNames.class, runtimeName); + } + + conf = conf.bindImplementation( + MultiRuntimeDefinitionGenerator.class, MultiRuntimeDefinitionGeneratorImpl.class); + + if(!this.submissionRunitme.equalsIgnoreCase(RuntimeIdentifier.RUNTIME_NAME)){ + throw new RuntimeException("Unsupported submission runtime " + this.submissionRunitme); + } + + // Currently only local runtime is supported as a secondary runtime + return Configurations.merge(conf.build().build(), + ExtensibleYarnClientConfiguration.CONF + .set(ExtensibleYarnClientConfiguration.DRIVER_CONFIGURATION_PROVIDER, + MultiRuntimeDriverConfigurationProvider.class).build()); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeDefinitionBuilder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeDefinitionBuilder.java 
b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeDefinitionBuilder.java new file mode 100644 index 0000000..edb0667 --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeDefinitionBuilder.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.multi.client; + +import org.apache.commons.lang.Validate; +import org.apache.commons.lang.StringUtils; +import org.apache.reef.runtime.multi.utils.avro.MultiRuntimeDefinition; +import org.apache.reef.runtime.multi.utils.avro.RuntimeDefinition; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.formats.AvroConfigurationSerializer; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +/** + * Builder for multi runtime definition. 
+ */ +public final class MultiRuntimeDefinitionBuilder { + private Map<String, RuntimeDefinition> runtimes = new HashMap<>(); + private String defaultRuntime; + + private static RuntimeDefinition createRuntimeDefinition(final Configuration configModule, + final String runtimeName) { + final Configuration localDriverConfiguration = configModule; + final AvroConfigurationSerializer serializer = new AvroConfigurationSerializer(); + final String serializedConfig = serializer.toString(localDriverConfiguration); + return new RuntimeDefinition(runtimeName, serializedConfig); + } + + /** + * Adds runtime configuration module to the builder. + * @param config The configuration module + * @param runtimeName The name of the runtime + * @return The builder instance + */ + public MultiRuntimeDefinitionBuilder addRuntime(final Configuration config, final String runtimeName){ + Validate.notNull(config, "runtime configuration module should not be null"); + Validate.isTrue(StringUtils.isNotBlank(runtimeName), + "runtimeName should be non empty and non blank string"); + final RuntimeDefinition rd = createRuntimeDefinition(config, runtimeName); + this.runtimes.put(runtimeName, rd); + return this; + } + + /** + * Sets default runtime name. + * @param runtimeName The name of the default runtime + * @return The builder instance + */ + public MultiRuntimeDefinitionBuilder setDefaultRuntimeName(final String runtimeName){ + Validate.isTrue(StringUtils.isNotBlank(runtimeName), + "runtimeName should be non empty and non blank string"); + this.defaultRuntime = runtimeName; + return this; + } + + /** + * Builds multi runtime definition. 
+ * @return The populated definition object + */ + public MultiRuntimeDefinition build(){ + Validate.isTrue(this.runtimes.size() == 1 || !StringUtils.isEmpty(this.defaultRuntime), "Default runtime " + + "should be set if more than a single runtime provided"); + + if(StringUtils.isEmpty(this.defaultRuntime)){ + // we have single runtime configured, take its name as a default + this.defaultRuntime = this.runtimes.keySet().iterator().next(); + } + + Validate.isTrue(this.runtimes.containsKey(this.defaultRuntime), "Default runtime should be configured"); + return new MultiRuntimeDefinition(defaultRuntime, new ArrayList<>(this.runtimes.values())); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeDefinitionGenerator.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeDefinitionGenerator.java b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeDefinitionGenerator.java new file mode 100644 index 0000000..faf4901 --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeDefinitionGenerator.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.reef.runtime.multi.client; + +import org.apache.reef.runtime.multi.utils.avro.MultiRuntimeDefinition; + +import java.net.URI; + +/** + * Defines a contract for a multi runtime definition generator. + */ +public interface MultiRuntimeDefinitionGenerator { + /** + * Generates needed driver configuration modules. + * + * @param jobFolder the job folder + * @param clientRemoteId the client remote id + * @param jobId the job id + * @return Instance of <code>MultiRuntimeDefinition</code> + */ + MultiRuntimeDefinition getMultiRuntimeDefinition(final URI jobFolder, + final String clientRemoteId, + final String jobId); + +} http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeDefinitionGeneratorImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeDefinitionGeneratorImpl.java b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeDefinitionGeneratorImpl.java new file mode 100644 index 0000000..b44c72d --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeDefinitionGeneratorImpl.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.multi.client; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.Validate; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.annotations.audience.RuntimeAuthor; +import org.apache.reef.runtime.common.parameters.JVMHeapSlack; +import org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators; +import org.apache.reef.runtime.local.client.parameters.RackNames; +import org.apache.reef.runtime.local.driver.LocalDriverConfiguration; +import org.apache.reef.runtime.multi.client.parameters.DefaultRuntimeName; +import org.apache.reef.runtime.multi.client.parameters.RuntimeNames; +import org.apache.reef.runtime.multi.utils.avro.MultiRuntimeDefinition; +import org.apache.reef.runtime.yarn.driver.RuntimeIdentifier; +import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.formats.ConfigurationModule; + +import javax.inject.Inject; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * MultiRuntime configuration provider. 
+ */ +@Private +@RuntimeAuthor +final class MultiRuntimeDefinitionGeneratorImpl implements MultiRuntimeDefinitionGenerator { + private final double jvmSlack; + private final int maxEvaluators; + private final Set<String> rackNames; + private final Map<String, ConfigurationModuleBuilder> configModulesCreators = new HashMap<>(); + private final String defaultRuntimeName; + private final Set<String> runtimeNames; + + @Inject + private MultiRuntimeDefinitionGeneratorImpl( + @Parameter(JVMHeapSlack.class) final double jvmSlack, + @Parameter(MaxNumberOfEvaluators.class) final int maxEvaluators, + @Parameter(RackNames.class) final Set<String> rackNames, + @Parameter(RuntimeNames.class) final Set<String> runtimeNames, + @Parameter(DefaultRuntimeName.class) final String defaultRuntimeName) { + this.jvmSlack = jvmSlack; + this.maxEvaluators = maxEvaluators; + this.rackNames = rackNames; + + Validate.notNull(runtimeNames, "Runtimes should contain at least one element"); + Validate.notEmpty(runtimeNames, "Runtimes should contain at least one element"); + Validate.isTrue( + !StringUtils.isEmpty(defaultRuntimeName) && !StringUtils.isBlank(defaultRuntimeName), + "Default runtime name should not be empty"); + Validate.isTrue(runtimeNames.contains(defaultRuntimeName), String.format("No runtime found for default runtime " + + "name %s. 
Defined runtimes %s", defaultRuntimeName, StringUtils.join(runtimeNames, ","))); + + this.runtimeNames = runtimeNames; + this.defaultRuntimeName = defaultRuntimeName; + + this.configModulesCreators.put(RuntimeIdentifier.RUNTIME_NAME, new ConfigurationModuleBuilder() { + @Override + public Configuration getConfiguration(final URI jobFolder, + final String clientRemoteId, + final String jobId) { + return getYarnConfiguration(jobFolder, clientRemoteId, jobId); + } + } + ); + + this.configModulesCreators.put( + org.apache.reef.runtime.local.driver.RuntimeIdentifier.RUNTIME_NAME, + new ConfigurationModuleBuilder() { + @Override + public Configuration getConfiguration(final URI jobFolder, + final String clientRemoteId, + final String jobId) { + return getLocalConfiguration(jobFolder, clientRemoteId, jobId); + } + } + ); + + } + + private Configuration getYarnConfiguration(final URI jobFolder, + final String clientRemoteId, + final String jobId) { + return YarnDriverConfiguration.CONF + .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, jobFolder.toString()) + .set(YarnDriverConfiguration.JOB_IDENTIFIER, jobId) + .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, clientRemoteId) + .set(YarnDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack).build(); + + } + + private Configuration getLocalConfiguration(final URI jobFolder, + final String clientRemoteId, + final String jobId) { + + ConfigurationModule localModule = LocalDriverConfiguration.CONF + .set(LocalDriverConfiguration.MAX_NUMBER_OF_EVALUATORS, this.maxEvaluators) + // ROOT FOLDER will point to the current runtime directory + .set(LocalDriverConfiguration.ROOT_FOLDER, ".") + .set(LocalDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack) + .set(LocalDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, clientRemoteId) + .set(LocalDriverConfiguration.JOB_IDENTIFIER, jobId); + for (final String rackName : rackNames) { + localModule = localModule.set(LocalDriverConfiguration.RACK_NAMES, rackName); + } + + return 
localModule.build(); + } + + + public MultiRuntimeDefinition getMultiRuntimeDefinition(final URI jobFolder, + final String clientRemoteId, + final String jobId) { + + + MultiRuntimeDefinitionBuilder builder = new MultiRuntimeDefinitionBuilder(); + for (final String runtime : this.runtimeNames) { + builder.addRuntime( + this.configModulesCreators.get(runtime).getConfiguration(jobFolder, clientRemoteId, jobId), + runtime); + } + + return builder.setDefaultRuntimeName(this.defaultRuntimeName).build(); + } + + private interface ConfigurationModuleBuilder { + Configuration getConfiguration(final URI jobFolder, + final String clientRemoteId, + final String jobId); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeDriverConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeDriverConfigurationProvider.java b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeDriverConfigurationProvider.java new file mode 100644 index 0000000..fba4b04 --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeDriverConfigurationProvider.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.multi.client; + +import org.apache.reef.runtime.common.client.DriverConfigurationProvider; +import org.apache.reef.runtime.multi.driver.MultiRuntimeDriverConfiguration; +import org.apache.reef.runtime.multi.utils.MultiRuntimeDefinitionSerializer; +import org.apache.reef.runtime.multi.utils.avro.MultiRuntimeDefinition; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Configurations; + +import javax.inject.Inject; +import java.net.URI; + +/** + * Provides the driver configuration for the multi runtime. + */ +final class MultiRuntimeDriverConfigurationProvider implements DriverConfigurationProvider { + private final MultiRuntimeDefinitionSerializer runtimeDefinitionSerializer = new MultiRuntimeDefinitionSerializer(); + private MultiRuntimeDefinitionGenerator definitionGenerator; + + @Inject + MultiRuntimeDriverConfigurationProvider(final MultiRuntimeDefinitionGenerator definitionGenerator) { + this.definitionGenerator = definitionGenerator; + } + + /** + * Assembles the driver configuration. + * + * @param jobFolder The job folder; it is forwarded to the multi runtime definition generator. + * @param clientRemoteId the remote identifier of the client. It is used by the Driver to establish a + * connection back to the client. + * @param jobId The identifier of the job. + * @param applicationConfiguration The configuration of the application, e.g. a filled out DriverConfiguration + * @return The Driver configuration to be used to instantiate the Driver. 
+ */ + @Override + public Configuration getDriverConfiguration(final URI jobFolder, + final String clientRemoteId, + final String jobId, + final Configuration applicationConfiguration) { + MultiRuntimeDefinition runtimeDefinitions = this.definitionGenerator.getMultiRuntimeDefinition( + jobFolder, + clientRemoteId, + jobId); + + return Configurations.merge(applicationConfiguration, + MultiRuntimeDriverConfiguration.CONF + .set(MultiRuntimeDriverConfiguration.JOB_IDENTIFIER, jobId) + .set(MultiRuntimeDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, clientRemoteId) + .set(MultiRuntimeDriverConfiguration.SERIALIZED_RUNTIME_DEFINITION, + this.runtimeDefinitionSerializer.toString(runtimeDefinitions)).build()); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeHelperConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeHelperConfiguration.java b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeHelperConfiguration.java new file mode 100644 index 0000000..1a5ff69 --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/MultiRuntimeHelperConfiguration.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.multi.client; + +import org.apache.reef.tang.formats.ConfigurationModuleBuilder; + +/** + * A ConfigurationModule to bind arbitrary named parameters for runtimes. + */ +final class MultiRuntimeHelperConfiguration extends ConfigurationModuleBuilder { +} + http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/package-info.java b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/package-info.java new file mode 100644 index 0000000..d9a721a --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * client side for the multi runtime implementation. + */ +package org.apache.reef.runtime.multi.client; http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/parameters/DefaultRuntimeName.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/parameters/DefaultRuntimeName.java b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/parameters/DefaultRuntimeName.java new file mode 100644 index 0000000..2f9f304 --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/parameters/DefaultRuntimeName.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.multi.client.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * The name of the default runtime. + */ +@NamedParameter(doc = "The default runtime name", short_name = "default_runtime") +public final class DefaultRuntimeName implements Name<String> { +} http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/parameters/RuntimeNames.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/parameters/RuntimeNames.java b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/parameters/RuntimeNames.java new file mode 100644 index 0000000..48df886 --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/parameters/RuntimeNames.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.runtime.multi.client.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +import java.util.Set; + +/** + * The set of runtime names. + */ +@NamedParameter(doc = "The runtime names", short_name = "runtime_names") +public final class RuntimeNames implements Name<Set<String>> { +} http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/parameters/SerializedRuntimeDefinition.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/parameters/SerializedRuntimeDefinition.java b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/parameters/SerializedRuntimeDefinition.java new file mode 100644 index 0000000..c9bc0b8 --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/parameters/SerializedRuntimeDefinition.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.runtime.multi.client.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * Serialized MultiRuntimeDefinition. + */ +@NamedParameter(doc = "The multi runtime definition", short_name = "multi_runtime_definition") +public final class SerializedRuntimeDefinition implements Name<String> { +} http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/parameters/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/parameters/package-info.java b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/parameters/package-info.java new file mode 100644 index 0000000..1f7f6ef --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/client/parameters/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Parameters for the multi runtime. 
+ */ +package org.apache.reef.runtime.multi.client.parameters; http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeDriverConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeDriverConfiguration.java b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeDriverConfiguration.java new file mode 100644 index 0000000..882a7f5 --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeDriverConfiguration.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.runtime.multi.driver; + +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.annotations.audience.RuntimeAuthor; +import org.apache.reef.runtime.common.driver.api.*; +import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier; +import org.apache.reef.runtime.common.driver.parameters.JobIdentifier; +import org.apache.reef.runtime.common.files.RuntimeClasspathProvider; +import org.apache.reef.runtime.common.launch.parameters.LaunchID; +import org.apache.reef.runtime.multi.client.parameters.SerializedRuntimeDefinition; +import org.apache.reef.runtime.yarn.YarnClasspathProvider; +import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor; +import org.apache.reef.tang.formats.ConfigurationModule; +import org.apache.reef.tang.formats.ConfigurationModuleBuilder; +import org.apache.reef.tang.formats.OptionalParameter; +import org.apache.reef.tang.formats.RequiredParameter; + +/** + * ConfigurationModule for the multi-runtime Driver. + */ +@Private +@RuntimeAuthor +public class MultiRuntimeDriverConfiguration extends ConfigurationModuleBuilder { + + /** + * Serialized multi runtime definition. + */ + public static final RequiredParameter<String> SERIALIZED_RUNTIME_DEFINITION = new RequiredParameter<>(); + + /** + * The identifier of the Job submitted. + */ + public static final RequiredParameter<String> JOB_IDENTIFIER = new RequiredParameter<>(); + + /** + * The remote identifier of the client (optional). + */ + public static final OptionalParameter<String> CLIENT_REMOTE_IDENTIFIER = new OptionalParameter<>(); + + /** + * The multi-runtime driver configuration module. 
+ */ + public static final ConfigurationModule CONF = new MultiRuntimeDriverConfiguration() + .bindImplementation(ResourceLaunchHandler.class, MultiRuntimeResourceLaunchHandler.class) + .bindImplementation(ResourceRequestHandler.class, MultiRuntimeResourceRequestHandler.class) + .bindImplementation(ResourceReleaseHandler.class, MultiRuntimeResourceReleaseHandler.class) + .bindImplementation(ResourceManagerStartHandler.class, MultiRuntimeResourceManagerStartHandler.class) + .bindImplementation(ResourceManagerStopHandler.class, MultiRuntimeResourceManagerStopHandler.class) + .bindNamedParameter(SerializedRuntimeDefinition.class, SERIALIZED_RUNTIME_DEFINITION) + .bindNamedParameter(LaunchID.class, JOB_IDENTIFIER) + .bindNamedParameter(JobIdentifier.class, JOB_IDENTIFIER) + .bindNamedParameter(ClientRemoteIdentifier.class, CLIENT_REMOTE_IDENTIFIER) + .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class) + .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class) + .build(); +} http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeResourceLaunchHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeResourceLaunchHandler.java b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeResourceLaunchHandler.java new file mode 100644 index 0000000..aca5c02 --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeResourceLaunchHandler.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.multi.driver; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent; +import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler; + +import javax.inject.Inject; + +/** + * Takes resource launch events and delegates them to the runtimes host. 
+ */ +@Private +@DriverSide +final class MultiRuntimeResourceLaunchHandler implements ResourceLaunchHandler { + + private final RuntimesHost runtimesHost; + + @Inject + private MultiRuntimeResourceLaunchHandler(final RuntimesHost runtimesHost) { + this.runtimesHost = runtimesHost; + } + + @Override + public void onNext(final ResourceLaunchEvent value) { + this.runtimesHost.onResourceLaunch(value); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeResourceManagerStartHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeResourceManagerStartHandler.java b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeResourceManagerStartHandler.java new file mode 100644 index 0000000..aea6b3b --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeResourceManagerStartHandler.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.runtime.multi.driver; + +import org.apache.reef.runtime.common.driver.api.ResourceManagerStartHandler; +import org.apache.reef.wake.time.runtime.event.RuntimeStart; + +import javax.inject.Inject; + +/** + * This event handler delegates runtime start events to the runtimes host. + */ +final class MultiRuntimeResourceManagerStartHandler implements ResourceManagerStartHandler { + + private final RuntimesHost runtimesHost; + + @Inject + private MultiRuntimeResourceManagerStartHandler(final RuntimesHost runtimesHost) { + this.runtimesHost = runtimesHost; + } + + @Override + public void onNext(final RuntimeStart value) { + this.runtimesHost.onRuntimeStart(value); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeResourceManagerStopHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeResourceManagerStopHandler.java b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeResourceManagerStopHandler.java new file mode 100644 index 0000000..1343410 --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeResourceManagerStopHandler.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.multi.driver; + +import org.apache.reef.runtime.common.driver.api.ResourceManagerStopHandler; +import org.apache.reef.wake.time.runtime.event.RuntimeStop; + +import javax.inject.Inject; + +/** + * This event handler delegates runtime stop events to the runtimes host. + */ +final class MultiRuntimeResourceManagerStopHandler implements ResourceManagerStopHandler { + + private final RuntimesHost runtimesHost; + + @Inject + private MultiRuntimeResourceManagerStopHandler(final RuntimesHost runtimesHost) { + this.runtimesHost = runtimesHost; + } + + @Override + public void onNext(final RuntimeStop value) { + this.runtimesHost.onRuntimeStop(value); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeResourceReleaseHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeResourceReleaseHandler.java b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeResourceReleaseHandler.java new file mode 100644 index 0000000..5617ffe --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeResourceReleaseHandler.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.multi.driver; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent; +import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler; +import javax.inject.Inject; + +/** + * This event handler delegates resource release events to the runtimes host. 
+ */ +@Private +@DriverSide +final class MultiRuntimeResourceReleaseHandler implements ResourceReleaseHandler { + + private final RuntimesHost runtimesHost; + + @Inject + private MultiRuntimeResourceReleaseHandler(final RuntimesHost runtimesHost) { + this.runtimesHost = runtimesHost; + } + + @Override + public void onNext(final ResourceReleaseEvent t) { + this.runtimesHost.onResourceRelease(t); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeResourceRequestHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeResourceRequestHandler.java b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeResourceRequestHandler.java new file mode 100644 index 0000000..caa48e5 --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/MultiRuntimeResourceRequestHandler.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.runtime.multi.driver; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent; +import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler; + +import javax.inject.Inject; + +/** + * This event handler delegates resource request events to the runtimes host. + */ +@Private +@DriverSide +final class MultiRuntimeResourceRequestHandler implements ResourceRequestHandler { + + private final RuntimesHost runtimesHost; + + @Inject + private MultiRuntimeResourceRequestHandler(final RuntimesHost runtimesHost) { + this.runtimesHost = runtimesHost; + } + + @Override + public void onNext(final ResourceRequestEvent t) { + this.runtimesHost.onResourceRequest(t); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/Runtime.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/Runtime.java b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/Runtime.java new file mode 100644 index 0000000..683f8bc --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/Runtime.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.multi.driver; + +import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent; +import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent; +import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent; +import org.apache.reef.wake.time.runtime.event.RuntimeStart; +import org.apache.reef.wake.time.runtime.event.RuntimeStop; + +/** + * An interface for a runtime. + */ +interface Runtime { + String getRuntimeName(); + void onResourceLaunch(final ResourceLaunchEvent value); + void onRuntimeStart(final RuntimeStart value); + void onRuntimeStop(final RuntimeStop value); + void onResourceRelease(final ResourceReleaseEvent value); + void onResourceRequest(final ResourceRequestEvent value); +} http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/RuntimeImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/RuntimeImpl.java b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/RuntimeImpl.java new file mode 100644 index 0000000..28097bb --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/RuntimeImpl.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.multi.driver; + +import org.apache.reef.runtime.common.driver.api.*; +import org.apache.reef.runtime.multi.driver.parameters.RuntimeName; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.wake.time.runtime.event.RuntimeStart; +import org.apache.reef.wake.time.runtime.event.RuntimeStop; + +import javax.inject.Inject; + +/** + * Implementation of a runtime. 
+ */ +final class RuntimeImpl implements Runtime { + private final String name; + private final ResourceLaunchHandler resourceLaunchHandler; + private final ResourceManagerStartHandler resourceManagerStartHandler; + private final ResourceManagerStopHandler resourceManagerStopHandler; + private final ResourceReleaseHandler resourceReleaseHandler; + private final ResourceRequestHandler resourceRequestHandler; + + @Inject + private RuntimeImpl( + @Parameter(RuntimeName.class) final String name, + final ResourceLaunchHandler resourceLaunchHandler, + final ResourceManagerStartHandler resourceManagerStartHandler, + final ResourceManagerStopHandler resourceManagerStopHandler, + final ResourceReleaseHandler resourceReleaseHandler, + final ResourceRequestHandler resourceRequestHandler) { + this.name = name; + this.resourceLaunchHandler = resourceLaunchHandler; + this.resourceManagerStartHandler = resourceManagerStartHandler; + this.resourceManagerStopHandler = resourceManagerStopHandler; + this.resourceReleaseHandler = resourceReleaseHandler; + this.resourceRequestHandler = resourceRequestHandler; + } + + @Override + public String getRuntimeName() { + return this.name; + } + + @Override + public void onResourceLaunch(final ResourceLaunchEvent value) { + this.resourceLaunchHandler.onNext(value); + } + + @Override + public void onRuntimeStart(final RuntimeStart value) { + this.resourceManagerStartHandler.onNext(value); + } + + @Override + public void onRuntimeStop(final RuntimeStop value) { + this.resourceManagerStopHandler.onNext(value); + } + + @Override + public void onResourceRelease(final ResourceReleaseEvent value) { + this.resourceReleaseHandler.onNext(value); + } + + @Override + public void onResourceRequest(final ResourceRequestEvent value) { + this.resourceRequestHandler.onNext(value); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/RuntimesHost.java 
---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/RuntimesHost.java b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/RuntimesHost.java new file mode 100644 index 0000000..00d3a62 --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/RuntimesHost.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.runtime.multi.driver; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.Validate; +import org.apache.reef.runtime.common.driver.api.*; +import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent; +import org.apache.reef.runtime.multi.client.parameters.SerializedRuntimeDefinition; +import org.apache.reef.runtime.multi.driver.parameters.RuntimeName; +import org.apache.reef.runtime.multi.utils.MultiRuntimeDefinitionSerializer; +import org.apache.reef.runtime.multi.utils.avro.MultiRuntimeDefinition; +import org.apache.reef.runtime.multi.utils.avro.RuntimeDefinition; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.JavaConfigurationBuilder; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.tang.formats.AvroConfigurationSerializer; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.runtime.event.RuntimeStart; +import org.apache.reef.wake.time.runtime.event.RuntimeStop; + +import javax.inject.Inject; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Hosts the actual runtime implementations and delegates invocations to them. 
+ */ +final class RuntimesHost { + private final MultiRuntimeDefinition runtimeDefinition; + private final Injector originalInjector; + private final String defaultRuntimeName; + private final MultiRuntimeDefinitionSerializer runtimeDefinitionSerializer = new MultiRuntimeDefinitionSerializer(); + private Map<String, Runtime> runtimes; + + @Inject + private RuntimesHost(final Injector injector, + @Parameter(SerializedRuntimeDefinition.class) final String serializedRuntimeDefinition) { + this.originalInjector = injector; + try { + this.runtimeDefinition = this.runtimeDefinitionSerializer.fromString(serializedRuntimeDefinition); + } catch (IOException e) { + throw new RuntimeException("Unable to read runtime configuration.", e); + } + + this.defaultRuntimeName = runtimeDefinition.getDefaultRuntimeName().toString(); + } + + /** + * Initializes the configured runtimes. + */ + private synchronized void initialize() { + if (this.runtimes != null) { + return; + } + + this.runtimes = new HashMap<>(); + + for (final RuntimeDefinition rd : runtimeDefinition.getRuntimes()) { + try { + + // We need to create a different injector for each runtime as they define conflicting bindings. Also we cannot + // fork the original injector because of the same reason. + // We create new injectors and copy from the original injector what we need. + // rootInjector is an empty injector that we copy bindings from the original injector into. Then we fork + // it to instantiate the actual runtime. 
+ Injector rootInjector = Tang.Factory.getTang().newInjector(); + initializeInjector(rootInjector); + final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder(); + cb.bindNamedParameter(RuntimeName.class, rd.getRuntimeName().toString()); + cb.bindImplementation(Runtime.class, RuntimeImpl.class); + + AvroConfigurationSerializer serializer = new AvroConfigurationSerializer(); + Configuration config = serializer.fromString(rd.getSerializedConfiguration().toString()); + final Injector runtimeInjector = rootInjector.forkInjector(config, cb.build()); + this.runtimes.put(rd.getRuntimeName().toString(), runtimeInjector.getInstance(Runtime.class)); + } catch (InjectionException e) { + throw new RuntimeException("Unable to initialize runtimes.", e); + } catch (IOException e) { + throw new RuntimeException("Unable to initialize runtimes.", e); + } + } + } + + /** + * Initializes injector by copying needed handlers. + * @param runtimeInjector The injector to initialize + * @throws InjectionException + */ + private void initializeInjector(final Injector runtimeInjector) throws InjectionException { + final EventHandler<ResourceStatusEvent> statusEventHandler = + this.originalInjector.getNamedInstance(RuntimeParameters.ResourceStatusHandler.class); + runtimeInjector.bindVolatileParameter(RuntimeParameters.ResourceStatusHandler.class, statusEventHandler); + final EventHandler<NodeDescriptorEvent> nodeDescriptorEventHandler = + this.originalInjector.getNamedInstance(RuntimeParameters.NodeDescriptorHandler.class); + runtimeInjector.bindVolatileParameter(RuntimeParameters.NodeDescriptorHandler.class, nodeDescriptorEventHandler); + final EventHandler<ResourceAllocationEvent> resourceAllocationEventHandler = + this.originalInjector.getNamedInstance(RuntimeParameters.ResourceAllocationHandler.class); + runtimeInjector.bindVolatileParameter( + RuntimeParameters.ResourceAllocationHandler.class, + resourceAllocationEventHandler); + final 
EventHandler<RuntimeStatusEvent> runtimeStatusEventHandler = + this.originalInjector.getNamedInstance(RuntimeParameters.RuntimeStatusHandler.class); + runtimeInjector.bindVolatileParameter( + RuntimeParameters.RuntimeStatusHandler.class, + runtimeStatusEventHandler); + } + + /** + * Retrieves the requested runtime; if the requested name is blank, the default runtime is used. + * @param requestedRuntimeName the requested runtime name + * @return the runtime registered under the resolved name + */ + private Runtime getRuntime(final String requestedRuntimeName) { + String runtimeName = requestedRuntimeName; + if (StringUtils.isBlank(runtimeName)) { + runtimeName = this.defaultRuntimeName; + } + + Runtime runtime = this.runtimes.get(runtimeName); + + Validate.notNull(runtime, "Couldn't find runtime for name " + runtimeName); + return runtime; + } + + void onResourceLaunch(final ResourceLaunchEvent value) { + getRuntime(value.getRuntimeName()).onResourceLaunch(value); + } + + void onRuntimeStart(final RuntimeStart value) { + initialize(); + for (Runtime runtime : this.runtimes.values()) { + runtime.onRuntimeStart(value); + } + } + + void onRuntimeStop(final RuntimeStop value) { + for (Runtime runtime : this.runtimes.values()) { + runtime.onRuntimeStop(value); + } + } + + void onResourceRelease(final ResourceReleaseEvent value) { + getRuntime(value.getRuntimeName()).onResourceRelease(value); + } + + void onResourceRequest(final ResourceRequestEvent value) { + getRuntime(value.getRuntimeName()).onResourceRequest(value); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/package-info.java b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/package-info.java new file mode 100644 index 0000000..ceb353c --- /dev/null +++ 
b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * A multi runtime implementation of REEF that uses several runtime implementations for execution. + */ +package org.apache.reef.runtime.multi.driver; http://git-wip-us.apache.org/repos/asf/reef/blob/07874cbb/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/parameters/RuntimeName.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/parameters/RuntimeName.java b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/parameters/RuntimeName.java new file mode 100644 index 0000000..2c11900 --- /dev/null +++ b/lang/java/reef-runtime-multi/src/main/java/org/apache/reef/runtime/multi/driver/parameters/RuntimeName.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.multi.driver.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * The name of a runtime. + */ +@NamedParameter(doc = "The name of the runtime", short_name = "runtime_names") +public final class RuntimeName implements Name<String> { +}
