This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit be1015dcc229f20bcd6b8ba5d2f52700d8793f72
Author: Caleb Rackliffe <calebrackli...@gmail.com>
AuthorDate: Thu May 9 14:26:29 2024 -0500

    make sure StreamSummarySerializer uses the appropriate IPartitioner during deserialization
---
 .../dht/IPartitionerDependentSerializer.java       |  25 +++++----------------
 src/java/org/apache/cassandra/dht/Token.java       |  16 +++++++++++++
 .../io/IVersionedAsymmetricSerializer.java         |   6 ++---
 .../cassandra/repair/messages/SyncResponse.java    |   9 ++++----
 .../apache/cassandra/streaming/SessionSummary.java |   4 ++--
 .../streaming/StreamDeserializingTask.java         |   3 +--
 .../apache/cassandra/streaming/StreamSummary.java  |  20 ++++++++++++-----
 .../streaming/messages/CompleteMessage.java        |   3 +--
 .../streaming/messages/IncomingStreamMessage.java  |   3 +--
 .../streaming/messages/KeepAliveMessage.java       |   3 +--
 .../streaming/messages/OutgoingStreamMessage.java  |   3 +--
 .../streaming/messages/PrepareAckMessage.java      |   3 +--
 .../streaming/messages/PrepareSynAckMessage.java   |   5 ++---
 .../streaming/messages/PrepareSynMessage.java      |   5 ++---
 .../streaming/messages/ReceivedMessage.java        |   3 +--
 .../streaming/messages/SessionFailedMessage.java   |   3 +--
 .../streaming/messages/StreamInitMessage.java      |   3 +--
 .../streaming/messages/StreamMessage.java          |   7 +++---
 .../serialization/5.1/service.SyncComplete.bin     | Bin 346 -> 346 bytes
 .../test/tcm/RepairMetadataKeyspaceTest.java       |   2 ++
 .../messages/RepairMessageSerializationsTest.java  |  19 ++++++++++++----
 .../cassandra/service/SerializationsTest.java      |   8 ++++---
 .../async/StreamingInboundHandlerTest.java         |   4 +---
 23 files changed, 86 insertions(+), 71 deletions(-)

diff --git 
a/src/java/org/apache/cassandra/dht/IPartitionerDependentSerializer.java 
b/src/java/org/apache/cassandra/dht/IPartitionerDependentSerializer.java
index 5c75788c0b..a70eb83771 100644
--- a/src/java/org/apache/cassandra/dht/IPartitionerDependentSerializer.java
+++ b/src/java/org/apache/cassandra/dht/IPartitionerDependentSerializer.java
@@ -19,8 +19,8 @@ package org.apache.cassandra.dht;
 
 import java.io.IOException;
 
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
 
 /**
  * Versioned serializer where the serialization depends on partitioner.
@@ -28,18 +28,8 @@ import org.apache.cassandra.io.util.DataOutputPlus;
  * On serialization the partitioner is given by the entity being serialized. To deserialize the partitioner used must
  * be known to the calling method.
  */
-public interface IPartitionerDependentSerializer<T>
+public interface IPartitionerDependentSerializer<T> extends 
IVersionedSerializer<T>
 {
-    /**
-     * Serialize the specified type into the specified DataOutputStream 
instance.
-     *
-     * @param t type that needs to be serialized
-     * @param out DataOutput into which serialization needs to happen.
-     * @param version protocol version
-     * @throws java.io.IOException if serialization fails
-     */
-    public void serialize(T t, DataOutputPlus out, int version) throws 
IOException;
-
     /**
      * Deserialize into the specified DataInputStream instance.
      * @param in DataInput from which deserialization needs to happen.
@@ -51,11 +41,8 @@ public interface IPartitionerDependentSerializer<T>
      */
     public T deserialize(DataInputPlus in, IPartitioner p, int version) throws 
IOException;
 
-    /**
-     * Calculate serialized size of object without actually serializing.
-     * @param t object to calculate serialized size
-     * @param version protocol version
-     * @return serialized size of object t
-     */
-    public long serializedSize(T t, int version);
+    default T deserialize(DataInputPlus in, int version) throws IOException
+    {
+        return deserialize(in, null, version);
+    }
 }
diff --git a/src/java/org/apache/cassandra/dht/Token.java 
b/src/java/org/apache/cassandra/dht/Token.java
index fa886410c6..41c617fbc0 100644
--- a/src/java/org/apache/cassandra/dht/Token.java
+++ b/src/java/org/apache/cassandra/dht/Token.java
@@ -20,6 +20,12 @@ package org.apache.cassandra.dht;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.TypeSizes;
@@ -35,6 +41,8 @@ import 
org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
 
 public abstract class Token implements RingPosition<Token>, Serializable
 {
+    private static final Logger logger = LoggerFactory.getLogger(Token.class);
+
     private static final long serialVersionUID = 1L;
 
     public static final TokenSerializer serializer = new TokenSerializer();
@@ -178,11 +186,17 @@ public abstract class Token implements 
RingPosition<Token>, Serializable
         }
     }
 
+    public static volatile boolean logPartitioner = false;
+    public static final Set<Class<? extends IPartitioner>> 
serializePartitioners = Sets.newSetFromMap(new ConcurrentHashMap<>());
+    public static final Set<Class<? extends IPartitioner>> 
deserializePartitioners = Sets.newSetFromMap(new ConcurrentHashMap<>());
+
     public static class CompactTokenSerializer implements 
IPartitionerDependentSerializer<Token>
     {
         public void serialize(Token token, DataOutputPlus out, int version) 
throws IOException
         {
             IPartitioner p = token.getPartitioner();
+            if (logPartitioner && serializePartitioners.add(p.getClass()))
+              logger.debug("Serializing token with partitioner " + p);
             if (!p.isFixedLength())
                 out.writeUnsignedVInt32(p.getTokenFactory().byteSize(token));
             p.getTokenFactory().serialize(token, out);
@@ -191,6 +205,8 @@ public abstract class Token implements RingPosition<Token>, 
Serializable
         public Token deserialize(DataInputPlus in, IPartitioner p, int 
version) throws IOException
         {
             int size = p.isFixedLength() ? p.getMaxTokenSize() : 
in.readUnsignedVInt32();
+            if (logPartitioner && deserializePartitioners.add(p.getClass()))
+                logger.debug("Deserializing token with partitioner " + p);
             byte[] bytes = new byte[size];
             in.readFully(bytes);
             return p.getTokenFactory().fromByteArray(ByteBuffer.wrap(bytes));
diff --git 
a/src/java/org/apache/cassandra/io/IVersionedAsymmetricSerializer.java 
b/src/java/org/apache/cassandra/io/IVersionedAsymmetricSerializer.java
index 8ad2c285c3..ff89110e33 100644
--- a/src/java/org/apache/cassandra/io/IVersionedAsymmetricSerializer.java
+++ b/src/java/org/apache/cassandra/io/IVersionedAsymmetricSerializer.java
@@ -32,7 +32,7 @@ public interface IVersionedAsymmetricSerializer<In, Out>
      * @param version protocol version
      * @throws IOException if serialization fails
      */
-    public void serialize(In t, DataOutputPlus out, int version) throws 
IOException;
+    void serialize(In t, DataOutputPlus out, int version) throws IOException;
 
     /**
      * Deserialize into the specified DataInputStream instance.
@@ -41,7 +41,7 @@ public interface IVersionedAsymmetricSerializer<In, Out>
      * @return the type that was deserialized
      * @throws IOException if deserialization fails
      */
-    public Out deserialize(DataInputPlus in, int version) throws IOException;
+    Out deserialize(DataInputPlus in, int version) throws IOException;
 
     /**
      * Calculate serialized size of object without actually serializing.
@@ -49,5 +49,5 @@ public interface IVersionedAsymmetricSerializer<In, Out>
      * @param version protocol version
      * @return serialized size of object t
      */
-    public long serializedSize(In t, int version);
+    long serializedSize(In t, int version);
 }
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncResponse.java 
b/src/java/org/apache/cassandra/repair/messages/SyncResponse.java
index 0c528a3796..e7b5446bad 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncResponse.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncResponse.java
@@ -24,7 +24,7 @@ import java.util.Objects;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.dht.IPartitionerDependentSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -79,7 +79,7 @@ public class SyncResponse extends RepairMessage
         return Objects.hash(desc, success, nodes, summaries);
     }
 
-    public static final IVersionedSerializer<SyncResponse> serializer = new 
IVersionedSerializer<SyncResponse>()
+    public static final IPartitionerDependentSerializer<SyncResponse> 
serializer = new IPartitionerDependentSerializer<SyncResponse>()
     {
         public void serialize(SyncResponse message, DataOutputPlus out, int 
version) throws IOException
         {
@@ -94,7 +94,8 @@ public class SyncResponse extends RepairMessage
             }
         }
 
-        public SyncResponse deserialize(DataInputPlus in, int version) throws 
IOException
+        @Override
+        public SyncResponse deserialize(DataInputPlus in, IPartitioner 
partitioner, int version) throws IOException
         {
             RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, 
version);
             SyncNodePair nodes = SyncNodePair.serializer.deserialize(in, 
version);
@@ -104,7 +105,7 @@ public class SyncResponse extends RepairMessage
             List<SessionSummary> summaries = new ArrayList<>(numSummaries);
             for (int i=0; i<numSummaries; i++)
             {
-                summaries.add(SessionSummary.serializer.deserialize(in, 
IPartitioner.global(), version));
+                summaries.add(SessionSummary.serializer.deserialize(in, 
partitioner, version));
             }
 
             return new SyncResponse(desc, nodes, success, summaries);
diff --git a/src/java/org/apache/cassandra/streaming/SessionSummary.java 
b/src/java/org/apache/cassandra/streaming/SessionSummary.java
index f5bcfa31be..8bb1a1eb81 100644
--- a/src/java/org/apache/cassandra/streaming/SessionSummary.java
+++ b/src/java/org/apache/cassandra/streaming/SessionSummary.java
@@ -108,14 +108,14 @@ public class SessionSummary
             List<StreamSummary> receivingSummaries = new ArrayList<>(numRcvd);
             for (int i=0; i<numRcvd; i++)
             {
-                
receivingSummaries.add(StreamSummary.serializer.deserialize(in, partitioner, 
version));
+                
receivingSummaries.add(StreamSummary.serializer.deserialize(in, version));
             }
 
             int numSent = in.readInt();
             List<StreamSummary> sendingSummaries = new ArrayList<>(numRcvd);
             for (int i=0; i<numSent; i++)
             {
-                sendingSummaries.add(StreamSummary.serializer.deserialize(in, 
partitioner, version));
+                sendingSummaries.add(StreamSummary.serializer.deserialize(in, 
version));
             }
 
             return new SessionSummary(coordinator, peer, receivingSummaries, 
sendingSummaries);
diff --git 
a/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java 
b/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
index 139416a5f3..2d05322543 100644
--- a/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
@@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.db.guardrails.GuardrailViolatedException;
 import org.apache.cassandra.db.guardrails.Guardrails;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -58,7 +57,7 @@ public class StreamDeserializingTask implements Runnable
         try
         {
             StreamMessage message;
-            while (null != (message = StreamMessage.deserialize(input, 
IPartitioner.global(), messagingVersion)))
+            while (null != (message = StreamMessage.deserialize(input, 
messagingVersion)))
             {
                 // keep-alives don't necessarily need to be tied to a session 
(they could be arrive before or after
                 // wrt session lifecycle, due to races), just log that we 
received the message and carry on
diff --git a/src/java/org/apache/cassandra/streaming/StreamSummary.java 
b/src/java/org/apache/cassandra/streaming/StreamSummary.java
index 34aa56c66d..b6b3545bf2 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSummary.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSummary.java
@@ -23,16 +23,17 @@ import java.util.List;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
-
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.IPartitionerDependentSerializer;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.CollectionSerializers;
 
 /**
@@ -40,7 +41,7 @@ import org.apache.cassandra.utils.CollectionSerializers;
  */
 public class StreamSummary implements Serializable
 {
-    public static final IPartitionerDependentSerializer<StreamSummary> 
serializer = new StreamSummarySerializer();
+    public static final IVersionedSerializer<StreamSummary> serializer = new 
StreamSummarySerializer();
 
     public final TableId tableId;
     public final List<Range<Token>> ranges;
@@ -86,25 +87,34 @@ public class StreamSummary implements Serializable
         return sb.toString();
     }
 
-    public static class StreamSummarySerializer implements 
IPartitionerDependentSerializer<StreamSummary>
+    public static class StreamSummarySerializer implements 
IVersionedSerializer<StreamSummary>
     {
         public void serialize(StreamSummary summary, DataOutputPlus out, int 
version) throws IOException
         {
             summary.tableId.serialize(out);
             out.writeInt(summary.files);
             out.writeLong(summary.totalSize);
+            Token.logPartitioner = true;
             if (version >= MessagingService.VERSION_51)
                 CollectionSerializers.serializeCollection(summary.ranges, out, 
version, Range.rangeSerializer);
+            Token.logPartitioner = false;
         }
 
-        public StreamSummary deserialize(DataInputPlus in, IPartitioner p, int 
version) throws IOException
+        public StreamSummary deserialize(DataInputPlus in, int version) throws 
IOException
         {
             TableId tableId = TableId.deserialize(in);
+
             int files = in.readInt();
             long totalSize = in.readLong();
             List<Range<Token>> ranges = ImmutableList.of();
             if (version >= MessagingService.VERSION_51)
+            {
+                TableMetadata tableMetadata = 
Schema.instance.getTableMetadata(tableId);
+                IPartitioner p = tableMetadata != null ? 
tableMetadata.partitioner : IPartitioner.global();
+                Token.logPartitioner = true;
                 ranges = CollectionSerializers.deserializeList(in, p, version, 
Range.rangeSerializer);
+                Token.logPartitioner = false;
+            }
             return new StreamSummary(tableId, ranges, files, totalSize);
         }
 
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
index 86620c3859..afb1c6c7b4 100644
--- a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.streaming.messages;
 
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.StreamingDataOutputPlus;
@@ -26,7 +25,7 @@ public class CompleteMessage extends StreamMessage
 {
     public static Serializer<CompleteMessage> serializer = new 
Serializer<CompleteMessage>()
     {
-        public CompleteMessage deserialize(DataInputPlus in, IPartitioner 
partitioner, int version)
+        public CompleteMessage deserialize(DataInputPlus in, int version)
         {
             return new CompleteMessage();
         }
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
index 4ee726ee83..e48d115e35 100644
--- 
a/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
+++ 
b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.Objects;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.streaming.IncomingStream;
 import org.apache.cassandra.streaming.StreamManager;
@@ -34,7 +33,7 @@ public class IncomingStreamMessage extends StreamMessage
 {
     public static Serializer<IncomingStreamMessage> serializer = new 
Serializer<IncomingStreamMessage>()
     {
-        public IncomingStreamMessage deserialize(DataInputPlus input, 
IPartitioner partitioner, int version) throws IOException
+        public IncomingStreamMessage deserialize(DataInputPlus input, int 
version) throws IOException
         {
             StreamMessageHeader header = 
StreamMessageHeader.serializer.deserialize(input, version);
             StreamSession session = 
StreamManager.instance.findSession(header.sender, header.planId, 
header.sessionIndex, header.sendByFollower);
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
index 928783f401..42be1e99a1 100644
--- a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.streaming.messages;
 
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.StreamingDataOutputPlus;
@@ -38,7 +37,7 @@ public class KeepAliveMessage extends StreamMessage
 
     public static Serializer<KeepAliveMessage> serializer = new 
Serializer<KeepAliveMessage>()
     {
-        public KeepAliveMessage deserialize(DataInputPlus in, IPartitioner 
partitioner, int version)
+        public KeepAliveMessage deserialize(DataInputPlus in, int version)
         {
             return new KeepAliveMessage();
         }
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
index dcd3b755e8..b83d7863fc 100644
--- 
a/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
+++ 
b/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.streaming.OutgoingStream;
@@ -33,7 +32,7 @@ public class OutgoingStreamMessage extends StreamMessage
 {
     public static Serializer<OutgoingStreamMessage> serializer = new 
Serializer<OutgoingStreamMessage>()
     {
-        public OutgoingStreamMessage deserialize(DataInputPlus in, 
IPartitioner partitioner, int version)
+        public OutgoingStreamMessage deserialize(DataInputPlus in, int version)
         {
             throw new UnsupportedOperationException("Not allowed to call 
deserialize on an outgoing stream");
         }
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java
index 72d61d29cb..f93b5afe30 100644
--- a/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.streaming.messages;
 
 import java.io.IOException;
 
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.StreamingDataOutputPlus;
@@ -34,7 +33,7 @@ public class PrepareAckMessage extends StreamMessage
             //nop
         }
 
-        public PrepareAckMessage deserialize(DataInputPlus in, IPartitioner 
partitioner, int version) throws IOException
+        public PrepareAckMessage deserialize(DataInputPlus in, int version) 
throws IOException
         {
             return new PrepareAckMessage();
         }
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java
index e29e651824..e052f4c301 100644
--- a/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.StreamSummary;
@@ -39,12 +38,12 @@ public class PrepareSynAckMessage extends StreamMessage
                 StreamSummary.serializer.serialize(summary, out, version);
         }
 
-        public PrepareSynAckMessage deserialize(DataInputPlus input, 
IPartitioner partitioner, int version) throws IOException
+        public PrepareSynAckMessage deserialize(DataInputPlus input, int 
version) throws IOException
         {
             PrepareSynAckMessage message = new PrepareSynAckMessage();
             int numSummaries = input.readInt();
             for (int i = 0; i < numSummaries; i++)
-                
message.summaries.add(StreamSummary.serializer.deserialize(input, partitioner, 
version));
+                
message.summaries.add(StreamSummary.serializer.deserialize(input, version));
             return message;
         }
 
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java
index e901365e5e..c856f46983 100644
--- a/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.streaming.StreamRequest;
 import org.apache.cassandra.streaming.StreamSession;
@@ -32,7 +31,7 @@ public class PrepareSynMessage extends StreamMessage
 {
     public static Serializer<PrepareSynMessage> serializer = new 
Serializer<PrepareSynMessage>()
     {
-        public PrepareSynMessage deserialize(DataInputPlus input, IPartitioner 
partitioner, int version) throws IOException
+        public PrepareSynMessage deserialize(DataInputPlus input, int version) 
throws IOException
         {
             PrepareSynMessage message = new PrepareSynMessage();
             // requests
@@ -42,7 +41,7 @@ public class PrepareSynMessage extends StreamMessage
             // summaries
             int numSummaries = input.readInt();
             for (int i = 0; i < numSummaries; i++)
-                
message.summaries.add(StreamSummary.serializer.deserialize(input, partitioner, 
version));
+                
message.summaries.add(StreamSummary.serializer.deserialize(input, version));
             return message;
         }
 
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
index c6b7a0f638..6755959614 100644
--- a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.streaming.messages;
 
 import java.io.IOException;
 
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.streaming.StreamSession;
@@ -29,7 +28,7 @@ public class ReceivedMessage extends StreamMessage
 {
     public static Serializer<ReceivedMessage> serializer = new 
Serializer<ReceivedMessage>()
     {
-        public ReceivedMessage deserialize(DataInputPlus input, IPartitioner 
partitioner, int version) throws IOException
+        public ReceivedMessage deserialize(DataInputPlus input, int version) 
throws IOException
         {
             return new ReceivedMessage(TableId.deserialize(input), 
input.readInt());
         }
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
index f05be58aa6..7fa82d8f67 100644
--- a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.streaming.messages;
 
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.StreamingDataOutputPlus;
@@ -26,7 +25,7 @@ public class SessionFailedMessage extends StreamMessage
 {
     public static Serializer<SessionFailedMessage> serializer = new 
Serializer<SessionFailedMessage>()
     {
-        public SessionFailedMessage deserialize(DataInputPlus in, IPartitioner 
partitioner, int version)
+        public SessionFailedMessage deserialize(DataInputPlus in, int version)
         {
             return new SessionFailedMessage();
         }
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 2fd65d7dff..e78442334b 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.streaming.messages;
 import java.io.IOException;
 
 import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.streaming.PreviewKind;
@@ -94,7 +93,7 @@ public class StreamInitMessage extends StreamMessage
             out.writeInt(message.previewKind.getSerializationVal());
         }
 
-        public StreamInitMessage deserialize(DataInputPlus in, IPartitioner 
partitioner, int version) throws IOException
+        public StreamInitMessage deserialize(DataInputPlus in, int version) 
throws IOException
         {
             InetAddressAndPort from = 
inetAddressAndPortSerializer.deserialize(in, version);
             int sessionIndex = in.readInt();
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 6e5dc08f88..186ac3274a 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.StreamingChannel;
@@ -45,16 +44,16 @@ public abstract class StreamMessage
         return 1 + message.type.outSerializer.serializedSize(message, version);
     }
 
-    public static StreamMessage deserialize(DataInputPlus in, IPartitioner 
partitioner, int version) throws IOException
+    public static StreamMessage deserialize(DataInputPlus in, int version) 
throws IOException
     {
         Type type = Type.lookupById(in.readByte());
-        return type.inSerializer.deserialize(in, partitioner, version);
+        return type.inSerializer.deserialize(in, version);
     }
 
     /** StreamMessage serializer */
     public static interface Serializer<V extends StreamMessage>
     {
-        V deserialize(DataInputPlus in, IPartitioner partitioner, int version) 
throws IOException;
+        V deserialize(DataInputPlus in, int version) throws IOException;
         void serialize(V message, StreamingDataOutputPlus out, int version, 
StreamSession session) throws IOException;
         long serializedSize(V message, int version) throws IOException;
     }
diff --git a/test/data/serialization/5.1/service.SyncComplete.bin 
b/test/data/serialization/5.1/service.SyncComplete.bin
index aede3aae15..6f8c8ee777 100644
Binary files a/test/data/serialization/5.1/service.SyncComplete.bin and 
b/test/data/serialization/5.1/service.SyncComplete.bin differ
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/tcm/RepairMetadataKeyspaceTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/tcm/RepairMetadataKeyspaceTest.java
index 074a64913f..70d1acfddd 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/tcm/RepairMetadataKeyspaceTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/tcm/RepairMetadataKeyspaceTest.java
@@ -28,6 +28,7 @@ import java.util.stream.Collectors;
 
 import org.junit.Test;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.shared.ClusterUtils;
@@ -63,6 +64,7 @@ public class RepairMetadataKeyspaceTest extends TestBaseImpl
 
             IInvokableInstance toRepair = cluster.get(3);
             stopUnchecked(toRepair);
+            DatabaseDescriptor.clientInitialization();
             String targetDir = DistributedMetadataLogKeyspace.TABLE_NAME + '-' 
+ DistributedMetadataLogKeyspace.LOG_TABLE_ID.toHexString();
             for (File datadir : getDataDirectories(toRepair))
             {
diff --git 
a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
 
b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
index 0a9333cfbe..15025b7a0b 100644
--- 
a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
+++ 
b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.UUID;
 
 import com.google.common.collect.Lists;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.*;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -32,10 +34,7 @@ import org.junit.Test;
 import org.apache.cassandra.CassandraTestBase;
 import org.apache.cassandra.CassandraTestBase.DDDaemonInitialization;
 import org.apache.cassandra.CassandraTestBase.UseMurmur3Partitioner;
-import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputBuffer;
@@ -112,7 +111,19 @@ public class RepairMessageSerializationsTest extends CassandraTestBase
 
         buf.flip();
         DataInputPlus in = new DataInputBuffer(buf, false);
-        T deserialized = serializer.deserialize(in, PROTOCOL_VERSION);
+
+        T deserialized = null;
+        
+        if (serializer instanceof IPartitionerDependentSerializer)
+        {
+            IPartitionerDependentSerializer<T> pds = (IPartitionerDependentSerializer<T>) serializer;
+            deserialized = pds.deserialize(in, DatabaseDescriptor.getPartitioner(), PROTOCOL_VERSION);
+        }
+        else
+        {
+            deserialized = serializer.deserialize(in, PROTOCOL_VERSION);
+        }
+
         Assert.assertEquals(msg, deserialized);
         Assert.assertEquals(msg.hashCode(), deserialized.hashCode());
         return deserialized;
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index 5ed6d2c409..f2dc88c5c3 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.UUID;
 
 import com.google.common.collect.Lists;
 import org.junit.AfterClass;
@@ -60,6 +59,7 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.SessionSummary;
 import org.apache.cassandra.streaming.StreamSummary;
+import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTrees;
@@ -70,6 +70,7 @@ import static java.util.Collections.emptyList;
 public class SerializationsTest extends AbstractSerializationsTester
 {
     private static PartitionerSwitcher partitionerSwitcher;
+    private static TableId TABLE_ID;
     private static TimeUUID RANDOM_UUID;
     private static Range<Token> FULL_RANGE;
     private static RepairJobDesc DESC;
@@ -84,6 +85,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         ClusterMetadataTestHelper.setInstanceForTest();
         SchemaTestUtil.addOrUpdateKeyspace(KeyspaceMetadata.create("Keyspace1", KeyspaceParams.simple(3)));
         SchemaTestUtil.announceNewTable(TableMetadata.minimal("Keyspace1", "Standard1"));
+        TABLE_ID = ClusterMetadata.current().schema.getKeyspaceMetadata("Keyspace1").getTableOrViewNullable("Standard1").id();
         RANDOM_UUID = TimeUUID.fromString("743325d0-4c4b-11ec-8a88-2d67081686db");
         FULL_RANGE = new Range<>(Util.testPartitioner().getMinimumToken(), Util.testPartitioner().getMinimumToken());
         DESC = new RepairJobDesc(RANDOM_UUID, RANDOM_UUID, "Keyspace1", "Standard1", Arrays.asList(FULL_RANGE));
@@ -222,8 +224,8 @@ public class SerializationsTest extends AbstractSerializationsTester
         // sync success
         List<SessionSummary> summaries = new ArrayList<>();
         summaries.add(new SessionSummary(src, dest,
-                                         Lists.newArrayList(new StreamSummary(TableId.fromUUID(UUID.randomUUID()), emptyList(), 5, 100)),
-                                         Lists.newArrayList(new StreamSummary(TableId.fromUUID(UUID.randomUUID()), emptyList(), 500, 10))
+                                         Lists.newArrayList(new StreamSummary(TABLE_ID, emptyList(), 5, 100)),
+                                         Lists.newArrayList(new StreamSummary(TABLE_ID, emptyList(), 500, 10))
         ));
         SyncResponse success = new SyncResponse(DESC, src, dest, true, summaries);
         // sync fail
diff --git a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
index 069d0fb58d..904272f7b8 100644
--- a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
+++ b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
@@ -30,7 +30,6 @@ import org.junit.Test;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -56,7 +55,6 @@ import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
 
 public class StreamingInboundHandlerTest
 {
-
     private NettyStreamingChannel streamingChannel;
     private EmbeddedChannel channel;
     private ByteBuf buf;
@@ -125,7 +123,7 @@ public class StreamingInboundHandlerTest
         temp.flip();
         DataInputPlus in = new DataInputBuffer(temp, false);
         // session not found
-        IncomingStreamMessage.serializer.deserialize(in, IPartitioner.global(), MessagingService.current_version);
+        IncomingStreamMessage.serializer.deserialize(in, MessagingService.current_version);
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to