This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit be1015dcc229f20bcd6b8ba5d2f52700d8793f72 Author: Caleb Rackliffe <calebrackli...@gmail.com> AuthorDate: Thu May 9 14:26:29 2024 -0500 make sure StreamSummarySerializer uses the appropriate IPartitioner during deserialization --- .../dht/IPartitionerDependentSerializer.java | 25 +++++---------------- src/java/org/apache/cassandra/dht/Token.java | 16 +++++++++++++ .../io/IVersionedAsymmetricSerializer.java | 6 ++--- .../cassandra/repair/messages/SyncResponse.java | 9 ++++---- .../apache/cassandra/streaming/SessionSummary.java | 4 ++-- .../streaming/StreamDeserializingTask.java | 3 +-- .../apache/cassandra/streaming/StreamSummary.java | 20 ++++++++++++----- .../streaming/messages/CompleteMessage.java | 3 +-- .../streaming/messages/IncomingStreamMessage.java | 3 +-- .../streaming/messages/KeepAliveMessage.java | 3 +-- .../streaming/messages/OutgoingStreamMessage.java | 3 +-- .../streaming/messages/PrepareAckMessage.java | 3 +-- .../streaming/messages/PrepareSynAckMessage.java | 5 ++--- .../streaming/messages/PrepareSynMessage.java | 5 ++--- .../streaming/messages/ReceivedMessage.java | 3 +-- .../streaming/messages/SessionFailedMessage.java | 3 +-- .../streaming/messages/StreamInitMessage.java | 3 +-- .../streaming/messages/StreamMessage.java | 7 +++--- .../serialization/5.1/service.SyncComplete.bin | Bin 346 -> 346 bytes .../test/tcm/RepairMetadataKeyspaceTest.java | 2 ++ .../messages/RepairMessageSerializationsTest.java | 19 ++++++++++++---- .../cassandra/service/SerializationsTest.java | 8 ++++--- .../async/StreamingInboundHandlerTest.java | 4 +--- 23 files changed, 86 insertions(+), 71 deletions(-) diff --git a/src/java/org/apache/cassandra/dht/IPartitionerDependentSerializer.java b/src/java/org/apache/cassandra/dht/IPartitionerDependentSerializer.java index 5c75788c0b..a70eb83771 100644 --- a/src/java/org/apache/cassandra/dht/IPartitionerDependentSerializer.java +++ b/src/java/org/apache/cassandra/dht/IPartitionerDependentSerializer.java @@ -19,8 +19,8 @@ package org.apache.cassandra.dht; import java.io.IOException; +import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputPlus; /** * Versioned serializer where the serialization depends on partitioner. @@ -28,18 +28,8 @@ import org.apache.cassandra.io.util.DataOutputPlus; * On serialization the partitioner is given by the entity being serialized. To deserialize the partitioner used must * be known to the calling method. */ -public interface IPartitionerDependentSerializer<T> +public interface IPartitionerDependentSerializer<T> extends IVersionedSerializer<T> { - /** - * Serialize the specified type into the specified DataOutputStream instance. - * - * @param t type that needs to be serialized - * @param out DataOutput into which serialization needs to happen. - * @param version protocol version - * @throws java.io.IOException if serialization fails - */ - public void serialize(T t, DataOutputPlus out, int version) throws IOException; - /** * Deserialize into the specified DataInputStream instance. * @param in DataInput from which deserialization needs to happen. @@ -51,11 +41,8 @@ public interface IPartitionerDependentSerializer<T> */ public T deserialize(DataInputPlus in, IPartitioner p, int version) throws IOException; - /** - * Calculate serialized size of object without actually serializing. - * @param t object to calculate serialized size - * @param version protocol version - * @return serialized size of object t - */ - public long serializedSize(T t, int version); + default T deserialize(DataInputPlus in, int version) throws IOException + { + return deserialize(in, null, version); + } } diff --git a/src/java/org/apache/cassandra/dht/Token.java b/src/java/org/apache/cassandra/dht/Token.java index fa886410c6..41c617fbc0 100644 --- a/src/java/org/apache/cassandra/dht/Token.java +++ b/src/java/org/apache/cassandra/dht/Token.java @@ -20,6 +20,12 @@ package org.apache.cassandra.dht; import java.io.IOException; import java.io.Serializable; import java.nio.ByteBuffer; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.db.TypeSizes; @@ -35,6 +41,8 @@ import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse; public abstract class Token implements RingPosition<Token>, Serializable { + private static final Logger logger = LoggerFactory.getLogger(Token.class); + private static final long serialVersionUID = 1L; public static final TokenSerializer serializer = new TokenSerializer(); @@ -178,11 +186,17 @@ public abstract class Token implements RingPosition<Token>, Serializable } } + public static volatile boolean logPartitioner = false; + public static final Set<Class<? extends IPartitioner>> serializePartitioners = Sets.newSetFromMap(new ConcurrentHashMap<>()); + public static final Set<Class<? extends IPartitioner>> deserializePartitioners = Sets.newSetFromMap(new ConcurrentHashMap<>()); + public static class CompactTokenSerializer implements IPartitionerDependentSerializer<Token> { public void serialize(Token token, DataOutputPlus out, int version) throws IOException { IPartitioner p = token.getPartitioner(); + if (logPartitioner && serializePartitioners.add(p.getClass())) + logger.debug("Serializing token with partitioner " + p); if (!p.isFixedLength()) out.writeUnsignedVInt32(p.getTokenFactory().byteSize(token)); p.getTokenFactory().serialize(token, out); @@ -191,6 +205,8 @@ public abstract class Token implements RingPosition<Token>, Serializable public Token deserialize(DataInputPlus in, IPartitioner p, int version) throws IOException { int size = p.isFixedLength() ? p.getMaxTokenSize() : in.readUnsignedVInt32(); + if (logPartitioner && deserializePartitioners.add(p.getClass())) + logger.debug("Deserializing token with partitioner " + p); byte[] bytes = new byte[size]; in.readFully(bytes); return p.getTokenFactory().fromByteArray(ByteBuffer.wrap(bytes)); diff --git a/src/java/org/apache/cassandra/io/IVersionedAsymmetricSerializer.java b/src/java/org/apache/cassandra/io/IVersionedAsymmetricSerializer.java index 8ad2c285c3..ff89110e33 100644 --- a/src/java/org/apache/cassandra/io/IVersionedAsymmetricSerializer.java +++ b/src/java/org/apache/cassandra/io/IVersionedAsymmetricSerializer.java @@ -32,7 +32,7 @@ public interface IVersionedAsymmetricSerializer<In, Out> * @param version protocol version * @throws IOException if serialization fails */ - public void serialize(In t, DataOutputPlus out, int version) throws IOException; + void serialize(In t, DataOutputPlus out, int version) throws IOException; /** * Deserialize into the specified DataInputStream instance. @@ -41,7 +41,7 @@ public interface IVersionedAsymmetricSerializer<In, Out> * @return the type that was deserialized * @throws IOException if deserialization fails */ - public Out deserialize(DataInputPlus in, int version) throws IOException; + Out deserialize(DataInputPlus in, int version) throws IOException; /** * Calculate serialized size of object without actually serializing. @@ -49,5 +49,5 @@ public interface IVersionedAsymmetricSerializer<In, Out> * @param version protocol version * @return serialized size of object t */ - public long serializedSize(In t, int version); + long serializedSize(In t, int version); } diff --git a/src/java/org/apache/cassandra/repair/messages/SyncResponse.java b/src/java/org/apache/cassandra/repair/messages/SyncResponse.java index 0c528a3796..e7b5446bad 100644 --- a/src/java/org/apache/cassandra/repair/messages/SyncResponse.java +++ b/src/java/org/apache/cassandra/repair/messages/SyncResponse.java @@ -24,7 +24,7 @@ import java.util.Objects; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.dht.IPartitionerDependentSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.InetAddressAndPort; @@ -79,7 +79,7 @@ public class SyncResponse extends RepairMessage return Objects.hash(desc, success, nodes, summaries); } - public static final IVersionedSerializer<SyncResponse> serializer = new IVersionedSerializer<SyncResponse>() + public static final IPartitionerDependentSerializer<SyncResponse> serializer = new IPartitionerDependentSerializer<SyncResponse>() { public void serialize(SyncResponse message, DataOutputPlus out, int version) throws IOException { @@ -94,7 +94,8 @@ public class SyncResponse extends RepairMessage } } - public SyncResponse deserialize(DataInputPlus in, int version) throws IOException + @Override + public SyncResponse deserialize(DataInputPlus in, IPartitioner partitioner, int version) throws IOException { RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version); SyncNodePair nodes = SyncNodePair.serializer.deserialize(in, version); @@ -104,7 +105,7 @@ public class SyncResponse extends RepairMessage List<SessionSummary> summaries = new ArrayList<>(numSummaries); for (int i=0; i<numSummaries; i++) { - summaries.add(SessionSummary.serializer.deserialize(in, IPartitioner.global(), version)); + summaries.add(SessionSummary.serializer.deserialize(in, partitioner, version)); } return new SyncResponse(desc, nodes, success, summaries); diff --git a/src/java/org/apache/cassandra/streaming/SessionSummary.java b/src/java/org/apache/cassandra/streaming/SessionSummary.java index f5bcfa31be..8bb1a1eb81 100644 --- a/src/java/org/apache/cassandra/streaming/SessionSummary.java +++ b/src/java/org/apache/cassandra/streaming/SessionSummary.java @@ -108,14 +108,14 @@ public class SessionSummary List<StreamSummary> receivingSummaries = new ArrayList<>(numRcvd); for (int i=0; i<numRcvd; i++) { - receivingSummaries.add(StreamSummary.serializer.deserialize(in, partitioner, version)); + receivingSummaries.add(StreamSummary.serializer.deserialize(in, version)); } int numSent = in.readInt(); List<StreamSummary> sendingSummaries = new ArrayList<>(numRcvd); for (int i=0; i<numSent; i++) { - sendingSummaries.add(StreamSummary.serializer.deserialize(in, partitioner, version)); + sendingSummaries.add(StreamSummary.serializer.deserialize(in, version)); } return new SessionSummary(coordinator, peer, receivingSummaries, sendingSummaries); diff --git a/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java b/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java index 139416a5f3..2d05322543 100644 --- a/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java @@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.db.guardrails.GuardrailViolatedException; import org.apache.cassandra.db.guardrails.Guardrails; import org.apache.cassandra.locator.InetAddressAndPort; @@ -58,7 +57,7 @@ public class StreamDeserializingTask implements Runnable try { StreamMessage message; - while (null != (message = StreamMessage.deserialize(input, IPartitioner.global(), messagingVersion))) + while (null != (message = StreamMessage.deserialize(input, messagingVersion))) { // keep-alives don't necessarily need to be tied to a session (they could be arrive before or after // wrt session lifecycle, due to races), just log that we received the message and carry on diff --git a/src/java/org/apache/cassandra/streaming/StreamSummary.java b/src/java/org/apache/cassandra/streaming/StreamSummary.java index 34aa56c66d..b6b3545bf2 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSummary.java +++ b/src/java/org/apache/cassandra/streaming/StreamSummary.java @@ -23,16 +23,17 @@ import java.util.List; import com.google.common.base.Objects; import com.google.common.collect.ImmutableList; - import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.dht.IPartitionerDependentSerializer; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.CollectionSerializers; /** @@ -40,7 +41,7 @@ import org.apache.cassandra.utils.CollectionSerializers; */ public class StreamSummary implements Serializable { - public static final IPartitionerDependentSerializer<StreamSummary> serializer = new StreamSummarySerializer(); + public static final IVersionedSerializer<StreamSummary> serializer = new StreamSummarySerializer(); public final TableId tableId; public final List<Range<Token>> ranges; @@ -86,25 +87,34 @@ public class StreamSummary implements Serializable return sb.toString(); } - public static class StreamSummarySerializer implements IPartitionerDependentSerializer<StreamSummary> + public static class StreamSummarySerializer implements IVersionedSerializer<StreamSummary> { public void serialize(StreamSummary summary, DataOutputPlus out, int version) throws IOException { summary.tableId.serialize(out); out.writeInt(summary.files); out.writeLong(summary.totalSize); + Token.logPartitioner = true; if (version >= MessagingService.VERSION_51) CollectionSerializers.serializeCollection(summary.ranges, out, version, Range.rangeSerializer); + Token.logPartitioner = false; } - public StreamSummary deserialize(DataInputPlus in, IPartitioner p, int version) throws IOException + public StreamSummary deserialize(DataInputPlus in, int version) throws IOException { TableId tableId = TableId.deserialize(in); + int files = in.readInt(); long totalSize = in.readLong(); List<Range<Token>> ranges = ImmutableList.of(); if (version >= MessagingService.VERSION_51) + { + TableMetadata tableMetadata = Schema.instance.getTableMetadata(tableId); + IPartitioner p = tableMetadata != null ? tableMetadata.partitioner : IPartitioner.global(); + Token.logPartitioner = true; ranges = CollectionSerializers.deserializeList(in, p, version, Range.rangeSerializer); + Token.logPartitioner = false; + } return new StreamSummary(tableId, ranges, files, totalSize); } diff --git a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java index 86620c3859..afb1c6c7b4 100644 --- a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.streaming.messages; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.StreamingDataOutputPlus; @@ -26,7 +25,7 @@ public class CompleteMessage extends StreamMessage { public static Serializer<CompleteMessage> serializer = new Serializer<CompleteMessage>() { - public CompleteMessage deserialize(DataInputPlus in, IPartitioner partitioner, int version) + public CompleteMessage deserialize(DataInputPlus in, int version) { return new CompleteMessage(); } diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java index 4ee726ee83..e48d115e35 100644 --- a/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.Objects; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.streaming.IncomingStream; import org.apache.cassandra.streaming.StreamManager; @@ -34,7 +33,7 @@ public class IncomingStreamMessage extends StreamMessage { public static Serializer<IncomingStreamMessage> serializer = new Serializer<IncomingStreamMessage>() { - public IncomingStreamMessage deserialize(DataInputPlus input, IPartitioner partitioner, int version) throws IOException + public IncomingStreamMessage deserialize(DataInputPlus input, int version) throws IOException { StreamMessageHeader header = StreamMessageHeader.serializer.deserialize(input, version); StreamSession session = StreamManager.instance.findSession(header.sender, header.planId, header.sessionIndex, header.sendByFollower); diff --git a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java index 928783f401..42be1e99a1 100644 --- a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.streaming.messages; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.StreamingDataOutputPlus; @@ -38,7 +37,7 @@ public class KeepAliveMessage extends StreamMessage public static Serializer<KeepAliveMessage> serializer = new Serializer<KeepAliveMessage>() { - public KeepAliveMessage deserialize(DataInputPlus in, IPartitioner partitioner, int version) + public KeepAliveMessage deserialize(DataInputPlus in, int version) { return new KeepAliveMessage(); } diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java index dcd3b755e8..b83d7863fc 100644 --- a/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java @@ -21,7 +21,6 @@ import java.io.IOException; import com.google.common.annotations.VisibleForTesting; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.streaming.OutgoingStream; @@ -33,7 +32,7 @@ public class OutgoingStreamMessage extends StreamMessage { public static Serializer<OutgoingStreamMessage> serializer = new Serializer<OutgoingStreamMessage>() { - public OutgoingStreamMessage deserialize(DataInputPlus in, IPartitioner partitioner, int version) + public OutgoingStreamMessage deserialize(DataInputPlus in, int version) { throw new UnsupportedOperationException("Not allowed to call deserialize on an outgoing stream"); } diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java index 72d61d29cb..f93b5afe30 100644 --- a/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java @@ -20,7 +20,6 @@ package org.apache.cassandra.streaming.messages; import java.io.IOException; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.StreamingDataOutputPlus; @@ -34,7 +33,7 @@ public class PrepareAckMessage extends StreamMessage //nop } - public PrepareAckMessage deserialize(DataInputPlus in, IPartitioner partitioner, int version) throws IOException + public PrepareAckMessage deserialize(DataInputPlus in, int version) throws IOException { return new PrepareAckMessage(); } diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java index e29e651824..e052f4c301 100644 --- a/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.StreamSummary; @@ -39,12 +38,12 @@ public class PrepareSynAckMessage extends StreamMessage StreamSummary.serializer.serialize(summary, out, version); } - public PrepareSynAckMessage deserialize(DataInputPlus input, IPartitioner partitioner, int version) throws IOException + public PrepareSynAckMessage deserialize(DataInputPlus input, int version) throws IOException { PrepareSynAckMessage message = new PrepareSynAckMessage(); int numSummaries = input.readInt(); for (int i = 0; i < numSummaries; i++) - message.summaries.add(StreamSummary.serializer.deserialize(input, partitioner, version)); + message.summaries.add(StreamSummary.serializer.deserialize(input, version)); return message; } diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java index e901365e5e..c856f46983 100644 --- a/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.streaming.StreamRequest; import org.apache.cassandra.streaming.StreamSession; @@ -32,7 +31,7 @@ public class PrepareSynMessage extends StreamMessage { public static Serializer<PrepareSynMessage> serializer = new Serializer<PrepareSynMessage>() { - public PrepareSynMessage deserialize(DataInputPlus input, IPartitioner partitioner, int version) throws IOException + public PrepareSynMessage deserialize(DataInputPlus input, int version) throws IOException { PrepareSynMessage message = new PrepareSynMessage(); // requests @@ -42,7 +41,7 @@ public class PrepareSynMessage extends StreamMessage // summaries int numSummaries = input.readInt(); for (int i = 0; i < numSummaries; i++) - message.summaries.add(StreamSummary.serializer.deserialize(input, partitioner, version)); + message.summaries.add(StreamSummary.serializer.deserialize(input, version)); return message; } diff --git a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java index c6b7a0f638..6755959614 100644 --- a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java @@ -19,7 +19,6 @@ package org.apache.cassandra.streaming.messages; import java.io.IOException; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.streaming.StreamSession; @@ -29,7 +28,7 @@ public class ReceivedMessage extends StreamMessage { public static Serializer<ReceivedMessage> serializer = new Serializer<ReceivedMessage>() { - public ReceivedMessage deserialize(DataInputPlus input, IPartitioner partitioner, int version) throws IOException + public ReceivedMessage deserialize(DataInputPlus input, int version) throws IOException { return new ReceivedMessage(TableId.deserialize(input), input.readInt()); } diff --git a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java index f05be58aa6..7fa82d8f67 100644 --- a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.streaming.messages; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.StreamingDataOutputPlus; @@ -26,7 +25,7 @@ public class SessionFailedMessage extends StreamMessage { public static Serializer<SessionFailedMessage> serializer = new Serializer<SessionFailedMessage>() { - public SessionFailedMessage deserialize(DataInputPlus in, IPartitioner partitioner, int version) + public SessionFailedMessage deserialize(DataInputPlus in, int version) { return new SessionFailedMessage(); } diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java index 2fd65d7dff..e78442334b 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java @@ -20,7 +20,6 @@ package org.apache.cassandra.streaming.messages; import java.io.IOException; import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.streaming.PreviewKind; @@ -94,7 +93,7 @@ public class StreamInitMessage extends StreamMessage out.writeInt(message.previewKind.getSerializationVal()); } - public StreamInitMessage deserialize(DataInputPlus in, IPartitioner partitioner, int version) throws IOException + public StreamInitMessage deserialize(DataInputPlus in, int version) throws IOException { InetAddressAndPort from = inetAddressAndPortSerializer.deserialize(in, version); int sessionIndex = in.readInt(); diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java index 6e5dc08f88..186ac3274a 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.StreamingChannel; @@ -45,16 +44,16 @@ public abstract class StreamMessage return 1 + message.type.outSerializer.serializedSize(message, version); } - public static StreamMessage deserialize(DataInputPlus in, IPartitioner partitioner, int version) throws IOException + public static StreamMessage deserialize(DataInputPlus in, int version) throws IOException { Type type = Type.lookupById(in.readByte()); - return type.inSerializer.deserialize(in, partitioner, version); + return type.inSerializer.deserialize(in, version); } /** StreamMessage serializer */ public static interface Serializer<V extends StreamMessage> { - V deserialize(DataInputPlus in, IPartitioner partitioner, int version) throws IOException; + V deserialize(DataInputPlus in, int version) throws IOException; void serialize(V message, StreamingDataOutputPlus out, int version, StreamSession session) throws IOException; long serializedSize(V message, int version) throws IOException; } diff --git a/test/data/serialization/5.1/service.SyncComplete.bin b/test/data/serialization/5.1/service.SyncComplete.bin index aede3aae15..6f8c8ee777 100644 Binary files a/test/data/serialization/5.1/service.SyncComplete.bin and b/test/data/serialization/5.1/service.SyncComplete.bin differ diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/RepairMetadataKeyspaceTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/RepairMetadataKeyspaceTest.java index 074a64913f..70d1acfddd 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tcm/RepairMetadataKeyspaceTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/RepairMetadataKeyspaceTest.java @@ -28,6 +28,7 @@ import java.util.stream.Collectors; import org.junit.Test; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.distributed.shared.ClusterUtils; @@ -63,6 +64,7 @@ public class RepairMetadataKeyspaceTest extends TestBaseImpl IInvokableInstance toRepair = cluster.get(3); stopUnchecked(toRepair); + DatabaseDescriptor.clientInitialization(); String targetDir = DistributedMetadataLogKeyspace.TABLE_NAME + '-' + DistributedMetadataLogKeyspace.LOG_TABLE_ID.toHexString(); for (File datadir : getDataDirectories(toRepair)) { diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java index 0a9333cfbe..15025b7a0b 100644 --- a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java +++ b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java @@ -25,6 +25,8 @@ import java.util.List; import java.util.UUID; import com.google.common.collect.Lists; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.*; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -32,10 +34,7 @@ import org.junit.Test; import org.apache.cassandra.CassandraTestBase; import org.apache.cassandra.CassandraTestBase.DDDaemonInitialization; import org.apache.cassandra.CassandraTestBase.UseMurmur3Partitioner; -import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.Murmur3Partitioner.LongToken; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputBuffer; @@ -112,7 +111,19 @@ public class RepairMessageSerializationsTest extends CassandraTestBase buf.flip(); DataInputPlus in = new DataInputBuffer(buf, false); - T deserialized = serializer.deserialize(in, PROTOCOL_VERSION); + + T deserialized = null; + + if (serializer instanceof IPartitionerDependentSerializer) + { + IPartitionerDependentSerializer<T> pds = (IPartitionerDependentSerializer<T>) serializer; + deserialized = pds.deserialize(in, DatabaseDescriptor.getPartitioner(), PROTOCOL_VERSION); + } + else + { + deserialized = serializer.deserialize(in, PROTOCOL_VERSION); + } + Assert.assertEquals(msg, deserialized); Assert.assertEquals(msg.hashCode(), deserialized.hashCode()); return deserialized; diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java index 5ed6d2c409..f2dc88c5c3 100644 --- a/test/unit/org/apache/cassandra/service/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.UUID; import com.google.common.collect.Lists; import org.junit.AfterClass; @@ -60,6 +59,7 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.SessionSummary; import org.apache.cassandra.streaming.StreamSummary; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MerkleTrees; @@ -70,6 +70,7 @@ import static java.util.Collections.emptyList; public class SerializationsTest extends AbstractSerializationsTester { private static PartitionerSwitcher partitionerSwitcher; + private static TableId TABLE_ID; private static TimeUUID RANDOM_UUID; private static Range<Token> FULL_RANGE; private static RepairJobDesc DESC; @@ -84,6 +85,7 @@ public class SerializationsTest extends AbstractSerializationsTester ClusterMetadataTestHelper.setInstanceForTest(); SchemaTestUtil.addOrUpdateKeyspace(KeyspaceMetadata.create("Keyspace1", KeyspaceParams.simple(3))); SchemaTestUtil.announceNewTable(TableMetadata.minimal("Keyspace1", "Standard1")); + TABLE_ID = ClusterMetadata.current().schema.getKeyspaceMetadata("Keyspace1").getTableOrViewNullable("Standard1").id(); RANDOM_UUID = TimeUUID.fromString("743325d0-4c4b-11ec-8a88-2d67081686db"); FULL_RANGE = new Range<>(Util.testPartitioner().getMinimumToken(), Util.testPartitioner().getMinimumToken()); DESC = new RepairJobDesc(RANDOM_UUID, RANDOM_UUID, "Keyspace1", "Standard1", Arrays.asList(FULL_RANGE)); @@ -222,8 +224,8 @@ public class SerializationsTest extends AbstractSerializationsTester // sync success List<SessionSummary> summaries = new ArrayList<>(); summaries.add(new SessionSummary(src, dest, - Lists.newArrayList(new StreamSummary(TableId.fromUUID(UUID.randomUUID()), emptyList(), 5, 100)), - Lists.newArrayList(new StreamSummary(TableId.fromUUID(UUID.randomUUID()), emptyList(), 500, 10)) + Lists.newArrayList(new StreamSummary(TABLE_ID, emptyList(), 5, 100)), + Lists.newArrayList(new StreamSummary(TABLE_ID, emptyList(), 500, 10)) )); SyncResponse success = new SyncResponse(DESC, src, dest, true, summaries); // sync fail diff --git a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java index 069d0fb58d..904272f7b8 100644 --- a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java +++ b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java @@ -30,7 +30,6 @@ import org.junit.Test; import io.netty.buffer.ByteBuf; import io.netty.channel.embedded.EmbeddedChannel; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; @@ -56,7 +55,6 @@ import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID; public class StreamingInboundHandlerTest { - private NettyStreamingChannel streamingChannel; private EmbeddedChannel channel; private ByteBuf buf; @@ -125,7 +123,7 @@ public class StreamingInboundHandlerTest temp.flip(); DataInputPlus in = new DataInputBuffer(temp, false); // session not found - IncomingStreamMessage.serializer.deserialize(in, IPartitioner.global(), MessagingService.current_version); + IncomingStreamMessage.serializer.deserialize(in, MessagingService.current_version); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org