This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/master by this push:
new 5c80869 AMQ-8023 - rework fix to deal with addSub interleaved with
removeDestination advisory processing, serialise add/remove dest such that add
is not lost and new sub resubscribes ok, extra verifications in the test
5c80869 is described below
commit 5c8086961fb9e052e30a5846e6b738267de7c4a0
Author: gtully <[email protected]>
AuthorDate: Thu Aug 27 16:31:50 2020 +0100
AMQ-8023 - rework fix to deal with addSub interleaved with
removeDestination advisory processing, serialise add/remove dest such that add
is not lost and new sub resubscribes ok, extra verifications in the test
---
.../network/DemandForwardingBridgeSupport.java | 55 +++++++---------------
.../mqtt/MQTTVirtualTopicSubscriptionsTest.java | 12 +++++
2 files changed, 29 insertions(+), 38 deletions(-)
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 6441c92..602e6eb 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -979,7 +978,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
} else if (data.getClass() == DestinationInfo.class) {
- // It's a destination info - we want to pass up information about temporary destinations
final DestinationInfo destInfo = (DestinationInfo) data;
BrokerId[] path = destInfo.getBrokerPath();
if (path != null && networkTTL > -1 && path.length >= networkTTL) {
@@ -1003,21 +1001,21 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo
});
if (destInfo.isRemoveOperation()) {
- // Serialize with removeSub operations such that all removeSub advisories
- // are generated
- serialExecutor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- localBroker.oneway(destInfo);
- } catch (IOException e) {
- LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e);
- }
- }
- });
- } else {
- localBroker.oneway(destInfo);
+ // not synced with addSubs so we will need to ignore any potential new subs with a timeout!=0
+ destInfo.setTimeout(1);
}
+ // Serialize both add/remove dest with removeSub operations such that all removeSub advisories are generated
+ serialExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ localBroker.oneway(destInfo);
+ } catch (IOException e) {
+ LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e);
+ }
+ }
+ });
+
} else if (data.getClass() == RemoveInfo.class) {
ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
removeDemandSubscription(id);
@@ -1149,28 +1147,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
}
- protected void addSubscription(final DemandSubscription sub) throws IOException {
+ protected void addSubscription(DemandSubscription sub) throws IOException {
if (sub != null) {
- // Serialize with remove operations such that new sub does not cause remove/purge to fail
- // remain synchronous b/c duplicate suppression depends on add completion
- FutureTask syncTask = new FutureTask(new Runnable() {
- @Override
- public void run() {
- try {
- localBroker.oneway(sub.getLocalInfo());
- } catch (IOException e) {
- LOG.warn("failed to deliver add sub command: {}, cause: {}", sub.getLocalInfo(), e);
- LOG.debug("detail", e);
- }
- }
- }, null);
- try {
- serialExecutor.execute(syncTask);
- syncTask.get();
- } catch (Exception e) {
- LOG.warn("failed to execute add sub command: {}, cause: {}", sub.getLocalInfo(), e);
- LOG.debug("detail", e);
- }
+ localBroker.oneway(sub.getLocalInfo());
}
}
@@ -1182,7 +1161,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
- // continue removal in separate thread to free up tshis thread for outstanding responses
+ // continue removal in separate thread to free up this thread for outstanding responses
// Serialize with removeDestination operations so that removeSubs are serialized with
// removeDestinations such that all removeSub advisories are generated
serialExecutor.execute(new Runnable() {
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
index 4eedc1c..c601a8e 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
@@ -27,6 +27,7 @@ import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.network.NetworkBridge;
@@ -330,6 +331,17 @@ public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest {
// release bridge remove ops *after* new/re subscription
removeOp.countDown();
+ assertTrue("All destinations and subs recreated and consumers connected on brokerTwo via network", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ BrokerView brokerView = brokerTwo.getAdminView();
+ int numQueues = brokerView.getQueues().length;
+ int numSubscriptions = brokerView.getQueueSubscribers().length;
+
+ LOG.info("#Queues: " + numQueues + ", #Subs: " + numSubscriptions);
+ return numQueues == numDests && numSubscriptions == numDests;
+ }
+ }));
Message msg = notClean.receive(500, TimeUnit.MILLISECONDS);
assertNull(msg);
notClean.disconnect();