Repository: reef Updated Branches: refs/heads/master 913c66cc7 -> cafda7df5
[REEF-1354] Yarn bootstrap for c# multi-runtime client This addresses the issue by: 1) Adding bootstrap mechanism for multiruntime JIRA: [REEF-1354](https://issues.apache.org/jira/browse/REEF-1354) Pull Request: Closes #966 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/cafda7df Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/cafda7df Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/cafda7df Branch: refs/heads/master Commit: cafda7df5488a02f250379e2da7c445ecc324f82 Parents: 913c66c Author: Boris Shulman <[email protected]> Authored: Sat Apr 23 09:05:04 2016 -0700 Committer: Andrew Chung <[email protected]> Committed: Mon May 9 14:22:37 2016 -0700 ---------------------------------------------------------------------- lang/java/reef-bridge-client/pom.xml | 5 + .../src/main/avro/AppSubmissionParameters.avsc | 16 +- ...untimeAppSubmissionParametersSerializer.java | 68 +++++ ...roYarnJobSubmissionParametersSerializer.java | 69 +++++ ...ntimeYarnBootstrapDriverConfigGenerator.java | 287 +++++++++++++++++++ .../MultiRuntimeYarnBootstrapREEFLauncher.java | 83 ++++++ ...SubmissionParametersSerializationFromCS.java | 252 +++++++++++++++- 7 files changed, 775 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/cafda7df/lang/java/reef-bridge-client/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/pom.xml b/lang/java/reef-bridge-client/pom.xml index 1363942..b2bb759 100644 --- a/lang/java/reef-bridge-client/pom.xml +++ b/lang/java/reef-bridge-client/pom.xml @@ -80,6 +80,11 @@ under the License. 
<groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-runtime-multi</artifactId> + <version>${project.version}</version> + </dependency> <!-- Adding Hadoop to make sure we have it in the shaded jar --> <dependency> http://git-wip-us.apache.org/repos/asf/reef/blob/cafda7df/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc b/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc index d4926e9..7cc52d6 100644 --- a/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc +++ b/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc @@ -47,5 +47,19 @@ { "name": "sharedAppSubmissionParameters", "type": "AvroAppSubmissionParameters" }, { "name": "driverRecoveryTimeout", "type": "int" } ] - } + }, + { + "namespace": "org.apache.reef.reef.bridge.client.avro", + "type": "record", + "name": "AvroMultiRuntimeAppSubmissionParameters", + "doc": "General cross-language application submission parameters to the YARN runtime", + "fields": [ + { "name": "sharedAppSubmissionParameters", "type": "AvroAppSubmissionParameters" }, + { "name": "localRuntimeAppParameters", "type": ["null", "AvroLocalAppSubmissionParameters"], "default": + null }, + { "name": "yarnRuntimeAppParameters", "type": ["null", "AvroYarnAppSubmissionParameters"], "default": null }, + { "name": "defaultRuntimeName", "type":"string", "doc":"The name of the default runtime" }, + { "name": "runtimes", "type": { "type" :"array", "items": "string"}, "doc":"defined runtimes" } + ] + } ] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/cafda7df/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AvroMultiRuntimeAppSubmissionParametersSerializer.java 
---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AvroMultiRuntimeAppSubmissionParametersSerializer.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AvroMultiRuntimeAppSubmissionParametersSerializer.java new file mode 100644 index 0000000..af497cc --- /dev/null +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AvroMultiRuntimeAppSubmissionParametersSerializer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.bridge.client; + +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.JsonDecoder; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.reef.reef.bridge.client.avro.AvroMultiRuntimeAppSubmissionParameters; + +import javax.inject.Inject; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * Serializer class for the AvroMultiRuntimeAppSubmissionParameters.
+ */ +final class AvroMultiRuntimeAppSubmissionParametersSerializer { + @Inject + private AvroMultiRuntimeAppSubmissionParametersSerializer(){ + } + + /** + * Reads avro object from file. + * + * @param file The file to read from + * @return Avro object + * @throws IOException + */ + AvroMultiRuntimeAppSubmissionParameters fromFile(final File file) throws IOException { + try (final FileInputStream fileInputStream = new FileInputStream(file)) { + // This is mainly a test hook. + return fromInputStream(fileInputStream); + } + } + + /** + * Reads avro object from input stream. + * + * @param inputStream The input stream to read from + * @return Avro object + * @throws IOException + */ + AvroMultiRuntimeAppSubmissionParameters fromInputStream(final InputStream inputStream) throws IOException { + final JsonDecoder decoder = DecoderFactory.get().jsonDecoder( + AvroMultiRuntimeAppSubmissionParameters.getClassSchema(), inputStream); + final SpecificDatumReader<AvroMultiRuntimeAppSubmissionParameters> reader = new SpecificDatumReader<>( + AvroMultiRuntimeAppSubmissionParameters.class); + return reader.read(null, decoder); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/cafda7df/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AvroYarnJobSubmissionParametersSerializer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AvroYarnJobSubmissionParametersSerializer.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AvroYarnJobSubmissionParametersSerializer.java new file mode 100644 index 0000000..84f0a19 --- /dev/null +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AvroYarnJobSubmissionParametersSerializer.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.bridge.client; + +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.JsonDecoder; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters; + +import javax.inject.Inject; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * Serializer class for the AvroYarnJobSubmissionParameters. + */ +final class AvroYarnJobSubmissionParametersSerializer { + @Inject + private AvroYarnJobSubmissionParametersSerializer(){ + } + + /** + * Reads avro object from file. + * + * @param file The file to read from + * @return Avro object + * @throws IOException + */ + AvroYarnJobSubmissionParameters fromFile(final File file) throws IOException { + try (final FileInputStream fileInputStream = new FileInputStream(file)) { + // This is mainly a test hook. + return fromInputStream(fileInputStream); + } + } + + /** + * Reads avro object from input stream.
+ * + * @param inputStream The input stream to read from + * @return Avro object + * @throws IOException + */ + AvroYarnJobSubmissionParameters fromInputStream(final InputStream inputStream) throws IOException { + final JsonDecoder decoder = DecoderFactory.get().jsonDecoder( + AvroYarnJobSubmissionParameters.getClassSchema(), inputStream); + final SpecificDatumReader<AvroYarnJobSubmissionParameters> reader = new SpecificDatumReader<>( + AvroYarnJobSubmissionParameters.class); + return reader.read(null, decoder); + } + +} http://git-wip-us.apache.org/repos/asf/reef/blob/cafda7df/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/MultiRuntimeYarnBootstrapDriverConfigGenerator.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/MultiRuntimeYarnBootstrapDriverConfigGenerator.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/MultiRuntimeYarnBootstrapDriverConfigGenerator.java new file mode 100644 index 0000000..c2fc144 --- /dev/null +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/MultiRuntimeYarnBootstrapDriverConfigGenerator.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.bridge.client; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.client.DriverRestartConfiguration; +import org.apache.reef.client.parameters.DriverConfigurationProviders; +import org.apache.reef.io.TcpPortConfigurationProvider; +import org.apache.reef.javabridge.generic.JobDriver; +import org.apache.reef.reef.bridge.client.avro.*; +import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier; +import org.apache.reef.runtime.common.files.REEFFileNames; +import org.apache.reef.runtime.common.files.RuntimeClasspathProvider; +import org.apache.reef.runtime.local.driver.LocalDriverConfiguration; +import org.apache.reef.runtime.multi.client.*; +import org.apache.reef.runtime.multi.driver.MultiRuntimeDriverConfiguration; +import org.apache.reef.runtime.multi.utils.MultiRuntimeDefinitionSerializer; +import org.apache.reef.runtime.yarn.YarnClasspathProvider; +import org.apache.reef.runtime.yarn.driver.*; +import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix; +import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Configurations; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.formats.ConfigurationModule; +import org.apache.reef.tang.formats.ConfigurationSerializer; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount; + +import javax.inject.Inject; +import java.io.*; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * This is the Java Driver configuration generator for .NET Drivers that generates + * the Driver 
configuration at runtime for multiruntime. Called by {@link MultiRuntimeYarnBootstrapREEFLauncher}. + */ +@DriverSide +final class MultiRuntimeYarnBootstrapDriverConfigGenerator { + private static final Logger LOG = Logger.getLogger(MultiRuntimeYarnBootstrapDriverConfigGenerator.class.getName()); + private static final String DUMMY_YARN_RUNTIME = "DummyYarnRuntime"; + + private final MultiRuntimeDefinitionSerializer runtimeDefinitionSerializer = new MultiRuntimeDefinitionSerializer(); + + private final REEFFileNames reefFileNames; + private final ConfigurationSerializer configurationSerializer; + private final AvroMultiRuntimeAppSubmissionParametersSerializer avroMultiRuntimeAppSubmissionParametersSerializer; + private final AvroYarnJobSubmissionParametersSerializer avroYarnJobSubmissionParametersSerializer; + + @Inject + private MultiRuntimeYarnBootstrapDriverConfigGenerator(final REEFFileNames reefFileNames, + final ConfigurationSerializer configurationSerializer, + final AvroMultiRuntimeAppSubmissionParametersSerializer + avroMultiRuntimeAppSubmissionParameters, + final AvroYarnJobSubmissionParametersSerializer + avroYarnJobSubmissionParametersSerializer) { + this.configurationSerializer = configurationSerializer; + this.reefFileNames = reefFileNames; + this.avroYarnJobSubmissionParametersSerializer = avroYarnJobSubmissionParametersSerializer; + this.avroMultiRuntimeAppSubmissionParametersSerializer = avroMultiRuntimeAppSubmissionParameters; + } + + /** + * Adds yarn runtime definitions to the builder. 
+ * @param yarnJobSubmissionParams Yarn job submission parameters + * @param jobSubmissionParameters Generic job submission parameters + * @param builder The multi runtime builder + */ + private void addYarnRuntimeDefinition( + final AvroYarnJobSubmissionParameters yarnJobSubmissionParams, + final AvroJobSubmissionParameters jobSubmissionParameters, + final MultiRuntimeDefinitionBuilder builder) { + // create and serialize yarn configuration if defined + final Configuration yarnDriverConfiguration = + createYarnConfiguration(yarnJobSubmissionParams, jobSubmissionParameters); + + // add yarn runtime to the builder + builder.addRuntime(yarnDriverConfiguration, RuntimeIdentifier.RUNTIME_NAME); + } + + private Configuration createYarnConfiguration( + final AvroYarnJobSubmissionParameters yarnJobSubmissionParams, + final AvroJobSubmissionParameters jobSubmissionParameters) { + return YarnDriverConfiguration.CONF + .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, + yarnJobSubmissionParams.getDfsJobSubmissionFolder().toString()) + .set(YarnDriverConfiguration.JOB_IDENTIFIER, + jobSubmissionParameters.getJobId().toString()) + .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, + ClientRemoteIdentifier.NONE) + .set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0) + .set(YarnDriverConfiguration.RUNTIME_NAMES, RuntimeIdentifier.RUNTIME_NAME) + .build(); + } + + /** + * Adds yarn runtime definitions to the builder, with a dummy name. + * This is needed to initialize a yarn runtime that registers with the RM but does not allow submitting evaluators, + * as evaluator submissions submit to the Yarn runtime.
+ * @param yarnJobSubmissionParams Yarn job submission parameters + * @param jobSubmissionParameters Generic job submission parameters + * @param builder The multi runtime builder + */ + private void addDummyYarnRuntimeDefinition( + final AvroYarnJobSubmissionParameters yarnJobSubmissionParams, + final AvroJobSubmissionParameters jobSubmissionParameters, + final MultiRuntimeDefinitionBuilder builder) { + // create and serialize yarn configuration if defined + final Configuration yarnDriverConfiguration = + createYarnConfiguration(yarnJobSubmissionParams, jobSubmissionParameters); + // add yarn runtime to the builder + builder.addRuntime(yarnDriverConfiguration, DUMMY_YARN_RUNTIME); + } + /** + * Adds local runtime definitions to the builder. + * @param localAppSubmissionParams Local app submission parameters + * @param jobSubmissionParameters Generic job submission parameters + * @param builder The multi runtime builder + */ + private void addLocalRuntimeDefinition( + final AvroLocalAppSubmissionParameters localAppSubmissionParams, + final AvroJobSubmissionParameters jobSubmissionParameters, + final MultiRuntimeDefinitionBuilder builder) { + // create and serialize local configuration if defined + final Configuration localModule = LocalDriverConfiguration.CONF + .set(LocalDriverConfiguration.MAX_NUMBER_OF_EVALUATORS, + localAppSubmissionParams.getMaxNumberOfConcurrentEvaluators()) + // ROOT FOLDER will point to the current runtime directory + .set(LocalDriverConfiguration.ROOT_FOLDER, ".") + .set(LocalDriverConfiguration.JVM_HEAP_SLACK, 0.0) + .set(LocalDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE) + .set(LocalDriverConfiguration.JOB_IDENTIFIER, + jobSubmissionParameters.getJobId().toString()) + .set(LocalDriverConfiguration.RUNTIME_NAMES, + org.apache.reef.runtime.local.driver.RuntimeIdentifier.RUNTIME_NAME) + .build(); + + // add local runtime to the builder + builder.addRuntime(localModule, 
org.apache.reef.runtime.local.driver.RuntimeIdentifier.RUNTIME_NAME); + } + + private Configuration getMultiRuntimeDriverConfiguration( + final AvroYarnJobSubmissionParameters yarnJobSubmissionParams, + final AvroMultiRuntimeAppSubmissionParameters multiruntimeAppSubmissionParams) { + + if (multiruntimeAppSubmissionParams.getLocalRuntimeAppParameters() == null && + multiruntimeAppSubmissionParams.getYarnRuntimeAppParameters() == null){ + throw new IllegalArgumentException("At least on execution runtime has to be provided"); + } + + // read yarn job submission parameters + final AvroJobSubmissionParameters jobSubmissionParameters = + yarnJobSubmissionParams.getSharedJobSubmissionParameters(); + + // generate multi runtime definition + final MultiRuntimeDefinitionBuilder multiRuntimeDefinitionBuilder = new MultiRuntimeDefinitionBuilder(); + + if (multiruntimeAppSubmissionParams.getLocalRuntimeAppParameters() != null){ + addLocalRuntimeDefinition( + multiruntimeAppSubmissionParams.getLocalRuntimeAppParameters(), + jobSubmissionParameters, multiRuntimeDefinitionBuilder); + } + + if (multiruntimeAppSubmissionParams.getYarnRuntimeAppParameters() != null){ + addYarnRuntimeDefinition( + yarnJobSubmissionParams, + jobSubmissionParameters, + multiRuntimeDefinitionBuilder); + } else { + addDummyYarnRuntimeDefinition( + yarnJobSubmissionParams, + jobSubmissionParameters, + multiRuntimeDefinitionBuilder); + } + + multiRuntimeDefinitionBuilder.setDefaultRuntimeName( + multiruntimeAppSubmissionParams.getDefaultRuntimeName().toString()); + + // generate multi runtime configuration + ConfigurationModule multiRuntimeDriverConfiguration = MultiRuntimeDriverConfiguration.CONF + .set(MultiRuntimeDriverConfiguration.JOB_IDENTIFIER, jobSubmissionParameters.getJobId().toString()) + .set(MultiRuntimeDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE) + .set(MultiRuntimeDriverConfiguration.SERIALIZED_RUNTIME_DEFINITION, + 
this.runtimeDefinitionSerializer.toString(multiRuntimeDefinitionBuilder.build())); + + for (final CharSequence runtimeName : multiruntimeAppSubmissionParams.getRuntimes()){ + multiRuntimeDriverConfiguration = multiRuntimeDriverConfiguration.set( + MultiRuntimeDriverConfiguration.RUNTIME_NAMES, runtimeName.toString()); + } + + final AvroAppSubmissionParameters appSubmissionParams = + multiruntimeAppSubmissionParams.getSharedAppSubmissionParameters(); + + // generate yarn related driver configuration + final Configuration providerConfig = Tang.Factory.getTang().newConfigurationBuilder() + .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class) + .bindNamedParameter(TcpPortRangeBegin.class, Integer.toString(appSubmissionParams.getTcpBeginPort())) + .bindNamedParameter(TcpPortRangeCount.class, Integer.toString(appSubmissionParams.getTcpRangeCount())) + .bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(appSubmissionParams.getTcpTryCount())) + .bindNamedParameter(JobSubmissionDirectoryPrefix.class, + yarnJobSubmissionParams.getJobSubmissionDirectoryPrefix().toString()) + .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class) + .bindConstructor(YarnConfiguration.class, YarnConfigurationConstructor.class) + .build(); + + final Configuration driverConfiguration = Configurations.merge( + Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER, + multiRuntimeDriverConfiguration.build(), + providerConfig); + + // add restart configuration if needed + if (multiruntimeAppSubmissionParams.getYarnRuntimeAppParameters() != null && + multiruntimeAppSubmissionParams.getYarnRuntimeAppParameters().getDriverRecoveryTimeout() > 0) { + LOG.log(Level.FINE, "Driver restart is enabled."); + + final Configuration yarnDriverRestartConfiguration = + YarnDriverRestartConfiguration.CONF.build(); + + final Configuration driverRestartConfiguration = + DriverRestartConfiguration.CONF + 
.set(DriverRestartConfiguration.ON_DRIVER_RESTARTED, JobDriver.RestartHandler.class) + .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE, + JobDriver.DriverRestartActiveContextHandler.class) + .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING, + JobDriver.DriverRestartRunningTaskHandler.class) + .set(DriverRestartConfiguration.DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS, + multiruntimeAppSubmissionParams.getYarnRuntimeAppParameters().getDriverRecoveryTimeout()) + .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED, + JobDriver.DriverRestartCompletedHandler.class) + .set(DriverRestartConfiguration.ON_DRIVER_RESTART_EVALUATOR_FAILED, + JobDriver.DriverRestartFailedEvaluatorHandler.class) + .build(); + + return Configurations.merge(driverConfiguration, yarnDriverRestartConfiguration, driverRestartConfiguration); + } + + return driverConfiguration; + } + + + /** + * Writes the driver configuration files to the provided location. + * @param bootstrapJobArgsLocation The path for the job args file + * @param bootstrapAppArgsLocation The path for the app args file + * @throws IOException + * @return A path to the written driver configuration + */ + String writeDriverConfigurationFile(final String bootstrapJobArgsLocation, + final String bootstrapAppArgsLocation) throws IOException { + final File bootstrapJobArgsFile = new File(bootstrapJobArgsLocation).getCanonicalFile(); + final File bootstrapAppArgsFile = new File(bootstrapAppArgsLocation); + + final AvroYarnJobSubmissionParameters yarnBootstrapJobArgs = + this.avroYarnJobSubmissionParametersSerializer.fromFile(bootstrapJobArgsFile); + + final AvroMultiRuntimeAppSubmissionParameters multiruntimeBootstrapAppArgs = + this.avroMultiRuntimeAppSubmissionParametersSerializer.fromFile(bootstrapAppArgsFile); + + final String driverConfigPath = reefFileNames.getDriverConfigurationPath(); + + this.configurationSerializer.toFile( + getMultiRuntimeDriverConfiguration( + yarnBootstrapJobArgs, 
multiruntimeBootstrapAppArgs), + new File(driverConfigPath)); + + return driverConfigPath; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/cafda7df/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/MultiRuntimeYarnBootstrapREEFLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/MultiRuntimeYarnBootstrapREEFLauncher.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/MultiRuntimeYarnBootstrapREEFLauncher.java new file mode 100644 index 0000000..6e96439 --- /dev/null +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/MultiRuntimeYarnBootstrapREEFLauncher.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.reef.bridge.client; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.Interop; +import org.apache.reef.runtime.common.REEFLauncher; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; + +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * This is a bootstrap launcher for YARN for submission of multiruntime jobs from C#. It allows for Java Driver + * configuration generation directly on the Driver without need of Java dependency if REST + * submission is used. Note that the name of the class must contain "REEFLauncher" for the time + * being in order for the Interop code to discover the class. + */ +// TODO[JIRA REEF-1382]: This class does things both for client and driver need to split it +@Unstable +@Interop(CppFiles = "DriverLauncher.cpp") +public final class MultiRuntimeYarnBootstrapREEFLauncher { + private static final Logger LOG = Logger.getLogger(MultiRuntimeYarnBootstrapREEFLauncher.class.getName()); + + public static void main(final String[] args) throws IOException, InjectionException { + LOG.log(Level.INFO, "Entering BootstrapLauncher.main()."); + if (args.length != 2) { + final StringBuilder sb = new StringBuilder(); + sb.append("[ "); + for (String arg : args) { + sb.append(arg); + sb.append(" "); + } + + sb.append("]"); + final String message = "Bootstrap launcher should have two configuration file inputs, one specifying the" + + " application submission parameters to be deserialized and the other specifying the job" + + " submission parameters to be deserialized to create the YarnDriverConfiguration on the fly." 
+ + " Current args are " + sb.toString(); + + throw fatal(message, new IllegalArgumentException(message)); + } + + try { + final MultiRuntimeYarnBootstrapDriverConfigGenerator driverConfigurationGenerator = + Tang.Factory.getTang().newInjector().getInstance(MultiRuntimeYarnBootstrapDriverConfigGenerator.class); + REEFLauncher.main(new String[]{driverConfigurationGenerator.writeDriverConfigurationFile(args[0], args[1])}); + } catch (final Exception exception) { + if (!(exception instanceof RuntimeException)) { + throw fatal("Failed to initialize configurations.", exception); + } + + throw exception; + } + } + + private static RuntimeException fatal(final String msg, final Throwable t) { + LOG.log(Level.SEVERE, msg, t); + return new RuntimeException(msg, t); + } + + private MultiRuntimeYarnBootstrapREEFLauncher(){ + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/cafda7df/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java index afa49be..490ed2e 100644 --- a/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java +++ b/lang/java/reef-bridge-client/src/test/java/org/apache/reef/bridge/client/TestAvroJobSubmissionParametersSerializationFromCS.java @@ -18,10 +18,7 @@ */ package org.apache.reef.bridge.client; -import org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters; -import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters; -import org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters; -import 
org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters; +import org.apache.reef.reef.bridge.client.avro.*; import org.apache.reef.runtime.common.driver.parameters.JobIdentifier; import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory; import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectoryPrefix; @@ -36,6 +33,8 @@ import org.junit.Test; import java.io.*; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; /** * Tests for generating Driver configuration by bootstrapping the process to the @@ -80,6 +79,89 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS { "\"driverRecoveryTimeout\":" + NUMBER_REP + "}"; + private static final String AVRO_YARN_MULTIRUNTIME_APP_PARAMETERS_SERIALIZED_STRING = + "{" + + "\"sharedAppSubmissionParameters\":" + + "{" + + "\"tcpBeginPort\":" + NUMBER_REP + "," + + "\"tcpRangeCount\":" + NUMBER_REP + "," + + "\"tcpTryCount\":" + NUMBER_REP + + "}," + + "\"localRuntimeAppParameters\":" + + "{\"org.apache.reef.reef.bridge.client.avro.AvroLocalAppSubmissionParameters\":" + + "{\"sharedAppSubmissionParameters\":" + + "{" + + "\"tcpBeginPort\":" + NUMBER_REP + "," + + "\"tcpRangeCount\":" + NUMBER_REP + "," + + "\"tcpTryCount\":" + NUMBER_REP + + "}," + + "\"maxNumberOfConcurrentEvaluators\":" + NUMBER_REP + + "}" + + "}," + + "\"yarnRuntimeAppParameters\":" + + "{\"org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters\":" + + "{\"sharedAppSubmissionParameters\":" + + "{" + + "\"tcpBeginPort\":" + NUMBER_REP + "," + + "\"tcpRangeCount\":" + NUMBER_REP + "," + + "\"tcpTryCount\":" + NUMBER_REP + + "}," + + "\"driverRecoveryTimeout\":" + NUMBER_REP + + "}" + + "}," + + "\"defaultRuntimeName\":\"Local\"" + "," + + "\"runtimes\":[\"Local\", \"Yarn\" ]" + + "}"; + + private static final String AVRO_YARN_MULTIRUNTIME_LOCALONLY_APP_PARAMETERS_SERIALIZED_STRING = + "{" + + "\"sharedAppSubmissionParameters\":" + + 
"{" + + "\"tcpBeginPort\":" + NUMBER_REP + "," + + "\"tcpRangeCount\":" + NUMBER_REP + "," + + "\"tcpTryCount\":" + NUMBER_REP + + "}," + + "\"localRuntimeAppParameters\":" + + "{\"org.apache.reef.reef.bridge.client.avro.AvroLocalAppSubmissionParameters\":" + + "{\"sharedAppSubmissionParameters\":" + + "{" + + "\"tcpBeginPort\":" + NUMBER_REP + "," + + "\"tcpRangeCount\":" + NUMBER_REP + "," + + "\"tcpTryCount\":" + NUMBER_REP + + "}," + + "\"maxNumberOfConcurrentEvaluators\":" + NUMBER_REP + + "}" + + "}," + + "\"yarnRuntimeAppParameters\":null," + + "\"defaultRuntimeName\":\"Local\"" + "," + + "\"runtimes\":[\"Local\" ]" + + "}"; + + + private static final String AVRO_YARN_MULTIRUNTIME_YARNONLY_APP_PARAMETERS_SERIALIZED_STRING = + "{" + + "\"sharedAppSubmissionParameters\":" + + "{" + + "\"tcpBeginPort\":" + NUMBER_REP + "," + + "\"tcpRangeCount\":" + NUMBER_REP + "," + + "\"tcpTryCount\":" + NUMBER_REP + + "}," + + "\"localRuntimeAppParameters\":null," + + "\"yarnRuntimeAppParameters\":" + + "{\"org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters\":" + + "{\"sharedAppSubmissionParameters\":" + + "{" + + "\"tcpBeginPort\":" + NUMBER_REP + "," + + "\"tcpRangeCount\":" + NUMBER_REP + "," + + "\"tcpTryCount\":" + NUMBER_REP + + "}," + + "\"driverRecoveryTimeout\":" + NUMBER_REP + + "}" + + "}," + + "\"defaultRuntimeName\":\"Yarn\"" + "," + + "\"runtimes\":[\"Yarn\" ]" + + "}"; + /** * Tests deserialization of the Avro parameters for submission from the cluster from C#. * @throws IOException @@ -111,6 +193,39 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS { } /** + * Tests deserialization of the Avro parameters for multiruntime from C#. 
+ * @throws IOException + */ + @Test + public void testAvroMultiruntimeParametersDeserialization() throws IOException, InjectionException { + verifyYarnMultiRuntimeJobSubmissionParams( + createAvroYarnJobSubmissionParameters(), + createAvroMultiruntimeAppSubmissionParameters()); + } + + /** + * Tests deserialization of the Avro multiruntime parameters from C# when only the YARN runtime is defined. + * @throws IOException + */ + @Test + public void testAvroMultiruntimeYarnOnlyParametersDeserialization() throws IOException, InjectionException { + verifyYarnOnlyMultiRuntimeJobSubmissionParams( + createAvroYarnJobSubmissionParameters(), + createAvroMultiruntimeYarnOnlyAppSubmissionParameters()); + } + + /** + * Tests deserialization of the Avro multiruntime parameters from C# when only the local runtime is defined. + * @throws IOException + */ + @Test + public void testAvroMultiruntimeLocalOnlyParametersDeserialization() throws IOException, InjectionException { + verifyLocalOnlyMultiRuntimeJobSubmissionParams( + createAvroYarnJobSubmissionParameters(), + createAvroMultiruntimeLocalOnlyAppSubmissionParameters()); + } + + /** * Tests a round-trip serialization deserialization process of the Avro parameters from C#. 
* @throws IOException */ @@ -165,6 +280,38 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS { } } + /** Reads AVRO_YARN_MULTIRUNTIME_APP_PARAMETERS_SERIALIZED_STRING as UTF-8 bytes and deserializes it with an AvroMultiRuntimeAppSubmissionParametersSerializer obtained from a new Tang injector; try-with-resources closes the stream. */ private static AvroMultiRuntimeAppSubmissionParameters createAvroMultiruntimeAppSubmissionParameters() + throws IOException, InjectionException { + try (final InputStream stream = + new ByteArrayInputStream( + AVRO_YARN_MULTIRUNTIME_APP_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) { + return Tang.Factory.getTang().newInjector().getInstance(AvroMultiRuntimeAppSubmissionParametersSerializer.class) + .fromInputStream(stream); + } + } + + /** Reads AVRO_YARN_MULTIRUNTIME_YARNONLY_APP_PARAMETERS_SERIALIZED_STRING as UTF-8 bytes and deserializes it with an AvroMultiRuntimeAppSubmissionParametersSerializer obtained from a new Tang injector; try-with-resources closes the stream. */ private static AvroMultiRuntimeAppSubmissionParameters createAvroMultiruntimeYarnOnlyAppSubmissionParameters() + throws IOException, InjectionException { + try (final InputStream stream = + new ByteArrayInputStream( + AVRO_YARN_MULTIRUNTIME_YARNONLY_APP_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets + .UTF_8))) { + return Tang.Factory.getTang().newInjector().getInstance(AvroMultiRuntimeAppSubmissionParametersSerializer.class) + .fromInputStream(stream); + } + } + + /** Reads AVRO_YARN_MULTIRUNTIME_LOCALONLY_APP_PARAMETERS_SERIALIZED_STRING as UTF-8 bytes and deserializes it with an AvroMultiRuntimeAppSubmissionParametersSerializer obtained from a new Tang injector; try-with-resources closes the stream. */ private static AvroMultiRuntimeAppSubmissionParameters createAvroMultiruntimeLocalOnlyAppSubmissionParameters() + throws IOException, InjectionException { + try (final InputStream stream = + new ByteArrayInputStream( + AVRO_YARN_MULTIRUNTIME_LOCALONLY_APP_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets + .UTF_8))) { + return Tang.Factory.getTang().newInjector().getInstance(AvroMultiRuntimeAppSubmissionParametersSerializer.class) + .fromInputStream(stream); + } + } + private static AvroYarnJobSubmissionParameters createAvroYarnJobSubmissionParameters() throws IOException { try (final InputStream stream = new ByteArrayInputStream(AVRO_YARN_JOB_PARAMETERS_SERIALIZED_STRING.getBytes(StandardCharsets.UTF_8))) { @@ -200,4 +347,101 @@ public final class TestAvroJobSubmissionParametersSerializationFromCS { assert jobSubmissionParameters.getDfsJobSubmissionFolder().toString().equals(STRING_REP); assert 
jobSubmissionParameters.getJobSubmissionDirectoryPrefix().toString().equals(STRING_REP); } + + /** Checks multiruntime parameters that are expected to define both runtimes: the shared TCP fields equal NUMBER_REP, the job id and folder fields equal STRING_REP, the local and YARN sections are both non-null, the default runtime is Local and the runtimes list holds exactly Local and Yarn. NOTE(review): the checks are Java assert statements, which the JVM evaluates only when assertions are enabled (-ea); confirm the test runner enables them. */ private static void verifyYarnMultiRuntimeJobSubmissionParams( + final AvroYarnJobSubmissionParameters jobSubmissionParameters, + final AvroMultiRuntimeAppSubmissionParameters appSubmissionParameters) { + final AvroAppSubmissionParameters sharedAppSubmissionParams = + appSubmissionParameters.getSharedAppSubmissionParameters(); + + final AvroJobSubmissionParameters sharedJobSubmissionParams = + jobSubmissionParameters.getSharedJobSubmissionParameters(); + + assert sharedAppSubmissionParams.getTcpBeginPort() == NUMBER_REP; + assert sharedAppSubmissionParams.getTcpRangeCount() == NUMBER_REP; + assert sharedAppSubmissionParams.getTcpTryCount() == NUMBER_REP; + assert sharedJobSubmissionParams.getJobId().toString().equals(STRING_REP); + assert sharedJobSubmissionParams.getJobSubmissionFolder().toString().equals(STRING_REP); + assert jobSubmissionParameters.getDfsJobSubmissionFolder().toString().equals(STRING_REP); + assert jobSubmissionParameters.getJobSubmissionDirectoryPrefix().toString().equals(STRING_REP); + assert appSubmissionParameters.getLocalRuntimeAppParameters() != null; + assert appSubmissionParameters. 
+ getLocalRuntimeAppParameters().getMaxNumberOfConcurrentEvaluators() == NUMBER_REP; + assert appSubmissionParameters.getYarnRuntimeAppParameters() != null; + assert appSubmissionParameters.getYarnRuntimeAppParameters().getDriverRecoveryTimeout() == NUMBER_REP; + assert appSubmissionParameters.getDefaultRuntimeName().toString().equals("Local"); + assert appSubmissionParameters.getRuntimes().size() == 2; + + List<String> lst = new ArrayList<>(); + // getRuntimes() yields CharSequence elements; copy them as Strings so contains(String) can match + for (CharSequence charSeq : appSubmissionParameters.getRuntimes()){ + lst.add(charSeq.toString()); + } + + assert lst.contains("Local"); + assert lst.contains("Yarn"); + } + + /** Checks multiruntime parameters that are expected to define only the YARN runtime: the shared TCP fields equal NUMBER_REP, the job id and folder fields equal STRING_REP, the local section is null, the YARN section is non-null with driverRecoveryTimeout equal to NUMBER_REP, the default runtime is Yarn and Yarn is the single listed runtime. NOTE(review): the checks are Java assert statements, which the JVM evaluates only when assertions are enabled (-ea); confirm the test runner enables them. */ private static void verifyYarnOnlyMultiRuntimeJobSubmissionParams( + final AvroYarnJobSubmissionParameters jobSubmissionParameters, + final AvroMultiRuntimeAppSubmissionParameters appSubmissionParameters) { + final AvroAppSubmissionParameters sharedAppSubmissionParams = + appSubmissionParameters.getSharedAppSubmissionParameters(); + + final AvroJobSubmissionParameters sharedJobSubmissionParams = + jobSubmissionParameters.getSharedJobSubmissionParameters(); + + assert sharedAppSubmissionParams.getTcpBeginPort() == NUMBER_REP; + assert sharedAppSubmissionParams.getTcpRangeCount() == NUMBER_REP; + assert sharedAppSubmissionParams.getTcpTryCount() == NUMBER_REP; + assert sharedJobSubmissionParams.getJobId().toString().equals(STRING_REP); + assert sharedJobSubmissionParams.getJobSubmissionFolder().toString().equals(STRING_REP); + assert jobSubmissionParameters.getDfsJobSubmissionFolder().toString().equals(STRING_REP); + assert jobSubmissionParameters.getJobSubmissionDirectoryPrefix().toString().equals(STRING_REP); + assert appSubmissionParameters.getLocalRuntimeAppParameters() == null; + assert appSubmissionParameters.getYarnRuntimeAppParameters() != null; + assert appSubmissionParameters.getYarnRuntimeAppParameters().getDriverRecoveryTimeout() == NUMBER_REP; + assert 
appSubmissionParameters.getDefaultRuntimeName().toString().equals("Yarn"); + assert appSubmissionParameters.getRuntimes().size() == 1; + + List<String> lst = new ArrayList<>(); + // getRuntimes() yields CharSequence elements; copy them as Strings so contains(String) can match + for (CharSequence charSeq : appSubmissionParameters.getRuntimes()){ + lst.add(charSeq.toString()); + } + + assert lst.contains("Yarn"); + } + + /** Checks multiruntime parameters that are expected to define only the local runtime: the shared TCP fields equal NUMBER_REP, the job id and folder fields equal STRING_REP, the local section is non-null with maxNumberOfConcurrentEvaluators equal to NUMBER_REP, the YARN section is null, the default runtime is Local and Local is the single listed runtime. NOTE(review): the checks are Java assert statements, which the JVM evaluates only when assertions are enabled (-ea); confirm the test runner enables them. */ private static void verifyLocalOnlyMultiRuntimeJobSubmissionParams( + final AvroYarnJobSubmissionParameters jobSubmissionParameters, + final AvroMultiRuntimeAppSubmissionParameters appSubmissionParameters) { + final AvroAppSubmissionParameters sharedAppSubmissionParams = + appSubmissionParameters.getSharedAppSubmissionParameters(); + + final AvroJobSubmissionParameters sharedJobSubmissionParams = + jobSubmissionParameters.getSharedJobSubmissionParameters(); + + assert sharedAppSubmissionParams.getTcpBeginPort() == NUMBER_REP; + assert sharedAppSubmissionParams.getTcpRangeCount() == NUMBER_REP; + assert sharedAppSubmissionParams.getTcpTryCount() == NUMBER_REP; + assert sharedJobSubmissionParams.getJobId().toString().equals(STRING_REP); + assert sharedJobSubmissionParams.getJobSubmissionFolder().toString().equals(STRING_REP); + assert jobSubmissionParameters.getDfsJobSubmissionFolder().toString().equals(STRING_REP); + assert jobSubmissionParameters.getJobSubmissionDirectoryPrefix().toString().equals(STRING_REP); + assert appSubmissionParameters.getLocalRuntimeAppParameters() != null; + assert appSubmissionParameters. + getLocalRuntimeAppParameters().getMaxNumberOfConcurrentEvaluators() == NUMBER_REP; + assert appSubmissionParameters.getYarnRuntimeAppParameters() == null; + assert appSubmissionParameters.getDefaultRuntimeName().toString().equals("Local"); + assert appSubmissionParameters.getRuntimes().size() == 1; + + List<String> lst = new ArrayList<>(); + // getRuntimes() yields CharSequence elements; copy them as Strings so contains(String) can match + for (CharSequence charSeq : appSubmissionParameters.getRuntimes()){ + lst.add(charSeq.toString()); + } + + assert lst.contains("Local"); + } }
