remove Thrift from intra-cluster message serialization
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/490a0998 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/490a0998 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/490a0998 Branch: refs/heads/trunk Commit: 490a0998738d07f72257a637752557a2a9626a8c Parents: 3fd08dd Author: Jonathan Ellis <[email protected]> Authored: Sat May 19 11:43:23 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Mon May 21 15:44:59 2012 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/db/IndexScanCommand.java | 13 +- .../org/apache/cassandra/db/RangeSliceCommand.java | 142 ++++++++++++--- src/java/org/apache/cassandra/db/TypeSizes.java | 12 ++ .../org/apache/cassandra/utils/FBUtilities.java | 4 +- 4 files changed, 141 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/490a0998/src/java/org/apache/cassandra/db/IndexScanCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/IndexScanCommand.java b/src/java/org/apache/cassandra/db/IndexScanCommand.java index 859d1d4..ada0c5d 100644 --- a/src/java/org/apache/cassandra/db/IndexScanCommand.java +++ b/src/java/org/apache/cassandra/db/IndexScanCommand.java @@ -18,19 +18,18 @@ package org.apache.cassandra.db; import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.util.FastByteArrayInputStream; -import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.thrift.IndexClause; +import org.apache.cassandra.thrift.IndexExpression; import org.apache.cassandra.thrift.SlicePredicate; import org.apache.cassandra.thrift.TBinaryProtocol; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.thrift.TDeserializer; import org.apache.thrift.TSerializer; @@ -64,6 +63,8 @@ public class IndexScanCommand { public void serialize(IndexScanCommand o, DataOutput out, int version) throws IOException { + assert version < MessagingService.VERSION_12; // 1.2 only uses RangeScanCommand + out.writeUTF(o.keyspace); out.writeUTF(o.column_family); TSerializer ser = new TSerializer(new TBinaryProtocol.Factory()); @@ -74,13 +75,15 @@ public class IndexScanCommand public IndexScanCommand deserialize(DataInput in, int version) throws IOException { + assert version < MessagingService.VERSION_12; // 1.2 only uses RangeScanCommand + String keyspace = in.readUTF(); String columnFamily = in.readUTF(); - TDeserializer dser = new TDeserializer(new TBinaryProtocol.Factory()); IndexClause indexClause = new IndexClause(); - FBUtilities.deserialize(dser, indexClause, in); SlicePredicate predicate = new SlicePredicate(); + TDeserializer dser = new TDeserializer(new TBinaryProtocol.Factory()); + FBUtilities.deserialize(dser, indexClause, in); FBUtilities.deserialize(dser, predicate, in); AbstractBounds<RowPosition> range = AbstractBounds.serializer.deserialize(in, version).toRowBounds(); return new IndexScanCommand(keyspace, columnFamily, indexClause, predicate, range); http://git-wip-us.apache.org/repos/asf/cassandra/blob/490a0998/src/java/org/apache/cassandra/db/RangeSliceCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java index 8516e06..7c59758 100644 --- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java +++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java @@ -47,10 +47,7 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.IReadCommand; -import org.apache.cassandra.thrift.ColumnParent; -import org.apache.cassandra.thrift.IndexExpression; -import org.apache.cassandra.thrift.SlicePredicate; -import org.apache.cassandra.thrift.TBinaryProtocol; +import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.thrift.TDeserializer; @@ -149,8 +146,30 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm if (sc != null) ByteBufferUtil.write(sc, dos); - TSerializer ser = new TSerializer(new TBinaryProtocol.Factory()); - FBUtilities.serialize(ser, sliceCommand.predicate, dos); + if (version < MessagingService.VERSION_12) + { + FBUtilities.serialize(new TSerializer(new TBinaryProtocol.Factory()), sliceCommand.predicate, dos); + } + else + { + SliceRange range = sliceCommand.predicate.slice_range; + if (range != null) + { + dos.writeByte(0); + ByteBufferUtil.writeWithShortLength(range.start, dos); + ByteBufferUtil.writeWithShortLength(range.finish, dos); + dos.writeBoolean(range.reversed); + dos.writeInt(range.count); + } + else + { + dos.writeByte(1); + List<ByteBuffer> columns = sliceCommand.predicate.column_names; + dos.writeInt(columns.size()); + for (ByteBuffer column : columns) + ByteBufferUtil.writeWithShortLength(column, dos); + } + } if (version >= MessagingService.VERSION_11) { @@ -162,7 +181,18 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm { dos.writeInt(sliceCommand.row_filter.size()); for (IndexExpression expr : sliceCommand.row_filter) - FBUtilities.serialize(ser, expr, dos); + { + if (version < MessagingService.VERSION_12) + { + FBUtilities.serialize(new TSerializer(new TBinaryProtocol.Factory()), expr, dos); + } + else + { + ByteBufferUtil.writeWithShortLength(expr.column_name, dos); + dos.writeInt(expr.op.getValue()); + ByteBufferUtil.writeWithLength(expr.value, dos); + } + } } } AbstractBounds.serializer.serialize(sliceCommand.range, dos, version); @@ -188,9 +218,31 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm superColumn = ByteBuffer.wrap(buf); } - TDeserializer dser = new TDeserializer(new TBinaryProtocol.Factory()); SlicePredicate pred = new SlicePredicate(); - FBUtilities.deserialize(dser, pred, dis); + if (version < MessagingService.VERSION_12) + { + FBUtilities.deserialize(new TDeserializer(new TBinaryProtocol.Factory()), pred, dis); + } + else + { + int type = dis.readByte(); + if (type == 0) + { + pred.slice_range = new SliceRange(ByteBufferUtil.readWithShortLength(dis), + ByteBufferUtil.readWithShortLength(dis), + dis.readBoolean(), + dis.readInt()); + } + else + { + assert type == 1; + int count = dis.readInt(); + List<ByteBuffer> columns = new ArrayList<ByteBuffer>(count); + for (int i = 0; i < count; i++) + columns.add(ByteBufferUtil.readWithShortLength(dis)); + pred.column_names = columns; + } + } List<IndexExpression> rowFilter = null; if (version >= MessagingService.VERSION_11) @@ -199,8 +251,18 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm rowFilter = new ArrayList<IndexExpression>(filterCount); for (int i = 0; i < filterCount; i++) { - IndexExpression expr = new IndexExpression(); - FBUtilities.deserialize(dser, expr, dis); + IndexExpression expr; + if (version < MessagingService.VERSION_12) + { + expr = new IndexExpression(); + FBUtilities.deserialize(new TDeserializer(new TBinaryProtocol.Factory()), expr, dis); + } + else + { + expr = new IndexExpression(ByteBufferUtil.readWithShortLength(dis), + IndexOperator.findByValue(dis.readInt()), + ByteBufferUtil.readWithShortLength(dis)); + } rowFilter.add(expr); } } @@ -233,16 +295,39 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm size += TypeSizes.NATIVE.sizeof(0); } - TSerializer ser = new TSerializer(new TBinaryProtocol.Factory()); - try + if (version < MessagingService.VERSION_12) { - int predicateLength = ser.serialize(rsc.predicate).length; - size += TypeSizes.NATIVE.sizeof(predicateLength); - size += predicateLength; + TSerializer ser = new TSerializer(new TBinaryProtocol.Factory()); + try + { + int predicateLength = ser.serialize(rsc.predicate).length; + if (version < MessagingService.VERSION_12) + size += TypeSizes.NATIVE.sizeof(predicateLength); + size += predicateLength; + } + catch (TException e) + { + throw new RuntimeException(e); + } } - catch (TException e) + else { - throw new RuntimeException(e); + SliceRange range = rsc.predicate.slice_range; + size += 1; + if (range != null) + { + size += TypeSizes.NATIVE.sizeofWithShortLength(range.start); + size += TypeSizes.NATIVE.sizeofWithShortLength(range.finish); + size += TypeSizes.NATIVE.sizeof(range.reversed); + size += TypeSizes.NATIVE.sizeof(range.count); + } + else + { + List<ByteBuffer> columns = rsc.predicate.column_names; + size += TypeSizes.NATIVE.sizeof(columns.size()); + for (ByteBuffer column : columns) + size += TypeSizes.NATIVE.sizeofWithShortLength(column); + } } if (version >= MessagingService.VERSION_11) @@ -256,15 +341,24 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm size += TypeSizes.NATIVE.sizeof(rsc.row_filter.size()); for (IndexExpression expr : rsc.row_filter) { - try + if (version < MessagingService.VERSION_12) { - int filterLength = ser.serialize(expr).length; - size += TypeSizes.NATIVE.sizeof(filterLength); - size += filterLength; + try + { + int filterLength = new TSerializer(new TBinaryProtocol.Factory()).serialize(expr).length; + size += TypeSizes.NATIVE.sizeof(filterLength); + size += filterLength; + } + catch (TException e) + { + throw new RuntimeException(e); + } } - catch (TException e) + else { - throw new RuntimeException(e); + size += TypeSizes.NATIVE.sizeofWithShortLength(expr.column_name); + size += TypeSizes.NATIVE.sizeof(expr.op.getValue()); + size += TypeSizes.NATIVE.sizeofWithLength(expr.value); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/490a0998/src/java/org/apache/cassandra/db/TypeSizes.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/TypeSizes.java b/src/java/org/apache/cassandra/db/TypeSizes.java index 26c9f7d..aac89d0 100644 --- a/src/java/org/apache/cassandra/db/TypeSizes.java +++ b/src/java/org/apache/cassandra/db/TypeSizes.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.db; +import java.nio.ByteBuffer; + import org.apache.cassandra.utils.FBUtilities; public abstract class TypeSizes @@ -59,6 +61,16 @@ public abstract class TypeSizes return utflen; } + public int sizeofWithShortLength(ByteBuffer value) + { + return sizeof((short) value.remaining()) + value.remaining(); + } + + public int sizeofWithLength(ByteBuffer value) + { + return sizeof(value.remaining()) + value.remaining(); + } + public static class NativeDBTypeSizes extends TypeSizes { public int sizeof(boolean value) http://git-wip-us.apache.org/repos/asf/cassandra/blob/490a0998/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index 063d577..8fa32da 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; + import com.google.common.base.Joiner; import com.google.common.collect.AbstractIterator; import org.apache.commons.lang.StringUtils; @@ -42,7 +43,6 @@ import org.apache.cassandra.cache.IRowCacheProvider; import org.apache.cassandra.concurrent.CreationTimeAwareFuture; import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; @@ -240,6 +240,7 @@ public class FBUtilities new File(tmpFilename).renameTo(new File(filename)); } + @Deprecated public static void serialize(TSerializer serializer, TBase struct, DataOutput out) throws IOException { @@ -259,6 +260,7 @@ public class FBUtilities out.write(bytes); } + @Deprecated public static void deserialize(TDeserializer deserializer, TBase struct, DataInput in) throws IOException {
