This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new da83322  Added ability to download dependencies in Kubernetes (#2729)
da83322 is described below

commit da833223404e6ea4ce2a13f65e9f74bb0db783a1
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Fri Oct 5 10:14:23 2018 -0700

    Added ability to download dependencies in Kubernetes (#2729)
    
    * Add a config to allow user code dependencies to be installed at runtime.
    Enable it only for Kubernetes runtime
    
    * Fix bug
    
    * Fixed indentation issue
    
    * Specify the right cmd line
    
    * Install dep only in the temp dir
    
    * The Queue module is named queue in Python 3, so import it with a try/except fallback
    
    * Fixed unittest
---
 pulsar-functions/instance/src/main/python/python_instance.py   |  7 +++++--
 .../instance/src/main/python/python_instance_main.py           |  8 ++++++--
 .../org/apache/pulsar/functions/runtime/JavaInstanceMain.java  |  3 +++
 .../org/apache/pulsar/functions/runtime/KubernetesRuntime.java |  3 ++-
 .../pulsar/functions/runtime/KubernetesRuntimeFactory.java     |  4 ++++
 .../org/apache/pulsar/functions/runtime/ProcessRuntime.java    |  2 +-
 .../java/org/apache/pulsar/functions/runtime/RuntimeUtils.java |  7 ++++++-
 .../apache/pulsar/functions/runtime/KubernetesRuntimeTest.java | 10 +++++-----
 .../apache/pulsar/functions/worker/FunctionRuntimeManager.java |  1 +
 .../java/org/apache/pulsar/functions/worker/WorkerConfig.java  |  1 +
 10 files changed, 34 insertions(+), 12 deletions(-)

diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index f8a14ff..2aae5f6 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -26,7 +26,10 @@ import base64
 import os
 import signal
 import time
-import Queue
+try:
+  import Queue as queue
+except:
+  import queue
 import threading
 from functools import partial
 from collections import namedtuple
@@ -117,7 +120,7 @@ class PythonInstance(object):
   def __init__(self, instance_id, function_id, function_version, 
function_details, max_buffered_tuples, expected_healthcheck_interval, 
user_code, pulsar_client):
     self.instance_config = InstanceConfig(instance_id, function_id, 
function_version, function_details, max_buffered_tuples)
     self.user_code = user_code
-    self.queue = Queue.Queue(max_buffered_tuples)
+    self.queue = queue.Queue(max_buffered_tuples)
     self.log_topic_handler = None
     if function_details.logTopic is not None and function_details.logTopic != 
"":
       self.log_topic_handler = 
log.LogTopicHandler(str(function_details.logTopic), pulsar_client)
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py 
b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 6ef74c2..5fc899a 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -71,6 +71,7 @@ def main():
   parser.add_argument('--logging_directory', required=True, help='Logging 
Directory')
   parser.add_argument('--logging_file', required=True, help='Log file name')
   parser.add_argument('--expected_healthcheck_interval', required=True, 
help='Expected time in seconds between health checks', type=int)
+  parser.add_argument('--install_usercode_dependencies', required=False, 
help='For packaged python like wheel files, do we need to install all 
dependencies', type=bool)
 
   args = parser.parse_args()
   function_details = Function_pb2.FunctionDetails()
@@ -82,8 +83,11 @@ def main():
   json_format.Parse(args.function_details, function_details)
 
   if os.path.splitext(str(args.py))[1] == '.whl':
-    zpfile = zipfile.ZipFile(str(args.py), 'r')
-    zpfile.extractall(os.path.dirname(str(args.py)))
+    if args.install_usercode_dependencies:
+      os.system("pip install -t %s %s" % (os.path.dirname(str(args.py)), 
str(args.py)))
+    else:
+      zpfile = zipfile.ZipFile(str(args.py), 'r')
+      zpfile.extractall(os.path.dirname(str(args.py)))
     sys.path.insert(0, os.path.dirname(str(args.py)))
 
   log_file = os.path.join(args.logging_directory,
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 1551f6f..157de9e 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -95,6 +95,9 @@ public class JavaInstanceMain implements AutoCloseable {
     @Parameter(names = "--expected_healthcheck_interval", description = 
"Expected interval in seconds between healtchecks", required = true)
     protected int expectedHealthCheckInterval;
 
+    @Parameter(names = "--install_usercode_dependencies", description = "Do we 
need to explictly install any user code dependencies(Does not apply to Java", 
required = false)
+    protected Boolean installUsercodeDependencies;
+
     private Server server;
     private RuntimeSpawner runtimeSpawner;
     private ThreadRuntimeFactory containerFactory;
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 43f5a5d..631b910 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -96,6 +96,7 @@ class KubernetesRuntime implements Runtime {
                       CoreV1Api coreClient,
                       String jobNamespace,
                       Map<String, String> customLabels,
+                      Boolean installUserCodeDependencies,
                       String pulsarDockerImageName,
                       String pulsarRootDir,
                       InstanceConfig instanceConfig,
@@ -118,7 +119,7 @@ class KubernetesRuntime implements Runtime {
         this.originalCodeFileName = pulsarRootDir + "/" + originalCodeFileName;
         this.pulsarAdminUrl = pulsarAdminUrl;
         this.processArgs = RuntimeUtils.composeArgs(instanceConfig, 
instanceFile, logDirectory, this.originalCodeFileName, pulsarServiceUrl, 
stateStorageServiceUrl,
-                authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, 
"conf/log4j2.yaml");
+                authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, 
"conf/log4j2.yaml", installUserCodeDependencies);
         running = false;
         doChecks(instanceConfig.getFunctionDetails());
     }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index cba9ebf..dee265f 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -45,6 +45,7 @@ public class KubernetesRuntimeFactory implements 
RuntimeFactory {
     private final String pulsarDockerImageName;
     private final String pulsarRootDir;
     private final Boolean submittingInsidePod;
+    private final Boolean installUserCodeDependencies;
     private final Map<String, String> customLabels;
     private final String pulsarAdminUri;
     private final String pulsarServiceUri;
@@ -62,6 +63,7 @@ public class KubernetesRuntimeFactory implements 
RuntimeFactory {
                                     String pulsarDockerImageName,
                                     String pulsarRootDir,
                                     Boolean submittingInsidePod,
+                                    Boolean installUserCodeDependencies,
                                     Map<String, String> customLabels,
                                     String pulsarServiceUri,
                                     String pulsarAdminUri,
@@ -84,6 +86,7 @@ public class KubernetesRuntimeFactory implements 
RuntimeFactory {
             this.pulsarRootDir = "/pulsar";
         }
         this.submittingInsidePod = submittingInsidePod;
+        this.installUserCodeDependencies = installUserCodeDependencies;
         this.customLabels = customLabels;
         this.pulsarServiceUri = pulsarServiceUri;
         this.pulsarAdminUri = pulsarAdminUri;
@@ -119,6 +122,7 @@ public class KubernetesRuntimeFactory implements 
RuntimeFactory {
             coreClient,
             jobNamespace,
             customLabels,
+            installUserCodeDependencies,
             pulsarDockerImageName,
             pulsarRootDir,
             instanceConfig,
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 2146376..3534915 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -74,7 +74,7 @@ class ProcessRuntime implements Runtime {
         this.expectedHealthCheckInterval = expectedHealthCheckInterval;
         this.processArgs = RuntimeUtils.composeArgs(instanceConfig, 
instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
                 authConfig, instanceConfig.getInstanceName(), 
instanceConfig.getPort(), expectedHealthCheckInterval,
-                "java_instance_log4j2.yml");
+                "java_instance_log4j2.yml", false);
     }
 
     /**
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index fe2a88e..3342482 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -47,7 +47,8 @@ class RuntimeUtils {
                                            String shardId,
                                            Integer grpcPort,
                                            Long expectedHealthCheckInterval,
-                                           String javaLog4jFileName) throws 
Exception {
+                                           String javaLog4jFileName,
+                                           Boolean 
installUserCodeDepdendencies) throws Exception {
         List<String> args = new LinkedList<>();
         if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.JAVA) {
             args.add("java");
@@ -130,6 +131,10 @@ class RuntimeUtils {
         }
         args.add("--expected_healthcheck_interval");
         args.add(String.valueOf(expectedHealthCheckInterval));
+        if (installUserCodeDepdendencies != null && 
installUserCodeDepdendencies) {
+            args.add("--install_usercode_dependencies");
+            args.add("True");
+        }
         return args;
     }
 }
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index f4aeafc..6d2fc23 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -72,7 +72,7 @@ public class KubernetesRuntimeTest {
         this.stateStorageServiceUrl = "bk://localhost:4181";
         this.logDirectory = "logs/functions";
         this.factory = spy(new KubernetesRuntimeFactory(null, null, null, 
pulsarRootDir,
-            false, null, pulsarServiceUrl, pulsarAdminUrl, 
stateStorageServiceUrl, null));
+            false, true, null, pulsarServiceUrl, pulsarAdminUrl, 
stateStorageServiceUrl, null));
         doNothing().when(this.factory).setupClient();
     }
 
@@ -121,7 +121,7 @@ public class KubernetesRuntimeTest {
 
         KubernetesRuntime container = factory.createContainer(config, 
userJarFile, userJarFile, 30l);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 28);
+        assertEquals(args.size(), 30);
         String expectedArgs = "java -cp " + javaInstanceJarFile
                 + " -Dpulsar.functions.java.instance.jar=" + 
javaInstanceJarFile
                 + " -Dlog4j.configurationFile=conf/log4j2.yaml "
@@ -135,7 +135,7 @@ public class KubernetesRuntimeTest {
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(23)
                 + " --state_storage_serviceurl " + stateStorageServiceUrl
-                + " --expected_healthcheck_interval -1";
+                + " --expected_healthcheck_interval -1 
--install_usercode_dependencies True";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
@@ -145,7 +145,7 @@ public class KubernetesRuntimeTest {
 
         KubernetesRuntime container = factory.createContainer(config, 
userJarFile, userJarFile, 30l);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 24);
+        assertEquals(args.size(), 26);
         String expectedArgs = "python " + pythonInstanceFile
                 + " --py " + pulsarRootDir + "/" + userJarFile + " 
--logging_directory "
                 + logDirectory + " --logging_file " + 
config.getFunctionDetails().getName() + " --instance_id "
@@ -154,7 +154,7 @@ public class KubernetesRuntimeTest {
                 + " --function_details '" + 
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(21)
-                + " --expected_healthcheck_interval -1";
+                + " --expected_healthcheck_interval -1 
--install_usercode_dependencies True";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index e5b4a6b..1b8a590 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -125,6 +125,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                     
workerConfig.getKubernetesContainerFactory().getPulsarDockerImageName(),
                     
workerConfig.getKubernetesContainerFactory().getPulsarRootDir(),
                     
workerConfig.getKubernetesContainerFactory().getSubmittingInsidePod(),
+                    
workerConfig.getKubernetesContainerFactory().getInstallUserCodeDependencies(),
                     
workerConfig.getKubernetesContainerFactory().getCustomLabels(),
                     
StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl())
 ? workerConfig.getPulsarServiceUrl() : 
workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl(),
                     
StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl())
 ? workerConfig.getPulsarWebServiceUrl() : 
workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(),
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index a3c393f..c18f824 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -139,6 +139,7 @@ public class WorkerConfig implements Serializable, 
PulsarConfiguration {
         private Boolean submittingInsidePod;
         private String pulsarServiceUrl;
         private String pulsarAdminUrl;
+        private Boolean installUserCodeDependencies;
         private Map<String, String> customLabels;
     }
     private KubernetesContainerFactory kubernetesContainerFactory;

Reply via email to