Author: xedin
Date: Wed Oct 5 22:59:31 2011
New Revision: 1179467
URL: http://svn.apache.org/viewvc?rev=1179467&view=rev
Log:
off-heap cache to use sun.misc.Unsafe instead of JNA
patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-3271
Added:
cassandra/trunk/src/java/org/apache/cassandra/io/util/Memory.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java
cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java
cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml
cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1179467&r1=1179466&r2=1179467&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Oct 5 22:59:31 2011
@@ -3,7 +3,7 @@
* Thrift sockets are not properly buffered (CASSANDRA-3261)
* performance improvement for bytebufferutil compare function (CASSANDRA-3286)
* add system.versions ColumnFamily (CASSANDRA-3140)
-
+ * off-heap cache to use sun.misc.Unsafe instead of JNA (CASSANDRA-3271)
1.0.0-final
* fix bug preventing obsolete commitlog segments from being removed
Modified:
cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java?rev=1179467&r1=1179466&r2=1179467&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/FreeableMemory.java Wed
Oct 5 22:59:31 2011
@@ -20,11 +20,9 @@ package org.apache.cassandra.cache;
*
*/
-
-import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
-import com.sun.jna.Memory;
+import org.apache.cassandra.io.util.Memory;
public class FreeableMemory extends Memory
{
@@ -58,23 +56,16 @@ public class FreeableMemory extends Memo
free();
}
- private void free()
- {
- assert peer != 0;
- super.finalize(); // calls free and sets peer to zero
- }
-
- /**
- * avoid re-freeing already-freed memory
- */
@Override
- protected void finalize()
+ protected void finalize() throws Throwable
{
assert references.get() <= 0;
assert peer == 0;
+ super.finalize();
}
-
- public byte getValidByte(long offset)
+
+ @Override
+ public byte getByte(long offset)
{
assert peer != 0;
return super.getByte(offset);
Modified:
cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java?rev=1179467&r1=1179466&r2=1179467&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
Wed Oct 5 22:59:31 2011
@@ -20,26 +20,11 @@ package org.apache.cassandra.cache;
*
*/
-import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.DecoratedKey;
-import com.sun.jna.Memory;
-
public class SerializingCacheProvider implements IRowCacheProvider
{
- public SerializingCacheProvider() throws ConfigurationException
- {
- try
- {
- Memory.class.getName();
- }
- catch (NoClassDefFoundError e)
- {
- throw new ConfigurationException("Cannot initialize
SerializationCache without JNA in the class path");
- }
- }
-
public ICache<DecoratedKey, ColumnFamily> create(int capacity, String
tableName, String cfName)
{
return new SerializingCache<DecoratedKey, ColumnFamily>(capacity,
ColumnFamily.serializer(), tableName, cfName);
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1179467&r1=1179466&r2=1179467&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Wed
Oct 5 22:59:31 2011
@@ -67,7 +67,7 @@ public final class CFMetaData
public final static int DEFAULT_MIN_COMPACTION_THRESHOLD = 4;
public final static int DEFAULT_MAX_COMPACTION_THRESHOLD = 32;
public final static double DEFAULT_MERGE_SHARDS_CHANCE = 0.1;
- public final static IRowCacheProvider DEFAULT_ROW_CACHE_PROVIDER =
initDefaultRowCacheProvider();
+ public final static IRowCacheProvider DEFAULT_ROW_CACHE_PROVIDER = new
SerializingCacheProvider();
public final static String DEFAULT_COMPACTION_STRATEGY_CLASS =
"SizeTieredCompactionStrategy";
public final static ByteBuffer DEFAULT_KEY_NAME =
ByteBufferUtil.bytes("KEY");
@@ -97,18 +97,6 @@ public final class CFMetaData
}
}
- private static IRowCacheProvider initDefaultRowCacheProvider()
- {
- try
- {
- return new SerializingCacheProvider();
- }
- catch (ConfigurationException e)
- {
- return new ConcurrentLinkedHashCacheProvider();
- }
- }
-
//REQUIRED
public final Integer cfId; // internal id, never
exposed to user
public final String ksName; // name of keyspace
Added: cassandra/trunk/src/java/org/apache/cassandra/io/util/Memory.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/Memory.java?rev=1179467&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/Memory.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/Memory.java Wed Oct
5 22:59:31 2011
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.io.util;
+
+import sun.misc.Unsafe;
+
+import java.lang.reflect.Field;
+
+public class Memory
+{
+ private static final Unsafe unsafe;
+
+ static
+ {
+ try
+ {
+ Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+ field.setAccessible(true);
+ unsafe = (sun.misc.Unsafe) field.get(null);
+ }
+ catch (Exception e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
+ protected long peer;
+ // size of the memory region
+ private final long size;
+
+ protected Memory(long bytes)
+ {
+ size = bytes;
+ peer = unsafe.allocateMemory(size);
+ }
+
+ public static Memory allocate(long bytes)
+ {
+ if (bytes < 0)
+ throw new IllegalArgumentException();
+
+ return new Memory(bytes);
+ }
+
+ public void setByte(long offset, byte b)
+ {
+ checkPosition(offset);
+ unsafe.putByte(peer + offset, b);
+ }
+
+ /**
+ * Transfers count bytes from buffer to Memory
+ *
+ * @param memoryOffset start offset in the memory
+ * @param buffer the data buffer
+ * @param bufferOffset start offset of the buffer
+ * @param count number of bytes to transfer
+ */
+ public void setBytes(long memoryOffset, byte[] buffer, int bufferOffset,
int count)
+ {
+ if (buffer == null)
+ throw new NullPointerException();
+ else if (bufferOffset < 0
+ || count < 0
+ || bufferOffset + count > buffer.length)
+ throw new IndexOutOfBoundsException();
+ else if (count == 0)
+ return;
+
+ checkPosition(memoryOffset);
+ long end = memoryOffset + count;
+ checkPosition(end - 1);
+ while (memoryOffset < end)
+ unsafe.putByte(peer + memoryOffset++, buffer[bufferOffset++]);
+ }
+
+ public byte getByte(long offset)
+ {
+ checkPosition(offset);
+ return unsafe.getByte(peer + offset);
+ }
+
+ /**
+ * Transfers count bytes from Memory starting at memoryOffset to buffer
starting at bufferOffset
+ *
+ * @param memoryOffset start offset in the memory
+ * @param buffer the data buffer
+ * @param bufferOffset start offset of the buffer
+ * @param count number of bytes to transfer
+ */
+ public void getBytes(long memoryOffset, byte[] buffer, int bufferOffset,
int count)
+ {
+ if (buffer == null)
+ throw new NullPointerException();
+ else if (bufferOffset < 0 || count < 0 || count > buffer.length -
bufferOffset)
+ throw new IndexOutOfBoundsException();
+ else if (count == 0)
+ return;
+
+ checkPosition(memoryOffset);
+ long end = memoryOffset + count;
+ checkPosition(end - 1);
+ while (memoryOffset < end)
+ buffer[bufferOffset++] = unsafe.getByte(peer + memoryOffset++);
+ }
+
+ private void checkPosition(long offset)
+ {
+ if (peer == 0)
+ throw new IllegalStateException("Memory was freed");
+
+ if (offset < 0 || offset >= size)
+ throw new IndexOutOfBoundsException("Illegal offset: " + offset +
", size: " + size);
+ }
+
+ public void free()
+ {
+ assert peer != 0;
+ unsafe.freeMemory(peer);
+ peer = 0;
+ }
+
+ @Override
+ protected void finalize() throws Throwable
+ {
+ try
+ {
+ if (peer != 0)
+ free();
+ }
+ finally
+ {
+ super.finalize();
+ }
+ }
+
+ public long size()
+ {
+ return size;
+ }
+}
+
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java?rev=1179467&r1=1179466&r2=1179467&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
Wed Oct 5 22:59:31 2011
@@ -38,9 +38,15 @@ public class MemoryInputStream extends A
public int read() throws IOException
{
- return mem.getValidByte(position++) & 0xFF;
+ return mem.getByte(position++) & 0xFF;
}
-
+
+ public void readFully(byte[] buffer, int offset, int count) throws
IOException
+ {
+ mem.getBytes(position, buffer, offset, count);
+ position += count;
+ }
+
protected void seekInternal(int pos)
{
position = pos;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java?rev=1179467&r1=1179466&r2=1179467&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/io/util/MemoryOutputStream.java
Wed Oct 5 22:59:31 2011
@@ -21,10 +21,9 @@ package org.apache.cassandra.io.util;
*/
+import java.io.IOException;
import java.io.OutputStream;
-import com.sun.jna.Memory;
-
/**
* This class provides a way to stream the writes into the {@link Memory}
*/
@@ -39,16 +38,20 @@ public class MemoryOutputStream extends
this.mem = mem;
}
- @Override
public void write(int b)
{
- mem.setByte(this.position, (byte)b);
- this.position++;
+ mem.setByte(position++, (byte) b);
}
-
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ mem.setBytes(position, b, off, len);
+ position += len;
+ }
+
public int position()
{
- return this.position;
+ return position;
}
-
}
Modified: cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml?rev=1179467&r1=1179466&r2=1179467&view=diff
==============================================================================
--- cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml
(original)
+++ cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml Wed Oct
5 22:59:31 2011
@@ -544,13 +544,12 @@ commands:
Supported values are:
- ConcurrentLinkedHashCacheProvider
- - SerializingCacheProvider (requires JNA)
+ - SerializingCacheProvider
It is also valid to specify the fully-qualified class name to a class
that implements org.apache.cassandra.cache.IRowCacheProvider.
- row_cache_provider defaults to SerializingCacheProvider if you have JNA
- enabled, otherwise ConcurrentLinkedHashCacheProvider.
+ row_cache_provider defaults to SerializingCacheProvider.
SerializingCacheProvider serialises the contents of the row and stores
it in native memory, i.e., off the JVM Heap. Serialized rows take
significantly less memory than "live" rows in the JVM, so you can cache
@@ -805,19 +804,17 @@ commands:
Supported values are:
- ConcurrentLinkedHashCacheProvider
- - SerializingCacheProvider (requires JNA)
+ - SerializingCacheProvider
It is also valid to specify the fully-qualified class name to a class
that implements org.apache.cassandra.cache.IRowCacheProvider.
- row_cache_provider defaults to ConcurrentLinkedHashCacheProvider,
- but if you have JNA installed you should usually use
- SerializingCacheProvider, which serialises the contents of the
- row and stores it in native memory, i.e., off the JVM
- Heap. Serialized rows take significantly less memory than
- "live" rows in the JVM, so you can cache more rows in a given
- memory footprint. And storing the cache off-heap means you
- can use smaller heap sizes, reducing the impact of GC pauses.
+ row_cache_provider defaults to SerializingCacheProvider.
+ SerializingCacheProvider serialises the contents of the row and stores
+ it in native memory, i.e., off the JVM Heap. Serialized rows take
+ significantly less memory than "live" rows in the JVM, so you can cache
+ more rows in a given memory footprint. And storing the cache off-heap
+ means you can use smaller heap sizes, reducing the impact of GC pauses.
- compression_options: Options related to compression.
Options have the form {key:value}.
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java?rev=1179467&r1=1179466&r2=1179467&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
Wed Oct 5 22:59:31 2011
@@ -117,6 +117,6 @@ public class CacheProviderTest extends S
ICache<String, ColumnFamily> cache = new SerializingCache<String,
ColumnFamily>(CAPACITY, ColumnFamily.serializer(), tableName, cfName);
ColumnFamily cf = createCF();
simpleCase(cf, cache);
- // concurrentCase(cf, cache);
+ concurrentCase(cf, cache);
}
}