This is an automated email from the ASF dual-hosted git repository. zhouquan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push: new 6bfe2d5 SUBMARINE-390. Add yarn submitter with classloader 6bfe2d5 is described below commit 6bfe2d52ab8c2bc0857ce992b790fbaafc8a828a Author: Zac Zhou <zhouq...@apache.org> AuthorDate: Mon Feb 24 10:36:44 2020 +0800 SUBMARINE-390. Add yarn submitter with classloader ### What is this PR for? Submitter dependencies are excluded from submarine server. Submarine rpc server should initialize yarn submitter through a classloader. ### What type of PR is it? Improvement ### What is the Jira issue? https://issues.apache.org/jira/browse/SUBMARINE-390 ### How should this be tested? https://travis-ci.org/yuanzac/hadoop-submarine/builds/652407783?utm_source=github_status&utm_medium=notification ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Zac Zhou <zhouq...@apache.org> Closes #187 from yuanzac/topic/SUBMARINE-390 and squashes the following commits: 97bf6cc [Zac Zhou] Add rpc port configuration in submarine-site.xml and submarine-site.xml.template daaa08e [Zac Zhou] Rename submarine.server.remote.execution.enabled to submarine.server.rpc.enabled. d779f2f [Zac Zhou] Move yarn submitter to lib/submitter/yarn. 110cd46 [Zac Zhou] Add yarn submitter with classloader. --- bin/common.sh | 2 +- conf/submarine-site.xml | 12 ++++++ conf/submarine-site.xml.template | 10 ++++- dev-support/mini-submarine/README.md | 4 +- dev-support/mini-submarine/conf/submarine-site.xml | 2 +- .../java/org/apache/submarine/client/cli/Cli.java | 5 ++- .../client/cli/remote/RpcRuntimeFactory.java | 2 +- .../submarine/commons/runtime/RuntimeFactory.java | 4 +- .../submarine/commons/utils/SubmarineConfVars.java | 8 ++-- submarine-dist/src/assembly/distribution.xml | 6 +-- .../submarine/server/rpc/SubmarineRpcServer.java | 46 +++++++++++++++++++++- .../apache/submarine/server/rpc/MockRpcServer.java | 4 +- .../submarine/server/rpc/SubmarineRpcClient.java | 3 +- 13 files changed, 84 insertions(+), 24 deletions(-) diff --git a/bin/common.sh b/bin/common.sh index 0bccf5d..0b89619 100755 --- a/bin/common.sh +++ b/bin/common.sh @@ -88,7 +88,7 @@ function download_mysql_jdbc_jar(){ echo "Mysql jdbc jar is downloaded and put in the path of submarine/lib." } -JAVA_OPTS+=" ${SUBMARINE_SERVER_JAVA_OPTS} -Dfile.encoding=UTF-8 ${SUBMARINE_SERVER_MEM}" +JAVA_OPTS+=" -Dfile.encoding=UTF-8" JAVA_OPTS+=" -Dlog4j.configuration=file://${SUBMARINE_CONF_DIR}/log4j.properties" export JAVA_OPTS diff --git a/conf/submarine-site.xml b/conf/submarine-site.xml index c3a9919..397e1bd 100755 --- a/conf/submarine-site.xml +++ b/conf/submarine-site.xml @@ -143,6 +143,18 @@ <description>RuntimeFactory for Submarine jobs</description> </property> + <property> + <name>submarine.server.rpc.enabled</name> + <value>false</value> + <description>Run jobs using rpc server.</description> + </property> + + <property> + <name>submarine.server.rpc.port</name> + <value>8980</value> + <description>Rpc server port</description> + </property> + <!-- Submarine Submitters Configuration --> <property> <name>submarine.submitters</name> diff --git a/conf/submarine-site.xml.template b/conf/submarine-site.xml.template index 0583752..b9755ac 100755 --- a/conf/submarine-site.xml.template +++ b/conf/submarine-site.xml.template @@ -144,9 +144,15 @@ </property> <property> - <name>submarine.server.remote.execution.enabled</name> - <value>true</value> + <name>submarine.server.rpc.enabled</name> + <value>false</value> <description>Run jobs using rpc server.</description> </property> + <property> + <name>submarine.server.rpc.port</name> + <value>8980</value> + <description>Rpc server port</description> + </property> + </configuration> diff --git a/dev-support/mini-submarine/README.md b/dev-support/mini-submarine/README.md index bc657bf..f097fc5 100644 --- a/dev-support/mini-submarine/README.md +++ b/dev-support/mini-submarine/README.md @@ -162,11 +162,11 @@ Submarine server is supposed to manage jobs lifecycle. Clients can just submit job parameters or yaml file to submarine server instead of submitting jobs directly by themselves. Submarine server can handle the rest of the work. -Set submarine.server.remote.execution.enabled to true in the file of +Set submarine.server.rpc.enabled to true in the file of /opt/submarine-current/conf/submarine-site ``` <property> - <name>submarine.server.remote.execution.enabled</name> + <name>submarine.server.rpc.enabled</name> <value>true</value> <description>Run jobs using rpc server.</description> </property> diff --git a/dev-support/mini-submarine/conf/submarine-site.xml b/dev-support/mini-submarine/conf/submarine-site.xml index 4c3257f..462dd11 100644 --- a/dev-support/mini-submarine/conf/submarine-site.xml +++ b/dev-support/mini-submarine/conf/submarine-site.xml @@ -114,7 +114,7 @@ </property> <property> - <name>submarine.server.remote.execution.enabled</name> + <name>submarine.server.rpc.enabled</name> <value>false</value> <description>Run jobs using rpc server.</description> </property> diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/Cli.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/Cli.java index 4cfa663..4353e3d 100644 --- a/submarine-client/src/main/java/org/apache/submarine/client/cli/Cli.java +++ b/submarine-client/src/main/java/org/apache/submarine/client/cli/Cli.java @@ -51,12 +51,13 @@ public class Cli { RuntimeFactory runtimeFactory; if (clientContext.getSubmarineConfig().getBoolean( SubmarineConfVars.ConfVars. - SUBMARINE_SERVER_REMOTE_EXECUTION_ENABLED)) { + SUBMARINE_SERVER_RPC_ENABLED)) { runtimeFactory = new RpcRuntimeFactory(clientContext); } else { Configuration conf = new YarnConfiguration(); clientContext.setYarnConfig(conf); - runtimeFactory = RuntimeFactory.getRuntimeFactory(clientContext); + runtimeFactory = RuntimeFactory.getRuntimeFactory(clientContext, + Thread.currentThread().getContextClassLoader()); } clientContext.setRuntimeFactory(runtimeFactory); return clientContext; diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/RpcRuntimeFactory.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/RpcRuntimeFactory.java index 7596d8a..56433e9 100644 --- a/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/RpcRuntimeFactory.java +++ b/submarine-client/src/main/java/org/apache/submarine/client/cli/remote/RpcRuntimeFactory.java @@ -37,7 +37,7 @@ public class RpcRuntimeFactory extends RuntimeFactory { super(clientContext); String remoteHost = clientContext.getSubmarineConfig().getServerAddress(); int port = clientContext.getSubmarineConfig().getInt( - SubmarineConfVars.ConfVars.SUBMARINE_SERVER_REMOTE_EXECUTION_PORT); + SubmarineConfVars.ConfVars.SUBMARINE_SERVER_RPC_PORT); submitter = new JobSubmitterRpcImpl(remoteHost, port, clientContext); } diff --git a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/RuntimeFactory.java b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/RuntimeFactory.java index aad555e..f6a65ad 100644 --- a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/RuntimeFactory.java +++ b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/RuntimeFactory.java @@ -38,14 +38,14 @@ public abstract class RuntimeFactory { } public static RuntimeFactory getRuntimeFactory( - ClientContext clientContext) { + ClientContext clientContext, ClassLoader classLoader) { SubmarineConfiguration submarineConfiguration = clientContext.getSubmarineConfig(); String runtimeClass = submarineConfiguration.getString( SubmarineConfVars.ConfVars.SUBMARINE_RUNTIME_CLASS); try { - Class<?> runtimeClazz = Class.forName(runtimeClass); + Class<?> runtimeClazz = Class.forName(runtimeClass, true, classLoader); if (RuntimeFactory.class.isAssignableFrom(runtimeClazz)) { return (RuntimeFactory) runtimeClazz.getConstructor(ClientContext.class).newInstance(clientContext); } else { diff --git a/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfVars.java b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfVars.java index 7e6e586..99e624c 100644 --- a/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfVars.java +++ b/submarine-commons/commons-utils/src/main/java/org/apache/submarine/commons/utils/SubmarineConfVars.java @@ -44,10 +44,10 @@ public class SubmarineConfVars { SUBMARINE_SERVER_SSL_TRUSTSTORE_TYPE("submarine.server.ssl.truststore.type", null), SUBMARINE_SERVER_SSL_TRUSTSTORE_PASSWORD("submarine.server.ssl.truststore.password", null), SUBMARINE_CLUSTER_ADDR("submarine.cluster.addr", ""), - SUBMARINE_SERVER_REMOTE_EXECUTION_ENABLED( - "submarine.server.remote.execution.enabled", false), - SUBMARINE_SERVER_REMOTE_EXECUTION_PORT( - "submarine.server.remote.execution.port", 8980), + SUBMARINE_SERVER_RPC_ENABLED( + "submarine.server.rpc.enabled", false), + SUBMARINE_SERVER_RPC_PORT( + "submarine.server.rpc.port", 8980), CLUSTER_HEARTBEAT_INTERVAL("cluster.heartbeat.interval", 3000), CLUSTER_HEARTBEAT_TIMEOUT("cluster.heartbeat.timeout", 9000), diff --git a/submarine-dist/src/assembly/distribution.xml b/submarine-dist/src/assembly/distribution.xml index 60484d0..03aa9da 100644 --- a/submarine-dist/src/assembly/distribution.xml +++ b/submarine-dist/src/assembly/distribution.xml @@ -23,7 +23,7 @@ <dependencySets> <dependencySet> - <outputDirectory>/lib/submitter</outputDirectory> + <outputDirectory>/lib/submitter/yarn</outputDirectory> <includes> <include>com.linkedin.tony:tony-core</include> </includes> @@ -145,14 +145,14 @@ <fileSet> <directory>../submarine-server/server-submitter/submitter-yarn/target</directory> - <outputDirectory>/lib/submitter</outputDirectory> + <outputDirectory>/lib/submitter/yarn</outputDirectory> <includes> <include>submarine-submitter-yarn-${project.version}-shade.jar</include> </includes> </fileSet> <fileSet> <directory>../submarine-server/server-submitter/submitter-yarnservice/target</directory> - <outputDirectory>/lib/submitter</outputDirectory> + <outputDirectory>/lib/submitter/yarnservice</outputDirectory> <includes> <include>submarine-submitter-yarnservice-${project.version}.jar</include> </includes> diff --git a/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServer.java b/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServer.java index dc3e936..d048c56 100644 --- a/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServer.java +++ b/submarine-server/server-rpc/src/main/java/org/apache/submarine/server/rpc/SubmarineRpcServer.java @@ -40,9 +40,17 @@ import org.apache.submarine.commons.utils.SubmarineConfVars; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import static org.apache.submarine.commons.utils.SubmarineConfVars.ConfVars.SUBMARINE_RUNTIME_CLASS; + /** * A gRPC server that provides submarine service. */ @@ -109,11 +117,45 @@ public class SubmarineRpcServer { ClientContext clientContext = new ClientContext(); clientContext.setYarnConfig(conf); mergeSubmarineConfiguration(clientContext.getSubmarineConfig(), rpcContext); + String runtimeClass = + clientContext.getSubmarineConfig().getString(SUBMARINE_RUNTIME_CLASS); + ClassLoader classLoader = null; + if (runtimeClass.contains("YarnServiceRuntimeFactory")) { + classLoader = new URLClassLoader(constructUrlsFromClasspath("../lib/submitter/yarnservice")); + } else { + classLoader = new URLClassLoader(constructUrlsFromClasspath("../lib/submitter/yarn")); + } + RuntimeFactory runtimeFactory = RuntimeFactory.getRuntimeFactory( - clientContext); + clientContext, classLoader); clientContext.setRuntimeFactory(runtimeFactory); return clientContext; } + private static URL[] constructUrlsFromClasspath(String classpath) { + List<URL> urls = new ArrayList<>(); + for (String path : classpath.split(File.pathSeparator)) { + if (path.endsWith("/*")) { + path = path.substring(0, path.length() - 2); + } + + File file = new File(path); + try { + if (file.isDirectory()) { + File[] items = file.listFiles(); + if (items != null) { + for (File item : items) { + urls.add(item.toURI().toURL()); + } + } + } else { + urls.add(file.toURI().toURL()); + } + } catch (MalformedURLException e) { + LOG.error(e.getMessage(), e); + } + } + return urls.toArray(new URL[0]); + } private static void mergeSubmarineConfiguration( SubmarineConfiguration submarineConfiguration, RpcContext rpcContext) { @@ -137,7 +179,7 @@ public class SubmarineRpcServer { SubmarineConfiguration submarineConfiguration = SubmarineConfiguration.getInstance(); int rpcServerPort = submarineConfiguration.getInt( - SubmarineConfVars.ConfVars.SUBMARINE_SERVER_REMOTE_EXECUTION_PORT); + SubmarineConfVars.ConfVars.SUBMARINE_SERVER_RPC_PORT); SubmarineRpcServer server = new SubmarineRpcServer(rpcServerPort); server.start(); return server; diff --git a/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/MockRpcServer.java b/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/MockRpcServer.java index df08233..afd4c42 100644 --- a/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/MockRpcServer.java +++ b/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/MockRpcServer.java @@ -76,10 +76,10 @@ public class MockRpcServer extends SubmarineRpcServer { SubmarineConfiguration submarineConfiguration = SubmarineConfiguration.getInstance(); int rpcServerPort = submarineConfiguration.getInt( - SubmarineConfVars.ConfVars.SUBMARINE_SERVER_REMOTE_EXECUTION_PORT); + SubmarineConfVars.ConfVars.SUBMARINE_SERVER_RPC_PORT); SubmarineRpcServer server = new MockRpcServer(rpcServerPort); server.start(); return server; } -} \ No newline at end of file +} diff --git a/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/SubmarineRpcClient.java b/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/SubmarineRpcClient.java index 7be095c..8b05561 100644 --- a/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/SubmarineRpcClient.java +++ b/submarine-server/server-rpc/src/test/java/org/apache/submarine/server/rpc/SubmarineRpcClient.java @@ -20,7 +20,6 @@ package org.apache.submarine.server.rpc; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; -import org.apache.submarine.commons.rpc.ApplicationIdProto; import org.apache.submarine.commons.rpc.ParametersHolderProto; import org.apache.submarine.commons.rpc.SubmarineServerProtocolGrpc; import org.apache.submarine.commons.rpc.SubmarineServerProtocolGrpc.SubmarineServerProtocolBlockingStub; @@ -50,7 +49,7 @@ public class SubmarineRpcClient extends RpcServerTestUtils { SubmarineConfVars.ConfVars.SUBMARINE_SERVER_ADDR), config.getInt( SubmarineConfVars.ConfVars. - SUBMARINE_SERVER_REMOTE_EXECUTION_PORT)) + SUBMARINE_SERVER_RPC_PORT)) .usePlaintext()); } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@submarine.apache.org For additional commands, e-mail: dev-h...@submarine.apache.org