Repository: activemq Updated Branches: refs/heads/master f82eccd2f -> 8c3ef6cad
[AMQ-6815] have checkpoint validate status of async writes to avoid stale metadata and validate location size on read to avoid potential oom on restart Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8c3ef6ca Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8c3ef6ca Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8c3ef6ca Branch: refs/heads/master Commit: 8c3ef6cadb46d9694c68aa649a7952eb1612279f Parents: f82eccd Author: gtully <gary.tu...@gmail.com> Authored: Tue Sep 19 16:51:00 2017 +0100 Committer: gtully <gary.tu...@gmail.com> Committed: Tue Sep 19 16:51:00 2017 +0100 ---------------------------------------------------------------------- .../activemq/store/kahadb/MessageDatabase.java | 5 +- .../kahadb/disk/journal/DataFileAccessor.java | 4 +- .../kahadb/disk/journal/DataFileAppender.java | 6 +- .../store/kahadb/disk/journal/Location.java | 11 ++- .../JournalCorruptionEofIndexRecoveryTest.java | 56 ++++++++++++++ .../store/kahadb/MessageDatabaseTest.java | 81 ++++++++++++++++++++ .../DataFileAppenderNoSpaceNoBatchTest.java | 6 ++ 7 files changed, 159 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/8c3ef6ca/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index a6d3cc8..b391de7 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -2132,6 +2132,9 @@ public abstract class MessageDatabase extends ServiceSupport 
implements BrokerSe Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback); try { location.getLatch().await(); + if (location.getBatch().exception.get() != null) { /* the async write of the audit record failed: propagate that failure instead of going on to record a location whose write did not succeed */ + throw location.getBatch().exception.get(); + } } catch (InterruptedException e) { throw new InterruptedIOException(e.toString()); } @@ -3135,7 +3138,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return index; } - private Journal createJournal() throws IOException { + protected Journal createJournal() throws IOException { /* widened from private so subclasses (e.g. tests) can supply their own Journal */ Journal manager = new Journal(); manager.setDirectory(directory); manager.setMaxFileLength(getJournalMaxFileLength()); http://git-wip-us.apache.org/repos/asf/activemq/blob/8c3ef6ca/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java index 548d3b1..57df143 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java @@ -84,7 +84,9 @@ final class DataFileAccessor { } else { file.seek(location.getOffset() + Journal.RECORD_HEAD_SPACE); } - + if ((long)location.getOffset() + location.getSize() > dataFile.length) { /* a size that runs past the end of the data file indicates a corrupt location: fail here, before allocating the read buffer below from that size; the (long) widening keeps offset + size from overflowing int */ + throw new IOException("Invalid location size: " + location + ", size: " + location.getSize()); + } byte[] data = new byte[location.getSize() - Journal.RECORD_HEAD_SPACE]; file.readFully(data); return new ByteSequence(data, 0, data.length);
http://git-wip-us.apache.org/repos/asf/activemq/blob/8c3ef6ca/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java index bf1c25f..fa084f1 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java @@ -127,7 +127,7 @@ class DataFileAppender implements FileAppender { Journal.WriteCommand write = new Journal.WriteCommand(location, data, sync); WriteBatch batch = enqueue(write); - location.setLatch(batch.latch); + location.setBatch(batch); /* keep the whole batch, not only its latch, so callers can also inspect the batch's exception once the latch releases */ if (sync) { try { batch.latch.await(); @@ -153,10 +153,8 @@ class DataFileAppender implements FileAppender { location.setType(type); Journal.WriteCommand write = new Journal.WriteCommand(location, data, onComplete); + location.setBatch(enqueue(write)); /* same as the sync path: expose the enqueued batch (latch and exception) through the returned location */ - WriteBatch batch = enqueue(write); - - location.setLatch(batch.latch); return location; } http://git-wip-us.apache.org/repos/asf/activemq/blob/8c3ef6ca/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java index 7c02e96..f3da47a 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java @@ -36,7 +36,7 @@ public final
class Location implements Comparable<Location> { private int offset = NOT_SET; private int size = NOT_SET; private byte type = NOT_SET_TYPE; - private CountDownLatch latch; + private DataFileAppender.WriteBatch batch; public Location() { } @@ -114,11 +114,11 @@ public final class Location implements Comparable<Location> { } public CountDownLatch getLatch() { - return latch; + return batch == null ? null : batch.latch; /* a location never handed to the appender (e.g. new Location()) has no batch: keep the previous null-latch result instead of throwing NullPointerException */ } - public void setLatch(CountDownLatch latch) { - this.latch = latch; + public void setBatch(DataFileAppender.WriteBatch batch) { + this.batch = batch; } public int compareTo(Location o) { @@ -142,4 +142,7 @@ public final class Location implements Comparable<Location> { return dataFileId ^ offset; } + public DataFileAppender.WriteBatch getBatch() { /* null until setBatch has been called for this location */ + return batch; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/8c3ef6ca/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java index 16598ea..614242e 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -46,8 +47,11 @@ import org.apache.activemq.store.kahadb.disk.journal.Journal; import org.apache.activemq.store.kahadb.disk.journal.Location; import org.apache.activemq.store.kahadb.disk.page.Transaction; import
org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.DefaultTestAppender; import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.RecoverableRandomAccessFile; +import org.apache.log4j.Level; +import org.apache.log4j.spi.LoggingEvent; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -193,6 +197,58 @@ public class JournalCorruptionEofIndexRecoveryTest { } @Test + public void testRecoveryAfterCorruptionMetadataLocation() throws Exception { + startBroker(); + + produceMessagesToConsumeMultipleDataFiles(50); + + int numFiles = getNumberOfJournalFiles(); + + assertTrue("more than x files: " + numFiles, numFiles > 2); + + broker.getPersistenceAdapter().checkpoint(true); + Location location = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getMetadata().producerSequenceIdTrackerLocation; + + DataFile dataFile = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().get(Integer.valueOf(location.getDataFileId())); + RecoverableRandomAccessFile randomAccessFile = dataFile.openRandomAccessFile(); + randomAccessFile.seek(location.getOffset()); + randomAccessFile.writeInt(Integer.MAX_VALUE); /* corrupt the record's leading int with an impossible size */ + randomAccessFile.getChannel().force(true); + + ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().close(); + try { + broker.stop(); + broker.waitUntilStopped(); + } catch (Exception expected) { + } finally { + broker = null; + } + + AtomicBoolean trappedExpectedLogMessage = new AtomicBoolean(false); + DefaultTestAppender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + if (event.getLevel() == Level.WARN + && event.getRenderedMessage().contains("Cannot recover message audit") + && event.getThrowableInformation() != null && String.valueOf(event.getThrowableInformation().getThrowable().getLocalizedMessage()).contains("Invalid location size")) { /* guard: a matching WARN may carry no throwable or a null message; throwing here would escape into the broker's logging call */ + trappedExpectedLogMessage.set(true); + } + } + }; +
org.apache.log4j.Logger.getRootLogger().addAppender(appender); + + + try { + restartBroker(false); + } finally { + org.apache.log4j.Logger.getRootLogger().removeAppender(appender); + } + + assertEquals("no missing message", 50, broker.getAdminView().getTotalMessageCount()); + assertTrue("Did replay records on invalid location size", trappedExpectedLogMessage.get()); + } + + @Test public void testRecoveryAfterCorruptionCheckSum() throws Exception { startBroker(); http://git-wip-us.apache.org/repos/asf/activemq/blob/8c3ef6ca/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java new file mode 100644 index 0000000..d09be68 --- /dev/null +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.activemq.store.kahadb; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadb.disk.journal.Journal; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +public class MessageDatabaseTest { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Test + public void testCheckPointCleanupErrorBubblesUp() throws Exception { + + CountDownLatch traceCommandComplete = new CountDownLatch(1); + KahaDBStore kaha = new KahaDBStore() { + @Override public Journal createJournal() { + Journal journal = new Journal() { + @Override public boolean isChecksum() { + // allow trace command on start + + if (traceCommandComplete.getCount() > 0) { + traceCommandComplete.countDown(); + return false; + } + + // called from processQ, we can throw here to error out the async write + throw new RuntimeException("Fail with error on processQ"); + } + }; + journal.setDirectory(directory); + return journal; + } + }; + kaha.setDirectory(new File(temporaryFolder.getRoot(), "kaha")); + kaha.setCheckpointInterval(0L); // disable periodic checkpoint + kaha.setBrokerService(new BrokerService() { + @Override public void handleIOException(IOException exception) { + exception.printStackTrace(); + } + }); + kaha.start(); + + assertTrue(traceCommandComplete.await(5, TimeUnit.SECONDS)); + + try { + kaha.checkpoint(false); + fail("expect error on first store from checkpoint"); + } catch (Exception expected) { /* expected: the failed async audit write must surface from checkpoint; fail() throws an Error, so it is not swallowed here */ + } + + assertNull("audit location should be null", kaha.getMetadata().producerSequenceIdTrackerLocation); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/8c3ef6ca/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java index d164ab5..a6b19ee 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java @@ -181,6 +181,12 @@ public class DataFileAppenderNoSpaceNoBatchTest { assertTrue("write complete", latch.await(5, TimeUnit.SECONDS)); } + boolean someExceptions = false; + for (Location location: locations) { + someExceptions |= (location.getBatch().exception.get() != null); /* exception is a holder read via get() (as MessageDatabase does); test the held value, not the holder reference, which is presumably always non-null */ + } + assertTrue("expected at least one batch to record a write exception", someExceptions); + LOG.info("Latches count: " + latches.size()); LOG.info("Seeks: " + seekPositions);