Repository: activemq
Updated Branches:
  refs/heads/master bfbdd3c5a -> b07821ab6


[AMQ-6625] remove kahadbioexceptionhandler by pushing allowIOResumption into 
persistence adapter. This allows the lease locker to still be used with kahadb 
for stopStartConnectors support


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b07821ab
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b07821ab
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b07821ab

Branch: refs/heads/master
Commit: b07821ab6494b43e1ae8877eaf740effc7896b84
Parents: bfbdd3c
Author: gtully <gary.tu...@gmail.com>
Authored: Thu Jun 15 17:27:47 2017 +0100
Committer: gtully <gary.tu...@gmail.com>
Committed: Thu Jun 15 17:28:31 2017 +0100

----------------------------------------------------------------------
 .../activemq/store/PersistenceAdapter.java      |  2 +
 .../store/memory/MemoryPersistenceAdapter.java  |  3 ++
 .../util/DefaultIOExceptionHandler.java         |  5 +++
 .../store/jdbc/JDBCPersistenceAdapter.java      |  3 ++
 .../journal/JournalPersistenceAdapter.java      |  5 +++
 .../store/kahadb/KahaDBIOExceptionHandler.java  | 43 --------------------
 .../store/kahadb/KahaDBPersistenceAdapter.java  |  5 +++
 .../kahadb/MultiKahaDBPersistenceAdapter.java   |  7 ++++
 .../activemq/store/kahadb/TempKahaDBStore.java  |  7 ++++
 .../apache/activemq/leveldb/LevelDBStore.scala  |  2 +
 .../leveldb/replicated/ProxyLevelDBStore.scala  |  2 +
 .../RedeliveryRestartWithExceptionTest.java     |  7 +++-
 12 files changed, 47 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
index 01a9634..07063b4 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
@@ -206,4 +206,6 @@ public interface PersistenceAdapter extends Service {
      * @return the last stored sequence id or -1 if no suppression needed
      */
     long getLastProducerSequenceId(ProducerId id) throws IOException;
+
+    void allowIOResumption();
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
index 5c073c3..a1233e0 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
@@ -238,6 +238,9 @@ public class MemoryPersistenceAdapter implements 
PersistenceAdapter, NoLocalSubs
     }
 
     @Override
+    public void allowIOResumption() {}
+
+    @Override
     public JobSchedulerStore createJobSchedulerStore() throws IOException, 
UnsupportedOperationException {
         // We could eventuall implement an in memory scheduler.
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
 
b/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
index 0ee6743..7668364 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
@@ -166,6 +166,11 @@ import org.slf4j.LoggerFactory;
     }
 
     protected void allowIOResumption() {
+        try {
+            broker.getPersistenceAdapter().allowIOResumption();
+        } catch (IOException e) {
+            LOG.warn("Failed to allow IO resumption", e);
+        }
     }
 
     private void stopBroker(Exception exception) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
----------------------------------------------------------------------
diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
index 5da7592..a6ad870 100644
--- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
+++ 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
@@ -292,6 +292,9 @@ public class JDBCPersistenceAdapter extends 
DataSourceServiceSupport implements
     }
 
     @Override
+    public void allowIOResumption() {}
+
+    @Override
     public void init() throws Exception {
         
getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
----------------------------------------------------------------------
diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
index 787b277..10b5c7a 100644
--- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
+++ 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
@@ -800,6 +800,11 @@ public class JournalPersistenceAdapter implements 
PersistenceAdapter, JournalEve
     }
 
     @Override
+    public void allowIOResumption() {
+        longTermPersistence.allowIOResumption();
+    }
+
+    @Override
     public JobSchedulerStore createJobSchedulerStore() throws IOException, 
UnsupportedOperationException {
         return longTermPersistence.createJobSchedulerStore();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBIOExceptionHandler.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBIOExceptionHandler.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBIOExceptionHandler.java
deleted file mode 100644
index 9ddc6d2..0000000
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBIOExceptionHandler.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb;
-
-import org.apache.activemq.util.DefaultIOExceptionHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * @org.apache.xbean.XBean
- */
-public class KahaDBIOExceptionHandler extends DefaultIOExceptionHandler {
-
-    private static final Logger LOG = LoggerFactory
-            .getLogger(KahaDBIOExceptionHandler.class);
-
-    protected void allowIOResumption() {
-        try {
-            if (broker.getPersistenceAdapter() instanceof 
KahaDBPersistenceAdapter) {
-                KahaDBPersistenceAdapter kahaDBPersistenceAdapter = 
(KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
-                kahaDBPersistenceAdapter.getStore().allowIOResumption();
-            }
-        } catch (IOException e) {
-            LOG.warn("Failed to allow IO resumption", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index 297f844..c4f480c 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -163,6 +163,11 @@ public class KahaDBPersistenceAdapter extends 
LockableServiceSupport implements
         return this.letter.getLastProducerSequenceId(id);
     }
 
+    @Override
+    public void allowIOResumption() {
+        this.letter.allowIOResumption();
+    }
+
     /**
      * @param destination
      * @see 
org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)

http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index 4fa6b3d..4a30d0a 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -289,6 +289,13 @@ public class MultiKahaDBPersistenceAdapter extends 
LockableServiceSupport implem
     }
 
     @Override
+    public void allowIOResumption() {
+        for (PersistenceAdapter persistenceAdapter : adapters) {
+            persistenceAdapter.allowIOResumption();
+        }
+    }
+
+    @Override
     public void removeQueueMessageStore(ActiveMQQueue destination) {
         PersistenceAdapter adapter = null;
         try {

http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
index 9686913..7048b09 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
@@ -648,6 +648,13 @@ public class TempKahaDBStore extends TempMessageDatabase 
implements PersistenceA
     }
 
     @Override
+    public void allowIOResumption() {
+        if (pageFile != null) {
+            pageFile.allowIOResumption();
+        }
+    }
+
+    @Override
     public void setBrokerService(BrokerService brokerService) {
         this.brokerService = brokerService;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index cb28173..593ec9e 100644
--- 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -1147,4 +1147,6 @@ class LevelDBStore extends LockableServiceSupport with 
BrokerServiceAware with P
   def rollbackTransaction(context: ConnectionContext): Unit = {}
 
   def createClient = new LevelDBClient(this);
+
+  def allowIOResumption() = {}
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala
----------------------------------------------------------------------
diff --git 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala
 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala
index 7b62a95..9822fe2 100644
--- 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala
+++ 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala
@@ -132,4 +132,6 @@ abstract class ProxyLevelDBStore extends 
LockableServiceSupport with BrokerServi
   def removePList(name: String): Boolean = {
     return proxy_target.removePList(name)
   }
+
+  def allowIOResumption() = {}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
index 2d840ab..b032e25 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
@@ -416,7 +416,12 @@ public class RedeliveryRestartWithExceptionTest extends 
TestSupport {
         public long getLastProducerSequenceId(ProducerId id) throws 
IOException {
             return kahaDB.getLastProducerSequenceId(id);
         }
-        
+
+        @Override
+        public void allowIOResumption() {
+            kahaDB.allowIOResumption();
+        }
+
     }
     
     private class ProxyMessageStoreWithUpdateException extends 
ProxyMessageStore {

Reply via email to