This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 25fd8e9058 ARTEMIS-5377 BridgeImpl deadlock fix on handle vs operation
25fd8e9058 is described below
commit 25fd8e90587d8b931040dfead5dfa5077d118599
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Jun 6 10:42:08 2025 -0400
ARTEMIS-5377 BridgeImpl deadlock fix on handle vs operation
Prevent a deadlock when the handle method holds a lock from the
Queue but needs the local status lock, which is held by scaleDown or
another state-changing operation that in turn needs the Queue lock.
---
.../core/server/cluster/impl/BridgeImpl.java | 244 ++++++++++++---------
1 file changed, 138 insertions(+), 106 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index c5b50ff04a..c52e4eab2b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -27,6 +27,8 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
@@ -150,6 +152,8 @@ public class BridgeImpl implements Bridge,
SessionFailureListener, SendAcknowled
private final OperationContextImpl bridgeContext;
+ private final Lock statusLock = new ReentrantLock();
+
public BridgeImpl(final ServerLocatorInternal serverLocator,
final BridgeConfiguration configuration,
final UUID nodeUUID,
@@ -250,24 +254,29 @@ public class BridgeImpl implements Bridge,
SessionFailureListener, SendAcknowled
}
@Override
- public synchronized void start() throws Exception {
- State localState = this.state;
- if (localState == State.STARTING || localState == State.STARTED ||
localState == State.STOPPING || localState == State.PAUSING) {
- logger.debug("Bridge {} state is {}. Ignoring call to start.",
configuration.getName(), localState);
- if (localState == State.STOPPING || localState == State.PAUSING) {
- throw
ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(configuration.getName(),
"started", localState);
- } else {
- return;
+ public void start() throws Exception {
+ statusLock.lock();
+ try {
+ State localState = this.state;
+ if (localState == State.STARTING || localState == State.STARTED ||
localState == State.STOPPING || localState == State.PAUSING) {
+ logger.debug("Bridge {} state is {}. Ignoring call to start.",
configuration.getName(), localState);
+ if (localState == State.STOPPING || localState == State.PAUSING) {
+ throw
ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(configuration.getName(),
"started", localState);
+ } else {
+ return;
+ }
}
- }
- state = State.STARTING;
+ state = State.STARTING;
- logger.debug("Bridge {} is starting", configuration.getName());
+ logger.debug("Bridge {} is starting", configuration.getName());
- executor.execute(new ConnectRunnable());
+ executor.execute(new ConnectRunnable());
- sendNotification(CoreNotificationType.BRIDGE_STARTED);
+ sendNotification(CoreNotificationType.BRIDGE_STARTED);
+ } finally {
+ statusLock.unlock();
+ }
}
@Override
@@ -364,45 +373,55 @@ public class BridgeImpl implements Bridge,
SessionFailureListener, SendAcknowled
}
@Override
- public synchronized void stop() throws Exception {
- State localState = state;
- if (localState == State.STOPPING || localState == State.STOPPED ||
localState == State.PAUSING) {
- logger.debug("Bridge {} state is {}. Ignoring call to stop.",
configuration.getName(), localState);
- if (localState == State.PAUSING) {
- throw
ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(configuration.getName(),
"stopped", localState);
- } else {
- return;
+ public void stop() throws Exception {
+ statusLock.lock();
+ try {
+ State localState = state;
+ if (localState == State.STOPPING || localState == State.STOPPED ||
localState == State.PAUSING) {
+ logger.debug("Bridge {} state is {}. Ignoring call to stop.",
configuration.getName(), localState);
+ if (localState == State.PAUSING) {
+ throw
ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(configuration.getName(),
"stopped", localState);
+ } else {
+ return;
+ }
}
- }
- state = State.STOPPING;
+ state = State.STOPPING;
- logger.debug("Bridge {} is stopping", configuration.getName());
+ logger.debug("Bridge {} is stopping", configuration.getName());
- if (scheduledReconnection != null) {
- scheduledReconnection.cancel(true);
- }
+ if (scheduledReconnection != null) {
+ scheduledReconnection.cancel(true);
+ }
- executor.execute(new StopRunnable());
+ executor.execute(new StopRunnable());
+ } finally {
+ statusLock.unlock();
+ }
}
@Override
- public synchronized void pause() throws Exception {
- State localState = state;
- if (localState == State.STOPPING || localState == State.STOPPED ||
localState == State.PAUSING || localState == State.PAUSED) {
- logger.debug("Bridge {} state is {}. Ignoring call to pause.",
configuration.getName(), localState);
- if (localState == State.STOPPING || localState == State.STOPPED) {
- throw
ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(configuration.getName(),
"paused", localState);
- } else {
- return;
+ public void pause() throws Exception {
+ statusLock.lock();
+ try {
+ State localState = state;
+ if (localState == State.STOPPING || localState == State.STOPPED ||
localState == State.PAUSING || localState == State.PAUSED) {
+ logger.debug("Bridge {} state is {}. Ignoring call to pause.",
configuration.getName(), localState);
+ if (localState == State.STOPPING || localState == State.STOPPED) {
+ throw
ActiveMQMessageBundle.BUNDLE.bridgeOperationCannotBeExecuted(configuration.getName(),
"paused", localState);
+ } else {
+ return;
+ }
}
- }
- state = State.PAUSING;
+ state = State.PAUSING;
- logger.info("Bridge {} is pausing", configuration.getName());
+ logger.info("Bridge {} is pausing", configuration.getName());
- executor.execute(new PauseRunnable());
+ executor.execute(new PauseRunnable());
+ } finally {
+ statusLock.unlock();
+ }
}
@Override
@@ -575,74 +594,80 @@ public class BridgeImpl implements Bridge,
SessionFailureListener, SendAcknowled
return HandleStatus.NO_MATCH;
}
- synchronized (this) {
- if (state != State.STARTED || !session.isWritable(this)) {
- if (logger.isDebugEnabled()) {
- logger.debug("{}::Ignoring reference on bridge as it is set to
inactive ref {}, active = false", this, ref);
+ if (statusLock.tryLock()) {
+ try {
+ if (state != State.STARTED || !session.isWritable(this)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}::Ignoring reference on bridge as it is set
to inactive ref {}, active = false", this, ref);
+ }
+ return HandleStatus.BUSY;
}
- return HandleStatus.BUSY;
- }
- if (blockedOnFlowControl) {
- logger.debug("Bridge {} is blocked on flow control, cannot receive
{}", configuration.getName(), ref);
- return HandleStatus.BUSY;
- }
+ if (blockedOnFlowControl) {
+ logger.debug("Bridge {} is blocked on flow control, cannot
receive {}", configuration.getName(), ref);
+ return HandleStatus.BUSY;
+ }
- if (deliveringLargeMessage) {
- logger.trace("Bridge {} is busy delivering a large message",
configuration.getName());
- return HandleStatus.BUSY;
- }
+ if (deliveringLargeMessage) {
+ logger.trace("Bridge {} is busy delivering a large message",
configuration.getName());
+ return HandleStatus.BUSY;
+ }
- logger.trace("Bridge {} is handling reference {} ",
configuration.getName(), ref);
+ logger.trace("Bridge {} is handling reference {} ",
configuration.getName(), ref);
- ref.handled();
+ ref.handled();
- synchronized (refs) {
- refs.put(ref.getMessage().getMessageID(), ref);
- }
+ synchronized (refs) {
+ refs.put(ref.getMessage().getMessageID(), ref);
+ }
- final SimpleString dest;
+ final SimpleString dest;
- if (configuration.getForwardingAddress() != null) {
- dest = SimpleString.of(configuration.getForwardingAddress());
- } else {
- // Preserve the original address
- dest = ref.getMessage().getAddressSimpleString();
- }
+ if (configuration.getForwardingAddress() != null) {
+ dest = SimpleString.of(configuration.getForwardingAddress());
+ } else {
+ // Preserve the original address
+ dest = ref.getMessage().getAddressSimpleString();
+ }
- final Message message = beforeForward(ref.getMessage(), dest);
+ final Message message = beforeForward(ref.getMessage(), dest);
- pendingAcks.countUp();
+ pendingAcks.countUp();
- try {
- if (server.hasBrokerBridgePlugins()) {
- server.callBrokerBridgePlugins(plugin ->
plugin.beforeDeliverBridge(this, ref));
- }
+ try {
+ if (server.hasBrokerBridgePlugins()) {
+ server.callBrokerBridgePlugins(plugin ->
plugin.beforeDeliverBridge(this, ref));
+ }
- final HandleStatus status;
- if (message.isLargeMessage()) {
- deliveringLargeMessage = true;
- deliverLargeMessage(dest, ref, (LargeServerMessage) message);
- status = HandleStatus.HANDLED;
- } else {
- status = deliverStandardMessage(dest, ref, message,
ref.getMessage());
- }
+ final HandleStatus status;
+ if (message.isLargeMessage()) {
+ deliveringLargeMessage = true;
+ deliverLargeMessage(dest, ref, (LargeServerMessage) message);
+ status = HandleStatus.HANDLED;
+ } else {
+ status = deliverStandardMessage(dest, ref, message,
ref.getMessage());
+ }
- //Only increment messages pending acknowledgement if handled by
bridge
- if (status == HandleStatus.HANDLED) {
- metrics.incrementMessagesPendingAcknowledgement();
- }
+ //Only increment messages pending acknowledgement if handled by
bridge
+ if (status == HandleStatus.HANDLED) {
+ metrics.incrementMessagesPendingAcknowledgement();
+ }
- if (server.hasBrokerBridgePlugins()) {
- server.callBrokerBridgePlugins(plugin ->
plugin.afterDeliverBridge(this, ref, status));
- }
+ if (server.hasBrokerBridgePlugins()) {
+ server.callBrokerBridgePlugins(plugin ->
plugin.afterDeliverBridge(this, ref, status));
+ }
- return status;
- } catch (Exception e) {
- // If an exception happened, we must count down immediately
- pendingAcks.countDown();
- throw e;
+ return status;
+ } catch (Exception e) {
+ // If an exception happened, we must count down immediately
+ pendingAcks.countDown();
+ throw e;
+ }
+ } finally {
+ statusLock.unlock();
}
+ } else {
+ return HandleStatus.BUSY;
}
}
@@ -701,19 +726,20 @@ public class BridgeImpl implements Bridge,
SessionFailureListener, SendAcknowled
}
protected void scaleDown(String scaleDownTargetNodeID) {
- synchronized (this) {
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("Moving {} messages from {} to {}",
queue.getMessageCount(), queue.getName(), scaleDownTargetNodeID);
- }
- ((QueueImpl)
queue).moveReferencesBetweenSnFQueues(SimpleString.of(scaleDownTargetNodeID));
+ statusLock.lock();
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Moving {} messages from {} to {}",
queue.getMessageCount(), queue.getName(), scaleDownTargetNodeID);
+ }
+ ((QueueImpl)
queue).moveReferencesBetweenSnFQueues(SimpleString.of(scaleDownTargetNodeID));
- // stop the bridge from trying to reconnect and clean up all the
bindings
- fail(true, true);
+ // stop the bridge from trying to reconnect and clean up all the
bindings
+ fail(true, true);
- } catch (Exception e) {
- logger.warn(e.getMessage(), e);
- }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ } finally {
+ statusLock.unlock();
}
}
@@ -995,7 +1021,7 @@ public class BridgeImpl implements Bridge,
SessionFailureListener, SendAcknowled
}
}
- private synchronized void unsetLargeMessageDelivery() {
+ private void unsetLargeMessageDelivery() {
deliveringLargeMessage = false;
}
@@ -1146,8 +1172,11 @@ public class BridgeImpl implements Bridge,
SessionFailureListener, SendAcknowled
ActiveMQServerLogger.LOGGER.timedOutWaitingForSendAcks("Stopping",
configuration.getName(), pendingAcks.getCount());
}
- synchronized (BridgeImpl.this) {
+ statusLock.lock();
+ try {
state = State.STOPPED;
+ } finally {
+ statusLock.unlock();
}
if (session != null) {
@@ -1201,8 +1230,11 @@ public class BridgeImpl implements Bridge,
SessionFailureListener, SendAcknowled
ActiveMQServerLogger.LOGGER.timedOutWaitingForSendAcks("Pausing",
configuration.getName(), pendingAcks.getCount());
}
- synchronized (BridgeImpl.this) {
+ statusLock.lock();
+ try {
state = State.PAUSED;
+ } finally {
+ statusLock.unlock();
}
internalCancelReferences();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact