franz1981 commented on a change in pull request #44:
URL: https://github.com/apache/qpid-jms/pull/44#discussion_r743939643
##########
File path:
qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
##########
@@ -591,6 +613,14 @@ public long getConnectTimeout() {
return this.connectTimeout;
}
+ public void setCompletionThreads(final int completionThreads) {
Review comment:
> EDIT: actually, is it? I'm not seeing where it would actually share?
The completion factory creates a single `sharedRefCnt` instance wrapping a
`ForkJoinPool`, which lets connections share the same FJ pool until every
connection that references it has been closed. When that happens, the last one
disposes of it, leaving any later connections able to create a new one,
similarly to the shared event loop group of #45.
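To make the sharing easier to see, here is a minimal sketch of the
reference-counting idea. It is not the PR's actual `sharedRefCnt`/`Holder`
code: the class `SharedRefCountSketch`, its `Handle` interface and the
`acquire`/`release` method names are hypothetical and only assume the behaviour
described above (create on first use, dispose when the last user releases,
recreate on the next use).

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for the PR's sharedRefCnt/Holder pair (names are assumptions).
final class SharedRefCountSketch<T> {

    // Handle given to each user (e.g. a connection); release() is idempotent.
    interface Handle<T> {
        T get();
        void release();
    }

    private final Supplier<T> factory;
    private final Consumer<T> disposer;
    private T instance;   // guarded by "this"
    private int refCount; // guarded by "this"

    SharedRefCountSketch(Supplier<T> factory, Consumer<T> disposer) {
        this.factory = factory;
        this.disposer = disposer;
    }

    // The first caller creates the resource; later callers share the same instance.
    synchronized Handle<T> acquire() {
        if (refCount == 0) {
            instance = factory.get();
        }
        refCount++;
        final T held = instance;
        final AtomicBoolean released = new AtomicBoolean();
        return new Handle<T>() {
            @Override
            public T get() {
                return held;
            }

            @Override
            public void release() {
                // Only the first release() of a given handle decrements the count.
                if (released.compareAndSet(false, true)) {
                    releaseOne();
                }
            }
        };
    }

    // The last user to release disposes the resource; a later acquire() creates a new one.
    private synchronized void releaseOne() {
        if (--refCount == 0) {
            final T toDispose = instance;
            instance = null;
            disposer.accept(toDispose);
        }
    }
}
```

With something like `new SharedRefCountSketch<>(() -> new ForkJoinPool(2),
ForkJoinPool::shutdown)`, each connection would call `acquire()` when it is
created and `release()` when it is closed, so no pool exists while the factory
has no open connections.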
##########
File path:
qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
##########
@@ -368,6 +377,19 @@ protected static URI createURI(String name) {
return null;
}
+ protected Supplier<Holder<ExecutorService>> getCompletionExecutorServiceFactory() {
+ if (this.completionThreads == 0) {
+ return null;
+ }
+ synchronized (this) {
+ if (completionExecutorServiceFactory == null) {
+ QpidJMSForkJoinWorkerThreadFactory fjThreadFactory = new QpidJMSForkJoinWorkerThreadFactory("completion thread pool", true);
+ completionExecutorServiceFactory = sharedRefCnt(() -> new ForkJoinPool(completionThreads, fjThreadFactory, null, false), ThreadPoolUtils::shutdown);
Review comment:
> I assume thats to try and avoid creating it?
Given that we cannot rely on finalization of the connection factory, I cannot
pre-allocate it if there are no actual "users", i.e. connections. I would also
like it to be correctly disposed of and shut down once every connection
belonging to the connection factory has been closed.
##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
##########
@@ -194,6 +198,48 @@ public void onPendingFailure(ProviderException cause) {
}
}
+ private void processCompletions() {
+ assert processCompletion.get();
+ completionThread = Thread.currentThread();
+ try {
+ final Runnable completionTask = completionTasks.poll();
+ if (completionTask != null) {
+ try {
+ completionTask.run();
+ } catch (Throwable t) {
+ LOG.debug("errored on processCompletions duty cycle", t);
+ }
+ }
+ } finally {
+ completionThread = null;
+ processCompletion.set(false);
+ }
+ if (completionTasks.isEmpty()) {
+ return;
+ }
+ // a racing asyncProcessCompletion has won: no need to fire a continuation
+ if (!processCompletion.compareAndSet(false, true)) {
+ return;
+ }
+ getCompletionExecutor().execute(this::processCompletions);
+ }
+
+ private void asyncProcessCompletion(final Runnable completionTask) {
+ asyncProcessCompletion(completionTask, false);
+ }
+
+ private void asyncProcessCompletion(final Runnable completionTask, final boolean ignoreSessionClosed) {
+ if (!ignoreSessionClosed) {
Review comment:
I can let them pass, but this check was meant to mimic the original reject
handler installed for the single-threaded completion executor. I admit I didn't
put much thought into validating whether it can be avoided.
##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
##########
@@ -194,6 +198,48 @@ public void onPendingFailure(ProviderException cause) {
}
}
+ private void processCompletions() {
+ assert processCompletion.get();
+ completionThread = Thread.currentThread();
+ try {
+ final Runnable completionTask = completionTasks.poll();
+ if (completionTask != null) {
+ try {
+ completionTask.run();
+ } catch (Throwable t) {
+ LOG.debug("errored on processCompletions duty cycle", t);
+ }
+ }
+ } finally {
+ completionThread = null;
+ processCompletion.set(false);
+ }
+ if (completionTasks.isEmpty()) {
+ return;
+ }
+ // a racing asyncProcessCompletion has won: no need to fire a continuation
+ if (!processCompletion.compareAndSet(false, true)) {
+ return;
+ }
+ getCompletionExecutor().execute(this::processCompletions);
+ }
+
+ private void asyncProcessCompletion(final Runnable completionTask) {
+ asyncProcessCompletion(completionTask, false);
+ }
+
+ private void asyncProcessCompletion(final Runnable completionTask, final boolean ignoreSessionClosed) {
+ if (!ignoreSessionClosed) {
Review comment:
> I expect this change would mean they get could be marked as failed when they shouldnt
I implemented it intending to mimic (by inlining) the reject policy of the
single-threaded executor. I admit I didn't put much thought into this; it can
probably be treated differently, and it may introduce slightly different
semantics, but I still don't see any harm. My expectation is that when
session::shutdown is called, any already submitted completion should be handled
(that's visible in the new shutdown logic using CountDownLatch), while new
submissions sent *after* shutdown is initiated would be processed only if they
are part of the `shutdown` logic itself (i.e. `ignoreSessionClosed == true`)
and ignored otherwise.
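As a rough illustration of the semantics described above, here is a simplified
sketch, not the code in the PR. The class `CompletionQueueSketch`, the `closed`
flag, `markClosed()` and the boolean return value of `submit` are assumptions
made for the example; only the queue, the `compareAndSet` guard and the
`ignoreSessionClosed` parameter mirror the diff.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of serialized completion processing with a shutdown gate.
final class CompletionQueueSketch {
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean processing = new AtomicBoolean();
    private final AtomicBoolean closed = new AtomicBoolean(); // assumed "session closed" flag
    private final Executor executor;

    CompletionQueueSketch(Executor executor) {
        this.executor = executor;
    }

    // Assumed hook: called when shutdown is initiated.
    void markClosed() {
        closed.set(true);
    }

    // Returns false when the task is ignored because shutdown has already started.
    boolean submit(Runnable task, boolean ignoreSessionClosed) {
        if (!ignoreSessionClosed && closed.get()) {
            return false; // inlined "reject policy": late submissions are dropped
        }
        tasks.add(task);
        // Only one drain may be scheduled or running at a time.
        if (processing.compareAndSet(false, true)) {
            executor.execute(this::drainOne);
        }
        return true;
    }

    // Runs at most one task per duty cycle, then reschedules itself if more work remains.
    private void drainOne() {
        try {
            final Runnable task = tasks.poll();
            if (task != null) {
                try {
                    task.run();
                } catch (Throwable t) {
                    // the sketch swallows the failure; the diff logs it at debug level
                }
            }
        } finally {
            processing.set(false);
        }
        // If a racing submit() already won the flag, it has scheduled the next cycle.
        if (!tasks.isEmpty() && processing.compareAndSet(false, true)) {
            executor.execute(this::drainOne);
        }
    }
}
```

Tasks accepted before `markClosed()` still run, a plain submission afterwards is
dropped, and a submission with `ignoreSessionClosed == true` (the shutdown path)
is still processed.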