This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 25fd8e9058 ARTEMIS-5377 BridgeImpl deadlock fix on handle vs operation
25fd8e9058 is described below

commit 25fd8e90587d8b931040dfead5dfa5077d118599
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Jun 6 10:42:08 2025 -0400

    ARTEMIS-5377 BridgeImpl deadlock fix on handle vs operation
    
    Prevent a deadlock when the handler method holds a lock from the
    Queue but needs the local status lock, which is held by scale-down or
    another state-changing operation that needs the Queue lock.
---
 .../core/server/cluster/impl/BridgeImpl.java       | 244 ++++++++++++---------
 1 file changed, 138 insertions(+), 106 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index c5b50ff04a..c52e4eab2b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -27,6 +27,8 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
 import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
@@ -150,6 +152,8 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
 
    private final OperationContextImpl bridgeContext;
 
+   private final Lock statusLock = new ReentrantLock();
+
    public BridgeImpl(final ServerLocatorInternal serverLocator,
                      final BridgeConfiguration configuration,
                      final UUID nodeUUID,
@@ -250,24 +254,29 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
    }
 
    @Override
-   public synchronized void start() throws Exception {
-      State localState = this.state;
-      if (localState == State.STARTING || localState == State.STARTED || 
localState == State.STOPPING || localState == State.PAUSING) {
-         logger.debug("Bridge {} state is {}. Ignoring call to start.", 
configuration.getName(), localState);
-         if (localState == State.STOPPING || localState == State.PAUSING) {
-            throw 
ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(configuration.getName(),
 "started", localState);
-         } else {
-            return;
+   public void start() throws Exception {
+      statusLock.lock();
+      try {
+         State localState = this.state;
+         if (localState == State.STARTING || localState == State.STARTED || 
localState == State.STOPPING || localState == State.PAUSING) {
+            logger.debug("Bridge {} state is {}. Ignoring call to start.", 
configuration.getName(), localState);
+            if (localState == State.STOPPING || localState == State.PAUSING) {
+               throw 
ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(configuration.getName(),
 "started", localState);
+            } else {
+               return;
+            }
          }
-      }
 
-      state = State.STARTING;
+         state = State.STARTING;
 
-      logger.debug("Bridge {} is starting", configuration.getName());
+         logger.debug("Bridge {} is starting", configuration.getName());
 
-      executor.execute(new ConnectRunnable());
+         executor.execute(new ConnectRunnable());
 
-      sendNotification(CoreNotificationType.BRIDGE_STARTED);
+         sendNotification(CoreNotificationType.BRIDGE_STARTED);
+      } finally {
+         statusLock.unlock();
+      }
    }
 
    @Override
@@ -364,45 +373,55 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
    }
 
    @Override
-   public synchronized void stop() throws Exception {
-      State localState = state;
-      if (localState == State.STOPPING || localState == State.STOPPED || 
localState == State.PAUSING) {
-         logger.debug("Bridge {} state is {}. Ignoring call to stop.", 
configuration.getName(), localState);
-         if (localState == State.PAUSING) {
-            throw 
ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(configuration.getName(),
 "stopped", localState);
-         } else {
-            return;
+   public void stop() throws Exception {
+      statusLock.lock();
+      try {
+         State localState = state;
+         if (localState == State.STOPPING || localState == State.STOPPED || 
localState == State.PAUSING) {
+            logger.debug("Bridge {} state is {}. Ignoring call to stop.", 
configuration.getName(), localState);
+            if (localState == State.PAUSING) {
+               throw 
ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(configuration.getName(),
 "stopped", localState);
+            } else {
+               return;
+            }
          }
-      }
 
-      state = State.STOPPING;
+         state = State.STOPPING;
 
-      logger.debug("Bridge {} is stopping", configuration.getName());
+         logger.debug("Bridge {} is stopping", configuration.getName());
 
-      if (scheduledReconnection != null) {
-         scheduledReconnection.cancel(true);
-      }
+         if (scheduledReconnection != null) {
+            scheduledReconnection.cancel(true);
+         }
 
-      executor.execute(new StopRunnable());
+         executor.execute(new StopRunnable());
+      } finally {
+         statusLock.unlock();
+      }
    }
 
    @Override
-   public synchronized void pause() throws Exception {
-      State localState = state;
-      if (localState == State.STOPPING || localState == State.STOPPED || 
localState == State.PAUSING || localState == State.PAUSED) {
-         logger.debug("Bridge {} state is {}. Ignoring call to pause.", 
configuration.getName(), localState);
-         if (localState == State.STOPPING || localState == State.STOPPED) {
-            throw 
ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(configuration.getName(),
 "paused", localState);
-         } else {
-            return;
+   public void pause() throws Exception {
+      statusLock.lock();
+      try {
+         State localState = state;
+         if (localState == State.STOPPING || localState == State.STOPPED || 
localState == State.PAUSING || localState == State.PAUSED) {
+            logger.debug("Bridge {} state is {}. Ignoring call to pause.", 
configuration.getName(), localState);
+            if (localState == State.STOPPING || localState == State.STOPPED) {
+               throw 
ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(configuration.getName(),
 "paused", localState);
+            } else {
+               return;
+            }
          }
-      }
 
-      state = State.PAUSING;
+         state = State.PAUSING;
 
-      logger.info("Bridge {} is pausing", configuration.getName());
+         logger.info("Bridge {} is pausing", configuration.getName());
 
-      executor.execute(new PauseRunnable());
+         executor.execute(new PauseRunnable());
+      } finally {
+         statusLock.unlock();
+      }
    }
 
    @Override
@@ -575,74 +594,80 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
          return HandleStatus.NO_MATCH;
       }
 
-      synchronized (this) {
-         if (state != State.STARTED || !session.isWritable(this)) {
-            if (logger.isDebugEnabled()) {
-               logger.debug("{}::Ignoring reference on bridge as it is set to 
inactive ref {}, active = false", this, ref);
+      if (statusLock.tryLock()) {
+         try {
+            if (state != State.STARTED || !session.isWritable(this)) {
+               if (logger.isDebugEnabled()) {
+                  logger.debug("{}::Ignoring reference on bridge as it is set 
to inactive ref {}, active = false", this, ref);
+               }
+               return HandleStatus.BUSY;
             }
-            return HandleStatus.BUSY;
-         }
 
-         if (blockedOnFlowControl) {
-            logger.debug("Bridge {} is blocked on flow control, cannot receive 
{}", configuration.getName(), ref);
-            return HandleStatus.BUSY;
-         }
+            if (blockedOnFlowControl) {
+               logger.debug("Bridge {} is blocked on flow control, cannot 
receive {}", configuration.getName(), ref);
+               return HandleStatus.BUSY;
+            }
 
-         if (deliveringLargeMessage) {
-            logger.trace("Bridge {} is busy delivering a large message", 
configuration.getName());
-            return HandleStatus.BUSY;
-         }
+            if (deliveringLargeMessage) {
+               logger.trace("Bridge {} is busy delivering a large message", 
configuration.getName());
+               return HandleStatus.BUSY;
+            }
 
-         logger.trace("Bridge {} is handling reference {} ", 
configuration.getName(), ref);
+            logger.trace("Bridge {} is handling reference {} ", 
configuration.getName(), ref);
 
-         ref.handled();
+            ref.handled();
 
-         synchronized (refs) {
-            refs.put(ref.getMessage().getMessageID(), ref);
-         }
+            synchronized (refs) {
+               refs.put(ref.getMessage().getMessageID(), ref);
+            }
 
-         final SimpleString dest;
+            final SimpleString dest;
 
-         if (configuration.getForwardingAddress() != null) {
-            dest = SimpleString.of(configuration.getForwardingAddress());
-         } else {
-            // Preserve the original address
-            dest = ref.getMessage().getAddressSimpleString();
-         }
+            if (configuration.getForwardingAddress() != null) {
+               dest = SimpleString.of(configuration.getForwardingAddress());
+            } else {
+               // Preserve the original address
+               dest = ref.getMessage().getAddressSimpleString();
+            }
 
-         final Message message = beforeForward(ref.getMessage(), dest);
+            final Message message = beforeForward(ref.getMessage(), dest);
 
-         pendingAcks.countUp();
+            pendingAcks.countUp();
 
-         try {
-            if (server.hasBrokerBridgePlugins()) {
-               server.callBrokerBridgePlugins(plugin -> 
plugin.beforeDeliverBridge(this, ref));
-            }
+            try {
+               if (server.hasBrokerBridgePlugins()) {
+                  server.callBrokerBridgePlugins(plugin -> 
plugin.beforeDeliverBridge(this, ref));
+               }
 
-            final HandleStatus status;
-            if (message.isLargeMessage()) {
-               deliveringLargeMessage = true;
-               deliverLargeMessage(dest, ref, (LargeServerMessage) message);
-               status = HandleStatus.HANDLED;
-            } else {
-               status = deliverStandardMessage(dest, ref, message, 
ref.getMessage());
-            }
+               final HandleStatus status;
+               if (message.isLargeMessage()) {
+                  deliveringLargeMessage = true;
+                  deliverLargeMessage(dest, ref, (LargeServerMessage) message);
+                  status = HandleStatus.HANDLED;
+               } else {
+                  status = deliverStandardMessage(dest, ref, message, 
ref.getMessage());
+               }
 
-            //Only increment messages pending acknowledgement if handled by 
bridge
-            if (status == HandleStatus.HANDLED) {
-               metrics.incrementMessagesPendingAcknowledgement();
-            }
+               //Only increment messages pending acknowledgement if handled by 
bridge
+               if (status == HandleStatus.HANDLED) {
+                  metrics.incrementMessagesPendingAcknowledgement();
+               }
 
-            if (server.hasBrokerBridgePlugins()) {
-               server.callBrokerBridgePlugins(plugin -> 
plugin.afterDeliverBridge(this, ref, status));
-            }
+               if (server.hasBrokerBridgePlugins()) {
+                  server.callBrokerBridgePlugins(plugin -> 
plugin.afterDeliverBridge(this, ref, status));
+               }
 
-            return status;
-         } catch (Exception e) {
-            // If an exception happened, we must count down immediately
-            pendingAcks.countDown();
-            throw e;
+               return status;
+            } catch (Exception e) {
+               // If an exception happened, we must count down immediately
+               pendingAcks.countDown();
+               throw e;
+            }
+         } finally {
+            statusLock.unlock();
          }
+      } else {
+         return HandleStatus.BUSY;
       }
    }
 
@@ -701,19 +726,20 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
    }
 
    protected void scaleDown(String scaleDownTargetNodeID) {
-      synchronized (this) {
-         try {
-            if (logger.isDebugEnabled()) {
-               logger.debug("Moving {} messages from {} to {}", 
queue.getMessageCount(), queue.getName(), scaleDownTargetNodeID);
-            }
-            ((QueueImpl) 
queue).moveReferencesBetweenSnFQueues(SimpleString.of(scaleDownTargetNodeID));
+      statusLock.lock();
+      try {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Moving {} messages from {} to {}", 
queue.getMessageCount(), queue.getName(), scaleDownTargetNodeID);
+         }
+         ((QueueImpl) 
queue).moveReferencesBetweenSnFQueues(SimpleString.of(scaleDownTargetNodeID));
 
-            // stop the bridge from trying to reconnect and clean up all the 
bindings
-            fail(true, true);
+         // stop the bridge from trying to reconnect and clean up all the 
bindings
+         fail(true, true);
 
-         } catch (Exception e) {
-            logger.warn(e.getMessage(), e);
-         }
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      } finally {
+         statusLock.unlock();
       }
    }
 
@@ -995,7 +1021,7 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
       }
    }
 
-   private synchronized void unsetLargeMessageDelivery() {
+   private void unsetLargeMessageDelivery() {
       deliveringLargeMessage = false;
    }
 
@@ -1146,8 +1172,11 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
                
ActiveMQServerLogger.LOGGER.timedOutWaitingForSendAcks("Stopping", 
configuration.getName(), pendingAcks.getCount());
             }
 
-            synchronized (BridgeImpl.this) {
+            statusLock.lock();
+            try {
                state = State.STOPPED;
+            } finally {
+               statusLock.unlock();
             }
 
             if (session != null) {
@@ -1201,8 +1230,11 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
                
ActiveMQServerLogger.LOGGER.timedOutWaitingForSendAcks("Pausing", 
configuration.getName(), pendingAcks.getCount());
             }
 
-            synchronized (BridgeImpl.this) {
+            statusLock.lock();
+            try {
                state = State.PAUSED;
+            } finally {
+               statusLock.unlock();
             }
 
             internalCancelReferences();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to