Author: jbellis
Date: Fri Dec 24 16:57:07 2010
New Revision: 1052531
URL: http://svn.apache.org/viewvc?rev=1052531&view=rev
Log:
avoid polluting pagecache with commitlog or sstable writes and seq scan
operations
patch by Pavel Yaskevich and Chris Goffinet; reviewed by tjake and jbellis for
CASSANDRA-1470
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/CLibrary.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1052531&r1=1052530&r2=1052531&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Fri Dec 24 16:57:07 2010
@@ -6,6 +6,8 @@ dev
* More-efficient cross-DC replication (CASSANDRA-1530)
* upgrade to TFastFramedTransport (CASSANDRA-1743)
* fix CLI get recognition of supercolumns (CASSANDRA-1899)
+ * avoid polluting page cache with commitlog or sstable writes
+ and seq scan operations (CASSANDRA-1470)
0.7.0-rc3
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1052531&r1=1052530&r2=1052531&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
Fri Dec 24 16:57:07 2010
@@ -652,7 +652,7 @@ public class CompactionManager implement
CollatingIterator iter = FBUtilities.getCollatingIterator();
for (SSTableReader sstable : sstables)
{
- SSTableScanner scanner = sstable.getScanner(FILE_BUFFER_SIZE);
+ SSTableScanner scanner =
sstable.getDirectScanner(FILE_BUFFER_SIZE);
iter.addIterator(new FilterIterator(scanner, rangesPredicate));
}
return iter;
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1052531&r1=1052530&r2=1052531&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Fri Dec 24 16:57:07 2010
@@ -211,7 +211,7 @@ public class CommitLog
for (File file : clogs)
{
int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
- BufferedRandomAccessFile reader = new
BufferedRandomAccessFile(file.getAbsolutePath(), "r", bufferSize);
+ BufferedRandomAccessFile reader = new BufferedRandomAccessFile(new
File(file.getAbsolutePath()), "r", bufferSize, true);
try
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=1052531&r1=1052530&r2=1052531&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
Fri Dec 24 16:57:07 2010
@@ -52,6 +52,7 @@ public class CommitLogSegment
try
{
logWriter = createWriter(logFile);
+
writeHeader();
}
catch (IOException e)
@@ -72,7 +73,7 @@ public class CommitLogSegment
private static BufferedRandomAccessFile createWriter(String file) throws
IOException
{
- return new BufferedRandomAccessFile(file, "rw", 128 * 1024);
+ return new BufferedRandomAccessFile(new File(file), "rw", 128 * 1024,
true);
}
public CommitLogSegment.CommitLogContext write(RowMutation rowMutation,
Object serializedRow) throws IOException
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1052531&r1=1052530&r2=1052531&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java
Fri Dec 24 16:57:07 2010
@@ -82,7 +82,7 @@ implements Closeable, ICompactionInfo
CollatingIterator iter = FBUtilities.getCollatingIterator();
for (SSTableReader sstable : sstables)
{
- iter.addIterator(sstable.getScanner(FILE_BUFFER_SIZE));
+ iter.addIterator(sstable.getDirectScanner(FILE_BUFFER_SIZE));
}
return iter;
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/KeyIterator.java?rev=1052531&r1=1052530&r2=1052531&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
Fri Dec 24 16:57:07 2010
@@ -44,7 +44,10 @@ public class KeyIterator extends Abstrac
this.desc = desc;
try
{
- in = new BufferedRandomAccessFile(new
File(desc.filenameFor(SSTable.COMPONENT_INDEX)), "r");
+ in = new BufferedRandomAccessFile(new
File(desc.filenameFor(SSTable.COMPONENT_INDEX)),
+ "r",
+
BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE,
+ true);
}
catch (IOException e)
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1052531&r1=1052530&r2=1052531&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
Fri Dec 24 16:57:07 2010
@@ -266,7 +266,10 @@ public class SSTableReader extends SSTab
SegmentedFile.Builder dbuilder =
SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
// we read the positions in a BRAF so we don't have to worry about an
entry spanning a mmap boundary.
- BufferedRandomAccessFile input = new
BufferedRandomAccessFile(descriptor.filenameFor(Component.PRIMARY_INDEX), "r");
+ BufferedRandomAccessFile input = new BufferedRandomAccessFile(new
File(descriptor.filenameFor(Component.PRIMARY_INDEX)),
+ "r",
+
BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE,
+ true);
try
{
if (keyCache != null && keyCache.getCapacity() -
keyCache.getSize() < keysToLoadInCache.size())
@@ -524,15 +527,6 @@ public class SSTableReader extends SSTab
/**
* @param bufferSize Buffer size in bytes for this Scanner.
- * @return A Scanner for seeking over the rows of the SSTable.
- */
- public SSTableScanner getScanner(int bufferSize)
- {
- return new SSTableScanner(this, bufferSize);
- }
-
- /**
- * @param bufferSize Buffer size in bytes for this Scanner.
* @param filter filter to use when reading the columns
* @return A Scanner for seeking over the rows of the SSTable.
*/
@@ -541,6 +535,17 @@ public class SSTableReader extends SSTab
return new SSTableScanner(this, filter, bufferSize);
}
+ /**
+ * Direct I/O SSTableScanner
+ * @param bufferSize Buffer size in bytes for this Scanner.
+ * @return A Scanner for seeking over the rows of the SSTable.
+ * @throws IOException when I/O operation fails
+ */
+ public SSTableScanner getDirectScanner(int bufferSize)
+ {
+ return new SSTableScanner(this, bufferSize, true);
+ }
+
public FileDataInput getFileDataInput(DecoratedKey decoratedKey, int
bufferSize)
{
long position = getPosition(decoratedKey, Operator.EQ);
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java?rev=1052531&r1=1052530&r2=1052531&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
Fri Dec 24 16:57:07 2010
@@ -20,6 +20,7 @@
package org.apache.cassandra.io.sstable;
import java.io.Closeable;
+import java.io.File;
import java.io.IOError;
import java.io.IOException;
import java.util.Arrays;
@@ -49,11 +50,11 @@ public class SSTableScanner implements I
/**
* @param sstable SSTable to scan.
*/
- SSTableScanner(SSTableReader sstable, int bufferSize)
+ SSTableScanner(SSTableReader sstable, int bufferSize, boolean skipCache)
{
try
{
- this.file = new BufferedRandomAccessFile(sstable.getFilename(),
"r", bufferSize);
+ this.file = new BufferedRandomAccessFile(new
File(sstable.getFilename()), "r", bufferSize, skipCache);
}
catch (IOException e)
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1052531&r1=1052530&r2=1052531&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
Fri Dec 24 16:57:07 2010
@@ -71,7 +71,7 @@ public class SSTableWriter extends SSTab
SSTable.defaultColumnHistogram());
iwriter = new IndexWriter(descriptor, partitioner, keyCount);
dbuilder =
SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
- dataFile = new BufferedRandomAccessFile(getFilename(), "rw",
DatabaseDescriptor.getInMemoryCompactionLimit());
+ dataFile = new BufferedRandomAccessFile(new File(getFilename()), "rw",
DatabaseDescriptor.getInMemoryCompactionLimit(), true);
}
public void mark()
@@ -246,7 +246,7 @@ public class SSTableWriter extends SSTab
cfs = Table.open(desc.ksname).getColumnFamilyStore(desc.cfname);
try
{
- dfile = new
BufferedRandomAccessFile(desc.filenameFor(SSTable.COMPONENT_DATA), "r", 8 *
1024 * 1024);
+ dfile = new BufferedRandomAccessFile(new
File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true);
}
catch (IOException e)
{
@@ -362,7 +362,7 @@ public class SSTableWriter extends SSTab
{
this.desc = desc;
this.partitioner = part;
- indexFile = new
BufferedRandomAccessFile(desc.filenameFor(SSTable.COMPONENT_INDEX), "rw", 8 *
1024 * 1024);
+ indexFile = new BufferedRandomAccessFile(new
File(desc.filenameFor(SSTable.COMPONENT_INDEX)), "rw", 8 * 1024 * 1024, true);
builder =
SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
summary = new IndexSummary(keyCount);
bf = BloomFilter.getFilter(keyCount, 15);
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java?rev=1052531&r1=1052530&r2=1052531&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
Fri Dec 24 16:57:07 2010
@@ -15,12 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.cassandra.io.util;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.cassandra.utils.CLibrary;
/**
* A <code>BufferedRandomAccessFile</code> is like a
@@ -33,312 +36,363 @@ import java.io.RandomAccessFile;
* overridden here relies on the implementation of those methods in the
* superclass.
*/
-
public class BufferedRandomAccessFile extends RandomAccessFile implements
FileDataInput
{
- static final int LogBuffSz_ = 16; // 64K buffer
- public static final int BuffSz_ = (1 << LogBuffSz_);
-
- private String path_;
+ private static final long MAX_BYTES_IN_PAGE_CACHE = (long) Math.pow(2,
27); // 128mb
- /*
- * This implementation is based on the buffer implementation in Modula-3's
- * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
- */
- private boolean dirty_; // true iff unflushed bytes exist
- private boolean syncNeeded_; // dirty_ can be cleared by e.g. seek, so
track sync separately
- private long curr_; // current position in file
- private long lo_, hi_; // bounds on characters in "buff"
- private byte[] buff_; // local buffer
- private long maxHi_; // this.lo + this.buff.length
- private boolean hitEOF_; // buffer contains last file block?
- private long diskPos_; // disk position
- private long fileLength = -1; // cache for file size
+ // absolute filesystem path to the file
+ private final String filePath;
+
+ // default buffer size, 65535 bytes (one byte short of 64 KiB)
+ public static final int DEFAULT_BUFFER_SIZE = 65535;
+
+ // isDirty - true if this.buffer contains any un-synced bytes
+ // hitEOF - true if buffer capacity is less than its maximal size
+ private boolean isDirty, syncNeeded, hitEOF = false;
+
+ // buffer which will cache file blocks
+ private ByteBuffer buffer;
+
+ // `current` as current position in file
+ // `bufferOffset` is the offset of the beginning of the buffer
+ // `bufferEnd` is `bufferOffset` + count of bytes read from file
+ private long bufferOffset, bufferEnd, current = 0;
+
+ // max buffer size is set according to (int size) parameter in the
+ // constructor
+ // or in directIO() method to the DEFAULT_DIRECT_BUFFER_SIZE
+ private long maxBufferSize;
+
+ // constant, used for caching purposes: holds the cached file length when
+ // the file is opened in read-only ("r") mode, and -1 for any other mode
+ private final long fileLength;
+
+ // channel linked with the file, used to retrieve data and force updates.
+ private final FileChannel channel;
+
+ private long markedPointer;
+
+ // file descriptor
+ private int fd;
+
+ // skip cache - used for commit log and sstable writing w/ posix_fadvise
+ private final boolean skipCache;
+
+ private long bytesSinceCacheFlush = 0;
+ private long minBufferOffset = Long.MAX_VALUE;
/*
- * To describe the above fields, we introduce the following abstractions for
- * the file "f":
- *
- * len(f) the length of the file curr(f) the current position in the file
- * c(f) the abstract contents of the file disk(f) the contents of f's
- * backing disk file closed(f) true iff the file is closed
- *
- * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
- * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
- * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
- * operation has the effect of making "disk(f)" identical to "c(f)".
- *
- * A file is said to be *valid* if the following conditions hold:
- *
- * V1. The "closed" and "curr" fields are correct:
- *
- * f.closed == closed(f) f.curr == curr(f)
- *
- * V2. The current position is either contained in the buffer, or just past
- * the buffer:
- *
- * f.lo <= f.curr <= f.hi
- *
- * V3. Any (possibly) unflushed characters are stored in "f.buff":
- *
- * (forall i in [f.lo, f.hi): c(f)[i] == f.buff[i - f.lo])
- *
- * V4. For all characters not covered by V3, c(f) and disk(f) agree:
- *
- * (forall i in [f.lo, len(f)): i not in [f.lo, f.hi) => c(f)[i] ==
- * disk(f)[i])
- *
- * V5. "f.dirty" is true iff the buffer contains bytes that should be
- * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
- *
- * f.dirty == (exists i in [f.lo, f.hi): c(f)[i] != f.buff[i - f.lo])
- *
- * V6. this.maxHi == this.lo + this.buff.length
- *
- * Note that "f.buff" can be "null" in a valid file, since the range of
- * characters in V3 is empty when "f.lo == f.hi".
- *
- * A file is said to be *ready* if the buffer contains the current position,
- * i.e., when:
- *
- * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
- *
- * When a file is ready, reading or writing a single byte can be performed
- * by reading or writing the in-memory buffer without performing a disk
- * operation.
- */
-
- /**
- * Open a new <code>BufferedRandomAccessFile</code> on <code>file</code>
- * in mode <code>mode</code>, which should be "r" for reading only, or
- * "rw" for reading and writing.
- */
- public BufferedRandomAccessFile(File file, String mode) throws IOException
- {
- this(file, mode, 0);
- }
-
- public BufferedRandomAccessFile(File file, String mode, int size) throws
IOException
- {
- super(file, mode);
- path_ = file.getAbsolutePath();
- this.init(size, mode);
- }
-
- /**
* Open a new <code>BufferedRandomAccessFile</code> on the file named
* <code>name</code> in mode <code>mode</code>, which should be "r" for
* reading only, or "rw" for reading and writing.
*/
public BufferedRandomAccessFile(String name, String mode) throws
IOException
{
- this(name, mode, 0);
+ this(new File(name), mode, 0);
}
-
- public BufferedRandomAccessFile(String name, String mode, int size) throws
IOException
+
+ public BufferedRandomAccessFile(String name, String mode, int bufferSize)
throws IOException
{
- super(name, mode);
- path_ = name;
- this.init(size, mode);
+ this(new File(name), mode, bufferSize);
}
-
- private void init(int size, String mode) throws IOException
+
+ /*
+ * Open a new <code>BufferedRandomAccessFile</code> on <code>file</code> in
+ * mode <code>mode</code>, which should be "r" for reading only, or "rw"
for
+ * reading and writing.
+ */
+ public BufferedRandomAccessFile(File file, String mode) throws IOException
{
- this.dirty_ = false;
- this.lo_ = this.curr_ = this.hi_ = 0;
- this.buff_ = (size > BuffSz_) ? new byte[size] : new byte[BuffSz_];
- this.maxHi_ = (long) BuffSz_;
- this.hitEOF_ = false;
- this.diskPos_ = 0L;
- if ("r".equals(mode))
- {
- // read only file, we can cache file length
- this.fileLength = super.length();
- }
+ this(file, mode, 0);
}
- public String getPath()
+ public BufferedRandomAccessFile(File file, String mode, int bufferSize)
throws IOException
{
- return path_;
+ this(file, mode, bufferSize, false);
+ }
+
+ public BufferedRandomAccessFile(File file, String mode, int bufferSize,
boolean skipCache) throws IOException
+ {
+ super(file, mode);
+
+ this.skipCache = skipCache;
+
+ channel = super.getChannel();
+ filePath = file.getAbsolutePath();
+
+ maxBufferSize = Math.max(bufferSize, DEFAULT_BUFFER_SIZE);
+
+ // allocating required size of the buffer
+ buffer = ByteBuffer.allocate((int) maxBufferSize);
+
+ // if in read-only mode, caching file size
+ fileLength = (mode.equals("r")) ? this.channel.size() : -1;
+ bufferEnd = reBuffer(); // bufferEnd equals the number of bytes read
+ fd = CLibrary.getfd(this.getFD());
}
public void sync() throws IOException
{
- if (syncNeeded_)
+ if (syncNeeded)
{
flush();
- getChannel().force(true); // true, because file length counts as
"metadata"
- syncNeeded_ = false;
+
+ channel.force(true); // true, because file length counts as
+ // "meta-data"
+
+ if (skipCache)
+ {
+ // clear entire file from page cache
+ CLibrary.trySkipCache(this.fd, 0, 0);
+
+ minBufferOffset = Long.MAX_VALUE;
+ bytesSinceCacheFlush = 0;
+ }
+
+ syncNeeded = false;
}
}
- public void close() throws IOException
- {
- sync();
- this.buff_ = null;
- super.close();
- }
-
- /* Flush any dirty bytes in the buffer to disk. */
public void flush() throws IOException
- {
- if (this.dirty_)
+ {
+ if (isDirty)
{
- if (this.diskPos_ != this.lo_)
- super.seek(this.lo_);
- int len = (int) (this.hi_ - this.lo_);
- super.write(this.buff_, 0, len);
- this.diskPos_ = this.hi_;
- this.dirty_ = false;
+ if (channel.position() != bufferOffset)
+ channel.position(bufferOffset);
+
+ int lengthToWrite = (int) (bufferEnd - bufferOffset);
+
+ super.write(buffer.array(), 0, lengthToWrite);
+
+ if (skipCache)
+ {
+
+ // we don't know when the data reaches disk since we aren't
+ // calling flush
+ // so we continue to clear pages we don't need from the first
+ // offset we see
+ // periodically we update this starting offset
+ bytesSinceCacheFlush += lengthToWrite;
+
+ if (bufferOffset < minBufferOffset)
+ minBufferOffset = bufferOffset;
+
+ if (bytesSinceCacheFlush >= MAX_BYTES_IN_PAGE_CACHE)
+ {
+ CLibrary.trySkipCache(this.fd, (int) minBufferOffset, 0);
+ minBufferOffset = bufferOffset;
+ bytesSinceCacheFlush = 0;
+ }
+
+ }
+
+ isDirty = false;
}
}
-
- /*
- * Read at most "this.buff.length" bytes into "this.buff", returning the
- * number of bytes read. If the return result is less than
- * "this.buff.length", then EOF was read.
- */
- private int fillBuffer() throws IOException
- {
- int count = 0;
- int remainder = this.buff_.length;
- while (remainder > 0)
- {
- int n = super.read(this.buff_, count, remainder);
- if (n < 0)
- break;
- count += n;
- remainder -= n;
- }
- this.hitEOF_ = (count < this.buff_.length);
- this.diskPos_ += count;
- return count;
- }
-
- public void seek(long pos) throws IOException
+
+ private long reBuffer() throws IOException
{
- this.curr_ = pos;
+ flush(); // synchronizing buffer and file on disk
+ buffer.clear();
+ bufferOffset = current;
+
+ if (bufferOffset > channel.size())
+ {
+ buffer.rewind();
+ bufferEnd = bufferOffset;
+ hitEOF = true;
+
+ return 0;
+ }
+
+ if (bufferOffset < minBufferOffset)
+ minBufferOffset = bufferOffset;
+
+ channel.position(bufferOffset); // setting channel position
+ long bytesRead = channel.read(buffer); // reading from that position
+
+ hitEOF = (bytesRead < maxBufferSize); // buffer is not fully loaded
with
+ // data
+ bufferEnd = bufferOffset + bytesRead;
+
+ buffer.rewind();
+
+ bytesSinceCacheFlush += bytesRead;
+
+ if (skipCache && bytesSinceCacheFlush >= MAX_BYTES_IN_PAGE_CACHE)
+ {
+ CLibrary.trySkipCache(this.fd, (int) minBufferOffset, 0);
+ bytesSinceCacheFlush = 0;
+ minBufferOffset = Long.MAX_VALUE;
+ }
+
+ return bytesRead;
}
- /*
- * On exit from this routine <code>this.curr == this.hi</code> iff
<code>pos</code>
- * is at or past the end-of-file, which can only happen if the file was
- * opened in read-only mode.
- */
- private void reBuffer() throws IOException
+ @Override
+ // -1 will be returned if EOF is reached, RandomAccessFile is responsible
+ // for
+ // throwing EOFException
+ public int read() throws IOException
{
- this.flush();
- this.lo_ = this.curr_;
- this.maxHi_ = this.lo_ + (long) this.buff_.length;
- if (this.diskPos_ != this.lo_)
+ if (isEOF())
+ return -1; // required by RandomAccessFile
+
+ if (current < bufferOffset || current >= bufferEnd)
{
- super.seek(this.lo_);
- this.diskPos_ = this.lo_;
+ reBuffer();
+
+ if (current == bufferEnd && hitEOF)
+ return -1; // required by RandomAccessFile
}
- int n = this.fillBuffer();
- this.hi_ = this.lo_ + (long) n;
+
+ byte result = buffer.get();
+ current++;
+
+ return ((int) result) & 0xFF;
}
- public long getFilePointer()
+ @Override
+ public int read(byte[] buffer) throws IOException
{
- return this.curr_;
+ return read(buffer, 0, buffer.length);
}
- public long length() throws IOException
+ @Override
+ // -1 will be returned if EOF is reached, RandomAccessFile is responsible
+ // for
+ // throwing EOFException
+ public int read(byte[] buff, int offset, int length) throws IOException
{
- if (fileLength == -1)
+ int bytesCount = 0;
+
+ while (length > 0)
{
- // max accounts for the case where we have written past the old
file length, but not yet flushed our buffer
- return Math.max(this.curr_, super.length());
+ int bytesRead = readAtMost(buff, offset, length);
+ if (bytesRead == -1)
+ return -1; // EOF
+
+ offset += bytesRead;
+ length -= bytesRead;
+ bytesCount += bytesRead;
}
- else
+
+ return bytesCount;
+ }
+
+ private int readAtMost(byte[] buff, int offset, int length) throws
IOException
+ {
+ if (length >= bufferEnd && hitEOF)
+ return -1;
+
+ final int left = (int) maxBufferSize - buffer.position();
+ if (current < bufferOffset || left < length)
{
- // opened as read only, file length is cached
- return fileLength;
+ reBuffer();
}
+
+ length = Math.min(length, (int) (maxBufferSize - buffer.position()));
+ buffer.get(buff, offset, length);
+ current += length;
+
+ return length;
}
- public int read() throws IOException
+ @Override
+ public void write(int val) throws IOException
{
- if (this.lo_ > this.curr_ || this.curr_ >= this.hi_)
- {
- this.reBuffer();
- if (this.curr_ == this.hi_ && this.hitEOF_)
- return -1;
- }
- byte res = this.buff_[(int) (this.curr_ - this.lo_)];
- this.curr_++;
- return ((int) res) & 0xFF; // convert byte -> int
+ byte[] b = new byte[1];
+ b[0] = (byte) val;
+ this.write(b, 0, b.length);
}
-
- public int read(byte[] b) throws IOException
+
+ @Override
+ public void write(byte[] b) throws IOException
{
- return this.read(b, 0, b.length);
+ write(b, 0, b.length);
}
-
- public int read(byte[] b, int off, int len) throws IOException
+
+ @Override
+ public void write(byte[] buff, int offset, int length) throws IOException
{
- if (this.lo_ > this.curr_ || this.curr_ >= this.hi_)
+ while (length > 0)
{
- this.reBuffer();
- if (this.curr_ == this.hi_ && this.hitEOF_)
- return -1;
- }
- len = Math.min(len, (int) (this.hi_ - this.curr_));
- int buffOff = (int) (this.curr_ - this.lo_);
- System.arraycopy(this.buff_, buffOff, b, off, len);
- this.curr_ += len;
- return len;
+ int n = writeAtMost(buff, offset, length);
+ offset += n;
+ length -= n;
+ isDirty = true;
+ syncNeeded = true;
+ }
}
-
- public void write(int b) throws IOException
+
+ /*
+ * Write at most "length" bytes from "buff" starting at position "offset", and
+ * return the number of bytes written. caller is responsible for setting
+ * isDirty.
+ */
+ private int writeAtMost(byte[] buff, int offset, int length) throws
IOException
{
- if (this.lo_ > this.curr_ || this.curr_ > this.hi_ || this.curr_ >=
maxHi_)
+ final int left = (int) maxBufferSize - buffer.position();
+ if (current < bufferOffset || left < length)
{
- this.reBuffer();
+ reBuffer();
}
- this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
- this.curr_++;
- if (this.curr_ > this.hi_)
- this.hi_ = this.curr_;
- this.dirty_ = true;
- syncNeeded_ = true;
+
+ // logic is the following: we need to add bytes to the end of the
buffer
+ // starting from current buffer position and return this length
+ length = Math.min(length, (int) (maxBufferSize - buffer.position()));
+
+ buffer.put(buff, offset, length);
+ current += length;
+
+ if (current > bufferEnd)
+ bufferEnd = current;
+
+ return length;
}
-
- public void write(byte[] b) throws IOException
+
+ @Override
+ public void seek(long newPosition) throws IOException
{
- this.write(b, 0, b.length);
- }
-
- public void write(byte[] b, int off, int len) throws IOException
- {
- while (len > 0)
- {
- int n = this.writeAtMost(b, off, len);
- off += n;
- len -= n;
- this.dirty_ = true;
- syncNeeded_ = true;
+ current = newPosition;
+
+ if (newPosition >= bufferEnd || newPosition < bufferOffset)
+ {
+ reBuffer(); // this will set bufferEnd for us
}
+
+ final int delta = (int) (newPosition - bufferOffset);
+ buffer.position(delta);
}
-
- /*
- * Write at most "len" bytes to "b" starting at position "off", and return
- * the number of bytes written. caller is responsible for setting dirty,
syncNeeded.
- */
- private int writeAtMost(byte[] b, int off, int len) throws IOException
+
+ @Override
+ public int skipBytes(int count) throws IOException
{
- if (this.lo_ > this.curr_ || this.curr_ > this.hi_ || this.curr_ >=
maxHi_)
+ if (count > 0)
{
- this.reBuffer();
+ long currentPos = getFilePointer(), eof = length();
+ int newCount = (int) ((currentPos + count > eof) ? eof -
currentPos : count);
+
+ seek(currentPos + newCount);
+ return newCount;
}
- len = Math.min(len, (int) (this.maxHi_ - this.curr_));
- int buffOff = (int) (this.curr_ - this.lo_);
- System.arraycopy(b, off, this.buff_, buffOff, len);
- this.curr_ += len;
- if (this.curr_ > this.hi_)
- this.hi_ = this.curr_;
- return len;
+
+ return 0;
+ }
+
+ public long length() throws IOException
+ {
+ return (fileLength == -1) ? Math.max(current, channel.size()) :
fileLength;
+ }
+
+ public long getFilePointer()
+ {
+ return bufferOffset + buffer.position();
+ }
+
+ public String getPath()
+ {
+ return filePath;
}
public boolean isEOF() throws IOException
@@ -351,9 +405,39 @@ public class BufferedRandomAccessFile ex
return length() - getFilePointer();
}
+ @Override
+ public void close() throws IOException
+ {
+ sync();
+ buffer = null;
+
+ if (skipCache && bytesSinceCacheFlush > 0)
+ {
+ CLibrary.trySkipCache(this.fd, 0, 0);
+ }
+
+ super.close();
+ }
+
+ public void reset() throws IOException
+ {
+ seek(markedPointer);
+ }
+
+ public int bytesPastMark()
+ {
+ long bytes = getFilePointer() - markedPointer;
+
+ assert bytes >= 0;
+ if (bytes > Integer.MAX_VALUE)
+ throw new UnsupportedOperationException("Overflow: " + bytes);
+ return (int) bytes;
+ }
+
public FileMark mark()
{
- return new BufferedRandomAccessFileMark(getFilePointer());
+ markedPointer = getFilePointer();
+ return new BufferedRandomAccessFileMark(markedPointer);
}
public void reset(FileMark mark) throws IOException
@@ -366,17 +450,21 @@ public class BufferedRandomAccessFile ex
{
assert mark instanceof BufferedRandomAccessFileMark;
long bytes = getFilePointer() - ((BufferedRandomAccessFileMark)
mark).pointer;
+
assert bytes >= 0;
if (bytes > Integer.MAX_VALUE)
throw new UnsupportedOperationException("Overflow: " + bytes);
return (int) bytes;
}
- private static class BufferedRandomAccessFileMark implements FileMark
+ /**
+ * Class to hold a mark to the position of the file
+ */
+ protected static class BufferedRandomAccessFileMark implements FileMark
{
long pointer;
- BufferedRandomAccessFileMark(long pointer)
+ public BufferedRandomAccessFileMark(long pointer)
{
this.pointer = pointer;
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=1052531&r1=1052530&r2=1052531&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/SSTableExport.java
Fri Dec 24 16:57:07 2010
@@ -185,7 +185,7 @@ public class SSTableExport
throws IOException
{
SSTableReader reader =
SSTableReader.open(Descriptor.fromFilename(ssTableFile));
- SSTableScanner scanner = reader.getScanner(INPUT_FILE_BUFFER_SIZE);
+ SSTableScanner scanner =
reader.getDirectScanner(INPUT_FILE_BUFFER_SIZE);
IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();
Set<String> excludeSet = new HashSet<String>();
int i = 0;
@@ -243,7 +243,7 @@ public class SSTableExport
// than once from within the same process.
static void export(SSTableReader reader, PrintStream outs, String[]
excludes) throws IOException
{
- SSTableScanner scanner = reader.getScanner(INPUT_FILE_BUFFER_SIZE);
+ SSTableScanner scanner =
reader.getDirectScanner(INPUT_FILE_BUFFER_SIZE);
Set<String> excludeSet = new HashSet<String>();
if (excludes != null)
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/CLibrary.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/CLibrary.java?rev=1052531&r1=1052530&r2=1052531&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/CLibrary.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/CLibrary.java
Fri Dec 24 16:57:07 2010
@@ -19,7 +19,9 @@
package org.apache.cassandra.utils;
import java.io.File;
+import java.io.FileDescriptor;
import java.io.IOException;
+import java.lang.reflect.Field;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +38,18 @@ public final class CLibrary
private static final int ENOMEM = 12;
+ private static final int F_GETFL = 3; /* get file status flags */
+ private static final int F_SETFL = 4; /* set file status flags */
+ private static final int F_NOCACHE = 48; /* Mac OS X specific flag, turns
cache on/off */
+ private static final int O_DIRECT = 040000; /* fcntl.h */
+
+ private static final int POSIX_FADV_NORMAL = 0; /* fadvise.h */
+ private static final int POSIX_FADV_RANDOM = 1; /* fadvise.h */
+ private static final int POSIX_FADV_SEQUENTIAL = 2; /* fadvise.h */
+ private static final int POSIX_FADV_WILLNEED = 3; /* fadvise.h */
+ private static final int POSIX_FADV_DONTNEED = 4; /* fadvise.h */
+ private static final int POSIX_FADV_NOREUSE = 5; /* fadvise.h */
+
static
{
try
@@ -61,6 +75,12 @@ public final class CLibrary
private static native int link(String from, String to) throws
LastErrorException;
+ // fcntl - manipulate file descriptor, `man 2 fcntl`
+ public static native int fcntl(int fd, int command, long flags) throws
LastErrorException;
+
+ // posix_fadvise - predeclare an access pattern for file data, `man 2 posix_fadvise`
+ public static native int posix_fadvise(int fd, int offset, int len, int
flag) throws LastErrorException;
+
private static int errno(RuntimeException e)
{
assert e instanceof LastErrorException;
@@ -168,4 +188,72 @@ public final class CLibrary
throw new RuntimeException(e);
}
}
+
+ public static void trySkipCache(int fd, int offset, int len)
+ {
+ if (fd < 0)
+ return;
+
+ try
+ {
+ if (System.getProperty("os.name").toLowerCase().contains("linux"))
+ {
+ posix_fadvise(fd, offset, len, POSIX_FADV_DONTNEED);
+ }
+ else if
(System.getProperty("os.name").toLowerCase().contains("mac"))
+ {
+ tryFcntl(fd, F_NOCACHE, 1);
+ }
+ }
+ catch (UnsatisfiedLinkError e)
+ {
+ // JNA is unavailable, so the cache hint is skipped;
+ // file access then behaves like a normal RandomAccessFile
+ }
+ }
+
+ public static int tryFcntl(int fd, int command, int flags)
+ {
+ int result = -1;
+
+ try
+ {
+ result = CLibrary.fcntl(fd, command, flags);
+ assert result >= 0; // on error a value of -1 is returned and
errno is set to indicate the error.
+ }
+ catch (RuntimeException e)
+ {
+ if (!(e instanceof LastErrorException))
+ throw e;
+
+ logger.warn(String.format("fcntl(%d, %d, %d) failed, errno (%d).",
+ fd, command, flags, CLibrary.errno(e)));
+ }
+
+ return result;
+ }
+
+ /**
+ * Get the system file descriptor from a FileDescriptor object.
+ * @param descriptor - FileDescriptor object to get fd from
+ * @return file descriptor, or -1 on error
+ */
+ public static int getfd(FileDescriptor descriptor)
+ {
+ Field field = FBUtilities.getProtectedField(descriptor.getClass(),
"fd");
+
+ if (field == null)
+ return -1;
+
+ try
+ {
+ return field.getInt(descriptor);
+ }
+ catch (Exception e)
+ {
+ logger.warn("unable to read fd field from FileDescriptor");
+ }
+
+ return -1;
+ }
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1052531&r1=1052530&r2=1052531&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java
Fri Dec 24 16:57:07 2010
@@ -658,4 +658,27 @@ public class FBUtilities
{
return o.getClass().isArray() ? Arrays.toString((Object[]) o) :
o.toString();
}
+
+ /**
+ * Used to get access to a protected/private field of the specified class
+ * @param klass - class that declares the field
+ * @param fieldName - name of the field
+ * @return the Field, made accessible (never null; lookup failures are rethrown as AssertionError)
+ */
+ public static Field getProtectedField(Class klass, String fieldName)
+ {
+ Field field;
+
+ try
+ {
+ field = klass.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ }
+ catch (Exception e)
+ {
+ throw new AssertionError(e);
+ }
+
+ return field;
+ }
}
Modified:
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java?rev=1052531&r1=1052530&r2=1052531&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
(original)
+++
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
Fri Dec 24 16:57:07 2010
@@ -43,7 +43,7 @@ public class BufferedRandomAccessFileTes
// write a chunk smaller then our buffer, so will not be flushed
// to disk
- byte[] lessThenBuffer = new byte[BufferedRandomAccessFile.BuffSz_ / 2];
+ byte[] lessThenBuffer = new
byte[BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE / 2];
rw.write(lessThenBuffer);
assertEquals(lessThenBuffer.length, rw.length());
@@ -52,7 +52,7 @@ public class BufferedRandomAccessFileTes
assertEquals(lessThenBuffer.length, rw.length());
// write more then the buffer can hold and check length
- byte[] biggerThenBuffer = new byte[BufferedRandomAccessFile.BuffSz_ *
2];
+ byte[] biggerThenBuffer = new
byte[BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE * 2];
rw.write(biggerThenBuffer);
assertEquals(biggerThenBuffer.length + lessThenBuffer.length,
rw.length());
@@ -76,11 +76,11 @@ public class BufferedRandomAccessFileTes
BufferedRandomAccessFile rw = new BufferedRandomAccessFile(tmpFile,
"rw");
// Fully write the file and sync..
- byte[] in = new byte[BufferedRandomAccessFile.BuffSz_];
+ byte[] in = new byte[BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE];
rw.write(in);
// Read it into a same size array.
- byte[] out = new byte[BufferedRandomAccessFile.BuffSz_];
+ byte[] out = new byte[BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE];
rw.read(out);
// We're really at the end.
@@ -93,7 +93,7 @@ public class BufferedRandomAccessFileTes
assert negone == -1 : "We read past the end of the file, should have
gotten EOF -1. Instead, " + negone;
// Writing will succeed
- rw.write(new byte[BufferedRandomAccessFile.BuffSz_]);
+ rw.write(new byte[BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE]);
// Forcing a rebuffer here
rw.write(42);
}