gemmellr commented on code in PR #1728:
URL: https://github.com/apache/activemq/pull/1728#discussion_r2890202383
##########
activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java:
##########
@@ -168,6 +168,7 @@ public Destination getDestination() throws JMSException {
*/
@Override
public void close() throws JMSException {
+ ActiveMQSession.checkNotInCompletionListenerCallback("close");
Review Comment:
Not seeing anything to satisfy the requirement of close() waiting for
CompletionListeners to complete (successfully or otherwise) before returning.
Similarly plays into parent Session and Connection closures that need to do
the same (somewhat similar for MessageListener where there [already seems like
there could be an
issue](https://github.com/apache/activemq/pull/1643#issuecomment-3852789499)).
EDIT: ok so the fact later revealed that this 'asynchronous' send is being
done entirely syncronously means this is probably only an issue for the
_current_ send (if there is one).
##########
activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java:
##########
@@ -235,6 +243,20 @@ public static interface DeliveryListener {
private BlobTransferPolicy blobTransferPolicy;
private long lastDeliveredSequenceId = -2;
+ // Single-threaded executor for async send: ensures one CompletionListener
callback at a time
+ // and that callbacks are invoked in the same order as the corresponding
send calls
+ // per Jakarta Messaging 3.1 spec section 7.3.8
+ private final ExecutorService asyncSendExecutor =
Executors.newSingleThreadExecutor(
+ r -> new Thread(r, "ActiveMQ async-send"));
+
+ // Set to true on the executor thread while a CompletionListener callback
is executing.
+ // Used to detect illegal session operations (close/commit/rollback) from
within a callback.
+ static final ThreadLocal<Boolean> IN_COMPLETION_LISTENER_CALLBACK =
ThreadLocal.withInitial(() -> false);
+
+ // Set to true on the dispatch thread while a MessageListener.onMessage()
callback is executing.
+ // Used to detect illegal connection/session operations from within a
MessageListener callback.
+ static final ThreadLocal<Boolean> IN_MESSAGE_LISTENER_CALLBACK =
ThreadLocal.withInitial(() -> false);
+
Review Comment:
This doesnt seem correct. The check is meant to be Session-specific, whilst
this is JVM-wide, so it will give the wrong result if crossing between
Sessions/Connections, which the completion and message listener threads are
still free to do (they primarily have restrictions on use of _their own_
Session/Connection, which is what the checks need to cover). Elsewhere we track
the Thread in the Session using an AtomicReference, then that can later be
compared to currentThread at the appropriate spots.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact