This is an automated email from the ASF dual-hosted git repository.
ztang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 4f5e01c SUBMARINE-356. ParameterProto should support getOptionValue
method
4f5e01c is described below
commit 4f5e01c4b475b1657504b6dc9390f869502f5459
Author: Zac Zhou <[email protected]>
AuthorDate: Tue Jan 21 17:27:13 2020 +0800
SUBMARINE-356. ParameterProto should support getOptionValue method
### What is this PR for?
To enable YAML configuration, the getOptionValue method of
ParametersHolder is used to read the job configuration. ParameterProto
should support this method as well.
### What type of PR is it?
Bug Fix
### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-356
### How should this be tested?
https://travis-ci.org/yuanzac/hadoop-submarine/builds/639866700?utm_source=github_status&utm_medium=notification
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Zac Zhou <[email protected]>
Closes #161 from yuanzac/topic/SUBMARINE-356 and squashes the following
commits:
b984967 [Zac Zhou] SUBMARINE-356. ParameterProto should support
getOptionValue method.
---
.../client/cli/param/ParametersHolder.java | 28 +++++++++
.../submarine/client/cli/remote/ClientProto.java | 52 ++++++++++++++++
.../submarine/client/cli/runjob/RunJobCli.java | 1 +
.../src/main/proto/SubmarineServerProtocol.proto | 18 ++++++
.../server/rpc/SubmarineRpcServerProto.java | 70 ++++++++++++++++++++++
.../apache/submarine/server/rpc/MockRpcServer.java | 23 ++++++-
6 files changed, 191 insertions(+), 1 deletion(-)
diff --git
a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java
b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java
index 0c1c21e..44891f7 100644
---
a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java
+++
b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java
@@ -461,4 +461,32 @@ public final class ParametersHolder implements Parameter {
this.parameters = parameters;
return this;
}
+
+ /**
+  * Returns the parsed command line currently stored in this holder.
+  *
+  * @return the stored {@link CommandLine} reference, returned as-is
+  */
+ public CommandLine getParsedCommandLine() {
+ return parsedCommandLine;
+ }
+
+ /**
+  * Replaces the parsed command line stored in this holder.
+  *
+  * @param parsedCommandLine the command line to store
+  * @return this holder, so that setter calls can be chained
+  */
+ public Parameter setParsedCommandLine(CommandLine parsedCommandLine) {
+ this.parsedCommandLine = parsedCommandLine;
+ return this;
+ }
+
+ /**
+  * Returns the single-valued YAML configuration entries of this holder.
+  *
+  * @return the stored map itself (no copy is made)
+  */
+ public Map<String, String> getYamlStringConfigs() {
+ return yamlStringConfigs;
+ }
+
+ /**
+  * Replaces the single-valued YAML configuration entries of this holder.
+  * The map is stored by reference; no copy is made.
+  *
+  * @param yamlStringConfigs the map to store
+  * @return this holder, so that setter calls can be chained
+  */
+ public Parameter setYamlStringConfigs(
+     Map<String, String> yamlStringConfigs) {
+   this.yamlStringConfigs = yamlStringConfigs;
+   return this;
+ }
+
+ /**
+  * Returns the multi-valued YAML configuration entries of this holder.
+  *
+  * @return the stored map itself (no copy is made)
+  */
+ public Map<String, List<String>> getYamlListConfigs() {
+ return yamlListConfigs;
+ }
+
+ /**
+  * Replaces the multi-valued YAML configuration entries of this holder.
+  * The map is stored by reference; no copy is made.
+  *
+  * @param yamlListConfigs the map to store
+  * @return this holder, so that setter calls can be chained
+  */
+ public Parameter setYamlListConfigs(
+ Map<String, List<String>> yamlListConfigs) {
+ this.yamlListConfigs = yamlListConfigs;
+ return this;
+ }
}
diff --git
a/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/ClientProto.java
b/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/ClientProto.java
index c531978..dedb64f 100644
---
a/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/ClientProto.java
+++
b/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/ClientProto.java
@@ -19,10 +19,13 @@
package org.apache.submarine.client.cli.remote;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.submarine.client.cli.CliUtils;
import org.apache.submarine.client.cli.param.Localization;
+import org.apache.submarine.client.cli.param.ParametersHolder;
import org.apache.submarine.client.cli.param.Quicklink;
import org.apache.submarine.client.cli.param.RunParameters;
import org.apache.submarine.client.cli.param.ShowJobParameters;
@@ -31,7 +34,10 @@ import
org.apache.submarine.client.cli.param.runjob.RunJobParameters;
import org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters;
import org.apache.submarine.client.cli.runjob.RoleParameters;
import org.apache.submarine.commons.rpc.ApplicationIdProto;
+import org.apache.submarine.commons.rpc.CommandLineProto;
+import org.apache.submarine.commons.rpc.ListOfString;
import org.apache.submarine.commons.rpc.LocalizationProto;
+import org.apache.submarine.commons.rpc.OptionProto;
import org.apache.submarine.commons.rpc.ParameterProto;
import org.apache.submarine.commons.rpc.PyTorchRunJobParameterProto;
import org.apache.submarine.commons.rpc.QuicklinkProto;
@@ -45,6 +51,7 @@ import org.apache.submarine.commons.runtime.param.Parameter;
import org.apache.submarine.commons.runtime.resource.ResourceUtils;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -63,6 +70,7 @@ public class ClientProto {
// Handle tensorflow job parameters
proto = convertTensorFlowRunJobToParameterProto(parameters,
rpcContext);
}
+ // Protobuf messages are immutable: the helper returns a rebuilt
+ // message, so its result must be assigned back. Otherwise the command
+ // line and YAML configs never reach the proto returned to the caller.
+ proto = setCommandLineYamlConfig((ParametersHolder) parameters, proto);
} else if (baseParameters instanceof ShowJobParameters) {
// Handle show job parameters
proto = convertShowJobToParameterProto(parameters, rpcContext);
@@ -70,6 +78,30 @@ public class ClientProto {
return proto;
}
+ /**
+  * Builds a new {@link ParameterProto} from {@code proto} that also
+  * carries the parsed command line and the YAML configuration maps of
+  * {@code parameters}.
+  *
+  * <p>The given {@code proto} is not modified; callers must use the
+  * returned message.
+  *
+  * @param parameters holder providing the command line and YAML configs
+  * @param proto message to start from
+  * @return the rebuilt message containing the additional fields
+  */
+ private static ParameterProto setCommandLineYamlConfig(
+     ParametersHolder parameters, ParameterProto proto) {
+   ParameterProto.Builder builder = proto.toBuilder()
+       .setCommandLine(convertCommandLineToCommandLineProto(
+           parameters.getParsedCommandLine()));
+
+   // The generated putAll* methods reject a null map.
+   // NOTE(review): the holder's YAML maps are presumably unset when the
+   // job is configured without a YAML file - confirm in ParametersHolder.
+   Map<String, String> stringConfigs = parameters.getYamlStringConfigs();
+   if (stringConfigs != null) {
+     builder.putAllYamlStringConfigs(stringConfigs);
+   }
+   Map<String, List<String>> listConfigs = parameters.getYamlListConfigs();
+   if (listConfigs != null) {
+     builder.putAllYamlListConfigs(covertYamlListConfigs(listConfigs));
+   }
+   return builder.build();
+ }
+
+ /**
+  * Converts multi-valued YAML configs into their protobuf form, wrapping
+  * each value list in a {@link ListOfString}.
+  *
+  * @param yamlListConfigs configs keyed by option name; {@code null} is
+  *        treated as "no configs"
+  * @return a new mutable map with one entry per input entry; empty when
+  *         the input is {@code null} or empty
+  */
+ public static Map<String, ListOfString> covertYamlListConfigs(
+     Map<String, List<String>> yamlListConfigs) {
+   Map<String, ListOfString> converted = new HashMap<>();
+   if (yamlListConfigs == null) {
+     return converted;
+   }
+   for (Map.Entry<String, List<String>> entry
+       : yamlListConfigs.entrySet()) {
+     converted.put(entry.getKey(),
+         ListOfString.newBuilder().addAllValues(entry.getValue()).build());
+   }
+   return converted;
+ }
+
public static ParameterProto convertPyTorchRunJobToParameterProto(
Parameter parameters, RpcContext rpcContext) {
PyTorchRunJobParameterProto pytorchProto =
@@ -99,14 +131,34 @@ public class ClientProto {
.setTensorBoardParameter(convertRoleParametersToRoleParameterProto(
tensorFlowRunJobParameters.getTensorBoardParameters()))
.build();
+ CommandLineProto commandLineProto =
+ convertCommandLineToCommandLineProto(((ParametersHolder)parameters)
+ .getParsedCommandLine());
ParameterProto parameterProto = ParameterProto.newBuilder()
.setTensorflowRunJobParameter(tfProto)
.setFramework(parameters.getFramework().getValue())
+ .setCommandLine(commandLineProto)
.putAllSubmarineJobConfigMap(rpcContext.getSubmarineJobConfigMap())
.build();
return parameterProto;
}
+ /**
+  * Converts a parsed commons-cli command line into its protobuf form.
+  *
+  * <p>For every option the short name, the long name and all values are
+  * copied. A name that is {@code null} on the option is left unset in the
+  * proto, because the generated string setters reject {@code null}.
+  *
+  * @param parsedCommandLine the command line to convert; {@code null}
+  *        yields a proto without options
+  * @return the protobuf representation of the command line
+  */
+ public static CommandLineProto convertCommandLineToCommandLineProto(
+     CommandLine parsedCommandLine) {
+   CommandLineProto.Builder commandLineBuilder =
+       CommandLineProto.newBuilder();
+   if (parsedCommandLine == null) {
+     return commandLineBuilder.build();
+   }
+
+   for (Option option : parsedCommandLine.getOptions()) {
+     OptionProto.Builder optionBuilder = OptionProto.newBuilder()
+         .addAllValues(option.getValuesList());
+     // An option declared only by its long name has a null short name.
+     if (option.getOpt() != null) {
+       optionBuilder.setOpt(option.getOpt());
+     }
+     if (option.getLongOpt() != null) {
+       optionBuilder.setLongOpt(option.getLongOpt());
+     }
+     commandLineBuilder.addOptions(optionBuilder.build());
+   }
+   return commandLineBuilder.build();
+ }
+
public static RunParameterProto convertParameterToRunParametersProto(
Parameter parameters) {
RunJobParameters runJobParameter =
diff --git
a/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RunJobCli.java
b/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RunJobCli.java
index 76d1c2b..69426d5 100644
---
a/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RunJobCli.java
+++
b/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RunJobCli.java
@@ -321,6 +321,7 @@ public class RunJobCli extends AbstractCli {
parseCommandLineAndGetRunJobParameters(args);
ApplicationId applicationId = jobSubmitter.submitJob(parametersHolder);
+ LOG.info("Submarine job is submitted, the job id is " + applicationId);
RunJobParameters parameters =
(RunJobParameters) parametersHolder.getParameters();
storeJobInformation(parameters, applicationId, args);
diff --git
a/submarine-commons/commons-rpc/src/main/proto/SubmarineServerProtocol.proto
b/submarine-commons/commons-rpc/src/main/proto/SubmarineServerProtocol.proto
index 27d88a3..537ce59 100644
--- a/submarine-commons/commons-rpc/src/main/proto/SubmarineServerProtocol.proto
+++ b/submarine-commons/commons-rpc/src/main/proto/SubmarineServerProtocol.proto
@@ -60,8 +60,16 @@ message ParameterProto {
TensorFlowRunJobParameterProto tensorflow_run_job_parameter = 3;
ShowJobParameterProto show_job_parameter = 4;
map<string, string> submarine_job_config_map = 5;
+ CommandLineProto command_line = 6;
+ map<string, string> yaml_string_configs = 7;
+ map<string, ListOfString> yaml_list_configs = 8;
}
+// Wrapper around a list of strings so that a list can be used as a map
+// value (see yaml_list_configs in ParameterProto).
+message ListOfString {
+ repeated string values = 1;
+}
+
+
message ResourceProto {
map<string, int64> resource_map = 1;
}
@@ -127,3 +135,13 @@ message ApplicationIdProto {
// One corner of the rectangle.
string application_id = 1;
}
+
+// Serialized form of a parsed command line: the options it contains.
+message CommandLineProto {
+ repeated OptionProto options = 1;
+}
+
+// A single command-line option together with the values given for it.
+message OptionProto {
+ // Short name of the option.
+ string opt = 1;
+ // Long name of the option.
+ string long_opt = 2;
+ // Values supplied for the option, in order.
+ repeated string values = 3;
+}
diff --git
a/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java
b/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java
index 4c90cf2..c68db1e 100644
---
a/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java
+++
b/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java
@@ -19,6 +19,8 @@
package org.apache.submarine.server.rpc;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -32,7 +34,10 @@ import
org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters;
import org.apache.submarine.client.cli.remote.RpcContext;
import org.apache.submarine.client.cli.runjob.RoleParameters;
import org.apache.submarine.commons.rpc.ApplicationIdProto;
+import org.apache.submarine.commons.rpc.CommandLineProto;
+import org.apache.submarine.commons.rpc.ListOfString;
import org.apache.submarine.commons.rpc.LocalizationProto;
+import org.apache.submarine.commons.rpc.OptionProto;
import org.apache.submarine.commons.rpc.ParameterProto;
import org.apache.submarine.commons.rpc.PyTorchRunJobParameterProto;
import org.apache.submarine.commons.rpc.QuicklinkProto;
@@ -48,8 +53,12 @@ import
org.apache.submarine.commons.runtime.api.TensorFlowRole;
import org.apache.submarine.commons.runtime.param.Parameter;
import org.apache.submarine.commons.runtime.resource.ResourceUtils;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class SubmarineRpcServerProto {
@@ -63,9 +72,70 @@ public class SubmarineRpcServerProto {
} else if (parameterProto.hasShowJobParameter()) {
parameter = convertParameterProtoToShowJob(parameterProto);
}
+ setCommandLineYamlConfigIfNeeded(parameter, parameterProto);
return parameter;
}
+ /**
+  * Restores the command line and YAML configs carried by
+  * {@code parameterProto} onto {@code parameter}.
+  *
+  * <p>Nothing happens unless {@code parameter} is a
+  * {@link ParametersHolder}.
+  *
+  * @param parameter the parameter object to populate
+  * @param parameterProto the message holding the serialized values
+  */
+ public static void setCommandLineYamlConfigIfNeeded(
+     Parameter parameter, ParameterProto parameterProto) {
+   if (!(parameter instanceof ParametersHolder)) {
+     return;
+   }
+   ParametersHolder holder = (ParametersHolder) parameter;
+   holder.setParsedCommandLine(
+       convertCommandLineProtoToCommandLine(
+           parameterProto.getCommandLine()));
+   holder.setYamlStringConfigs(parameterProto.getYamlStringConfigsMap());
+   holder.setYamlListConfigs(
+       covertYamlListConfigs(parameterProto.getYamlListConfigsMap()));
+ }
+
+ /**
+  * Converts protobuf multi-valued YAML configs back into plain lists by
+  * unwrapping each {@link ListOfString}.
+  *
+  * @param yamlListConfigs configs in protobuf form, keyed by option name
+  * @return a new map holding the value list of every input entry
+  */
+ public static Map<String, List<String>> covertYamlListConfigs(
+     Map<String, ListOfString> yamlListConfigs) {
+   Map<String, List<String>> unwrapped = new HashMap<>();
+   yamlListConfigs.forEach(
+       (key, wrapper) -> unwrapped.put(key, wrapper.getValuesList()));
+   return unwrapped;
+ }
+
+ /**
+  * Rebuilds a commons-cli {@link CommandLine} from its protobuf form.
+  *
+  * <p>{@code CommandLine} and {@code Option} expose no public way to
+  * build a populated command line, so the non-public constructor,
+  * {@code Option.add(String)} and {@code CommandLine.addOption(Option)}
+  * are invoked reflectively.
+  *
+  * @param commandLineProto the serialized command line
+  * @return a command line holding every option and value of the proto
+  * @throws RuntimeException if a reflective lookup or call fails; the
+  *         original exception is kept as the cause
+  */
+ public static CommandLine convertCommandLineProtoToCommandLine(
+     CommandLineProto commandLineProto) {
+   CommandLine commandLine;
+   Method addValue;
+   Method addOption;
+   try {
+     Constructor<CommandLine> constructor =
+         CommandLine.class.getDeclaredConstructor();
+     constructor.setAccessible(true);
+     commandLine = constructor.newInstance();
+     // Look the handles up once instead of once per option.
+     addValue = Option.class.getDeclaredMethod("add", String.class);
+     addValue.setAccessible(true);
+     addOption =
+         CommandLine.class.getDeclaredMethod("addOption", Option.class);
+     addOption.setAccessible(true);
+   } catch (ReflectiveOperationException e) {
+     throw new RuntimeException(
+         "Cannot access commons-cli internals to rebuild the command line",
+         e);
+   }
+
+   for (OptionProto optionProto : commandLineProto.getOptionsList()) {
+     String shortName = optionProto.getOpt();
+     String longName = optionProto.getLongOpt();
+     // Unset proto strings read back as ""; an option known only by its
+     // long name is represented in commons-cli with a null short name.
+     if (shortName.isEmpty() && !longName.isEmpty()) {
+       shortName = null;
+     }
+     Option option = new Option(shortName, "");
+     if (!longName.isEmpty()) {
+       option.setLongOpt(longName);
+     }
+     try {
+       for (String value : optionProto.getValuesList()) {
+         addValue.invoke(option, value);
+       }
+       addOption.invoke(commandLine, option);
+     } catch (ReflectiveOperationException e) {
+       // Keep the exception itself as the cause so that no failure detail
+       // is lost, whatever the exception type.
+       throw new RuntimeException(
+           "Cannot restore option '" + optionProto.getOpt() + "'", e);
+     }
+   }
+   return commandLine;
+ }
+
public static Parameter convertParameterProtoToPyTorchRunJob(
ParameterProto parameterProto) {
Framework framework =
Framework.parseByValue(parameterProto.getFramework());
diff --git
a/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/MockRpcServer.java
b/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/MockRpcServer.java
index 13858c6..df08233 100644
---
a/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/MockRpcServer.java
+++
b/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/MockRpcServer.java
@@ -19,11 +19,16 @@ package org.apache.submarine.server.rpc;
import io.grpc.ServerBuilder;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.submarine.client.cli.CliConstants;
import org.apache.submarine.client.cli.CliUtils;
+import org.apache.submarine.client.cli.param.ParametersHolder;
+import org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters;
import org.apache.submarine.commons.runtime.ClientContext;
import org.apache.submarine.commons.runtime.param.Parameter;
import org.apache.submarine.commons.utils.SubmarineConfiguration;
import org.apache.submarine.commons.utils.SubmarineConfVars;
+import org.junit.Assert;
import java.io.IOException;
@@ -41,11 +46,27 @@ public class MockRpcServer extends SubmarineRpcServer {
extends SubmarineServerRpcService {
@Override
protected ApplicationId run(ClientContext clientContext,
- Parameter parameter) {
+ Parameter parameter) throws YarnException {
+ ParametersHolder parametersHolder = (ParametersHolder) parameter;
+ // Check that option values are still readable from the holder after
+ // the protobuf conversion, then answer with a fixed application id.
+ checkProtoConversion(parametersHolder);
return CliUtils.fromString("application_1_123");
}
}
+ /**
+  * Asserts that, for a TensorFlow job with a non-zero worker count, the
+  * {@code CliConstants.N_WORKERS} option value read from the holder
+  * equals the worker count of the job parameters.
+  *
+  * <p>Holders with other parameter types, or with a worker count of
+  * zero, are not checked.
+  *
+  * @param parametersHolder the holder received by the mock server
+  * @throws YarnException declared for the option lookup
+  */
+ private static void checkProtoConversion(ParametersHolder parametersHolder)
+     throws YarnException {
+   if (!(parametersHolder.getParameters()
+       instanceof TensorFlowRunJobParameters)) {
+     return;
+   }
+   TensorFlowRunJobParameters tfParameters =
+       (TensorFlowRunJobParameters) parametersHolder.getParameters();
+   if (tfParameters.getNumWorkers() == 0) {
+     return;
+   }
+   Long expectedWorkers = Long.valueOf(tfParameters.getNumWorkers());
+   Long optionWorkers = Long.valueOf(
+       parametersHolder.getOptionValue(CliConstants.N_WORKERS));
+   Assert.assertEquals(expectedWorkers, optionWorkers);
+ }
+
public static void main(String[] args) throws Exception {
SubmarineRpcServer server = startRpcServer();
server.blockUntilShutdown();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]