This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d2b45b6 issue#9345 -- bug fix for RuntimeUtils.getCmd not setting
pending-async-requests (#9349)
d2b45b6 is described below
commit d2b45b6d26e6b9f76761d0e0bfca58ff2ad6ec68
Author: csthomas1 <[email protected]>
AuthorDate: Fri Mar 12 22:18:29 2021 -0500
issue#9345 -- bug fix for RuntimeUtils.getCmd not setting
pending-async-requests (#9349)
Fixes #9345
### Motivation
When I enable either the Process or Kubernetes function Runtimes and set
the parameter "maxPendingAsyncRequests" to a value other than 1000 in the
function worker configuration (e.g. functions_worker.yml), any function
instances launched by the worker continue to have maxPendingAsyncRequests =
1000. I'd like to be able to adjust this parameter to guard against my function
running out of memory -- without increasing the function heap size -- due to
too many publish operations in flight [...]
### Modifications
Modified RuntimeUtils.getCmd to add --pending_async_requests <value> to the list
of arguments passed to JavaInstanceStarter (added only when the function runtime
is JAVA)
### Verifying this change
This change added tests and can be verified as follows:
- Updated
pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
to test for presence of the --pending_async_requests argument in the generated
command line, and increased expected arg counts by 2 to account for the expected
presence of --pending_async_requests <value>
- Updated
pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
to test for presence of the --pending_async_requests argument in the generated
command line, and increased expected arg counts by 2 to account for the expected
presence of --pending_async_requests <value>
- Updated
pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
to verify that the configured value of "maxPendingAsyncRequests" was being
properly loaded into the WorkerConfig
---
.../pulsar/functions/runtime/RuntimeUtils.java | 8 +-
.../runtime/kubernetes/KubernetesRuntimeTest.java | 85 ++++------------------
.../runtime/process/ProcessRuntimeTest.java | 4 +-
.../worker/WorkerApiV2ResourceConfigTest.java | 1 +
.../src/test/resources/test_worker_config.yml | 1 +
5 files changed, 27 insertions(+), 72 deletions(-)
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 4204d3a..1ac1ece 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -390,13 +390,19 @@ public class RuntimeUtils {
}
args.add("--max_buffered_tuples");
args.add(String.valueOf(instanceConfig.getMaxBufferedTuples()));
-
+
args.add("--port");
args.add(String.valueOf(grpcPort));
args.add("--metrics_port");
args.add(String.valueOf(instanceConfig.getMetricsPort()));
+ // only the Java instance supports --pending_async_requests right now.
+ if (instanceConfig.getFunctionDetails().getRuntime() ==
Function.FunctionDetails.Runtime.JAVA) {
+ args.add("--pending_async_requests");
+
args.add(String.valueOf(instanceConfig.getMaxPendingAsyncRequests()));
+ }
+
// state storage configs
if (null != stateStorageServiceUrl) {
args.add("--state_storage_serviceurl");
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index aae8d47..6a68965 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -284,6 +284,7 @@ public class KubernetesRuntimeTest {
config.setFunctionVersion("1.0");
config.setInstanceId(0);
config.setMaxBufferedTuples(1024);
+ config.setMaxPendingAsyncRequests(200);
config.setClusterName("standalone");
return config;
@@ -373,6 +374,10 @@ public class KubernetesRuntimeTest {
assertEquals(containerSpec.getResources().getLimits().get("cpu").getNumber().doubleValue(),
roundDecimal(resources.getCpu(), 3));
}
+ private void verifyJavaInstance(InstanceConfig config, String depsDir,
boolean secretsAttached) throws Exception {
+ verifyJavaInstance(config, depsDir, secretsAttached, null);
+ }
+
private void verifyJavaInstance(InstanceConfig config, String depsDir,
boolean secretsAttached, String downloadDirectory) throws Exception {
KubernetesRuntime container = factory.createContainer(config,
userJarFile, userJarFile, 30l);
List<String> args = container.getProcessArgs();
@@ -386,91 +391,30 @@ public class KubernetesRuntimeTest {
if (null != depsDir) {
extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" +
depsDir;
classpath = classpath + ":" + depsDir + "/*";
- totalArgs = 37;
+ totalArgs = 39;
portArg = 26;
metricsPortArg = 28;
} else {
extraDepsEnv = "";
portArg = 25;
metricsPortArg = 27;
- totalArgs = 36;
+ totalArgs = 38;
}
if (secretsAttached) {
totalArgs += 4;
}
+ if (config.isExposePulsarAdminClientEnabled()) {
+ totalArgs += 2;
+ portArg += 2;
+ metricsPortArg += 2;
+ }
+
if (StringUtils.isNotEmpty(downloadDirectory)){
jarLocation = downloadDirectory + "/" + userJarFile;
} else {
jarLocation = pulsarRootDir + "/" + userJarFile;
}
- assertEquals(args.size(), totalArgs,
- "Actual args : " + StringUtils.join(args, " "));
-
- String expectedArgs = "exec java -cp " + classpath
- + extraDepsEnv
- + " -Dpulsar.functions.instance.classpath=/pulsar/lib/*"
- + " -Dlog4j.configurationFile=kubernetes_instance_log4j2.xml "
- + "-Dpulsar.function.log.dir=" + logDirectory + "/" +
FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
- + " -Dpulsar.function.log.file=" +
config.getFunctionDetails().getName() + "-$SHARD_ID"
- + " -Xmx" + String.valueOf(RESOURCES.getRam())
- + " org.apache.pulsar.functions.instance.JavaInstanceMain"
- + " --jar " + jarLocation + " --instance_id "
- + "$SHARD_ID" + " --function_id " + config.getFunctionId()
- + " --function_version " + config.getFunctionVersion()
- + " --function_details '" +
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
- + "' --pulsar_serviceurl " + pulsarServiceUrl
- + " --max_buffered_tuples 1024 --port " + args.get(portArg) +
" --metrics_port " + args.get(metricsPortArg)
- + " --state_storage_serviceurl " + stateStorageServiceUrl
- + " --expected_healthcheck_interval -1";
- if (secretsAttached) {
- expectedArgs += " --secrets_provider
org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
- + " --secrets_provider_config
'{\"Somevalue\":\"myvalue\"}'";
- }
- expectedArgs += " --cluster_name standalone --nar_extraction_directory
" + narExtractionDirectory;
-
- assertEquals(String.join(" ", args), expectedArgs);
-
- // check padding and xmx
- long heap = Long.parseLong(args.stream().filter(s ->
s.startsWith("-Xmx")).collect(Collectors.toList()).get(0).replace("-Xmx", ""));
- V1Container containerSpec =
container.getFunctionContainer(Collections.emptyList(), RESOURCES);
- assertEquals(heap, RESOURCES.getRam());
-
assertEquals(containerSpec.getResources().getLimits().get("memory").getNumber().longValue(),
Math.round(heap + (heap * 0.1)));
-
- // check cpu
-
assertEquals(containerSpec.getResources().getRequests().get("cpu").getNumber().doubleValue(),
RESOURCES.getCpu());
-
assertEquals(containerSpec.getResources().getLimits().get("cpu").getNumber().doubleValue(),
RESOURCES.getCpu());
- }
-
- private void verifyJavaInstance(InstanceConfig config, String depsDir,
boolean secretsAttached) throws Exception {
- KubernetesRuntime container = factory.createContainer(config,
userJarFile, userJarFile, 30l);
- List<String> args = container.getProcessArgs();
-
- String classpath = javaInstanceJarFile;
- String extraDepsEnv;
- int portArg;
- int metricsPortArg;
- int totalArgs;
- if (null != depsDir) {
- extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" +
depsDir;
- classpath = classpath + ":" + depsDir + "/*";
- totalArgs = 37;
- portArg = 26;
- metricsPortArg = 28;
- } else {
- extraDepsEnv = "";
- portArg = 25;
- metricsPortArg = 27;
- totalArgs = 36;
- }
- if (secretsAttached) {
- totalArgs += 4;
- }
- if (config.isExposePulsarAdminClientEnabled()) {
- totalArgs += 2;
- portArg += 2;
- metricsPortArg += 2;
- }
assertEquals(args.size(), totalArgs,
"Actual args : " + StringUtils.join(args, " "));
@@ -485,13 +429,14 @@ public class KubernetesRuntimeTest {
+ " -Dpulsar.function.log.file=" +
config.getFunctionDetails().getName() + "-$SHARD_ID"
+ " -Xmx" + String.valueOf(RESOURCES.getRam())
+ " org.apache.pulsar.functions.instance.JavaInstanceMain"
- + " --jar " + pulsarRootDir + "/" + userJarFile + "
--instance_id "
+ + " --jar " + jarLocation + " --instance_id "
+ "$SHARD_ID" + " --function_id " + config.getFunctionId()
+ " --function_version " + config.getFunctionVersion()
+ " --function_details '" +
JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+ "' --pulsar_serviceurl " + pulsarServiceUrl
+ pulsarAdminArg
+ " --max_buffered_tuples 1024 --port " + args.get(portArg) +
" --metrics_port " + args.get(metricsPortArg)
+ + " --pending_async_requests 200"
+ " --state_storage_serviceurl " + stateStorageServiceUrl
+ " --expected_healthcheck_interval -1";
if (secretsAttached) {
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
index 10c640a..6a2997d 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
@@ -207,6 +207,7 @@ public class ProcessRuntimeTest {
config.setFunctionVersion("1.0");
config.setInstanceId(0);
config.setMaxBufferedTuples(1024);
+ config.setMaxPendingAsyncRequests(200);
config.setClusterName("standalone");
return config;
@@ -297,7 +298,7 @@ public class ProcessRuntimeTest {
String extraDepsEnv;
int portArg;
int metricsPortArg;
- int totalArgCount = 39;
+ int totalArgCount = 41;
if (webServiceUrl != null &&
config.isExposePulsarAdminClientEnabled()) {
totalArgCount += 2;
}
@@ -335,6 +336,7 @@ public class ProcessRuntimeTest {
+ "' --pulsar_serviceurl " + pulsarServiceUrl
+ pulsarAdminArg
+ " --max_buffered_tuples 1024 --port " + args.get(portArg) +
" --metrics_port " + args.get(metricsPortArg)
+ + " --pending_async_requests 200"
+ " --state_storage_serviceurl " + stateStorageServiceUrl
+ " --expected_healthcheck_interval 30"
+ " --secrets_provider
org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
index 21e98b8..a7ae6b2 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
@@ -64,6 +64,7 @@ public class WorkerApiV2ResourceConfigTest {
assertEquals(3, wc.getNumFunctionPackageReplicas());
assertEquals("test-worker", wc.getWorkerId());
assertEquals(new Integer(7654), wc.getWorkerPort());
+ assertEquals(200, wc.getMaxPendingAsyncRequests());
}
@Test
diff --git a/pulsar-functions/src/test/resources/test_worker_config.yml
b/pulsar-functions/src/test/resources/test_worker_config.yml
index f2645f6..4614ca3 100644
--- a/pulsar-functions/src/test/resources/test_worker_config.yml
+++ b/pulsar-functions/src/test/resources/test_worker_config.yml
@@ -22,4 +22,5 @@ workerPort: 7654
pulsarServiceUrl: pulsar://localhost:6650
functionMetadataTopicName: test-function-metadata-topic
numFunctionPackageReplicas: 3
+maxPendingAsyncRequests: 200