gemmellr commented on a change in pull request #45: URL: https://github.com/apache/qpid-jms/pull/45#discussion_r818501543
########## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPoolFactory.java ########## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports.netty; + +import java.util.Objects; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.Future; +import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.util.QpidJMSThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class NettyEventLoopGroupPoolFactory { + + private NettyEventLoopGroupPoolFactory() { + + } + + public interface Ref<T> extends AutoCloseable { + + T ref(); + + @Override + void close(); + } + + public interface EventLoopGroupPool { + + Ref<EventLoopGroup> sharedGroupWith(int threads); + + } + + private static final Logger LOG = LoggerFactory.getLogger(NettyEventLoopGroupPool.class); + private 
static final int SHUTDOWN_TIMEOUT = 50; + + private static EventLoopGroup createEventLoopGroup(final TransportOptions transportOptions, + final ThreadFactory ioThreadFactory) { + final int threads = transportOptions.getSharedEventLoopThreads() > 0 ? transportOptions.getSharedEventLoopThreads() : 1; + final boolean useKQueue = KQueueSupport.isAvailable(transportOptions); + final boolean useEpoll = EpollSupport.isAvailable(transportOptions); + if (useKQueue) { + return createEventLoopGroup(threads, NettyEventLoopGroupPool.KQUEUE, ioThreadFactory); + } + if (useEpoll) { + return createEventLoopGroup(threads, NettyEventLoopGroupPool.EPOLL, ioThreadFactory); + } + return createEventLoopGroup(threads, NettyEventLoopGroupPool.NIO, ioThreadFactory); + } + + private static EventLoopGroup createEventLoopGroup(final int threads, + final NettyEventLoopGroupPool poolType, + final ThreadFactory ioThreadFactory) { + switch (Objects.requireNonNull(poolType)) { + + case EPOLL: + LOG.trace("Netty Transport using Epoll mode"); + return EpollSupport.createGroup(threads, ioThreadFactory); + case KQUEUE: + LOG.trace("Netty Transport using KQueue mode"); + return KQueueSupport.createGroup(threads, ioThreadFactory); + case NIO: + LOG.trace("Netty Transport using Nio mode"); + return new NioEventLoopGroup(threads, ioThreadFactory); + default: + throw new AssertionError("unexpected poolType: " + poolType); + } + } + + public static Ref<EventLoopGroup> unsharedGroupWith(final TransportOptions transportOptions, + final ThreadFactory threadFactory) { + + final EventLoopGroup ref = createEventLoopGroup(transportOptions, threadFactory); + + return new Ref<>() { + @Override + public EventLoopGroup ref() { + return ref; + } + + @Override + public void close() { + Future<?> fut = ref.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { + LOG.trace("Channel group shutdown failed to complete in allotted time"); + } + } + }; + } + + 
private static final class AtomicCloseableRef<T> implements Ref<T> { + + private final Ref<T> ref; + private final AtomicBoolean closed; + + public AtomicCloseableRef(final Ref<T> ref) { + this.ref = ref; + this.closed = new AtomicBoolean(); + } + + @Override + public T ref() { + return ref.ref(); + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + ref.close(); + } + } + } + + public static EventLoopGroupPool sharedGroupPoolWith(final TransportOptions transportOptions) { + if (transportOptions.getSharedEventLoopThreads() <= 0) { + throw new IllegalArgumentException("sharedEventLoopThreads must be > 0"); + } + final boolean useKQueue = KQueueSupport.isAvailable(transportOptions); + final boolean useEpoll = EpollSupport.isAvailable(transportOptions); + if (useKQueue) { + return NettyEventLoopGroupPool.KQUEUE; + } + if (useEpoll) { + return NettyEventLoopGroupPool.EPOLL; + } + return NettyEventLoopGroupPool.NIO; + } + + private enum NettyEventLoopGroupPool implements EventLoopGroupPool { + EPOLL, KQUEUE, NIO; + + private static final AtomicInteger SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE = new AtomicInteger(0); + + private final class SharedGroupRef implements Ref<EventLoopGroup> { + + private final EventLoopGroup group; + private final AtomicInteger refCnt; + private final int threads; + + private SharedGroupRef(final EventLoopGroup group, final int threads) { + this.threads = threads; + this.group = group; + refCnt = new AtomicInteger(1); + } + + public int getThreads() { + if (refCnt.get() == 0) { + throw new IllegalStateException("the event loop group cannot be reused"); + } + return threads; + } + + public boolean retain() { + while (true) { + final int currValue = refCnt.get(); + if (currValue == 0) { + // this has been already disposed! 
+ return false; + } + if (refCnt.compareAndSet(currValue, currValue + 1)) { + return true; + } + } + } + + @Override + public EventLoopGroup ref() { + if (refCnt.get() == 0) { + throw new IllegalStateException("the event loop group cannot be reused"); + } + return group; + } + + @Override + public void close() { + while (true) { + final int currValue = refCnt.get(); + if (currValue == 0) { + return; + } + if (refCnt.compareAndSet(currValue, currValue - 1)) { + if (currValue == 1) { + // if a racing thread has borrowed this lease, it would try already to set it to a new value: + // this one is a best effort cleanup to help GC + sharedEventLoopGroup.compareAndSet(this, null); + Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { + LOG.trace("Channel group shutdown failed to complete in allotted time"); + } + } + return; + } + } + } + } + + private final AtomicReference<SharedGroupRef> sharedEventLoopGroup = new AtomicReference<>(null); + + @Override + public Ref<EventLoopGroup> sharedGroupWith(final int threads) { + if (threads <= 0) { + throw new IllegalArgumentException("threads must be > 0"); + } + while (true) { + SharedGroupRef sharedGroupRef = sharedEventLoopGroup.get(); + if (sharedGroupRef != null && sharedGroupRef.retain()) { + validateSharedRef(sharedGroupRef, threads); + return new AtomicCloseableRef<>(sharedGroupRef); + } + synchronized (this) { + sharedGroupRef = sharedEventLoopGroup.get(); + if (sharedGroupRef != null && sharedGroupRef.retain()) { + validateSharedRef(sharedGroupRef, threads); + return new AtomicCloseableRef<>(sharedGroupRef); + } + sharedGroupRef = new SharedGroupRef(createEventLoopGroup(threads, this, new QpidJMSThreadFactory("SharedNettyEventLoopGroup :(" + SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE.incrementAndGet() + ")", true)), threads); + sharedEventLoopGroup.set(sharedGroupRef); Review comment: Im not sure you fully read my comment. 
What I said was that this would seem to give the threads in the pool all the exact same name, which just seems silly. Giving threads their own name isn't unusual or complicated. Have the factory add a counter in this case or something, the same way it's presumably done in all the existing thread pool implementations that tend to do something like that (e.g. think 'pool-X-thread-Y' style naming). I clearly wouldn't want the factory to report _incorrect_ connection details in the thread name. I merely qualified that with the additional loss of being able to tell which connection the thread was operating for to illustrate the full extent of the change it would mean, going from having a very specific name based on the connection, to one that not only isn't connection-related, but actually has no separable name at all from the other threads in the pool. (Though it certainly would be possible to reinstate the name on a per-operation basis easily enough if desired, at least for the bits occurring outside Netty. Alternatively, a bunch of logging will probably need to change to make it possible to discern the connection, otherwise it will likely be unsupportable.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
