This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f3d4d5ac044 [fix][fn] Enable optimized Netty direct byte buffer
support for Pulsar Function runtimes (#22910)
f3d4d5ac044 is described below
commit f3d4d5ac0442eed2b538b8587186cdc0b8df9987
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Jun 17 09:26:28 2024 +0300
[fix][fn] Enable optimized Netty direct byte buffer support for Pulsar
Function runtimes (#22910)
---
.../pulsar/functions/runtime/RuntimeUtils.java | 18 +++++++++--
.../runtime/kubernetes/KubernetesRuntimeTest.java | 36 ++++++++++++----------
.../runtime/process/ProcessRuntimeTest.java | 16 ++++++----
3 files changed, 46 insertions(+), 24 deletions(-)
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 6160626c958..49a5dd40fa2 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -367,12 +367,26 @@ public class RuntimeUtils {
instanceConfig.getFunctionDetails().getName(),
shardId));
+ // Needed for optimized Netty direct byte buffer support
args.add("-Dio.netty.tryReflectionSetAccessible=true");
+ // Handle possible shaded Netty versions
+ args.add("-Dorg.apache.pulsar.shade.io.netty.tryReflectionSetAccessible=true");
+ args.add("-Dio.grpc.netty.shaded.io.netty.tryReflectionSetAccessible=true");
+
+ if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_11)) {
+ // Needed for optimized Netty direct byte buffer support
+ args.add("--add-opens");
+ args.add("java.base/java.nio=ALL-UNNAMED");
+ args.add("--add-opens");
+ args.add("java.base/jdk.internal.misc=ALL-UNNAMED");
+ }
- // Needed for netty.DnsResolverUtil on JDK9+
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
+ // Needed for optimized checksum calculation when com.scurrilous.circe.checksum.Java9IntHash
+ // is used. That gets used when the native library libcirce-checksum is not available or cannot
+ // be loaded.
args.add("--add-opens");
- args.add("java.base/sun.net=ALL-UNNAMED");
+ args.add("java.base/java.util.zip=ALL-UNNAMED");
}
if (instanceConfig.getAdditionalJavaRuntimeArguments() != null) {
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 980f763f7c3..bf73f0a9d34 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -441,14 +441,14 @@ public class KubernetesRuntimeTest {
if (null != depsDir) {
extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" +
depsDir;
classpath = classpath + ":" + depsDir + "/*";
- totalArgs = 46;
- portArg = 33;
- metricsPortArg = 35;
+ totalArgs = 52;
+ portArg = 39;
+ metricsPortArg = 41;
} else {
extraDepsEnv = "";
- portArg = 32;
- metricsPortArg = 34;
- totalArgs = 45;
+ portArg = 38;
+ metricsPortArg = 40;
+ totalArgs = 51;
}
if (secretsAttached) {
totalArgs += 4;
@@ -479,7 +479,11 @@ public class KubernetesRuntimeTest {
+ "-Dpulsar.function.log.dir=" + logDirectory + "/" +
FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" +
config.getFunctionDetails().getName() + "-$SHARD_ID"
+ " -Dio.netty.tryReflectionSetAccessible=true"
- + " --add-opens java.base/sun.net=ALL-UNNAMED"
+ + " -Dorg.apache.pulsar.shade.io.netty.tryReflectionSetAccessible=true"
+ + " -Dio.grpc.netty.shaded.io.netty.tryReflectionSetAccessible=true"
+ + " --add-opens java.base/java.nio=ALL-UNNAMED"
+ + " --add-opens java.base/jdk.internal.misc=ALL-UNNAMED"
+ + " --add-opens java.base/java.util.zip=ALL-UNNAMED"
+ " -Xmx" + RESOURCES.getRam()
+ " org.apache.pulsar.functions.instance.JavaInstanceMain"
+ " --jar " + jarLocation
@@ -1314,7 +1318,7 @@ public class KubernetesRuntimeTest {
.contains("--metrics_port 0"));
}
}
-
+
@Test
public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars()
throws Exception {
InstanceConfig config =
createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
@@ -1323,22 +1327,22 @@ public class KubernetesRuntimeTest {
CoreV1Api coreApi = mock(CoreV1Api.class);
AppsV1Api appsApi = mock(AppsV1Api.class);
-
+
Call successfulCall = mock(Call.class);
Response okResponse = mock(Response.class);
when(okResponse.code()).thenReturn(HttpURLConnection.HTTP_OK);
when(okResponse.isSuccessful()).thenReturn(true);
when(okResponse.message()).thenReturn("");
when(successfulCall.execute()).thenReturn(okResponse);
-
+
final String expectedFunctionNamePrefix = String.format("pf-%s-%s-%s",
"c-tenant", "c-ns", "c-fn");
-
+
factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0);
factory.setCoreClient(coreApi);
factory.setAppsClient(appsApi);
ArgumentMatcher<String> hasTranslatedFunctionName = (String t) ->
t.startsWith(expectedFunctionNamePrefix);
-
+
when(appsApi.deleteNamespacedStatefulSetCall(
argThat(hasTranslatedFunctionName),
anyString(), isNull(), isNull(), anyInt(), isNull(),
anyString(), any(), isNull())).thenReturn(successfulCall);
@@ -1350,14 +1354,14 @@ public class KubernetesRuntimeTest {
V1PodList podList = mock(V1PodList.class);
when(podList.getItems()).thenReturn(Collections.emptyList());
-
+
String expectedLabels =
String.format("tenant=%s,namespace=%s,name=%s", "c-tenant", "c-ns", "c-fn");
-
+
when(coreApi.listNamespacedPod(anyString(), isNull(), isNull(),
isNull(), isNull(),
eq(expectedLabels), isNull(), isNull(), isNull(), isNull(),
isNull())).thenReturn(podList);
- KubernetesRuntime kr = factory.createContainer(config, "/test/code",
"code.yml", "/test/transforms", "transform.yml", Long.MIN_VALUE);
+ KubernetesRuntime kr = factory.createContainer(config, "/test/code",
"code.yml", "/test/transforms", "transform.yml", Long.MIN_VALUE);
kr.deleteStatefulSet();
-
+
verify(coreApi).listNamespacedPod(anyString(), isNull(), isNull(),
isNull(), isNull(),
eq(expectedLabels), isNull(), isNull(), isNull(), isNull(),
isNull());
}
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
index f63f24dc256..365704ea0b4 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
@@ -297,7 +297,7 @@ public class ProcessRuntimeTest {
String extraDepsEnv;
int portArg;
int metricsPortArg;
- int totalArgCount = 48;
+ int totalArgCount = 54;
if (webServiceUrl != null &&
config.isExposePulsarAdminClientEnabled()) {
totalArgCount += 3;
}
@@ -305,13 +305,13 @@ public class ProcessRuntimeTest {
assertEquals(args.size(), totalArgCount);
extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" +
depsDir;
classpath = classpath + ":" + depsDir + "/*";
- portArg = 31;
- metricsPortArg = 33;
+ portArg = 37;
+ metricsPortArg = 39;
} else {
assertEquals(args.size(), totalArgCount-1);
extraDepsEnv = "";
- portArg = 30;
- metricsPortArg = 32;
+ portArg = 36;
+ metricsPortArg = 38;
}
if (webServiceUrl != null &&
config.isExposePulsarAdminClientEnabled()) {
portArg += 3;
@@ -328,7 +328,11 @@ public class ProcessRuntimeTest {
+ "-Dpulsar.function.log.dir=" + logDirectory + "/functions/"
+ FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" +
config.getFunctionDetails().getName() + "-" + config.getInstanceId()
+ " -Dio.netty.tryReflectionSetAccessible=true"
- + " --add-opens java.base/sun.net=ALL-UNNAMED"
+ + " -Dorg.apache.pulsar.shade.io.netty.tryReflectionSetAccessible=true"
+ + " -Dio.grpc.netty.shaded.io.netty.tryReflectionSetAccessible=true"
+ + " --add-opens java.base/java.nio=ALL-UNNAMED"
+ + " --add-opens java.base/jdk.internal.misc=ALL-UNNAMED"
+ + " --add-opens java.base/java.util.zip=ALL-UNNAMED"
+ " org.apache.pulsar.functions.instance.JavaInstanceMain"
+ " --jar " + userJarFile
+ " --transform_function_jar " + userJarFile