srkukarni closed pull request #2729: Added ability to download dependencies in Kubernetes URL: https://github.com/apache/pulsar/pull/2729
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index f8a14ffc1a..2aae5f64c6 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -26,7 +26,10 @@ import os import signal import time -import Queue +try: + import Queue as queue +except: + import queue import threading from functools import partial from collections import namedtuple @@ -117,7 +120,7 @@ class PythonInstance(object): def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples, expected_healthcheck_interval, user_code, pulsar_client): self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples) self.user_code = user_code - self.queue = Queue.Queue(max_buffered_tuples) + self.queue = queue.Queue(max_buffered_tuples) self.log_topic_handler = None if function_details.logTopic is not None and function_details.logTopic != "": self.log_topic_handler = log.LogTopicHandler(str(function_details.logTopic), pulsar_client) diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py index 6ef74c2fba..5fc899ad84 100644 --- a/pulsar-functions/instance/src/main/python/python_instance_main.py +++ b/pulsar-functions/instance/src/main/python/python_instance_main.py @@ -71,6 +71,7 @@ def main(): parser.add_argument('--logging_directory', required=True, help='Logging Directory') parser.add_argument('--logging_file', required=True, help='Log file name') 
parser.add_argument('--expected_healthcheck_interval', required=True, help='Expected time in seconds between health checks', type=int) + parser.add_argument('--install_usercode_dependencies', required=False, help='For packaged python like wheel files, do we need to install all dependencies', type=bool) args = parser.parse_args() function_details = Function_pb2.FunctionDetails() @@ -82,8 +83,11 @@ def main(): json_format.Parse(args.function_details, function_details) if os.path.splitext(str(args.py))[1] == '.whl': - zpfile = zipfile.ZipFile(str(args.py), 'r') - zpfile.extractall(os.path.dirname(str(args.py))) + if args.install_usercode_dependencies: + os.system("pip install -t %s %s" % (os.path.dirname(str(args.py)), str(args.py))) + else: + zpfile = zipfile.ZipFile(str(args.py), 'r') + zpfile.extractall(os.path.dirname(str(args.py))) sys.path.insert(0, os.path.dirname(str(args.py))) log_file = os.path.join(args.logging_directory, diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index 1551f6f345..157de9e37d 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -95,6 +95,9 @@ @Parameter(names = "--expected_healthcheck_interval", description = "Expected interval in seconds between healtchecks", required = true) protected int expectedHealthCheckInterval; + @Parameter(names = "--install_usercode_dependencies", description = "Do we need to explictly install any user code dependencies(Does not apply to Java", required = false) + protected Boolean installUsercodeDependencies; + private Server server; private RuntimeSpawner runtimeSpawner; private ThreadRuntimeFactory containerFactory; diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index 43f5a5d386..631b910a80 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -96,6 +96,7 @@ CoreV1Api coreClient, String jobNamespace, Map<String, String> customLabels, + Boolean installUserCodeDependencies, String pulsarDockerImageName, String pulsarRootDir, InstanceConfig instanceConfig, @@ -118,7 +119,7 @@ this.originalCodeFileName = pulsarRootDir + "/" + originalCodeFileName; this.pulsarAdminUrl = pulsarAdminUrl; this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, this.originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl, - authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, "conf/log4j2.yaml"); + authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, "conf/log4j2.yaml", installUserCodeDependencies); running = false; doChecks(instanceConfig.getFunctionDetails()); } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java index cba9ebf2d6..dee265f346 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java @@ -45,6 +45,7 @@ private final String pulsarDockerImageName; private final String pulsarRootDir; private final Boolean submittingInsidePod; + private final Boolean installUserCodeDependencies; private final Map<String, String> customLabels; private final String pulsarAdminUri; private final String pulsarServiceUri; @@ -62,6 
+63,7 @@ public KubernetesRuntimeFactory(String k8Uri, String pulsarDockerImageName, String pulsarRootDir, Boolean submittingInsidePod, + Boolean installUserCodeDependencies, Map<String, String> customLabels, String pulsarServiceUri, String pulsarAdminUri, @@ -84,6 +86,7 @@ public KubernetesRuntimeFactory(String k8Uri, this.pulsarRootDir = "/pulsar"; } this.submittingInsidePod = submittingInsidePod; + this.installUserCodeDependencies = installUserCodeDependencies; this.customLabels = customLabels; this.pulsarServiceUri = pulsarServiceUri; this.pulsarAdminUri = pulsarAdminUri; @@ -119,6 +122,7 @@ public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String c coreClient, jobNamespace, customLabels, + installUserCodeDependencies, pulsarDockerImageName, pulsarRootDir, instanceConfig, diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index 2146376d7a..35349159a3 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -74,7 +74,7 @@ this.expectedHealthCheckInterval = expectedHealthCheckInterval; this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl, authConfig, instanceConfig.getInstanceName(), instanceConfig.getPort(), expectedHealthCheckInterval, - "java_instance_log4j2.yml"); + "java_instance_log4j2.yml", false); } /** diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index fe2a88ee8e..33424820a2 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -47,7 +47,8 @@ String shardId, Integer grpcPort, Long expectedHealthCheckInterval, - String javaLog4jFileName) throws Exception { + String javaLog4jFileName, + Boolean installUserCodeDepdendencies) throws Exception { List<String> args = new LinkedList<>(); if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) { args.add("java"); @@ -130,6 +131,10 @@ } args.add("--expected_healthcheck_interval"); args.add(String.valueOf(expectedHealthCheckInterval)); + if (installUserCodeDepdendencies != null && installUserCodeDepdendencies) { + args.add("--install_usercode_dependencies"); + args.add("True"); + } return args; } } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java index f4aeafc6a1..6d2fc23eed 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java @@ -72,7 +72,7 @@ public KubernetesRuntimeTest() throws Exception { this.stateStorageServiceUrl = "bk://localhost:4181"; this.logDirectory = "logs/functions"; this.factory = spy(new KubernetesRuntimeFactory(null, null, null, pulsarRootDir, - false, null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null)); + false, true, null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null)); doNothing().when(this.factory).setupClient(); } @@ -121,7 +121,7 @@ public void testJavaConstructor() throws Exception { KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l); List<String> args = container.getProcessArgs(); - assertEquals(args.size(), 28); + assertEquals(args.size(), 30); String expectedArgs = "java -cp 
" + javaInstanceJarFile + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile + " -Dlog4j.configurationFile=conf/log4j2.yaml " @@ -135,7 +135,7 @@ public void testJavaConstructor() throws Exception { + "' --pulsar_serviceurl " + pulsarServiceUrl + " --max_buffered_tuples 1024 --port " + args.get(23) + " --state_storage_serviceurl " + stateStorageServiceUrl - + " --expected_healthcheck_interval -1"; + + " --expected_healthcheck_interval -1 --install_usercode_dependencies True"; assertEquals(String.join(" ", args), expectedArgs); } @@ -145,7 +145,7 @@ public void testPythonConstructor() throws Exception { KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l); List<String> args = container.getProcessArgs(); - assertEquals(args.size(), 24); + assertEquals(args.size(), 26); String expectedArgs = "python " + pythonInstanceFile + " --py " + pulsarRootDir + "/" + userJarFile + " --logging_directory " + logDirectory + " --logging_file " + config.getFunctionDetails().getName() + " --instance_id " @@ -154,7 +154,7 @@ public void testPythonConstructor() throws Exception { + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails()) + "' --pulsar_serviceurl " + pulsarServiceUrl + " --max_buffered_tuples 1024 --port " + args.get(21) - + " --expected_healthcheck_interval -1"; + + " --expected_healthcheck_interval -1 --install_usercode_dependencies True"; assertEquals(String.join(" ", args), expectedArgs); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index e5b4a6b8d1..1b8a590baf 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -125,6 
+125,7 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer workerConfig.getKubernetesContainerFactory().getPulsarDockerImageName(), workerConfig.getKubernetesContainerFactory().getPulsarRootDir(), workerConfig.getKubernetesContainerFactory().getSubmittingInsidePod(), + workerConfig.getKubernetesContainerFactory().getInstallUserCodeDependencies(), workerConfig.getKubernetesContainerFactory().getCustomLabels(), StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl()) ? workerConfig.getPulsarServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl(), StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl()) ? workerConfig.getPulsarWebServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(), diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index a3c393f510..c18f824714 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -139,6 +139,7 @@ private Boolean submittingInsidePod; private String pulsarServiceUrl; private String pulsarAdminUrl; + private Boolean installUserCodeDependencies; private Map<String, String> customLabels; } private KubernetesContainerFactory kubernetesContainerFactory; ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services