This is an automated email from the ASF dual-hosted git repository. ztang pushed a commit to annotated tag release-0.3.0-RC0 in repository https://gitbox.apache.org/repos/asf/submarine.git
commit 2baa5c479c51ca3cde302233b967f3a179e61c51 Author: Zac Zhou <[email protected]> AuthorDate: Tue Jan 21 17:27:13 2020 +0800 SUBMARINE-356. ParameterProto should support getOptionValue method ### What is this PR for? To enable YAML configuration, the getOptionValue method in ParametersHolder is used to keep job configuration. ParameterProto should support this method as well. ### What type of PR is it? Bug Fix ### What is the Jira issue? https://issues.apache.org/jira/browse/SUBMARINE-356 ### How should this be tested? https://travis-ci.org/yuanzac/hadoop-submarine/builds/639866700?utm_source=github_status&utm_medium=notification ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Zac Zhou <[email protected]> Closes #161 from yuanzac/topic/SUBMARINE-356 and squashes the following commits: b984967 [Zac Zhou] SUBMARINE-356. ParameterProto should support getOptionValue method. 
(cherry picked from commit 4f5e01c4b475b1657504b6dc9390f869502f5459) --- .../client/cli/param/ParametersHolder.java | 28 +++++++++ .../submarine/client/cli/remote/ClientProto.java | 52 ++++++++++++++++ .../submarine/client/cli/runjob/RunJobCli.java | 1 + .../src/main/proto/SubmarineServerProtocol.proto | 18 ++++++ .../server/rpc/SubmarineRpcServerProto.java | 70 ++++++++++++++++++++++ .../apache/submarine/server/rpc/MockRpcServer.java | 23 ++++++- 6 files changed, 191 insertions(+), 1 deletion(-) diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java index 0c1c21e..44891f7 100644 --- a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java +++ b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java @@ -461,4 +461,32 @@ public final class ParametersHolder implements Parameter { this.parameters = parameters; return this; } + + public CommandLine getParsedCommandLine() { + return parsedCommandLine; + } + + public Parameter setParsedCommandLine(CommandLine parsedCommandLine) { + this.parsedCommandLine = parsedCommandLine; + return this; + } + + public Map<String, String> getYamlStringConfigs() { + return yamlStringConfigs; + } + + public Parameter setYamlStringConfigs(Map<String, String> yamlStringConfigs) { + this.yamlStringConfigs = yamlStringConfigs; + return this; + } + + public Map<String, List<String>> getYamlListConfigs() { + return yamlListConfigs; + } + + public Parameter setYamlListConfigs( + Map<String, List<String>> yamlListConfigs) { + this.yamlListConfigs = yamlListConfigs; + return this; + } } diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/ClientProto.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/ClientProto.java index c531978..dedb64f 100644 --- 
a/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/ClientProto.java +++ b/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/ClientProto.java @@ -19,10 +19,13 @@ package org.apache.submarine.client.cli.remote; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.submarine.client.cli.CliUtils; import org.apache.submarine.client.cli.param.Localization; +import org.apache.submarine.client.cli.param.ParametersHolder; import org.apache.submarine.client.cli.param.Quicklink; import org.apache.submarine.client.cli.param.RunParameters; import org.apache.submarine.client.cli.param.ShowJobParameters; @@ -31,7 +34,10 @@ import org.apache.submarine.client.cli.param.runjob.RunJobParameters; import org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters; import org.apache.submarine.client.cli.runjob.RoleParameters; import org.apache.submarine.commons.rpc.ApplicationIdProto; +import org.apache.submarine.commons.rpc.CommandLineProto; +import org.apache.submarine.commons.rpc.ListOfString; import org.apache.submarine.commons.rpc.LocalizationProto; +import org.apache.submarine.commons.rpc.OptionProto; import org.apache.submarine.commons.rpc.ParameterProto; import org.apache.submarine.commons.rpc.PyTorchRunJobParameterProto; import org.apache.submarine.commons.rpc.QuicklinkProto; @@ -45,6 +51,7 @@ import org.apache.submarine.commons.runtime.param.Parameter; import org.apache.submarine.commons.runtime.resource.ResourceUtils; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -63,6 +70,7 @@ public class ClientProto { // Handle tensorflow job parameters proto = convertTensorFlowRunJobToParameterProto(parameters, rpcContext); } + setCommandLineYamlConfig((ParametersHolder) parameters, proto); } else if 
(baseParameters instanceof ShowJobParameters) { // Handle show job parameters proto = convertShowJobToParameterProto(parameters, rpcContext); @@ -70,6 +78,30 @@ public class ClientProto { return proto; } + private static ParameterProto setCommandLineYamlConfig( + ParametersHolder parameters, ParameterProto proto) { + CommandLineProto commandLineProto = + convertCommandLineToCommandLineProto(parameters + .getParsedCommandLine()); + proto = proto.toBuilder() + .setCommandLine(commandLineProto) + .putAllYamlStringConfigs(parameters.getYamlStringConfigs()) + .putAllYamlListConfigs( + covertYamlListConfigs(parameters.getYamlListConfigs())).build(); + return proto; + } + + public static Map<String, ListOfString> covertYamlListConfigs( + Map<String, List<String>> yamlListConfigs) { + Map<String, ListOfString> map = new HashMap<>(); + for(Map.Entry<String, List<String>> entry : yamlListConfigs.entrySet()) { + ListOfString value = + ListOfString.newBuilder().addAllValues(entry.getValue()).build(); + map.put(entry.getKey(), value); + } + return map; + } + public static ParameterProto convertPyTorchRunJobToParameterProto( Parameter parameters, RpcContext rpcContext) { PyTorchRunJobParameterProto pytorchProto = @@ -99,14 +131,34 @@ public class ClientProto { .setTensorBoardParameter(convertRoleParametersToRoleParameterProto( tensorFlowRunJobParameters.getTensorBoardParameters())) .build(); + CommandLineProto commandLineProto = + convertCommandLineToCommandLineProto(((ParametersHolder)parameters) + .getParsedCommandLine()); ParameterProto parameterProto = ParameterProto.newBuilder() .setTensorflowRunJobParameter(tfProto) .setFramework(parameters.getFramework().getValue()) + .setCommandLine(commandLineProto) .putAllSubmarineJobConfigMap(rpcContext.getSubmarineJobConfigMap()) .build(); return parameterProto; } + public static CommandLineProto convertCommandLineToCommandLineProto( + CommandLine parsedCommandLine) { + List<OptionProto> optionProtos = new ArrayList<>(); + + for 
(Option option : parsedCommandLine.getOptions()) { + OptionProto optionProto = OptionProto.newBuilder() + .setOpt(option.getOpt()) + .addAllValues(option.getValuesList()).build(); + optionProtos.add(optionProto); + } + + CommandLineProto commandLineProto = + CommandLineProto.newBuilder().addAllOptions(optionProtos).build(); + return commandLineProto; + } + public static RunParameterProto convertParameterToRunParametersProto( Parameter parameters) { RunJobParameters runJobParameter = diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RunJobCli.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RunJobCli.java index 76d1c2b..69426d5 100644 --- a/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RunJobCli.java +++ b/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RunJobCli.java @@ -321,6 +321,7 @@ public class RunJobCli extends AbstractCli { parseCommandLineAndGetRunJobParameters(args); ApplicationId applicationId = jobSubmitter.submitJob(parametersHolder); + LOG.info("Submarine job is submitted, the job id is " + applicationId); RunJobParameters parameters = (RunJobParameters) parametersHolder.getParameters(); storeJobInformation(parameters, applicationId, args); diff --git a/submarine-commons/commons-rpc/src/main/proto/SubmarineServerProtocol.proto b/submarine-commons/commons-rpc/src/main/proto/SubmarineServerProtocol.proto index 27d88a3..537ce59 100644 --- a/submarine-commons/commons-rpc/src/main/proto/SubmarineServerProtocol.proto +++ b/submarine-commons/commons-rpc/src/main/proto/SubmarineServerProtocol.proto @@ -60,8 +60,16 @@ message ParameterProto { TensorFlowRunJobParameterProto tensorflow_run_job_parameter = 3; ShowJobParameterProto show_job_parameter = 4; map<string, string> submarine_job_config_map = 5; + CommandLineProto command_line = 6; + map<string, string> yaml_string_configs = 7; + map<string, ListOfString> yaml_list_configs = 8; } +message ListOfString 
{ + repeated string values = 1; +} + + message ResourceProto { map<string, int64> resource_map = 1; } @@ -127,3 +135,13 @@ message ApplicationIdProto { // One corner of the rectangle. string application_id = 1; } + +message CommandLineProto { + repeated OptionProto options = 1; +} + +message OptionProto { + string opt = 1; + string long_opt = 2; + repeated string values = 3; +} diff --git a/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java b/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java index 4c90cf2..c68db1e 100644 --- a/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java +++ b/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java @@ -19,6 +19,8 @@ package org.apache.submarine.server.rpc; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -32,7 +34,10 @@ import org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters; import org.apache.submarine.client.cli.remote.RpcContext; import org.apache.submarine.client.cli.runjob.RoleParameters; import org.apache.submarine.commons.rpc.ApplicationIdProto; +import org.apache.submarine.commons.rpc.CommandLineProto; +import org.apache.submarine.commons.rpc.ListOfString; import org.apache.submarine.commons.rpc.LocalizationProto; +import org.apache.submarine.commons.rpc.OptionProto; import org.apache.submarine.commons.rpc.ParameterProto; import org.apache.submarine.commons.rpc.PyTorchRunJobParameterProto; import org.apache.submarine.commons.rpc.QuicklinkProto; @@ -48,8 +53,12 @@ import org.apache.submarine.commons.runtime.api.TensorFlowRole; import org.apache.submarine.commons.runtime.param.Parameter; import 
org.apache.submarine.commons.runtime.resource.ResourceUtils; +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class SubmarineRpcServerProto { @@ -63,9 +72,70 @@ public class SubmarineRpcServerProto { } else if (parameterProto.hasShowJobParameter()) { parameter = convertParameterProtoToShowJob(parameterProto); } + setCommandLineYamlConfigIfNeeded(parameter, parameterProto); return parameter; } + public static void setCommandLineYamlConfigIfNeeded( + Parameter parameter, ParameterProto parameterProto) { + if(parameter instanceof ParametersHolder) { + ParametersHolder parametersHolder = ((ParametersHolder) parameter); + CommandLine commandLine = convertCommandLineProtoToCommandLine( + parameterProto.getCommandLine()); + parametersHolder.setParsedCommandLine(commandLine); + parametersHolder.setYamlStringConfigs( + parameterProto.getYamlStringConfigsMap()); + parametersHolder.setYamlListConfigs( + covertYamlListConfigs(parameterProto.getYamlListConfigsMap())); + } + } + + public static Map<String, List<String>> covertYamlListConfigs( + Map<String, ListOfString> yamlListConfigs) { + Map<String, List<String>> map = new HashMap<>(); + for(Map.Entry<String, ListOfString> entry : yamlListConfigs.entrySet()) { + List<String> value = + entry.getValue().getValuesList(); + map.put(entry.getKey(), value); + } + return map; + } + + public static CommandLine convertCommandLineProtoToCommandLine( + CommandLineProto commandLineProto) { + CommandLine commandLine; + Class<CommandLine> clz = CommandLine.class; + try { + Constructor<CommandLine> c = clz.getDeclaredConstructor(); + c.setAccessible(true); + commandLine = c.newInstance(); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + + for (OptionProto optionProto : commandLineProto.getOptionsList()) { + Option option = new Option(optionProto.getOpt(), ""); + try { + Class 
optionClass = Option.class; + Method add = optionClass.getDeclaredMethod("add", String.class); + add.setAccessible(true); + for(String value : optionProto.getValuesList()) { + add.invoke(option, value); + } + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e.getCause()); + } + try { + Method getOption = clz.getDeclaredMethod("addOption", Option.class); + getOption.setAccessible(true); + getOption.invoke(commandLine, option); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + return commandLine; + } + public static Parameter convertParameterProtoToPyTorchRunJob( ParameterProto parameterProto) { Framework framework = Framework.parseByValue(parameterProto.getFramework()); diff --git a/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/MockRpcServer.java b/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/MockRpcServer.java index 13858c6..df08233 100644 --- a/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/MockRpcServer.java +++ b/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/MockRpcServer.java @@ -19,11 +19,16 @@ package org.apache.submarine.server.rpc; import io.grpc.ServerBuilder; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.submarine.client.cli.CliConstants; import org.apache.submarine.client.cli.CliUtils; +import org.apache.submarine.client.cli.param.ParametersHolder; +import org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters; import org.apache.submarine.commons.runtime.ClientContext; import org.apache.submarine.commons.runtime.param.Parameter; import org.apache.submarine.commons.utils.SubmarineConfiguration; import org.apache.submarine.commons.utils.SubmarineConfVars; +import org.junit.Assert; import java.io.IOException; @@ -41,11 +46,27 @@ public class MockRpcServer extends SubmarineRpcServer { extends 
SubmarineServerRpcService { @Override protected ApplicationId run(ClientContext clientContext, - Parameter parameter) { + Parameter parameter) throws YarnException { + ParametersHolder parametersHolder = (ParametersHolder) parameter; + // Add protobuf conversion check + checkProtoConversion(parametersHolder); return CliUtils.fromString("application_1_123"); } } + private static void checkProtoConversion(ParametersHolder parametersHolder) throws YarnException { + if(parametersHolder.getParameters() + instanceof TensorFlowRunJobParameters) { + TensorFlowRunJobParameters tensorParameter = + (TensorFlowRunJobParameters) parametersHolder.getParameters(); + if (tensorParameter.getNumWorkers() != 0) { + Assert.assertEquals(Long.valueOf(tensorParameter.getNumWorkers()), + Long.valueOf( + parametersHolder.getOptionValue(CliConstants.N_WORKERS))); + } + } + } + public static void main(String[] args) throws Exception { SubmarineRpcServer server = startRpcServer(); server.blockUntilShutdown(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
