This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push: new 01be8de9 PROTON-2899 Add support for both netty 4.1 and 4.2 01be8de9 is described below commit 01be8de9507613a40d305284d350c411b3e6ff58 Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Thu Jul 17 13:39:04 2025 -0400 PROTON-2899 Add support for both netty 4.1 and 4.2 Allow the client to work with netty 4.2 and later while still defaulting to testing and supporting netty 4.1 releases --- .../client/transport/netty4/EpollSupport.java | 17 +- .../client/transport/netty4/IOUringSupport.java | 99 +++++++++-- .../client/transport/netty4/KQueueSupport.java | 17 +- .../client/transport/netty4/TcpTransportTest.java | 183 ++++++++++++--------- 4 files changed, 229 insertions(+), 87 deletions(-) diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/EpollSupport.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/EpollSupport.java index 7f815c0b..c9408652 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/EpollSupport.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/EpollSupport.java @@ -35,8 +35,12 @@ public final class EpollSupport { public static final String NAME = "EPOLL"; public static boolean isAvailable(TransportOptions transportOptions) { + return transportOptions.allowNativeIO() && isAvailable(); + } + + public static boolean isAvailable() { try { - return transportOptions.allowNativeIO() && Epoll.isAvailable(); + return Epoll.isAvailable(); } catch (NoClassDefFoundError ncdfe) { LOG.debug("Unable to check for Epoll support due to missing class definition", ncdfe); return false; @@ -44,10 +48,21 @@ public final class EpollSupport { } public static EventLoopGroup createGroup(int nThreads, ThreadFactory ioThreadFactory) { + ensureAvailability(); + return new EpollEventLoopGroup(nThreads, ioThreadFactory); } public static Class<? extends Channel> getChannelClass() { + ensureAvailability(); + return EpollSocketChannel.class; } + + public static void ensureAvailability() { + if (!isAvailable()) { + throw new UnsupportedOperationException( + "Netty Epoll support is not enabled because the Netty library indicates it is not present or disabled"); + } + } } diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/IOUringSupport.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/IOUringSupport.java index bc33dc85..9e1d806b 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/IOUringSupport.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/IOUringSupport.java @@ -16,6 +16,8 @@ */ package org.apache.qpid.protonj2.client.transport.netty4; +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; import java.util.concurrent.ThreadFactory; import org.apache.qpid.protonj2.client.TransportOptions; @@ -24,30 +26,107 @@ import org.slf4j.LoggerFactory; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; -import io.netty.incubator.channel.uring.IOUring; -import io.netty.incubator.channel.uring.IOUringEventLoopGroup; -import io.netty.incubator.channel.uring.IOUringSocketChannel; +@SuppressWarnings("unchecked") public final class IOUringSupport { private static final Logger LOG = LoggerFactory.getLogger(IOUringSupport.class); public static final String NAME = "IO_URING"; - public static boolean isAvailable(TransportOptions transportOptions) { + private static final boolean AVAILABLE; + private static final boolean INCUBATOR_VARIANT; + private static final Class<? extends Channel> SOCKET_CHANNEL_CLASS; + private static final Constructor<?> EVENTLOOP_CONSTRUCTOR; + private static final Method IOHANDLER_FACTORY; + + static { + boolean available = false; + boolean incubator = false; + Class<? extends Channel> socketChannelClass = null; + Constructor<?> constructor = null; + Method ioHandlerFactory = null; + + // Try for new Netty built in IoUring before falling back to incubator checks try { - return transportOptions.allowNativeIO() && IOUring.isAvailable(); - } catch (NoClassDefFoundError ncdfe) { - LOG.debug("Unable to check for IO_Uring support due to missing class definition", ncdfe); - return false; + final Class<?> ioUring = Class.forName("io.netty.channel.uring.IoUring"); + final Method isAvailable = ioUring.getDeclaredMethod("isAvailable", (Class<?>[])null); + final Class<?> eventLoopGroup = Class.forName("io.netty.channel.MultiThreadIoEventLoopGroup"); + final Class<?> ioUringHandler = Class.forName("io.netty.channel.uring.IoUringIoHandler"); + final Class<?> ioUringHandlerFactory = Class.forName("io.netty.channel.IoHandlerFactory"); + + constructor = eventLoopGroup.getDeclaredConstructor(int.class, ThreadFactory.class, ioUringHandlerFactory); + ioHandlerFactory = ioUringHandler.getDeclaredMethod("newFactory"); + socketChannelClass = (Class<? extends Channel>) Class.forName("io.netty.channel.uring.IoUringSocketChannel"); + available = (boolean) isAvailable.invoke(null); + } catch (Exception e) { + LOG.debug("Unable to enable netty io_uring support due to error", e); } + + if (!available) { + try { + final Class<?> ioUring = Class.forName("io.netty.incubator.channel.uring.IOUring"); + final Method isAvailable = ioUring.getDeclaredMethod("isAvailable"); + final Class<?> eventLoopGroup = Class.forName("io.netty.incubator.channel.uring.IOUringEventLoopGroup"); + + socketChannelClass = (Class<? extends Channel>) Class.forName("io.netty.incubator.channel.uring.IOUringSocketChannel"); + constructor = eventLoopGroup.getDeclaredConstructor(int.class, ThreadFactory.class); + available = (boolean) isAvailable.invoke(null); + incubator = true; + } catch (Exception e) { + LOG.debug("Unable to enable netty incubator io_uring support due to error", e); + } + } + + AVAILABLE = available; + INCUBATOR_VARIANT = incubator; + SOCKET_CHANNEL_CLASS = socketChannelClass; + EVENTLOOP_CONSTRUCTOR = constructor; + IOHANDLER_FACTORY = ioHandlerFactory; + } + + public static boolean isAvailable(TransportOptions transportOptions) { + return transportOptions.allowNativeIO() && AVAILABLE; + } + + public static boolean isAvailable() { + return AVAILABLE; } public static EventLoopGroup createGroup(int nThreads, ThreadFactory ioThreadFactory) { - return new IOUringEventLoopGroup(nThreads, ioThreadFactory); + ensureAvailability(); + + Exception createError = null; + + if (INCUBATOR_VARIANT) { + try { + return (EventLoopGroup) EVENTLOOP_CONSTRUCTOR.newInstance(nThreads, ioThreadFactory); + } catch (Exception e) { + LOG.debug("Unable to create Netty incubator io_uring EventLoopGroup due to error", e); + createError = e; + } + } else { + try { + return (EventLoopGroup) EVENTLOOP_CONSTRUCTOR.newInstance(nThreads, ioThreadFactory, IOHANDLER_FACTORY.invoke(null)); + } catch (Exception e) { + LOG.debug("Unable to create Netty io_uring EventLoopGroup due to error", e); + createError = e; + } + } + + throw (Error) new UnsupportedOperationException("Netty io_uring failed to create resource").initCause(createError); } public static Class<? extends Channel> getChannelClass() { - return IOUringSocketChannel.class; + ensureAvailability(); + + return SOCKET_CHANNEL_CLASS; + } + + public static void ensureAvailability() { + if (!AVAILABLE) { + throw new UnsupportedOperationException( + "Netty io_ring support is not enabled because the Netty library indicates it is not present or disabled"); + } } } diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/KQueueSupport.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/KQueueSupport.java index 287da94d..4c8ef723 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/KQueueSupport.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty4/KQueueSupport.java @@ -35,8 +35,12 @@ public final class KQueueSupport { public static final String NAME = "KQUEUE"; public static boolean isAvailable(TransportOptions transportOptions) { + return transportOptions.allowNativeIO() && isAvailable(); + } + + public static boolean isAvailable() { try { - return transportOptions.allowNativeIO() && KQueue.isAvailable(); + return KQueue.isAvailable(); } catch (NoClassDefFoundError ncdfe) { LOG.debug("Unable to check for KQueue support due to missing class definition", ncdfe); return false; @@ -44,10 +48,21 @@ public final class KQueueSupport { } public static EventLoopGroup createGroup(int nThreads, ThreadFactory ioThreadFactory) { + ensureAvailability(); + return new KQueueEventLoopGroup(nThreads, ioThreadFactory); } public static Class<? extends Channel> getChannelClass() { + ensureAvailability(); + return KQueueSocketChannel.class; } + + public static void ensureAvailability() { + if (!isAvailable()) { + throw new UnsupportedOperationException( + "Netty KQueue support is not enabled because the Netty library indicates it is not present or disabled"); + } + } } diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java index 8acea840..105479a7 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/transport/netty4/TcpTransportTest.java @@ -57,14 +57,10 @@ import org.slf4j.LoggerFactory; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.UnpooledByteBufAllocator; -import io.netty.channel.epoll.Epoll; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.kqueue.KQueue; -import io.netty.channel.kqueue.KQueueEventLoopGroup; -import io.netty.incubator.channel.uring.IOUring; -import io.netty.incubator.channel.uring.IOUringEventLoopGroup; +import io.netty.channel.EventLoopGroup; import io.netty.util.ResourceLeakDetector; import io.netty.util.ResourceLeakDetector.Level; +import io.netty.util.concurrent.EventExecutor; /** * Test basic functionality of the Netty based TCP transport. @@ -862,7 +858,7 @@ public class TcpTransportTest extends ImperativeClientTestCase { } private void doTestEpollSupport(boolean useEpoll) throws Exception { - assumeTrue(Epoll.isAvailable()); + assumeTrue(EpollSupport.isAvailable()); try (NettyEchoServer server = createEchoServer()) { server.start(); @@ -907,7 +903,7 @@ public class TcpTransportTest extends ImperativeClientTestCase { } private void doTestIORingSupport(boolean useIOUring) throws Exception { - assumeTrue(IOUring.isAvailable()); + assumeTrue(IOUringSupport.isAvailable()); try (NettyEchoServer server = createEchoServer()) { server.start(); @@ -941,62 +937,6 @@ public class TcpTransportTest extends ImperativeClientTestCase { assertTrue(data.isEmpty()); } - private void assertEpoll(String message, boolean expected, Transport transport) throws Exception { - Field bootstrap = null; - Class<?> transportType = transport.getClass(); - - while (transportType != null && bootstrap == null) { - try { - bootstrap = transportType.getDeclaredField("bootstrap"); - } catch (NoSuchFieldException error) { - transportType = transportType.getSuperclass(); - if (Object.class.equals(transportType)) { - transportType = null; - } - } - } - - assertNotNull(bootstrap, "Transport implementation unknown"); - - bootstrap.setAccessible(true); - - Bootstrap transportBootstrap = (Bootstrap) bootstrap.get(transport); - - if (expected) { - assertTrue(transportBootstrap.config().group() instanceof EpollEventLoopGroup, message); - } else { - assertFalse(transportBootstrap.config().group() instanceof EpollEventLoopGroup, message); - } - } - - private void assertIOUring(String message, boolean expected, Transport transport) throws Exception { - Field bootstrap = null; - Class<?> transportType = transport.getClass(); - - while (transportType != null && bootstrap == null) { - try { - bootstrap = transportType.getDeclaredField("bootstrap"); - } catch (NoSuchFieldException error) { - transportType = transportType.getSuperclass(); - if (Object.class.equals(transportType)) { - transportType = null; - } - } - } - - assertNotNull(bootstrap, "Transport implementation unknown"); - - bootstrap.setAccessible(true); - - Bootstrap transportBootstrap = (Bootstrap) bootstrap.get(transport); - - if (expected) { - assertTrue(transportBootstrap.config().group() instanceof IOUringEventLoopGroup, message); - } else { - assertFalse(transportBootstrap.config().group() instanceof IOUringEventLoopGroup, message); - } - } - @Test public void testConnectToServerWithKQueueEnabled() throws Exception { doTestKQueueSupport(true); @@ -1008,7 +948,7 @@ public class TcpTransportTest extends ImperativeClientTestCase { } private void doTestKQueueSupport(boolean useKQueue) throws Exception { - assumeTrue(KQueue.isAvailable()); + assumeTrue(KQueueSupport.isAvailable()); try (NettyEchoServer server = createEchoServer()) { server.start(); @@ -1043,14 +983,21 @@ public class TcpTransportTest extends ImperativeClientTestCase { @Test public void testFallbackToNioWhenNativeIOConfiguredNotSupportedEpoll() throws Exception { - assumeFalse(Epoll.isAvailable()); + assumeFalse(EpollSupport.isAvailable()); doTestFallbackToNioWhenNativeLayerNotSupported(EpollSupport.NAME); } + @Test + public void testFallbackToNioWhenNativeIOConfiguredNotSupportedIOUring() throws Exception { + assumeFalse(IOUringSupport.isAvailable()); + + doTestFallbackToNioWhenNativeLayerNotSupported(IOUringSupport.NAME); + } + @Test public void testFallbackToNioWhenNativeIOConfiguredNotSupportedKQueue() throws Exception { - assumeFalse(KQueue.isAvailable()); + assumeFalse(KQueueSupport.isAvailable()); doTestFallbackToNioWhenNativeLayerNotSupported(KQueueSupport.NAME); } @@ -1088,13 +1035,47 @@ public class TcpTransportTest extends ImperativeClientTestCase { assertTrue(data.isEmpty()); } + private void assertEpoll(String message, boolean expected, Transport transport) throws Exception { + final IOSubsystem ioHandler = extractIOSubsysten(transport); + + if (expected) { + assertTrue(IOSubsystem.EPOLL.equals(ioHandler)); + } else { + assertFalse(IOSubsystem.EPOLL.equals(ioHandler)); + } + } + + private void assertIOUring(String message, boolean expected, Transport transport) throws Exception { + final IOSubsystem ioHandler = extractIOSubsysten(transport); + + if (expected) { + assertTrue(IOSubsystem.IO_URING.equals(ioHandler)); + } else { + assertFalse(IOSubsystem.IO_URING.equals(ioHandler)); + } + } + private void assertKQueue(String message, boolean expected, Transport transport) throws Exception { - Field group = null; + final IOSubsystem ioHandler = extractIOSubsysten(transport); + + if (expected) { + assertTrue(IOSubsystem.KQUEUE.equals(ioHandler)); + } else { + assertFalse(IOSubsystem.KQUEUE.equals(ioHandler)); + } + } + + private enum IOSubsystem { + NIO, EPOLL, KQUEUE, IO_URING; + } + + private IOSubsystem extractIOSubsysten(Transport transport) throws Exception { + Field bootstrap = null; Class<?> transportType = transport.getClass(); - while (transportType != null && group == null) { + while (transportType != null && bootstrap == null) { try { - group = transportType.getDeclaredField("group"); + bootstrap = transportType.getDeclaredField("bootstrap"); } catch (NoSuchFieldException error) { transportType = transportType.getSuperclass(); if (Object.class.equals(transportType)) { @@ -1103,13 +1084,65 @@ public class TcpTransportTest extends ImperativeClientTestCase { } } - assertNotNull(group, "Transport implementation unknown"); + assertNotNull(bootstrap, "Transport implementation unknown"); + + bootstrap.setAccessible(true); + + final Bootstrap transportBootstrap = (Bootstrap) bootstrap.get(transport); + final EventLoopGroup group = transportBootstrap.config().group(); + final String eventLoopGroupName = group.getClass().getSimpleName(); + + // Prior to Netty 4.2 the EventLoopGroup implementation indicates the IO layer in use + if (eventLoopGroupName.startsWith("Nio")) { + return IOSubsystem.NIO; + } else if (eventLoopGroupName.startsWith("Epoll")) { + return IOSubsystem.EPOLL; + } else if (eventLoopGroupName.startsWith("IOUring")) { + return IOSubsystem.IO_URING; + } else if (eventLoopGroupName.startsWith("KQueue")) { + return IOSubsystem.KQUEUE; + } + + // Post Netty 4.2 we need to find the IOHandler that drives the EventLoopGroup + Field children = null; + Class<?> groupType = group.getClass(); - group.setAccessible(true); - if (expected) { - assertTrue(group.get(transport) instanceof KQueueEventLoopGroup, message); + while (groupType != null && children == null) { + try { + children = groupType.getDeclaredField("children"); + } catch (NoSuchFieldException error) { + groupType = groupType.getSuperclass(); + if (Object.class.equals(groupType)) { + groupType = null; + } + } + } + + children.setAccessible(true); + + final EventExecutor[] executors = (EventExecutor[]) children.get(group); + final EventExecutor executor = executors[0]; + final Field ioHandlerField = executor.getClass().getDeclaredField("ioHandler"); + + if (ioHandlerField != null) { + ioHandlerField.setAccessible(true); + + final String ioHandlerName = ioHandlerField.get(executor) != null ? + ioHandlerField.get(executor) .getClass().getSimpleName() : "Nio"; + + if (ioHandlerName.startsWith("Nio")) { + return IOSubsystem.NIO; + } else if (ioHandlerName.startsWith("Epoll")) { + return IOSubsystem.EPOLL; + } else if (ioHandlerName.startsWith("IoUring")) { + return IOSubsystem.IO_URING; + } else if (ioHandlerName.startsWith("KQueue")) { + return IOSubsystem.KQUEUE; + } else { + return IOSubsystem.NIO; + } } else { - assertFalse(group.get(transport) instanceof KQueueEventLoopGroup, message); + return IOSubsystem.NIO; // Safe default since we found nothing we recognize } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org