Repository: activemq Updated Branches: refs/heads/master 070703133 -> c5a8b2c8b
[AMQ-6625] ensure kahadb stops operation on the first IOException to facilitate auto recovery from partial writes Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c5a8b2c8 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c5a8b2c8 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c5a8b2c8 Branch: refs/heads/master Commit: c5a8b2c8b11121f4999d9f05932cd040d8c9f2a7 Parents: 0707031 Author: gtully <[email protected]> Authored: Mon Mar 13 13:49:41 2017 +0000 Committer: gtully <[email protected]> Committed: Mon Mar 13 13:49:41 2017 +0000 ---------------------------------------------------------------------- .../util/DefaultIOExceptionHandler.java | 6 +++ .../store/kahadb/KahaDBIOExceptionHandler.java | 43 +++++++++++++++++ .../activemq/store/kahadb/MessageDatabase.java | 9 ++++ .../kahadb/disk/journal/DataFileAppender.java | 1 + .../store/kahadb/disk/journal/Journal.java | 7 +++ .../store/kahadb/disk/page/PageFile.java | 11 +++++ .../DataFileAppenderNoSpaceNoBatchTest.java | 51 ++++++++++++++++++-- 7 files changed, 124 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/c5a8b2c8/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java b/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java index 06bb0ee..0ee6743 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java +++ b/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java @@ -52,6 +52,7 @@ import org.slf4j.LoggerFactory; @Override public void handle(IOException exception) { if (ignoreAllErrors) { + allowIOResumption(); LOG.info("Ignoring IO exception, " + exception, exception); return; } @@ -62,6 +63,7 @@ import org.slf4j.LoggerFactory; String message = cause.getMessage(); if (message != null && message.contains(noSpaceMessage)) { LOG.info("Ignoring no space left exception, " + exception, exception); + allowIOResumption(); return; } cause = cause.getCause(); @@ -106,6 +108,7 @@ import org.slf4j.LoggerFactory; @Override public void run() { try { + allowIOResumption(); while (hasLockOwnership() && isPersistenceAdapterDown()) { LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports"); TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod); @@ -162,6 +165,9 @@ import org.slf4j.LoggerFactory; throw new SuppressReplyException("ShutdownBrokerInitiated", exception); } + protected void allowIOResumption() { + } + private void stopBroker(Exception exception) { LOG.info("Stopping " + broker + " due to exception, " + exception, exception); new Thread("IOExceptionHandler: stopping " + broker) { http://git-wip-us.apache.org/repos/asf/activemq/blob/c5a8b2c8/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBIOExceptionHandler.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBIOExceptionHandler.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBIOExceptionHandler.java new file mode 100644 index 0000000..9ddc6d2 --- /dev/null +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBIOExceptionHandler.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import org.apache.activemq.util.DefaultIOExceptionHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * @org.apache.xbean.XBean + */ +public class KahaDBIOExceptionHandler extends DefaultIOExceptionHandler { + + private static final Logger LOG = LoggerFactory + .getLogger(KahaDBIOExceptionHandler.class); + + protected void allowIOResumption() { + try { + if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { + KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); + kahaDBPersistenceAdapter.getStore().allowIOResumption(); + } + } catch (IOException e) { + LOG.warn("Failed to allow IO resumption", e); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/c5a8b2c8/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 4fdaa01..ea53ad0 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -302,6 +302,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe unload(); } + public void allowIOResumption() { + if (pageFile != null) { + pageFile.allowIOResumption(); + } + if (journal != null) { + journal.allowIOResumption(); + } + } + private void loadPageFile() throws IOException { this.indexLock.writeLock().lock(); try { http://git-wip-us.apache.org/repos/asf/activemq/blob/c5a8b2c8/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java index 3153a50..bf1c25f 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java @@ -353,6 +353,7 @@ class DataFileAppender implements FileAppender { } catch (Throwable error) { logger.warn("Journal failed while writing at: " + wb.dataFile.getDataFileId() + ":" + wb.offset, error); synchronized (enqueueMutex) { + shutdown = true; running = false; signalError(wb, error); if (nextWriteBatch != null) { http://git-wip-us.apache.org/repos/asf/activemq/blob/c5a8b2c8/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java index cd8a84b..a78cc65 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java @@ -109,6 +109,13 @@ public class Journal { return accessorPool; } + public void allowIOResumption() { + if (appender instanceof DataFileAppender) { + DataFileAppender dataFileAppender = (DataFileAppender)appender; + dataFileAppender.shutdown = false; + } + } + public enum PreallocationStrategy { SPARSE_FILE, OS_KERNEL_COPY, http://git-wip-us.apache.org/repos/asf/activemq/blob/c5a8b2c8/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java index f56539b..052a586 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java @@ -493,6 +493,10 @@ public class PageFile { return loaded.get(); } + public void allowIOResumption() { + loaded.set(true); + } + /** * Flush and sync all write buffers to disk. * @@ -1101,6 +1105,13 @@ public class PageFile { if (enableDiskSyncs) { writeFile.sync(); } + + } catch (IOException ioError) { + LOG.info("Unexpected io error on pagefile write of " + batch.size() + " pages.", ioError); + // any subsequent write needs to be prefaced with a considered call to redoRecoveryUpdates + // to ensure disk image is self consistent + loaded.set(false); + throw ioError; } finally { synchronized (writes) { for (PageWrite w : batch) { http://git-wip-us.apache.org/repos/asf/activemq/blob/c5a8b2c8/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java index ec68d13..d164ab5 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java @@ -80,6 +80,7 @@ public class DataFileAppenderNoSpaceNoBatchTest { underTest.storeItem(byteSequence, (byte) 1, true); fail("expect no space"); } catch (IOException expected) { + underTest.shutdown = false; } } @@ -88,7 +89,45 @@ public class DataFileAppenderNoSpaceNoBatchTest { } - + @Test + public void testSingleNoSpaceNextWriteSameBatch() throws Exception { + final List<Long> seekPositions = Collections.synchronizedList(new ArrayList<Long>()); + + final DataFile currentDataFile = new DataFile(dataFileDir.newFile(), 0) { + public RecoverableRandomAccessFile appendRandomAccessFile() throws IOException { + + return new RecoverableRandomAccessFile(dataFileDir.newFile(), "rw") { + + public void seek(long pos) throws IOException { + seekPositions.add(pos); + } + + public void write(byte[] bytes, int offset, int len) throws IOException { + throw new IOException("No space on device"); + } + }; + }; + }; + + underTest = new DataFileAppender(new Journal() { + @Override + public DataFile getCurrentDataFile(int capacity) throws IOException { + return currentDataFile; + }; + }); + + final ByteSequence byteSequence = new ByteSequence(new byte[4*1024]); + for (int i=0; i<2; i++) { + try { + underTest.storeItem(byteSequence, (byte) 1, true); + fail("expect no space"); + } catch (IOException expected) { + } + } + + assertEquals("got 1 seeks: " + seekPositions, 1, seekPositions.size()); + } + @Test(timeout = 10000) public void testNoSpaceNextWriteSameBatchAsync() throws Exception { final List<Long> seekPositions = Collections.synchronizedList(new ArrayList<Long>()); @@ -129,9 +168,13 @@ public class DataFileAppenderNoSpaceNoBatchTest { ConcurrentLinkedQueue<Location> locations = new ConcurrentLinkedQueue<Location>(); HashSet<CountDownLatch> latches = new HashSet<CountDownLatch>(); for (int i = 0; i <= 20; i++) { - Location location = underTest.storeItem(byteSequence, (byte) 1, false); - locations.add(location); - latches.add(location.getLatch()); + try { + Location location = underTest.storeItem(byteSequence, (byte) 1, false); + locations.add(location); + latches.add(location.getLatch()); + } catch (IOException expected) { + underTest.shutdown = false; + } } for (CountDownLatch latch: latches) {
