This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 40e6213 Remove runtime dependency from pulsar-admin (#2892) 40e6213 is described below commit 40e6213bf8939752fe2c20e7818d133cdae56cd4 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Wed Oct 31 18:02:55 2018 -0700 Remove runtime dependency from pulsar-admin (#2892) * Move LocalRun inside Functions module. This eliminates function runtime dependency with pulsar-client * Removed unneeded stuff * Optimize changes * Fixed build * Fixed unittest --- bin/function-localrunner | 66 ++++++ bin/pulsar-admin | 124 +----------- bin/{pulsar-admin => pulsar-admin-common.sh} | 44 ---- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 116 ++--------- .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 28 ++- .../org/apache/pulsar/admin/cli/CmdSources.java | 28 ++- .../org/apache/pulsar/admin/cli/TestCmdSinks.java | 2 +- .../apache/pulsar/admin/cli/TestCmdSources.java | 2 +- .../pulsar/functions/runtime/LocalRunner.java | 224 +++++++++++++++++++++ 9 files changed, 347 insertions(+), 287 deletions(-) diff --git a/bin/function-localrunner b/bin/function-localrunner new file mode 100755 index 0000000..16362b4 --- /dev/null +++ b/bin/function-localrunner @@ -0,0 +1,66 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +BINDIR=$(dirname "$0") +export PULSAR_HOME=`cd $BINDIR/..;pwd` +. "$PULSAR_HOME/bin/pulsar-admin-common.sh" + +# functions related variables +FUNCTIONS_HOME=$PULSAR_HOME/pulsar-functions +DEFAULT_JAVA_INSTANCE_JAR=$PULSAR_HOME/instances/java-instance.jar +JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR:-"${DEFAULT_JAVA_INSTANCE_JAR}"} +DEFAULT_PY_INSTANCE_FILE=$PULSAR_HOME/instances/python-instance/python_instance_main.py +PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE:-"${DEFAULT_PY_INSTANCE_FILE}"} + +# find the java instance location +if [ ! -f "${JAVA_INSTANCE_JAR}" ]; then + # didn't find a released jar, then search the built jar + BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime-all/target/java-instance.jar" + if [ -f "${BUILT_JAVA_INSTANCE_JAR}" ]; then + JAVA_INSTANCE_JAR=${BUILT_JAVA_INSTANCE_JAR} + else + echo "\nCouldn't find pulsar java instance jar."; + echo "Make sure you've run 'mvn package'\n"; + exit 1; + fi +fi + +# find the python instance location +if [ ! -f "${PY_INSTANCE_FILE}" ]; then + # didn't find a released python instance, then search the built python instance + BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py" + if [ -f "${BUILT_PY_INSTANCE_FILE}" ]; then + PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE} + else + echo "\nCouldn't find pulsar python instance."; + echo "Make sure you've run 'mvn package'\n"; + exit 1; + fi +fi + +# functions +OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}" +OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}" + +MAINCLASS="org.apache.pulsar.functions.runtime.LocalRunner" + +#Change to PULSAR_HOME to support relative paths +cd "$PULSAR_HOME" +exec $JAVA $OPTS $MAINCLASS "$@" diff --git a/bin/pulsar-admin b/bin/pulsar-admin index 39d1239..97206e6 100755 --- a/bin/pulsar-admin +++ b/bin/pulsar-admin @@ -20,129 +20,7 @@ BINDIR=$(dirname "$0") export PULSAR_HOME=`cd $BINDIR/..;pwd` - -DEFAULT_CLIENT_CONF=$PULSAR_HOME/conf/client.conf -DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2.yaml - -# functions related variables -FUNCTIONS_HOME=$PULSAR_HOME/pulsar-functions -DEFAULT_JAVA_INSTANCE_JAR=$PULSAR_HOME/instances/java-instance.jar -JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR:-"${DEFAULT_JAVA_INSTANCE_JAR}"} -DEFAULT_PY_INSTANCE_FILE=$PULSAR_HOME/instances/python-instance/python_instance_main.py -PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE:-"${DEFAULT_PY_INSTANCE_FILE}"} - -if [ -f "$PULSAR_HOME/conf/pulsar_tools_env.sh" ] -then - . "$PULSAR_HOME/conf/pulsar_tools_env.sh" -fi - -# Check for the java to use -if [[ -z $JAVA_HOME ]]; then - JAVA=$(which java) - if [ $? != 0 ]; then - echo "Error: JAVA_HOME not set, and no java executable found in $PATH." 1>&2 - exit 1 - fi -else - JAVA=$JAVA_HOME/bin/java -fi - -# exclude tests jar -RELEASE_JAR=`ls $PULSAR_HOME/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1` -if [ $? == 0 ]; then - PULSAR_JAR=$RELEASE_JAR -fi - -# exclude tests jar -BUILT_JAR=`ls $PULSAR_HOME/pulsar-client-tools/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1` -if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then - echo "\nCouldn't find pulsar jar."; - echo "Make sure you've run 'mvn package'\n"; - exit 1; -elif [ -e "$BUILT_JAR" ]; then - PULSAR_JAR=$BUILT_JAR -fi - -add_maven_deps_to_classpath() { - MVN="mvn" - if [ "$MAVEN_HOME" != "" ]; then - MVN=${MAVEN_HOME}/bin/mvn - fi - - # Need to generate classpath from maven pom. This is costly so generate it - # and cache it. Save the file into our target dir so a mvn clean will get - # clean it up and force us create a new one. - f="${PULSAR_HOME}/distribution/server/target/classpath.txt" - if [ ! -f "${f}" ] - then - ${MVN} -f "${PULSAR_HOME}/pom.xml" dependency:build-classpath -DincludeScope=compile -Dmdep.outputFile="${f}" &> /dev/null - fi - PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"` -} - -if [ -d "$PULSAR_HOME/lib" ]; then - PULSAR_CLASSPATH="$PULSAR_CLASSPATH:$PULSAR_HOME/lib/*" -else - add_maven_deps_to_classpath -fi - -if [ -z "$PULSAR_CLIENT_CONF" ]; then - PULSAR_CLIENT_CONF=$DEFAULT_CLIENT_CONF -fi -if [ -z "$PULSAR_LOG_CONF" ]; then - PULSAR_LOG_CONF=$DEFAULT_LOG_CONF -fi - -PULSAR_CLASSPATH="$PULSAR_JAR:$PULSAR_CLASSPATH:$PULSAR_EXTRA_CLASSPATH" -PULSAR_CLASSPATH="`dirname $PULSAR_LOG_CONF`:$PULSAR_CLASSPATH" -OPTS="$OPTS -Dlog4j.configurationFile=`basename $PULSAR_LOG_CONF`" -OPTS="$OPTS -Djava.net.preferIPv4Stack=true" - -OPTS="-cp $PULSAR_CLASSPATH $OPTS" - -OPTS="$OPTS $PULSAR_EXTRA_OPTS" - -# log directory & file -PULSAR_LOG_DIR=${PULSAR_LOG_DIR:-"$PULSAR_HOME/logs"} -PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RoutingAppender"} -PULSAR_LOG_LEVEL=${PULSAR_LOG_LEVEL:-"info"} -PULSAR_ROUTING_APPENDER_DEFAULT=${PULSAR_ROUTING_APPENDER_DEFAULT:-"Console"} - -#Configure log configuration system properties -OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER" -OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR" -OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL" -OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT" - -# find the java instance location -if [ ! -f "${JAVA_INSTANCE_JAR}" ]; then - # didn't find a released jar, then search the built jar - BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime-all/target/java-instance.jar" - if [ -f "${BUILT_JAVA_INSTANCE_JAR}" ]; then - JAVA_INSTANCE_JAR=${BUILT_JAVA_INSTANCE_JAR} - else - echo "\nCouldn't find pulsar java instance jar."; - echo "Make sure you've run 'mvn package'\n"; - exit 1; - fi -fi - -# find the python instance location -if [ ! -f "${PY_INSTANCE_FILE}" ]; then - # didn't find a released python instance, then search the built python instance - BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py" - if [ -f "${BUILT_PY_INSTANCE_FILE}" ]; then - PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE} - else - echo "\nCouldn't find pulsar python instance."; - echo "Make sure you've run 'mvn package'\n"; - exit 1; - fi -fi - -# functions -OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}" -OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}" +. "$PULSAR_HOME/bin/pulsar-admin-common.sh" #Change to PULSAR_HOME to support relative paths cd "$PULSAR_HOME" diff --git a/bin/pulsar-admin b/bin/pulsar-admin-common.sh similarity index 66% copy from bin/pulsar-admin copy to bin/pulsar-admin-common.sh index 39d1239..0ccfc70 100755 --- a/bin/pulsar-admin +++ b/bin/pulsar-admin-common.sh @@ -18,19 +18,9 @@ # under the License. # -BINDIR=$(dirname "$0") -export PULSAR_HOME=`cd $BINDIR/..;pwd` - DEFAULT_CLIENT_CONF=$PULSAR_HOME/conf/client.conf DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2.yaml -# functions related variables -FUNCTIONS_HOME=$PULSAR_HOME/pulsar-functions -DEFAULT_JAVA_INSTANCE_JAR=$PULSAR_HOME/instances/java-instance.jar -JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR:-"${DEFAULT_JAVA_INSTANCE_JAR}"} -DEFAULT_PY_INSTANCE_FILE=$PULSAR_HOME/instances/python-instance/python_instance_main.py -PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE:-"${DEFAULT_PY_INSTANCE_FILE}"} - if [ -f "$PULSAR_HOME/conf/pulsar_tools_env.sh" ] then . "$PULSAR_HOME/conf/pulsar_tools_env.sh" @@ -113,37 +103,3 @@ OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER" OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR" OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL" OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT" - -# find the java instance location -if [ ! -f "${JAVA_INSTANCE_JAR}" ]; then - # didn't find a released jar, then search the built jar - BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime-all/target/java-instance.jar" - if [ -f "${BUILT_JAVA_INSTANCE_JAR}" ]; then - JAVA_INSTANCE_JAR=${BUILT_JAVA_INSTANCE_JAR} - else - echo "\nCouldn't find pulsar java instance jar."; - echo "Make sure you've run 'mvn package'\n"; - exit 1; - fi -fi - -# find the python instance location -if [ ! -f "${PY_INSTANCE_FILE}" ]; then - # didn't find a released python instance, then search the built python instance - BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py" - if [ -f "${BUILT_PY_INSTANCE_FILE}" ]; then - PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE} - else - echo "\nCouldn't find pulsar python instance."; - echo "Make sure you've run 'mvn package'\n"; - exit 1; - fi -fi - -# functions -OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}" -OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}" - -#Change to PULSAR_HOME to support relative paths -cd "$PULSAR_HOME" -exec $JAVA $OPTS org.apache.pulsar.admin.cli.PulsarAdminTool $PULSAR_CLIENT_CONF "$@" diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 3169e22..8f30f2f 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -42,17 +42,13 @@ import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import java.io.File; +import java.lang.reflect.Field; import java.lang.reflect.Type; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -69,19 +65,13 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.functions.WindowConfig; -import org.apache.pulsar.functions.instance.AuthenticationConfig; -import org.apache.pulsar.functions.instance.InstanceConfig; -import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory; -import org.apache.pulsar.functions.runtime.RuntimeSpawner; -import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator; -import org.apache.pulsar.functions.utils.*; +import org.apache.pulsar.functions.utils.Utils; +import org.apache.pulsar.functions.utils.FunctionConfigUtils; import org.apache.pulsar.functions.windowing.WindowUtils; @Slf4j @Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)") public class CmdFunctions extends CmdBase { - private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650"; - private final LocalRunner localRunner; private final CreateFunction creater; private final DeleteFunction deleter; @@ -641,14 +631,22 @@ public class CmdFunctions extends CmdBase { void runCmd() throws Exception { // merge deprecated args with new args mergeArgs(); - CmdFunctions.startLocalRun(FunctionConfigUtils.convert(functionConfig, classLoader), functionConfig.getParallelism(), - instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl, - AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin) - .clientAuthenticationParameters(clientAuthParams).useTls(useTls) - .tlsAllowInsecureConnection(tlsAllowInsecureConnection) - .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled) - .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(), - userCodeFile, admin); + List<String> localRunArgs = new LinkedList<>(); + localRunArgs.add(System.getenv("PULSAR_HOME") + "/bin/function-localrunner"); + localRunArgs.add("--functionConfig"); + localRunArgs.add(new Gson().toJson(functionConfig)); + for (Field field : this.getClass().getDeclaredFields()) { + if (field.getName().startsWith("DEPRECATED")) continue; + if(field.getName().startsWith("this$0")) continue; + Object value = field.get(this); + if (value != null) { + localRunArgs.add("--" + field.getName()); + localRunArgs.add(value.toString()); + } + } + ProcessBuilder processBuilder = new ProcessBuilder(localRunArgs).inheritIO(); + Process process = processBuilder.start(); + process.waitFor(); } } @@ -1029,80 +1027,4 @@ public class CmdFunctions extends CmdBase { } } - protected static void startLocalRun(org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails, - int parallelism, int instanceIdOffset, String brokerServiceUrl, String stateStorageServiceUrl, AuthenticationConfig authConfig, - String userCodeFile, PulsarAdmin admin) - throws Exception { - - String serviceUrl = admin.getServiceUrl(); - if (brokerServiceUrl != null) { - serviceUrl = brokerServiceUrl; - } - if (serviceUrl == null) { - serviceUrl = DEFAULT_SERVICE_URL; - } - - try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(serviceUrl, stateStorageServiceUrl, authConfig, null, null, - null, new DefaultSecretsProviderConfigurator())) { - List<RuntimeSpawner> spawners = new LinkedList<>(); - for (int i = 0; i < parallelism; ++i) { - InstanceConfig instanceConfig = new InstanceConfig(); - instanceConfig.setFunctionDetails(functionDetails); - // TODO: correctly implement function version and id - instanceConfig.setFunctionVersion(UUID.randomUUID().toString()); - instanceConfig.setFunctionId(UUID.randomUUID().toString()); - instanceConfig.setInstanceId(i + instanceIdOffset); - instanceConfig.setMaxBufferedTuples(1024); - instanceConfig.setPort(Utils.findAvailablePort()); - RuntimeSpawner runtimeSpawner = new RuntimeSpawner( - instanceConfig, - userCodeFile, - null, - containerFactory, - 30000); - spawners.add(runtimeSpawner); - runtimeSpawner.start(); - } - Runtime.getRuntime().addShutdownHook(new Thread() { - public void run() { - log.info("Shutting down the localrun runtimeSpawner ..."); - for (RuntimeSpawner spawner : spawners) { - spawner.close(); - } - } - }); - Timer statusCheckTimer = new Timer(); - statusCheckTimer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - CompletableFuture<String>[] futures = new CompletableFuture[spawners.size()]; - int index = 0; - for (RuntimeSpawner spawner : spawners) { - futures[index] = spawner.getFunctionStatusAsJson(index); - index++; - } - try { - CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS); - for (index = 0; index < futures.length; ++index) { - String json = futures[index].get(); - Gson gson = new GsonBuilder().setPrettyPrinting().create(); - log.info(gson.toJson(new JsonParser().parse(json))); - } - } catch (Exception ex) { - log.error("Could not get status from all local instances"); - } - } - }, 30000, 30000); - Runtime.getRuntime().addShutdownHook(new Thread() { - public void run() { - statusCheckTimer.cancel(); - } - }); - for (RuntimeSpawner spawner : spawners) { - spawner.join(); - log.info("RuntimeSpawner quit because of", spawner.getRuntime().getDeathException()); - } - - } - } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index a3bfd88..cd57b9a 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -35,6 +35,7 @@ import com.google.gson.reflect.TypeToken; import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.lang.reflect.Type; import java.nio.file.Path; import java.nio.file.Paths; @@ -55,7 +56,6 @@ import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.io.SinkConfig; import org.apache.pulsar.common.nar.NarClassLoader; -import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.utils.*; import org.apache.pulsar.functions.utils.io.ConnectorUtils; import org.apache.pulsar.functions.utils.io.Connectors; @@ -165,18 +165,24 @@ public class CmdSinks extends CmdBase { } @Override - void runCmd() throws Exception { + public void runCmd() throws Exception { // merge deprecated args with new args mergeArgs(); - - CmdFunctions.startLocalRun(createSinkConfigProto2(sinkConfig), sinkConfig.getParallelism(), - 0, brokerServiceUrl, null, - AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin) - .clientAuthenticationParameters(clientAuthParams).useTls(useTls) - .tlsAllowInsecureConnection(tlsAllowInsecureConnection) - .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled) - .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(), - sinkConfig.getArchive(), admin); + List<String> localRunArgs = new LinkedList<>(); + localRunArgs.add(System.getenv("PULSAR_HOME") + "/bin/function-localrunner"); + localRunArgs.add("--sinkConfig"); + localRunArgs.add(new Gson().toJson(sinkConfig)); + for (Field field : this.getClass().getDeclaredFields()) { + if (field.getName().startsWith("DEPRECATED")) continue; + Object value = field.get(this); + if (value != null) { + localRunArgs.add("--" + field.getName()); + localRunArgs.add(value.toString()); + } + } + ProcessBuilder processBuilder = new ProcessBuilder(localRunArgs).inheritIO(); + Process process = processBuilder.start(); + process.waitFor(); } @Override diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index baeb5e0..ff46252 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -35,9 +35,11 @@ import com.google.gson.reflect.TypeToken; import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.lang.reflect.Type; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -55,7 +57,6 @@ import org.apache.pulsar.client.admin.internal.FunctionsImpl; import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.nar.NarClassLoader; -import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.io.SourceConfig; import org.apache.pulsar.functions.utils.SourceConfigUtils; @@ -168,18 +169,25 @@ public class CmdSources extends CmdBase { } @Override - void runCmd() throws Exception { + public void runCmd() throws Exception { // merge deprecated args with new args mergeArgs(); - CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig), sourceConfig.getParallelism(), - 0, brokerServiceUrl, null, - AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin) - .clientAuthenticationParameters(clientAuthParams).useTls(useTls) - .tlsAllowInsecureConnection(tlsAllowInsecureConnection) - .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled) - .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(), - sourceConfig.getArchive(), admin); + List<String> localRunArgs = new LinkedList<>(); + localRunArgs.add(System.getenv("PULSAR_HOME") + "/bin/function-localrunner"); + localRunArgs.add("--sourceConfig"); + localRunArgs.add(new Gson().toJson(sourceConfig)); + for (Field field : this.getClass().getDeclaredFields()) { + if (field.getName().startsWith("DEPRECATED")) continue; + Object value = field.get(this); + if (value != null) { + localRunArgs.add("--" + field.getName()); + localRunArgs.add(value.toString()); + } + } + ProcessBuilder processBuilder = new ProcessBuilder(localRunArgs).inheritIO(); + Process process = processBuilder.start(); + process.waitFor(); } @Override diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java index 6b59023..aea19fd 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java @@ -117,7 +117,7 @@ public class TestCmdSinks { deleteSink = spy(cmdSinks.getDeleteSink()); mockStatic(CmdFunctions.class); - PowerMockito.doNothing().when(CmdFunctions.class, "startLocalRun", Mockito.any(), Mockito.anyInt(), Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + PowerMockito.doNothing().when(localSinkRunner).runCmd(); URL file = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME); if (file == null) { throw new RuntimeException("Failed to file required test archive: " + JAR_FILE_NAME); diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java index cb0c9e9..32a6c60 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java @@ -101,7 +101,7 @@ public class TestCmdSources { deleteSource = spy(CmdSources.getDeleteSource()); mockStatic(CmdFunctions.class); - PowerMockito.doNothing().when(CmdFunctions.class, "startLocalRun", Mockito.any(), Mockito.anyInt(), Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + PowerMockito.doNothing().when(localSourceRunner).runCmd(); JAR_FILE_PATH = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME).getFile(); WRONG_JAR_PATH = Thread.currentThread().getContextClassLoader().getResource(WRONG_JAR_FILE_NAME).getFile(); Thread.currentThread().setContextClassLoader(Utils.loadJar(new File(JAR_FILE_PATH))); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java new file mode 100644 index 0000000..ed52a96 --- /dev/null +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.runtime; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonParser; + +import java.io.File; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import lombok.extern.slf4j.Slf4j; + + +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.io.SinkConfig; +import org.apache.pulsar.common.io.SourceConfig; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.functions.instance.AuthenticationConfig; +import org.apache.pulsar.functions.instance.InstanceConfig; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator; +import org.apache.pulsar.functions.utils.*; + +import static org.apache.pulsar.functions.utils.Utils.*; + +@Slf4j +public class LocalRunner { + + @Parameter(names = "--functionConfig", description = "The json representation of FunctionConfig", hidden = true) + protected String functionConfigString; + @Parameter(names = "--sourceConfig", description = "The json representation of SourceConfig", hidden = true) + protected String sourceConfigString; + @Parameter(names = "--sinkConfig", description = "The json representation of SinkConfig", hidden = true) + protected String sinkConfigString; + @Parameter(names = "--stateStorageServiceUrl", description = "The URL for the state storage service (by default Apache BookKeeper)", hidden = true) + protected String stateStorageServiceUrl; + @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker", hidden = true) + protected String brokerServiceUrl; + @Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker", hidden = true) + protected String clientAuthPlugin; + @Parameter(names = "--clientAuthParams", description = "Client authentication param", hidden = true) + protected String clientAuthParams; + @Parameter(names = "--useTls", description = "Use tls connection\n", hidden = true, arity = 1) + protected boolean useTls; + @Parameter(names = "--tlsAllowInsecureConnection", description = "Allow insecure tls connection\n", hidden = true, arity = 1) + protected boolean tlsAllowInsecureConnection; + @Parameter(names = "--tlsHostNameVerificationEnabled", description = "Enable hostname verification", hidden = true, arity = 1) + protected boolean tlsHostNameVerificationEnabled; + @Parameter(names = "--tlsTrustCertFilePath", description = "tls trust cert file path", hidden = true) + protected String tlsTrustCertFilePath; + @Parameter(names = "--instanceIdOffset", description = "Start the instanceIds from this offset", hidden = true) + protected Integer instanceIdOffset = 0; + private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650"; + + public static void main(String[] args) throws Exception { + LocalRunner localRunner = new LocalRunner(); + JCommander jcommander = new JCommander(localRunner); + jcommander.setProgramName("LocalRunner"); + + // parse args by JCommander + jcommander.parse(args); + localRunner.start(); + } + + void start() throws Exception { + Function.FunctionDetails functionDetails; + String userCodeFile; + int parallelism; + if (!StringUtils.isEmpty(functionConfigString)) { + FunctionConfig functionConfig = new Gson().fromJson(functionConfigString, FunctionConfig.class); + ClassLoader classLoader = null; + parallelism = functionConfig.getParallelism(); + if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { + userCodeFile = functionConfig.getJar(); + if (isFunctionPackageUrlSupported(userCodeFile)) { + classLoader = extractClassLoader(userCodeFile); + } else { + File file = new File(userCodeFile); + if (!file.exists()) { + throw new RuntimeException("User jar does not exist"); + } + classLoader = loadJar(file); + } + } else { + userCodeFile = functionConfig.getPy(); + } + functionDetails = FunctionConfigUtils.convert(functionConfig, classLoader); + } else if (!StringUtils.isEmpty(sourceConfigString)) { + SourceConfig sourceConfig = new Gson().fromJson(sourceConfigString, SourceConfig.class); + NarClassLoader classLoader; + parallelism = sourceConfig.getParallelism(); + userCodeFile = sourceConfig.getArchive(); + if (isFunctionPackageUrlSupported(userCodeFile)) { + classLoader = extractNarClassLoader(null, userCodeFile, null); + } else { + File file = new File(userCodeFile); + if (!file.exists()) { + throw new RuntimeException("Source archive does not exist"); + } + classLoader = extractNarClassLoader(null, null, file); + } + functionDetails = SourceConfigUtils.convert(sourceConfig, classLoader); + } else { + SinkConfig sinkConfig = new Gson().fromJson(sinkConfigString, SinkConfig.class); + NarClassLoader classLoader; + parallelism = sinkConfig.getParallelism(); + userCodeFile = sinkConfig.getArchive(); + if (isFunctionPackageUrlSupported(userCodeFile)) { + classLoader = extractNarClassLoader(null, userCodeFile, null); + } else { + File file = new File(userCodeFile); + if (!file.exists()) { + throw new RuntimeException("Sink archive does not exist"); + } + classLoader = extractNarClassLoader(null, null, file); + } + functionDetails = SinkConfigUtils.convert(sinkConfig, classLoader); + } + startLocalRun(functionDetails, parallelism, + instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl, + AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin) + .clientAuthenticationParameters(clientAuthParams).useTls(useTls) + .tlsAllowInsecureConnection(tlsAllowInsecureConnection) + .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled) + .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(), + userCodeFile); + } + + protected static void startLocalRun(org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails, + int parallelism, int instanceIdOffset, String brokerServiceUrl, String stateStorageServiceUrl, AuthenticationConfig authConfig, + String userCodeFile) + throws Exception { + + String serviceUrl = DEFAULT_SERVICE_URL; + if (brokerServiceUrl != null) { + serviceUrl = brokerServiceUrl; + } + + try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(serviceUrl, stateStorageServiceUrl, authConfig, null, null, + null, new DefaultSecretsProviderConfigurator())) { + List<RuntimeSpawner> spawners = new LinkedList<>(); + for (int i = 0; i < parallelism; ++i) { + InstanceConfig instanceConfig = new InstanceConfig(); + instanceConfig.setFunctionDetails(functionDetails); + // TODO: correctly implement function version and id + instanceConfig.setFunctionVersion(UUID.randomUUID().toString()); + instanceConfig.setFunctionId(UUID.randomUUID().toString()); + instanceConfig.setInstanceId(i + instanceIdOffset); + instanceConfig.setMaxBufferedTuples(1024); + instanceConfig.setPort(Utils.findAvailablePort()); + RuntimeSpawner runtimeSpawner = new RuntimeSpawner( + instanceConfig, + userCodeFile, + null, + containerFactory, + 30000); + spawners.add(runtimeSpawner); + runtimeSpawner.start(); + } + java.lang.Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + log.info("Shutting down the localrun runtimeSpawner ..."); + for (RuntimeSpawner spawner : spawners) { + spawner.close(); + } + } + }); + Timer statusCheckTimer = new Timer(); + statusCheckTimer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + CompletableFuture<String>[] futures = new CompletableFuture[spawners.size()]; + int index = 0; + for (RuntimeSpawner spawner : spawners) { + futures[index] = spawner.getFunctionStatusAsJson(index); + index++; + } + try { + CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS); + for (index = 0; index < futures.length; ++index) { + String json = futures[index].get(); + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + log.info(gson.toJson(new JsonParser().parse(json))); + } + } catch (Exception ex) { + log.error("Could not get status from all local instances"); + } + } + }, 30000, 30000); + java.lang.Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + statusCheckTimer.cancel(); + } + }); + for (RuntimeSpawner spawner : spawners) { + spawner.join(); + log.info("RuntimeSpawner quit because of", spawner.getRuntime().getDeathException()); + } + + } + } +}