This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 838bd8c Correctly serialize all request responses - CASSANDRA-15946
followup
838bd8c is described below
commit 838bd8cae6d125cfb055dbf6bfc5a2d4e13cdfcb
Author: Jon Meredith <[email protected]>
AuthorDate: Wed Feb 17 10:20:57 2021 -0800
Correctly serialize all request responses - CASSANDRA-15946 followup
patch by Jon Meredith; reviewed by Alex Petrov, Yifan Cai for
CASSANDRA-16385
---
src/java/org/apache/cassandra/net/Message.java | 25 ++++++---
.../org/apache/cassandra/net/RequestCallbacks.java | 2 +-
src/java/org/apache/cassandra/net/Verb.java | 2 +-
.../cassandra/distributed/impl/Instance.java | 4 --
.../upgrade/Pre40MessageFilterTest.java | 64 ++++++++++++++++++++++
5 files changed, 84 insertions(+), 13 deletions(-)
diff --git a/src/java/org/apache/cassandra/net/Message.java
b/src/java/org/apache/cassandra/net/Message.java
index a501730..214c5c0 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -757,7 +757,7 @@ public class Message<T>
{
serializeHeaderPost40(message.header, out, version);
out.writeUnsignedVInt(message.payloadSize(version));
- message.verb().serializer().serialize(message.payload, out,
version);
+ message.getPayloadSerializer().serialize(message.payload, out,
version);
}
private <T> Message<T> deserializePost40(DataInputPlus in,
InetAddressAndPort peer, int version) throws IOException
@@ -916,7 +916,7 @@ public class Message<T>
{
int payloadSize = message.payloadSize(version);
out.writeInt(payloadSize);
- message.verb().serializer().serialize(message.payload, out,
version);
+ message.getPayloadSerializer().serialize(message.payload, out,
version);
}
else
{
@@ -940,11 +940,8 @@ public class Message<T>
if (skipHeader)
skipHeaderPre40(in);
- IVersionedAsymmetricSerializer<?, T> payloadSerializer =
header.verb.serializer();
- if (null == payloadSerializer)
- payloadSerializer =
instance().callbacks.responseSerializer(header.id, header.from);
int payloadSize = in.readInt();
- T payload = deserializePayloadPre40(in, version,
payloadSerializer, payloadSize);
+ T payload = deserializePayloadPre40(in, version,
getPayloadSerializer(header.verb, header.id, header.from), payloadSize);
Message<T> message = new Message<>(header, payload);
@@ -1286,12 +1283,26 @@ public class Message<T>
private <T> int payloadSize(Message<T> message, int version)
{
long payloadSize = message.payload != null && message.payload !=
NoPayload.noPayload
- ?
message.verb().serializer().serializedSize(message.payload, version)
+ ?
message.getPayloadSerializer().serializedSize(message.payload, version)
: 0;
return Ints.checkedCast(payloadSize);
}
}
+ private IVersionedAsymmetricSerializer<T, ?> getPayloadSerializer()
+ {
+ return getPayloadSerializer(verb(), id(), from());
+ }
+
+ // Verb#serializer() is null for legacy response messages. Once all Verbs
with null handlers
+ // are removed in a future major, this method can be replaced with a call
to verb.serializer.
+ private static <In,Out> IVersionedAsymmetricSerializer<In, Out>
getPayloadSerializer(Verb verb, long id, InetAddressAndPort from)
+ {
+ return null != verb.serializer()
+ ? verb.serializer()
+ : instance().callbacks.responseSerializer(id, from);
+ }
+
private int serializedSize30;
private int serializedSize3014;
private int serializedSize40;
diff --git a/src/java/org/apache/cassandra/net/RequestCallbacks.java
b/src/java/org/apache/cassandra/net/RequestCallbacks.java
index 94044e1..c102ee1 100644
--- a/src/java/org/apache/cassandra/net/RequestCallbacks.java
+++ b/src/java/org/apache/cassandra/net/RequestCallbacks.java
@@ -116,7 +116,7 @@ public class RequestCallbacks implements
OutboundMessageCallbacks
assert previous == null : format("Callback already exists for id
%d/%s! (%s)", message.id(), to.endpoint(), previous);
}
- <T> IVersionedAsymmetricSerializer<?, T> responseSerializer(long id,
InetAddressAndPort peer)
+ <In,Out> IVersionedAsymmetricSerializer<In, Out> responseSerializer(long
id, InetAddressAndPort peer)
{
CallbackInfo info = get(id, peer);
return info == null ? null : info.responseVerb.serializer();
diff --git a/src/java/org/apache/cassandra/net/Verb.java
b/src/java/org/apache/cassandra/net/Verb.java
index fad2fbf..750a85e 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -327,7 +327,7 @@ public enum Verb
}
@VisibleForTesting
- public Supplier<? extends IVersionedAsymmetricSerializer<?, ?>>
unsafeSetSerializer(Supplier<? extends IVersionedAsymmetricSerializer<?, ?>>
serializer) throws NoSuchFieldException, IllegalAccessException
+ Supplier<? extends IVersionedAsymmetricSerializer<?, ?>>
unsafeSetSerializer(Supplier<? extends IVersionedAsymmetricSerializer<?, ?>>
serializer) throws NoSuchFieldException, IllegalAccessException
{
Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> original =
this.serializer;
Field field = Verb.class.getDeclaredField("serializer");
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 0ed7000..0e18aa3 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -59,7 +59,6 @@ import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Memtable;
-import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.SystemKeyspaceMigrator40;
import org.apache.cassandra.db.commitlog.CommitLog;
@@ -98,7 +97,6 @@ import org.apache.cassandra.metrics.CassandraMetricsRegistry;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.NoPayload;
-import org.apache.cassandra.net.Verb;
import org.apache.cassandra.schema.MigrationManager;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
@@ -468,8 +466,6 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
// Re-populate token metadata after commit log recover (new
peers might be loaded onto system keyspace #10293)
StorageService.instance.populateTokenMetadata();
- Verb.REQUEST_RSP.unsafeSetSerializer(() ->
ReadResponse.serializer);
-
if (config.has(NETWORK))
{
MessagingService.instance().listen();
diff --git
a/test/distributed/org/apache/cassandra/distributed/upgrade/Pre40MessageFilterTest.java
b/test/distributed/org/apache/cassandra/distributed/upgrade/Pre40MessageFilterTest.java
new file mode 100644
index 0000000..628e8fc
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/upgrade/Pre40MessageFilterTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.shared.Versions;
+
+import java.util.function.Consumer;
+
+public class Pre40MessageFilterTest extends UpgradeTestBase
+{
+ public void reserializePre40RequestPaxosTest(Consumer<IInstanceConfig>
configConsumer) throws Throwable
+ {
+ new UpgradeTestBase.TestCase()
+ .nodes(2)
+ .withConfig(configConsumer)
+ .nodesToUpgrade(1)
+ .upgrade(Versions.Major.v30, Versions.Major.v4)
+ .setup((cluster) -> {
+ cluster.filters().outbound().allVerbs().messagesMatching((f,t,m)
-> false).drop();
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int,
ck int, v int, PRIMARY KEY (pk, ck))");
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE +
".tbl(pk,ck,v) VALUES (1, 1, 1) IF NOT EXISTS",
+ ConsistencyLevel.QUORUM,
+ 1);
+ })
+ .runAfterNodeUpgrade((cluster, node) -> {
+ cluster.coordinator(node).execute("UPDATE " + KEYSPACE + ".tbl SET
v = ? WHERE pk = ? AND ck = ? IF v = ?",
+ ConsistencyLevel.QUORUM,
+ 2, 1, 1, 1);
+ }).run();
+ }
+
+ @Test
+ public void reserializePre40RequestPaxosWithoutNetworkTest() throws
Throwable
+ {
+ reserializePre40RequestPaxosTest(config -> {});
+ }
+
+ @Test
+ public void reserializePre40RequestPaxosWithNetworkTest() throws Throwable
+ {
+ reserializePre40RequestPaxosTest(config ->
config.with(Feature.NETWORK, Feature.GOSSIP));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]