gemmellr commented on a change in pull request #44:
URL: https://github.com/apache/qpid-jms/pull/44#discussion_r736741170
##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
##########
@@ -215,6 +259,17 @@ public void close() throws JMSException {
connectionConsumer.shutdown();
}
+ final ThreadPoolExecutor completionExecutor = this.completionExecutor;
+
Review comment:
No need for a newline here; it's all one unit.
##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
##########
@@ -194,6 +193,42 @@ public void onPendingFailure(ProviderException cause) {
}
}
+ private void processCompletions() {
+ do {
+ if (!completionThread.compareAndSet(null, Thread.currentThread())) {
+ return;
+ }
+ try {
+ Runnable completionTask;
+ while ((completionTask = completionTasks.poll()) != null) {
+ try {
+ completionTask.run();
+ } catch (Throwable t) {
+ LOG.debug("errored on processCompletions duty cycle", t);
+ }
+ }
+ } finally {
+ completionThread.set(null);
+ }
+ } while (!completionTasks.isEmpty());
Review comment:
The queue used here is explicitly single-consumer; its documentation even carries a
"one thread!" warning. This line can potentially execute concurrently (both with
itself and with the poll() inside the loop), because the thread reference is nulled
before the loop condition is checked: a second (or further) thread can enter the
loop as another is completing, and more than one thread can then also reach this
point. Can the queue handle that?
##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
##########
@@ -71,6 +69,7 @@
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import io.netty.util.internal.PlatformDependent;
Review comment:
I don't like it using Netty internal APIs.
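As an alternative to the Netty-internal `PlatformDependent` MPSC queue, a JDK `ConcurrentLinkedQueue` could be swapped in. This is a sketch of the substitution, not the PR's code; `QueueChoice` is a hypothetical wrapper. The trade-off is some raw throughput versus a specialized MPSC queue, in exchange for no internal-API dependency and safety with multiple consumers.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueChoice {
    // Instead of the Netty-internal: PlatformDependent.newMpscQueue()
    // ConcurrentLinkedQueue is lock-free and safe for any number of
    // concurrent producers and consumers.
    private final Queue<Runnable> completionTasks = new ConcurrentLinkedQueue<>();

    boolean enqueue(Runnable task) {
        return completionTasks.offer(task); // never blocks, never rejects
    }

    Runnable next() {
        return completionTasks.poll(); // null when empty
    }
}
```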
##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
##########
@@ -183,6 +189,44 @@ JmsConnection connect() throws JMSException {
return this;
}
+ ExecutorService getCompletionExecutor() {
+ ThreadPoolExecutor exec = completionExecutor;
+ if (exec == null) {
+ synchronized (this) {
+ exec = completionExecutor;
+ if (exec == null) {
+ // it can grow "unbounded" to serve multiple concurrent session completions:
+ // in reality it is bounded by the amount of concurrent completion requests
+ exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new LinkedTransferQueue<>(),
+ new QpidJMSThreadFactory("JmsConnection [" + connectionInfo.getId() + "] completion dispatcher", connectionInfo.isUseDaemonThread()));
Review comment:
These threads will all end up with the same name while operating, even when there
are many at once and as they come and go over time, which could be confusing.
Adding some kind of index might be an idea, though that could also look odd if it
only ever increases as threads come and go (and the thread factory would need to
be changed to accommodate it; it is aimed at single-thread creation).
Previously they were named based on their individual session. It might be
good to at least reinstate that name while operating for a given session?
I think I would be inclined to set the core size to 1. The executor won't be
created until it is needed, during cleanup or if completions are used, in which
case it is needed anyway.
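One way to get distinct names would be an indexed thread factory along these lines. This is a hypothetical sketch (`IndexedThreadFactory` is not the existing `QpidJMSThreadFactory`); as noted above, the counter only ever increases as pool threads come and go, but names stay unique.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical factory: appends a monotonically increasing index so that
// concurrently live pool threads always get distinct names.
public class IndexedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final boolean daemon;
    private final AtomicLong counter = new AtomicLong();

    public IndexedThreadFactory(String prefix, boolean daemon) {
        this.prefix = prefix;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + counter.incrementAndGet());
        t.setDaemon(daemon);
        return t;
    }
}
```

A further refinement, as the review suggests, would be for the task itself to rename its thread to the owning session's name while it runs, restoring the per-session naming the old code had.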
##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
##########
@@ -194,6 +193,42 @@ public void onPendingFailure(ProviderException cause) {
}
}
+ private void processCompletions() {
+ do {
+ if (!completionThread.compareAndSet(null, Thread.currentThread())) {
+ return;
+ }
+ try {
Review comment:
A newline would be nice before this; they are distinct units.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]