This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 40e6213  Remove runtime dependency from pulsar-admin (#2892)
40e6213 is described below

commit 40e6213bf8939752fe2c20e7818d133cdae56cd4
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Wed Oct 31 18:02:55 2018 -0700

    Remove runtime dependency from pulsar-admin (#2892)
    
    * Move LocalRun inside Functions module. This eliminates function runtime dependency with pulsar-client
    
    * Removed unneeded stuff
    
    * Optimize changes
    
    * Fixed build
    
    * Fixed unittest
---
 bin/function-localrunner                           |  66 ++++++
 bin/pulsar-admin                                   | 124 +-----------
 bin/{pulsar-admin => pulsar-admin-common.sh}       |  44 ----
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  | 116 ++---------
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java |  28 ++-
 .../org/apache/pulsar/admin/cli/CmdSources.java    |  28 ++-
 .../org/apache/pulsar/admin/cli/TestCmdSinks.java  |   2 +-
 .../apache/pulsar/admin/cli/TestCmdSources.java    |   2 +-
 .../pulsar/functions/runtime/LocalRunner.java      | 224 +++++++++++++++++++++
 9 files changed, 347 insertions(+), 287 deletions(-)

diff --git a/bin/function-localrunner b/bin/function-localrunner
new file mode 100755
index 0000000..16362b4
--- /dev/null
+++ b/bin/function-localrunner
@@ -0,0 +1,66 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+BINDIR=$(dirname "$0")
+export PULSAR_HOME=`cd $BINDIR/..;pwd`
+. "$PULSAR_HOME/bin/pulsar-admin-common.sh"
+
+# functions related variables
+FUNCTIONS_HOME=$PULSAR_HOME/pulsar-functions
+DEFAULT_JAVA_INSTANCE_JAR=$PULSAR_HOME/instances/java-instance.jar
+JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR:-"${DEFAULT_JAVA_INSTANCE_JAR}"}
+DEFAULT_PY_INSTANCE_FILE=$PULSAR_HOME/instances/python-instance/python_instance_main.py
+PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE:-"${DEFAULT_PY_INSTANCE_FILE}"}
+
+# find the java instance location
+if [ ! -f "${JAVA_INSTANCE_JAR}" ]; then
+    # didn't find a released jar, then search the built jar
+    BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime-all/target/java-instance.jar"
+    if [ -f "${BUILT_JAVA_INSTANCE_JAR}" ]; then
+        JAVA_INSTANCE_JAR=${BUILT_JAVA_INSTANCE_JAR}
+    else
+        echo "\nCouldn't find pulsar java instance jar.";
+        echo "Make sure you've run 'mvn package'\n";
+        exit 1;
+    fi
+fi
+
+# find the python instance location
+if [ ! -f "${PY_INSTANCE_FILE}" ]; then
+    # didn't find a released python instance, then search the built python instance
+    BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py"
+    if [ -f "${BUILT_PY_INSTANCE_FILE}" ]; then
+        PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE}
+    else
+        echo "\nCouldn't find pulsar python instance.";
+        echo "Make sure you've run 'mvn package'\n";
+        exit 1;
+    fi
+fi
+
+# functions
+OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
+OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
+
+MAINCLASS="org.apache.pulsar.functions.runtime.LocalRunner"
+
+#Change to PULSAR_HOME to support relative paths
+cd "$PULSAR_HOME"
+exec $JAVA $OPTS $MAINCLASS "$@"
diff --git a/bin/pulsar-admin b/bin/pulsar-admin
index 39d1239..97206e6 100755
--- a/bin/pulsar-admin
+++ b/bin/pulsar-admin
@@ -20,129 +20,7 @@
 
 BINDIR=$(dirname "$0")
 export PULSAR_HOME=`cd $BINDIR/..;pwd`
-
-DEFAULT_CLIENT_CONF=$PULSAR_HOME/conf/client.conf
-DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2.yaml
-
-# functions related variables
-FUNCTIONS_HOME=$PULSAR_HOME/pulsar-functions
-DEFAULT_JAVA_INSTANCE_JAR=$PULSAR_HOME/instances/java-instance.jar
-JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR:-"${DEFAULT_JAVA_INSTANCE_JAR}"}
-DEFAULT_PY_INSTANCE_FILE=$PULSAR_HOME/instances/python-instance/python_instance_main.py
-PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE:-"${DEFAULT_PY_INSTANCE_FILE}"}
-
-if [ -f "$PULSAR_HOME/conf/pulsar_tools_env.sh" ]
-then
-    . "$PULSAR_HOME/conf/pulsar_tools_env.sh"
-fi
-
-# Check for the java to use
-if [[ -z $JAVA_HOME ]]; then
-    JAVA=$(which java)
-    if [ $? != 0 ]; then
-        echo "Error: JAVA_HOME not set, and no java executable found in $PATH." 1>&2
-        exit 1
-    fi
-else
-    JAVA=$JAVA_HOME/bin/java
-fi
-
-# exclude tests jar
-RELEASE_JAR=`ls $PULSAR_HOME/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
-if [ $? == 0 ]; then
-    PULSAR_JAR=$RELEASE_JAR
-fi
-
-# exclude tests jar
-BUILT_JAR=`ls $PULSAR_HOME/pulsar-client-tools/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
-if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then
-    echo "\nCouldn't find pulsar jar.";
-    echo "Make sure you've run 'mvn package'\n";
-    exit 1;
-elif [ -e "$BUILT_JAR" ]; then
-    PULSAR_JAR=$BUILT_JAR
-fi
-
-add_maven_deps_to_classpath() {
-    MVN="mvn"
-    if [ "$MAVEN_HOME" != "" ]; then
-       MVN=${MAVEN_HOME}/bin/mvn
-    fi
-
-    # Need to generate classpath from maven pom. This is costly so generate it
-    # and cache it. Save the file into our target dir so a mvn clean will get
-    # clean it up and force us create a new one.
-    f="${PULSAR_HOME}/distribution/server/target/classpath.txt"
-    if [ ! -f "${f}" ]
-    then
-       ${MVN} -f "${PULSAR_HOME}/pom.xml" dependency:build-classpath -DincludeScope=compile -Dmdep.outputFile="${f}" &> /dev/null
-    fi
-    PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"`
-}
-
-if [ -d "$PULSAR_HOME/lib" ]; then
-    PULSAR_CLASSPATH="$PULSAR_CLASSPATH:$PULSAR_HOME/lib/*"
-else
-    add_maven_deps_to_classpath
-fi
-
-if [ -z "$PULSAR_CLIENT_CONF" ]; then
-    PULSAR_CLIENT_CONF=$DEFAULT_CLIENT_CONF
-fi
-if [ -z "$PULSAR_LOG_CONF" ]; then
-    PULSAR_LOG_CONF=$DEFAULT_LOG_CONF
-fi
-
-PULSAR_CLASSPATH="$PULSAR_JAR:$PULSAR_CLASSPATH:$PULSAR_EXTRA_CLASSPATH"
-PULSAR_CLASSPATH="`dirname $PULSAR_LOG_CONF`:$PULSAR_CLASSPATH"
-OPTS="$OPTS -Dlog4j.configurationFile=`basename $PULSAR_LOG_CONF`"
-OPTS="$OPTS -Djava.net.preferIPv4Stack=true"
-
-OPTS="-cp $PULSAR_CLASSPATH $OPTS"
-
-OPTS="$OPTS $PULSAR_EXTRA_OPTS"
-
-# log directory & file
-PULSAR_LOG_DIR=${PULSAR_LOG_DIR:-"$PULSAR_HOME/logs"}
-PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RoutingAppender"}
-PULSAR_LOG_LEVEL=${PULSAR_LOG_LEVEL:-"info"}
-PULSAR_ROUTING_APPENDER_DEFAULT=${PULSAR_ROUTING_APPENDER_DEFAULT:-"Console"}
-
-#Configure log configuration system properties
-OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER"
-OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR"
-OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL"
-OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT"
-
-# find the java instance location
-if [ ! -f "${JAVA_INSTANCE_JAR}" ]; then
-    # didn't find a released jar, then search the built jar
-    BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime-all/target/java-instance.jar"
-    if [ -f "${BUILT_JAVA_INSTANCE_JAR}" ]; then
-        JAVA_INSTANCE_JAR=${BUILT_JAVA_INSTANCE_JAR}
-    else
-        echo "\nCouldn't find pulsar java instance jar.";
-        echo "Make sure you've run 'mvn package'\n";
-        exit 1;
-    fi
-fi
-
-# find the python instance location
-if [ ! -f "${PY_INSTANCE_FILE}" ]; then
-    # didn't find a released python instance, then search the built python instance
-    BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py"
-    if [ -f "${BUILT_PY_INSTANCE_FILE}" ]; then
-        PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE}
-    else
-        echo "\nCouldn't find pulsar python instance.";
-        echo "Make sure you've run 'mvn package'\n";
-        exit 1;
-    fi
-fi
-
-# functions
-OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
-OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
+. "$PULSAR_HOME/bin/pulsar-admin-common.sh"
 
 #Change to PULSAR_HOME to support relative paths
 cd "$PULSAR_HOME"
diff --git a/bin/pulsar-admin b/bin/pulsar-admin-common.sh
similarity index 66%
copy from bin/pulsar-admin
copy to bin/pulsar-admin-common.sh
index 39d1239..0ccfc70 100755
--- a/bin/pulsar-admin
+++ b/bin/pulsar-admin-common.sh
@@ -18,19 +18,9 @@
 # under the License.
 #
 
-BINDIR=$(dirname "$0")
-export PULSAR_HOME=`cd $BINDIR/..;pwd`
-
 DEFAULT_CLIENT_CONF=$PULSAR_HOME/conf/client.conf
 DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2.yaml
 
-# functions related variables
-FUNCTIONS_HOME=$PULSAR_HOME/pulsar-functions
-DEFAULT_JAVA_INSTANCE_JAR=$PULSAR_HOME/instances/java-instance.jar
-JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR:-"${DEFAULT_JAVA_INSTANCE_JAR}"}
-DEFAULT_PY_INSTANCE_FILE=$PULSAR_HOME/instances/python-instance/python_instance_main.py
-PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE:-"${DEFAULT_PY_INSTANCE_FILE}"}
-
 if [ -f "$PULSAR_HOME/conf/pulsar_tools_env.sh" ]
 then
     . "$PULSAR_HOME/conf/pulsar_tools_env.sh"
@@ -113,37 +103,3 @@ OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER"
 OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR"
 OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL"
 OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT"
-
-# find the java instance location
-if [ ! -f "${JAVA_INSTANCE_JAR}" ]; then
-    # didn't find a released jar, then search the built jar
-    BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime-all/target/java-instance.jar"
-    if [ -f "${BUILT_JAVA_INSTANCE_JAR}" ]; then
-        JAVA_INSTANCE_JAR=${BUILT_JAVA_INSTANCE_JAR}
-    else
-        echo "\nCouldn't find pulsar java instance jar.";
-        echo "Make sure you've run 'mvn package'\n";
-        exit 1;
-    fi
-fi
-
-# find the python instance location
-if [ ! -f "${PY_INSTANCE_FILE}" ]; then
-    # didn't find a released python instance, then search the built python instance
-    BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py"
-    if [ -f "${BUILT_PY_INSTANCE_FILE}" ]; then
-        PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE}
-    else
-        echo "\nCouldn't find pulsar python instance.";
-        echo "Make sure you've run 'mvn package'\n";
-        exit 1;
-    fi
-fi
-
-# functions
-OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
-OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
-
-#Change to PULSAR_HOME to support relative paths
-cd "$PULSAR_HOME"
-exec $JAVA $OPTS org.apache.pulsar.admin.cli.PulsarAdminTool $PULSAR_CLIENT_CONF "$@"
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 3169e22..8f30f2f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -42,17 +42,13 @@ import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
 
 import java.io.File;
+import java.lang.reflect.Field;
 import java.lang.reflect.Type;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -69,19 +65,13 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.functions.WindowConfig;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
-import org.apache.pulsar.functions.instance.InstanceConfig;
-import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
-import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
-import org.apache.pulsar.functions.utils.*;
+import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.windowing.WindowUtils;
 
 @Slf4j
 @Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)")
 public class CmdFunctions extends CmdBase {
-    private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
-
     private final LocalRunner localRunner;
     private final CreateFunction creater;
     private final DeleteFunction deleter;
@@ -641,14 +631,22 @@ public class CmdFunctions extends CmdBase {
         void runCmd() throws Exception {
             // merge deprecated args with new args
             mergeArgs();
-            CmdFunctions.startLocalRun(FunctionConfigUtils.convert(functionConfig, classLoader), functionConfig.getParallelism(),
-                    instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl,
-                    AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
-                            .clientAuthenticationParameters(clientAuthParams).useTls(useTls)
-                            .tlsAllowInsecureConnection(tlsAllowInsecureConnection)
-                            .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
-                            .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
-                    userCodeFile, admin);
+            List<String> localRunArgs = new LinkedList<>();
+            localRunArgs.add(System.getenv("PULSAR_HOME") + "/bin/function-localrunner");
+            localRunArgs.add("--functionConfig");
+            localRunArgs.add(new Gson().toJson(functionConfig));
+            for (Field field : this.getClass().getDeclaredFields()) {
+                if (field.getName().startsWith("DEPRECATED")) continue;
+                if(field.getName().startsWith("this$0")) continue;
+                Object value = field.get(this);
+                if (value != null) {
+                    localRunArgs.add("--" + field.getName());
+                    localRunArgs.add(value.toString());
+                }
+            }
+            ProcessBuilder processBuilder = new ProcessBuilder(localRunArgs).inheritIO();
+            Process process = processBuilder.start();
+            process.waitFor();
         }
     }
 
@@ -1029,80 +1027,4 @@ public class CmdFunctions extends CmdBase {
         }
     }
 
-    protected static void startLocalRun(org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails,
-            int parallelism, int instanceIdOffset, String brokerServiceUrl, String stateStorageServiceUrl, AuthenticationConfig authConfig,
-            String userCodeFile, PulsarAdmin admin)
-            throws Exception {
-
-        String serviceUrl = admin.getServiceUrl();
-        if (brokerServiceUrl != null) {
-            serviceUrl = brokerServiceUrl;
-        }
-        if (serviceUrl == null) {
-            serviceUrl = DEFAULT_SERVICE_URL;
-        }
-
-        try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(serviceUrl, stateStorageServiceUrl, authConfig, null, null,
-                null, new DefaultSecretsProviderConfigurator())) {
-            List<RuntimeSpawner> spawners = new LinkedList<>();
-            for (int i = 0; i < parallelism; ++i) {
-                InstanceConfig instanceConfig = new InstanceConfig();
-                instanceConfig.setFunctionDetails(functionDetails);
-                // TODO: correctly implement function version and id
-                instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
-                instanceConfig.setFunctionId(UUID.randomUUID().toString());
-                instanceConfig.setInstanceId(i + instanceIdOffset);
-                instanceConfig.setMaxBufferedTuples(1024);
-                instanceConfig.setPort(Utils.findAvailablePort());
-                RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
-                        instanceConfig,
-                        userCodeFile,
-                        null,
-                        containerFactory,
-                        30000);
-                spawners.add(runtimeSpawner);
-                runtimeSpawner.start();
-            }
-            Runtime.getRuntime().addShutdownHook(new Thread() {
-                public void run() {
-                    log.info("Shutting down the localrun runtimeSpawner ...");
-                    for (RuntimeSpawner spawner : spawners) {
-                        spawner.close();
-                    }
-                }
-            });
-            Timer statusCheckTimer = new Timer();
-            statusCheckTimer.scheduleAtFixedRate(new TimerTask() {
-                    @Override
-                    public void run() {
-                        CompletableFuture<String>[] futures = new CompletableFuture[spawners.size()];
-                        int index = 0;
-                        for (RuntimeSpawner spawner : spawners) {
-                            futures[index] = spawner.getFunctionStatusAsJson(index);
-                            index++;
-                        }
-                        try {
-                            CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS);
-                            for (index = 0; index < futures.length; ++index) {
-                                String json = futures[index].get();
-                                Gson gson = new GsonBuilder().setPrettyPrinting().create();
-                                log.info(gson.toJson(new JsonParser().parse(json)));
-                            }
-                        } catch (Exception ex) {
-                            log.error("Could not get status from all local instances");
-                        }
-                    }
-                }, 30000, 30000);
-            Runtime.getRuntime().addShutdownHook(new Thread() {
-                    public void run() {
-                        statusCheckTimer.cancel();
-                    }
-                });
-            for (RuntimeSpawner spawner : spawners) {
-                spawner.join();
-                log.info("RuntimeSpawner quit because of", spawner.getRuntime().getDeathException());
-            }
-
-        }
-    }
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index a3bfd88..cd57b9a 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -35,6 +35,7 @@ import com.google.gson.reflect.TypeToken;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.lang.reflect.Type;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -55,7 +56,6 @@ import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.utils.*;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.utils.io.Connectors;
@@ -165,18 +165,24 @@ public class CmdSinks extends CmdBase {
         }
 
         @Override
-        void runCmd() throws Exception {
+        public void runCmd() throws Exception {
             // merge deprecated args with new args
             mergeArgs();
-
-            CmdFunctions.startLocalRun(createSinkConfigProto2(sinkConfig), sinkConfig.getParallelism(),
-                    0, brokerServiceUrl, null,
-                    AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
-                            .clientAuthenticationParameters(clientAuthParams).useTls(useTls)
-                            .tlsAllowInsecureConnection(tlsAllowInsecureConnection)
-                            .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
-                            .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
-                    sinkConfig.getArchive(), admin);
+            List<String> localRunArgs = new LinkedList<>();
+            localRunArgs.add(System.getenv("PULSAR_HOME") + "/bin/function-localrunner");
+            localRunArgs.add("--sinkConfig");
+            localRunArgs.add(new Gson().toJson(sinkConfig));
+            for (Field field : this.getClass().getDeclaredFields()) {
+                if (field.getName().startsWith("DEPRECATED")) continue;
+                Object value = field.get(this);
+                if (value != null) {
+                    localRunArgs.add("--" + field.getName());
+                    localRunArgs.add(value.toString());
+                }
+            }
+            ProcessBuilder processBuilder = new ProcessBuilder(localRunArgs).inheritIO();
+            Process process = processBuilder.start();
+            process.waitFor();
         }
 
         @Override
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index baeb5e0..ff46252 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -35,9 +35,11 @@ import com.google.gson.reflect.TypeToken;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.lang.reflect.Type;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -55,7 +57,6 @@ import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
@@ -168,18 +169,25 @@ public class CmdSources extends CmdBase {
         }
 
         @Override
-        void runCmd() throws Exception {
+        public void runCmd() throws Exception {
             // merge deprecated args with new args
             mergeArgs();
 
-            CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig), sourceConfig.getParallelism(),
-                    0, brokerServiceUrl, null,
-                    AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
-                            .clientAuthenticationParameters(clientAuthParams).useTls(useTls)
-                            .tlsAllowInsecureConnection(tlsAllowInsecureConnection)
-                            .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
-                            .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
-                    sourceConfig.getArchive(), admin);
+            List<String> localRunArgs = new LinkedList<>();
+            localRunArgs.add(System.getenv("PULSAR_HOME") + "/bin/function-localrunner");
+            localRunArgs.add("--sourceConfig");
+            localRunArgs.add(new Gson().toJson(sourceConfig));
+            for (Field field : this.getClass().getDeclaredFields()) {
+                if (field.getName().startsWith("DEPRECATED")) continue;
+                Object value = field.get(this);
+                if (value != null) {
+                    localRunArgs.add("--" + field.getName());
+                    localRunArgs.add(value.toString());
+                }
+            }
+            ProcessBuilder processBuilder = new ProcessBuilder(localRunArgs).inheritIO();
+            Process process = processBuilder.start();
+            process.waitFor();
         }
 
         @Override
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 6b59023..aea19fd 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -117,7 +117,7 @@ public class TestCmdSinks {
         deleteSink = spy(cmdSinks.getDeleteSink());
 
         mockStatic(CmdFunctions.class);
-        PowerMockito.doNothing().when(CmdFunctions.class, "startLocalRun", Mockito.any(), Mockito.anyInt(), Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+        PowerMockito.doNothing().when(localSinkRunner).runCmd();
         URL file = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME);
         if (file == null)  {
             throw new RuntimeException("Failed to file required test archive: " + JAR_FILE_NAME);
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index cb0c9e9..32a6c60 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -101,7 +101,7 @@ public class TestCmdSources {
         deleteSource = spy(CmdSources.getDeleteSource());
 
         mockStatic(CmdFunctions.class);
-        PowerMockito.doNothing().when(CmdFunctions.class, "startLocalRun", Mockito.any(), Mockito.anyInt(), Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+        PowerMockito.doNothing().when(localSourceRunner).runCmd();
         JAR_FILE_PATH = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME).getFile();
         WRONG_JAR_PATH = Thread.currentThread().getContextClassLoader().getResource(WRONG_JAR_FILE_NAME).getFile();
         Thread.currentThread().setContextClassLoader(Utils.loadJar(new File(JAR_FILE_PATH)));
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
new file mode 100644
index 0000000..ed52a96
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.runtime;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParser;
+
+import java.io.File;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function;
+import 
org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.*;
+
+import static org.apache.pulsar.functions.utils.Utils.*;
+
+/**
+ * Stand-alone launcher that runs a function, source or sink locally.
+ *
+ * <p>Command-line flags are bound to the fields below by JCommander. The
+ * supplied JSON config is converted into {@code FunctionDetails} and one
+ * {@link RuntimeSpawner} per requested instance is started through a
+ * {@link ProcessRuntimeFactory}.
+ */
+@Slf4j
+public class LocalRunner {
+
+    /** Broker URL used when {@code --brokerServiceUrl} is not given. */
+    private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
+
+    // start() checks the three JSON configs in the order function, source,
+    // sink and uses the first one that is non-empty.
+    @Parameter(names = "--functionConfig", hidden = true,
+            description = "The json representation of FunctionConfig")
+    protected String functionConfigString;
+    @Parameter(names = "--sourceConfig", hidden = true,
+            description = "The json representation of SourceConfig")
+    protected String sourceConfigString;
+    @Parameter(names = "--sinkConfig", hidden = true,
+            description = "The json representation of SinkConfig")
+    protected String sinkConfigString;
+    @Parameter(names = "--stateStorageServiceUrl", hidden = true,
+            description = "The URL for the state storage service "
+                    + "(by default Apache BookKeeper)")
+    protected String stateStorageServiceUrl;
+    @Parameter(names = "--brokerServiceUrl", hidden = true,
+            description = "The URL for the Pulsar broker")
+    protected String brokerServiceUrl;
+    @Parameter(names = "--clientAuthPlugin", hidden = true,
+            description = "Client authentication plugin using which "
+                    + "function-process can connect to broker")
+    protected String clientAuthPlugin;
+    @Parameter(names = "--clientAuthParams", hidden = true,
+            description = "Client authentication param")
+    protected String clientAuthParams;
+    @Parameter(names = "--useTls", hidden = true, arity = 1,
+            description = "Use tls connection\n")
+    protected boolean useTls;
+    @Parameter(names = "--tlsAllowInsecureConnection", hidden = true, arity = 1,
+            description = "Allow insecure tls connection\n")
+    protected boolean tlsAllowInsecureConnection;
+    @Parameter(names = "--tlsHostNameVerificationEnabled", hidden = true, arity = 1,
+            description = "Enable hostname verification")
+    protected boolean tlsHostNameVerificationEnabled;
+    @Parameter(names = "--tlsTrustCertFilePath", hidden = true,
+            description = "tls trust cert file path")
+    protected String tlsTrustCertFilePath;
+    @Parameter(names = "--instanceIdOffset", hidden = true,
+            description = "Start the instanceIds from this offset")
+    protected Integer instanceIdOffset = 0;
+
+    /**
+     * Parses {@code args} into a new {@link LocalRunner} and starts it.
+     *
+     * @param args command-line flags matching the {@code @Parameter} fields
+     * @throws Exception anything raised while parsing or running
+     */
+    public static void main(String[] args) throws Exception {
+        LocalRunner localRunner = new LocalRunner();
+        JCommander jcommander = new JCommander(localRunner);
+        jcommander.setProgramName("LocalRunner");
+
+        // parse args by JCommander
+        jcommander.parse(args);
+        localRunner.start();
+    }
+
+    /**
+     * Builds the {@code FunctionDetails} from whichever JSON config was
+     * supplied and hands over to {@code startLocalRun}.
+     *
+     * @throws IllegalArgumentException if none of the three configs is set
+     * @throws RuntimeException if a local jar/archive path does not exist
+     * @throws Exception anything raised while loading code or running
+     */
+    void start() throws Exception {
+        Function.FunctionDetails functionDetails;
+        String userCodeFile;
+        int parallelism;
+        if (!StringUtils.isEmpty(functionConfigString)) {
+            FunctionConfig functionConfig =
+                    new Gson().fromJson(functionConfigString, FunctionConfig.class);
+            // Stays null for non-Java runtimes; only Java code is loaded here.
+            ClassLoader classLoader = null;
+            parallelism = functionConfig.getParallelism();
+            if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
+                userCodeFile = functionConfig.getJar();
+                if (isFunctionPackageUrlSupported(userCodeFile)) {
+                    classLoader = extractClassLoader(userCodeFile);
+                } else {
+                    File file = new File(userCodeFile);
+                    if (!file.exists()) {
+                        throw new RuntimeException("User jar does not exist");
+                    }
+                    classLoader = loadJar(file);
+                }
+            } else {
+                userCodeFile = functionConfig.getPy();
+            }
+            functionDetails = FunctionConfigUtils.convert(functionConfig, classLoader);
+        } else if (!StringUtils.isEmpty(sourceConfigString)) {
+            SourceConfig sourceConfig =
+                    new Gson().fromJson(sourceConfigString, SourceConfig.class);
+            parallelism = sourceConfig.getParallelism();
+            userCodeFile = sourceConfig.getArchive();
+            NarClassLoader classLoader =
+                    loadNarClassLoader(userCodeFile, "Source archive does not exist");
+            functionDetails = SourceConfigUtils.convert(sourceConfig, classLoader);
+        } else if (!StringUtils.isEmpty(sinkConfigString)) {
+            SinkConfig sinkConfig =
+                    new Gson().fromJson(sinkConfigString, SinkConfig.class);
+            parallelism = sinkConfig.getParallelism();
+            userCodeFile = sinkConfig.getArchive();
+            NarClassLoader classLoader =
+                    loadNarClassLoader(userCodeFile, "Sink archive does not exist");
+            functionDetails = SinkConfigUtils.convert(sinkConfig, classLoader);
+        } else {
+            // Gson returns null for null/empty input, so without this guard
+            // the sink branch would fail with an unexplained NPE.
+            throw new IllegalArgumentException(
+                    "One of --functionConfig, --sourceConfig or --sinkConfig must be specified");
+        }
+        AuthenticationConfig authConfig = AuthenticationConfig.builder()
+                .clientAuthenticationPlugin(clientAuthPlugin)
+                .clientAuthenticationParameters(clientAuthParams)
+                .useTls(useTls)
+                .tlsAllowInsecureConnection(tlsAllowInsecureConnection)
+                .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
+                .tlsTrustCertsFilePath(tlsTrustCertFilePath)
+                .build();
+        startLocalRun(functionDetails, parallelism,
+                instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl,
+                authConfig, userCodeFile);
+    }
+
+    /**
+     * Resolves the NAR class loader for a source/sink archive, given either
+     * as a supported package URL or as a path to a local file.
+     *
+     * @param archive package URL or local file path of the archive
+     * @param missingMessage exception message used when the local file is absent
+     * @return class loader for the archive
+     * @throws RuntimeException if {@code archive} is a local path that does not exist
+     */
+    private static NarClassLoader loadNarClassLoader(String archive, String missingMessage)
+            throws Exception {
+        if (isFunctionPackageUrlSupported(archive)) {
+            return extractNarClassLoader(null, archive, null);
+        }
+        File file = new File(archive);
+        if (!file.exists()) {
+            throw new RuntimeException(missingMessage);
+        }
+        return extractNarClassLoader(null, null, file);
+    }
+
+    /**
+     * Starts {@code parallelism} instances of the given function as local
+     * processes, logs their status every 30 seconds and blocks until each
+     * spawner's {@code join()} returns.
+     *
+     * @param functionDetails details of the function/source/sink to run
+     * @param parallelism number of instances to start
+     * @param instanceIdOffset value added to the loop index to form the instance id
+     * @param brokerServiceUrl broker URL; {@code DEFAULT_SERVICE_URL} is used when null
+     * @param stateStorageServiceUrl passed through to the runtime factory
+     * @param authConfig client authentication/TLS settings
+     * @param userCodeFile jar, py file or archive handed to each spawner
+     * @throws Exception anything raised while starting or joining the spawners
+     */
+    protected static void startLocalRun(Function.FunctionDetails functionDetails,
+                                        int parallelism, int instanceIdOffset,
+                                        String brokerServiceUrl, String stateStorageServiceUrl,
+                                        AuthenticationConfig authConfig,
+                                        String userCodeFile)
+            throws Exception {
+
+        String serviceUrl = DEFAULT_SERVICE_URL;
+        if (brokerServiceUrl != null) {
+            serviceUrl = brokerServiceUrl;
+        }
+
+        try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(
+                serviceUrl, stateStorageServiceUrl, authConfig, null, null,
+                null, new DefaultSecretsProviderConfigurator())) {
+            List<RuntimeSpawner> spawners = new LinkedList<>();
+            for (int i = 0; i < parallelism; ++i) {
+                InstanceConfig instanceConfig = new InstanceConfig();
+                instanceConfig.setFunctionDetails(functionDetails);
+                // TODO: correctly implement function version and id
+                instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
+                instanceConfig.setFunctionId(UUID.randomUUID().toString());
+                instanceConfig.setInstanceId(i + instanceIdOffset);
+                instanceConfig.setMaxBufferedTuples(1024);
+                instanceConfig.setPort(Utils.findAvailablePort());
+                RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
+                        instanceConfig,
+                        userCodeFile,
+                        null,
+                        containerFactory,
+                        30000);
+                spawners.add(runtimeSpawner);
+                runtimeSpawner.start();
+            }
+            // java.lang.Runtime is spelled out in full, as in the original code.
+            java.lang.Runtime.getRuntime().addShutdownHook(new Thread() {
+                @Override
+                public void run() {
+                    log.info("Shutting down the localrun runtimeSpawner ...");
+                    for (RuntimeSpawner spawner : spawners) {
+                        spawner.close();
+                    }
+                }
+            });
+            // One pretty-printer is enough; build it once rather than per tick.
+            final Gson prettyGson = new GsonBuilder().setPrettyPrinting().create();
+            Timer statusCheckTimer = new Timer();
+            statusCheckTimer.scheduleAtFixedRate(new TimerTask() {
+                @Override
+                public void run() {
+                    CompletableFuture<String>[] futures =
+                            new CompletableFuture[spawners.size()];
+                    int index = 0;
+                    for (RuntimeSpawner spawner : spawners) {
+                        futures[index] = spawner.getFunctionStatusAsJson(index);
+                        index++;
+                    }
+                    try {
+                        CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS);
+                        for (index = 0; index < futures.length; ++index) {
+                            String json = futures[index].get();
+                            log.info(prettyGson.toJson(new JsonParser().parse(json)));
+                        }
+                    } catch (Exception ex) {
+                        // Keep the cause so a timeout can be told apart from
+                        // a failed instance.
+                        log.error("Could not get status from all local instances", ex);
+                    }
+                }
+            }, 30000, 30000);
+            java.lang.Runtime.getRuntime().addShutdownHook(new Thread() {
+                @Override
+                public void run() {
+                    statusCheckTimer.cancel();
+                }
+            });
+            for (RuntimeSpawner spawner : spawners) {
+                spawner.join();
+                // The trailing Throwable is logged with its stack trace by slf4j.
+                log.info("RuntimeSpawner quit because of",
+                        spawner.getRuntime().getDeathException());
+            }
+
+        }
+    }
+}

Reply via email to