srkukarni closed pull request #2892: Remove runtime dependency from pulsar-admin URL: https://github.com/apache/pulsar/pull/2892
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/bin/function-localrunner b/bin/function-localrunner new file mode 100755 index 0000000000..16362b4a0e --- /dev/null +++ b/bin/function-localrunner @@ -0,0 +1,66 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +BINDIR=$(dirname "$0") +export PULSAR_HOME=`cd $BINDIR/..;pwd` +. "$PULSAR_HOME/bin/pulsar-admin-common.sh" + +# functions related variables +FUNCTIONS_HOME=$PULSAR_HOME/pulsar-functions +DEFAULT_JAVA_INSTANCE_JAR=$PULSAR_HOME/instances/java-instance.jar +JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR:-"${DEFAULT_JAVA_INSTANCE_JAR}"} +DEFAULT_PY_INSTANCE_FILE=$PULSAR_HOME/instances/python-instance/python_instance_main.py +PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE:-"${DEFAULT_PY_INSTANCE_FILE}"} + +# find the java instance location +if [ ! -f "${JAVA_INSTANCE_JAR}" ]; then + # didn't find a released jar, then search the built jar + BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime-all/target/java-instance.jar" + if [ -f "${BUILT_JAVA_INSTANCE_JAR}" ]; then + JAVA_INSTANCE_JAR=${BUILT_JAVA_INSTANCE_JAR} + else + echo "\nCouldn't find pulsar java instance jar."; + echo "Make sure you've run 'mvn package'\n"; + exit 1; + fi +fi + +# find the python instance location +if [ ! -f "${PY_INSTANCE_FILE}" ]; then + # didn't find a released python instance, then search the built python instance + BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py" + if [ -f "${BUILT_PY_INSTANCE_FILE}" ]; then + PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE} + else + echo "\nCouldn't find pulsar python instance."; + echo "Make sure you've run 'mvn package'\n"; + exit 1; + fi +fi + +# functions +OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}" +OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}" + +MAINCLASS="org.apache.pulsar.functions.runtime.LocalRunner" + +#Change to PULSAR_HOME to support relative paths +cd "$PULSAR_HOME" +exec $JAVA $OPTS $MAINCLASS "$@" diff --git a/bin/pulsar-admin b/bin/pulsar-admin index 39d12393b2..97206e61f4 100755 --- a/bin/pulsar-admin +++ b/bin/pulsar-admin @@ -20,129 +20,7 @@ BINDIR=$(dirname "$0") export PULSAR_HOME=`cd $BINDIR/..;pwd` - -DEFAULT_CLIENT_CONF=$PULSAR_HOME/conf/client.conf -DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2.yaml - -# functions related variables -FUNCTIONS_HOME=$PULSAR_HOME/pulsar-functions -DEFAULT_JAVA_INSTANCE_JAR=$PULSAR_HOME/instances/java-instance.jar -JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR:-"${DEFAULT_JAVA_INSTANCE_JAR}"} -DEFAULT_PY_INSTANCE_FILE=$PULSAR_HOME/instances/python-instance/python_instance_main.py -PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE:-"${DEFAULT_PY_INSTANCE_FILE}"} - -if [ -f "$PULSAR_HOME/conf/pulsar_tools_env.sh" ] -then - . "$PULSAR_HOME/conf/pulsar_tools_env.sh" -fi - -# Check for the java to use -if [[ -z $JAVA_HOME ]]; then - JAVA=$(which java) - if [ $? != 0 ]; then - echo "Error: JAVA_HOME not set, and no java executable found in $PATH." 1>&2 - exit 1 - fi -else - JAVA=$JAVA_HOME/bin/java -fi - -# exclude tests jar -RELEASE_JAR=`ls $PULSAR_HOME/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1` -if [ $? == 0 ]; then - PULSAR_JAR=$RELEASE_JAR -fi - -# exclude tests jar -BUILT_JAR=`ls $PULSAR_HOME/pulsar-client-tools/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1` -if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then - echo "\nCouldn't find pulsar jar."; - echo "Make sure you've run 'mvn package'\n"; - exit 1; -elif [ -e "$BUILT_JAR" ]; then - PULSAR_JAR=$BUILT_JAR -fi - -add_maven_deps_to_classpath() { - MVN="mvn" - if [ "$MAVEN_HOME" != "" ]; then - MVN=${MAVEN_HOME}/bin/mvn - fi - - # Need to generate classpath from maven pom. This is costly so generate it - # and cache it. Save the file into our target dir so a mvn clean will get - # clean it up and force us create a new one. - f="${PULSAR_HOME}/distribution/server/target/classpath.txt" - if [ ! -f "${f}" ] - then - ${MVN} -f "${PULSAR_HOME}/pom.xml" dependency:build-classpath -DincludeScope=compile -Dmdep.outputFile="${f}" &> /dev/null - fi - PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"` -} - -if [ -d "$PULSAR_HOME/lib" ]; then - PULSAR_CLASSPATH="$PULSAR_CLASSPATH:$PULSAR_HOME/lib/*" -else - add_maven_deps_to_classpath -fi - -if [ -z "$PULSAR_CLIENT_CONF" ]; then - PULSAR_CLIENT_CONF=$DEFAULT_CLIENT_CONF -fi -if [ -z "$PULSAR_LOG_CONF" ]; then - PULSAR_LOG_CONF=$DEFAULT_LOG_CONF -fi - -PULSAR_CLASSPATH="$PULSAR_JAR:$PULSAR_CLASSPATH:$PULSAR_EXTRA_CLASSPATH" -PULSAR_CLASSPATH="`dirname $PULSAR_LOG_CONF`:$PULSAR_CLASSPATH" -OPTS="$OPTS -Dlog4j.configurationFile=`basename $PULSAR_LOG_CONF`" -OPTS="$OPTS -Djava.net.preferIPv4Stack=true" - -OPTS="-cp $PULSAR_CLASSPATH $OPTS" - -OPTS="$OPTS $PULSAR_EXTRA_OPTS" - -# log directory & file -PULSAR_LOG_DIR=${PULSAR_LOG_DIR:-"$PULSAR_HOME/logs"} -PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RoutingAppender"} -PULSAR_LOG_LEVEL=${PULSAR_LOG_LEVEL:-"info"} -PULSAR_ROUTING_APPENDER_DEFAULT=${PULSAR_ROUTING_APPENDER_DEFAULT:-"Console"} - -#Configure log configuration system properties -OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER" -OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR" -OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL" -OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT" - -# find the java instance location -if [ ! -f "${JAVA_INSTANCE_JAR}" ]; then - # didn't find a released jar, then search the built jar - BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime-all/target/java-instance.jar" - if [ -f "${BUILT_JAVA_INSTANCE_JAR}" ]; then - JAVA_INSTANCE_JAR=${BUILT_JAVA_INSTANCE_JAR} - else - echo "\nCouldn't find pulsar java instance jar."; - echo "Make sure you've run 'mvn package'\n"; - exit 1; - fi -fi - -# find the python instance location -if [ ! -f "${PY_INSTANCE_FILE}" ]; then - # didn't find a released python instance, then search the built python instance - BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py" - if [ -f "${BUILT_PY_INSTANCE_FILE}" ]; then - PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE} - else - echo "\nCouldn't find pulsar python instance."; - echo "Make sure you've run 'mvn package'\n"; - exit 1; - fi -fi - -# functions -OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}" -OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}" +. "$PULSAR_HOME/bin/pulsar-admin-common.sh" #Change to PULSAR_HOME to support relative paths cd "$PULSAR_HOME" diff --git a/bin/pulsar-admin-common.sh b/bin/pulsar-admin-common.sh new file mode 100755 index 0000000000..0ccfc70985 --- /dev/null +++ b/bin/pulsar-admin-common.sh @@ -0,0 +1,105 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +DEFAULT_CLIENT_CONF=$PULSAR_HOME/conf/client.conf +DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2.yaml + +if [ -f "$PULSAR_HOME/conf/pulsar_tools_env.sh" ] +then + . "$PULSAR_HOME/conf/pulsar_tools_env.sh" +fi + +# Check for the java to use +if [[ -z $JAVA_HOME ]]; then + JAVA=$(which java) + if [ $? != 0 ]; then + echo "Error: JAVA_HOME not set, and no java executable found in $PATH." 1>&2 + exit 1 + fi +else + JAVA=$JAVA_HOME/bin/java +fi + +# exclude tests jar +RELEASE_JAR=`ls $PULSAR_HOME/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1` +if [ $? == 0 ]; then + PULSAR_JAR=$RELEASE_JAR +fi + +# exclude tests jar +BUILT_JAR=`ls $PULSAR_HOME/pulsar-client-tools/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1` +if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then + echo "\nCouldn't find pulsar jar."; + echo "Make sure you've run 'mvn package'\n"; + exit 1; +elif [ -e "$BUILT_JAR" ]; then + PULSAR_JAR=$BUILT_JAR +fi + +add_maven_deps_to_classpath() { + MVN="mvn" + if [ "$MAVEN_HOME" != "" ]; then + MVN=${MAVEN_HOME}/bin/mvn + fi + + # Need to generate classpath from maven pom. This is costly so generate it + # and cache it. Save the file into our target dir so a mvn clean will get + # clean it up and force us create a new one. + f="${PULSAR_HOME}/distribution/server/target/classpath.txt" + if [ ! -f "${f}" ] + then + ${MVN} -f "${PULSAR_HOME}/pom.xml" dependency:build-classpath -DincludeScope=compile -Dmdep.outputFile="${f}" &> /dev/null + fi + PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"` +} + +if [ -d "$PULSAR_HOME/lib" ]; then + PULSAR_CLASSPATH="$PULSAR_CLASSPATH:$PULSAR_HOME/lib/*" +else + add_maven_deps_to_classpath +fi + +if [ -z "$PULSAR_CLIENT_CONF" ]; then + PULSAR_CLIENT_CONF=$DEFAULT_CLIENT_CONF +fi +if [ -z "$PULSAR_LOG_CONF" ]; then + PULSAR_LOG_CONF=$DEFAULT_LOG_CONF +fi + +PULSAR_CLASSPATH="$PULSAR_JAR:$PULSAR_CLASSPATH:$PULSAR_EXTRA_CLASSPATH" +PULSAR_CLASSPATH="`dirname $PULSAR_LOG_CONF`:$PULSAR_CLASSPATH" +OPTS="$OPTS -Dlog4j.configurationFile=`basename $PULSAR_LOG_CONF`" +OPTS="$OPTS -Djava.net.preferIPv4Stack=true" + +OPTS="-cp $PULSAR_CLASSPATH $OPTS" + +OPTS="$OPTS $PULSAR_EXTRA_OPTS" + +# log directory & file +PULSAR_LOG_DIR=${PULSAR_LOG_DIR:-"$PULSAR_HOME/logs"} +PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RoutingAppender"} +PULSAR_LOG_LEVEL=${PULSAR_LOG_LEVEL:-"info"} +PULSAR_ROUTING_APPENDER_DEFAULT=${PULSAR_ROUTING_APPENDER_DEFAULT:-"Console"} + +#Configure log configuration system properties +OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER" +OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR" +OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL" +OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT" diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 3169e2224b..8f30f2fba9 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -42,17 +42,13 @@ import io.netty.buffer.Unpooled; import java.io.File; +import java.lang.reflect.Field; import java.lang.reflect.Type; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -69,19 +65,13 @@ import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.functions.WindowConfig; -import org.apache.pulsar.functions.instance.AuthenticationConfig; -import org.apache.pulsar.functions.instance.InstanceConfig; -import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory; -import org.apache.pulsar.functions.runtime.RuntimeSpawner; -import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator; -import org.apache.pulsar.functions.utils.*; +import org.apache.pulsar.functions.utils.Utils; +import org.apache.pulsar.functions.utils.FunctionConfigUtils; import org.apache.pulsar.functions.windowing.WindowUtils; @Slf4j @Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)") public class CmdFunctions extends CmdBase { - private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650"; - private final LocalRunner localRunner; private final CreateFunction creater; private final DeleteFunction deleter; @@ -641,14 +631,22 @@ private void mergeArgs() { void runCmd() throws Exception { // merge deprecated args with new args mergeArgs(); - CmdFunctions.startLocalRun(FunctionConfigUtils.convert(functionConfig, classLoader), functionConfig.getParallelism(), - instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl, - AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin) - .clientAuthenticationParameters(clientAuthParams).useTls(useTls) - .tlsAllowInsecureConnection(tlsAllowInsecureConnection) - .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled) - .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(), - userCodeFile, admin); + List<String> localRunArgs = new LinkedList<>(); + localRunArgs.add(System.getenv("PULSAR_HOME") + "/bin/function-localrunner"); + localRunArgs.add("--functionConfig"); + localRunArgs.add(new Gson().toJson(functionConfig)); + for (Field field : this.getClass().getDeclaredFields()) { + if (field.getName().startsWith("DEPRECATED")) continue; + if(field.getName().startsWith("this$0")) continue; + Object value = field.get(this); + if (value != null) { + localRunArgs.add("--" + field.getName()); + localRunArgs.add(value.toString()); + } + } + ProcessBuilder processBuilder = new ProcessBuilder(localRunArgs).inheritIO(); + Process process = processBuilder.start(); + process.waitFor(); } } @@ -1029,80 +1027,4 @@ private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig functio } } - protected static void startLocalRun(org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails, - int parallelism, int instanceIdOffset, String brokerServiceUrl, String stateStorageServiceUrl, AuthenticationConfig authConfig, - String userCodeFile, PulsarAdmin admin) - throws Exception { - - String serviceUrl = admin.getServiceUrl(); - if (brokerServiceUrl != null) { - serviceUrl = brokerServiceUrl; - } - if (serviceUrl == null) { - serviceUrl = DEFAULT_SERVICE_URL; - } - - try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(serviceUrl, stateStorageServiceUrl, authConfig, null, null, - null, new DefaultSecretsProviderConfigurator())) { - List<RuntimeSpawner> spawners = new LinkedList<>(); - for (int i = 0; i < parallelism; ++i) { - InstanceConfig instanceConfig = new InstanceConfig(); - instanceConfig.setFunctionDetails(functionDetails); - // TODO: correctly implement function version and id - instanceConfig.setFunctionVersion(UUID.randomUUID().toString()); - instanceConfig.setFunctionId(UUID.randomUUID().toString()); - instanceConfig.setInstanceId(i + instanceIdOffset); - instanceConfig.setMaxBufferedTuples(1024); - instanceConfig.setPort(Utils.findAvailablePort()); - RuntimeSpawner runtimeSpawner = new RuntimeSpawner( - instanceConfig, - userCodeFile, - null, - containerFactory, - 30000); - spawners.add(runtimeSpawner); - runtimeSpawner.start(); - } - Runtime.getRuntime().addShutdownHook(new Thread() { - public void run() { - log.info("Shutting down the localrun runtimeSpawner ..."); - for (RuntimeSpawner spawner : spawners) { - spawner.close(); - } - } - }); - Timer statusCheckTimer = new Timer(); - statusCheckTimer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - CompletableFuture<String>[] futures = new CompletableFuture[spawners.size()]; - int index = 0; - for (RuntimeSpawner spawner : spawners) { - futures[index] = spawner.getFunctionStatusAsJson(index); - index++; - } - try { - CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS); - for (index = 0; index < futures.length; ++index) { - String json = futures[index].get(); - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - log.info(gson.toJson(new JsonParser().parse(json))); - } - } catch (Exception ex) { - log.error("Could not get status from all local instances"); - } - } - }, 30000, 30000); - Runtime.getRuntime().addShutdownHook(new Thread() { - public void run() { - statusCheckTimer.cancel(); - } - }); - for (RuntimeSpawner spawner : spawners) { - spawner.join(); - log.info("RuntimeSpawner quit because of", spawner.getRuntime().getDeathException()); - } - - } - } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index a3bfd88025..cd57b9a54e 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -35,6 +35,7 @@ import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.lang.reflect.Type; import java.nio.file.Path; import java.nio.file.Paths; @@ -55,7 +56,6 @@ import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.io.SinkConfig; import org.apache.pulsar.common.nar.NarClassLoader; -import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.utils.*; import org.apache.pulsar.functions.utils.io.ConnectorUtils; import org.apache.pulsar.functions.utils.io.Connectors; @@ -165,18 +165,24 @@ private void mergeArgs() { } @Override - void runCmd() throws Exception { + public void runCmd() throws Exception { // merge deprecated args with new args mergeArgs(); - - CmdFunctions.startLocalRun(createSinkConfigProto2(sinkConfig), sinkConfig.getParallelism(), - 0, brokerServiceUrl, null, - AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin) - .clientAuthenticationParameters(clientAuthParams).useTls(useTls) - .tlsAllowInsecureConnection(tlsAllowInsecureConnection) - .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled) - .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(), - sinkConfig.getArchive(), admin); + List<String> localRunArgs = new LinkedList<>(); + localRunArgs.add(System.getenv("PULSAR_HOME") + "/bin/function-localrunner"); + localRunArgs.add("--sinkConfig"); + localRunArgs.add(new Gson().toJson(sinkConfig)); + for (Field field : this.getClass().getDeclaredFields()) { + if (field.getName().startsWith("DEPRECATED")) continue; + Object value = field.get(this); + if (value != null) { + localRunArgs.add("--" + field.getName()); + localRunArgs.add(value.toString()); + } + } + ProcessBuilder processBuilder = new ProcessBuilder(localRunArgs).inheritIO(); + Process process = processBuilder.start(); + process.waitFor(); } @Override diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index baeb5e0b09..ff4625246a 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -35,9 +35,11 @@ import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.lang.reflect.Type; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -55,7 +57,6 @@ import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.nar.NarClassLoader; -import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.io.SourceConfig; import org.apache.pulsar.functions.utils.SourceConfigUtils; @@ -168,18 +169,25 @@ private void mergeArgs() { } @Override - void runCmd() throws Exception { + public void runCmd() throws Exception { // merge deprecated args with new args mergeArgs(); - CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig), sourceConfig.getParallelism(), - 0, brokerServiceUrl, null, - AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin) - .clientAuthenticationParameters(clientAuthParams).useTls(useTls) - .tlsAllowInsecureConnection(tlsAllowInsecureConnection) - .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled) - .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(), - sourceConfig.getArchive(), admin); + List<String> localRunArgs = new LinkedList<>(); + localRunArgs.add(System.getenv("PULSAR_HOME") + "/bin/function-localrunner"); + localRunArgs.add("--sourceConfig"); + localRunArgs.add(new Gson().toJson(sourceConfig)); + for (Field field : this.getClass().getDeclaredFields()) { + if (field.getName().startsWith("DEPRECATED")) continue; + Object value = field.get(this); + if (value != null) { + localRunArgs.add("--" + field.getName()); + localRunArgs.add(value.toString()); + } + } + ProcessBuilder processBuilder = new ProcessBuilder(localRunArgs).inheritIO(); + Process process = processBuilder.start(); + process.waitFor(); } @Override diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java index 6b59023d94..aea19fd0bb 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java @@ -117,7 +117,7 @@ public void setup() throws Exception { deleteSink = spy(cmdSinks.getDeleteSink()); mockStatic(CmdFunctions.class); - PowerMockito.doNothing().when(CmdFunctions.class, "startLocalRun", Mockito.any(), Mockito.anyInt(), Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + PowerMockito.doNothing().when(localSinkRunner).runCmd(); URL file = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME); if (file == null) { throw new RuntimeException("Failed to file required test archive: " + JAR_FILE_NAME); diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java index cb0c9e98ea..32a6c60186 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java @@ -101,7 +101,7 @@ public void setup() throws Exception { deleteSource = spy(CmdSources.getDeleteSource()); mockStatic(CmdFunctions.class); - PowerMockito.doNothing().when(CmdFunctions.class, "startLocalRun", Mockito.any(), Mockito.anyInt(), Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + PowerMockito.doNothing().when(localSourceRunner).runCmd(); JAR_FILE_PATH = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME).getFile(); WRONG_JAR_PATH = Thread.currentThread().getContextClassLoader().getResource(WRONG_JAR_FILE_NAME).getFile(); Thread.currentThread().setContextClassLoader(Utils.loadJar(new File(JAR_FILE_PATH))); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java new file mode 100644 index 0000000000..ed52a96a0d --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonParser; + +import java.io.File; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import lombok.extern.slf4j.Slf4j; + + +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.io.SinkConfig; +import org.apache.pulsar.common.io.SourceConfig; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.functions.instance.AuthenticationConfig; +import org.apache.pulsar.functions.instance.InstanceConfig; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator; +import org.apache.pulsar.functions.utils.*; + +import static org.apache.pulsar.functions.utils.Utils.*; + +@Slf4j +public class LocalRunner { + + @Parameter(names = "--functionConfig", description = "The json representation of FunctionConfig", hidden = true) + protected String functionConfigString; + @Parameter(names = "--sourceConfig", description = "The json representation of SourceConfig", hidden = true) + protected String sourceConfigString; + @Parameter(names = "--sinkConfig", description = "The json representation of SinkConfig", hidden = true) + protected String sinkConfigString; + @Parameter(names = "--stateStorageServiceUrl", description = "The URL for the state storage service (by default Apache BookKeeper)", hidden = true) + protected String stateStorageServiceUrl; + @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker", hidden = true) + protected String brokerServiceUrl; + @Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker", hidden = true) + protected String clientAuthPlugin; + @Parameter(names = "--clientAuthParams", description = "Client authentication param", hidden = true) + protected String clientAuthParams; + @Parameter(names = "--useTls", description = "Use tls connection\n", hidden = true, arity = 1) + protected boolean useTls; + @Parameter(names = "--tlsAllowInsecureConnection", description = "Allow insecure tls connection\n", hidden = true, arity = 1) + protected boolean tlsAllowInsecureConnection; + @Parameter(names = "--tlsHostNameVerificationEnabled", description = "Enable hostname verification", hidden = true, arity = 1) + protected boolean tlsHostNameVerificationEnabled; + @Parameter(names = "--tlsTrustCertFilePath", description = "tls trust cert file path", hidden = true) + protected String tlsTrustCertFilePath; + @Parameter(names = "--instanceIdOffset", description = "Start the instanceIds from this offset", hidden = true) + protected Integer instanceIdOffset = 0; + private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650"; + + public static void main(String[] args) throws Exception { + LocalRunner localRunner = new LocalRunner(); + JCommander jcommander = new JCommander(localRunner); + jcommander.setProgramName("LocalRunner"); + + // parse args by JCommander + jcommander.parse(args); + localRunner.start(); + } + + void start() throws Exception { + Function.FunctionDetails functionDetails; + String userCodeFile; + int parallelism; + if (!StringUtils.isEmpty(functionConfigString)) { + FunctionConfig functionConfig = new Gson().fromJson(functionConfigString, FunctionConfig.class); + ClassLoader classLoader = null; + parallelism = functionConfig.getParallelism(); + if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { + userCodeFile = functionConfig.getJar(); + if (isFunctionPackageUrlSupported(userCodeFile)) { + classLoader = extractClassLoader(userCodeFile); + } else { + File file = new File(userCodeFile); + if (!file.exists()) { + throw new RuntimeException("User jar does not exist"); + } + classLoader = loadJar(file); + } + } else { + userCodeFile = functionConfig.getPy(); + } + functionDetails = FunctionConfigUtils.convert(functionConfig, classLoader); + } else if (!StringUtils.isEmpty(sourceConfigString)) { + SourceConfig sourceConfig = new Gson().fromJson(sourceConfigString, SourceConfig.class); + NarClassLoader classLoader; + parallelism = sourceConfig.getParallelism(); + userCodeFile = sourceConfig.getArchive(); + if (isFunctionPackageUrlSupported(userCodeFile)) { + classLoader = extractNarClassLoader(null, userCodeFile, null); + } else { + File file = new File(userCodeFile); + if (!file.exists()) { + throw new RuntimeException("Source archive does not exist"); + } + classLoader = extractNarClassLoader(null, null, file); + } + functionDetails = SourceConfigUtils.convert(sourceConfig, classLoader); + } else { + SinkConfig sinkConfig = new Gson().fromJson(sinkConfigString, SinkConfig.class); + NarClassLoader classLoader; + parallelism = sinkConfig.getParallelism(); + userCodeFile = sinkConfig.getArchive(); + if (isFunctionPackageUrlSupported(userCodeFile)) { + classLoader = extractNarClassLoader(null, userCodeFile, null); + } else { + File file = new File(userCodeFile); + if (!file.exists()) { + throw new RuntimeException("Sink archive does not exist"); + } + classLoader = extractNarClassLoader(null, null, file); + } + functionDetails = SinkConfigUtils.convert(sinkConfig, classLoader); + } + startLocalRun(functionDetails, parallelism, + instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl, + AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin) + .clientAuthenticationParameters(clientAuthParams).useTls(useTls) + .tlsAllowInsecureConnection(tlsAllowInsecureConnection) + .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled) + .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(), + userCodeFile); + } + + protected static void startLocalRun(org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails, + int parallelism, int instanceIdOffset, String brokerServiceUrl, String stateStorageServiceUrl, AuthenticationConfig authConfig, + String userCodeFile) + throws Exception { + + String serviceUrl = DEFAULT_SERVICE_URL; + if (brokerServiceUrl != null) { + serviceUrl = brokerServiceUrl; + } + + try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(serviceUrl, stateStorageServiceUrl, authConfig, null, null, + null, new DefaultSecretsProviderConfigurator())) { + List<RuntimeSpawner> spawners = new LinkedList<>(); + for (int i = 0; i < parallelism; ++i) { + InstanceConfig instanceConfig = new InstanceConfig(); + instanceConfig.setFunctionDetails(functionDetails); + // TODO: correctly implement function version and id + instanceConfig.setFunctionVersion(UUID.randomUUID().toString()); + instanceConfig.setFunctionId(UUID.randomUUID().toString()); + instanceConfig.setInstanceId(i + instanceIdOffset); + instanceConfig.setMaxBufferedTuples(1024); + instanceConfig.setPort(Utils.findAvailablePort()); + RuntimeSpawner runtimeSpawner = new RuntimeSpawner( + instanceConfig, + userCodeFile, + null, + containerFactory, + 30000); + spawners.add(runtimeSpawner); + runtimeSpawner.start(); + } + java.lang.Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + log.info("Shutting down the localrun runtimeSpawner ..."); + for (RuntimeSpawner spawner : spawners) { + spawner.close(); + } + } + }); + Timer statusCheckTimer = new Timer(); + statusCheckTimer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + CompletableFuture<String>[] futures = new CompletableFuture[spawners.size()]; + int index = 0; + for (RuntimeSpawner spawner : spawners) { + futures[index] = spawner.getFunctionStatusAsJson(index); + index++; + } + try { + CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS); + for (index = 0; index < futures.length; ++index) { + String json = futures[index].get(); + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + log.info(gson.toJson(new JsonParser().parse(json))); + } + } catch (Exception ex) { + log.error("Could not get status from all local instances"); + } + } + }, 30000, 30000); + java.lang.Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + statusCheckTimer.cancel(); + } + }); + for (RuntimeSpawner spawner : spawners) { + spawner.join(); + log.info("RuntimeSpawner quit because of", spawner.getRuntime().getDeathException()); + } + + } + } +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
