Repository: activemq Updated Branches: refs/heads/master 37da75e0e -> 6b8e743b0
AMQ-6372 - add IOExceptionHandler to kahadb read path to ensure fast shutdown on disk access errors Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6b8e743b Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6b8e743b Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6b8e743b Branch: refs/heads/master Commit: 6b8e743b083b9baf201f26b6f7d9a574638ba537 Parents: 37da75e Author: gtully <[email protected]> Authored: Wed Jul 27 11:57:40 2016 +0100 Committer: gtully <[email protected]> Committed: Wed Jul 27 11:58:22 2016 +0100 ---------------------------------------------------------------------- .../activemq/store/kahadb/KahaDBStore.java | 28 ++++--- .../JournalCorruptionEofIndexRecoveryTest.java | 13 +++- .../store/kahadb/JournalFdRecoveryTest.java | 78 +++++++++++++++++++- 3 files changed, 102 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/6b8e743b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 9ed7381..0d20b78 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -1230,17 +1230,23 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { * @throws IOException */ Message loadMessage(Location location) throws IOException { - JournalCommand<?> command = load(location); - KahaAddMessageCommand addMessage = null; - switch (command.type()) { - case KAHA_UPDATE_MESSAGE_COMMAND: - addMessage = ((KahaUpdateMessageCommand)command).getMessage(); - break; - default: - addMessage = (KahaAddMessageCommand) command; - } - Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); - return msg; + try { + JournalCommand<?> command = load(location); + KahaAddMessageCommand addMessage = null; + switch (command.type()) { + case KAHA_UPDATE_MESSAGE_COMMAND: + addMessage = ((KahaUpdateMessageCommand) command).getMessage(); + break; + default: + addMessage = (KahaAddMessageCommand) command; + } + Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); + return msg; + } catch (IOException ioe) { + LOG.error("Failed to load message at: {}", location , ioe); + brokerService.handleIOException(ioe); + throw ioe; + } } // ///////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/activemq/blob/6b8e743b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java index faf0022..6cffe3d 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java @@ -29,6 +29,7 @@ import java.util.Map.Entry; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -371,11 +372,15 @@ public class JournalCorruptionEofIndexRecoveryTest { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(destination); int count = 0; - while (count < max && consumer.receive(5000) != null) { - count++; + try { + while (count < max && consumer.receive(5000) != null) { + count++; + } + } catch (JMSException ok) { + } finally { + consumer.close(); + connection.close(); } - consumer.close(); - connection.close(); return count; } } http://git-wip-us.apache.org/repos/asf/activemq/blob/6b8e743b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java index 308b18b..2d398e2 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java @@ -18,6 +18,8 @@ package org.apache.activemq.store.kahadb; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.store.kahadb.disk.journal.DataFile; import org.apache.activemq.store.kahadb.disk.journal.Journal; @@ -28,7 +30,9 @@ import org.slf4j.LoggerFactory; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.management.Attribute; @@ -36,8 +40,11 @@ import javax.management.ObjectName; import java.io.File; import java.io.IOException; import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; public class JournalFdRecoveryTest { @@ -80,6 +87,12 @@ public class JournalFdRecoveryTest { broker.setDataDirectory(KAHADB_DIRECTORY); broker.addConnector("tcp://localhost:0"); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setUseCache(false); + policyMap.setDefaultEntry(policyEntry); + broker.setDestinationPolicy(policyMap); + configurePersistence(broker); connectionUri = "vm://localhost?create=false"; @@ -115,6 +128,40 @@ public class JournalFdRecoveryTest { @Test + public void testStopOnPageInIOError() throws Exception { + startBroker(); + + int sent = produceMessagesToConsumeMultipleDataFiles(50); + + int numFiles = getNumberOfJournalFiles(); + LOG.info("Num journal files: " + numFiles); + + assertTrue("more than x files: " + numFiles, numFiles > 4); + + File dataDir = broker.getPersistenceAdapter().getDirectory(); + + for (int i=2;i<4;i++) { + whackDataFile(dataDir, i); + } + + final CountDownLatch gotShutdown = new CountDownLatch(1); + broker.addShutdownHook(new Runnable() { + @Override + public void run() { + gotShutdown.countDown(); + } + }); + + int received = tryConsume(destination, sent); + assertNotEquals("not all message received", sent, received); + assertTrue("broker got shutdown on page in error", gotShutdown.await(5, TimeUnit.SECONDS)); + } + + private void whackDataFile(File dataDir, int i) throws Exception { + whackFile(dataDir, "db-" + i + ".log"); + } + + @Test public void testRecoveryAfterCorruption() throws Exception { startBroker(); @@ -160,8 +207,12 @@ public class JournalFdRecoveryTest { return result; } - private void whackIndex(File dataDir) { - File indexToDelete = new File(dataDir, "db.data"); + private void whackIndex(File dataDir) throws Exception { + whackFile(dataDir, "db.data"); + } + + private void whackFile(File dataDir, String name) throws Exception { + File indexToDelete = new File(dataDir, name); LOG.info("Whacking index: " + indexToDelete); indexToDelete.delete(); } @@ -195,6 +246,29 @@ public class JournalFdRecoveryTest { return sent; } + private int tryConsume(Destination destination, int numToGet) throws Exception { + int got = 0; + Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()).createConnection(); + connection.start(); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(destination); + for (int i = 0; i < numToGet; i++) { + if (consumer.receive(4000) == null) { + // give up on timeout or error + break; + } + got++; + + } + } catch (JMSException ok) { + } finally { + connection.close(); + } + + return got; + } + private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception { return produceMessages(destination, numToSend); }
