[
https://issues.apache.org/jira/browse/QPIDJMS-553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17502376#comment-17502376
]
ASF GitHub Bot commented on QPIDJMS-553:
----------------------------------------
gemmellr commented on a change in pull request #45:
URL: https://github.com/apache/qpid-jms/pull/45#discussion_r820801073
##########
File path:
qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
##########
@@ -394,6 +403,88 @@ public void testZeroSizedSentNoErrors() throws Exception {
assertTrue(data.isEmpty());
}
+ @Test(timeout = 60 * 1000, expected = IllegalStateException.class)
Review comment:
Move the exception check into the test body to verify it's actually thrown
from the intended point.
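A minimal sketch of that suggestion, in plain Java so it runs without JUnit or Netty. `ClosedRef` and `assertDereferenceFails` are hypothetical stand-ins, not code from the PR; in the real test the same try/catch shape would wrap only the final `groupRef.ref()` call, so an IllegalStateException thrown during setup would fail the test instead of satisfying it.

```java
import java.util.function.Supplier;

public class ExpectedExceptionSketch {

    /** Hypothetical stand-in for a shared group reference that has already been closed. */
    static final class ClosedRef implements Supplier<Object> {
        @Override
        public Object get() {
            throw new IllegalStateException("the event loop group cannot be reused");
        }
    }

    /**
     * Passes only when the dereference itself throws IllegalStateException.
     * Anything thrown before this call is not caught here and would surface as a failure.
     */
    static void assertDereferenceFails(Supplier<?> closedRef) {
        try {
            closedRef.get();
        } catch (IllegalStateException expected) {
            return; // thrown from the intended point
        }
        // The message states why reaching this line is a failure.
        throw new AssertionError(
            "Expected IllegalStateException when dereferencing a closed shared event loop group");
    }

    public static void main(String[] args) {
        assertDereferenceFails(new ClosedRef());
        System.out.println("closed reference rejected as expected");
    }
}
```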
##########
File path:
qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupFactory.java
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NettyEventLoopGroupFactory {
+
+ private NettyEventLoopGroupFactory() {
+
+ }
+
+ public interface Ref<T> extends AutoCloseable {
+
+ T ref();
+
+ @Override
+ void close();
+ }
+
+ public interface EventLoopGroupRef extends Ref<EventLoopGroup> {
+
+ EventLoopType type();
+ }
Review comment:
Seems like these interfaces should just be combined. Also, as it's in an
EventLoopGroupFactory, I think it's safe, and would be far more readable, for the
'Ref.ref()' method returning a group to actually use the name 'group()'.
Not convinced it needs to be, or is nicer being, in the same file as
the impl.
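One possible shape for the combined interface, as a sketch only. To stay runnable without Netty it uses `java.util.concurrent.ExecutorService` as a stand-in for `EventLoopGroup` (which is itself an executor service); `nioRef()` and the class name are hypothetical and only there to exercise the interface.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CombinedRefSketch {

    enum EventLoopType { EPOLL, KQUEUE, NIO }

    /** Single interface replacing Ref<T> plus EventLoopGroupRef; ref() is renamed group(). */
    interface EventLoopGroupRef extends AutoCloseable {

        ExecutorService group(); // stand-in for io.netty.channel.EventLoopGroup

        EventLoopType type();

        @Override
        void close(); // narrowed so callers need not handle a checked exception
    }

    /** Hypothetical factory method, present only to show the interface in use. */
    static EventLoopGroupRef nioRef() {
        final ExecutorService group = Executors.newSingleThreadExecutor();
        return new EventLoopGroupRef() {
            @Override
            public ExecutorService group() {
                return group;
            }

            @Override
            public EventLoopType type() {
                return EventLoopType.NIO;
            }

            @Override
            public void close() {
                group.shutdown();
            }
        };
    }

    public static void main(String[] args) {
        try (EventLoopGroupRef ref = nioRef()) {
            System.out.println("type = " + ref.type());
        }
    }
}
```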
##########
File path:
qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
##########
@@ -214,8 +209,8 @@ public void operationComplete(ChannelFuture future) throws
Exception {
}
});
}
-
- return group;
+ // returning the channel's event loop: groupRef::ref is multi-threaded
Review comment:
Just call it the 'event loop group'; it's both simpler, by being
immediately understandable, and less likely to go stale, as the field and method
names can and will change (e.g. as suggested).
##########
File path:
qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
##########
@@ -394,6 +403,88 @@ public void testZeroSizedSentNoErrors() throws Exception {
assertTrue(data.isEmpty());
}
+ @Test(timeout = 60 * 1000, expected = IllegalStateException.class)
+ public void testCannotDeferenceSharedClosedEventLoopGroup() throws
Exception {
+ try (NettyEchoServer server = createEchoServer(createServerOptions()))
{
+
+ server.start();
+
+ int port = server.getServerPort();
+ URI serverLocation = new URI("tcp://localhost:" + port);
+ final TransportOptions sharedTransportOptions =
createClientOptions();
+ sharedTransportOptions.setUseKQueue(false);
+ sharedTransportOptions.setUseEpoll(false);
+ sharedTransportOptions.setSharedEventLoopThreads(1);
+ Transport nioTransport = createConnectedTransport(serverLocation,
sharedTransportOptions);
+ EventLoopGroupRef groupRef = getGroupRef(nioTransport);
+ assertNotNull(groupRef.ref());
+
+ nioTransport.close();
+
+ server.stop();
+
+ groupRef.ref();
+ fail();
+ }
+
+ assertTrue(!transportClosed); // Normal shutdown does not trigger the
event.
+ assertTrue(exceptions.isEmpty());
+ assertTrue(data.isEmpty());
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testSharedEventLoopGroups() throws Exception {
+ try (NettyEchoServer server = createEchoServer(createServerOptions()))
{
+ server.start();
+
+ int port = server.getServerPort();
+ URI serverLocation = new URI("tcp://localhost:" + port);
+ final TransportOptions sharedTransportOptions =
createClientOptions();
+ sharedTransportOptions.setUseKQueue(false);
+ sharedTransportOptions.setUseEpoll(false);
+ sharedTransportOptions.setSharedEventLoopThreads(1);
+ Transport nioTransport = createConnectedTransport(serverLocation,
sharedTransportOptions);
+ Transport sharedNioTransport =
createConnectedTransport(serverLocation, sharedTransportOptions);
Review comment:
Very confusingly named: it suggests one is shared and one isn't, when
actually both are, as this and later bits of the test show.
##########
File path:
qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupFactory.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NettyEventLoopGroupFactory {
+
+ private NettyEventLoopGroupFactory() {
+
+ }
+
+ public interface Ref<T> extends AutoCloseable {
+
+ T ref();
+
+ @Override
+ void close();
+ }
+
+ public interface EventLoopGroupRef extends Ref<EventLoopGroup> {
+
+ EventLoopType type();
+ }
+
+ private static final Logger LOG =
LoggerFactory.getLogger(NettyEventLoopGroupFactory.class);
+ private static final AtomicLong SHARED_EVENT_LOOP_GROUP_INSTANCE_SEQUENCE =
new AtomicLong(0);
+ private static final int SHUTDOWN_TIMEOUT = 50;
+
+ public enum EventLoopType {
+ EPOLL, KQUEUE, NIO;
+
+ static EventLoopType valueOf(final TransportOptions transportOptions) {
+ final boolean useKQueue = KQueueSupport.isAvailable(transportOptions);
+ final boolean useEpoll = EpollSupport.isAvailable(transportOptions);
+ if (useKQueue) {
+ return KQUEUE;
+ }
+ if (useEpoll) {
+ return EPOLL;
+ }
+ return NIO;
+ }
+ }
+
+ private static QpidJMSThreadFactory createSharedQpidJMSThreadFactory(final
EventLoopType type, final int threads) {
+ return new QpidJMSThreadFactory("SharedNettyEventLoopGroup " + type +
":( threads = " + threads + " - id = " +
SHARED_EVENT_LOOP_GROUP_INSTANCE_SEQUENCE.incrementAndGet() + ")", true);
+ }
+
+ private static EventLoopGroup createSharedEventLoopGroup(final
TransportOptions transportOptions) {
+ final EventLoopType eventLoopType =
EventLoopType.valueOf(transportOptions);
+ return
createEventLoopGroup(transportOptions.getSharedEventLoopThreads(),
eventLoopType, createSharedQpidJMSThreadFactory(eventLoopType,
transportOptions.getSharedEventLoopThreads()));
+ }
+
+ private static EventLoopGroup createEventLoopGroup(final int threads,
+ final EventLoopType type,
+ final ThreadFactory
ioThreadFactory) {
+ switch (Objects.requireNonNull(type)) {
+
+ case EPOLL:
+ LOG.trace("Netty Transport using Epoll mode");
+ return EpollSupport.createGroup(threads, ioThreadFactory);
+ case KQUEUE:
+ LOG.trace("Netty Transport using KQueue mode");
+ return KQueueSupport.createGroup(threads, ioThreadFactory);
+ case NIO:
+ LOG.trace("Netty Transport using Nio mode");
+ return new NioEventLoopGroup(threads, ioThreadFactory);
+ default:
+ throw new AssertionError("unexpected type: " + type);
+ }
+ }
+
+ private static EventLoopGroupRef unsharedGroupWith(final TransportOptions
transportOptions,
+ final ThreadFactory
threadFactory) {
+ assert transportOptions.getSharedEventLoopThreads() <= 0;
+ final EventLoopType type = EventLoopType.valueOf(transportOptions);
+ final EventLoopGroup ref = createEventLoopGroup(1, type, threadFactory);
+
+ return new EventLoopGroupRef() {
+ @Override
+ public EventLoopType type() {
+ return type;
+ }
+
+ @Override
+ public EventLoopGroup ref() {
+ return ref;
+ }
+
+ @Override
+ public void close() {
+ Future<?> fut = ref.shutdownGracefully(0, SHUTDOWN_TIMEOUT,
TimeUnit.MILLISECONDS);
+ if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+ LOG.trace("Channel group shutdown failed to complete in
allotted time");
+ }
+ }
+ };
+ }
+
+ private static final class AtomicCloseableEventLoopGroupRef<T> implements
EventLoopGroupRef {
+
+ private final EventLoopGroupRef ref;
+ private final AtomicBoolean closed;
+
+ public AtomicCloseableEventLoopGroupRef(final EventLoopGroupRef ref) {
+ this.ref = ref;
+ this.closed = new AtomicBoolean();
+ }
+
+ @Override
+ public EventLoopGroup ref() {
+ return ref.ref();
+ }
+
+ @Override
+ public EventLoopType type() {
+ return ref.type();
+ }
+
+ @Override
+ public void close() {
+ if (closed.compareAndSet(false, true)) {
+ ref.close();
+ }
+ }
+ }
+
+ static Optional<EventLoopGroupRef> sharedExistingGroupWith(final
TransportOptions transportOptions) {
Review comment:
I see no need for the factory to be returning optionals, or this method
to really exist at all. For the test just close the last existing shared group
and create a new one.
##########
File path:
qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
##########
@@ -233,15 +228,13 @@ public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
connected.set(false);
try {
- if (channel != null) {
+ if (channel != null && groupRef != null) {
Review comment:
Why does it matter if groupRef isn't null here? It's not used in this bit.
##########
File path:
qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupFactory.java
##########
@@ -0,0 +1,299 @@
+ static Optional<EventLoopGroupRef> sharedExistingGroupWith(final
TransportOptions transportOptions) {
+ Objects.requireNonNull(transportOptions);
+ if (transportOptions.getSharedEventLoopThreads() <= 0) {
+ return Optional.empty();
+ }
+ return Optional.ofNullable(SharedGroupRef.of(transportOptions, false));
+ }
+
+ public static EventLoopGroupRef groupWith(final TransportOptions
transportOptions,
+ final ThreadFactory
threadfactory) {
+ Objects.requireNonNull(transportOptions);
+ if (transportOptions.getSharedEventLoopThreads() > 0) {
+ return SharedGroupRef.of(transportOptions, true);
+ }
+ return unsharedGroupWith(transportOptions, threadfactory);
+ }
+
+ private static class EventLoopGroupKey {
+
+ private final EventLoopType type;
+ private final int eventLoopThreads;
+
+ private EventLoopGroupKey(final EventLoopType type, final int
eventLoopThreads) {
+ if (eventLoopThreads <= 0) {
+ throw new IllegalArgumentException("eventLoopThreads must be > 0");
+ }
+ this.type = Objects.requireNonNull(type);
+ this.eventLoopThreads = eventLoopThreads;
+ }
+
+ public EventLoopType type() {
+ return type;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ final EventLoopGroupKey that = (EventLoopGroupKey) o;
+
+ if (eventLoopThreads != that.eventLoopThreads)
+ return false;
+ return type == that.type;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = type != null ? type.hashCode() : 0;
+ result = 31 * result + eventLoopThreads;
+ return result;
+ }
+
+ private static EventLoopGroupKey of(final TransportOptions
transportOptions) {
+ return new EventLoopGroupKey(EventLoopType.valueOf(transportOptions),
transportOptions.getSharedEventLoopThreads());
+ }
+ }
+
+ private static final class SharedGroupRef implements EventLoopGroupRef {
+
+ private static final Map<EventLoopGroupKey, SharedGroupRef>
SHARED_EVENT_LOOP_GROUPS = new HashMap<>();
+ private final EventLoopGroupKey key;
+ private final EventLoopGroup group;
+ private final AtomicInteger refCnt;
+
+ private SharedGroupRef(final EventLoopGroup group, final
EventLoopGroupKey key) {
+ this.group = Objects.requireNonNull(group);
+ this.key = Objects.requireNonNull(key);
+ refCnt = new AtomicInteger(1);
+ }
+
+ @Override
+ public EventLoopType type() {
+ return key.type();
+ }
+
+ public boolean retain() {
+ while (true) {
+ final int currValue = refCnt.get();
+ if (currValue == 0) {
+ return false;
+ }
+ if (refCnt.compareAndSet(currValue, currValue + 1)) {
+ return true;
+ }
+ }
+ }
+
+ @Override
+ public EventLoopGroup ref() {
+ if (refCnt.get() == 0) {
+ throw new IllegalStateException("the event loop group cannot be
reused");
+ }
+ return group;
+ }
+
+ @Override
+ public void close() {
+ while (true) {
+ final int currValue = refCnt.get();
+ if (currValue == 0) {
+ return;
+ }
+ if (refCnt.compareAndSet(currValue, currValue - 1)) {
+ if (currValue == 1) {
+ synchronized (SHARED_EVENT_LOOP_GROUPS) {
+ // SharedGroupRef::of can race with this and replace it
+ SHARED_EVENT_LOOP_GROUPS.remove(key, this);
+ }
+ Future<?> fut = group.shutdownGracefully(0,
SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+ if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+ LOG.trace("Channel group shutdown failed to complete in
allotted time");
+ }
+ }
+ return;
+ }
+ }
+ }
+
+ public static EventLoopGroupRef of(final TransportOptions
transportOptions, final boolean canCreate) {
Review comment:
As above, seems like we can get rid of the optional 'doesn't create'
behaviour and its unnecessary complexity.
##########
File path:
qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupFactory.java
##########
@@ -0,0 +1,288 @@
+ private static QpidJMSThreadFactory createSharedQpidJMSThreadFactory(final
EventLoopType type, final int threads) {
+ return new QpidJMSThreadFactory("SharedNettyEventLoopGroup " + type +
":( threads = " + threads + " - id = " +
SHARED_EVENT_LOOP_GROUP_INSTANCE_SEQUENCE.incrementAndGet() + ")", true);
Review comment:
This is _still_ going to give every thread in the group the same name.
Having e.g. 5 threads all called "SharedNettyEventLoopGroupEPOLL:( threads = 5 -
id = 1)" just seems silly.
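A sketch of one way to give each thread in a group its own name, using only the JDK. `NumberedThreadFactorySketch` is a hypothetical class, not the `QpidJMSThreadFactory` API, and the daemon setting is an assumption made for the example.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NumberedThreadFactorySketch implements ThreadFactory {

    private final String groupName;
    // Incremented once per created thread, so names within one group never repeat.
    private final AtomicInteger threadIndex = new AtomicInteger();

    NumberedThreadFactorySketch(String groupName) {
        this.groupName = groupName;
    }

    @Override
    public Thread newThread(Runnable task) {
        Thread thread = new Thread(task, groupName + " thread-" + threadIndex.incrementAndGet());
        thread.setDaemon(true); // assumption: shared I/O threads should not block JVM exit
        return thread;
    }

    public static void main(String[] args) {
        ThreadFactory factory = new NumberedThreadFactorySketch("SharedNettyEventLoopGroup EPOLL (id = 1)");
        // Threads are created but never started; only their names are of interest here.
        System.out.println(factory.newThread(() -> { }).getName());
        System.out.println(factory.newThread(() -> { }).getName());
    }
}
```

The group-level details (type, id) stay in the shared prefix, while the per-thread suffix makes thread dumps and logs distinguishable.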
##########
File path:
qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
##########
@@ -730,14 +821,30 @@ private void assertEpoll(String message, boolean
expected, Transport transport)
}
}
- assertNotNull("Transport implementation unknown", group);
+ assertNotNull("Transport implementation unknown", groupRefField);
- group.setAccessible(true);
- if (expected) {
- assertTrue(message, group.get(transport) instanceof
EpollEventLoopGroup);
- } else {
- assertFalse(message, group.get(transport) instanceof
EpollEventLoopGroup);
+ groupRefField.setAccessible(true);
+ return (EventLoopGroupRef) groupRefField.get(transport);
+ }
+
+ private static void assertEventLoopGroupType(String message, Transport
transport) throws Exception {
+ final EventLoopGroupRef groupRef = getGroupRef(transport);
+ Class<? extends EventLoopGroup> eventLoopGroupClass = null;
+ switch (groupRef.type()) {
+
+ case EPOLL:
+ eventLoopGroupClass = EpollEventLoopGroup.class;
+ break;
+ case KQUEUE:
+ eventLoopGroupClass = KQueueEventLoopGroup.class;
+ break;
+ case NIO:
+ eventLoopGroupClass = NioEventLoopGroup.class;
+ break;
+ default:
+ fail("Unsupported type: " + groupRef.type());
Review comment:
This seemingly entirely breaks the usefulness of the assertion.
Previously it asserted that the group type matched expectation. Now it essentially
asserts that the 'getType' value of the 'Ref' aligns with the type of the
'ref()' group value, but does no verification that either of those is correct
according to expectations, i.e. it would still pass with entirely the wrong
group type being returned, defeating the whole point of the original test.
If you want to combine the old methods into something general, pass in the
*expected* value(s) so it can do what it was doing, as well as also check that the
Ref type and group values align.
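A sketch of an assertion helper that takes the expected type, in plain Java. `EpollGroup`, `KQueueGroup`, and `NioGroup` are hypothetical placeholders for the Netty group classes, and the helper receives the reported type and the group directly rather than a `Transport`, purely to keep the example self-contained.

```java
public class ExpectedTypeAssertionSketch {

    enum EventLoopType { EPOLL, KQUEUE, NIO }

    // Hypothetical placeholders for EpollEventLoopGroup, KQueueEventLoopGroup, NioEventLoopGroup.
    static class EpollGroup { }
    static class KQueueGroup { }
    static class NioGroup { }

    /** Maps a type to the group class a caller expects for it. */
    static Class<?> groupClassFor(EventLoopType type) {
        switch (type) {
            case EPOLL:
                return EpollGroup.class;
            case KQUEUE:
                return KQueueGroup.class;
            case NIO:
                return NioGroup.class;
            default:
                throw new AssertionError("Unsupported type: " + type);
        }
    }

    /**
     * Checks both things against the caller's expectation: the type the ref reports,
     * and the class of the group it actually holds.
     */
    static void assertEventLoopGroupType(String message, EventLoopType expected,
                                         EventLoopType reported, Object group) {
        if (reported != expected) {
            throw new AssertionError(message + ": ref reports " + reported + ", expected " + expected);
        }
        if (!groupClassFor(expected).isInstance(group)) {
            throw new AssertionError(message + ": group is " + group.getClass().getSimpleName()
                + ", expected " + groupClassFor(expected).getSimpleName());
        }
    }

    public static void main(String[] args) {
        assertEventLoopGroupType("NIO transport", EventLoopType.NIO, EventLoopType.NIO, new NioGroup());
        System.out.println("expected type verified");
    }
}
```

With this shape, a ref whose reported type and group agree with each other but not with the expectation no longer passes.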
##########
File path:
qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
##########
@@ -394,6 +403,88 @@ public void testZeroSizedSentNoErrors() throws Exception {
assertTrue(data.isEmpty());
}
+ @Test(timeout = 60 * 1000, expected = IllegalStateException.class)
+ public void testCannotDeferenceSharedClosedEventLoopGroup() throws
Exception {
+ try (NettyEchoServer server = createEchoServer(createServerOptions()))
{
+
+ server.start();
+
+ int port = server.getServerPort();
+ URI serverLocation = new URI("tcp://localhost:" + port);
+ final TransportOptions sharedTransportOptions =
createClientOptions();
+ sharedTransportOptions.setUseKQueue(false);
+ sharedTransportOptions.setUseEpoll(false);
+ sharedTransportOptions.setSharedEventLoopThreads(1);
+ Transport nioTransport = createConnectedTransport(serverLocation,
sharedTransportOptions);
+ EventLoopGroupRef groupRef = getGroupRef(nioTransport);
+ assertNotNull(groupRef.ref());
+
+ nioTransport.close();
+
+ server.stop();
+
+ groupRef.ref();
+ fail();
Review comment:
Also say why it should fail.
> Shared Netty event loop group
> -----------------------------
>
> Key: QPIDJMS-553
> URL: https://issues.apache.org/jira/browse/QPIDJMS-553
> Project: Qpid JMS
> Issue Type: New Feature
> Reporter: Francesco Nigro
> Priority: Major
>
> One of the most interesting features of Netty when using KQueue/NIO/Epoll in
> non-blocking mode is the ability to handle many connections with few threads.
> This will become even more important with the upcoming
> IO_URING support, where the time spent on the Netty event loop handling
> network syscalls will be further reduced by batching syscalls across
> different connections.
> Handling many client connections with few Netty threads is
> already beneficial in constrained environments (containers with few cores),
> as it reduces native and heap memory usage.