Repository: activemq
Updated Branches:
  refs/heads/master 9b64e188b -> 0ee4f5b84


[AMQ-6587] additional contention window with new sub while gc is in progress - 
need to verify new sub is actually subscribed to the candidate for deletion. 
Fix and test


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0ee4f5b8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0ee4f5b8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0ee4f5b8

Branch: refs/heads/master
Commit: 0ee4f5b84302daf8901363556d038e274c2defd5
Parents: 9b64e18
Author: gtully <gary.tu...@gmail.com>
Authored: Tue Feb 7 13:53:40 2017 +0000
Committer: gtully <gary.tu...@gmail.com>
Committed: Tue Feb 7 13:53:40 2017 +0000

----------------------------------------------------------------------
 .../activemq/broker/region/AbstractRegion.java  |  8 +-
 .../broker/region/DestinationGCStressTest.java  | 78 ++++++++++++++++++++
 2 files changed, 84 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/0ee4f5b8/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
index 6eb6e71..3f763e4 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
@@ -266,8 +266,12 @@ public abstract class AbstractRegion implements Region {
         if (timeout == 0) {
             for (Iterator<Subscription> iter = 
subscriptions.values().iterator(); iter.hasNext();) {
                 Subscription sub = iter.next();
-                if (sub.matches(destination)) {
-                    throw new JMSException("Destination still has an active 
subscription: " + destination);
+                if (sub.matches(destination) ) {
+                    // may be a new sub created after gc decision, verify if 
really subscribed
+                    Destination toDelete  = destinations.get(destination);
+                    if (toDelete != null && 
toDelete.getDestinationStatistics().getConsumers().getCount() > 0 ) {
+                        throw new JMSException("Destination still has an 
active subscription: " + destination);
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/0ee4f5b8/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCStressTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCStressTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCStressTest.java
index 80cd4be..8f7b123 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCStressTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCStressTest.java
@@ -148,4 +148,82 @@ public class DestinationGCStressTest {
         assertFalse("failed on unexpected log event", failed.get());
 
     }
+
+    @Test(timeout = 60000)
+    public void testAddRemoveWildcardWithGc() throws Exception {
+
+        org.apache.log4j.Logger log4jLogger =
+                org.apache.log4j.Logger.getLogger(RegionBroker.class);
+        final AtomicBoolean failed = new AtomicBoolean(false);
+
+        Appender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                if (event.getLevel().equals(Level.ERROR) && 
event.getMessage().toString().startsWith("Failed to remove inactive")) {
+                    logger.info("received unexpected log message: " + 
event.getMessage());
+                    failed.set(true);
+                }
+            }
+        };
+        log4jLogger.addAppender(appender);
+        try {
+
+            final AtomicInteger max = new AtomicInteger(10000);
+
+            final ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory("vm://localhost?create=false");
+            factory.setWatchTopicAdvisories(false);
+            Connection connection = factory.createConnection();
+            connection.start();
+            final Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            ExecutorService executorService = Executors.newCachedThreadPool();
+            for (int i = 0; i < 1; i++) {
+                executorService.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            Connection c = factory.createConnection();
+                            c.start();
+                            Session s = c.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                            MessageProducer producer = s.createProducer(null);
+                            Message message = s.createTextMessage();
+                            int j;
+                            while ((j = max.decrementAndGet()) > 0) {
+                                producer.send(new ActiveMQTopic("A." + j), 
message);
+                            }
+                        } catch (Exception ignored) {
+                            ignored.printStackTrace();
+                        }
+                    }
+                });
+            }
+
+            executorService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    for (int i = 0; i < 100; i++) {
+                        try {
+                            MessageConsumer messageConsumer = 
session.createConsumer(new ActiveMQTopic(">"));
+                            messageConsumer.close();
+
+                        } catch (Exception ignored) {
+                            ignored.printStackTrace();
+                        }
+                    }
+                }
+            });
+
+            executorService.shutdown();
+            executorService.awaitTermination(60, TimeUnit.SECONDS);
+
+            logger.info("Done");
+
+            connection.close();
+
+        } finally {
+            log4jLogger.removeAppender(appender);
+        }
+        assertFalse("failed on unexpected log event", failed.get());
+
+    }
 }

Reply via email to