Author: jbellis
Date: Fri Dec 3 19:24:15 2010
New Revision: 1041961
URL: http://svn.apache.org/viewvc?rev=1041961&view=rev
Log:
copy bytebuffers for local writes to avoid retaining the entire Thrift frame
patch by tjake; reviewed by jbellis for CASSANDRA-1801
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/ColumnDefinition.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Column.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DeletedColumn.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ExpiringColumn.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/IColumn.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SuperColumn.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1041961&r1=1041960&r2=1041961&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Fri Dec 3 19:24:15 2010
@@ -29,6 +29,8 @@ dev
* improved validation of column_metadata (CASSANDRA-1813)
* reads at ConsistencyLevel > 1 throw UnavailableException
immediately if insufficient live nodes exist (CASSANDRA-1803)
+ * copy bytebuffers for local writes to avoid retaining the entire
+ Thrift frame (CASSANDRA-1801)
0.7.0-rc1
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/CFMetaData.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1041961&r1=1041960&r2=1041961&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/CFMetaData.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/CFMetaData.java
Fri Dec 3 19:24:15 2010
@@ -41,6 +41,7 @@ import org.apache.cassandra.db.marshal.T
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.migration.Migration;
import org.apache.cassandra.io.SerDeUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.commons.lang.builder.EqualsBuilder;
@@ -751,7 +752,7 @@ public final class CFMetaData
org.apache.cassandra.avro.ColumnDef tcd = new
org.apache.cassandra.avro.ColumnDef();
tcd.index_name = cd.getIndexName();
tcd.index_type =
org.apache.cassandra.avro.IndexType.valueOf(cd.getIndexType().name());
- tcd.name = cd.name;
+ tcd.name = ByteBufferUtil.clone(cd.name);
tcd.validation_class = cd.validator.getClass().getName();
column_meta.add(tcd);
}
@@ -786,7 +787,7 @@ public final class CFMetaData
for (org.apache.cassandra.thrift.ColumnDef cdef :
def.getColumn_metadata())
{
org.apache.cassandra.avro.ColumnDef tdef = new
org.apache.cassandra.avro.ColumnDef();
- tdef.name = cdef.BufferForName();
+ tdef.name = ByteBufferUtil.clone(cdef.BufferForName());
tdef.validation_class = cdef.getValidation_class();
tdef.index_name = cdef.getIndex_name();
tdef.index_type = cdef.getIndex_type() == null ? null :
org.apache.cassandra.avro.IndexType.valueOf(cdef.getIndex_type().name());
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/ColumnDefinition.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/ColumnDefinition.java?rev=1041961&r1=1041960&r2=1041961&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/ColumnDefinition.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/ColumnDefinition.java
Fri Dec 3 19:24:15 2010
@@ -31,6 +31,7 @@ import org.apache.avro.util.Utf8;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.thrift.ColumnDef;
import org.apache.cassandra.thrift.IndexType;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
public class ColumnDefinition {
@@ -103,7 +104,7 @@ public class ColumnDefinition {
public static ColumnDefinition fromColumnDef(ColumnDef thriftColumnDef)
throws ConfigurationException
{
- return new ColumnDefinition(thriftColumnDef.name,
thriftColumnDef.validation_class, thriftColumnDef.index_type,
thriftColumnDef.index_name);
+ return new
ColumnDefinition(ByteBufferUtil.clone(thriftColumnDef.name),
thriftColumnDef.validation_class, thriftColumnDef.index_type,
thriftColumnDef.index_name);
}
public static ColumnDefinition
fromColumnDef(org.apache.cassandra.avro.ColumnDef avroColumnDef) throws
ConfigurationException
@@ -122,7 +123,7 @@ public class ColumnDefinition {
Map<ByteBuffer, ColumnDefinition> cds = new TreeMap<ByteBuffer,
ColumnDefinition>();
for (ColumnDef thriftColumnDef : thriftDefs)
- cds.put(thriftColumnDef.name, fromColumnDef(thriftColumnDef));
+ cds.put(ByteBufferUtil.clone(thriftColumnDef.name),
fromColumnDef(thriftColumnDef));
return Collections.unmodifiableMap(cds);
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Column.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Column.java?rev=1041961&r1=1041960&r2=1041961&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Column.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Column.java
Fri Dec 3 19:24:15 2010
@@ -210,6 +210,12 @@ public class Column implements IColumn
return result;
}
+ @Override
+ public IColumn deepCopy()
+ {
+ return new Column(ByteBufferUtil.clone(name),
ByteBufferUtil.clone(value), timestamp);
+ }
+
public String getString(AbstractType comparator)
{
StringBuilder sb = new StringBuilder();
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DeletedColumn.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DeletedColumn.java?rev=1041961&r1=1041960&r2=1041961&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DeletedColumn.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DeletedColumn.java
Fri Dec 3 19:24:15 2010
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
import java.nio.ByteBuffer;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,4 +56,10 @@ public class DeletedColumn extends Colum
{
return value.getInt(value.position()+value.arrayOffset() );
}
+
+ @Override
+ public IColumn deepCopy()
+ {
+ return new DeletedColumn(ByteBufferUtil.clone(name),
ByteBufferUtil.clone(value), timestamp);
+ }
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ExpiringColumn.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ExpiringColumn.java?rev=1041961&r1=1041960&r2=1041961&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ExpiringColumn.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ExpiringColumn.java
Fri Dec 3 19:24:15 2010
@@ -24,6 +24,7 @@ import java.security.MessageDigest;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.log4j.Logger;
/**
@@ -105,6 +106,12 @@ public class ExpiringColumn extends Colu
}
@Override
+ public IColumn deepCopy()
+ {
+ return new ExpiringColumn(ByteBufferUtil.clone(name),
ByteBufferUtil.clone(value), timestamp, timeToLive, localExpirationTime);
+ }
+
+ @Override
public String getString(AbstractType comparator)
{
StringBuilder sb = new StringBuilder();
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/IColumn.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/IColumn.java?rev=1041961&r1=1041960&r2=1041961&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/IColumn.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/IColumn.java
Fri Dec 3 19:24:15 2010
@@ -46,6 +46,9 @@ public interface IColumn
public int getLocalDeletionTime(); // for tombstone GC, so int is
sufficient granularity
public String getString(AbstractType comparator);
+ /** clones the column, making copies of any underlying byte buffers */
+ IColumn deepCopy();
+
/**
* For a simple column, live == !isMarkedForDelete.
* For a supercolumn, live means it has at least one subcolumn whose
timestamp is greater than the
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutation.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutation.java?rev=1041961&r1=1041960&r2=1041961&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutation.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutation.java
Fri Dec 3 19:24:15 2010
@@ -40,6 +40,7 @@ import org.apache.cassandra.service.Stor
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Deletion;
import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.commons.lang.StringUtils;
@@ -330,6 +331,21 @@ public class RowMutation
rm.delete(new QueryPath(cfName, del.super_column), del.timestamp);
}
}
+
+ public RowMutation deepCopy()
+ {
+ RowMutation rm = new RowMutation(table_, ByteBufferUtil.clone(key_));
+
+ for (Map.Entry<Integer, ColumnFamily> e : modifications_.entrySet())
+ {
+ ColumnFamily cf = e.getValue().cloneMeShallow();
+ for (Map.Entry<ByteBuffer, IColumn> ce :
e.getValue().getColumnsMap().entrySet())
+ cf.addColumn(ce.getValue().deepCopy());
+ rm.modifications_.put(e.getKey(), cf);
+ }
+
+ return rm;
+ }
}
class RowMutationSerializer implements ICompactSerializer<RowMutation>
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SuperColumn.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1041961&r1=1041960&r2=1041961&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SuperColumn.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SuperColumn.java
Fri Dec 3 19:24:15 2010
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.Collection;
+import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -31,6 +32,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.ICompactSerializer2;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -286,6 +288,20 @@ public class SuperColumn implements ICol
this.localDeletionTime.set(localDeleteTime);
this.markedForDeleteAt.set(timestamp);
}
+
+ public IColumn deepCopy()
+ {
+ SuperColumn sc = new SuperColumn(ByteBufferUtil.clone(name_),
this.getComparator());
+ sc.localDeletionTime = localDeletionTime;
+ sc.markedForDeleteAt = markedForDeleteAt;
+
+ for(Map.Entry<ByteBuffer, IColumn> c : columns_.entrySet())
+ {
+ sc.addColumn(c.getValue().deepCopy());
+ }
+
+ return sc;
+ }
public IColumn reconcile(IColumn c)
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1041961&r1=1041960&r2=1041961&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
Fri Dec 3 19:24:15 2010
@@ -200,7 +200,7 @@ public class StorageProxy implements Sto
{
public void runMayThrow() throws IOException
{
- rm.apply();
+ rm.deepCopy().apply();
responseHandler.response(null);
}
};
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ByteBufferUtil.java?rev=1041961&r1=1041960&r2=1041961&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
Fri Dec 3 19:24:15 2010
@@ -60,8 +60,8 @@ import java.nio.charset.Charset;
* }
*
*/
-public class ByteBufferUtil {
-
+public class ByteBufferUtil
+{
public static int compareUnsigned(ByteBuffer o1, ByteBuffer o2)
{
return FBUtilities.compareUnsigned(o1.array(), o2.array(),
o1.arrayOffset()+o1.position(), o2.arrayOffset()+o2.position(),
o1.limit()+o1.arrayOffset(), o2.limit()+o2.arrayOffset());
@@ -98,4 +98,14 @@ public class ByteBufferUtil {
throw new RuntimeException(e);
}
}
+
+ public static ByteBuffer clone(ByteBuffer o)
+ {
+ ByteBuffer clone = ByteBuffer.allocate(o.remaining());
+ o.mark();
+ clone.put(o);
+ o.reset();
+ clone.flip();
+ return clone;
+ }
}