https://issues.apache.org/jira/browse/AMQ-3725 - allow kahadb to recover from failed file system
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7ec13f21 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7ec13f21 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7ec13f21 Branch: refs/heads/activemq-5.9 Commit: 7ec13f21104d46f2d4fe16e8ace522e7ac8bfa89 Parents: 50f37be Author: Dejan Bosanac <[email protected]> Authored: Thu Oct 31 17:58:40 2013 +0100 Committer: Hadrian Zbarcea <[email protected]> Committed: Tue Mar 11 21:19:06 2014 -0400 ---------------------------------------------------------------------- .../CallerBufferingDataFileAppender.java | 3 +- .../store/kahadb/disk/journal/DataFile.java | 7 +- .../kahadb/disk/journal/DataFileAccessor.java | 3 +- .../kahadb/disk/journal/DataFileAppender.java | 4 +- .../store/kahadb/disk/page/PageFile.java | 19 +- .../util/RecoverableRandomAccessFile.java | 407 +++++++++++++++++++ 6 files changed, 425 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/7ec13f21/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java index 92245ab..ff11848 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java @@ -22,6 +22,7 @@ import java.util.zip.Adler32; import java.util.zip.Checksum; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream; +import org.apache.activemq.util.RecoverableRandomAccessFile; /** * An optimized writer to do batch appends to a data file. This object is thread @@ -82,7 +83,7 @@ class CallerBufferingDataFileAppender extends DataFileAppender { @Override protected void processQueue() { DataFile dataFile = null; - RandomAccessFile file = null; + RecoverableRandomAccessFile file = null; WriteBatch wb = null; try { http://git-wip-us.apache.org/repos/asf/activemq/blob/7ec13f21/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java index e014b8e..d5762d2 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java @@ -23,6 +23,7 @@ import java.io.RandomAccessFile; import org.apache.activemq.store.kahadb.disk.util.LinkedNode; import org.apache.activemq.store.kahadb.disk.util.SequenceSet; import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.RecoverableRandomAccessFile; /** * DataFile @@ -67,11 +68,11 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil return file.getName() + " number = " + dataFileId + " , length = " + length; } - public synchronized RandomAccessFile openRandomAccessFile() throws IOException { - return new RandomAccessFile(file.getCanonicalPath(), "rw"); + public synchronized RecoverableRandomAccessFile openRandomAccessFile() throws IOException { + return new RecoverableRandomAccessFile(file.getCanonicalPath(), "rw"); } - public synchronized void closeRandomAccessFile(RandomAccessFile file) throws IOException { + public synchronized void closeRandomAccessFile(RecoverableRandomAccessFile file) throws IOException { file.close(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/7ec13f21/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java index 983d7de..7781b7e 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java @@ -21,6 +21,7 @@ import java.io.RandomAccessFile; import java.util.Map; import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.RecoverableRandomAccessFile; /** * Optimized Store reader and updater. Single threaded and synchronous. Use in @@ -32,7 +33,7 @@ final class DataFileAccessor { private final DataFile dataFile; private final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites; - private final RandomAccessFile file; + private final RecoverableRandomAccessFile file; private boolean disposed; /** http://git-wip-us.apache.org/repos/asf/activemq/blob/7ec13f21/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java index f3f7af3..095db52 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java @@ -28,6 +28,7 @@ import java.util.zip.Checksum; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream; import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList; +import org.apache.activemq.util.RecoverableRandomAccessFile; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -277,7 +278,7 @@ class DataFileAppender implements FileAppender { */ protected void processQueue() { DataFile dataFile = null; - RandomAccessFile file = null; + RecoverableRandomAccessFile file = null; WriteBatch wb = null; try { @@ -373,6 +374,7 @@ class DataFileAppender implements FileAppender { signalDone(wb); } } catch (IOException e) { + logger.info("Journal failed while writing at: " + wb.offset); synchronized (enqueueMutex) { firstAsyncException = e; if (wb != null) { http://git-wip-us.apache.org/repos/asf/activemq/blob/7ec13f21/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java index 12ba275..3f107a6 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java @@ -42,12 +42,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.zip.Adler32; import java.util.zip.Checksum; -import org.apache.activemq.util.DataByteArrayOutputStream; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.IOHelper; -import org.apache.activemq.util.IntrospectionSupport; -import org.apache.activemq.util.LFUCache; -import org.apache.activemq.util.LRUCache; +import org.apache.activemq.util.*; import org.apache.activemq.store.kahadb.disk.util.Sequence; import org.apache.activemq.store.kahadb.disk.util.SequenceSet; import org.slf4j.Logger; @@ -85,11 +80,11 @@ public class PageFile { private final String name; // File handle used for reading pages.. - private RandomAccessFile readFile; + private RecoverableRandomAccessFile readFile; // File handle used for writing pages.. - private RandomAccessFile writeFile; + private RecoverableRandomAccessFile writeFile; // File handle used for writing pages.. - private RandomAccessFile recoveryFile; + private RecoverableRandomAccessFile recoveryFile; // The size of pages private int pageSize = DEFAULT_PAGE_SIZE; @@ -377,8 +372,8 @@ public class PageFile { File file = getMainPageFile(); IOHelper.mkdirs(file.getParentFile()); - writeFile = new RandomAccessFile(file, "rw"); - readFile = new RandomAccessFile(file, "r"); + writeFile = new RecoverableRandomAccessFile(file, "rw"); + readFile = new RecoverableRandomAccessFile(file, "r"); if (readFile.length() > 0) { // Load the page size setting cause that can't change once the file is created. @@ -397,7 +392,7 @@ public class PageFile { } if (enableRecoveryFile) { - recoveryFile = new RandomAccessFile(getRecoveryFile(), "rw"); + recoveryFile = new RecoverableRandomAccessFile(getRecoveryFile(), "rw"); } if (metaData.isCleanShutdown()) { http://git-wip-us.apache.org/repos/asf/activemq/blob/7ec13f21/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java new file mode 100644 index 0000000..fbb3212 --- /dev/null +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java @@ -0,0 +1,407 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.util; + +import java.io.*; + +public class RecoverableRandomAccessFile implements java.io.DataOutput, java.io.DataInput, java.io.Closeable { + + RandomAccessFile raf; + File file; + String mode; + + public RecoverableRandomAccessFile(File file, String mode) throws FileNotFoundException { + this.file = file; + this.mode = mode; + raf = new RandomAccessFile(file, mode); + } + + public RecoverableRandomAccessFile(String name, String mode) throws FileNotFoundException { + this.file = new File(name); + this.mode = mode; + raf = new RandomAccessFile(file, mode); + } + + protected RandomAccessFile getRaf() throws IOException { + if (raf == null) { + raf = new RandomAccessFile(file, mode); + } + return raf; + } + + protected void handleException() throws IOException { + try { + if (raf != null) { + raf.close(); + } + } catch (Throwable ignore) { + } finally { + raf = null; + } + } + + @Override + public void close() throws IOException { + raf.close(); + } + + @Override + public void readFully(byte[] bytes) throws IOException { + try { + getRaf().readFully(bytes); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void readFully(byte[] bytes, int i, int i2) throws IOException { + try { + getRaf().readFully(bytes, i, i2); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public int skipBytes(int i) throws IOException { + try { + return getRaf().skipBytes(i); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public boolean readBoolean() throws IOException { + try { + return getRaf().readBoolean(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public byte readByte() throws IOException { + try { + return getRaf().readByte(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public int readUnsignedByte() throws IOException { + try { + return getRaf().readUnsignedByte(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public short readShort() throws IOException { + try { + return getRaf().readShort(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public int readUnsignedShort() throws IOException { + try { + return getRaf().readUnsignedShort(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public char readChar() throws IOException { + try { + return getRaf().readChar(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public int readInt() throws IOException { + try { + return getRaf().readInt(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public long readLong() throws IOException { + try { + return getRaf().readLong(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public float readFloat() throws IOException { + try { + return getRaf().readFloat(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public double readDouble() throws IOException { + try { + return getRaf().readDouble(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public String readLine() throws IOException { + try { + return getRaf().readLine(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public String readUTF() throws IOException { + try { + return getRaf().readUTF(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void write(int i) throws IOException { + try { + getRaf().write(i); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void write(byte[] bytes) throws IOException { + try { + getRaf().write(bytes); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void write(byte[] bytes, int i, int i2) throws IOException { + try { + getRaf().write(bytes, i, i2); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeBoolean(boolean b) throws IOException { + try { + getRaf().writeBoolean(b); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeByte(int i) throws IOException { + try { + getRaf().writeByte(i); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeShort(int i) throws IOException { + try { + getRaf().writeShort(i); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeChar(int i) throws IOException { + try { + getRaf().writeChar(i); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeInt(int i) throws IOException { + try { + getRaf().writeInt(i); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeLong(long l) throws IOException { + try { + getRaf().writeLong(l); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeFloat(float v) throws IOException { + try { + getRaf().writeFloat(v); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeDouble(double v) throws IOException { + try { + getRaf().writeDouble(v); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeBytes(String s) throws IOException { + try { + getRaf().writeBytes(s); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeChars(String s) throws IOException { + try { + getRaf().writeChars(s); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + @Override + public void writeUTF(String s) throws IOException { + try { + getRaf().writeUTF(s); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + + //RAF methods + public long length() throws IOException { + try { + return getRaf().length(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + public void setLength(long length) throws IOException { + try { + getRaf().setLength(length); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + public void seek(long pos) throws IOException { + try { + getRaf().seek(pos); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + public FileDescriptor getFD() throws IOException { + try { + return getRaf().getFD(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + public int read(byte[] b, int off, int len) throws IOException { + try { + return getRaf().read(b, off, len); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + public int read(byte[] b) throws IOException { + try { + return getRaf().read(b); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } +}
