This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f3d4d5ac044 [fix][fn] Enable optimized Netty direct byte buffer support for Pulsar Function runtimes (#22910)
f3d4d5ac044 is described below

commit f3d4d5ac0442eed2b538b8587186cdc0b8df9987
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Jun 17 09:26:28 2024 +0300

    [fix][fn] Enable optimized Netty direct byte buffer support for Pulsar Function runtimes (#22910)
---
 .../pulsar/functions/runtime/RuntimeUtils.java     | 18 +++++++++--
 .../runtime/kubernetes/KubernetesRuntimeTest.java  | 36 ++++++++++++----------
 .../runtime/process/ProcessRuntimeTest.java        | 16 ++++++----
 3 files changed, 46 insertions(+), 24 deletions(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 6160626c958..49a5dd40fa2 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -367,12 +367,26 @@ public class RuntimeUtils {
                     instanceConfig.getFunctionDetails().getName(),
                     shardId));
 
+            // Needed for optimized Netty direct byte buffer support
             args.add("-Dio.netty.tryReflectionSetAccessible=true");
+            // Handle possible shaded Netty versions
+            args.add("-Dorg.apache.pulsar.shade.io.netty.tryReflectionSetAccessible=true");
+            args.add("-Dio.grpc.netty.shaded.io.netty.tryReflectionSetAccessible=true");
+
+            if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_11)) {
+                // Needed for optimized Netty direct byte buffer support
+                args.add("--add-opens");
+                args.add("java.base/java.nio=ALL-UNNAMED");
+                args.add("--add-opens");
+                args.add("java.base/jdk.internal.misc=ALL-UNNAMED");
+            }
 
-            // Needed for netty.DnsResolverUtil on JDK9+
             if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
+                // Needed for optimized checksum calculation when com.scurrilous.circe.checksum.Java9IntHash
+                // is used. That gets used when the native library libcirce-checksum is not available or cannot
+                // be loaded.
                 args.add("--add-opens");
-                args.add("java.base/sun.net=ALL-UNNAMED");
+                args.add("java.base/java.util.zip=ALL-UNNAMED");
             }
 
             if (instanceConfig.getAdditionalJavaRuntimeArguments() != null) {
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 980f763f7c3..bf73f0a9d34 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -441,14 +441,14 @@ public class KubernetesRuntimeTest {
         if (null != depsDir) {
             extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + 
depsDir;
             classpath = classpath + ":" + depsDir + "/*";
-            totalArgs = 46;
-            portArg = 33;
-            metricsPortArg = 35;
+            totalArgs = 52;
+            portArg = 39;
+            metricsPortArg = 41;
         } else {
             extraDepsEnv = "";
-            portArg = 32;
-            metricsPortArg = 34;
-            totalArgs = 45;
+            portArg = 38;
+            metricsPortArg = 40;
+            totalArgs = 51;
         }
         if (secretsAttached) {
             totalArgs += 4;
@@ -479,7 +479,11 @@ public class KubernetesRuntimeTest {
                 + "-Dpulsar.function.log.dir=" + logDirectory + "/" + 
FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
                 + " -Dpulsar.function.log.file=" + 
config.getFunctionDetails().getName() + "-$SHARD_ID"
                 + " -Dio.netty.tryReflectionSetAccessible=true"
-                + " --add-opens java.base/sun.net=ALL-UNNAMED"
+                + " -Dorg.apache.pulsar.shade.io.netty.tryReflectionSetAccessible=true"
+                + " -Dio.grpc.netty.shaded.io.netty.tryReflectionSetAccessible=true"
+                + " --add-opens java.base/java.nio=ALL-UNNAMED"
+                + " --add-opens java.base/jdk.internal.misc=ALL-UNNAMED"
+                + " --add-opens java.base/java.util.zip=ALL-UNNAMED"
                 + " -Xmx" + RESOURCES.getRam()
                 + " org.apache.pulsar.functions.instance.JavaInstanceMain"
                 + " --jar " + jarLocation
@@ -1314,7 +1318,7 @@ public class KubernetesRuntimeTest {
                     .contains("--metrics_port 0"));
         }
     }
-    
+
     @Test
     public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars() 
throws Exception {
         InstanceConfig config = 
createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
@@ -1323,22 +1327,22 @@ public class KubernetesRuntimeTest {
 
         CoreV1Api coreApi = mock(CoreV1Api.class);
         AppsV1Api appsApi = mock(AppsV1Api.class);
-        
+
         Call successfulCall = mock(Call.class);
         Response okResponse = mock(Response.class);
         when(okResponse.code()).thenReturn(HttpURLConnection.HTTP_OK);
         when(okResponse.isSuccessful()).thenReturn(true);
         when(okResponse.message()).thenReturn("");
         when(successfulCall.execute()).thenReturn(okResponse);
-        
+
         final String expectedFunctionNamePrefix = String.format("pf-%s-%s-%s", 
"c-tenant", "c-ns", "c-fn");
-        
+
         factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0);
         factory.setCoreClient(coreApi);
         factory.setAppsClient(appsApi);
 
         ArgumentMatcher<String> hasTranslatedFunctionName = (String t) -> 
t.startsWith(expectedFunctionNamePrefix);
-        
+
         when(appsApi.deleteNamespacedStatefulSetCall(
                 argThat(hasTranslatedFunctionName),
                 anyString(), isNull(), isNull(), anyInt(), isNull(), 
anyString(), any(), isNull())).thenReturn(successfulCall);
@@ -1350,14 +1354,14 @@ public class KubernetesRuntimeTest {
 
         V1PodList podList = mock(V1PodList.class);
         when(podList.getItems()).thenReturn(Collections.emptyList());
-        
+
         String expectedLabels = 
String.format("tenant=%s,namespace=%s,name=%s", "c-tenant", "c-ns", "c-fn");
-        
+
         when(coreApi.listNamespacedPod(anyString(), isNull(), isNull(), 
isNull(), isNull(),
                 eq(expectedLabels), isNull(), isNull(), isNull(), isNull(), 
isNull())).thenReturn(podList);
-        KubernetesRuntime kr = factory.createContainer(config, "/test/code", 
"code.yml", "/test/transforms", "transform.yml", Long.MIN_VALUE);        
+        KubernetesRuntime kr = factory.createContainer(config, "/test/code", 
"code.yml", "/test/transforms", "transform.yml", Long.MIN_VALUE);
         kr.deleteStatefulSet();
-        
+
         verify(coreApi).listNamespacedPod(anyString(), isNull(), isNull(), 
isNull(), isNull(),
                 eq(expectedLabels), isNull(), isNull(), isNull(), isNull(), 
isNull());
     }
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
index f63f24dc256..365704ea0b4 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
@@ -297,7 +297,7 @@ public class ProcessRuntimeTest {
         String extraDepsEnv;
         int portArg;
         int metricsPortArg;
-        int totalArgCount = 48;
+        int totalArgCount = 54;
         if (webServiceUrl != null && 
config.isExposePulsarAdminClientEnabled()) {
             totalArgCount += 3;
         }
@@ -305,13 +305,13 @@ public class ProcessRuntimeTest {
             assertEquals(args.size(), totalArgCount);
             extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + 
depsDir;
             classpath = classpath + ":" + depsDir + "/*";
-            portArg = 31;
-            metricsPortArg = 33;
+            portArg = 37;
+            metricsPortArg = 39;
         } else {
             assertEquals(args.size(), totalArgCount-1);
             extraDepsEnv = "";
-            portArg = 30;
-            metricsPortArg = 32;
+            portArg = 36;
+            metricsPortArg = 38;
         }
         if (webServiceUrl != null && 
config.isExposePulsarAdminClientEnabled()) {
             portArg += 3;
@@ -328,7 +328,11 @@ public class ProcessRuntimeTest {
                 + "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" 
+ FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
                 + " -Dpulsar.function.log.file=" + 
config.getFunctionDetails().getName() + "-" + config.getInstanceId()
                 + " -Dio.netty.tryReflectionSetAccessible=true"
-                + " --add-opens java.base/sun.net=ALL-UNNAMED"
+                + " -Dorg.apache.pulsar.shade.io.netty.tryReflectionSetAccessible=true"
+                + " -Dio.grpc.netty.shaded.io.netty.tryReflectionSetAccessible=true"
+                + " --add-opens java.base/java.nio=ALL-UNNAMED"
+                + " --add-opens java.base/jdk.internal.misc=ALL-UNNAMED"
+                + " --add-opens java.base/java.util.zip=ALL-UNNAMED"
                 + " org.apache.pulsar.functions.instance.JavaInstanceMain"
                 + " --jar " + userJarFile
                 + " --transform_function_jar " + userJarFile

Reply via email to