This is an automated email from the ASF dual-hosted git repository.
liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 45bf826 SUBMARINE-303. Submarine client submit job to submarine
server by grpc.
45bf826 is described below
commit 45bf826104ccf0b065d7b2c981b1b55ddddedb4a
Author: Zac Zhou <[email protected]>
AuthorDate: Mon Dec 2 15:26:41 2019 +0800
SUBMARINE-303. Submarine client submit job to submarine server by grpc.
### What is this PR for?
The Submarine server is supposed to manage the job's lifecycle. The Submarine
client can submit a job to the Submarine server using gRPC, which is well
suited to bidirectional communication.
A new configuration property, submarine.server.remote.execution.enabled, is
added. If it is set to true, the Submarine client submits the job to the
Submarine server instead of running the job itself.
### What type of PR is it?
Feature
### Todos
* - Unit tests (UT) need to be added in the next PR.
### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-303
### How should this be tested?
https://travis-ci.org/yuanzac/hadoop-submarine/builds/618607014?utm_source=github_status&utm_medium=notification
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? Yes
Author: Zac Zhou <[email protected]>
Closes #117 from yuanzac/topic/grpc and squashes the following commits:
2fef731 [Zac Zhou] Fix UT of spark interpreter.
b3af432 [Zac Zhou] SUBMARINE-303. Submarine client submit job to submarine
server by grpc.
---
.travis.yml | 23 +-
bin/submarine-daemon.sh | 5 +-
conf/submarine-env.sh.template | 9 +-
conf/submarine-site.xml | 2 +-
conf/submarine-site.xml.template | 8 +-
dev-support/mini-submarine/README.md | 21 ++
dev-support/mini-submarine/conf/bootstrap.sh | 12 +-
.../mini-submarine/conf}/submarine-env.sh | 9 +-
dev-support/mini-submarine/conf/submarine-site.xml | 101 +++++++-
.../submarine/run_submarine_mnist_tony_rpc.sh | 65 ++++++
pom.xml | 18 +-
submarine-all/pom.xml | 10 +
submarine-client/pom.xml | 29 +++
.../java/org/apache/submarine/client/cli/Cli.java | 16 +-
.../org/apache/submarine/client/cli/CliUtils.java | 30 +++
.../submarine/client/cli/param/Localization.java | 9 +-
.../client/cli/param/ParametersHolder.java | 31 ++-
.../submarine/client/cli/param/Quicklink.java | 20 ++
.../cli/param/runjob/PyTorchRunJobParameters.java | 2 +-
.../client/cli/param/runjob/RunJobParameters.java | 32 ++-
.../param/runjob/TensorFlowRunJobParameters.java | 22 +-
.../submarine/client/cli/remote/ClientProto.java | 213 +++++++++++++++++
.../client/cli/remote/JobSubmitterRpcImpl.java | 92 ++++++++
.../submarine/client/cli/remote/RpcContext.java | 60 +++++
.../client/cli/remote/RpcRuntimeFactory.java | 58 +++++
.../client/cli/runjob/RoleParameters.java | 12 +-
submarine-commons/commons-rpc/pom.xml | 137 +++++++++++
.../src/main/proto/SubmarineServerProtocol.proto | 129 +++++++++++
.../src/main/resources/log4j.properties | 17 ++
.../submarine/commons/runtime/param/Parameter.java | 4 +
.../commons/runtime/resource/ResourceUtils.java | 25 +-
.../commons/utils/SubmarineConfiguration.java | 11 +-
submarine-commons/pom.xml | 1 +
submarine-dist/src/assembly/distribution.xml | 14 +-
submarine-server/pom.xml | 1 +
submarine-server/server-core/pom.xml | 80 +++++++
.../apache/submarine/server/SubmarineServer.java | 7 +-
.../server/AbstractSubmarineServerTest.java | 3 +-
submarine-server/server-rpc/pom.xml | 111 +++++++++
.../submarine/server/rpc/SubmarineRpcServer.java | 189 +++++++++++++++
.../server/rpc/SubmarineRpcServerProto.java | 256 +++++++++++++++++++++
.../apache/submarine/server/rpc/MockRpcServer.java | 63 +++++
.../submarine/server/rpc/RpcServerTestUtils.java | 72 ++++++
.../submarine/server/rpc/SubmarineRpcClient.java | 100 ++++++++
.../server/rpc/SubmarineRpcServerTest.java | 68 ++++++
.../server-submitter/submitter-yarn/pom.xml | 74 ++++++
.../server-submitter/submitter-yarnservice/pom.xml | 14 +-
submarine-workbench/interpreter/pom.xml | 24 ++
.../interpreter/python-interpreter/pom.xml | 8 +
.../interpreter/spark-interpreter/pom.xml | 14 ++
50 files changed, 2275 insertions(+), 56 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index fcd70f1..46484e1 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -41,11 +41,12 @@ services:
env:
global:
# If you need to compile Phadoop-3.1 or Phadoop-3.2, you need to add
`!submarine-server/server-submitter/submitter-yarnservice` in EXCLUDE_SUBMARINE
- -
EXCLUDE_SUBMARINE="!submarine-all,!submarine-client,!submarine-commons,!submarine-commons/commons-runtime,!submarine-dist,!submarine-server/server-submitter/submitter-yarn,!submarine-server/server-core"
+ -
EXCLUDE_SUBMARINE="!submarine-all,!submarine-client,!submarine-commons,!submarine-commons/commons-runtime,!submarine-dist,!submarine-server/server-submitter/submitter-yarn,!submarine-server/server-core,!submarine-server/server-rpc"
-
EXCLUDE_WORKBENCH="!submarine-workbench,!submarine-workbench/workbench-web"
-
EXCLUDE_INTERPRETER="!submarine-workbench/interpreter,!submarine-workbench/interpreter/interpreter-engine,!submarine-workbench/interpreter/python-interpreter,!submarine-workbench/interpreter/spark-interpreter""
-
EXCLUDE_SUBMODULE_TONY="!submodules/tony,!submodules/tony/tony-mini,!submodules/tony/tony-core,!submodules/tony/tony-proxy,!submodules/tony/tony-portal,!submodules/tony/tony-azkaban,!submodules/tony/tony-cli"
- EXCLUDE_K8S="!submarine-server/server-submitter/submitter-k8s"
+ - EXCLUDE_COMMON_RPC="!submarine-commons/commons-rpc"
before_install:
# maven 3.6.1 (3.6.2 build tony failed!!!)
@@ -65,6 +66,14 @@ before_install:
- mysql -e "GRANT ALL PRIVILEGES ON *.* TO 'metastore_test'@'%';"
- mysql -e "use metastore_test; source ./docs/database/metastore.sql; show
tables;"
- ./dev-support/travis/install_external_dependencies.sh
+ # protobuf 3.10.1
+ - PROTOBUF_VERSION=3.10.1
+ - PROTOC_FILENAME=protoc-${PROTOBUF_VERSION}-linux-x86_64.zip
+ - pushd /home/travis
+ - wget
https://github.com/google/protobuf/releases/download/v${PROTOBUF_VERSION}/${PROTOC_FILENAME}
+ - unzip ${PROTOC_FILENAME}
+ - bin/protoc --version
+ - popd
matrix:
include:
@@ -78,25 +87,25 @@ matrix:
- language: java
jdk: "openjdk8"
dist: xenial
- env: NAME="Build hadoop-2.7" PROFILE="-Phadoop-2.7" BUILD_FLAG="clean
package install -DskipTests -DskipRat" TEST_FLAG="test -DskipRat -am"
MODULES="-pl
\"${EXCLUDE_K8S},${EXCLUDE_WORKBENCH},${EXCLUDE_INTERPRETER},!submarine-dist\""
TEST_PROJECTS=""
+ env: NAME="Build hadoop-2.7" PROFILE="-Phadoop-2.7" BUILD_FLAG="clean
package install -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl
\"${EXCLUDE_K8S},${EXCLUDE_WORKBENCH},${EXCLUDE_INTERPRETER},!submarine-dist\""
TEST_MODULES="-pl
\"${EXCLUDE_K8S},${EXCLUDE_WORKBENCH},${EXCLUDE_INTERPRETER},${EXCLUDE_COMMON_RPC},!submarine-dist\""
TEST_PROJECTS=""
# Build hadoop-2.9(default)
- language: java
jdk: "openjdk8"
dist: xenial
- env: NAME="Build hadoop-2.9" PROFILE="-Phadoop-2.9" BUILD_FLAG="clean
package install -DskipTests -DskipRat" TEST_FLAG="test -DskipRat -am"
MODULES="-pl
\"${EXCLUDE_K8S},${EXCLUDE_WORKBENCH},${EXCLUDE_INTERPRETER},!submarine-dist\""
TEST_PROJECTS=""
+ env: NAME="Build hadoop-2.9" PROFILE="-Phadoop-2.9" BUILD_FLAG="clean
package install -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl
\"${EXCLUDE_K8S},${EXCLUDE_WORKBENCH},${EXCLUDE_INTERPRETER},!submarine-dist\""
TEST_MODULES="-pl
\"${EXCLUDE_K8S},${EXCLUDE_WORKBENCH},${EXCLUDE_INTERPRETER},${EXCLUDE_COMMON_RPC},!submarine-dist\""
TEST_PROJECTS=""
# Build hadoop-3.1
- language: java
jdk: "openjdk8"
dist: xenial
- env: NAME="Build hadoop-3.1" PROFILE="-Phadoop-3.1" BUILD_FLAG="clean
package install -DskipTests -DskipRat" TEST_FLAG="test -DskipRat -am"
MODULES="-pl
\"${EXCLUDE_K8S},${EXCLUDE_WORKBENCH},${EXCLUDE_INTERPRETER},!submarine-dist\""
TEST_PROJECTS=""
+ env: NAME="Build hadoop-3.1" PROFILE="-Phadoop-3.1" BUILD_FLAG="clean
package install -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl
\"${EXCLUDE_K8S},${EXCLUDE_WORKBENCH},${EXCLUDE_INTERPRETER},!submarine-dist\""
TEST_MODULES="-pl
\"${EXCLUDE_K8S},${EXCLUDE_WORKBENCH},${EXCLUDE_INTERPRETER},${EXCLUDE_COMMON_RPC},!submarine-dist\""
TEST_PROJECTS=""
# Build hadoop-3.2
- language: java
jdk: "openjdk8"
dist: xenial
- env: NAME="Build hadoop-3.2" PROFILE="-Phadoop-3.2" BUILD_FLAG="clean
package install -DskipTests -DskipRat" TEST_FLAG="test -DskipRat -am"
MODULES="-pl
\"${EXCLUDE_K8S},${EXCLUDE_WORKBENCH},${EXCLUDE_INTERPRETER},!submarine-dist\""
TEST_PROJECTS=""
+ env: NAME="Build hadoop-3.2" PROFILE="-Phadoop-3.2" BUILD_FLAG="clean
package install -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl
\"${EXCLUDE_K8S},${EXCLUDE_WORKBENCH},${EXCLUDE_INTERPRETER},!submarine-dist\""
TEST_MODULES="-pl
\"${EXCLUDE_K8S},${EXCLUDE_WORKBENCH},${EXCLUDE_INTERPRETER},${EXCLUDE_COMMON_RPC},!submarine-dist\""
TEST_PROJECTS=""
# Build workbench-web
- language: node_js
@@ -144,7 +153,7 @@ matrix:
- language: java
jdk: "openjdk8"
dist: xenial
- env: NAME="Test submarine distribution" PROFILE="-Phadoop-2.9"
BUILD_FLAG="clean package install -DskipTests" TEST_FLAG="test -DskipRat -am"
MODULES="-pl ${EXCLUDE_K8S}" TEST_MODULES="-pl ${EXCLUDE_K8S}" TEST_PROJECTS=""
+ env: NAME="Test submarine distribution" PROFILE="-Phadoop-2.9"
BUILD_FLAG="clean package install -DskipTests" TEST_FLAG="test -DskipRat"
MODULES="-pl ${EXCLUDE_K8S}" TEST_MODULES="-pl
${EXCLUDE_K8S},${EXCLUDE_COMMON_RPC}" TEST_PROJECTS=""
# Test submarine web-ng
- language: node_js
@@ -169,7 +178,7 @@ matrix:
- language: java
jdk: "openjdk8"
dist: xenial
- env: NAME="Test submarine-server" PROFILE="-Phadoop-2.9"
BUILD_FLAG="clean package install -DskipTests" TEST_FLAG="test -DskipRat -am"
MODULES="-pl ${EXCLUDE_K8S},${EXCLUDE_WORKBENCH},${EXCLUDE_INTERPRETER}"
TEST_MODULES="-pl submarine-server/server-core" TEST_PROJECTS=""
+ env: NAME="Test submarine-server" PROFILE="-Phadoop-2.9"
BUILD_FLAG="clean package install -DskipTests" TEST_FLAG="test -DskipRat"
MODULES="-pl ${EXCLUDE_K8S},${EXCLUDE_WORKBENCH},${EXCLUDE_INTERPRETER}"
TEST_MODULES="-pl submarine-server/server-core" TEST_PROJECTS=""
- name: Submarine on Kubernetes
dist: xenial
diff --git a/bin/submarine-daemon.sh b/bin/submarine-daemon.sh
index 01aec11..4614d7f 100755
--- a/bin/submarine-daemon.sh
+++ b/bin/submarine-daemon.sh
@@ -36,9 +36,10 @@ cd ${BIN}/>/dev/null
SUBMARINE_SERVER_NAME="Submarine Server"
SUBMARINE_SERVER_LOGFILE="${SUBMARINE_LOG_DIR}/submarine.log"
SUBMARINE_SERVER_MAIN=org.apache.submarine.server.SubmarineServer
-JAVA_OPTS+=" -Dsubmarine.log.file=${SUBMARINE_SERVER_LOGFILE}"
+JAVA_OPTS+="${SUBMARINE_SERVER_JAVA_OPTS} ${SUBMARINE_SERVER_MEM}
-Dsubmarine.log.file=${SUBMARINE_SERVER_LOGFILE}"
-add_jar_in_dir "${BIN}/../lib"
+add_each_jar_in_dir "${BIN}/../lib"
+add_each_jar_in_dir "${BIN}/../lib/submitter"
function initialize_default_directories() {
if [[ ! -d "${SUBMARINE_LOG_DIR}" ]]; then
diff --git a/conf/submarine-env.sh.template b/conf/submarine-env.sh.template
index bb78ebb..5b66539 100644
--- a/conf/submarine-env.sh.template
+++ b/conf/submarine-env.sh.template
@@ -18,5 +18,12 @@
# export JAVA_HOME=java
-# export SUBMARINE_SERVER_JAVA_OPTS
+# Debug Submarine server
+# export
SUBMARINE_SERVER_JAVA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000"
+
+# Set Submarine server memory
# export SUBMARINE_SERVER_MEM="-Xms1024m -Xmx1024m -XX:MaxPermSize=512m"
+
+# Set Submarine server classpath. If you want to visit hdfs, just add hadoop
+# configuration path.
+# export SUBMARINE_SERVER_CLASSPATH+=":/usr/local/hadoop/etc/hadoop"
diff --git a/conf/submarine-site.xml b/conf/submarine-site.xml
index ac2eb71..bc3806d 100755
--- a/conf/submarine-site.xml
+++ b/conf/submarine-site.xml
@@ -103,7 +103,7 @@
<property>
<name>workbench.web.war</name>
- <value>../workbench/workbench-web.war</value>
+ <value>../workbench-web.war</value>
<description>Submarine workbench web war file path.</description>
</property>
diff --git a/conf/submarine-site.xml.template b/conf/submarine-site.xml.template
index ac2eb71..42dbb10 100755
--- a/conf/submarine-site.xml.template
+++ b/conf/submarine-site.xml.template
@@ -103,7 +103,7 @@
<property>
<name>workbench.web.war</name>
- <value>../workbench/workbench-web.war</value>
+ <value>../workbench-web.war</value>
<description>Submarine workbench web war file path.</description>
</property>
@@ -113,4 +113,10 @@
<description>RuntimeFactory for Submarine jobs</description>
</property>
+ <property>
+ <name>submarine.server.remote.execution.enabled</name>
+ <value>true</value>
+ <description>Run jobs using rpc server.</description>
+ </property>
+
</configuration>
diff --git a/dev-support/mini-submarine/README.md
b/dev-support/mini-submarine/README.md
index e85b01b..218e8fa 100644
--- a/dev-support/mini-submarine/README.md
+++ b/dev-support/mini-submarine/README.md
@@ -95,6 +95,7 @@ hdfs dfs -ls /user
2. Start workbench server
```
+su yarn
/opt/submarine-current/bin/workbench-daemon.sh start getMysqlJar
```
@@ -116,6 +117,26 @@ For example, if you are in mainland China, you can use the
following command
cd && cd submarine && ./run_submarine_mnist_tony.sh -d
http://yann.lecun.com/exdb/mnist/
```
+#### Run a mnist TF job via submarine server
+
+Submarine server is supposed to manage jobs lifecycle. Clients can just submit
+job parameters or yaml file to submarine server instead of submitting jobs
+directly by themselves. Submarine server can handle the rest of the work.
+
+Set submarine.server.remote.execution.enabled to true in the file of
+/opt/submarine-current/conf/submarine-site
+```
+ <property>
+ <name>submarine.server.remote.execution.enabled</name>
+ <value>true</value>
+ <description>Run jobs using rpc server.</description>
+ </property>
+```
+Run the following command to submit a job via submarine server
+```
+./run_submarine_mnist_tony.sh
+```
+
#### Try your own submarine program
Run container with your source code. You can also use "docker cp" to an
existing running container
diff --git a/dev-support/mini-submarine/conf/bootstrap.sh
b/dev-support/mini-submarine/conf/bootstrap.sh
index 68b2f11..1b1077c 100755
--- a/dev-support/mini-submarine/conf/bootstrap.sh
+++ b/dev-support/mini-submarine/conf/bootstrap.sh
@@ -27,7 +27,7 @@ CONFIG_DIR="/tmp/hadoop-config"
# Copy config files from volume mount
-for f in slaves core-site.xml hdfs-site.xml mapred-site.xml yarn-site.xml
container-executor.cfg capacity-scheduler.xml node-resources.xml
resource-types.xml submarine-site.xml; do
+for f in slaves core-site.xml hdfs-site.xml mapred-site.xml yarn-site.xml
container-executor.cfg capacity-scheduler.xml node-resources.xml
resource-types.xml; do
if [[ -e ${CONFIG_DIR}/$f ]]; then
cp ${CONFIG_DIR}/$f $HADOOP_PREFIX/etc/hadoop/$f
else
@@ -36,6 +36,16 @@ for f in slaves core-site.xml hdfs-site.xml mapred-site.xml
yarn-site.xml contai
fi
done
+# Copy submarine config
+for f in slaves submarine-site.xml submarine-env.sh; do
+ if [[ -e ${CONFIG_DIR}/$f ]]; then
+ cp ${CONFIG_DIR}/$f /opt/submarine-current/conf/$f
+ else
+ echo "ERROR: Could not find $f in $CONFIG_DIR"
+ exit 1
+ fi
+done
+
# create cgroups
mkdir -p /sys/fs/cgroup/cpu/hadoop-yarn
chown -R yarn /sys/fs/cgroup/cpu/hadoop-yarn
diff --git a/bin/submarine-env.sh
b/dev-support/mini-submarine/conf/submarine-env.sh
similarity index 73%
rename from bin/submarine-env.sh
rename to dev-support/mini-submarine/conf/submarine-env.sh
index bb78ebb..87eb221 100644
--- a/bin/submarine-env.sh
+++ b/dev-support/mini-submarine/conf/submarine-env.sh
@@ -18,5 +18,12 @@
# export JAVA_HOME=java
-# export SUBMARINE_SERVER_JAVA_OPTS
+# Debug Submarine server
+# export
SUBMARINE_SERVER_JAVA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000"
+
+# Set Submarine server memory
# export SUBMARINE_SERVER_MEM="-Xms1024m -Xmx1024m -XX:MaxPermSize=512m"
+
+# Set Submarine server classpath. If you want to visit hdfs, just add hadoop
+# configuration path.
+export SUBMARINE_SERVER_CLASSPATH+=":/usr/local/hadoop/etc/hadoop"
\ No newline at end of file
diff --git a/dev-support/mini-submarine/conf/submarine-site.xml
b/dev-support/mini-submarine/conf/submarine-site.xml
index 785f115..ac2d555 100644
--- a/dev-support/mini-submarine/conf/submarine-site.xml
+++ b/dev-support/mini-submarine/conf/submarine-site.xml
@@ -1,4 +1,5 @@
<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
@@ -17,11 +18,105 @@
-->
<configuration>
+
+ <property>
+ <name>submarine.cluster.addr</name>
+ <value></value>
+ <description>submarine cluster address list, e.g.
ip1:port1;ip2:port2;ip3:port3</description>
+ </property>
+
+ <property>
+ <name>submarine.server.addr</name>
+ <value>0.0.0.0</value>
+ <description>Server address</description>
+ </property>
+
+ <property>
+ <name>submarine.server.port</name>
+ <value>8080</value>
+ <description>Server port.</description>
+ </property>
+
+ <property>
+ <name>submarine.server.ssl</name>
+ <value>false</value>
+ <description>Should SSL be used by the submarine server?</description>
+ </property>
+
+ <property>
+ <name>submarine.server.ssl.port</name>
+ <value>8443</value>
+ <description>Server ssl port. (used when ssl property is set to
true)</description>
+ </property>
+
+ <property>
+ <name>submarine.server.ssl.client.auth</name>
+ <value>false</value>
+ <description>Should client authentication be used for SSL
connections?</description>
+ </property>
+
+ <property>
+ <name>submarine.server.ssl.keystore.path</name>
+ <value>keystore</value>
+ <description>Path to keystore relative to submarine configuration
directory</description>
+ </property>
+
+ <property>
+ <name>submarine.server.ssl.keystore.type</name>
+ <value>JKS</value>
+ <description>The format of the given keystore (e.g. JKS or
PKCS12)</description>
+ </property>
+
+ <property>
+ <name>submarine.server.ssl.keystore.password</name>
+ <value>change me</value>
+ <description>Keystore password. Can be obfuscated by the Jetty Password
tool</description>
+ </property>
+
+ <!--
+ <property>
+ <name>submarine.server.ssl.key.manager.password</name>
+ <value>change me</value>
+ <description>Key Manager password. Defaults to keystore password. Can be
obfuscated.</description>
+ </property>
+ -->
+
+ <property>
+ <name>submarine.server.ssl.truststore.path</name>
+ <value>truststore</value>
+ <description>Path to truststore relative to submarine configuration
directory. Defaults to the keystore path</description>
+ </property>
+
+ <property>
+ <name>submarine.server.ssl.truststore.type</name>
+ <value>JKS</value>
+ <description>The format of the given truststore (e.g. JKS or PKCS12).
Defaults to the same type as the keystore type</description>
+ </property>
+
+ <!--
+ <property>
+ <name>submarine.server.ssl.truststore.password</name>
+ <value>change me</value>
+ <description>Truststore password. Can be obfuscated by the Jetty Password
tool. Defaults to the keystore password</description>
+ </property>
+ -->
+
+ <property>
+ <name>workbench.web.war</name>
+ <value>../workbench-web.war</value>
+ <description>Submarine workbench web war file path.</description>
+ </property>
+
<property>
<name>submarine.runtime.class</name>
<value>org.apache.submarine.server.submitter.yarn.YarnRuntimeFactory</value>
- <!--
-
<value>org.apache.submarine.server.submitter.yarnservice.YarnServiceRuntimeFactory</value>
- -->
+ <description>RuntimeFactory for Submarine jobs</description>
</property>
+
+ <property>
+ <name>submarine.server.remote.execution.enabled</name>
+ <value>false</value>
+ <description>Run jobs using rpc server.</description>
+ </property>
+
</configuration>
diff --git
a/dev-support/mini-submarine/submarine/run_submarine_mnist_tony_rpc.sh
b/dev-support/mini-submarine/submarine/run_submarine_mnist_tony_rpc.sh
new file mode 100644
index 0000000..3f0c483
--- /dev/null
+++ b/dev-support/mini-submarine/submarine/run_submarine_mnist_tony_rpc.sh
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#!/bin/bash
+while [ $# -gt 0 ]; do
+ case "$1" in
+ --debug*)
+ DEBUG=$1
+ shift
+ ;;
+ *)
+ break
+ ;;
+ esac
+done
+
+DEBUG_PORT=8000
+if [ "$DEBUG" ]; then
+ JAVA_CMD="java
-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=${DEBUG_PORT}"
+else
+ JAVA_CMD="java"
+fi
+
+while getopts 'd:' OPT; do
+ case $OPT in
+ d)
+ DATA_URL="$OPTARG";;
+ esac
+done
+shift $(($OPTIND - 1))
+
+if [[ -n "$DATA_URL" ]]; then
+ WORKER_CMD="myvenv.zip/venv/bin/python mnist_distributed.py --steps 2
--data_dir /tmp/data --working_dir /tmp/mode --mnist_data_url ${DATA_URL}"
+else
+ WORKER_CMD="myvenv.zip/venv/bin/python mnist_distributed.py --steps 2
--data_dir /tmp/data --working_dir /tmp/mode"
+fi
+
+SUBMARINE_VERSION=0.3.0-SNAPSHOT
+HADOOP_VERSION=2.9
+
+${JAVA_CMD} -cp
/opt/submarine/submarine-dist-${SUBMARINE_VERSION}-hadoop-${HADOOP_VERSION}/submarine-all-${SUBMARINE_VERSION}-hadoop-${HADOOP_VERSION}.jar:/usr/local/hadoop/etc/hadoop:/opt/submarine/submarine-dist-${SUBMARINE_VERSION}-hadoop-${HADOOP_VERSION}/conf
\
+ org.apache.submarine.client.cli.Cli job run --name tf-job-001 \
+ --framework tensorflow \
+ --verbose \
+ --input_path "" \
+ --num_workers 2 \
+ --worker_resources memory=1G,vcores=1 \
+ --num_ps 1 \
+ --ps_resources memory=1G,vcores=1 \
+ --worker_launch_cmd "${WORKER_CMD}" \
+ --ps_launch_cmd "myvenv.zip/venv/bin/python mnist_distributed.py --steps 2
--data_dir /tmp/data --working_dir /tmp/mode" \
+ --insecure \
+ --conf
tony.containers.resources=/home/yarn/submarine/myvenv.zip#archive,/home/yarn/submarine/mnist_distributed.py,/opt/submarine/submarine-dist-${SUBMARINE_VERSION}-hadoop-${HADOOP_VERSION}/submarine-all-${SUBMARINE_VERSION}-hadoop-${HADOOP_VERSION}.jar
diff --git a/pom.xml b/pom.xml
index dc9ad25..4deec90 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,6 +79,7 @@
<cglib.version>3.2.2</cglib.version>
<mybatis.version>3.2.8</mybatis.version>
<mysql-connector-java.version>5.1.39</mysql-connector-java.version>
+ <grpc.verison>1.25.0</grpc.verison>
<!-- frontend maven plugin related versions-->
<plugin.frontend.version>1.6</plugin.frontend.version>
@@ -129,8 +130,8 @@
</properties>
<modules>
- <module>submarine-client</module>
<module>submarine-commons</module>
+ <module>submarine-client</module>
<module>submodules/tony</module>
<module>submarine-server</module>
<module>submarine-all</module>
@@ -272,6 +273,21 @@
<artifactId>jackson-jaxrs</artifactId>
<version>${codehaus-jackson.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty-shaded</artifactId>
+ <version>${grpc.verison}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${grpc.verison}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.verison}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/submarine-all/pom.xml b/submarine-all/pom.xml
index 900a544..72ee2c5 100644
--- a/submarine-all/pom.xml
+++ b/submarine-all/pom.xml
@@ -64,6 +64,16 @@
<groupId>org.apache.submarine</groupId>
<artifactId>submarine-client</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.submarine</groupId>
diff --git a/submarine-client/pom.xml b/submarine-client/pom.xml
index 11e0cc0..ebd5b85 100644
--- a/submarine-client/pom.xml
+++ b/submarine-client/pom.xml
@@ -37,6 +37,12 @@
<groupId>org.apache.submarine</groupId>
<artifactId>commons-runtime</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
@@ -119,6 +125,29 @@
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.submarine</groupId>
+ <artifactId>commons-rpc</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.errorprone</groupId>
+ <artifactId>error_prone_annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>animal-sniffer-annotations</artifactId>
+ </exclusion>
</exclusions>
</dependency>
</dependencies>
diff --git
a/submarine-client/src/main/java/org/apache/submarine/client/cli/Cli.java
b/submarine-client/src/main/java/org/apache/submarine/client/cli/Cli.java
index 4b8d0ad..296d6cc 100644
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/Cli.java
+++ b/submarine-client/src/main/java/org/apache/submarine/client/cli/Cli.java
@@ -20,10 +20,12 @@
package org.apache.submarine.client.cli;
import org.apache.hadoop.conf.Configuration;
+import org.apache.submarine.client.cli.remote.RpcRuntimeFactory;
import org.apache.submarine.client.cli.runjob.RunJobCli;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.submarine.commons.runtime.ClientContext;
import org.apache.submarine.commons.runtime.RuntimeFactory;
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,11 +47,17 @@ public class Cli {
}
private static ClientContext getClientContext() {
- Configuration conf = new YarnConfiguration();
ClientContext clientContext = new ClientContext();
- clientContext.setYarnConfig(conf);
- RuntimeFactory runtimeFactory = RuntimeFactory.getRuntimeFactory(
- clientContext);
+ RuntimeFactory runtimeFactory;
+ if (clientContext.getSubmarineConfig().getBoolean(
+ SubmarineConfiguration.ConfVars.
+ SUBMARINE_SERVER_REMOTE_EXECUTION_ENABLED)) {
+ runtimeFactory = new RpcRuntimeFactory(clientContext);
+ } else {
+ Configuration conf = new YarnConfiguration();
+ clientContext.setYarnConfig(conf);
+ runtimeFactory = RuntimeFactory.getRuntimeFactory(clientContext);
+ }
clientContext.setRuntimeFactory(runtimeFactory);
return clientContext;
}
diff --git
a/submarine-client/src/main/java/org/apache/submarine/client/cli/CliUtils.java
b/submarine-client/src/main/java/org/apache/submarine/client/cli/CliUtils.java
index d76c44c..222de1f 100644
---
a/submarine-client/src/main/java/org/apache/submarine/client/cli/CliUtils.java
+++
b/submarine-client/src/main/java/org/apache/submarine/client/cli/CliUtils.java
@@ -21,6 +21,7 @@ package org.apache.submarine.client.cli;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
import
org.apache.submarine.commons.runtime.exception.SubmarineRuntimeException;
import org.apache.submarine.commons.runtime.fs.RemoteDirectoryManager;
@@ -125,4 +126,33 @@ public class CliUtils {
}
UserGroupInformation.loginUserFromKeytab(principal, keytab);
}
+
+ /**
+ * As hadoop-2.7 doesn't have this method, we add this method in submarine.
+ * @param appIdStr
+ * @return
+ */
+ public static ApplicationId fromString(String appIdStr) {
+ String APPLICATION_ID_PREFIX = "application_";
+ if (!appIdStr.startsWith(APPLICATION_ID_PREFIX)) {
+ throw new IllegalArgumentException("Invalid ApplicationId prefix: "
+ + appIdStr + ". The valid ApplicationId should start with prefix "
+ + "application");
+ }
+ try {
+ int pos1 = APPLICATION_ID_PREFIX.length() - 1;
+ int pos2 = appIdStr.indexOf('_', pos1 + 1);
+ if (pos2 < 0) {
+ throw new IllegalArgumentException("Invalid ApplicationId: "
+ + appIdStr);
+ }
+ long rmId = Long.parseLong(appIdStr.substring(pos1 + 1, pos2));
+ int appId = Integer.parseInt(appIdStr.substring(pos2 + 1));
+ ApplicationId applicationId = ApplicationId.newInstance(rmId, appId);
+ return applicationId;
+ } catch (NumberFormatException n) {
+ throw new IllegalArgumentException("Invalid ApplicationId: "
+ + appIdStr, n);
+ }
+ }
}
diff --git
a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/Localization.java
b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/Localization.java
index 18297e1..0377450 100644
---
a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/Localization.java
+++
b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/Localization.java
@@ -108,24 +108,27 @@ public class Localization {
return remoteUri;
}
- public void setRemoteUri(String rUti) {
+ public Localization setRemoteUri(String rUti) {
this.remoteUri = rUti;
+ return this;
}
public String getLocalPath() {
return localPath;
}
- public void setLocalPath(String lPath) {
+ public Localization setLocalPath(String lPath) {
this.localPath = lPath;
+ return this;
}
public String getMountPermission() {
return mountPermission;
}
- public void setMountPermission(String mPermission) {
+ public Localization setMountPermission(String mPermission) {
this.mountPermission = mPermission;
+ return this;
}
private boolean isSupportedScheme(String scheme) {
diff --git
a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java
b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java
index a6df357..0c1c21e 100644
---
a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java
+++
b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java
@@ -75,15 +75,15 @@ public final class ParametersHolder implements Parameter {
- private final CommandLine parsedCommandLine;
- private final Map<String, String> yamlStringConfigs;
- private final Map<String, List<String>> yamlListConfigs;
- private final ConfigType configType;
+ private CommandLine parsedCommandLine;
+ private Map<String, String> yamlStringConfigs;
+ private Map<String, List<String>> yamlListConfigs;
+ private ConfigType configType;
private Command command;
private final Set onlyDefinedWithCliArgs = ImmutableSet.of(
CliConstants.VERBOSE);
- private final Framework framework;
- private final BaseParameters parameters;
+ private Framework framework;
+ private BaseParameters parameters;
private ParametersHolder(CommandLine parsedCommandLine,
YamlConfigFile yamlConfig, ConfigType configType, Command command)
@@ -98,6 +98,10 @@ public final class ParametersHolder implements Parameter {
this.parameters = createParameters();
}
+ private ParametersHolder(){
+ super();
+ }
+
private BaseParameters createParameters() {
if (command == Command.RUN_JOB) {
if (framework == Framework.TENSORFLOW) {
@@ -289,6 +293,10 @@ public final class ParametersHolder implements Parameter {
.collect(Collectors.toList());
}
+ public static ParametersHolder create() {
+ return new ParametersHolder();
+ }
+
public static ParametersHolder createWithCmdLine(CommandLine cli,
Command command) throws ParseException, YarnException {
return new ParametersHolder(cli, null, ConfigType.CLI, command);
@@ -434,6 +442,12 @@ public final class ParametersHolder implements Parameter {
return framework;
}
+ @Override
+ public Parameter setFramework(Framework framework) {
+ this.framework = framework;
+ return this;
+ }
+
public void updateParameters(ClientContext clientContext)
throws ParseException, YarnException, IOException {
parameters.updateParameters(this, clientContext);
@@ -442,4 +456,9 @@ public final class ParametersHolder implements Parameter {
public BaseParameters getParameters() {
return parameters;
}
+
+ public Parameter setParameters(BaseParameters parameters) {
+ this.parameters = parameters;
+ return this;
+ }
}
diff --git
a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/Quicklink.java
b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/Quicklink.java
index 80e3c53..4fe035c 100644
---
a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/Quicklink.java
+++
b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/Quicklink.java
@@ -62,15 +62,35 @@ public class Quicklink {
return label;
}
+ public Quicklink setLabel(String label) {
+ this.label = label;
+ return this;
+ }
+
public String getComponentInstanceName() {
return componentInstanceName;
}
+ public Quicklink setComponentInstanceName(String componentInstanceName) {
+ this.componentInstanceName = componentInstanceName;
+ return this;
+ }
+
public String getProtocol() {
return protocol;
}
+ public Quicklink setProtocol(String protocol) {
+ this.protocol = protocol;
+ return this;
+ }
+
public int getPort() {
return port;
}
+
+ public Quicklink setPort(int port) {
+ this.port = port;
+ return this;
+ }
}
diff --git
a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/PyTorchRunJobParameters.java
b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/PyTorchRunJobParameters.java
index dbbe31a..80767cf 100644
---
a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/PyTorchRunJobParameters.java
+++
b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/PyTorchRunJobParameters.java
@@ -49,7 +49,7 @@ public class PyTorchRunJobParameters extends RunJobParameters
{
String input = parametersHolder.getOptionValue(CliConstants.INPUT_PATH);
this.workerParameters =
- getWorkerParameters(clientContext, parametersHolder, input);
+ generateWorkerParameters(clientContext, parametersHolder, input);
this.distributed = determineIfDistributed(workerParameters.getReplicas());
executePostOperations(clientContext);
}
diff --git
a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/RunJobParameters.java
b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/RunJobParameters.java
index 32daae1..f219ef5 100644
---
a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/RunJobParameters.java
+++
b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/RunJobParameters.java
@@ -178,14 +178,29 @@ public abstract class RunJobParameters extends
RunParameters {
return waitJobFinish;
}
+ public RunJobParameters setWaitJobFinish(boolean waitJobFinish) {
+ this.waitJobFinish = waitJobFinish;
+ return this;
+ }
+
public List<Quicklink> getQuicklinks() {
return quicklinks;
}
+ public RunJobParameters setQuicklinks(List<Quicklink> quicklinks) {
+ this.quicklinks = quicklinks;
+ return this;
+ }
+
public List<Localization> getLocalizations() {
return localizations;
}
+ public RunJobParameters setLocalizations(List<Localization> localizations) {
+ this.localizations = localizations;
+ return this;
+ }
+
public String getKeytab() {
return keytab;
}
@@ -208,8 +223,9 @@ public abstract class RunJobParameters extends
RunParameters {
return securityDisabled;
}
- public void setSecurityDisabled(boolean securityDisabled) {
+ public RunJobParameters setSecurityDisabled(boolean securityDisabled) {
this.securityDisabled = securityDisabled;
+ return this;
}
public boolean isDistributeKeytab() {
@@ -231,11 +247,12 @@ public abstract class RunJobParameters extends
RunParameters {
return this;
}
- public void setDistributed(boolean distributed) {
+ public RunJobParameters setDistributed(boolean distributed) {
this.distributed = distributed;
+ return this;
}
- RoleParameters getWorkerParameters(ClientContext clientContext,
+ RoleParameters generateWorkerParameters(ClientContext clientContext,
Parameter parametersHolder, String input)
throws ParseException, YarnException, IOException {
int nWorkers = getNumberOfWorkers(parametersHolder, input);
@@ -282,6 +299,15 @@ public abstract class RunJobParameters extends
RunParameters {
return nWorkers;
}
+ public RoleParameters getWorkerParameter() {
+ return workerParameters;
+ }
+
+ public RunJobParameters setWorkerParameter(RoleParameters workerParameters) {
+ this.workerParameters = workerParameters;
+ return this;
+ }
+
public String getWorkerLaunchCmd() {
return workerParameters.getLaunchCommand();
}
diff --git
a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/TensorFlowRunJobParameters.java
b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/TensorFlowRunJobParameters.java
index b084a5d..06f2bc0 100644
---
a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/TensorFlowRunJobParameters.java
+++
b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/TensorFlowRunJobParameters.java
@@ -52,7 +52,7 @@ public class TensorFlowRunJobParameters extends
RunJobParameters {
String input = parametersHolder.getOptionValue(CliConstants.INPUT_PATH);
this.workerParameters =
- getWorkerParameters(clientContext, parametersHolder, input);
+ generateWorkerParameters(clientContext, parametersHolder, input);
this.psParameters = getPSParameters(clientContext, parametersHolder);
this.distributed = determineIfDistributed(workerParameters.getReplicas(),
psParameters.getReplicas());
@@ -159,6 +159,14 @@ public class TensorFlowRunJobParameters extends
RunJobParameters {
tensorboardDockerImage, tensorboardResource);
}
+ public RoleParameters getPsParameters() {
+ return psParameters;
+ }
+
+ /**
+  * Sets the PS role parameters.
+  *
+  * @param parameters role parameters to store for the PS role
+  * @return this instance, so calls can be chained like the other fluent
+  *         setters on the run-job parameter classes
+  */
+ public TensorFlowRunJobParameters setPsParameters(
+     RoleParameters parameters) {
+   this.psParameters = parameters;
+   return this;
+ }
+
public int getNumPS() {
return psParameters.getReplicas();
}
@@ -191,10 +199,22 @@ public class TensorFlowRunJobParameters extends
RunJobParameters {
psParameters.setLaunchCommand(launchCmd);
}
+ public RoleParameters getTensorBoardParameters() {
+ return tensorBoardParameters;
+ }
+
+ /**
+  * Sets the TensorBoard role parameters.
+  *
+  * @param tensorBoardParameters role parameters to store for TensorBoard
+  * @return this instance, so calls can be chained like the other fluent
+  *         setters on the run-job parameter classes
+  */
+ public TensorFlowRunJobParameters setTensorBoardParameters(
+     RoleParameters tensorBoardParameters) {
+   this.tensorBoardParameters = tensorBoardParameters;
+   return this;
+ }
+
public boolean isTensorboardEnabled() {
return tensorboardEnabled;
}
+ /**
+  * Sets the flag reported by {@link #isTensorboardEnabled()}.
+  *
+  * @param tensorboardEnabled new value of the flag
+  * @return this instance, so calls can be chained like the other fluent
+  *         setters on the run-job parameter classes
+  */
+ public TensorFlowRunJobParameters setTensorboardEnabled(
+     boolean tensorboardEnabled) {
+   this.tensorboardEnabled = tensorboardEnabled;
+   return this;
+ }
+
public Resource getTensorboardResource() {
return tensorBoardParameters.getResource();
}
diff --git
a/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/ClientProto.java
b/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/ClientProto.java
new file mode 100644
index 0000000..c531978
--- /dev/null
+++
b/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/ClientProto.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.client.cli.remote;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.submarine.client.cli.CliUtils;
+import org.apache.submarine.client.cli.param.Localization;
+import org.apache.submarine.client.cli.param.Quicklink;
+import org.apache.submarine.client.cli.param.RunParameters;
+import org.apache.submarine.client.cli.param.ShowJobParameters;
+import org.apache.submarine.client.cli.param.runjob.PyTorchRunJobParameters;
+import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
+import org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters;
+import org.apache.submarine.client.cli.runjob.RoleParameters;
+import org.apache.submarine.commons.rpc.ApplicationIdProto;
+import org.apache.submarine.commons.rpc.LocalizationProto;
+import org.apache.submarine.commons.rpc.ParameterProto;
+import org.apache.submarine.commons.rpc.PyTorchRunJobParameterProto;
+import org.apache.submarine.commons.rpc.QuicklinkProto;
+import org.apache.submarine.commons.rpc.ResourceProto;
+import org.apache.submarine.commons.rpc.RoleParameterProto;
+import org.apache.submarine.commons.rpc.RunParameterProto;
+import org.apache.submarine.commons.rpc.ShowJobParameterProto;
+import org.apache.submarine.commons.rpc.TensorFlowRunJobParameterProto;
+import org.apache.submarine.commons.runtime.param.BaseParameters;
+import org.apache.submarine.commons.runtime.param.Parameter;
+import org.apache.submarine.commons.runtime.resource.ResourceUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Static converters between the client-side parameter objects and the
+ * protobuf messages exchanged with the submarine server.
+ */
+public class ClientProto {
+
+  /**
+   * Converts the parameters held by {@code parameters} into the rpc message
+   * matching their concrete type.
+   *
+   * @param parameters holder whose {@link BaseParameters} select the message
+   * @param rpcContext supplies the job configuration map copied into the
+   *                   message
+   * @return the message, or {@code null} when the parameters are neither
+   *         PyTorch run-job, TensorFlow run-job nor show-job parameters
+   */
+  public static ParameterProto convertParameterToParameterProto(
+      Parameter parameters, RpcContext rpcContext) {
+    ParameterProto proto = null;
+    BaseParameters baseParameters = parameters.getParameters();
+    if (baseParameters instanceof RunParameters) {
+      if (baseParameters instanceof PyTorchRunJobParameters) {
+        // Handle pyTorch job parameters
+        proto = convertPyTorchRunJobToParameterProto(parameters, rpcContext);
+      } else if (baseParameters instanceof TensorFlowRunJobParameters) {
+        // Handle tensorflow job parameters
+        proto = convertTensorFlowRunJobToParameterProto(parameters,
+            rpcContext);
+      }
+    } else if (baseParameters instanceof ShowJobParameters) {
+      // Handle show job parameters
+      proto = convertShowJobToParameterProto(parameters, rpcContext);
+    }
+    return proto;
+  }
+
+  /**
+   * Builds the rpc message for a PyTorch run job.
+   *
+   * @param parameters holder of the run-job parameters and the framework
+   * @param rpcContext supplies the job configuration map
+   * @return message carrying the PyTorch run-job parameters
+   */
+  public static ParameterProto convertPyTorchRunJobToParameterProto(
+      Parameter parameters, RpcContext rpcContext) {
+    PyTorchRunJobParameterProto pytorchProto =
+        PyTorchRunJobParameterProto.newBuilder()
+            .setRunParameterProto(
+                convertParameterToRunParametersProto(parameters)).build();
+    ParameterProto parameterProto = ParameterProto.newBuilder()
+        .setPytorchRunJobParameter(pytorchProto)
+        .setFramework(parameters.getFramework().getValue())
+        .putAllSubmarineJobConfigMap(rpcContext.getSubmarineJobConfigMap())
+        .build();
+    return parameterProto;
+  }
+
+  /**
+   * Builds the rpc message for a TensorFlow run job.
+   *
+   * @param parameters holder of the TensorFlow run-job parameters
+   * @param rpcContext supplies the job configuration map
+   * @return message carrying the TensorFlow run-job parameters
+   */
+  public static ParameterProto convertTensorFlowRunJobToParameterProto(
+      Parameter parameters, RpcContext rpcContext) {
+    TensorFlowRunJobParameters tensorFlowRunJobParameters =
+        (TensorFlowRunJobParameters) parameters.getParameters();
+    TensorFlowRunJobParameterProto.Builder tfBuilder =
+        TensorFlowRunJobParameterProto.newBuilder()
+            .setRunParameterProto(
+                convertParameterToRunParametersProto(parameters))
+            .setTensorboardEnabled(
+                tensorFlowRunJobParameters.isTensorboardEnabled());
+    // Role parameters that were never populated are left unset in the
+    // message instead of being dereferenced (which would throw).
+    RoleParameters psParameters =
+        tensorFlowRunJobParameters.getPsParameters();
+    if (psParameters != null) {
+      tfBuilder.setPsParameter(
+          convertRoleParametersToRoleParameterProto(psParameters));
+    }
+    RoleParameters tensorBoardParameters =
+        tensorFlowRunJobParameters.getTensorBoardParameters();
+    if (tensorBoardParameters != null) {
+      tfBuilder.setTensorBoardParameter(
+          convertRoleParametersToRoleParameterProto(tensorBoardParameters));
+    }
+    ParameterProto parameterProto = ParameterProto.newBuilder()
+        .setTensorflowRunJobParameter(tfBuilder.build())
+        .setFramework(parameters.getFramework().getValue())
+        .putAllSubmarineJobConfigMap(rpcContext.getSubmarineJobConfigMap())
+        .build();
+    return parameterProto;
+  }
+
+  /**
+   * Copies the framework-independent run-job fields into a
+   * {@link RunParameterProto}. Null strings are sent as {@code ""} and null
+   * lists as empty lists, because the generated builders reject null.
+   *
+   * @param parameters holder whose parameters are {@link RunJobParameters}
+   * @return message with the common run-job fields
+   */
+  public static RunParameterProto convertParameterToRunParametersProto(
+      Parameter parameters) {
+    RunJobParameters runJobParameter =
+        (RunJobParameters) parameters.getParameters();
+    RunParameterProto runParametersProto = RunParameterProto.newBuilder()
+        .setCheckpointPath(
+            Optional.ofNullable(runJobParameter.getCheckpointPath())
+                .orElse(""))
+        .addAllConfPairs(Optional.ofNullable(runJobParameter.getConfPairs())
+            .orElse(new ArrayList<String>()))
+        .setDistributed(runJobParameter.isDistributed())
+        .setDistributeKeytab(runJobParameter.isDistributeKeytab())
+        .setDockerImageName(
+            Optional.ofNullable(runJobParameter.getDockerImageName())
+                .orElse(""))
+        .addAllEnvars(Optional.ofNullable(runJobParameter.getEnvars())
+            .orElse(new ArrayList<String>()))
+        .setInput(
+            Optional.ofNullable(runJobParameter.getInputPath()).orElse(""))
+        .setKeytab(
+            Optional.ofNullable(runJobParameter.getKeytab()).orElse(""))
+        .addAllLocalizations(convertLocalizationToLocalizationProto(
+            runJobParameter.getLocalizations()))
+        .setName(Optional.ofNullable(runJobParameter.getName()).orElse(""))
+        .setPrincipal(
+            Optional.ofNullable(runJobParameter.getPrincipal()).orElse(""))
+        .setQueue(
+            Optional.ofNullable(runJobParameter.getQueue()).orElse(""))
+        .addAllQuicklinks(convertQuicklinktoQuicklinkProto(
+            runJobParameter.getQuicklinks()))
+        .setSavedModelPath(
+            Optional.ofNullable(runJobParameter.getSavedModelPath())
+                .orElse(""))
+        .setSecurityDisabled(runJobParameter.isSecurityDisabled())
+        .setWaitJobFinish(runJobParameter.isWaitJobFinish())
+        .setWorkerParameter(convertRoleParametersToRoleParameterProto(
+            runJobParameter.getWorkerParameter()))
+        .build();
+    return runParametersProto;
+  }
+
+  /**
+   * Converts localizations to their proto form.
+   *
+   * @param localizations localizations to convert; {@code null} is treated
+   *                      as an empty list
+   * @return one proto per localization, in the same order
+   */
+  public static List<LocalizationProto> convertLocalizationToLocalizationProto(
+      List<Localization> localizations) {
+    List<LocalizationProto> localizationProtos =
+        new ArrayList<LocalizationProto>();
+    if (localizations == null) {
+      return localizationProtos;
+    }
+    for (Localization localization : localizations) {
+      localizationProtos.add(LocalizationProto.newBuilder()
+          .setLocalPath(
+              Optional.ofNullable(localization.getLocalPath()).orElse(""))
+          .setMountPermission(
+              Optional.ofNullable(localization.getMountPermission())
+                  .orElse(""))
+          .setRemoteUri(
+              Optional.ofNullable(localization.getRemoteUri()).orElse(""))
+          .build());
+    }
+    return localizationProtos;
+  }
+
+  /**
+   * Converts quicklinks to their proto form.
+   *
+   * @param quicklinks quicklinks to convert; {@code null} is treated as an
+   *                   empty list
+   * @return one proto per quicklink, in the same order
+   */
+  public static List<QuicklinkProto> convertQuicklinktoQuicklinkProto(
+      List<Quicklink> quicklinks) {
+    List<QuicklinkProto> quicklinkProtos =
+        new ArrayList<QuicklinkProto>();
+    if (quicklinks == null) {
+      return quicklinkProtos;
+    }
+    for (Quicklink quicklink : quicklinks) {
+      quicklinkProtos.add(QuicklinkProto.newBuilder()
+          .setLabel(Optional.ofNullable(quicklink.getLabel()).orElse(""))
+          .setComponentInstanceName(
+              Optional.ofNullable(quicklink.getComponentInstanceName())
+                  .orElse(""))
+          .setProtocol(
+              Optional.ofNullable(quicklink.getProtocol()).orElse(""))
+          .setPort(quicklink.getPort())
+          .build());
+    }
+    return quicklinkProtos;
+  }
+
+  /**
+   * Converts the parameters of one role to their proto form.
+   *
+   * @param roleParameter role parameters to convert; must not be
+   *                      {@code null}
+   * @return message with the role name, docker image, replicas, launch
+   *         command and resource of the role
+   */
+  public static RoleParameterProto convertRoleParametersToRoleParameterProto(
+      RoleParameters roleParameter) {
+    RoleParameterProto roleParameterProto = RoleParameterProto.newBuilder()
+        .setRole(roleParameter.getRole().getName())
+        .setDockerImage(
+            Optional.ofNullable(roleParameter.getDockerImage()).orElse(""))
+        .setReplicas(roleParameter.getReplicas())
+        .setLaunchCommand(
+            Optional.ofNullable(roleParameter.getLaunchCommand()).orElse(""))
+        .setResourceProto(
+            convertResourceToResourceProto(roleParameter.getResource()))
+        .build();
+    return roleParameterProto;
+  }
+
+  /**
+   * Converts a resource into a proto holding the map produced by
+   * {@code ResourceUtils.getResourceMap}.
+   *
+   * @param resource resource to convert
+   * @return message wrapping the resource map
+   */
+  public static ResourceProto convertResourceToResourceProto(
+      Resource resource) {
+    // NOTE(review): null handling depends on ResourceUtils.getResourceMap,
+    // which is not visible here - confirm it accepts a null resource.
+    Map<String, Long> map = ResourceUtils.getResourceMap(resource);
+    return ResourceProto.newBuilder().putAllResourceMap(map).build();
+  }
+
+  /**
+   * Builds the rpc message for a show-job request.
+   *
+   * @param parameters holder of the show-job parameters and the framework
+   * @param rpcContext supplies the job configuration map
+   * @return message carrying the job name to show
+   */
+  public static ParameterProto convertShowJobToParameterProto(
+      Parameter parameters, RpcContext rpcContext) {
+    ShowJobParameterProto showJobproto = ShowJobParameterProto.newBuilder()
+        .setName(parameters.getParameters().getName())
+        .build();
+    ParameterProto parameterProto = ParameterProto.newBuilder()
+        .setShowJobParameter(showJobproto)
+        .setFramework(parameters.getFramework().getValue())
+        .putAllSubmarineJobConfigMap(rpcContext.getSubmarineJobConfigMap())
+        .build();
+    return parameterProto;
+  }
+
+  /**
+   * Parses the application id string carried by the proto.
+   *
+   * @param applicationIdProto message returned by the server
+   * @return the application id parsed by {@code CliUtils.fromString}
+   */
+  public static ApplicationId convertApplicationIdProtoToApplicationId(
+      ApplicationIdProto applicationIdProto) {
+    return CliUtils.fromString(applicationIdProto.getApplicationId());
+  }
+
+}
diff --git
a/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/JobSubmitterRpcImpl.java
b/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/JobSubmitterRpcImpl.java
new file mode 100644
index 0000000..748fa14
--- /dev/null
+++
b/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/JobSubmitterRpcImpl.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.client.cli.remote;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.StatusRuntimeException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.submarine.commons.rpc.ApplicationIdProto;
+import org.apache.submarine.commons.rpc.ParameterProto;
+import org.apache.submarine.commons.rpc.SubmarineServerProtocolGrpc;
+import
org.apache.submarine.commons.rpc.SubmarineServerProtocolGrpc.SubmarineServerProtocolBlockingStub;
+import
org.apache.submarine.commons.rpc.SubmarineServerProtocolGrpc.SubmarineServerProtocolStub;
+import org.apache.submarine.commons.runtime.ClientContext;
+import org.apache.submarine.commons.runtime.JobSubmitter;
+import org.apache.submarine.commons.runtime.param.Parameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link JobSubmitter} that sends the job parameters to a submarine server
+ * through the blocking gRPC {@code submitJob} call.
+ *
+ * <p>The channel is shut down at the end of {@link #submitJob(Parameter)},
+ * so an instance should not be reused for a second submission.
+ */
+public class JobSubmitterRpcImpl implements JobSubmitter {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(JobSubmitterRpcImpl.class.getName());
+
+  private final ManagedChannel channel;
+  private final SubmarineServerProtocolBlockingStub blockingStub;
+  private final SubmarineServerProtocolStub asyncStub;
+  private final RpcContext rpcContext;
+
+  /**
+   * Construct client for accessing the submarine server at
+   * {@code host:port} over a plaintext (non-TLS) channel.
+   */
+  public JobSubmitterRpcImpl(String host, int port,
+      ClientContext clientContext) {
+    this(ManagedChannelBuilder.forAddress(host, port).usePlaintext(),
+        clientContext);
+  }
+
+  /**
+   * Construct client for accessing the submarine server using a channel
+   * built from the given builder.
+   */
+  public JobSubmitterRpcImpl(ManagedChannelBuilder<?> channelBuilder,
+      ClientContext clientContext) {
+    channel = channelBuilder.build();
+    blockingStub = SubmarineServerProtocolGrpc.newBlockingStub(channel);
+    asyncStub = SubmarineServerProtocolGrpc.newStub(channel);
+    rpcContext = RpcContext.convertClientContextToRpcContext(clientContext);
+  }
+
+  /**
+   * Submits the job described by {@code parameters} to the server.
+   *
+   * @param parameters job parameters to convert and send
+   * @return the application id returned by the server
+   * @throws IOException if the parameters cannot be converted to an rpc
+   *                     request or the rpc call fails
+   */
+  @Override
+  public ApplicationId submitJob(Parameter parameters) throws IOException,
+      YarnException {
+    // The conversion is inside the try block so the channel is released
+    // even when building the request fails.
+    try {
+      ParameterProto request = ClientProto.convertParameterToParameterProto(
+          parameters, rpcContext);
+      if (request == null) {
+        throw new IOException(
+            "Cannot submit job: unsupported parameter type");
+      }
+      ApplicationIdProto applicationIdProto = blockingStub.submitJob(request);
+      return ClientProto.convertApplicationIdProtoToApplicationId(
+          applicationIdProto);
+    } catch (StatusRuntimeException e) {
+      LOG.error(e.getMessage(), e);
+      // Surface the failure instead of handing a null id to the caller.
+      throw new IOException("Failed to submit job to submarine server", e);
+    } finally {
+      shutdown();
+    }
+  }
+
+  /** Shuts the channel down, waiting up to 5 seconds for termination. */
+  public void shutdown() {
+    try {
+      channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.error(e.getMessage(), e);
+      // Preserve the interrupt status for code further up the stack.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+}
diff --git
a/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/RpcContext.java
b/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/RpcContext.java
new file mode 100644
index 0000000..891a8f5
--- /dev/null
+++
b/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/RpcContext.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.client.cli.remote;
+
+import org.apache.submarine.commons.runtime.ClientContext;
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Holds the submarine job configuration entries that accompany an rpc
+ * request.
+ */
+public class RpcContext {
+
+  private Map<String, String> submarineJobConfigMap = new HashMap<>();
+
+  /**
+   * Creates an rpc context populated from the submarine configuration of
+   * the given client context.
+   *
+   * @param clientContext source of the submarine configuration
+   * @return a new context holding the copied configuration entry
+   */
+  public static RpcContext convertClientContextToRpcContext(
+      ClientContext clientContext) {
+    SubmarineConfiguration configuration =
+        clientContext.getSubmarineConfig();
+    RpcContext context = new RpcContext();
+    context.addSubmarineJobConfiguration(context, configuration);
+    return context;
+  }
+
+  /**
+   * Stores the runtime class setting of {@code submarineConfig} in the
+   * config map of {@code rpcContext}, keyed by the variable name.
+   */
+  private void addSubmarineJobConfiguration(RpcContext rpcContext,
+      SubmarineConfiguration submarineConfig) {
+    Map<String, String> target = rpcContext.getSubmarineJobConfigMap();
+    SubmarineConfiguration.ConfVars runtimeClass =
+        SubmarineConfiguration.ConfVars.SUBMARINE_RUNTIME_CLASS;
+    String key = runtimeClass.getVarName();
+    String value = submarineConfig.getString(runtimeClass);
+    target.put(key, value);
+  }
+
+  /** @return the job configuration map held by this context */
+  public Map<String, String> getSubmarineJobConfigMap() {
+    return submarineJobConfigMap;
+  }
+
+  /**
+   * Replaces the job configuration map.
+   *
+   * @param submarineJobConfigMap new map to hold
+   * @return this context, for chaining
+   */
+  public RpcContext setSubmarineJobConfigMap(
+      Map<String, String> submarineJobConfigMap) {
+    this.submarineJobConfigMap = submarineJobConfigMap;
+    return this;
+  }
+
+}
\ No newline at end of file
diff --git
a/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/RpcRuntimeFactory.java
b/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/RpcRuntimeFactory.java
new file mode 100644
index 0000000..135570c
--- /dev/null
+++
b/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/RpcRuntimeFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.submarine.client.cli.remote;
+
+import org.apache.submarine.commons.runtime.ClientContext;
+import org.apache.submarine.commons.runtime.JobMonitor;
+import org.apache.submarine.commons.runtime.JobSubmitter;
+import org.apache.submarine.commons.runtime.RuntimeFactory;
+import org.apache.submarine.commons.runtime.fs.MemorySubmarineStorage;
+import org.apache.submarine.commons.runtime.fs.SubmarineStorage;
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+
+/**
+ * {@link RuntimeFactory} whose job submitter sends jobs to an rpc server.
+ * The submitter is created once in the constructor from the configured
+ * server address and remote execution port.
+ */
+public class RpcRuntimeFactory extends RuntimeFactory {
+  private JobSubmitterRpcImpl submitter;
+
+  public RpcRuntimeFactory(ClientContext clientContext) {
+    super(clientContext);
+    SubmarineConfiguration config = clientContext.getSubmarineConfig();
+    String serverAddress = config.getServerAddress();
+    int serverPort = config.getInt(SubmarineConfiguration.ConfVars
+        .SUBMARINE_SERVER_REMOTE_EXECUTION_PORT);
+    submitter =
+        new JobSubmitterRpcImpl(serverAddress, serverPort, clientContext);
+  }
+
+  /** Returns the submitter built in the constructor. */
+  @Override
+  protected JobSubmitter internalCreateJobSubmitter() {
+    return submitter;
+  }
+
+  /** No job monitor is provided by this factory; returns {@code null}. */
+  @Override
+  protected JobMonitor internalCreateJobMonitor() {
+    return null;
+  }
+
+  /** Returns a new in-memory storage on every call. */
+  @Override
+  protected SubmarineStorage internalCreateSubmarineStorage() {
+    return new MemorySubmarineStorage();
+  }
+}
diff --git
a/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RoleParameters.java
b/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RoleParameters.java
index bda5850..fe15d29 100644
---
a/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RoleParameters.java
+++
b/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RoleParameters.java
@@ -58,27 +58,31 @@ public class RoleParameters {
return launchCommand;
}
- public void setLaunchCommand(String launchCommand) {
+ public RoleParameters setLaunchCommand(String launchCommand) {
this.launchCommand = launchCommand;
+ return this;
}
public String getDockerImage() {
return dockerImage;
}
- public void setDockerImage(String dockerImage) {
+ public RoleParameters setDockerImage(String dockerImage) {
this.dockerImage = dockerImage;
+ return this;
}
public Resource getResource() {
return resource;
}
- public void setResource(Resource resource) {
+ public RoleParameters setResource(Resource resource) {
this.resource = resource;
+ return this;
}
- public void setReplicas(int replicas) {
+ public RoleParameters setReplicas(int replicas) {
this.replicas = replicas;
+ return this;
}
}
diff --git a/submarine-commons/commons-rpc/pom.xml
b/submarine-commons/commons-rpc/pom.xml
new file mode 100644
index 0000000..a9ca761
--- /dev/null
+++ b/submarine-commons/commons-rpc/pom.xml
@@ -0,0 +1,137 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.submarine</groupId>
+ <artifactId>submarine-commons</artifactId>
+ <version>0.3.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.submarine</groupId>
+ <artifactId>commons-rpc</artifactId>
+ <version>0.3.0-SNAPSHOT</version>
+ <name>Submarine: Commons RPC</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty-shaded</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.errorprone</groupId>
+ <artifactId>error_prone_annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>animal-sniffer-annotations</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <finalName>submarine-${project.artifactId}-${project.version}</finalName>
+ <extensions>
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.6.2</version>
+ </extension>
+ </extensions>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <skip>false</skip>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.6.1</version>
+ <configuration>
+
<protocArtifact>com.google.protobuf:protoc:3.10.0:exe:${os.detected.classifier}</protocArtifact>
+ <pluginId>grpc-java</pluginId>
+
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.verison}:exe:${os.detected.classifier}</pluginArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>compile-custom</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>com.google.protobuf</pattern>
+
<shadedPattern>submarine.commons.rpc.com.google.protobuf</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/submarine-commons/commons-rpc/src/main/proto/SubmarineServerProtocol.proto
b/submarine-commons/commons-rpc/src/main/proto/SubmarineServerProtocol.proto
new file mode 100644
index 0000000..27d88a3
--- /dev/null
+++ b/submarine-commons/commons-rpc/src/main/proto/SubmarineServerProtocol.proto
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+
+// http://www.apache.org/licenses/LICENSE-2.0
+
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "org.apache.submarine.commons.rpc";
+option java_outer_classname = "SubmarineServerRpc";
+
+// gRPC service exported by the submarine server.
+service SubmarineServerProtocol {
+ // Unary RPC: submits the job described by ParameterProto and returns an
+ // ApplicationIdProto for it.
+ rpc SubmitJob(ParameterProto) returns (ApplicationIdProto) {}
+
+ // Unary test RPC.
+ rpc TestRpc(ParametersHolderProto) returns (ApplicationIdProto) {}
+
+ // NOTE: the commented-out declarations below appear to be left over from
+ // the gRPC RouteGuide example. Their message types (Rectangle, Feature,
+ // Point, RouteSummary, RouteNote) are not defined in this file; they only
+ // illustrate the syntax of the three streaming RPC shapes.
+ //
+ // Server-to-client streaming:
+ // rpc ListFeatures(Rectangle) returns (stream Feature) {}
+
+ // Client-to-server streaming:
+ // rpc RecordRoute(stream Point) returns (RouteSummary) {}
+
+ // Bidirectional streaming:
+ // rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
+}
+
+// Request message of the TestRpc call; carries a single integer field.
+message ParametersHolderProto {
+ int32 helloworld = 1;
+}
+
+// Request message of the SubmitJob call.
+message ParameterProto {
+ string framework = 1;
+ // Command-specific parameter messages.
+ // NOTE(review): presumably only the one matching the request is populated;
+ // confirm against the client/server converter code.
+ PyTorchRunJobParameterProto pytorch_run_job_parameter = 2;
+ TensorFlowRunJobParameterProto tensorflow_run_job_parameter = 3;
+ ShowJobParameterProto show_job_parameter = 4;
+ // String key/value configuration entries sent along with the job.
+ map<string, string> submarine_job_config_map = 5;
+}
+
+// A set of resources expressed as resource name -> int64 amount.
+// Embedded in RoleParameterProto.resource_proto.
+message ResourceProto {
+ map<string, int64> resource_map = 1;
+}
+
+
+// Settings of one job role. Used for the worker role (RunParameterProto) and
+// for the ps and tensor_board roles (TensorFlowRunJobParameterProto).
+message RoleParameterProto {
+ string role = 1;
+ int32 replicas = 2;
+ string launch_command = 3;
+ string docker_image = 4;
+ ResourceProto resource_proto = 5;
+}
+
+// One localization entry; element type of RunParameterProto.localizations.
+message LocalizationProto {
+ string remote_uri = 1;
+ string local_path = 2;
+ string mount_permission = 3;
+}
+
+// One quicklink entry; element type of RunParameterProto.quicklinks.
+message QuicklinkProto {
+ string label = 1;
+ string component_instance_name = 2;
+ string protocol = 3;
+ int32 port = 4;
+}
+
+// Run-job options shared by all frameworks. Embedded as run_parameter_proto
+// in both PyTorchRunJobParameterProto and TensorFlowRunJobParameterProto.
+message RunParameterProto {
+ string name = 1;
+ string saved_model_path = 2;
+ string docker_image_name = 3;
+ string queue = 4;
+ repeated string envars = 5;
+ string input = 6;
+ string checkpoint_path = 7;
+ repeated QuicklinkProto quicklinks = 8;
+ repeated LocalizationProto localizations = 9;
+ bool wait_job_finish = 10;
+ bool distributed = 11;
+ bool security_disabled = 12;
+ string keytab = 13;
+ string principal = 14;
+ bool distribute_keytab = 15;
+ repeated string conf_pairs = 16;
+ // Settings of the worker role.
+ RoleParameterProto worker_parameter = 17;
+}
+
+// PyTorch run-job parameters: only the shared RunParameterProto options.
+message PyTorchRunJobParameterProto {
+ RunParameterProto run_parameter_proto = 1;
+}
+
+// TensorFlow run-job parameters: the shared RunParameterProto options plus
+// the TensorBoard flag and the ps / tensor_board role settings.
+message TensorFlowRunJobParameterProto {
+ RunParameterProto run_parameter_proto = 1;
+ bool tensorboard_enabled = 2;
+ RoleParameterProto ps_parameter = 3;
+ RoleParameterProto tensor_board_parameter = 4;
+}
+
+// Show-job parameters; embedded in ParameterProto.show_job_parameter.
+message ShowJobParameterProto {
+ string name = 1;
+}
+
+// Response message of SubmitJob and TestRpc.
+message ApplicationIdProto {
+ // The application id, as a string.
+ string application_id = 1;
+}
diff --git a/submarine-commons/commons-rpc/src/main/resources/log4j.properties
b/submarine-commons/commons-rpc/src/main/resources/log4j.properties
new file mode 100644
index 0000000..55e02b6
--- /dev/null
+++ b/submarine-commons/commons-rpc/src/main/resources/log4j.properties
@@ -0,0 +1,17 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License. See accompanying LICENSE file.
+# Root logger: level "info", output routed to the "stdout" appender below.
+log4j.rootLogger = info, stdout
+
+# Console appender that writes to System.out.
+log4j.appender.stdout = org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target = System.out
+log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
+# Line format: [LEVEL] timestamp method:<caller location>, then the message
+# on its own line. %l is the log4j caller-location conversion.
+log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd
HH:mm:ss,SSS} method:%l%n%m%n
diff --git
a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/param/Parameter.java
b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/param/Parameter.java
index f52b0e6..95fffe1 100644
---
a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/param/Parameter.java
+++
b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/param/Parameter.java
@@ -31,8 +31,12 @@ public interface Parameter {
*/
Framework getFramework();
+ Parameter setFramework(Framework framework);
+
BaseParameters getParameters();
+ Parameter setParameters(BaseParameters parameters);
+
String getOptionValue(String option) throws YarnException;
boolean hasOption(String option);
diff --git
a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/resource/ResourceUtils.java
b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/resource/ResourceUtils.java
index aa88e26..dd402a0 100644
---
a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/resource/ResourceUtils.java
+++
b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/resource/ResourceUtils.java
@@ -51,6 +51,7 @@ public final class ResourceUtils {
private static final String GET_RESOURCE_VALUE_METHOD = "getResourceValue";
private static final String GET_RESOURCE_TYPE_METHOD =
"getResourcesTypeInfo";
+ private static final String GET_RESOURCES = "getResources";
private static final String REINITIALIZE_RESOURCES_METHOD =
"reinitializeResources";
public static final String MEMORY_URI = "memory-mb";
@@ -66,12 +67,16 @@ public final class ResourceUtils {
public static Resource createResourceFromString(String resourceStr) {
Map<String, Long> typeToValue = parseResourcesString(resourceStr);
+ return createResource(typeToValue);
+ }
+
+ public static Resource createResource(Map<String, Long> typeToValue) {
Resource resource = Resource.newInstance(0, 0);
for (Map.Entry<String, Long> entry : typeToValue.entrySet()) {
- if (entry.getKey().equals(VCORES_URI)) {
+ if (entry.getKey().equalsIgnoreCase(VCORES_URI)) {
resource.setVirtualCores(entry.getValue().intValue());
continue;
- } else if (entry.getKey().equals(MEMORY_URI)) {
+ } else if (entry.getKey().equalsIgnoreCase(MEMORY_URI)) {
setMemorySize(resource, entry.getValue());
continue;
}
@@ -80,6 +85,22 @@ public final class ResourceUtils {
return resource;
}
+ /**
+  * Converts a {@link Resource} into a map of resource name to amount by
+  * parsing its {@code toString()} form.
+  *
+  * @param resource the resource to convert; may be {@code null}
+  * @return a map of resource name to value; an empty map when
+  *         {@code resource} is {@code null}
+  */
+ public static Map<String, Long> getResourceMap(Resource resource) {
+   if (resource == null) {
+     return new HashMap<>();
+   }
+   // toString() is expected to look like "<memory:N, vCores:M, ...>";
+   // drop the surrounding "<" and ">".
+   String resourceValue = resource.toString();
+   String body = resourceValue.substring(1, resourceValue.length() - 1);
+   // Rename only the leading built-in "memory" key to MEMORY_URI. A blanket
+   // replace of "memory" would also rewrite custom resource names that
+   // merely contain that word.
+   if (body.startsWith("memory:")) {
+     body = MEMORY_URI + body.substring("memory".length());
+   }
+   // parseResourcesString expects "name=value" pairs; use a literal replace,
+   // no regex is needed here.
+   return parseResourcesString(body.replace(":", "="));
+ }
+
+
private static Map<String, Long> parseResourcesString(String resourcesStr) {
Map<String, Long> resources = new HashMap<>();
String[] pairs = resourcesStr.trim().split(",");
diff --git
a/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
index d76d7d1..8de9655 100644
---
a/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
+++
b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfiguration.java
@@ -437,6 +437,10 @@ public class SubmarineConfiguration extends
XMLConfiguration {
return getBooleanValue(propertyName, defaultValue);
}
+ /**
+  * Stores {@code value} under {@code name} in this configuration's
+  * {@code properties} (declared outside this hunk).
+  *
+  * @param name  configuration key
+  * @param value configuration value, kept as a string
+  */
+ public void updateConfiguration(String name, String value) {
+ properties.put(name, value);
+ }
+
public enum ConfVars {
SUBMARINE_CONF_DIR("submarine.conf.dir", "conf"),
SUBMARINE_LOCALIZATION_MAX_ALLOWED_FILE_SIZE_MB(
@@ -458,6 +462,10 @@ public class SubmarineConfiguration extends
XMLConfiguration {
SUBMARINE_SERVER_SSL_TRUSTSTORE_TYPE("submarine.server.ssl.truststore.type",
null),
SUBMARINE_SERVER_SSL_TRUSTSTORE_PASSWORD("submarine.server.ssl.truststore.password",
null),
SUBMARINE_CLUSTER_ADDR("submarine.cluster.addr", ""),
+ SUBMARINE_SERVER_REMOTE_EXECUTION_ENABLED(
+ "submarine.server.remote.execution.enabled", false),
+ SUBMARINE_SERVER_REMOTE_EXECUTION_PORT(
+ "submarine.server.remote.execution.port", 8980),
CLUSTER_HEARTBEAT_INTERVAL("cluster.heartbeat.interval", 3000),
CLUSTER_HEARTBEAT_TIMEOUT("cluster.heartbeat.timeout", 9000),
JDBC_DRIVERCLASSNAME("jdbc.driverClassName", "com.mysql.jdbc.Driver"),
@@ -475,8 +483,7 @@ public class SubmarineConfiguration extends
XMLConfiguration {
WORKBENCH_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE(
"workbench.websocket.max.text.message.size", "1024000"),
WORKBENCH_WEB_WAR("workbench.web.war",
"submarine-workbench/workbench-web/dist"),
- SUBMARINE_RUNTIME_CLASS(
- "submarine.runtime.class",
+ SUBMARINE_RUNTIME_CLASS("submarine.runtime.class",
"org.apache.submarine.server.submitter.yarn.YarnRuntimeFactory");
private String varName;
diff --git a/submarine-commons/pom.xml b/submarine-commons/pom.xml
index 9e67fb8..c4b7722 100644
--- a/submarine-commons/pom.xml
+++ b/submarine-commons/pom.xml
@@ -39,6 +39,7 @@
<module>commons-runtime</module>
<module>commons-cluster</module>
<module>commons-metastore</module>
+ <module>commons-rpc</module>
</modules>
</project>
diff --git a/submarine-dist/src/assembly/distribution.xml
b/submarine-dist/src/assembly/distribution.xml
index c8cc582..6c0fa65 100644
--- a/submarine-dist/src/assembly/distribution.xml
+++ b/submarine-dist/src/assembly/distribution.xml
@@ -52,13 +52,11 @@
<fileSet>
<directory>../conf</directory>
<outputDirectory>/conf</outputDirectory>
- <fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>../bin</directory>
<outputDirectory>/bin</outputDirectory>
- <fileMode>0755</fileMode>
</fileSet>
<fileSet>
@@ -97,6 +95,13 @@
</includes>
</fileSet>
<fileSet>
+ <directory>../submarine-commons/commons-rpc/target</directory>
+ <outputDirectory>/lib</outputDirectory>
+ <includes>
+ <include>submarine-commons-rpc-${project.version}.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
<directory>../submarine-server/server-submitter/submitter-k8s/target</directory>
<outputDirectory>/lib/submitter</outputDirectory>
<includes>
@@ -107,7 +112,7 @@
<directory>../submarine-server/server-submitter/submitter-yarn/target</directory>
<outputDirectory>/lib/submitter</outputDirectory>
<includes>
- <include>submarine-submitter-yarn-${project.version}.jar</include>
+
<include>submarine-submitter-yarn-${project.version}-shade.jar</include>
</includes>
</fileSet>
<fileSet>
@@ -139,6 +144,9 @@
<exclude>commons-utils-${project.version}.jar</exclude>
<exclude>commons-runtime-${project.version}.jar</exclude>
<exclude>commons-cluster-${project.version}.jar</exclude>
+ <exclude>commons-rpc-${project.version}.jar</exclude>
+ <exclude>grpc-*.jar</exclude>
+ <exclude>protobuf-java*.jar</exclude>
</excludes>
</fileSet>
<fileSet>
diff --git a/submarine-server/pom.xml b/submarine-server/pom.xml
index b20aa3e..b026bd4 100644
--- a/submarine-server/pom.xml
+++ b/submarine-server/pom.xml
@@ -36,6 +36,7 @@
<modules>
<module>server-submitter</module>
<module>server-core</module>
+ <module>server-rpc</module>
</modules>
<dependencyManagement>
diff --git a/submarine-server/server-core/pom.xml
b/submarine-server/server-core/pom.xml
index 5dad9b6..beb9435 100644
--- a/submarine-server/server-core/pom.xml
+++ b/submarine-server/server-core/pom.xml
@@ -39,6 +39,86 @@
</dependency>
<dependency>
+ <groupId>org.apache.submarine</groupId>
+ <artifactId>server-rpc</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.jcraft</groupId>
+ <artifactId>jsch</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>animal-sniffer-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.errorprone</groupId>
+ <artifactId>error_prone_annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-io</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-webapp</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-jaxb-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>${commons-collections.version}</version>
diff --git
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
index a5bb208..85828d7 100644
---
a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
+++
b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java
@@ -19,6 +19,7 @@
package org.apache.submarine.server;
import org.apache.log4j.PropertyConfigurator;
+import org.apache.submarine.server.rpc.SubmarineRpcServer;
import org.apache.submarine.server.workbench.websocket.NotebookServer;
import org.apache.submarine.commons.cluster.ClusterServer;
import org.eclipse.jetty.http.HttpVersion;
@@ -52,16 +53,19 @@ import
org.apache.submarine.commons.utils.SubmarineConfiguration.ConfVars;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.File;
+import java.io.IOException;
public class SubmarineServer extends ResourceConfig {
private static final Logger LOG =
LoggerFactory.getLogger(SubmarineServer.class);
public static Server jettyWebServer;
+ public static SubmarineRpcServer rpcServer;
public static ServiceLocator sharedServiceLocator;
private static SubmarineConfiguration conf =
SubmarineConfiguration.getInstance();
- public static void main(String[] args) throws InterruptedException {
+ public static void main(String[] args) throws InterruptedException,
+ IOException {
PropertyConfigurator.configure(ClassLoader.getSystemResource("log4j.properties"));
final SubmarineConfiguration conf = SubmarineConfiguration.getInstance();
@@ -102,6 +106,7 @@ public class SubmarineServer extends ResourceConfig {
// Cluster Server
setupClusterServer();
+ rpcServer = SubmarineRpcServer.startRpcServer();
startServer();
}
diff --git
a/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java
b/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java
index fbc550c..88e797b 100644
---
a/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java
+++
b/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java
@@ -114,7 +114,7 @@ public abstract class AbstractSubmarineServerTest {
executor.submit(SERVER);
long s = System.currentTimeMillis();
boolean started = false;
- while (System.currentTimeMillis() - s < 1000 * 60 * 3) { // 3 minutes
+ while (System.currentTimeMillis() - s < 1000 * 60 * 5) { // 5 minutes
Thread.sleep(2000);
started = checkIfServerIsRunning();
if (started == true) {
@@ -145,6 +145,7 @@ public abstract class AbstractSubmarineServerTest {
if (!WAS_RUNNING) {
LOG.info("Terminating test Submarine server...");
SubmarineServer.jettyWebServer.stop();
+ SubmarineServer.rpcServer.stop();
executor.shutdown();
long s = System.currentTimeMillis();
diff --git a/submarine-server/server-rpc/pom.xml
b/submarine-server/server-rpc/pom.xml
new file mode 100644
index 0000000..54126d0
--- /dev/null
+++ b/submarine-server/server-rpc/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.submarine</groupId>
+ <artifactId>submarine-server</artifactId>
+ <version>0.3.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>server-rpc</artifactId>
+ <version>0.3.0-SNAPSHOT</version>
+ <name>Submarine: Server RPC</name>
+
+ <dependencies>
+ <!-- commons-rpc: module that holds SubmarineServerProtocol.proto, whose
+ generated classes live in org.apache.submarine.commons.rpc. -->
+ <dependency>
+ <groupId>org.apache.submarine</groupId>
+ <artifactId>commons-rpc</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- No version is declared here, so it has to be supplied by a
+ dependencyManagement section of a parent pom. NOTE(review): confirm. -->
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty-shaded</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>animal-sniffer-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.errorprone</groupId>
+ <artifactId>error_prone_annotations</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+
+ <!-- NOTE(review): protobuf-java is excluded from the next two modules,
+ presumably so it does not clash with the protobuf classes that the
+ commons-rpc build relocates into its shaded jar. Confirm. -->
+ <dependency>
+ <groupId>org.apache.submarine</groupId>
+ <artifactId>submitter-yarn</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.submarine</groupId>
+ <artifactId>commons-runtime</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- NOTE(review): presumably the source of the
+ org.apache.submarine.client.cli classes (e.g. RpcContext) imported by
+ the server-rpc sources. Confirm. -->
+ <dependency>
+ <groupId>org.apache.submarine</groupId>
+ <artifactId>submarine-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git
a/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServer.java
b/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServer.java
new file mode 100644
index 0000000..4ab0d31
--- /dev/null
+++
b/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServer.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.submarine.server.rpc;
+
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.submarine.client.cli.remote.RpcContext;
+import org.apache.submarine.commons.rpc.ApplicationIdProto;
+import org.apache.submarine.commons.rpc.ParameterProto;
+import org.apache.submarine.commons.rpc.ParametersHolderProto;
+import org.apache.submarine.commons.rpc.SubmarineServerProtocolGrpc;
+import org.apache.submarine.commons.runtime.ClientContext;
+import org.apache.submarine.commons.runtime.JobSubmitter;
+import org.apache.submarine.commons.runtime.RuntimeFactory;
+import org.apache.submarine.commons.runtime.param.Parameter;
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * A gRPC server that provides the submarine service
+ * ({@code SubmarineServerProtocol}) to submarine clients.
+ */
+public class SubmarineRpcServer {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SubmarineRpcServer.class);
+
+  protected int port;
+  protected Server server;
+
+  /**
+   * Creates a server on {@code port} serving the default
+   * {@link SubmarineServerRpcService}.
+   *
+   * @param port TCP port to listen on
+   * @throws IOException kept in the signature for existing callers
+   */
+  public SubmarineRpcServer(int port) throws IOException {
+    this(ServerBuilder.forPort(port), port);
+  }
+
+  /**
+   * Creates a server from {@code serverBuilder} serving the default
+   * {@link SubmarineServerRpcService}.
+   *
+   * @param serverBuilder builder the gRPC server is created from
+   * @param port          port reported in log messages
+   */
+  public SubmarineRpcServer(ServerBuilder<?> serverBuilder, int port) {
+    this(serverBuilder, port, new SubmarineServerRpcService());
+  }
+
+  /**
+   * Creates a server on {@code port} serving the given service
+   * implementation.
+   *
+   * @param port    TCP port to listen on
+   * @param service service implementation to register
+   */
+  public SubmarineRpcServer(int port,
+      SubmarineServerProtocolGrpc.SubmarineServerProtocolImplBase service) {
+    this(ServerBuilder.forPort(port), port, service);
+  }
+
+  /**
+   * Creates a server from {@code serverBuilder} serving the given service
+   * implementation. The server is built here but not started.
+   *
+   * @param serverBuilder builder the gRPC server is created from
+   * @param port          port reported in log messages
+   * @param service       service implementation to register
+   */
+  public SubmarineRpcServer(ServerBuilder<?> serverBuilder, int port,
+      SubmarineServerProtocolGrpc.SubmarineServerProtocolImplBase service) {
+    this.port = port;
+    server = serverBuilder.addService(service).build();
+  }
+
+  /**
+   * Starts serving requests and registers a JVM shutdown hook that stops the
+   * server.
+   *
+   * @throws IOException if the underlying gRPC server cannot be started
+   */
+  public void start() throws IOException {
+    server.start();
+    LOG.info("Server started, listening on " + port);
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        // Initiate shutdown of the gRPC server when the JVM exits.
+        LOG.info("*** shutting down gRPC server since JVM is shutting down");
+        SubmarineRpcServer.this.stop();
+      }
+    });
+  }
+
+  /** Stop serving requests and shutdown resources. */
+  public void stop() {
+    if (server != null) {
+      server.shutdown();
+      LOG.info("*** server shut down");
+    }
+  }
+
+  /**
+   * Await termination on the main thread since the grpc library uses daemon
+   * threads.
+   *
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public void blockUntilShutdown() throws InterruptedException {
+    if (server != null) {
+      server.awaitTermination();
+    }
+  }
+
+  /**
+   * Builds the {@link ClientContext} for one request: a fresh
+   * {@link YarnConfiguration}, the submarine configuration updated with the
+   * entries carried by {@code rpcContext}, and the runtime factory resolved
+   * from that context.
+   */
+  private static ClientContext getClientContext(RpcContext rpcContext) {
+    Configuration conf = new YarnConfiguration();
+    ClientContext clientContext = new ClientContext();
+    clientContext.setYarnConfig(conf);
+    // The configuration must be merged before the runtime factory is
+    // resolved, because the factory is looked up from the client context.
+    mergeSubmarineConfiguration(clientContext.getSubmarineConfig(),
+        rpcContext);
+    RuntimeFactory runtimeFactory = RuntimeFactory.getRuntimeFactory(
+        clientContext);
+    clientContext.setRuntimeFactory(runtimeFactory);
+    return clientContext;
+  }
+
+  /**
+   * Copies every entry of the job configuration map carried by
+   * {@code rpcContext} into {@code submarineConfiguration}.
+   */
+  private static void mergeSubmarineConfiguration(
+      SubmarineConfiguration submarineConfiguration, RpcContext rpcContext) {
+    Map<String, String> submarineJobConfigMap =
+        rpcContext.getSubmarineJobConfigMap();
+    for (Map.Entry<String, String> entry : submarineJobConfigMap.entrySet()) {
+      submarineConfiguration.updateConfiguration(
+          entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   * Starts a standalone RPC server and blocks until it terminates.
+   */
+  public static void main(String[] args) throws Exception {
+    SubmarineRpcServer server = startRpcServer();
+    server.blockUntilShutdown();
+  }
+
+  /**
+   * Creates and starts a server on the port configured by
+   * {@code SUBMARINE_SERVER_REMOTE_EXECUTION_PORT}.
+   *
+   * @return the started server
+   * @throws IOException if the server cannot be started
+   */
+  public static SubmarineRpcServer startRpcServer() throws IOException {
+    SubmarineConfiguration submarineConfiguration =
+        SubmarineConfiguration.getInstance();
+    int rpcServerPort = submarineConfiguration.getInt(
+        SubmarineConfiguration.ConfVars.SUBMARINE_SERVER_REMOTE_EXECUTION_PORT);
+    SubmarineRpcServer server = new SubmarineRpcServer(rpcServerPort);
+    server.start();
+    return server;
+  }
+
+  /**
+   * <p>See SubmarineServerProtocol.proto for details of the methods.
+   */
+  protected static class SubmarineServerRpcService
+      extends SubmarineServerProtocolGrpc.SubmarineServerProtocolImplBase {
+
+    /**
+     * Submits the job described by {@code request} and replies with its
+     * application id. If the submission fails, the call is terminated with
+     * an {@code INTERNAL} status instead of a regular response, so that the
+     * client can see the failure.
+     */
+    @Override
+    public void submitJob(ParameterProto request,
+        io.grpc.stub.StreamObserver<ApplicationIdProto> responseObserver) {
+      LOG.info("Start to submit a job.");
+      RpcContext rpcContext =
+          SubmarineRpcServerProto.convertParameterProtoToRpcContext(request);
+      Parameter parameter =
+          SubmarineRpcServerProto.convertParameterProtoToParameter(request);
+      ClientContext clientContext = getClientContext(rpcContext);
+      ApplicationId applicationId;
+      try {
+        applicationId = run(clientContext, parameter);
+      } catch (IOException | YarnException e) {
+        LOG.error(e.getMessage(), e);
+        // Do not build a response from a missing application id; report the
+        // failure to the caller and end the call here.
+        responseObserver.onError(io.grpc.Status.INTERNAL
+            .withDescription("Failed to submit job: " + e.getMessage())
+            .withCause(e)
+            .asRuntimeException());
+        return;
+      }
+      responseObserver.onNext(SubmarineRpcServerProto.
+          convertApplicationIdToApplicationIdProto(applicationId));
+      responseObserver.onCompleted();
+    }
+
+    /** Test RPC: replies with a fixed application id. */
+    @Override
+    public void testRpc(ParametersHolderProto request,
+        io.grpc.stub.StreamObserver<ApplicationIdProto> responseObserver) {
+      responseObserver.onNext(checkFeature(request));
+      responseObserver.onCompleted();
+    }
+
+    /** Logs the request at debug level and returns a fixed application id. */
+    private ApplicationIdProto checkFeature(ParametersHolderProto request) {
+      LOG.debug(request.toString());
+      return ApplicationIdProto.newBuilder()
+          .setApplicationId("application_1_1").build();
+    }
+
+    /**
+     * Submits {@code parameter} through the job submitter of the runtime
+     * factory held by {@code clientContext}.
+     *
+     * @return the id of the submitted application
+     * @throws IOException   if the submitter fails with an I/O error
+     * @throws YarnException if the submitter fails with a YARN error
+     */
+    protected ApplicationId run(ClientContext clientContext,
+        Parameter parameter) throws IOException, YarnException {
+      JobSubmitter jobSubmitter =
+          clientContext.getRuntimeFactory().getJobSubmitterInstance();
+      return jobSubmitter.submitJob(parameter);
+    }
+  }
+
+}
diff --git
a/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java
b/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java
new file mode 100644
index 0000000..3bfe393
--- /dev/null
+++
b/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServerProto.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.submarine.server.rpc;
+
+import com.google.protobuf.LazyStringArrayList;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.submarine.client.cli.param.Localization;
+import org.apache.submarine.client.cli.param.ParametersHolder;
+import org.apache.submarine.client.cli.param.Quicklink;
+import org.apache.submarine.client.cli.param.ShowJobParameters;
+import org.apache.submarine.client.cli.param.runjob.PyTorchRunJobParameters;
+import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
+import org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters;
+import org.apache.submarine.client.cli.remote.RpcContext;
+import org.apache.submarine.client.cli.runjob.RoleParameters;
+import org.apache.submarine.commons.rpc.ApplicationIdProto;
+import org.apache.submarine.commons.rpc.LocalizationProto;
+import org.apache.submarine.commons.rpc.ParameterProto;
+import org.apache.submarine.commons.rpc.PyTorchRunJobParameterProto;
+import org.apache.submarine.commons.rpc.QuicklinkProto;
+import org.apache.submarine.commons.rpc.ResourceProto;
+import org.apache.submarine.commons.rpc.RoleParameterProto;
+import org.apache.submarine.commons.rpc.RunParameterProto;
+import org.apache.submarine.commons.rpc.ShowJobParameterProto;
+import org.apache.submarine.commons.rpc.TensorFlowRunJobParameterProto;
+import org.apache.submarine.commons.runtime.Framework;
+import org.apache.submarine.commons.runtime.api.PyTorchRole;
+import org.apache.submarine.commons.runtime.api.Role;
+import org.apache.submarine.commons.runtime.api.TensorFlowRole;
+import org.apache.submarine.commons.runtime.param.Parameter;
+import org.apache.submarine.commons.runtime.resource.ResourceUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SubmarineRpcServerProto {
+
+  /**
+   * Picks the converter matching the payload carried by the message.
+   * The PyTorch run-job payload is checked first, then the TensorFlow
+   * run-job payload, then the show-job payload.
+   *
+   * @param parameterProto the incoming message
+   * @return the converted parameter, or null when none of the three
+   *         payloads is present
+   */
+  public static Parameter convertParameterProtoToParameter(
+      ParameterProto parameterProto) {
+    if (parameterProto.hasPytorchRunJobParameter()) {
+      return convertParameterProtoToPyTorchRunJob(parameterProto);
+    }
+    if (parameterProto.hasTensorflowRunJobParameter()) {
+      return convertParameterProtoToTensorFlowRunJob(parameterProto);
+    }
+    if (parameterProto.hasShowJobParameter()) {
+      return convertParameterProtoToShowJob(parameterProto);
+    }
+    // No recognised payload in the message.
+    return null;
+  }
+
+  /**
+   * Builds a Parameter for a PyTorch run-job request from the shared
+   * run-parameter section of the PyTorch payload.
+   *
+   * @param parameterProto message carrying a PyTorch run-job payload
+   * @return the parameter produced by convertRunParametersProtoToParameter
+   */
+  public static Parameter convertParameterProtoToPyTorchRunJob(
+      ParameterProto parameterProto) {
+    Framework framework =
+        Framework.parseByValue(parameterProto.getFramework());
+    RunParameterProto commonRunParameters =
+        parameterProto.getPytorchRunJobParameter().getRunParameterProto();
+    return convertRunParametersProtoToParameter(
+        new PyTorchRunJobParameters(), commonRunParameters, framework);
+  }
+
+  /**
+   * Builds a Parameter for a TensorFlow run-job request. The TensorFlow
+   * specific settings (tensorboard flag, PS role, TensorBoard role) are
+   * copied first; the shared run parameters are then filled in by
+   * convertRunParametersProtoToParameter.
+   *
+   * @param parameterProto message carrying a TensorFlow run-job payload
+   * @return the converted parameter with its framework set
+   */
+  public static Parameter convertParameterProtoToTensorFlowRunJob(
+      ParameterProto parameterProto) {
+    TensorFlowRunJobParameterProto tfProto =
+        parameterProto.getTensorflowRunJobParameter();
+    Framework framework =
+        Framework.parseByValue(parameterProto.getFramework());
+
+    TensorFlowRunJobParameters tfParameters =
+        new TensorFlowRunJobParameters();
+    tfParameters.setTensorboardEnabled(tfProto.getTensorboardEnabled());
+
+    RoleParameters psRole = convertRoleParameterProtoToRoleParameters(
+        tfProto.getPsParameter(), framework);
+    tfParameters.setPsParameters(psRole);
+
+    RoleParameters tensorBoardRole =
+        convertRoleParameterProtoToRoleParameters(
+            tfProto.getTensorBoardParameter(), framework);
+    tfParameters.setTensorBoardParameters(tensorBoardRole);
+
+    Parameter converted = convertRunParametersProtoToParameter(
+        tfParameters, tfProto.getRunParameterProto(), framework);
+    converted.setFramework(framework);
+    return converted;
+  }
+
+  /**
+   * Builds a Parameter for a show-job request: only the job name and the
+   * framework are carried over.
+   *
+   * @param parameterProto message carrying a show-job payload
+   * @return a ParametersHolder wrapping the ShowJobParameters
+   */
+  public static Parameter convertParameterProtoToShowJob(
+      ParameterProto parameterProto) {
+    Framework framework =
+        Framework.parseByValue(parameterProto.getFramework());
+
+    ShowJobParameters showJob = new ShowJobParameters();
+    showJob.setName(parameterProto.getShowJobParameter().getName());
+
+    ParametersHolder holder = ParametersHolder.create();
+    holder.setParameters(showJob);
+    holder.setFramework(framework);
+    return holder;
+  }
+
+  /**
+   * Creates the RpcContext for a request by copying the job configuration
+   * map carried in the message.
+   *
+   * @param parameterProto the incoming message
+   * @return a new RpcContext holding the message's job configuration map
+   */
+  public static RpcContext convertParameterProtoToRpcContext(
+      ParameterProto parameterProto) {
+    RpcContext rpcContext = new RpcContext();
+    // Generated protobuf map accessors never return null (an unset map
+    // field yields an empty map), so no null check is needed here.
+    rpcContext.setSubmarineJobConfigMap(
+        parameterProto.getSubmarineJobConfigMapMap());
+    return rpcContext;
+  }
+
+  /**
+   * Copies the shared run-job settings from the message into the given
+   * RunJobParameters and wraps the result in a ParametersHolder.
+   * String settings are copied only when the message value is not blank;
+   * a blank value leaves the corresponding field of runJobParameters
+   * untouched.
+   *
+   * @param runJobParameters framework specific parameters to fill in
+   * @param runJobParameterProto shared run parameters from the message
+   * @param framework framework used for the worker role and the holder
+   * @return a ParametersHolder wrapping runJobParameters
+   */
+  public static Parameter convertRunParametersProtoToParameter(
+      RunJobParameters runJobParameters,
+      RunParameterProto runJobParameterProto, Framework framework) {
+    String checkpointPath = runJobParameterProto.getCheckpointPath();
+    if (!StringUtils.isBlank(checkpointPath)) {
+      runJobParameters.setCheckpointPath(checkpointPath);
+    }
+    String dockerImageName = runJobParameterProto.getDockerImageName();
+    if (!StringUtils.isBlank(dockerImageName)) {
+      runJobParameters.setDockerImageName(dockerImageName);
+    }
+    String inputPath = runJobParameterProto.getInput();
+    if (!StringUtils.isBlank(inputPath)) {
+      runJobParameters.setInputPath(inputPath);
+    }
+    String keytab = runJobParameterProto.getKeytab();
+    if (!StringUtils.isBlank(keytab)) {
+      runJobParameters.setKeytab(keytab);
+    }
+    String jobName = runJobParameterProto.getName();
+    if (!StringUtils.isBlank(jobName)) {
+      runJobParameters.setName(jobName);
+    }
+    String principal = runJobParameterProto.getPrincipal();
+    if (!StringUtils.isBlank(principal)) {
+      runJobParameters.setPrincipal(principal);
+    }
+    String queue = runJobParameterProto.getQueue();
+    if (!StringUtils.isBlank(queue)) {
+      runJobParameters.setQueue(queue);
+    }
+    String savedModelPath = runJobParameterProto.getSavedModelPath();
+    if (!StringUtils.isBlank(savedModelPath)) {
+      runJobParameters.setSavedModelPath(savedModelPath);
+    }
+
+    // Remaining settings are copied unconditionally, in the same order as
+    // the setter chain has always applied them.
+    runJobParameters.setConfPairs(runJobParameterProto.getConfPairsList())
+        .setDistributed(runJobParameterProto.getDistributed())
+        .setDistributeKeytab(runJobParameterProto.getDistributeKeytab())
+        .setLocalizations(convertLocalizationProtoToLocalization(
+            runJobParameterProto.getLocalizationsList()))
+        .setQuicklinks(convertQuicklinkProtoToQuicklink(
+            runJobParameterProto.getQuicklinksList()))
+        .setSecurityDisabled(runJobParameterProto.getSecurityDisabled())
+        .setWaitJobFinish(runJobParameterProto.getWaitJobFinish())
+        .setWorkerParameter(convertRoleParameterProtoToRoleParameters(
+            runJobParameterProto.getWorkerParameter(), framework));
+    runJobParameters.setEnvars(runJobParameterProto.getEnvarsList());
+
+    ParametersHolder holder = ParametersHolder.create();
+    holder.setParameters(runJobParameters);
+    holder.setFramework(framework);
+    return holder;
+  }
+
+  /**
+   * Converts one role section of the message into RoleParameters.
+   * The role name is resolved against TensorFlowRole or PyTorchRole
+   * depending on the framework; for any other framework the role stays
+   * null. Docker image and launch command are copied only when not blank;
+   * replicas and resource are always copied.
+   *
+   * @param roleParameterProto role section of the message
+   * @param framework framework that decides which role enum is used
+   * @return the populated RoleParameters
+   */
+  public static RoleParameters convertRoleParameterProtoToRoleParameters(
+      RoleParameterProto roleParameterProto, Framework framework) {
+    Role role;
+    switch (framework) {
+      case TENSORFLOW:
+        role = TensorFlowRole.valueOf(roleParameterProto.getRole());
+        break;
+      case PYTORCH:
+        role = PyTorchRole.valueOf(roleParameterProto.getRole());
+        break;
+      default:
+        // Other frameworks have no role mapping here.
+        role = null;
+        break;
+    }
+
+    RoleParameters converted = RoleParameters.createEmpty(role);
+
+    String dockerImage = roleParameterProto.getDockerImage();
+    if (!StringUtils.isBlank(dockerImage)) {
+      converted.setDockerImage(dockerImage);
+    }
+    String launchCommand = roleParameterProto.getLaunchCommand();
+    if (!StringUtils.isBlank(launchCommand)) {
+      converted.setLaunchCommand(launchCommand);
+    }
+
+    converted.setReplicas(roleParameterProto.getReplicas())
+        .setResource(convertResourceProtoToResource(
+            roleParameterProto.getResourceProto()));
+    return converted;
+  }
+
+  /**
+   * Converts every LocalizationProto of the list into a Localization,
+   * keeping the list order.
+   *
+   * @param localizationsList localization entries from the message
+   * @return a new list with one Localization per input entry
+   */
+  public static List<Localization> convertLocalizationProtoToLocalization(
+      List<LocalizationProto> localizationsList) {
+    List<Localization> converted = new ArrayList<>();
+    for (int i = 0; i < localizationsList.size(); i++) {
+      LocalizationProto source = localizationsList.get(i);
+      Localization target = new Localization();
+      target.setLocalPath(source.getLocalPath())
+          .setMountPermission(source.getMountPermission())
+          .setRemoteUri(source.getRemoteUri());
+      converted.add(target);
+    }
+    return converted;
+  }
+
+  /**
+   * Converts every QuicklinkProto of the list into a Quicklink, keeping
+   * the list order.
+   *
+   * @param quicklinksList quicklink entries from the message
+   * @return a new list with one Quicklink per input entry
+   */
+  public static List<Quicklink> convertQuicklinkProtoToQuicklink(
+      List<QuicklinkProto> quicklinksList) {
+    List<Quicklink> converted = new ArrayList<>();
+    for (int i = 0; i < quicklinksList.size(); i++) {
+      QuicklinkProto source = quicklinksList.get(i);
+      Quicklink target = new Quicklink();
+      target.setLabel(source.getLabel())
+          .setComponentInstanceName(source.getComponentInstanceName())
+          .setProtocol(source.getProtocol())
+          .setPort(source.getPort());
+      converted.add(target);
+    }
+    return converted;
+  }
+
+  /**
+   * Creates a Resource from the resource map of the message via
+   * ResourceUtils.createResource.
+   *
+   * @param resourceProto resource section of the message
+   * @return the Resource built from the message's resource map
+   */
+  public static Resource convertResourceProtoToResource(
+      ResourceProto resourceProto) {
+    return ResourceUtils.createResource(resourceProto.getResourceMapMap());
+  }
+
+  /**
+   * Builds the message for a show-job request from the job name and the
+   * framework value of the given parameter.
+   *
+   * @param parameters parameter holding the job name and the framework
+   * @return a ParameterProto with the show-job payload and framework set
+   */
+  public ParameterProto convertShowJobToParameterProto(Parameter parameters) {
+    ShowJobParameterProto.Builder showJob = ShowJobParameterProto.newBuilder();
+    showJob.setName(parameters.getParameters().getName());
+
+    ParameterProto.Builder message = ParameterProto.newBuilder();
+    message.setShowJobParameter(showJob.build());
+    message.setFramework(parameters.getFramework().getValue());
+    return message.build();
+  }
+
+  /**
+   * Wraps the string form of an application id in an ApplicationIdProto.
+   *
+   * @param applicationId the id to convert; may be null
+   * @return a message carrying applicationId.toString(), or the empty
+   *         string when applicationId is null
+   */
+  public static ApplicationIdProto convertApplicationIdToApplicationIdProto(
+      ApplicationId applicationId) {
+    String idText;
+    if (applicationId == null) {
+      idText = "";
+    } else {
+      idText = applicationId.toString();
+    }
+    return ApplicationIdProto.newBuilder()
+        .setApplicationId(idText)
+        .build();
+  }
+
+}
\ No newline at end of file
diff --git
a/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/MockRpcServer.java
b/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/MockRpcServer.java
new file mode 100644
index 0000000..c85aefd
--- /dev/null
+++
b/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/MockRpcServer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.submarine.server.rpc;
+
+import io.grpc.ServerBuilder;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.submarine.client.cli.CliUtils;
+import org.apache.submarine.commons.runtime.ClientContext;
+import org.apache.submarine.commons.runtime.param.Parameter;
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+
+import java.io.IOException;
+
+/**
+ * SubmarineRpcServer variant for tests: its rpc service does not submit
+ * anything and always reports the same application id.
+ */
+public class MockRpcServer extends SubmarineRpcServer {
+
+  /** Creates a mock server on the given port with a default builder. */
+  public MockRpcServer(int port) throws IOException {
+    this(ServerBuilder.forPort(port), port);
+  }
+
+  /** Creates a mock server from the builder, using the mock rpc service. */
+  public MockRpcServer(ServerBuilder<?> serverBuilder, int port) {
+    super(serverBuilder, port, new mockSubmarineServerRpcService());
+  }
+
+  /**
+   * Rpc service whose run step is replaced by a constant result.
+   */
+  protected static class mockSubmarineServerRpcService
+      extends SubmarineServerRpcService {
+    /** Skips the real submission and returns a fixed application id. */
+    @Override
+    protected ApplicationId run(ClientContext clientContext,
+        Parameter parameter) {
+      return CliUtils.fromString("application_1_123");
+    }
+  }
+
+  /** Starts the mock server and blocks until it is shut down. */
+  public static void main(String[] args) throws Exception {
+    startRpcServer().blockUntilShutdown();
+  }
+
+  /**
+   * Starts a mock server on the port configured by
+   * SUBMARINE_SERVER_REMOTE_EXECUTION_PORT.
+   *
+   * @return the started server
+   */
+  public static SubmarineRpcServer startRpcServer() throws IOException {
+    SubmarineConfiguration conf = SubmarineConfiguration.getInstance();
+    SubmarineConfiguration.ConfVars portKey =
+        SubmarineConfiguration.ConfVars.SUBMARINE_SERVER_REMOTE_EXECUTION_PORT;
+    SubmarineRpcServer mockServer = new MockRpcServer(conf.getInt(portKey));
+    mockServer.start();
+    return mockServer;
+  }
+
+}
\ No newline at end of file
diff --git
a/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/RpcServerTestUtils.java
b/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/RpcServerTestUtils.java
new file mode 100644
index 0000000..3a199f3
--- /dev/null
+++
b/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/RpcServerTestUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.submarine.server.rpc;
+
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Test helper that launches a MockRpcServer on a background thread and
+ * waits until it answers a test rpc call.
+ */
+public class RpcServerTestUtils {
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(RpcServerTestUtils.class);
+  static ExecutorService executor;
+
+  /** Longest time startUp waits for the server to answer: 3 minutes. */
+  private static final long STARTUP_TIMEOUT_MS = 1000L * 60 * 3;
+  /** Pause before each connection probe. */
+  private static final long POLL_INTERVAL_MS = 2000L;
+
+  /**
+   * Starts the mock rpc server on a single background thread and polls it
+   * until a test rpc call succeeds.
+   *
+   * @param testClassName name of the calling test, used for logging only
+   * @throws RuntimeException if the server does not answer within the
+   *         start-up timeout
+   * @throws InterruptedException if the waiting thread is interrupted
+   */
+  public static void startUp(String testClassName) throws Exception {
+    LOG.info("Staring test Submarine rpc server for " + testClassName);
+
+    executor = Executors.newSingleThreadExecutor();
+    executor.submit(
+        () -> {
+          try {
+            MockRpcServer.main(new String[]{""});
+          } catch (Exception e) {
+            LOG.error("Exception in Starting submarine rpc server.", e);
+            throw new RuntimeException(e);
+          }
+        });
+
+    long startedAt = System.currentTimeMillis();
+    boolean started = false;
+    while (!started
+        && System.currentTimeMillis() - startedAt < STARTUP_TIMEOUT_MS) {
+      // Sleep first: the server needs some time to bind its port. If the
+      // probe below was interrupted, this sleep throws and aborts the wait.
+      Thread.sleep(POLL_INTERVAL_MS);
+      started = checkIfRpcServerIsRunning();
+    }
+    if (!started) {
+      throw new RuntimeException("Can not start Submarine server.");
+    }
+    LOG.info("Test Submarine rpc server stared.");
+  }
+
+  /**
+   * Probes the server once with a fresh client.
+   *
+   * @return true if the test rpc call succeeded
+   */
+  private static boolean checkIfRpcServerIsRunning() {
+    SubmarineConfiguration config = SubmarineConfiguration.getInstance();
+    SubmarineRpcClient client = new SubmarineRpcClient(config);
+    try {
+      return client.testRpcConnection();
+    } catch (InterruptedException e) {
+      LOG.error(e.getMessage(), e);
+      // Restore the interrupt flag so the caller's next blocking call
+      // notices the interruption instead of polling on for minutes.
+      Thread.currentThread().interrupt();
+      return false;
+    }
+  }
+}
diff --git
a/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/SubmarineRpcClient.java
b/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/SubmarineRpcClient.java
new file mode 100644
index 0000000..f15eadb
--- /dev/null
+++
b/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/SubmarineRpcClient.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.submarine.server.rpc;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.StatusRuntimeException;
+import org.apache.submarine.commons.rpc.ApplicationIdProto;
+import org.apache.submarine.commons.rpc.ParametersHolderProto;
+import org.apache.submarine.commons.rpc.SubmarineServerProtocolGrpc;
+import
org.apache.submarine.commons.rpc.SubmarineServerProtocolGrpc.SubmarineServerProtocolBlockingStub;
+import
org.apache.submarine.commons.rpc.SubmarineServerProtocolGrpc.SubmarineServerProtocolStub;
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Sample client code that makes gRPC calls to the server.
+ */
+public class SubmarineRpcClient extends RpcServerTestUtils {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SubmarineRpcClient.class.getName());
+
+ protected final ManagedChannel channel;
+ protected final SubmarineServerProtocolBlockingStub blockingStub;
+ protected final SubmarineServerProtocolStub asyncStub;
+
+ public SubmarineRpcClient(SubmarineConfiguration config) {
+ this(ManagedChannelBuilder.forAddress(
+ config.getString(
+ SubmarineConfiguration.ConfVars.SUBMARINE_SERVER_ADDR),
+ config.getInt(
+ SubmarineConfiguration.ConfVars.
+ SUBMARINE_SERVER_REMOTE_EXECUTION_PORT))
+ .usePlaintext());
+ }
+
+ /** Construct client for accessing RouteGuide server at {@code host:port}. */
+ public SubmarineRpcClient(String host, int port) {
+ this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
+ }
+
+ /** Construct client for accessing RouteGuide server using the existing
channel. */
+ public SubmarineRpcClient(ManagedChannelBuilder<?> channelBuilder) {
+ channel = channelBuilder.build();
+ blockingStub = SubmarineServerProtocolGrpc.newBlockingStub(channel);
+ asyncStub = SubmarineServerProtocolGrpc.newStub(channel);
+ }
+
+ public void shutdown() throws InterruptedException {
+ channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+ }
+
+ public boolean testRpcConnection() throws InterruptedException {
+ LOG.info("Try to connect to submarine rpc server.");
+ boolean isRunning = false;
+
+ ParametersHolderProto request =
+ ParametersHolderProto.newBuilder().setHelloworld(1).build();
+ try {
+ blockingStub.testRpc(request);
+ isRunning = true;
+ } catch (StatusRuntimeException e) {
+ LOG.error(e.getMessage(),e);
+ } finally {
+ shutdown();
+ }
+ return isRunning;
+ }
+
+
+ public static void main(String[] args) throws InterruptedException {
+ SubmarineRpcClient client = new SubmarineRpcClient("localhost", 8980);
+ try {
+ // Looking for a valid feature
+ client.testRpcConnection();
+ } finally {
+ client.shutdown();
+ }
+ }
+
+}
diff --git
a/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/SubmarineRpcServerTest.java
b/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/SubmarineRpcServerTest.java
new file mode 100644
index 0000000..5ea54f7
--- /dev/null
+++
b/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/SubmarineRpcServerTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.submarine.server.rpc;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.submarine.client.cli.remote.RpcRuntimeFactory;
+import org.apache.submarine.client.cli.runjob.RunJobCli;
+import org.apache.submarine.commons.runtime.ClientContext;
+import org.apache.submarine.commons.runtime.RuntimeFactory;
+import org.apache.submarine.commons.runtime.exception.SubmarineException;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Submits a TensorFlow job through RunJobCli with an rpc runtime factory
+ * against the mock rpc server started in init().
+ */
+public class SubmarineRpcServerTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SubmarineRpcServerTest.class);
+
+  /** Starts the mock rpc server once for the whole test class. */
+  @BeforeClass
+  public static void init() throws Exception {
+    String testName = SubmarineRpcServerTest.class.getSimpleName();
+    RpcServerTestUtils.startUp(testName);
+  }
+
+  /** Runs the job cli with a fixed TensorFlow job description. */
+  @Test
+  public void testSubmitSubmarineJob() throws InterruptedException,
+      SubmarineException, YarnException, ParseException, IOException {
+    LOG.info("testSubmitSubmarineJob start.");
+    ClientContext clientContext = new ClientContext();
+    clientContext.setRuntimeFactory(new RpcRuntimeFactory(clientContext));
+
+    // Compile-time concatenation; the value is one single command line.
+    final String psLaunchCmd =
+        "myvenv.zip/venv/bin/python mnist_distributed.py "
+            + "--steps 2 --data_dir /tmp/data --working_dir /tmp/mode";
+    String[] moduleArgs = {
+        "--name", "tf-job-001",
+        "--framework", "tensorflow",
+        "--input_path", "",
+        "--num_workers", "2",
+        "--worker_resources", "memory=1G,vcores=1",
+        "--num_ps", "1",
+        "--ps_resources", "memory=1G,vcores=1",
+        "--worker_launch_cmd", "${WORKER_CMD}",
+        "--ps_launch_cmd", psLaunchCmd,
+        "--insecure"
+    };
+    RunJobCli runJobCli = new RunJobCli(clientContext);
+    runJobCli.run(moduleArgs);
+    LOG.info("testSubmitSubmarineJob done.");
+  }
+
+}
diff --git a/submarine-server/server-submitter/submitter-yarn/pom.xml
b/submarine-server/server-submitter/submitter-yarn/pom.xml
index cb2b2ed..2fd0d3f 100644
--- a/submarine-server/server-submitter/submitter-yarn/pom.xml
+++ b/submarine-server/server-submitter/submitter-yarn/pom.xml
@@ -48,6 +48,10 @@
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -104,6 +108,26 @@
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.contribs</groupId>
+ <artifactId>jersey-guice</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -175,6 +199,18 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -222,6 +258,44 @@
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>${plugin.shade.version}</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+              <!-- ${artifactId} is a deprecated expression; Maven warns and asks for ${project.artifactId}. -->
+              <outputFile>target/submarine-${project.artifactId}-${project.version}-shade.jar</outputFile>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ </transformers>
+ <relocations>
+ <!--
+ <relocation>
+ <pattern>javax.ws.rs</pattern>
+
<shadedPattern>${shaded.dependency.prefix}.javax.ws.rs</shadedPattern>
+ </relocation>
+ -->
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
<skip>false</skip>
diff --git a/submarine-server/server-submitter/submitter-yarnservice/pom.xml
b/submarine-server/server-submitter/submitter-yarnservice/pom.xml
index a11a08c..7107a10 100644
--- a/submarine-server/server-submitter/submitter-yarnservice/pom.xml
+++ b/submarine-server/server-submitter/submitter-yarnservice/pom.xml
@@ -47,6 +47,12 @@
<groupId>org.apache.submarine</groupId>
<artifactId>submarine-client</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -200,11 +206,6 @@
</dependency>
<dependency>
<groupId>org.apache.submarine</groupId>
- <artifactId>submarine-client</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.submarine</groupId>
<artifactId>commons-runtime</artifactId>
<version>${project.version}</version>
</dependency>
@@ -330,8 +331,6 @@
<goals>
<goal>jar</goal>
</goals>
- <!-- strictly speaking, the unit test is really a regression test.
It
- needs the main jar to be available to be able to run. -->
<phase>test-compile</phase>
</execution>
</executions>
@@ -343,6 +342,7 @@
</archive>
</configuration>
</plugin>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
diff --git a/submarine-workbench/interpreter/pom.xml
b/submarine-workbench/interpreter/pom.xml
index f24fec5..2a66727 100644
--- a/submarine-workbench/interpreter/pom.xml
+++ b/submarine-workbench/interpreter/pom.xml
@@ -42,4 +42,28 @@
<module>spark-interpreter</module>
</modules>
+ <properties>
+ <grpc.verison>1.15.0</grpc.verison>
+ </properties>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty-shaded</artifactId>
+ <version>${grpc.verison}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${grpc.verison}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.verison}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
</project>
diff --git a/submarine-workbench/interpreter/python-interpreter/pom.xml
b/submarine-workbench/interpreter/python-interpreter/pom.xml
index d589a94..178583a 100644
--- a/submarine-workbench/interpreter/python-interpreter/pom.xml
+++ b/submarine-workbench/interpreter/python-interpreter/pom.xml
@@ -237,6 +237,14 @@
<pattern>org.apache.zeppelin</pattern>
<shadedPattern>${shaded.dependency.prefix}.org.apache.zeppelin</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>io.grpc</pattern>
+
<shadedPattern>${shaded.dependency.prefix}.io.grpc</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.opencensus</pattern>
+
<shadedPattern>${shaded.dependency.prefix}.io.opencensus</shadedPattern>
+ </relocation>
</relocations>
</configuration>
</execution>
diff --git a/submarine-workbench/interpreter/spark-interpreter/pom.xml
b/submarine-workbench/interpreter/spark-interpreter/pom.xml
index 4388173..6eae8ba 100644
--- a/submarine-workbench/interpreter/spark-interpreter/pom.xml
+++ b/submarine-workbench/interpreter/spark-interpreter/pom.xml
@@ -218,6 +218,20 @@
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.zeppelin</pattern>
+
<shadedPattern>${shaded.dependency.prefix}.org.apache.zeppelin</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.grpc</pattern>
+
<shadedPattern>${shaded.dependency.prefix}.io.grpc</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.opencensus</pattern>
+
<shadedPattern>${shaded.dependency.prefix}.io.opencensus</shadedPattern>
+ </relocation>
+ </relocations>
</configuration>
</execution>
</executions>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]