Repository: activemq
Updated Branches:
  refs/heads/master 2b99ffcc2 -> 18571ce09


https://issues.apache.org/jira/browse/AMQ-6430

Modify the patch so that the broker checks whether the noLocal flag has
changed only for stores that persist that flag. This prevents a
subscription from being deleted by mistake.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/18571ce0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/18571ce0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/18571ce0

Branch: refs/heads/master
Commit: 18571ce09b6385d8560200928a353e9da1a1ffe4
Parents: 2b99ffc
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Fri Sep 23 10:58:26 2016 -0400
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Fri Sep 23 10:59:32 2016 -0400

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Topic.java    | 11 ++++++----
 .../activemq/broker/region/TopicRegion.java     | 10 ++++++---
 .../store/NoLocalSubscriptionAware.java         | 22 ++++++++++++++++++++
 .../store/memory/MemoryPersistenceAdapter.java  | 11 +++++++++-
 .../store/kahadb/KahaDBPersistenceAdapter.java  | 13 +++++++++++-
 .../activemq/store/kahadb/KahaDBStore.java      | 12 ++++++++++-
 .../kahadb/MultiKahaDBPersistenceAdapter.java   | 14 ++++++++++++-
 .../DurableSubscriptionOffline3Test.java        |  2 +-
 .../DurableSubscriptionWithNoLocalTest.java     |  5 -----
 9 files changed, 83 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/18571ce0/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index bf803c1..2aa6e18 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -51,6 +51,8 @@ import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.NoLocalSubscriptionAware;
+import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
@@ -211,7 +213,7 @@ public class Topic extends BaseDestination implements Task {
         }
     }
 
-    private boolean hasDurableSubChanged(SubscriptionInfo info1, ConsumerInfo info2) {
+    private boolean hasDurableSubChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException {
         if (hasSelectorChanged(info1, info2)) {
             return true;
         }
@@ -219,9 +221,10 @@ public class Topic extends BaseDestination implements Task {
         return hasNoLocalChanged(info1, info2);
     }
 
-    private boolean hasNoLocalChanged(SubscriptionInfo info1, ConsumerInfo info2) {
-        // Prior to V11 the broker did not store the noLocal value for durable subs.
-        if (brokerService.getStoreOpenWireVersion() >= 11) {
+    private boolean hasNoLocalChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException {
+        //Not all persistence adapters store the noLocal value for a subscription
+        PersistenceAdapter adapter = broker.getBrokerService().getPersistenceAdapter();
+        if (adapter instanceof NoLocalSubscriptionAware) {
             if (info1.isNoLocal() ^ info2.isNoLocal()) {
                 return true;
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/18571ce0/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
index eca3449..1ea3e62 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -40,6 +41,8 @@ import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.SessionId;
 import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.store.NoLocalSubscriptionAware;
+import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.usage.SystemUsage;
@@ -373,15 +376,16 @@ public class TopicRegion extends AbstractRegion {
         }
     }
 
-    private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) {
+    private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) throws IOException {
         if (info1.getSelector() != null ^ info2.getSelector() != null) {
             return true;
         }
         if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
             return true;
         }
-        // Prior to V11 the broker did not store the noLocal value for durable subs.
-        if (broker.getBrokerService().getStoreOpenWireVersion() >= 11) {
+        //Not all persistence adapters store the noLocal value for a subscription
+        PersistenceAdapter adapter = broker.getBrokerService().getPersistenceAdapter();
+        if (adapter instanceof NoLocalSubscriptionAware) {
             if (info1.isNoLocal() ^ info2.isNoLocal()) {
                 return true;
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/18571ce0/activemq-broker/src/main/java/org/apache/activemq/store/NoLocalSubscriptionAware.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/NoLocalSubscriptionAware.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/NoLocalSubscriptionAware.java
new file mode 100644
index 0000000..ae58cba
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/NoLocalSubscriptionAware.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+public interface NoLocalSubscriptionAware {
+
+    public boolean isPersistNoLocal();
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/18571ce0/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
index c16ea14..5c073c3 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
@@ -31,6 +31,7 @@ import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.NoLocalSubscriptionAware;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.ProxyMessageStore;
 import org.apache.activemq.store.TopicMessageStore;
@@ -42,7 +43,7 @@ import org.slf4j.LoggerFactory;
 /**
  * @org.apache.xbean.XBean
  */
-public class MemoryPersistenceAdapter implements PersistenceAdapter {
+public class MemoryPersistenceAdapter implements PersistenceAdapter, 
NoLocalSubscriptionAware {
     private static final Logger LOG = 
LoggerFactory.getLogger(MemoryPersistenceAdapter.class);
 
     MemoryTransactionStore transactionStore;
@@ -241,4 +242,12 @@ public class MemoryPersistenceAdapter implements 
PersistenceAdapter {
         // We could eventuall implement an in memory scheduler.
         throw new UnsupportedOperationException();
     }
+
+    /* (non-Javadoc)
+     * @see 
org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal()
+     */
+    @Override
+    public boolean isPersistNoLocal() {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/18571ce0/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index 2fbdcae..5eef750 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -42,6 +42,7 @@ import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.store.JournaledStore;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.NoLocalSubscriptionAware;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.SharedFileLocker;
 import org.apache.activemq.store.TopicMessageStore;
@@ -61,7 +62,9 @@ import org.apache.activemq.util.ServiceStopper;
  * @org.apache.xbean.XBean element="kahaDB"
  *
  */
-public class KahaDBPersistenceAdapter extends LockableServiceSupport 
implements PersistenceAdapter, JournaledStore, TransactionIdTransformerAware {
+public class KahaDBPersistenceAdapter extends LockableServiceSupport 
implements PersistenceAdapter,
+    JournaledStore, TransactionIdTransformerAware, NoLocalSubscriptionAware {
+
     private final KahaDBStore letter = new KahaDBStore();
 
     /**
@@ -788,4 +791,12 @@ public class KahaDBPersistenceAdapter extends 
LockableServiceSupport implements
     public JobSchedulerStore createJobSchedulerStore() throws IOException, 
UnsupportedOperationException {
         return this.letter.createJobSchedulerStore();
     }
+
+    /* (non-Javadoc)
+     * @see 
org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal()
+     */
+    @Override
+    public boolean isPersistNoLocal() {
+        return this.letter.isPersistNoLocal();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/18571ce0/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 24862e3..66d616b 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -63,6 +63,7 @@ import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.MessageStoreStatistics;
 import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
+import org.apache.activemq.store.NoLocalSubscriptionAware;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionIdTransformer;
@@ -86,7 +87,7 @@ import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KahaDBStore extends MessageDatabase implements PersistenceAdapter 
{
+public class KahaDBStore extends MessageDatabase implements 
PersistenceAdapter, NoLocalSubscriptionAware {
     static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
     private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH;
 
@@ -1567,4 +1568,13 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter {
     public JobSchedulerStore createJobSchedulerStore() throws IOException, 
UnsupportedOperationException {
         return new JobSchedulerStoreImpl();
     }
+
+    /* (non-Javadoc)
+     * @see 
org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal()
+     */
+    @Override
+    public boolean isPersistNoLocal() {
+        // Prior to v11 the broker did not store the noLocal value for durable 
subs.
+        return brokerService.getStoreOpenWireVersion() >= 11;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/18571ce0/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index 31268a3..68f0ed6 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -48,6 +48,7 @@ import org.apache.activemq.filter.AnyDestination;
 import org.apache.activemq.filter.DestinationMap;
 import org.apache.activemq.filter.DestinationMapEntry;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.NoLocalSubscriptionAware;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.SharedFileLocker;
 import org.apache.activemq.store.TopicMessageStore;
@@ -69,7 +70,9 @@ import org.slf4j.LoggerFactory;
  *
  * @org.apache.xbean.XBean element="mKahaDB"
  */
-public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport 
implements PersistenceAdapter, BrokerServiceAware {
+public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport 
implements PersistenceAdapter,
+    BrokerServiceAware, NoLocalSubscriptionAware {
+
     static final Logger LOG = 
LoggerFactory.getLogger(MultiKahaDBPersistenceAdapter.class);
 
     final static ActiveMQDestination matchAll = new AnyDestination(new 
ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
@@ -532,4 +535,13 @@ public class MultiKahaDBPersistenceAdapter extends 
LockableServiceSupport implem
     public JobSchedulerStore createJobSchedulerStore() throws IOException, 
UnsupportedOperationException {
         return new JobSchedulerStoreImpl();
     }
+
+    /* (non-Javadoc)
+     * @see 
org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal()
+     */
+    @Override
+    public boolean isPersistNoLocal() {
+        // Prior to v11 the broker did not store the noLocal value for durable 
subs.
+        return brokerService.getStoreOpenWireVersion() >= 11;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/18571ce0/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java
index c0aee13..899c6a3 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline3Test.java
@@ -388,7 +388,7 @@ public class DurableSubscriptionOffline3Test extends 
DurableSubscriptionOfflineT
         // test offline subs
         con = createConnection("offCli1");
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        consumer = session.createDurableSubscriber(topic, "SubsId", null, 
true);
+        consumer = session.createDurableSubscriber(topic, "SubsId", null, 
false);
         consumer.setMessageListener(listener);
 
         Thread.sleep(3 * 1000);

http://git-wip-us.apache.org/repos/asf/activemq/blob/18571ce0/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java
index ecbfac1..55da21b 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java
@@ -25,8 +25,6 @@ import java.util.Collection;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
@@ -40,13 +38,10 @@ import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.CommandTypes;
-import org.apache.activemq.network.DurableSyncNetworkBridgeTest.FLOW;
 import org.apache.activemq.store.kahadb.KahaDBStore;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;

Reply via email to