Repository: activemq
Updated Branches:
  refs/heads/master f82eccd2f -> 8c3ef6cad


[AMQ-6815] Have checkpoint validate the status of async writes to avoid stale
metadata, and validate location size on read to avoid a potential OOM on restart


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8c3ef6ca
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8c3ef6ca
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8c3ef6ca

Branch: refs/heads/master
Commit: 8c3ef6cadb46d9694c68aa649a7952eb1612279f
Parents: f82eccd
Author: gtully <gary.tu...@gmail.com>
Authored: Tue Sep 19 16:51:00 2017 +0100
Committer: gtully <gary.tu...@gmail.com>
Committed: Tue Sep 19 16:51:00 2017 +0100

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  5 +-
 .../kahadb/disk/journal/DataFileAccessor.java   |  4 +-
 .../kahadb/disk/journal/DataFileAppender.java   |  6 +-
 .../store/kahadb/disk/journal/Location.java     | 11 ++-
 .../JournalCorruptionEofIndexRecoveryTest.java  | 56 ++++++++++++++
 .../store/kahadb/MessageDatabaseTest.java       | 81 ++++++++++++++++++++
 .../DataFileAppenderNoSpaceNoBatchTest.java     |  6 ++
 7 files changed, 159 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8c3ef6ca/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index a6d3cc8..b391de7 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -2132,6 +2132,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             Location location = store(new 
KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), 
nullCompletionCallback);
             try {
                 location.getLatch().await();
+                if (location.getBatch().exception.get() != null) {
+                    throw location.getBatch().exception.get();
+                }
             } catch (InterruptedException e) {
                 throw new InterruptedIOException(e.toString());
             }
@@ -3135,7 +3138,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return index;
     }
 
-    private Journal createJournal() throws IOException {
+    protected Journal createJournal() throws IOException {
         Journal manager = new Journal();
         manager.setDirectory(directory);
         manager.setMaxFileLength(getJournalMaxFileLength());

http://git-wip-us.apache.org/repos/asf/activemq/blob/8c3ef6ca/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
index 548d3b1..57df143 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
@@ -84,7 +84,9 @@ final class DataFileAccessor {
             } else {
                 file.seek(location.getOffset() + Journal.RECORD_HEAD_SPACE);
             }
-
+            if ((long)location.getOffset() + location.getSize() > 
dataFile.length) {
+                throw new IOException("Invalid location size: " + location + 
", size: " + location.getSize());
+            }
             byte[] data = new byte[location.getSize() - 
Journal.RECORD_HEAD_SPACE];
             file.readFully(data);
             return new ByteSequence(data, 0, data.length);

http://git-wip-us.apache.org/repos/asf/activemq/blob/8c3ef6ca/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
index bf1c25f..fa084f1 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
@@ -127,7 +127,7 @@ class DataFileAppender implements FileAppender {
         Journal.WriteCommand write = new Journal.WriteCommand(location, data, 
sync);
 
         WriteBatch batch = enqueue(write);
-        location.setLatch(batch.latch);
+        location.setBatch(batch);
         if (sync) {
             try {
                 batch.latch.await();
@@ -153,10 +153,8 @@ class DataFileAppender implements FileAppender {
         location.setType(type);
 
         Journal.WriteCommand write = new Journal.WriteCommand(location, data, 
onComplete);
+        location.setBatch(enqueue(write));
 
-        WriteBatch batch = enqueue(write);
-
-        location.setLatch(batch.latch);
         return location;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/8c3ef6ca/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java
index 7c02e96..f3da47a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Location.java
@@ -36,7 +36,7 @@ public final class Location implements Comparable<Location> {
     private int offset = NOT_SET;
     private int size = NOT_SET;
     private byte type = NOT_SET_TYPE;
-    private CountDownLatch latch;
+    private DataFileAppender.WriteBatch batch;
 
     public Location() {
     }
@@ -114,11 +114,11 @@ public final class Location implements Comparable<Location> {
     }
 
     public CountDownLatch getLatch() {
-        return latch;
+        return batch.latch;
     }
 
-    public void setLatch(CountDownLatch latch) {
-        this.latch = latch;
+    public void setBatch(DataFileAppender.WriteBatch batch) {
+        this.batch = batch;
     }
 
     public int compareTo(Location o) {
@@ -142,4 +142,7 @@ public final class Location implements Comparable<Location> {
         return dataFileId ^ offset;
     }
 
+    public DataFileAppender.WriteBatch getBatch() {
+        return batch;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8c3ef6ca/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
index 16598ea..614242e 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -46,8 +47,11 @@ import org.apache.activemq.store.kahadb.disk.journal.Journal;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
 import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DefaultTestAppender;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.RecoverableRandomAccessFile;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -193,6 +197,58 @@ public class JournalCorruptionEofIndexRecoveryTest {
     }
 
     @Test
+    public void testRecoveryAfterCorruptionMetadataLocation() throws Exception 
{
+        startBroker();
+
+        produceMessagesToConsumeMultipleDataFiles(50);
+
+        int numFiles = getNumberOfJournalFiles();
+
+        assertTrue("more than x files: " + numFiles, numFiles > 2);
+
+        broker.getPersistenceAdapter().checkpoint(true);
+        Location location = ((KahaDBPersistenceAdapter) 
broker.getPersistenceAdapter()).getStore().getMetadata().producerSequenceIdTrackerLocation;
+
+        DataFile dataFile = ((KahaDBPersistenceAdapter) 
broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().get(Integer.valueOf(location.getDataFileId()));
+        RecoverableRandomAccessFile randomAccessFile = 
dataFile.openRandomAccessFile();
+        randomAccessFile.seek(location.getOffset());
+        randomAccessFile.writeInt(Integer.MAX_VALUE);
+        randomAccessFile.getChannel().force(true);
+
+        ((KahaDBPersistenceAdapter) 
broker.getPersistenceAdapter()).getStore().getJournal().close();
+        try {
+            broker.stop();
+            broker.waitUntilStopped();
+        } catch (Exception expected) {
+        } finally {
+            broker = null;
+        }
+
+        AtomicBoolean trappedExpectedLogMessage = new AtomicBoolean(false);
+        DefaultTestAppender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                if (event.getLevel() == Level.WARN
+                        && event.getRenderedMessage().contains("Cannot recover 
message audit")
+                        && 
event.getThrowableInformation().getThrowable().getLocalizedMessage().contains("Invalid
 location size")) {
+                    trappedExpectedLogMessage.set(true);
+                }
+            }
+        };
+        org.apache.log4j.Logger.getRootLogger().addAppender(appender);
+
+
+        try {
+            restartBroker(false);
+        } finally {
+            org.apache.log4j.Logger.getRootLogger().removeAppender(appender);
+        }
+
+        assertEquals("no missing message", 50, 
broker.getAdminView().getTotalMessageCount());
+        assertTrue("Did replay records on invalid location size", 
trappedExpectedLogMessage.get());
+    }
+
+    @Test
     public void testRecoveryAfterCorruptionCheckSum() throws Exception {
         startBroker();
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/8c3ef6ca/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java
new file mode 100644
index 0000000..d09be68
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.kahadb;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+public class MessageDatabaseTest {
+
+    @Rule
+    public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test
+    public void testCheckPointCleanupErrorBubblesUp() throws Exception {
+
+        CountDownLatch traceCommandComplete = new CountDownLatch(1);
+        KahaDBStore kaha = new KahaDBStore() {
+            public Journal createJournal() {
+                Journal journal = new Journal() {
+                    public boolean isChecksum() {
+                        // allow trace command on start
+
+                        if (traceCommandComplete.getCount() > 0) {
+                            traceCommandComplete.countDown();
+                            return false;
+                        }
+
+                        // called from processQ, we can throw here to error 
out the async write
+                        throw new RuntimeException("Fail with error on 
processQ");
+                    }
+                };
+                journal.setDirectory(directory);
+                return journal;
+            }
+        };
+        kaha.setDirectory(new File(temporaryFolder.getRoot(), "kaha"));
+        kaha.setCheckpointInterval(0l); // disable periodic checkpoint
+        kaha.setBrokerService(new BrokerService() {
+            public void handleIOException(IOException exception) {
+                exception.printStackTrace();
+            }
+        });
+        kaha.start();
+
+        assertTrue(traceCommandComplete.await(5, TimeUnit.SECONDS));
+
+        try {
+            kaha.checkpoint(false);
+            fail("expect error on first store from checkpoint");
+        } catch (Exception expected) {
+        }
+
+        assertNull("audit location should be null", 
kaha.getMetadata().producerSequenceIdTrackerLocation);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/8c3ef6ca/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
index d164ab5..a6b19ee 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
@@ -181,6 +181,12 @@ public class DataFileAppenderNoSpaceNoBatchTest {
             assertTrue("write complete", latch.await(5, TimeUnit.SECONDS));
         }
 
+        boolean someExceptions = false;
+        for (Location location: locations) {
+            someExceptions |= (location.getBatch().exception != null);
+        }
+        assertTrue(someExceptions);
+
         LOG.info("Latches count: " + latches.size());
         LOG.info("Seeks: " + seekPositions);
 

Reply via email to