Updated Branches: refs/heads/trunk efb988655 -> ef619b6a9
https://issues.apache.org/jira/browse/AMQ-4947 Updated fix for this issue: allows file syncs to skip the forced metadata update by calling FileChannel#force(false) on the file channel. Enable this by defining the system property "org.apache.activemq.kahaDB.files.skipMetadataUpdate=true". Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ef619b6a Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ef619b6a Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ef619b6a Branch: refs/heads/trunk Commit: ef619b6a9bb9fd215a3996c8dc49f21e04380547 Parents: efb9886 Author: Timothy Bish <[email protected]> Authored: Thu Jan 9 17:21:56 2014 -0500 Committer: Timothy Bish <[email protected]> Committed: Thu Jan 9 17:21:56 2014 -0500 ---------------------------------------------------------------------- .../CallerBufferingDataFileAppender.java | 23 ++-- .../kahadb/disk/journal/DataFileAccessor.java | 17 +-- .../kahadb/disk/journal/DataFileAppender.java | 11 +- .../store/kahadb/disk/page/PageFile.java | 34 +++-- .../store/kahadb/disk/util/DiskBenchmark.java | 133 ++++++++----------- .../util/RecoverableRandomAccessFile.java | 28 +++- 6 files changed, 134 insertions(+), 112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/ef619b6a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java index ff11848..d7c4a28 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java +++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java @@ -17,11 +17,11 @@ package org.apache.activemq.store.kahadb.disk.journal; import java.io.IOException; -import java.io.RandomAccessFile; import java.util.zip.Adler32; import java.util.zip.Checksum; -import org.apache.activemq.util.ByteSequence; + import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream; +import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.RecoverableRandomAccessFile; /** @@ -30,7 +30,7 @@ import org.apache.activemq.util.RecoverableRandomAccessFile; * does. * The thread calling enqueue does the file open and buffering of the data, which * reduces the round trip of the write thread. - * + * */ class CallerBufferingDataFileAppender extends DataFileAppender { @@ -49,6 +49,7 @@ class CallerBufferingDataFileAppender extends DataFileAppender { append(write); } + @Override public void append(Journal.WriteCommand write) throws IOException { super.append(write); forceToDisk |= appendToBuffer(write, buff); @@ -124,15 +125,15 @@ class CallerBufferingDataFileAppender extends DataFileAppender { final boolean forceToDisk = wb.forceToDisk; ByteSequence sequence = buff.toByteSequence(); - - // Now we can fill in the batch control record properly. + + // Now we can fill in the batch control record properly. 
buff.reset(); buff.skip(5+Journal.BATCH_CONTROL_RECORD_MAGIC.length); buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE); if( journal.isChecksum() ) { - Checksum checksum = new Adler32(); - checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE); - buff.writeLong(checksum.getValue()); + Checksum checksum = new Adler32(); + checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE); + buff.writeLong(checksum.getValue()); } // Now do the 1 big write. @@ -151,11 +152,11 @@ class CallerBufferingDataFileAppender extends DataFileAppender { file.write(sequence.getData(), sequence.getOffset(), sequence.getLength()); ReplicationTarget replicationTarget = journal.getReplicationTarget(); if( replicationTarget!=null ) { - replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk); + replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk); } - + if (forceToDisk) { - file.getFD().sync(); + file.sync(); } Journal.WriteCommand lastWrite = wb.writes.getTail(); http://git-wip-us.apache.org/repos/asf/activemq/blob/ef619b6a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java index 2896196..2046031 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java @@ -17,7 +17,6 @@ package org.apache.activemq.store.kahadb.disk.journal; import 
java.io.IOException; -import java.io.RandomAccessFile; import java.util.Map; import org.apache.activemq.util.ByteSequence; @@ -26,8 +25,8 @@ import org.apache.activemq.util.RecoverableRandomAccessFile; /** * Optimized Store reader and updater. Single threaded and synchronous. Use in * conjunction with the DataFileAccessorPool of concurrent use. - * - * + * + * */ final class DataFileAccessor { @@ -38,7 +37,7 @@ final class DataFileAccessor { /** * Construct a Store reader - * + * * @param fileId * @throws IOException */ @@ -70,7 +69,7 @@ final class DataFileAccessor { throw new IOException("Invalid location: " + location); } - Journal.WriteCommand asyncWrite = (Journal.WriteCommand)inflightWrites.get(new Journal.WriteKey(location)); + Journal.WriteCommand asyncWrite = inflightWrites.get(new Journal.WriteKey(location)); if (asyncWrite != null) { return asyncWrite.data; } @@ -93,7 +92,7 @@ final class DataFileAccessor { throw new IOException("Invalid location: " + location + ", : " + e, e); } } - + public void readFully(long offset, byte data[]) throws IOException { file.seek(offset); file.readFully(data); @@ -105,7 +104,7 @@ final class DataFileAccessor { } public void readLocationDetails(Location location) throws IOException { - Journal.WriteCommand asyncWrite = (Journal.WriteCommand)inflightWrites.get(new Journal.WriteKey(location)); + Journal.WriteCommand asyncWrite = inflightWrites.get(new Journal.WriteKey(location)); if (asyncWrite != null) { location.setSize(asyncWrite.location.getSize()); location.setType(asyncWrite.location.getType()); @@ -155,9 +154,7 @@ final class DataFileAccessor { int size = Math.min(data.getLength(), location.getSize()); file.write(data.getData(), data.getOffset(), size); if (sync) { - file.getFD().sync(); + file.sync(); } - } - } http://git-wip-us.apache.org/repos/asf/activemq/blob/ef619b6a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java 
---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java index 095db52..969584e 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java @@ -18,16 +18,15 @@ package org.apache.activemq.store.kahadb.disk.journal; import java.io.IOException; import java.io.InterruptedIOException; -import java.io.RandomAccessFile; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import java.util.zip.Adler32; import java.util.zip.Checksum; -import org.apache.activemq.util.ByteSequence; import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream; import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList; +import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.RecoverableRandomAccessFile; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,10 +66,12 @@ class DataFileAppender implements FileAppender { hash = (int)(file ^ offset); } + @Override public int hashCode() { return hash; } + @Override public boolean equals(Object obj) { if (obj instanceof WriteKey) { WriteKey di = (WriteKey)obj; @@ -132,6 +133,7 @@ class DataFileAppender implements FileAppender { this.syncOnComplete = this.journal.isEnableAsyncDiskSync(); } + @Override public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException { // Write the packet our internal buffer. 
@@ -160,6 +162,7 @@ class DataFileAppender implements FileAppender { return location; } + @Override public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException { // Write the packet our internal buffer. int size = data.getLength() + Journal.RECORD_HEAD_SPACE; @@ -185,6 +188,7 @@ class DataFileAppender implements FileAppender { if (!running) { running = true; thread = new Thread() { + @Override public void run() { processQueue(); } @@ -246,6 +250,7 @@ class DataFileAppender implements FileAppender { return new WriteBatch(file, file.getLength(), write); } + @Override public void close() throws IOException { synchronized (enqueueMutex) { if (!shutdown) { @@ -365,7 +370,7 @@ class DataFileAppender implements FileAppender { } if (forceToDisk) { - file.getFD().sync(); + file.sync(); } Journal.WriteCommand lastWrite = wb.writes.getTail(); http://git-wip-us.apache.org/repos/asf/activemq/blob/ef619b6a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java index 3f107a6..17d6a54 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java @@ -42,9 +42,15 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.zip.Adler32; import java.util.zip.Checksum; -import org.apache.activemq.util.*; import org.apache.activemq.store.kahadb.disk.util.Sequence; import org.apache.activemq.store.kahadb.disk.util.SequenceSet; +import org.apache.activemq.util.DataByteArrayOutputStream; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.IOHelper; +import 
org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.LFUCache; +import org.apache.activemq.util.LRUCache; +import org.apache.activemq.util.RecoverableRandomAccessFile; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,7 +81,7 @@ public class PageFile { private static final Logger LOG = LoggerFactory.getLogger(PageFile.class); // A PageFile will use a couple of files in this directory - private File directory; + private final File directory; // And the file names in that directory will be based on this name. private final String name; @@ -97,7 +103,7 @@ public class PageFile { // The number of pages in the current recovery buffer private int recoveryPageCount; - private AtomicBoolean loaded = new AtomicBoolean(); + private final AtomicBoolean loaded = new AtomicBoolean(); // The number of pages we are aiming to write every time we // write to disk. int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE; @@ -118,23 +124,23 @@ public class PageFile { private boolean enabledWriteThread = false; // These are used if enableAsyncWrites==true - private AtomicBoolean stopWriter = new AtomicBoolean(); + private final AtomicBoolean stopWriter = new AtomicBoolean(); private Thread writerThread; private CountDownLatch checkpointLatch; // Keeps track of writes that are being written to disk. - private TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>(); + private final TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>(); // Keeps track of free pages. private final AtomicLong nextFreePageId = new AtomicLong(); private SequenceSet freeList = new SequenceSet(); - private AtomicLong nextTxid = new AtomicLong(); + private final AtomicLong nextTxid = new AtomicLong(); // Persistent settings stored in the page file. 
private MetaData metaData; - private ArrayList<File> tmpFilesForRemoval = new ArrayList<File>(); + private final ArrayList<File> tmpFilesForRemoval = new ArrayList<File>(); private boolean useLFRUEviction = false; private float LFUEvictionFactor = 0.2f; @@ -521,6 +527,7 @@ public class PageFile { } + @Override public String toString() { return "Page File: " + getMainPageFile(); } @@ -610,10 +617,10 @@ public class PageFile { // So we don't loose it.. write it 2 times... writeFile.seek(0); writeFile.write(d); - writeFile.getFD().sync(); + writeFile.sync(); writeFile.seek(PAGE_FILE_HEADER_SIZE / 2); writeFile.write(d); - writeFile.getFD().sync(); + writeFile.sync(); } private void storeFreeList() throws IOException { @@ -880,14 +887,17 @@ public class PageFile { private <T> void write(Page<T> page, byte[] data) throws IOException { final PageWrite write = new PageWrite(page, data); Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() { + @Override public Long getKey() { return write.getPage().getPageId(); } + @Override public PageWrite getValue() { return write; } + @Override public PageWrite setValue(PageWrite value) { return null; } @@ -1081,9 +1091,9 @@ public class PageFile { if (enableDiskSyncs) { // Sync to make sure recovery buffer writes land on disk.. 
if (enableRecoveryFile) { - recoveryFile.getFD().sync(); + writeFile.sync(); } - writeFile.getFD().sync(); + writeFile.sync(); } } finally { synchronized (writes) { @@ -1185,7 +1195,7 @@ public class PageFile { } // And sync it to disk - writeFile.getFD().sync(); + writeFile.sync(); return nextTxId; } http://git-wip-us.apache.org/repos/asf/activemq/blob/ef619b6a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DiskBenchmark.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DiskBenchmark.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DiskBenchmark.java index 2805f5f..641fe79 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DiskBenchmark.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DiskBenchmark.java @@ -27,13 +27,16 @@ import java.util.Arrays; */ public class DiskBenchmark { + private static final boolean SKIP_METADATA_UPDATE = + Boolean.getBoolean("org.apache.activemq.file.skipMetadataUpdate"); + boolean verbose; // reads and writes work with 4k of data at a time. - int bs=1024*4; + int bs = 1024 * 4; // Work with 100 meg file. 
- long size=1024*1024*500; - long sampleInterval = 10*1000; - + long size = 1024 * 1024 * 500; + long sampleInterval = 10 * 1000; + public static void main(String[] args) { DiskBenchmark benchmark = new DiskBenchmark(); @@ -67,79 +70,69 @@ public class DiskBenchmark { } } - + public static class Report { public int size; - + public int writes; public long writeDuration; - + public int syncWrites; public long syncWriteDuration; - + public int reads; public long readDuration; @Override public String toString() { - return - "Writes: \n" + - " "+writes+" writes of size "+size+" written in "+(writeDuration/1000.0)+" seconds.\n"+ - " "+getWriteRate()+" writes/second.\n"+ - " "+getWriteSizeRate()+" megs/second.\n"+ - "\n"+ - "Sync Writes: \n" + - " "+syncWrites+" writes of size "+size+" written in "+(syncWriteDuration/1000.0)+" seconds.\n"+ - " "+getSyncWriteRate()+" writes/second.\n"+ - " "+getSyncWriteSizeRate()+" megs/second.\n"+ - "\n"+ - "Reads: \n" + - " "+reads+" reads of size "+size+" read in "+(readDuration/1000.0)+" seconds.\n"+ - " "+getReadRate()+" writes/second.\n"+ - " "+getReadSizeRate()+" megs/second.\n"+ - "\n"+ - ""; + return "Writes: \n" + " " + writes + " writes of size " + size + " written in " + (writeDuration / 1000.0) + " seconds.\n" + " " + getWriteRate() + + " writes/second.\n" + " " + getWriteSizeRate() + " megs/second.\n" + "\n" + "Sync Writes: \n" + " " + syncWrites + " writes of size " + + size + " written in " + (syncWriteDuration / 1000.0) + " seconds.\n" + " " + getSyncWriteRate() + " writes/second.\n" + " " + + getSyncWriteSizeRate() + " megs/second.\n" + "\n" + "Reads: \n" + " " + reads + " reads of size " + size + " read in " + + (readDuration / 1000.0) + " seconds.\n" + " " + getReadRate() + " writes/second.\n" + " " + getReadSizeRate() + " megs/second.\n" + "\n" + + ""; } private float getWriteSizeRate() { float rc = writes; rc *= size; - rc /= (1024*1024); // put it in megs - rc /= (writeDuration/1000.0); // get rate. 
+ rc /= (1024 * 1024); // put it in megs + rc /= (writeDuration / 1000.0); // get rate. return rc; } private float getWriteRate() { float rc = writes; - rc /= (writeDuration/1000.0); // get rate. + rc /= (writeDuration / 1000.0); // get rate. return rc; } - + private float getSyncWriteSizeRate() { float rc = syncWrites; rc *= size; - rc /= (1024*1024); // put it in megs - rc /= (syncWriteDuration/1000.0); // get rate. + rc /= (1024 * 1024); // put it in megs + rc /= (syncWriteDuration / 1000.0); // get rate. return rc; } private float getSyncWriteRate() { float rc = syncWrites; - rc /= (syncWriteDuration/1000.0); // get rate. + rc /= (syncWriteDuration / 1000.0); // get rate. return rc; } + private float getReadSizeRate() { float rc = reads; rc *= size; - rc /= (1024*1024); // put it in megs - rc /= (readDuration/1000.0); // get rate. + rc /= (1024 * 1024); // put it in megs + rc /= (readDuration / 1000.0); // get rate. return rc; } private float getReadRate() { float rc = reads; - rc /= (readDuration/1000.0); // get rate. + rc /= (readDuration / 1000.0); // get rate. return rc; } @@ -200,64 +193,63 @@ public class DiskBenchmark { } } - public Report benchmark(File file) throws IOException { Report rc = new Report(); - + // Initialize the block we will be writing to disk. - byte []data = new byte[bs]; + byte[] data = new byte[bs]; for (int i = 0; i < data.length; i++) { - data[i] = (byte)('a'+(i%26)); + data[i] = (byte) ('a' + (i % 26)); } - + rc.size = data.length; RandomAccessFile raf = new RandomAccessFile(file, "rw"); raf.setLength(size); - + // Figure out how many writes we can do in the sample interval. 
long start = System.currentTimeMillis(); long now = System.currentTimeMillis(); - int ioCount=0; - while( true ) { - if( (now-start)>sampleInterval ) { + int ioCount = 0; + while (true) { + if ((now - start) > sampleInterval) { break; } raf.seek(0); - for( long i=0; i+data.length < size; i+=data.length) { + for (long i = 0; i + data.length < size; i += data.length) { raf.write(data); ioCount++; now = System.currentTimeMillis(); - if( (now-start)>sampleInterval ) { + if ((now - start) > sampleInterval) { break; } } - // Sync to disk so that the we actually write the data to disk.. otherwise - // OS buffering might not really do the write. - raf.getFD().sync(); + // Sync to disk so that the we actually write the data to disk.. + // otherwise OS buffering might not really do the write. + raf.getChannel().force(!SKIP_METADATA_UPDATE); } - raf.getFD().sync(); + raf.getChannel().force(!SKIP_METADATA_UPDATE); raf.close(); now = System.currentTimeMillis(); - + rc.size = data.length; rc.writes = ioCount; - rc.writeDuration = (now-start); + rc.writeDuration = (now - start); raf = new RandomAccessFile(file, "rw"); start = System.currentTimeMillis(); now = System.currentTimeMillis(); - ioCount=0; - while( true ) { - if( (now-start)>sampleInterval ) { + ioCount = 0; + while (true) { + if ((now - start) > sampleInterval) { break; } - for( long i=0; i+data.length < size; i+=data.length) { + for (long i = 0; i + data.length < size; i += data.length) { raf.seek(i); raf.write(data); - raf.getFD().sync(); + raf.getChannel().force(false); ioCount++; now = System.currentTimeMillis(); - if( (now-start)>sampleInterval ) { + if ((now - start) > sampleInterval) { break; } } @@ -265,72 +257,63 @@ public class DiskBenchmark { raf.close(); now = System.currentTimeMillis(); rc.syncWrites = ioCount; - rc.syncWriteDuration = (now-start); + rc.syncWriteDuration = (now - start); raf = new RandomAccessFile(file, "rw"); start = System.currentTimeMillis(); now = System.currentTimeMillis(); - 
ioCount=0; - while( true ) { - if( (now-start)>sampleInterval ) { + ioCount = 0; + while (true) { + if ((now - start) > sampleInterval) { break; } raf.seek(0); - for( long i=0; i+data.length < size; i+=data.length) { + for (long i = 0; i + data.length < size; i += data.length) { raf.seek(i); raf.readFully(data); ioCount++; now = System.currentTimeMillis(); - if( (now-start)>sampleInterval ) { + if ((now - start) > sampleInterval) { break; } } } raf.close(); - + rc.reads = ioCount; - rc.readDuration = (now-start); + rc.readDuration = (now - start); return rc; } - public boolean isVerbose() { return verbose; } - public void setVerbose(boolean verbose) { this.verbose = verbose; } - public int getBs() { return bs; } - public void setBs(int bs) { this.bs = bs; } - public long getSize() { return size; } - public void setSize(long size) { this.size = size; } - public long getSampleInterval() { return sampleInterval; } - public void setSampleInterval(long sampleInterval) { this.sampleInterval = sampleInterval; } - } http://git-wip-us.apache.org/repos/asf/activemq/blob/ef619b6a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java index 35c1586..1b0cb4c 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java @@ -16,10 +16,18 @@ */ package org.apache.activemq.util; -import java.io.*; +import java.io.File; +import java.io.FileDescriptor; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; public class RecoverableRandomAccessFile 
implements java.io.DataOutput, java.io.DataInput, java.io.Closeable { + private static final boolean SKIP_METADATA_UPDATE = + Boolean.getBoolean("org.apache.activemq.kahaDB.files.skipMetadataUpdate"); + RandomAccessFile raf; File file; String mode; @@ -389,6 +397,24 @@ public class RecoverableRandomAccessFile implements java.io.DataOutput, java.io. } } + public void sync() throws IOException { + try { + getRaf().getChannel().force(!SKIP_METADATA_UPDATE);; + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + + public FileChannel getChannel() throws IOException { + try { + return getRaf().getChannel(); + } catch (IOException ioe) { + handleException(); + throw ioe; + } + } + public int read(byte[] b, int off, int len) throws IOException { try { return getRaf().read(b, off, len);
