Author: gtully
Date: Fri Jan 27 12:47:39 2012
New Revision: 1236661
URL: http://svn.apache.org/viewvc?rev=1236661&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3634 - ensure full recovery of the
index, irrespective of the load failure reason, with additional tests. Shutdown
the schedualler early to ensure no ugly errors from timer tasks during shutdown
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1236661&r1=1236660&r2=1236661&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Fri Jan 27 12:47:39 2012
@@ -588,6 +588,10 @@ public class BrokerService implements Se
LOG.info("ActiveMQ Message Broker (" + getBrokerName() + ", " +
brokerId + ") is shutting down");
removeShutdownHook();
+ if (this.scheduler != null) {
+ this.scheduler.stop();
+ this.scheduler = null;
+ }
ServiceStopper stopper = new ServiceStopper();
if (services != null) {
for (Service service : services) {
@@ -645,10 +649,6 @@ public class BrokerService implements Se
this.taskRunnerFactory.shutdown();
this.taskRunnerFactory = null;
}
- if (this.scheduler != null) {
- this.scheduler.stop();
- this.scheduler = null;
- }
if (this.executor != null) {
this.executor.shutdownNow();
this.executor = null;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1236661&r1=1236660&r2=1236661&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Fri Jan 27 12:47:39 2012
@@ -300,8 +300,11 @@ public abstract class MessageDatabase ex
getJournal().start();
try {
loadPageFile();
- } catch (IOException ioe) {
- LOG.warn("Index corrupted, trying to recover ...", ioe);
+ } catch (Throwable t) {
+ LOG.warn("Index corrupted. Recovering the index through
journal replay. Cause:" + t);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Index load failure", t);
+ }
// try to recover index
try {
pageFile.unload();
@@ -311,6 +314,8 @@ public abstract class MessageDatabase ex
} else {
pageFile.delete();
}
+ metadata = new Metadata();
+ pageFile = null;
loadPageFile();
}
startCheckpoint();
@@ -383,11 +388,13 @@ public abstract class MessageDatabase ex
try {
this.indexLock.writeLock().lock();
try {
- pageFile.tx().execute(new
Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException
{
- checkpointUpdate(tx, true);
- }
- });
+ if (metadata.page != null) {
+ pageFile.tx().execute(new
Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws
IOException {
+ checkpointUpdate(tx, true);
+ }
+ });
+ }
pageFile.unload();
metadata = new Metadata();
} finally {
@@ -413,11 +420,13 @@ public abstract class MessageDatabase ex
metadata.state = CLOSED_STATE;
metadata.firstInProgressTransactionLocation =
getFirstInProgressTxLocation();
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- tx.store(metadata.page, metadataMarshaller, true);
- }
- });
+ if (metadata.page != null) {
+ pageFile.tx().execute(new
Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException
{
+ tx.store(metadata.page, metadataMarshaller, true);
+ }
+ });
+ }
}
} finally {
this.indexLock.writeLock().unlock();
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java?rev=1236661&r1=1236660&r2=1236661&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java
Fri Jan 27 12:47:39 2012
@@ -21,6 +21,7 @@ import org.apache.activemq.broker.Broker
import org.apache.activemq.broker.RecoveryBrokerTest;
import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.command.*;
+import org.apache.kahadb.page.PageFile;
import java.io.File;
import java.io.RandomAccessFile;
@@ -34,6 +35,9 @@ import java.util.ArrayList;
*/
public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest {
+ enum CorruptionType { None, FailToLoad, LoadInvalid, LoadCorrupt };
+ public CorruptionType failTest = CorruptionType.None;
+
protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService();
KahaDBStore kaha = new KahaDBStore();
@@ -47,10 +51,27 @@ public class KahaDBStoreRecoveryBrokerTe
// corrupting index
File index = new File("target/activemq-data/kahadb/db.data");
- index.delete();
RandomAccessFile raf = new RandomAccessFile(index, "rw");
- raf.seek(index.length());
- raf.writeBytes("corrupt");
+ switch (failTest) {
+ case FailToLoad:
+ index.delete();
+ raf = new RandomAccessFile(index, "rw");
+ raf.seek(index.length());
+ raf.writeBytes("corrupt");
+ break;
+ case LoadInvalid:
+ // page size 0
+ raf.seek(0);
+ raf.writeBytes("corrupt and cannot load metadata");
+ break;
+ case LoadCorrupt:
+ // loadable but invalid metadata
+ // location of order index low priority index for first
destination...
+ raf.seek(8*1024 + 57);
+ raf.writeLong(Integer.MAX_VALUE-10);
+ break;
+ default:
+ }
raf.close();
// starting broker
@@ -71,7 +92,10 @@ public class KahaDBStoreRecoveryBrokerTe
junit.textui.TestRunner.run(suite());
}
-
+ public void
initCombosForTestLargeQueuePersistentMessagesNotLostOnRestart() {
+ this.addCombinationValues("failTest", new
CorruptionType[]{CorruptionType.FailToLoad, CorruptionType.LoadInvalid,
CorruptionType.LoadCorrupt} );
+ }
+
public void testLargeQueuePersistentMessagesNotLostOnRestart() throws
Exception {
ActiveMQDestination destination = new ActiveMQQueue("TEST");