This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c80869  AMQ-8023 - rework fix to deal with addSub interleaved with removeDestination advisory processing, serialise add/remove dest such that add is not lost and new sub resubscribes ok, extra verifications in the test
5c80869 is described below

commit 5c8086961fb9e052e30a5846e6b738267de7c4a0
Author: gtully <[email protected]>
AuthorDate: Thu Aug 27 16:31:50 2020 +0100

    AMQ-8023 - rework fix to deal with addSub interleaved with removeDestination advisory processing, serialise add/remove dest such that add is not lost and new sub resubscribes ok, extra verifications in the test
---
 .../network/DemandForwardingBridgeSupport.java     | 55 +++++++---------------
 .../mqtt/MQTTVirtualTopicSubscriptionsTest.java    | 12 +++++
 2 files changed, 29 insertions(+), 38 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 6441c92..602e6eb 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -979,7 +978,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                 }
             }
         } else if (data.getClass() == DestinationInfo.class) {
-            // It's a destination info - we want to pass up information about temporary destinations
             final DestinationInfo destInfo = (DestinationInfo) data;
             BrokerId[] path = destInfo.getBrokerPath();
             if (path != null && networkTTL > -1 && path.length >= networkTTL) {
@@ -1003,21 +1001,21 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                     configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo
             });
             if (destInfo.isRemoveOperation()) {
-                // Serialize with removeSub operations such that all removeSub advisories
-                // are generated
-                serialExecutor.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            localBroker.oneway(destInfo);
-                        } catch (IOException e) {
-                            LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e);
-                        }
-                    }
-                });
-            } else {
-                localBroker.oneway(destInfo);
+                // not synced with addSubs so we will need to ignore any potential new subs with a timeout!=0
+                destInfo.setTimeout(1);
             }
+            // Serialize both add/remove dest with removeSub operations such that all removeSub advisories are generated
+            serialExecutor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        localBroker.oneway(destInfo);
+                    } catch (IOException e) {
+                        LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e);
+                    }
+                }
+            });
+
         } else if (data.getClass() == RemoveInfo.class) {
             ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
             removeDemandSubscription(id);
@@ -1149,28 +1147,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
     }
 
-    protected void addSubscription(final DemandSubscription sub) throws IOException {
+    protected void addSubscription(DemandSubscription sub) throws IOException {
         if (sub != null) {
-            // Serialize with remove operations such that new sub does not cause remove/purge to fail
-            // remain synchronous b/c duplicate suppression depends on add completion
-            FutureTask syncTask = new FutureTask(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        localBroker.oneway(sub.getLocalInfo());
-                    } catch (IOException e) {
-                        LOG.warn("failed to deliver add sub command: {}, cause: {}", sub.getLocalInfo(), e);
-                        LOG.debug("detail", e);
-                    }
-                }
-            }, null);
-            try {
-                serialExecutor.execute(syncTask);
-                syncTask.get();
-            } catch (Exception e) {
-                LOG.warn("failed to execute add sub command: {}, cause: {}", sub.getLocalInfo(), e);
-                LOG.debug("detail", e);
-            }
+            localBroker.oneway(sub.getLocalInfo());
         }
     }
 
@@ -1182,7 +1161,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
             subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
             subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
 
-            // continue removal in separate thread to free up tshis thread for outstanding responses
+            // continue removal in separate thread to free up this thread for outstanding responses
             // Serialize with removeDestination operations so that removeSubs are serialized with
             // removeDestinations such that all removeSub advisories are generated
             serialExecutor.execute(new Runnable() {
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
index 4eedc1c..c601a8e 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
@@ -27,6 +27,7 @@ import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerPluginSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.jmx.BrokerView;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.network.NetworkBridge;
@@ -330,6 +331,17 @@ public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest {
         // release bridge remove ops *after* new/re subscription
         removeOp.countDown();
 
+        assertTrue("All destinations and subs recreated and consumers connected on brokerTwo via network", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                BrokerView brokerView = brokerTwo.getAdminView();
+                int numQueues = brokerView.getQueues().length;
+                int numSubscriptions = brokerView.getQueueSubscribers().length;
+
+                LOG.info("#Queues: " + numQueues + ", #Subs: " + numSubscriptions);
+                return numQueues == numDests && numSubscriptions == numDests;
+            }
+        }));
         Message msg = notClean.receive(500, TimeUnit.MILLISECONDS);
         assertNull(msg);
         notClean.disconnect();

Reply via email to