This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 01be8de9 PROTON-2899 Add support for both netty 4.1 and 4.2
01be8de9 is described below

commit 01be8de9507613a40d305284d350c411b3e6ff58
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Thu Jul 17 13:39:04 2025 -0400

    PROTON-2899 Add support for both netty 4.1 and 4.2
    
    Allow the client to work with netty 4.2 and later while still
    defaulting to testing and supporting netty 4.1 releases
---
 .../client/transport/netty4/EpollSupport.java      |  17 +-
 .../client/transport/netty4/IOUringSupport.java    |  99 +++++++++--
 .../client/transport/netty4/KQueueSupport.java     |  17 +-
 .../client/transport/netty4/TcpTransportTest.java  | 183 ++++++++++++---------
 4 files changed, 229 insertions(+), 87 deletions(-)

diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/EpollSupport.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/EpollSupport.java
index 7f815c0b..c9408652 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/EpollSupport.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/EpollSupport.java
@@ -35,8 +35,12 @@ public final class EpollSupport {
     public static final String NAME = "EPOLL";
 
     public static boolean isAvailable(TransportOptions transportOptions) {
+        return transportOptions.allowNativeIO() && isAvailable();
+    }
+
+    public static boolean isAvailable() {
         try {
-            return transportOptions.allowNativeIO() && Epoll.isAvailable();
+            return Epoll.isAvailable();
         } catch (NoClassDefFoundError ncdfe) {
             LOG.debug("Unable to check for Epoll support due to missing class 
definition", ncdfe);
             return false;
@@ -44,10 +48,21 @@ public final class EpollSupport {
     }
 
     public static EventLoopGroup createGroup(int nThreads, ThreadFactory 
ioThreadFactory) {
+        ensureAvailability();
+
         return new EpollEventLoopGroup(nThreads, ioThreadFactory);
     }
 
     public static Class<? extends Channel> getChannelClass() {
+        ensureAvailability();
+
         return EpollSocketChannel.class;
     }
+
+    public static void ensureAvailability() {
+        if (!isAvailable()) {
+            throw new UnsupportedOperationException(
+                "Netty Epoll support is not enabled because the Netty library 
indicates it is not present or disabled");
+        }
+    }
 }
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/IOUringSupport.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/IOUringSupport.java
index bc33dc85..9e1d806b 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/IOUringSupport.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/IOUringSupport.java
@@ -16,6 +16,8 @@
  */
 package org.apache.qpid.protonj2.client.transport.netty4;
 
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
 import java.util.concurrent.ThreadFactory;
 
 import org.apache.qpid.protonj2.client.TransportOptions;
@@ -24,30 +26,107 @@ import org.slf4j.LoggerFactory;
 
 import io.netty.channel.Channel;
 import io.netty.channel.EventLoopGroup;
-import io.netty.incubator.channel.uring.IOUring;
-import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
-import io.netty.incubator.channel.uring.IOUringSocketChannel;
 
+@SuppressWarnings("unchecked")
 public final class IOUringSupport {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(IOUringSupport.class);
 
     public static final String NAME = "IO_URING";
 
-    public static boolean isAvailable(TransportOptions transportOptions) {
+    private static final boolean AVAILABLE;
+    private static final boolean INCUBATOR_VARIANT;
+    private static final Class<? extends Channel> SOCKET_CHANNEL_CLASS;
+    private static final Constructor<?> EVENTLOOP_CONSTRUCTOR;
+    private static final Method IOHANDLER_FACTORY;
+
+    static {
+        boolean available = false;
+        boolean incubator = false;
+        Class<? extends Channel> socketChannelClass = null;
+        Constructor<?> constructor = null;
+        Method ioHandlerFactory = null;
+        // Try for new Netty built in IoUring before falling back to incubator checks. LinkageError is
+        // caught as well since a missing or broken dependency must not fail this static initializer.
         try {
-            return transportOptions.allowNativeIO() && IOUring.isAvailable();
-        } catch (NoClassDefFoundError ncdfe) {
-            LOG.debug("Unable to check for IO_Uring support due to missing class definition", ncdfe);
-            return false;
+            final Class<?> ioUring = Class.forName("io.netty.channel.uring.IoUring");
+            final Method isAvailable = ioUring.getDeclaredMethod("isAvailable", (Class<?>[])null);
+            final Class<?> eventLoopGroup = Class.forName("io.netty.channel.MultiThreadIoEventLoopGroup");
+            final Class<?> ioUringHandler = Class.forName("io.netty.channel.uring.IoUringIoHandler");
+            final Class<?> ioUringHandlerFactory = Class.forName("io.netty.channel.IoHandlerFactory");
+
+            constructor = eventLoopGroup.getDeclaredConstructor(int.class, ThreadFactory.class, ioUringHandlerFactory);
+            ioHandlerFactory = ioUringHandler.getDeclaredMethod("newFactory");
+            socketChannelClass = (Class<? extends Channel>) Class.forName("io.netty.channel.uring.IoUringSocketChannel");
+            available = (boolean) isAvailable.invoke(null);
+        } catch (Exception | LinkageError e) {
+            LOG.debug("Unable to enable netty io_uring support due to error", e);
         }
+        // Fall back to the incubator io_uring classes when the built in variant is missing or unavailable
+        if (!available) {
+            try {
+                final Class<?> ioUring = Class.forName("io.netty.incubator.channel.uring.IOUring");
+                final Method isAvailable = ioUring.getDeclaredMethod("isAvailable");
+                final Class<?> eventLoopGroup = Class.forName("io.netty.incubator.channel.uring.IOUringEventLoopGroup");
+
+                socketChannelClass = (Class<? extends Channel>) Class.forName("io.netty.incubator.channel.uring.IOUringSocketChannel");
+                constructor = eventLoopGroup.getDeclaredConstructor(int.class, ThreadFactory.class);
+                available = (boolean) isAvailable.invoke(null);
+                incubator = true;
+            } catch (Exception | LinkageError e) {
+                LOG.debug("Unable to enable netty incubator io_uring support due to error", e);
+            }
+        }
+
+        AVAILABLE = available;
+        INCUBATOR_VARIANT = incubator;
+        SOCKET_CHANNEL_CLASS = socketChannelClass;
+        EVENTLOOP_CONSTRUCTOR = constructor;
+        IOHANDLER_FACTORY = ioHandlerFactory;
+    }
+
+    public static boolean isAvailable(TransportOptions transportOptions) {
+        return transportOptions.allowNativeIO() && AVAILABLE;
+    }
+
+    public static boolean isAvailable() {
+        return AVAILABLE;
     }
 
     public static EventLoopGroup createGroup(int nThreads, ThreadFactory ioThreadFactory) {
-        return new IOUringEventLoopGroup(nThreads, ioThreadFactory);
+        ensureAvailability();
+
+        Exception createError = null;
+        // The constructor was resolved reflectively at class load; its arity depends on the detected variant
+        if (INCUBATOR_VARIANT) {
+            try {
+                return (EventLoopGroup) EVENTLOOP_CONSTRUCTOR.newInstance(nThreads, ioThreadFactory);
+            } catch (Exception e) {
+                LOG.debug("Unable to create Netty incubator io_uring EventLoopGroup due to error", e);
+                createError = e;
+            }
+        } else {
+            try {
+                return (EventLoopGroup) EVENTLOOP_CONSTRUCTOR.newInstance(nThreads, ioThreadFactory, IOHANDLER_FACTORY.invoke(null));
+            } catch (Exception e) {
+                LOG.debug("Unable to create Netty io_uring EventLoopGroup due to error", e);
+                createError = e;
+            }
+        }
+        // Chain the reflective failure as the cause; the exception is unchecked so no cast is needed to throw it
+        throw new UnsupportedOperationException("Netty io_uring failed to create resource", createError);
     }
 
     public static Class<? extends Channel> getChannelClass() {
-        return IOUringSocketChannel.class;
+        ensureAvailability();
+
+        return SOCKET_CHANNEL_CLASS;
+    }
+
+    public static void ensureAvailability() { // fails fast when io_uring was not detected at class load
+        if (!AVAILABLE) {
+            throw new UnsupportedOperationException(
+                "Netty io_uring support is not enabled because the Netty library indicates it is not present or disabled");
+        }
     }
 }
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/KQueueSupport.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/KQueueSupport.java
index 287da94d..4c8ef723 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/KQueueSupport.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/KQueueSupport.java
@@ -35,8 +35,12 @@ public final class KQueueSupport {
     public static final String NAME = "KQUEUE";
 
     public static boolean isAvailable(TransportOptions transportOptions) {
+        return transportOptions.allowNativeIO() && isAvailable();
+    }
+
+    public static boolean isAvailable() {
         try {
-            return transportOptions.allowNativeIO() && KQueue.isAvailable();
+            return KQueue.isAvailable();
         } catch (NoClassDefFoundError ncdfe) {
             LOG.debug("Unable to check for KQueue support due to missing class 
definition", ncdfe);
             return false;
@@ -44,10 +48,21 @@ public final class KQueueSupport {
     }
 
     public static EventLoopGroup createGroup(int nThreads, ThreadFactory 
ioThreadFactory) {
+        ensureAvailability();
+
         return new KQueueEventLoopGroup(nThreads, ioThreadFactory);
     }
 
     public static Class<? extends Channel> getChannelClass() {
+        ensureAvailability();
+
         return KQueueSocketChannel.class;
     }
+
+    public static void ensureAvailability() {
+        if (!isAvailable()) {
+            throw new UnsupportedOperationException(
+                "Netty KQueue support is not enabled because the Netty library 
indicates it is not present or disabled");
+        }
+    }
 }
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java
index 8acea840..105479a7 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java
@@ -57,14 +57,10 @@ import org.slf4j.LoggerFactory;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.UnpooledByteBufAllocator;
-import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.kqueue.KQueue;
-import io.netty.channel.kqueue.KQueueEventLoopGroup;
-import io.netty.incubator.channel.uring.IOUring;
-import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
+import io.netty.channel.EventLoopGroup;
 import io.netty.util.ResourceLeakDetector;
 import io.netty.util.ResourceLeakDetector.Level;
+import io.netty.util.concurrent.EventExecutor;
 
 /**
  * Test basic functionality of the Netty based TCP transport.
@@ -862,7 +858,7 @@ public class TcpTransportTest extends 
ImperativeClientTestCase {
     }
 
     private void doTestEpollSupport(boolean useEpoll) throws Exception {
-        assumeTrue(Epoll.isAvailable());
+        assumeTrue(EpollSupport.isAvailable());
 
         try (NettyEchoServer server = createEchoServer()) {
             server.start();
@@ -907,7 +903,7 @@ public class TcpTransportTest extends 
ImperativeClientTestCase {
     }
 
     private void doTestIORingSupport(boolean useIOUring) throws Exception {
-        assumeTrue(IOUring.isAvailable());
+        assumeTrue(IOUringSupport.isAvailable());
 
         try (NettyEchoServer server = createEchoServer()) {
             server.start();
@@ -941,62 +937,6 @@ public class TcpTransportTest extends 
ImperativeClientTestCase {
         assertTrue(data.isEmpty());
     }
 
-    private void assertEpoll(String message, boolean expected, Transport 
transport) throws Exception {
-        Field bootstrap = null;
-        Class<?> transportType = transport.getClass();
-
-        while (transportType != null && bootstrap == null) {
-            try {
-                bootstrap = transportType.getDeclaredField("bootstrap");
-            } catch (NoSuchFieldException error) {
-                transportType = transportType.getSuperclass();
-                if (Object.class.equals(transportType)) {
-                    transportType = null;
-                }
-            }
-        }
-
-        assertNotNull(bootstrap, "Transport implementation unknown");
-
-        bootstrap.setAccessible(true);
-
-        Bootstrap transportBootstrap = (Bootstrap) bootstrap.get(transport);
-
-        if (expected) {
-            assertTrue(transportBootstrap.config().group() instanceof 
EpollEventLoopGroup, message);
-        } else {
-            assertFalse(transportBootstrap.config().group() instanceof 
EpollEventLoopGroup, message);
-        }
-    }
-
-    private void assertIOUring(String message, boolean expected, Transport 
transport) throws Exception {
-        Field bootstrap = null;
-        Class<?> transportType = transport.getClass();
-
-        while (transportType != null && bootstrap == null) {
-            try {
-                bootstrap = transportType.getDeclaredField("bootstrap");
-            } catch (NoSuchFieldException error) {
-                transportType = transportType.getSuperclass();
-                if (Object.class.equals(transportType)) {
-                    transportType = null;
-                }
-            }
-        }
-
-        assertNotNull(bootstrap, "Transport implementation unknown");
-
-        bootstrap.setAccessible(true);
-
-        Bootstrap transportBootstrap = (Bootstrap) bootstrap.get(transport);
-
-        if (expected) {
-            assertTrue(transportBootstrap.config().group() instanceof 
IOUringEventLoopGroup, message);
-        } else {
-            assertFalse(transportBootstrap.config().group() instanceof 
IOUringEventLoopGroup, message);
-        }
-    }
-
     @Test
     public void testConnectToServerWithKQueueEnabled() throws Exception {
         doTestKQueueSupport(true);
@@ -1008,7 +948,7 @@ public class TcpTransportTest extends 
ImperativeClientTestCase {
     }
 
     private void doTestKQueueSupport(boolean useKQueue) throws Exception {
-        assumeTrue(KQueue.isAvailable());
+        assumeTrue(KQueueSupport.isAvailable());
 
         try (NettyEchoServer server = createEchoServer()) {
             server.start();
@@ -1043,14 +983,21 @@ public class TcpTransportTest extends 
ImperativeClientTestCase {
 
     @Test
     public void testFallbackToNioWhenNativeIOConfiguredNotSupportedEpoll() 
throws Exception {
-        assumeFalse(Epoll.isAvailable());
+        assumeFalse(EpollSupport.isAvailable());
 
         doTestFallbackToNioWhenNativeLayerNotSupported(EpollSupport.NAME);
     }
 
+    @Test
+    public void testFallbackToNioWhenNativeIOConfiguredNotSupportedIOUring() 
throws Exception {
+        assumeFalse(IOUringSupport.isAvailable());
+
+        doTestFallbackToNioWhenNativeLayerNotSupported(IOUringSupport.NAME);
+    }
+
     @Test
     public void testFallbackToNioWhenNativeIOConfiguredNotSupportedKQueue() 
throws Exception {
-        assumeFalse(KQueue.isAvailable());
+        assumeFalse(KQueueSupport.isAvailable());
 
         doTestFallbackToNioWhenNativeLayerNotSupported(KQueueSupport.NAME);
     }
@@ -1088,13 +1035,47 @@ public class TcpTransportTest extends 
ImperativeClientTestCase {
         assertTrue(data.isEmpty());
     }
 
+    private void assertEpoll(String message, boolean expected, Transport transport) throws Exception {
+        final IOSubsystem ioHandler = extractIOSubsysten(transport);
+        // Report the caller supplied message on failure instead of silently dropping it
+        if (expected) {
+            assertTrue(IOSubsystem.EPOLL.equals(ioHandler), message);
+        } else {
+            assertFalse(IOSubsystem.EPOLL.equals(ioHandler), message);
+        }
+    }
+
+    private void assertIOUring(String message, boolean expected, Transport transport) throws Exception {
+        final IOSubsystem ioHandler = extractIOSubsysten(transport);
+        // Report the caller supplied message on failure instead of silently dropping it
+        if (expected) {
+            assertTrue(IOSubsystem.IO_URING.equals(ioHandler), message);
+        } else {
+            assertFalse(IOSubsystem.IO_URING.equals(ioHandler), message);
+        }
+    }
+
     private void assertKQueue(String message, boolean expected, Transport transport) throws Exception {
-        Field group = null;
+        final IOSubsystem ioHandler = extractIOSubsysten(transport);
+        // Report the caller supplied message on failure instead of silently dropping it
+        if (expected) {
+            assertTrue(IOSubsystem.KQUEUE.equals(ioHandler), message);
+        } else {
+            assertFalse(IOSubsystem.KQUEUE.equals(ioHandler), message);
+        }
+    }
+
+    private enum IOSubsystem {
+        NIO, EPOLL, KQUEUE, IO_URING;
+    }
+
+    private IOSubsystem extractIOSubsysten(Transport transport) throws 
Exception {
+        Field bootstrap = null;
         Class<?> transportType = transport.getClass();
 
-        while (transportType != null && group == null) {
+        while (transportType != null && bootstrap == null) {
             try {
-                group = transportType.getDeclaredField("group");
+                bootstrap = transportType.getDeclaredField("bootstrap");
             } catch (NoSuchFieldException error) {
                 transportType = transportType.getSuperclass();
                 if (Object.class.equals(transportType)) {
@@ -1103,13 +1084,65 @@ public class TcpTransportTest extends 
ImperativeClientTestCase {
             }
         }
 
-        assertNotNull(group, "Transport implementation unknown");
+        assertNotNull(bootstrap, "Transport implementation unknown");
+
+        bootstrap.setAccessible(true);
+
+        final Bootstrap transportBootstrap = (Bootstrap) 
bootstrap.get(transport);
+        final EventLoopGroup group = transportBootstrap.config().group();
+        final String eventLoopGroupName = group.getClass().getSimpleName();
+
+        // Prior to Netty 4.2 the EventLoopGroup implementation indicates the 
IO layer in use
+        if (eventLoopGroupName.startsWith("Nio")) {
+            return IOSubsystem.NIO;
+        } else if (eventLoopGroupName.startsWith("Epoll")) {
+            return IOSubsystem.EPOLL;
+        } else if (eventLoopGroupName.startsWith("IOUring")) {
+            return IOSubsystem.IO_URING;
+        } else if (eventLoopGroupName.startsWith("KQueue")) {
+            return IOSubsystem.KQUEUE;
+        }
+
+        // Post Netty 4.2 we need to find the IOHandler that drives the 
EventLoopGroup
+        Field children = null;
+        Class<?> groupType = group.getClass();
 
-        group.setAccessible(true);
-        if (expected) {
-            assertTrue(group.get(transport) instanceof KQueueEventLoopGroup, 
message);
+        while (groupType != null && children == null) {
+            try {
+                children = groupType.getDeclaredField("children");
+            } catch (NoSuchFieldException error) {
+                groupType = groupType.getSuperclass();
+                if (Object.class.equals(groupType)) {
+                    groupType = null;
+                }
+            }
+        }
+
+        children.setAccessible(true);
+
+        final EventExecutor[] executors = (EventExecutor[]) 
children.get(group);
+        final EventExecutor executor = executors[0];
+        final Field ioHandlerField = 
executor.getClass().getDeclaredField("ioHandler");
+
+        if (ioHandlerField != null) {
+            ioHandlerField.setAccessible(true);
+
+            final String ioHandlerName = ioHandlerField.get(executor) != null ?
+                ioHandlerField.get(executor) .getClass().getSimpleName() : 
"Nio";
+
+            if (ioHandlerName.startsWith("Nio")) {
+                return IOSubsystem.NIO;
+            } else if (ioHandlerName.startsWith("Epoll")) {
+                return IOSubsystem.EPOLL;
+            } else if (ioHandlerName.startsWith("IoUring")) {
+                return IOSubsystem.IO_URING;
+            } else if (ioHandlerName.startsWith("KQueue")) {
+                return IOSubsystem.KQUEUE;
+            } else {
+                return IOSubsystem.NIO;
+            }
         } else {
-            assertFalse(group.get(transport) instanceof KQueueEventLoopGroup, 
message);
+            return IOSubsystem.NIO; // Safe default since we found nothing we 
recognize
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to