This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d2b45b6  issue#9345 -- bug fix for RuntimeUtils.getCmd not setting 
pending-async-requests (#9349)
d2b45b6 is described below

commit d2b45b6d26e6b9f76761d0e0bfca58ff2ad6ec68
Author: csthomas1 <[email protected]>
AuthorDate: Fri Mar 12 22:18:29 2021 -0500

    issue#9345 -- bug fix for RuntimeUtils.getCmd not setting 
pending-async-requests (#9349)
    
    Fixes #9345
    
    ### Motivation
    
    When I enable either the Process or Kubernetes function Runtimes and set 
the parameter "maxPendingAsyncRequests" to a value other than 1000 in the 
function worker configuration (e.g. functions_worker.yml), any function 
instances launched by the worker continue to have maxPendingAsyncRequests = 
1000. I'd like to be able to adjust this parameter to guard against my function 
running out of memory -- without increasing the function heap size -- due to 
too many publish operations in flight [...]
    
    ### Modifications
    
    Modified RuntimeUtils.getCmd to add --pending-sync-requests to the list of 
arguments passed to JavaInstanceStarter
    
    ### Verifying this change
    
    This change added tests and can be verified as follows:
    
    
      - Updated 
pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
 to test for presence of --pending-async-requests argument in generated 
commandline, and increased expected arg counts by 2 to account for expected 
presence of --pending-async-requests <value>
      - Updated 
pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
 to test for presence of --pending-async-requests argument in generated 
commandline, and increased expected arg counts by 2 to account for expected 
presence of --pending-async-requests <value>
      - Updated 
pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
 to verify that the configured value of "maxPendingAsyncRequests" was being 
properly loaded into the WorkerConfig
---
 .../pulsar/functions/runtime/RuntimeUtils.java     |  8 +-
 .../runtime/kubernetes/KubernetesRuntimeTest.java  | 85 ++++------------------
 .../runtime/process/ProcessRuntimeTest.java        |  4 +-
 .../worker/WorkerApiV2ResourceConfigTest.java      |  1 +
 .../src/test/resources/test_worker_config.yml      |  1 +
 5 files changed, 27 insertions(+), 72 deletions(-)

diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 4204d3a..1ac1ece 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -390,13 +390,19 @@ public class RuntimeUtils {
         }
         args.add("--max_buffered_tuples");
         args.add(String.valueOf(instanceConfig.getMaxBufferedTuples()));
-
+        
         args.add("--port");
         args.add(String.valueOf(grpcPort));
 
         args.add("--metrics_port");
         args.add(String.valueOf(instanceConfig.getMetricsPort()));
 
+        // only the Java instance supports --pending_async_requests right now.
+        if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.JAVA) {
+            args.add("--pending_async_requests");
+            
args.add(String.valueOf(instanceConfig.getMaxPendingAsyncRequests()));
+        }
+        
         // state storage configs
         if (null != stateStorageServiceUrl) {
             args.add("--state_storage_serviceurl");
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index aae8d47..6a68965 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -284,6 +284,7 @@ public class KubernetesRuntimeTest {
         config.setFunctionVersion("1.0");
         config.setInstanceId(0);
         config.setMaxBufferedTuples(1024);
+        config.setMaxPendingAsyncRequests(200);
         config.setClusterName("standalone");
 
         return config;
@@ -373,6 +374,10 @@ public class KubernetesRuntimeTest {
         
assertEquals(containerSpec.getResources().getLimits().get("cpu").getNumber().doubleValue(),
 roundDecimal(resources.getCpu(), 3));
     }
 
+    private void verifyJavaInstance(InstanceConfig config, String depsDir, 
boolean secretsAttached) throws Exception {
+        verifyJavaInstance(config, depsDir, secretsAttached, null);
+    }
+
     private void verifyJavaInstance(InstanceConfig config, String depsDir, 
boolean secretsAttached, String downloadDirectory) throws Exception {
         KubernetesRuntime container = factory.createContainer(config, 
userJarFile, userJarFile, 30l);
         List<String> args = container.getProcessArgs();
@@ -386,91 +391,30 @@ public class KubernetesRuntimeTest {
         if (null != depsDir) {
             extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + 
depsDir;
             classpath = classpath + ":" + depsDir + "/*";
-            totalArgs = 37;
+            totalArgs = 39;
             portArg = 26;
             metricsPortArg = 28;
         } else {
             extraDepsEnv = "";
             portArg = 25;
             metricsPortArg = 27;
-            totalArgs = 36;
+            totalArgs = 38;
         }
         if (secretsAttached) {
             totalArgs += 4;
         }
+        if (config.isExposePulsarAdminClientEnabled()) {
+            totalArgs += 2;
+            portArg += 2;
+            metricsPortArg += 2;
+        }
+
         if (StringUtils.isNotEmpty(downloadDirectory)){
             jarLocation = downloadDirectory + "/" + userJarFile;
         } else {
             jarLocation = pulsarRootDir + "/" + userJarFile;
         }
 
-        assertEquals(args.size(), totalArgs,
-                "Actual args : " + StringUtils.join(args, " "));
-
-        String expectedArgs = "exec java -cp " + classpath
-                + extraDepsEnv
-                + " -Dpulsar.functions.instance.classpath=/pulsar/lib/*"
-                + " -Dlog4j.configurationFile=kubernetes_instance_log4j2.xml "
-                + "-Dpulsar.function.log.dir=" + logDirectory + "/" + 
FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
-                + " -Dpulsar.function.log.file=" + 
config.getFunctionDetails().getName() + "-$SHARD_ID"
-                + " -Xmx" + String.valueOf(RESOURCES.getRam())
-                + " org.apache.pulsar.functions.instance.JavaInstanceMain"
-                + " --jar " + jarLocation + " --instance_id "
-                + "$SHARD_ID" + " --function_id " + config.getFunctionId()
-                + " --function_version " + config.getFunctionVersion()
-                + " --function_details '" + 
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
-                + "' --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(portArg) + 
" --metrics_port " + args.get(metricsPortArg)
-                + " --state_storage_serviceurl " + stateStorageServiceUrl
-                + " --expected_healthcheck_interval -1";
-        if (secretsAttached) {
-            expectedArgs += " --secrets_provider 
org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
-                    + " --secrets_provider_config 
'{\"Somevalue\":\"myvalue\"}'";
-        }
-        expectedArgs += " --cluster_name standalone --nar_extraction_directory 
" + narExtractionDirectory;
-
-        assertEquals(String.join(" ", args), expectedArgs);
-
-        // check padding and xmx
-        long heap = Long.parseLong(args.stream().filter(s -> 
s.startsWith("-Xmx")).collect(Collectors.toList()).get(0).replace("-Xmx", ""));
-        V1Container containerSpec = 
container.getFunctionContainer(Collections.emptyList(), RESOURCES);
-        assertEquals(heap, RESOURCES.getRam());
-        
assertEquals(containerSpec.getResources().getLimits().get("memory").getNumber().longValue(),
 Math.round(heap + (heap * 0.1)));
-
-        // check cpu
-        
assertEquals(containerSpec.getResources().getRequests().get("cpu").getNumber().doubleValue(),
 RESOURCES.getCpu());
-        
assertEquals(containerSpec.getResources().getLimits().get("cpu").getNumber().doubleValue(),
 RESOURCES.getCpu());
-    }
-
-    private void verifyJavaInstance(InstanceConfig config, String depsDir, 
boolean secretsAttached) throws Exception {
-        KubernetesRuntime container = factory.createContainer(config, 
userJarFile, userJarFile, 30l);
-        List<String> args = container.getProcessArgs();
-
-        String classpath = javaInstanceJarFile;
-        String extraDepsEnv;
-        int portArg;
-        int metricsPortArg;
-        int totalArgs;
-        if (null != depsDir) {
-            extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + 
depsDir;
-            classpath = classpath + ":" + depsDir + "/*";
-            totalArgs = 37;
-            portArg = 26;
-            metricsPortArg = 28;
-        } else {
-            extraDepsEnv = "";
-            portArg = 25;
-            metricsPortArg = 27;
-            totalArgs = 36;
-        }
-        if (secretsAttached) {
-            totalArgs += 4;
-        }
-        if (config.isExposePulsarAdminClientEnabled()) {
-            totalArgs += 2;
-            portArg += 2;
-            metricsPortArg += 2;
-        }
 
         assertEquals(args.size(), totalArgs,
                 "Actual args : " + StringUtils.join(args, " "));
@@ -485,13 +429,14 @@ public class KubernetesRuntimeTest {
                 + " -Dpulsar.function.log.file=" + 
config.getFunctionDetails().getName() + "-$SHARD_ID"
                 + " -Xmx" + String.valueOf(RESOURCES.getRam())
                 + " org.apache.pulsar.functions.instance.JavaInstanceMain"
-                + " --jar " + pulsarRootDir + "/" + userJarFile + " 
--instance_id "
+                + " --jar " + jarLocation + " --instance_id "
                 + "$SHARD_ID" + " --function_id " + config.getFunctionId()
                 + " --function_version " + config.getFunctionVersion()
                 + " --function_details '" + 
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
                 + pulsarAdminArg
                 + " --max_buffered_tuples 1024 --port " + args.get(portArg) + 
" --metrics_port " + args.get(metricsPortArg)
+                + " --pending_async_requests 200"
                 + " --state_storage_serviceurl " + stateStorageServiceUrl
                 + " --expected_healthcheck_interval -1";
         if (secretsAttached) {
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
index 10c640a..6a2997d 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
@@ -207,6 +207,7 @@ public class ProcessRuntimeTest {
         config.setFunctionVersion("1.0");
         config.setInstanceId(0);
         config.setMaxBufferedTuples(1024);
+        config.setMaxPendingAsyncRequests(200);
         config.setClusterName("standalone");
 
         return config;
@@ -297,7 +298,7 @@ public class ProcessRuntimeTest {
         String extraDepsEnv;
         int portArg;
         int metricsPortArg;
-        int totalArgCount = 39;
+        int totalArgCount = 41;
         if (webServiceUrl != null && 
config.isExposePulsarAdminClientEnabled()) {
             totalArgCount += 2;
         }
@@ -335,6 +336,7 @@ public class ProcessRuntimeTest {
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
                 + pulsarAdminArg
                 + " --max_buffered_tuples 1024 --port " + args.get(portArg) + 
" --metrics_port " + args.get(metricsPortArg)
+                + " --pending_async_requests 200"
                 + " --state_storage_serviceurl " + stateStorageServiceUrl
                 + " --expected_healthcheck_interval 30"
                 + " --secrets_provider 
org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
index 21e98b8..a7ae6b2 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
@@ -64,6 +64,7 @@ public class WorkerApiV2ResourceConfigTest {
         assertEquals(3, wc.getNumFunctionPackageReplicas());
         assertEquals("test-worker", wc.getWorkerId());
         assertEquals(new Integer(7654), wc.getWorkerPort());
+        assertEquals(200, wc.getMaxPendingAsyncRequests());
     }
 
     @Test
diff --git a/pulsar-functions/src/test/resources/test_worker_config.yml 
b/pulsar-functions/src/test/resources/test_worker_config.yml
index f2645f6..4614ca3 100644
--- a/pulsar-functions/src/test/resources/test_worker_config.yml
+++ b/pulsar-functions/src/test/resources/test_worker_config.yml
@@ -22,4 +22,5 @@ workerPort: 7654
 pulsarServiceUrl: pulsar://localhost:6650
 functionMetadataTopicName: test-function-metadata-topic
 numFunctionPackageReplicas: 3
+maxPendingAsyncRequests: 200
 

Reply via email to