This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 838bd8c  Correctly serialize all request responses - CASSANDRA-15946 
followup
838bd8c is described below

commit 838bd8cae6d125cfb055dbf6bfc5a2d4e13cdfcb
Author: Jon Meredith <[email protected]>
AuthorDate: Wed Feb 17 10:20:57 2021 -0800

    Correctly serialize all request responses - CASSANDRA-15946 followup
    
    patch by Jon Meredith; reviewed by Alex Petrov, Yifan Cai for 
CASSANDRA-16385
---
 src/java/org/apache/cassandra/net/Message.java     | 25 ++++++---
 .../org/apache/cassandra/net/RequestCallbacks.java |  2 +-
 src/java/org/apache/cassandra/net/Verb.java        |  2 +-
 .../cassandra/distributed/impl/Instance.java       |  4 --
 .../upgrade/Pre40MessageFilterTest.java            | 64 ++++++++++++++++++++++
 5 files changed, 84 insertions(+), 13 deletions(-)

diff --git a/src/java/org/apache/cassandra/net/Message.java 
b/src/java/org/apache/cassandra/net/Message.java
index a501730..214c5c0 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -757,7 +757,7 @@ public class Message<T>
         {
             serializeHeaderPost40(message.header, out, version);
             out.writeUnsignedVInt(message.payloadSize(version));
-            message.verb().serializer().serialize(message.payload, out, 
version);
+            message.getPayloadSerializer().serialize(message.payload, out, 
version);
         }
 
         private <T> Message<T> deserializePost40(DataInputPlus in, 
InetAddressAndPort peer, int version) throws IOException
@@ -916,7 +916,7 @@ public class Message<T>
             {
                 int payloadSize = message.payloadSize(version);
                 out.writeInt(payloadSize);
-                message.verb().serializer().serialize(message.payload, out, 
version);
+                message.getPayloadSerializer().serialize(message.payload, out, 
version);
             }
             else
             {
@@ -940,11 +940,8 @@ public class Message<T>
             if (skipHeader)
                 skipHeaderPre40(in);
 
-            IVersionedAsymmetricSerializer<?, T> payloadSerializer = 
header.verb.serializer();
-            if (null == payloadSerializer)
-                payloadSerializer = 
instance().callbacks.responseSerializer(header.id, header.from);
             int payloadSize = in.readInt();
-            T payload = deserializePayloadPre40(in, version, 
payloadSerializer, payloadSize);
+            T payload = deserializePayloadPre40(in, version, 
getPayloadSerializer(header.verb, header.id, header.from), payloadSize);
 
             Message<T> message = new Message<>(header, payload);
 
@@ -1286,12 +1283,26 @@ public class Message<T>
         private <T> int payloadSize(Message<T> message, int version)
         {
             long payloadSize = message.payload != null && message.payload != 
NoPayload.noPayload
-                             ? 
message.verb().serializer().serializedSize(message.payload, version)
+                             ? 
message.getPayloadSerializer().serializedSize(message.payload, version)
                              : 0;
             return Ints.checkedCast(payloadSize);
         }
     }
 
+    private IVersionedAsymmetricSerializer<T, ?> getPayloadSerializer()
+    {
+        return getPayloadSerializer(verb(), id(), from());
+    }
+
+    // Verb#serializer() is null for legacy response messages. Once all Verbs 
with null handlers
+    // are removed in a future major, this method can be replaced with a call 
to verb.serializer.
+    private static <In,Out> IVersionedAsymmetricSerializer<In, Out> 
getPayloadSerializer(Verb verb, long id, InetAddressAndPort from)
+    {
+        return null != verb.serializer()
+             ? verb.serializer()
+             : instance().callbacks.responseSerializer(id, from);
+    }
+
     private int serializedSize30;
     private int serializedSize3014;
     private int serializedSize40;
diff --git a/src/java/org/apache/cassandra/net/RequestCallbacks.java 
b/src/java/org/apache/cassandra/net/RequestCallbacks.java
index 94044e1..c102ee1 100644
--- a/src/java/org/apache/cassandra/net/RequestCallbacks.java
+++ b/src/java/org/apache/cassandra/net/RequestCallbacks.java
@@ -116,7 +116,7 @@ public class RequestCallbacks implements 
OutboundMessageCallbacks
         assert previous == null : format("Callback already exists for id 
%d/%s! (%s)", message.id(), to.endpoint(), previous);
     }
 
-    <T> IVersionedAsymmetricSerializer<?, T> responseSerializer(long id, 
InetAddressAndPort peer)
+    <In,Out> IVersionedAsymmetricSerializer<In, Out> responseSerializer(long 
id, InetAddressAndPort peer)
     {
         CallbackInfo info = get(id, peer);
         return info == null ? null : info.responseVerb.serializer();
diff --git a/src/java/org/apache/cassandra/net/Verb.java 
b/src/java/org/apache/cassandra/net/Verb.java
index fad2fbf..750a85e 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -327,7 +327,7 @@ public enum Verb
     }
 
     @VisibleForTesting
-    public Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> 
unsafeSetSerializer(Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> 
serializer) throws NoSuchFieldException, IllegalAccessException
+    Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> 
unsafeSetSerializer(Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> 
serializer) throws NoSuchFieldException, IllegalAccessException
     {
         Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> original = 
this.serializer;
         Field field = Verb.class.getDeclaredField("serializer");
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java 
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 0ed7000..0e18aa3 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -59,7 +59,6 @@ import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Memtable;
-import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.SystemKeyspaceMigrator40;
 import org.apache.cassandra.db.commitlog.CommitLog;
@@ -98,7 +97,6 @@ import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.NoPayload;
-import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.schema.MigrationManager;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
@@ -468,8 +466,6 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
                 // Re-populate token metadata after commit log recover (new 
peers might be loaded onto system keyspace #10293)
                 StorageService.instance.populateTokenMetadata();
 
-                Verb.REQUEST_RSP.unsafeSetSerializer(() -> 
ReadResponse.serializer);
-
                 if (config.has(NETWORK))
                 {
                     MessagingService.instance().listen();
diff --git 
a/test/distributed/org/apache/cassandra/distributed/upgrade/Pre40MessageFilterTest.java
 
b/test/distributed/org/apache/cassandra/distributed/upgrade/Pre40MessageFilterTest.java
new file mode 100644
index 0000000..628e8fc
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/upgrade/Pre40MessageFilterTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.shared.Versions;
+
+import java.util.function.Consumer;
+
+public class Pre40MessageFilterTest extends UpgradeTestBase
+{
+    public void reserializePre40RequestPaxosTest(Consumer<IInstanceConfig> 
configConsumer) throws Throwable
+    {
+        new UpgradeTestBase.TestCase()
+        .nodes(2)
+        .withConfig(configConsumer)
+        .nodesToUpgrade(1)
+        .upgrade(Versions.Major.v30, Versions.Major.v4)
+        .setup((cluster) -> {
+            cluster.filters().outbound().allVerbs().messagesMatching((f,t,m) 
-> false).drop();
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
+            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + 
".tbl(pk,ck,v) VALUES (1, 1, 1) IF NOT EXISTS",
+                                           ConsistencyLevel.QUORUM,
+                                           1);
+        })
+        .runAfterNodeUpgrade((cluster, node) -> {
+            cluster.coordinator(node).execute("UPDATE " + KEYSPACE + ".tbl SET 
v = ? WHERE pk = ? AND ck = ?  IF v = ?",
+                                              ConsistencyLevel.QUORUM,
+                                              2, 1, 1, 1);
+        }).run();
+    }
+
+    @Test
+    public void reserializePre40RequestPaxosWithoutNetworkTest() throws 
Throwable
+    {
+        reserializePre40RequestPaxosTest(config -> {});
+    }
+
+    @Test
+    public void reserializePre40RequestPaxosWithNetworkTest() throws Throwable
+    {
+        reserializePre40RequestPaxosTest(config -> 
config.with(Feature.NETWORK, Feature.GOSSIP));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to