This is an automated email from the ASF dual-hosted git repository.

ztang pushed a commit to annotated tag release-0.3.0-RC0
in repository https://gitbox.apache.org/repos/asf/submarine.git

commit 2baa5c479c51ca3cde302233b967f3a179e61c51
Author: Zac Zhou <[email protected]>
AuthorDate: Tue Jan 21 17:27:13 2020 +0800

    SUBMARINE-356. ParameterProto should support getOptionValue method
    
    ### What is this PR for?
    To enable YAML configuration, the getOptionValue method of
ParametersHolder is used to look up job configuration. ParameterProto should
support this method as well.
    
    ### What type of PR is it?
    Bug Fix
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/SUBMARINE-356
    
    ### How should this be tested?
    
https://travis-ci.org/yuanzac/hadoop-submarine/builds/639866700?utm_source=github_status&utm_medium=notification
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Zac Zhou <[email protected]>
    
    Closes #161 from yuanzac/topic/SUBMARINE-356 and squashes the following 
commits:
    
    b984967 [Zac Zhou] SUBMARINE-356. ParameterProto should support 
getOptionValue method.
    
    (cherry picked from commit 4f5e01c4b475b1657504b6dc9390f869502f5459)
---
 .../client/cli/param/ParametersHolder.java         | 28 +++++++++
 .../submarine/client/cli/remote/ClientProto.java   | 52 ++++++++++++++++
 .../submarine/client/cli/runjob/RunJobCli.java     |  1 +
 .../src/main/proto/SubmarineServerProtocol.proto   | 18 ++++++
 .../server/rpc/SubmarineRpcServerProto.java        | 70 ++++++++++++++++++++++
 .../apache/submarine/server/rpc/MockRpcServer.java | 23 ++++++-
 6 files changed, 191 insertions(+), 1 deletion(-)

diff --git 
a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java
 
b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java
index 0c1c21e..44891f7 100644
--- 
a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java
+++ 
b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java
@@ -461,4 +461,32 @@ public final class ParametersHolder implements Parameter {
     this.parameters = parameters;
     return this;
   }
+
+  public CommandLine getParsedCommandLine() {
+    return parsedCommandLine;
+  }
+
+  public Parameter setParsedCommandLine(CommandLine parsedCommandLine) {
+    this.parsedCommandLine = parsedCommandLine;
+    return this;
+  }
+
+  public Map<String, String> getYamlStringConfigs() {
+    return yamlStringConfigs;
+  }
+
+  public Parameter setYamlStringConfigs(Map<String, String> yamlStringConfigs) 
{
+    this.yamlStringConfigs = yamlStringConfigs;
+    return this;
+  }
+
+  public Map<String, List<String>> getYamlListConfigs() {
+    return yamlListConfigs;
+  }
+
+  public Parameter setYamlListConfigs(
+      Map<String, List<String>> yamlListConfigs) {
+    this.yamlListConfigs = yamlListConfigs;
+    return this;
+  }
 }
diff --git 
a/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/ClientProto.java
 
b/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/ClientProto.java
index c531978..dedb64f 100644
--- 
a/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/ClientProto.java
+++ 
b/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/ClientProto.java
@@ -19,10 +19,13 @@
 
 package org.apache.submarine.client.cli.remote;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.submarine.client.cli.CliUtils;
 import org.apache.submarine.client.cli.param.Localization;
+import org.apache.submarine.client.cli.param.ParametersHolder;
 import org.apache.submarine.client.cli.param.Quicklink;
 import org.apache.submarine.client.cli.param.RunParameters;
 import org.apache.submarine.client.cli.param.ShowJobParameters;
@@ -31,7 +34,10 @@ import 
org.apache.submarine.client.cli.param.runjob.RunJobParameters;
 import org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters;
 import org.apache.submarine.client.cli.runjob.RoleParameters;
 import org.apache.submarine.commons.rpc.ApplicationIdProto;
+import org.apache.submarine.commons.rpc.CommandLineProto;
+import org.apache.submarine.commons.rpc.ListOfString;
 import org.apache.submarine.commons.rpc.LocalizationProto;
+import org.apache.submarine.commons.rpc.OptionProto;
 import org.apache.submarine.commons.rpc.ParameterProto;
 import org.apache.submarine.commons.rpc.PyTorchRunJobParameterProto;
 import org.apache.submarine.commons.rpc.QuicklinkProto;
@@ -45,6 +51,7 @@ import org.apache.submarine.commons.runtime.param.Parameter;
 import org.apache.submarine.commons.runtime.resource.ResourceUtils;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -63,6 +70,7 @@ public class ClientProto {
         // Handle tensorflow job parameters
         proto = convertTensorFlowRunJobToParameterProto(parameters, 
rpcContext);
       }
+      setCommandLineYamlConfig((ParametersHolder) parameters, proto);
     } else if (baseParameters instanceof ShowJobParameters) {
       // Handle show job parameters
       proto = convertShowJobToParameterProto(parameters, rpcContext);
@@ -70,6 +78,30 @@ public class ClientProto {
     return proto;
   }
 
+  private static ParameterProto setCommandLineYamlConfig(
+      ParametersHolder parameters, ParameterProto proto) {
+    CommandLineProto commandLineProto =
+        convertCommandLineToCommandLineProto(parameters
+            .getParsedCommandLine());
+    proto = proto.toBuilder()
+        .setCommandLine(commandLineProto)
+        .putAllYamlStringConfigs(parameters.getYamlStringConfigs())
+        .putAllYamlListConfigs(
+            covertYamlListConfigs(parameters.getYamlListConfigs())).build();
+    return proto;
+  }
+
+  public static Map<String, ListOfString> covertYamlListConfigs(
+      Map<String, List<String>> yamlListConfigs) {
+    Map<String, ListOfString> map = new HashMap<>();
+    for(Map.Entry<String, List<String>> entry : yamlListConfigs.entrySet()) {
+      ListOfString value =
+          ListOfString.newBuilder().addAllValues(entry.getValue()).build();
+      map.put(entry.getKey(), value);
+    }
+    return map;
+  }
+
   public static ParameterProto convertPyTorchRunJobToParameterProto(
       Parameter parameters, RpcContext rpcContext) {
     PyTorchRunJobParameterProto pytorchProto =
@@ -99,14 +131,34 @@ public class ClientProto {
             .setTensorBoardParameter(convertRoleParametersToRoleParameterProto(
                 tensorFlowRunJobParameters.getTensorBoardParameters()))
             .build();
+    CommandLineProto commandLineProto =
+        convertCommandLineToCommandLineProto(((ParametersHolder)parameters)
+            .getParsedCommandLine());
     ParameterProto parameterProto = ParameterProto.newBuilder()
         .setTensorflowRunJobParameter(tfProto)
         .setFramework(parameters.getFramework().getValue())
+        .setCommandLine(commandLineProto)
         .putAllSubmarineJobConfigMap(rpcContext.getSubmarineJobConfigMap())
         .build();
     return parameterProto;
   }
 
+  public static CommandLineProto convertCommandLineToCommandLineProto(
+      CommandLine parsedCommandLine) {
+    List<OptionProto> optionProtos = new ArrayList<>();
+
+    for (Option option : parsedCommandLine.getOptions()) {
+      OptionProto optionProto = OptionProto.newBuilder()
+          .setOpt(option.getOpt())
+          .addAllValues(option.getValuesList()).build();
+      optionProtos.add(optionProto);
+    }
+
+    CommandLineProto commandLineProto =
+        CommandLineProto.newBuilder().addAllOptions(optionProtos).build();
+    return commandLineProto;
+  }
+
   public static RunParameterProto convertParameterToRunParametersProto(
       Parameter parameters) {
     RunJobParameters runJobParameter =
diff --git 
a/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RunJobCli.java
 
b/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RunJobCli.java
index 76d1c2b..69426d5 100644
--- 
a/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RunJobCli.java
+++ 
b/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RunJobCli.java
@@ -321,6 +321,7 @@ public class RunJobCli extends AbstractCli {
 
     parseCommandLineAndGetRunJobParameters(args);
     ApplicationId applicationId = jobSubmitter.submitJob(parametersHolder);
+    LOG.info("Submarine job is submitted, the job id is " + applicationId);
     RunJobParameters parameters =
         (RunJobParameters) parametersHolder.getParameters();
     storeJobInformation(parameters, applicationId, args);
diff --git 
a/submarine-commons/commons-rpc/src/main/proto/SubmarineServerProtocol.proto 
b/submarine-commons/commons-rpc/src/main/proto/SubmarineServerProtocol.proto
index 27d88a3..537ce59 100644
--- a/submarine-commons/commons-rpc/src/main/proto/SubmarineServerProtocol.proto
+++ b/submarine-commons/commons-rpc/src/main/proto/SubmarineServerProtocol.proto
@@ -60,8 +60,16 @@ message ParameterProto {
   TensorFlowRunJobParameterProto tensorflow_run_job_parameter = 3;
   ShowJobParameterProto show_job_parameter = 4;
   map<string, string> submarine_job_config_map = 5;
+  CommandLineProto command_line = 6;
+  map<string, string> yaml_string_configs = 7;
+  map<string, ListOfString> yaml_list_configs = 8;
 }
 
+message ListOfString {
+   repeated string values = 1;
+}
+
+
 message ResourceProto {
   map<string, int64> resource_map = 1;
 }
@@ -127,3 +135,13 @@ message ApplicationIdProto {
   // One corner of the rectangle.
   string application_id = 1;
 }
+
+message CommandLineProto {
+  repeated OptionProto options = 1;
+}
+
+message OptionProto {
+  string opt = 1;
+  string long_opt = 2;
+  repeated string values = 3;
+}
diff --git 
a/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java
 
b/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java
index 4c90cf2..c68db1e 100644
--- 
a/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java
+++ 
b/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java
@@ -19,6 +19,8 @@
 
 package org.apache.submarine.server.rpc;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -32,7 +34,10 @@ import 
org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters;
 import org.apache.submarine.client.cli.remote.RpcContext;
 import org.apache.submarine.client.cli.runjob.RoleParameters;
 import org.apache.submarine.commons.rpc.ApplicationIdProto;
+import org.apache.submarine.commons.rpc.CommandLineProto;
+import org.apache.submarine.commons.rpc.ListOfString;
 import org.apache.submarine.commons.rpc.LocalizationProto;
+import org.apache.submarine.commons.rpc.OptionProto;
 import org.apache.submarine.commons.rpc.ParameterProto;
 import org.apache.submarine.commons.rpc.PyTorchRunJobParameterProto;
 import org.apache.submarine.commons.rpc.QuicklinkProto;
@@ -48,8 +53,12 @@ import 
org.apache.submarine.commons.runtime.api.TensorFlowRole;
 import org.apache.submarine.commons.runtime.param.Parameter;
 import org.apache.submarine.commons.runtime.resource.ResourceUtils;
 
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class SubmarineRpcServerProto {
 
@@ -63,9 +72,70 @@ public class SubmarineRpcServerProto {
     } else if (parameterProto.hasShowJobParameter()) {
       parameter = convertParameterProtoToShowJob(parameterProto);
     }
+    setCommandLineYamlConfigIfNeeded(parameter, parameterProto);
     return parameter;
   }
 
+  public static void setCommandLineYamlConfigIfNeeded(
+      Parameter parameter, ParameterProto parameterProto) {
+    if(parameter instanceof ParametersHolder) {
+      ParametersHolder parametersHolder = ((ParametersHolder) parameter);
+      CommandLine commandLine = convertCommandLineProtoToCommandLine(
+          parameterProto.getCommandLine());
+      parametersHolder.setParsedCommandLine(commandLine);
+      parametersHolder.setYamlStringConfigs(
+          parameterProto.getYamlStringConfigsMap());
+      parametersHolder.setYamlListConfigs(
+          covertYamlListConfigs(parameterProto.getYamlListConfigsMap()));
+    }
+  }
+
+  public static Map<String, List<String>> covertYamlListConfigs(
+      Map<String, ListOfString> yamlListConfigs) {
+    Map<String, List<String>> map = new HashMap<>();
+    for(Map.Entry<String, ListOfString> entry : yamlListConfigs.entrySet()) {
+      List<String> value =
+          entry.getValue().getValuesList();
+      map.put(entry.getKey(), value);
+    }
+    return map;
+  }
+
+  public static CommandLine convertCommandLineProtoToCommandLine(
+      CommandLineProto commandLineProto) {
+    CommandLine commandLine;
+    Class<CommandLine> clz = CommandLine.class;
+    try {
+      Constructor<CommandLine> c = clz.getDeclaredConstructor();
+      c.setAccessible(true);
+      commandLine = c.newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException(e.getMessage(), e);
+    }
+
+    for (OptionProto optionProto : commandLineProto.getOptionsList()) {
+      Option option = new Option(optionProto.getOpt(), "");
+      try {
+        Class optionClass = Option.class;
+        Method add = optionClass.getDeclaredMethod("add", String.class);
+        add.setAccessible(true);
+        for(String value : optionProto.getValuesList()) {
+          add.invoke(option, value);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e.getMessage(), e.getCause());
+      }
+      try {
+        Method getOption = clz.getDeclaredMethod("addOption", Option.class);
+        getOption.setAccessible(true);
+        getOption.invoke(commandLine, option);
+      } catch (Exception e) {
+        throw new RuntimeException(e.getMessage(), e);
+      }
+    }
+    return commandLine;
+  }
+
   public static Parameter convertParameterProtoToPyTorchRunJob(
       ParameterProto parameterProto) {
     Framework framework = 
Framework.parseByValue(parameterProto.getFramework());
diff --git 
a/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/MockRpcServer.java
 
b/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/MockRpcServer.java
index 13858c6..df08233 100644
--- 
a/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/MockRpcServer.java
+++ 
b/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/MockRpcServer.java
@@ -19,11 +19,16 @@ package org.apache.submarine.server.rpc;
 
 import io.grpc.ServerBuilder;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.submarine.client.cli.CliConstants;
 import org.apache.submarine.client.cli.CliUtils;
+import org.apache.submarine.client.cli.param.ParametersHolder;
+import org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters;
 import org.apache.submarine.commons.runtime.ClientContext;
 import org.apache.submarine.commons.runtime.param.Parameter;
 import org.apache.submarine.commons.utils.SubmarineConfiguration;
 import org.apache.submarine.commons.utils.SubmarineConfVars;
+import org.junit.Assert;
 
 import java.io.IOException;
 
@@ -41,11 +46,27 @@ public class MockRpcServer extends SubmarineRpcServer {
       extends SubmarineServerRpcService {
     @Override
     protected ApplicationId run(ClientContext clientContext,
-        Parameter parameter) {
+        Parameter parameter) throws YarnException {
+      ParametersHolder parametersHolder = (ParametersHolder) parameter;
+      // Add protobuf conversion check
+      checkProtoConversion(parametersHolder);
       return CliUtils.fromString("application_1_123");
     }
   }
 
+  private static void checkProtoConversion(ParametersHolder parametersHolder) 
throws YarnException {
+    if(parametersHolder.getParameters()
+        instanceof TensorFlowRunJobParameters) {
+      TensorFlowRunJobParameters tensorParameter =
+          (TensorFlowRunJobParameters) parametersHolder.getParameters();
+      if (tensorParameter.getNumWorkers() != 0) {
+        Assert.assertEquals(Long.valueOf(tensorParameter.getNumWorkers()),
+            Long.valueOf(
+                parametersHolder.getOptionValue(CliConstants.N_WORKERS)));
+      }
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     SubmarineRpcServer server = startRpcServer();
     server.blockUntilShutdown();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to