[AMQ-6606] Avoid partial writes to the end of the journal: revert the offset increment on IOException; includes fix and test
(cherry picked from commit d53b8f8d424e3cf51646b215007fc017717edf44) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/22d5b51a Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/22d5b51a Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/22d5b51a Branch: refs/heads/activemq-5.14.x Commit: 22d5b51a0c69b48665f32ea19bad5046d7237426 Parents: 1a67318 Author: gtully <[email protected]> Authored: Tue Feb 21 17:03:46 2017 +0000 Committer: Christopher L. Shannon (cshannon) <[email protected]> Committed: Fri Feb 24 14:20:54 2017 -0500 ---------------------------------------------------------------------- .../activemq/store/kahadb/MessageDatabase.java | 2 +- .../store/kahadb/disk/journal/DataFile.java | 4 + .../kahadb/disk/journal/DataFileAppender.java | 41 +++++----- .../DataFileAppenderNoSpaceNoBatchTest.java | 80 ++++++++++++++++++++ 4 files changed, 106 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/22d5b51a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 2db07f1..5ee6c4c 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1132,7 +1132,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } return location; } catch (IOException ioe) { - LOG.error("KahaDB failed to store to Journal", ioe); + LOG.error("KahaDB failed to store to Journal, command of type: " + data.type(), ioe); 
brokerService.handleIOException(ioe); throw ioe; } http://git-wip-us.apache.org/repos/asf/activemq/blob/22d5b51a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java index 5b96adf..1532f08 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java @@ -72,6 +72,10 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil length += size; } + public synchronized void decrementLength(int size) { + length -= size; + } + @Override public synchronized String toString() { return file.getName() + " number = " + dataFileId + " , length = " + length; http://git-wip-us.apache.org/repos/asf/activemq/blob/22d5b51a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java index 1e87331..25c4e28 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java @@ -28,6 +28,7 @@ import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStra import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream; import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList; 
import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.RecoverableRandomAccessFile; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -177,11 +178,6 @@ class DataFileAppender implements FileAppender { thread.setDaemon(true); thread.setName("ActiveMQ Data File Writer"); thread.start(); - firstAsyncException = null; - } - - if (firstAsyncException != null) { - throw firstAsyncException; } while ( true ) { @@ -249,7 +245,6 @@ class DataFileAppender implements FileAppender { int statIdx = 0; int[] stats = new int[maxStat]; - final byte[] end = new byte[]{0}; /** * The async processing loop that writes to the data files and does the * force calls. Since the file sync() call is the slowest of all the @@ -286,7 +281,7 @@ class DataFileAppender implements FileAppender { if (file != null) { if (periodicSync) { if (logger.isTraceEnabled()) { - logger.trace("Syning file {} on rotate", dataFile.getFile().getName()); + logger.trace("Syncing file {} on rotate", dataFile.getFile().getName()); } file.sync(); } @@ -355,20 +350,13 @@ class DataFileAppender implements FileAppender { signalDone(wb); } - } catch (IOException e) { - logger.info("Journal failed while writing at: " + wb.offset); + } catch (Throwable error) { + logger.warn("Journal failed while writing at: " + wb.dataFile.getDataFileId() + ":" + wb.offset, error); synchronized (enqueueMutex) { - firstAsyncException = e; - if (wb != null) { - wb.exception.set(e); - wb.latch.countDown(); - } - if (nextWriteBatch != null) { - nextWriteBatch.exception.set(e); - nextWriteBatch.latch.countDown(); - } + running = false; + signalError(wb, error); + signalError(nextWriteBatch, error); } - } catch (InterruptedException e) { } finally { try { if (file != null) { @@ -396,7 +384,7 @@ class DataFileAppender implements FileAppender { if (!write.sync) { inflightWrites.remove(new Journal.WriteKey(write.location)); } - if (write.onComplete != null) { + if 
(write.onComplete != null && wb.exception.get() == null) { try { write.onComplete.run(); } catch (Throwable e) { @@ -409,4 +397,17 @@ class DataFileAppender implements FileAppender { // Signal any waiting threads that the write is on disk. wb.latch.countDown(); } + + protected void signalError(WriteBatch wb, Throwable t) { + if (wb != null) { + if (t instanceof IOException) { + wb.exception.set((IOException) t); + // revert batch increment such that next write is contiguous + wb.dataFile.decrementLength(wb.size); + } else { + wb.exception.set(IOExceptionSupport.create(t)); + } + signalDone(wb); + } + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/22d5b51a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java new file mode 100644 index 0000000..aa6df3f --- /dev/null +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb.disk.journal; + +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.RecoverableRandomAccessFile; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class DataFileAppenderNoSpaceNoBatchTest { + @Rule + public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); + + private DataFileAppender underTest; + + @Test + public void testNoSpaceNextWriteSameBatch() throws Exception { + final List<Long> seekPositions = Collections.synchronizedList(new ArrayList<Long>()); + + final DataFile currentDataFile = new DataFile(dataFileDir.newFile(), 0) { + public RecoverableRandomAccessFile appendRandomAccessFile() throws IOException { + + return new RecoverableRandomAccessFile(dataFileDir.newFile(), "rw") { + + public void seek(long pos) throws IOException { + seekPositions.add(pos); + } + + public void write(byte[] bytes, int offset, int len) throws IOException { + throw new IOException("No space on device"); + } + }; + }; + }; + + underTest = new DataFileAppender(new Journal() { + @Override + public DataFile getCurrentDataFile(int capacity) throws IOException { + return currentDataFile; + }; + }); + + final ByteSequence byteSequence = new ByteSequence(new byte[4*1024]); + for (int i=0; i<2; i++) { + try { + 
underTest.storeItem(byteSequence, (byte) 1, true); + fail("expect no space"); + } catch (IOException expected) { + } + } + + assertEquals("got 2 seeks: " + seekPositions, 2, seekPositions.size()); + assertEquals("offset is reused", seekPositions.get(0), seekPositions.get(1)); + + } +}
