Repository: cassandra Updated Branches: refs/heads/trunk 6145d50f5 -> 806facc8c
Token serialization should accept partitioner explicitly patch by Branimir Lambov; reviewed by Aleksey Yeschenko for CASSANDRA-8268 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/806facc8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/806facc8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/806facc8 Branch: refs/heads/trunk Commit: 806facc8ca87a8d1f6fa14056c68ac43dc5bde5c Parents: 6145d50 Author: Branimir Lambov <[email protected]> Authored: Fri Jan 30 00:46:44 2015 +0300 Committer: Aleksey Yeschenko <[email protected]> Committed: Fri Jan 30 00:48:12 2015 +0300 ---------------------------------------------------------------------- CHANGES.txt | 3 +- .../apache/cassandra/db/PagedRangeCommand.java | 3 +- .../apache/cassandra/db/RangeSliceCommand.java | 3 +- .../org/apache/cassandra/db/RowPosition.java | 16 ++--- .../apache/cassandra/dht/AbstractBounds.java | 33 +++++------ .../dht/IPartitionerDependentSerializer.java | 61 ++++++++++++++++++++ src/java/org/apache/cassandra/dht/Token.java | 19 +++--- .../apache/cassandra/net/MessagingService.java | 14 +++++ .../apache/cassandra/repair/RepairJobDesc.java | 3 +- .../repair/messages/AnticompactionRequest.java | 10 +++- .../repair/messages/PrepareMessage.java | 8 ++- .../repair/messages/RepairMessage.java | 2 +- .../cassandra/repair/messages/SyncRequest.java | 6 +- .../cassandra/streaming/StreamRequest.java | 14 +++-- .../org/apache/cassandra/utils/MerkleTree.java | 43 +++++++------- 15 files changed, 163 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0cd0b4d..a85a6e7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0 + * Avoid accessing partitioner through 
StorageProxy (CASSANDRA-8244, 8268) * Upgrade Metrics library and remove depricated metrics (CASSANDRA-5657) * Serializing Row cache alternative, fully off heap (CASSANDRA-7438) * Duplicate rows returned when in clause has repeated values (CASSANDRA-6707) @@ -137,8 +138,6 @@ * Force config client mode in CQLSSTableWriter (CASSANDRA-8281) * Fix sstableupgrade throws exception (CASSANDRA-8688) Merged from 2.0: -======= -2.0.13: * Fix SSTableSimpleUnsortedWriter ConcurrentModificationException (CASSANDRA-8619) * Round up time deltas lower than 1ms in BulkLoader (CASSANDRA-8645) * Add batch remove iterator to ABSC (CASSANDRA-8414, 8666) http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/db/PagedRangeCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/PagedRangeCommand.java b/src/java/org/apache/cassandra/db/PagedRangeCommand.java index 614f0f7..ebedecf 100644 --- a/src/java/org/apache/cassandra/db/PagedRangeCommand.java +++ b/src/java/org/apache/cassandra/db/PagedRangeCommand.java @@ -129,6 +129,7 @@ public class PagedRangeCommand extends AbstractRangeCommand out.writeUTF(cmd.columnFamily); out.writeLong(cmd.timestamp); + MessagingService.validatePartitioner(cmd.keyRange); AbstractBounds.serializer.serialize(cmd.keyRange, out, version); CFMetaData metadata = Schema.instance.getCFMetaData(cmd.keyspace, cmd.columnFamily); @@ -158,7 +159,7 @@ public class PagedRangeCommand extends AbstractRangeCommand String columnFamily = in.readUTF(); long timestamp = in.readLong(); - AbstractBounds<RowPosition> keyRange = AbstractBounds.serializer.deserialize(in, version).toRowBounds(); + AbstractBounds<RowPosition> keyRange = AbstractBounds.serializer.deserialize(in, MessagingService.globalPartitioner(), version).toRowBounds(); CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/db/RangeSliceCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java index 4d2955b..6009524 100644 --- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java +++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java @@ -172,6 +172,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm expr.writeTo(out); } } + MessagingService.validatePartitioner(sliceCommand.keyRange); AbstractBounds.serializer.serialize(sliceCommand.keyRange, out, version); out.writeInt(sliceCommand.maxResults); out.writeBoolean(sliceCommand.countCQL3Rows); @@ -195,7 +196,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm { rowFilter.add(IndexExpression.readFrom(in)); } - AbstractBounds<RowPosition> range = AbstractBounds.serializer.deserialize(in, version).toRowBounds(); + AbstractBounds<RowPosition> range = AbstractBounds.serializer.deserialize(in, MessagingService.globalPartitioner(), version).toRowBounds(); int maxResults = in.readInt(); boolean countCQL3Rows = in.readBoolean(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/db/RowPosition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowPosition.java b/src/java/org/apache/cassandra/db/RowPosition.java index 3bcd627..3fa0465 100644 --- a/src/java/org/apache/cassandra/db/RowPosition.java +++ b/src/java/org/apache/cassandra/db/RowPosition.java @@ -56,7 +56,7 @@ public interface RowPosition extends RingPosition<RowPosition> public Kind kind(); public boolean isMinimum(); - public static class RowPositionSerializer implements ISerializer<RowPosition> + public static class RowPositionSerializer implements 
IPartitionerDependentSerializer<RowPosition> { /* * We need to be able to serialize both Token.KeyBound and @@ -69,17 +69,17 @@ public interface RowPosition extends RingPosition<RowPosition> * token is recreated on the other side). In the other cases, we then * serialize the token. */ - public void serialize(RowPosition pos, DataOutputPlus out) throws IOException + public void serialize(RowPosition pos, DataOutputPlus out, int version) throws IOException { Kind kind = pos.kind(); out.writeByte(kind.ordinal()); if (kind == Kind.ROW_KEY) ByteBufferUtil.writeWithShortLength(((DecoratedKey)pos).getKey(), out); else - Token.serializer.serialize(pos.getToken(), out); + Token.serializer.serialize(pos.getToken(), out, version); } - public RowPosition deserialize(DataInput in) throws IOException + public RowPosition deserialize(DataInput in, IPartitioner p, int version) throws IOException { Kind kind = Kind.fromOrdinal(in.readByte()); if (kind == Kind.ROW_KEY) @@ -89,23 +89,23 @@ public interface RowPosition extends RingPosition<RowPosition> } else { - Token t = Token.serializer.deserialize(in); + Token t = Token.serializer.deserialize(in, p, version); return kind == Kind.MIN_BOUND ? 
t.minKeyBound() : t.maxKeyBound(); } } - public long serializedSize(RowPosition pos, TypeSizes typeSizes) + public long serializedSize(RowPosition pos, int version) { Kind kind = pos.kind(); int size = 1; // 1 byte for enum if (kind == Kind.ROW_KEY) { int keySize = ((DecoratedKey)pos).getKey().remaining(); - size += typeSizes.sizeof((short) keySize) + keySize; + size += TypeSizes.NATIVE.sizeof((short) keySize) + keySize; } else { - size += Token.serializer.serializedSize(pos.getToken(), typeSizes); + size += Token.serializer.serializedSize(pos.getToken(), version); } return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/dht/AbstractBounds.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/AbstractBounds.java b/src/java/org/apache/cassandra/dht/AbstractBounds.java index c7a3505..f045acf 100644 --- a/src/java/org/apache/cassandra/dht/AbstractBounds.java +++ b/src/java/org/apache/cassandra/dht/AbstractBounds.java @@ -20,13 +20,12 @@ package org.apache.cassandra.dht; import java.io.DataInput; import java.io.IOException; import java.io.Serializable; -import java.util.*; +import java.util.List; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.Pair; @@ -122,7 +121,7 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria public abstract AbstractBounds<T> withNewRight(T newRight); - public static class AbstractBoundsSerializer implements IVersionedSerializer<AbstractBounds<?>> + public static class AbstractBoundsSerializer implements IPartitionerDependentSerializer<AbstractBounds<?>> { public void 
serialize(AbstractBounds<?> range, DataOutputPlus out, int version) throws IOException { @@ -133,13 +132,13 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria out.writeInt(kindInt(range)); if (range.left instanceof Token) { - Token.serializer.serialize((Token) range.left, out); - Token.serializer.serialize((Token) range.right, out); + Token.serializer.serialize((Token) range.left, out, version); + Token.serializer.serialize((Token) range.right, out, version); } else { - RowPosition.serializer.serialize((RowPosition) range.left, out); - RowPosition.serializer.serialize((RowPosition) range.right, out); + RowPosition.serializer.serialize((RowPosition) range.left, out, version); + RowPosition.serializer.serialize((RowPosition) range.right, out, version); } } @@ -151,7 +150,7 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria return kind; } - public AbstractBounds<?> deserialize(DataInput in, int version) throws IOException + public AbstractBounds<?> deserialize(DataInput in, IPartitioner p, int version) throws IOException { int kind = in.readInt(); boolean isToken = kind >= 0; @@ -161,13 +160,13 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria RingPosition<?> left, right; if (isToken) { - left = Token.serializer.deserialize(in); - right = Token.serializer.deserialize(in); + left = Token.serializer.deserialize(in, p, version); + right = Token.serializer.deserialize(in, p, version); } else { - left = RowPosition.serializer.deserialize(in); - right = RowPosition.serializer.deserialize(in); + left = RowPosition.serializer.deserialize(in, p, version); + right = RowPosition.serializer.deserialize(in, p, version); } if (kind == Type.RANGE.ordinal()) @@ -180,13 +179,13 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria int size = TypeSizes.NATIVE.sizeof(kindInt(ab)); if (ab.left instanceof Token) { - size += 
Token.serializer.serializedSize((Token) ab.left, TypeSizes.NATIVE); - size += Token.serializer.serializedSize((Token) ab.right, TypeSizes.NATIVE); + size += Token.serializer.serializedSize((Token) ab.left, version); + size += Token.serializer.serializedSize((Token) ab.right, version); } else { - size += RowPosition.serializer.serializedSize((RowPosition) ab.left, TypeSizes.NATIVE); - size += RowPosition.serializer.serializedSize((RowPosition) ab.right, TypeSizes.NATIVE); + size += RowPosition.serializer.serializedSize((RowPosition) ab.left, version); + size += RowPosition.serializer.serializedSize((RowPosition) ab.right, version); } return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/dht/IPartitionerDependentSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/IPartitionerDependentSerializer.java b/src/java/org/apache/cassandra/dht/IPartitionerDependentSerializer.java new file mode 100644 index 0000000..3a9a768 --- /dev/null +++ b/src/java/org/apache/cassandra/dht/IPartitionerDependentSerializer.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.dht; + +import java.io.DataInput; +import java.io.IOException; + +import org.apache.cassandra.io.util.DataOutputPlus; + +/** + * Versioned serializer where the serialization depends on partitioner. + * + * On serialization the partitioner is given by the entity being serialized. To deserialize, the partitioner used must + * be known to the calling method. + */ +public interface IPartitionerDependentSerializer<T> +{ + /** + * Serialize the specified object into the specified DataOutputPlus instance. + * + * @param t object that needs to be serialized + * @param out DataOutput into which serialization needs to happen. + * @param version protocol version + * @throws java.io.IOException if serialization fails + */ + public void serialize(T t, DataOutputPlus out, int version) throws IOException; + + /** + * Deserialize an object from the specified DataInput instance. + * @param in DataInput from which deserialization needs to happen. + * @param p Partitioner that will be used to construct tokens. Needs to match the partitioner that was used to + * serialize the token. + * @param version protocol version + * @return the object that was deserialized + * @throws IOException if deserialization fails + */ + public T deserialize(DataInput in, IPartitioner p, int version) throws IOException; + + /** + * Calculate serialized size of object without actually serializing. 
+ * @param t object to calculate serialized size + * @param version protocol version + * @return serialized size of object t + */ + public long serializedSize(T t, int version); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/dht/Token.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/Token.java b/src/java/org/apache/cassandra/dht/Token.java index 719fd46..76918a7 100644 --- a/src/java/org/apache/cassandra/dht/Token.java +++ b/src/java/org/apache/cassandra/dht/Token.java @@ -22,12 +22,10 @@ import java.io.IOException; import java.io.Serializable; import java.nio.ByteBuffer; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.RowPosition; -import org.apache.cassandra.io.ISerializer; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; public abstract class Token implements RingPosition<Token>, Serializable @@ -46,27 +44,26 @@ public abstract class Token implements RingPosition<Token>, Serializable public abstract void validate(String token) throws ConfigurationException; } - public static class TokenSerializer implements ISerializer<Token> + public static class TokenSerializer implements IPartitionerDependentSerializer<Token> { - public void serialize(Token token, DataOutputPlus out) throws IOException + public void serialize(Token token, DataOutputPlus out, int version) throws IOException { - IPartitioner p = StorageService.getPartitioner(); + IPartitioner p = token.getPartitioner(); ByteBuffer b = p.getTokenFactory().toByteArray(token); ByteBufferUtil.writeWithLength(b, out); } - public Token deserialize(DataInput in) throws IOException + public Token 
deserialize(DataInput in, IPartitioner p, int version) throws IOException { - IPartitioner p = StorageService.getPartitioner(); int size = in.readInt(); byte[] bytes = new byte[size]; in.readFully(bytes); return p.getTokenFactory().fromByteArray(ByteBuffer.wrap(bytes)); } - public long serializedSize(Token object, TypeSizes typeSizes) + public long serializedSize(Token object, int version) { - IPartitioner p = StorageService.getPartitioner(); + IPartitioner p = object.getPartitioner(); ByteBuffer b = p.getTokenFactory().toByteArray(object); return TypeSizes.NATIVE.sizeof(b.remaining()) + b.remaining(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index b33cf81..c333b04 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -35,6 +35,7 @@ import com.google.common.base.Function; import com.google.common.collect.Lists; import org.cliffc.high_scale_lib.NonBlockingHashMap; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +46,9 @@ import org.apache.cassandra.concurrent.TracingAwareExecutorService; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; import org.apache.cassandra.db.*; +import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.BootStrapper; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.gms.EchoMessage; import org.apache.cassandra.gms.GossipDigestAck; @@ -1039,4 +1042,15 @@ public final class MessagingService implements MessagingServiceMBean } return result; } + + public static IPartitioner 
globalPartitioner() + { + return DatabaseDescriptor.getPartitioner(); + } + + public static void validatePartitioner(AbstractBounds<?> bounds) + { + if (globalPartitioner() != bounds.left.getPartitioner()) + throw new AssertionError(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/repair/RepairJobDesc.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJobDesc.java b/src/java/org/apache/cassandra/repair/RepairJobDesc.java index 5ce5969..c4a713d 100644 --- a/src/java/org/apache/cassandra/repair/RepairJobDesc.java +++ b/src/java/org/apache/cassandra/repair/RepairJobDesc.java @@ -100,6 +100,7 @@ public class RepairJobDesc UUIDSerializer.serializer.serialize(desc.sessionId, out, version); out.writeUTF(desc.keyspace); out.writeUTF(desc.columnFamily); + MessagingService.validatePartitioner(desc.range); AbstractBounds.serializer.serialize(desc.range, out, version); } @@ -114,7 +115,7 @@ public class RepairJobDesc UUID sessionId = UUIDSerializer.serializer.deserialize(in, version); String keyspace = in.readUTF(); String columnFamily = in.readUTF(); - Range<Token> range = (Range<Token>)AbstractBounds.serializer.deserialize(in, version); + Range<Token> range = (Range<Token>)AbstractBounds.serializer.deserialize(in, MessagingService.globalPartitioner(), version).toTokenBounds(); return new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, range); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java index 239ab0e..455e5fb 100644 --- a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java +++ 
b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java @@ -27,6 +27,7 @@ import java.util.UUID; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.UUIDSerializer; public class AnticompactionRequest extends RepairMessage @@ -51,8 +52,11 @@ public class AnticompactionRequest extends RepairMessage { UUIDSerializer.serializer.serialize(message.parentRepairSession, out, version); out.writeInt(message.successfulRanges.size()); - for (Range r : message.successfulRanges) + for (Range<Token> r : message.successfulRanges) + { + MessagingService.validatePartitioner(r); Range.serializer.serialize(r, out, version); + } } public AnticompactionRequest deserialize(DataInput in, int version) throws IOException @@ -61,14 +65,14 @@ public class AnticompactionRequest extends RepairMessage int rangeCount = in.readInt(); List<Range<Token>> ranges = new ArrayList<>(rangeCount); for (int i = 0; i < rangeCount; i++) - ranges.add((Range<Token>) Range.serializer.deserialize(in, version).toTokenBounds()); + ranges.add((Range<Token>) Range.serializer.deserialize(in, MessagingService.globalPartitioner(), version).toTokenBounds()); return new AnticompactionRequest(parentRepairSession, ranges); } public long serializedSize(AnticompactionRequest message, int version) { long size = UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version); - for (Range r : message.successfulRanges) + for (Range<Token> r : message.successfulRanges) size += Range.serializer.serializedSize(r, version); return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java 
b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java index 035ccc4..d63bf70 100644 --- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java @@ -28,6 +28,7 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.UUIDSerializer; @@ -58,8 +59,11 @@ public class PrepareMessage extends RepairMessage UUIDSerializer.serializer.serialize(cfId, out, version); UUIDSerializer.serializer.serialize(message.parentRepairSession, out, version); out.writeInt(message.ranges.size()); - for (Range r : message.ranges) + for (Range<Token> r : message.ranges) + { + MessagingService.validatePartitioner(r); Range.serializer.serialize(r, out, version); + } out.writeBoolean(message.isIncremental); } @@ -73,7 +77,7 @@ public class PrepareMessage extends RepairMessage int rangeCount = in.readInt(); List<Range<Token>> ranges = new ArrayList<>(rangeCount); for (int i = 0; i < rangeCount; i++) - ranges.add((Range<Token>) Range.serializer.deserialize(in, version).toTokenBounds()); + ranges.add((Range<Token>) Range.serializer.deserialize(in, MessagingService.globalPartitioner(), version).toTokenBounds()); boolean isIncremental = in.readBoolean(); return new PrepareMessage(parentRepairSession, cfIds, ranges, isIncremental); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/repair/messages/RepairMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java index d500928..6af3bb3 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java +++ 
b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java @@ -81,7 +81,7 @@ public abstract class RepairMessage return new MessageOut<>(MessagingService.Verb.REPAIR_MESSAGE, this, RepairMessage.serializer); } - public static class RepairMessageSerializer implements IVersionedSerializer<RepairMessage> + public static class RepairMessageSerializer implements MessageSerializer<RepairMessage> { public void serialize(RepairMessage message, DataOutputPlus out, int version) throws IOException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/repair/messages/SyncRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java index c4d0ab6..077132a 100644 --- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java +++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java @@ -30,6 +30,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.CompactEndpointSerializationHelper; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.RepairJobDesc; /** @@ -66,7 +67,10 @@ public class SyncRequest extends RepairMessage CompactEndpointSerializationHelper.serialize(message.dst, out); out.writeInt(message.ranges.size()); for (Range<Token> range : message.ranges) + { + MessagingService.validatePartitioner(range); AbstractBounds.serializer.serialize(range, out, version); + } } public SyncRequest deserialize(DataInput in, int version) throws IOException @@ -78,7 +82,7 @@ public class SyncRequest extends RepairMessage int rangesCount = in.readInt(); List<Range<Token>> ranges = new ArrayList<>(rangesCount); for (int i = 0; i < rangesCount; ++i) - ranges.add((Range<Token>) AbstractBounds.serializer.deserialize(in, 
version).toTokenBounds()); + ranges.add((Range<Token>) AbstractBounds.serializer.deserialize(in, MessagingService.globalPartitioner(), version).toTokenBounds()); return new SyncRequest(desc, owner, src, dst, ranges); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/streaming/StreamRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamRequest.java b/src/java/org/apache/cassandra/streaming/StreamRequest.java index 9c5b974..0fe40cf 100644 --- a/src/java/org/apache/cassandra/streaming/StreamRequest.java +++ b/src/java/org/apache/cassandra/streaming/StreamRequest.java @@ -29,6 +29,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; public class StreamRequest { @@ -55,8 +56,9 @@ public class StreamRequest out.writeInt(request.ranges.size()); for (Range<Token> range : request.ranges) { - Token.serializer.serialize(range.left, out); - Token.serializer.serialize(range.right, out); + MessagingService.validatePartitioner(range); + Token.serializer.serialize(range.left, out, version); + Token.serializer.serialize(range.right, out, version); } out.writeInt(request.columnFamilies.size()); for (String cf : request.columnFamilies) @@ -71,8 +73,8 @@ public class StreamRequest List<Range<Token>> ranges = new ArrayList<>(rangeCount); for (int i = 0; i < rangeCount; i++) { - Token left = Token.serializer.deserialize(in); - Token right = Token.serializer.deserialize(in); + Token left = Token.serializer.deserialize(in, MessagingService.globalPartitioner(), version); + Token right = Token.serializer.deserialize(in, MessagingService.globalPartitioner(), version); ranges.add(new Range<>(left, right)); } int cfCount = in.readInt(); @@ -89,8 +91,8 @@ public class 
StreamRequest size += TypeSizes.NATIVE.sizeof(request.ranges.size()); for (Range<Token> range : request.ranges) { - size += Token.serializer.serializedSize(range.left, TypeSizes.NATIVE); - size += Token.serializer.serializedSize(range.right, TypeSizes.NATIVE); + size += Token.serializer.serializedSize(range.left, version); + size += Token.serializer.serializedSize(range.right, version); } size += TypeSizes.NATIVE.sizeof(request.columnFamilies.size()); for (String cf : request.columnFamilies) http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/utils/MerkleTree.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java index 394b12a..4fec62d 100644 --- a/src/java/org/apache/cassandra/utils/MerkleTree.java +++ b/src/java/org/apache/cassandra/utils/MerkleTree.java @@ -28,6 +28,7 @@ import com.google.common.collect.PeekingIterator; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.IPartitionerDependentSerializer; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; @@ -85,8 +86,8 @@ public class MerkleTree implements Serializable out.writeLong(mt.size); out.writeUTF(mt.partitioner.getClass().getCanonicalName()); // full range - Token.serializer.serialize(mt.fullRange.left, out); - Token.serializer.serialize(mt.fullRange.right, out); + Token.serializer.serialize(mt.fullRange.left, out, version); + Token.serializer.serialize(mt.fullRange.right, out, version); Hashable.serializer.serialize(mt.root, out, version); } @@ -106,13 +107,13 @@ public class MerkleTree implements Serializable } // full range - Token left = Token.serializer.deserialize(in); - Token right = Token.serializer.deserialize(in); + Token left = Token.serializer.deserialize(in, 
partitioner, version); + Token right = Token.serializer.deserialize(in, partitioner, version); Range<Token> fullRange = new Range<>(left, right); MerkleTree mt = new MerkleTree(partitioner, fullRange, hashdepth, maxsize); mt.size = size; - mt.root = Hashable.serializer.deserialize(in, version); + mt.root = Hashable.serializer.deserialize(in, partitioner, version); return mt; } @@ -124,8 +125,8 @@ public class MerkleTree implements Serializable + TypeSizes.NATIVE.sizeof(mt.partitioner.getClass().getCanonicalName()); // full range - size += Token.serializer.serializedSize(mt.fullRange.left, TypeSizes.NATIVE); - size += Token.serializer.serializedSize(mt.fullRange.right, TypeSizes.NATIVE); + size += Token.serializer.serializedSize(mt.fullRange.left, version); + size += Token.serializer.serializedSize(mt.fullRange.right, version); size += Hashable.serializer.serializedSize(mt.root, version); return size; @@ -811,7 +812,7 @@ public class MerkleTree implements Serializable return buff.toString(); } - private static class InnerSerializer implements IVersionedSerializer<Inner> + private static class InnerSerializer implements IPartitionerDependentSerializer<Inner> { public void serialize(Inner inner, DataOutputPlus out, int version) throws IOException { @@ -822,20 +823,20 @@ public class MerkleTree implements Serializable out.writeInt(inner.hash.length); out.write(inner.hash); } - Token.serializer.serialize(inner.token, out); + Token.serializer.serialize(inner.token, out, version); Hashable.serializer.serialize(inner.lchild, out, version); Hashable.serializer.serialize(inner.rchild, out, version); } - public Inner deserialize(DataInput in, int version) throws IOException + public Inner deserialize(DataInput in, IPartitioner p, int version) throws IOException { int hashLen = in.readInt(); byte[] hash = hashLen >= 0 ? 
new byte[hashLen] : null; if (hash != null) in.readFully(hash); - Token token = Token.serializer.deserialize(in); - Hashable lchild = Hashable.serializer.deserialize(in, version); - Hashable rchild = Hashable.serializer.deserialize(in, version); + Token token = Token.serializer.deserialize(in, p, version); + Hashable lchild = Hashable.serializer.deserialize(in, p, version); + Hashable rchild = Hashable.serializer.deserialize(in, p, version); return new Inner(token, lchild, rchild); } @@ -845,7 +846,7 @@ public class MerkleTree implements Serializable ? TypeSizes.NATIVE.sizeof(-1) : TypeSizes.NATIVE.sizeof(inner.hash().length) + inner.hash().length; - size += Token.serializer.serializedSize(inner.token, TypeSizes.NATIVE) + size += Token.serializer.serializedSize(inner.token, version) + Hashable.serializer.serializedSize(inner.lchild, version) + Hashable.serializer.serializedSize(inner.rchild, version); return size; @@ -892,7 +893,7 @@ public class MerkleTree implements Serializable return "#<Leaf " + Hashable.toString(hash()) + ">"; } - private static class LeafSerializer implements IVersionedSerializer<Leaf> + private static class LeafSerializer implements IPartitionerDependentSerializer<Leaf> { public void serialize(Leaf leaf, DataOutputPlus out, int version) throws IOException { @@ -907,7 +908,7 @@ public class MerkleTree implements Serializable } } - public Leaf deserialize(DataInput in, int version) throws IOException + public Leaf deserialize(DataInput in, IPartitioner p, int version) throws IOException { int hashLen = in.readInt(); byte[] hash = hashLen < 0 ? 
null : new byte[hashLen]; @@ -955,7 +956,7 @@ public class MerkleTree implements Serializable static abstract class Hashable implements Serializable { private static final long serialVersionUID = 1L; - private static final IVersionedSerializer<Hashable> serializer = new HashableSerializer(); + private static final IPartitionerDependentSerializer<Hashable> serializer = new HashableSerializer(); protected byte[] hash; protected long sizeOfRange; @@ -1033,7 +1034,7 @@ public class MerkleTree implements Serializable return "[" + Hex.bytesToHex(hash) + "]"; } - private static class HashableSerializer implements IVersionedSerializer<Hashable> + private static class HashableSerializer implements IPartitionerDependentSerializer<Hashable> { public void serialize(Hashable h, DataOutputPlus out, int version) throws IOException { @@ -1051,13 +1052,13 @@ public class MerkleTree implements Serializable throw new IOException("Unexpected Hashable: " + h.getClass().getCanonicalName()); } - public Hashable deserialize(DataInput in, int version) throws IOException + public Hashable deserialize(DataInput in, IPartitioner p, int version) throws IOException { byte ident = in.readByte(); if (Inner.IDENT == ident) - return Inner.serializer.deserialize(in, version); + return Inner.serializer.deserialize(in, p, version); else if (Leaf.IDENT == ident) - return Leaf.serializer.deserialize(in, version); + return Leaf.serializer.deserialize(in, p, version); else throw new IOException("Unexpected Hashable: " + ident); }
