This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 20a315099753ae5220b31f07e74dd3745d02d21b Merge: 5e2a72d 6dbf34b Author: Alex Petrov <[email protected]> AuthorDate: Thu Feb 25 14:09:10 2021 +0100 Merge branch 'cassandra-3.11' into trunk .../org/apache/cassandra/hints/HintMessage.java | 7 +- src/java/org/apache/cassandra/net/Verb.java | 2 +- .../cassandra/distributed/impl/Instance.java | 4 ++ .../distributed/test/MessageFiltersTest.java | 57 +++++++++++++++ .../apache/cassandra/hints/DTestSerializer.java | 84 ++++++++++++++++++++++ 5 files changed, 151 insertions(+), 3 deletions(-) diff --cc src/java/org/apache/cassandra/hints/HintMessage.java index 333af84,723ab6d..60adb85 --- a/src/java/org/apache/cassandra/hints/HintMessage.java +++ b/src/java/org/apache/cassandra/hints/HintMessage.java @@@ -19,12 -19,11 +19,12 @@@ package org.apache.cassandra.hints; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Objects; import java.util.UUID; -- import javax.annotation.Nullable; ++import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Ints; import org.apache.cassandra.db.TypeSizes; @@@ -62,6 -62,6 +62,7 @@@ public final class HintMessage implemen HintMessage(UUID hostId, Hint hint) { ++ assert hint != null; this.hostId = hostId; this.hint = hint; this.unknownTableID = null; @@@ -74,72 -74,37 +75,74 @@@ this.unknownTableID = unknownTableID; } - public MessageOut<HintMessage> createMessageOut() - { - return new MessageOut<>(MessagingService.Verb.HINT, this, serializer); - } - - public static class Serializer implements IVersionedSerializer<HintMessage> + public static class Serializer implements IVersionedAsymmetricSerializer<SerializableHintMessage, HintMessage> { - public long serializedSize(HintMessage message, int version) + public long serializedSize(SerializableHintMessage obj, int version) { - long size = UUIDSerializer.serializer.serializedSize(message.hostId, version); + if (obj instanceof HintMessage) + { + HintMessage message = (HintMessage) obj; - long size = UUIDSerializer.serializer.serializedSize(message.hostId, version); + ++ Objects.requireNonNull(message.hint); // we should never *send* a HintMessage with null hint ++ ++ long size = UUIDSerializer.serializer.serializedSize(message.hostId, version); + long hintSize = Hint.serializer.serializedSize(message.hint, version); + size += TypeSizes.sizeofUnsignedVInt(hintSize); + size += hintSize; + + return size; + } + else if (obj instanceof Encoded) + { + Encoded message = (Encoded) obj; - long hintSize = Hint.serializer.serializedSize(message.hint, version); - size += TypeSizes.sizeofUnsignedVInt(hintSize); - size += hintSize; + if (version != message.version) + throw new IllegalArgumentException("serializedSize() called with non-matching version " + version); - return size; + long size = UUIDSerializer.serializer.serializedSize(message.hostId, version); + size += TypeSizes.sizeofUnsignedVInt(message.hint.remaining()); + size += message.hint.remaining(); + return size; + } + else + { + throw new IllegalStateException("Unexpected type: " + obj); + } } - public void serialize(HintMessage message, DataOutputPlus out, int version) throws IOException + public void serialize(SerializableHintMessage obj, DataOutputPlus out, int version) throws IOException { - Objects.requireNonNull(message.hint); // we should never *send* a HintMessage with null hint + if (obj instanceof HintMessage) + { + HintMessage message = (HintMessage) obj; - UUIDSerializer.serializer.serialize(message.hostId, out, version); + Objects.requireNonNull(message.hint); // we should never *send* a HintMessage with null hint - /* - * We are serializing the hint size so that the receiver of the message could gracefully handle - * deserialize failure when a table had been dropped, by simply skipping the unread bytes. - */ - out.writeUnsignedVInt(Hint.serializer.serializedSize(message.hint, version)); + UUIDSerializer.serializer.serialize(message.hostId, out, version); - Hint.serializer.serialize(message.hint, out, version); + /* + * We are serializing the hint size so that the receiver of the message could gracefully handle + * deserialize failure when a table had been dropped, by simply skipping the unread bytes. + */ + out.writeUnsignedVInt(Hint.serializer.serializedSize(message.hint, version)); + + Hint.serializer.serialize(message.hint, out, version); + } + else if (obj instanceof Encoded) + { + Encoded message = (Encoded) obj; + + if (version != message.version) + throw new IllegalArgumentException("serialize() called with non-matching version " + version); + + UUIDSerializer.serializer.serialize(message.hostId, out, version); + out.writeUnsignedVInt(message.hint.remaining()); + out.write(message.hint); + } + else + { + throw new IllegalStateException("Unexpected type: " + obj); + } } /* diff --cc src/java/org/apache/cassandra/net/Verb.java index 750a85e,0000000..fad2fbf mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/net/Verb.java +++ b/src/java/org/apache/cassandra/net/Verb.java @@@ -1,453 -1,0 +1,453 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.net; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.function.ToLongFunction; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; + +import org.apache.cassandra.batchlog.Batch; +import org.apache.cassandra.batchlog.BatchRemoveVerbHandler; +import org.apache.cassandra.batchlog.BatchStoreVerbHandler; +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.CounterMutation; +import org.apache.cassandra.db.CounterMutationVerbHandler; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.MutationVerbHandler; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.ReadCommandVerbHandler; +import org.apache.cassandra.db.ReadRepairVerbHandler; +import org.apache.cassandra.db.ReadResponse; +import org.apache.cassandra.db.SnapshotCommand; +import org.apache.cassandra.db.TruncateResponse; +import org.apache.cassandra.db.TruncateVerbHandler; +import org.apache.cassandra.db.TruncateRequest; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.gms.GossipDigestAck; +import org.apache.cassandra.gms.GossipDigestAck2; +import org.apache.cassandra.gms.GossipDigestAck2VerbHandler; +import org.apache.cassandra.gms.GossipDigestAckVerbHandler; +import org.apache.cassandra.gms.GossipDigestSyn; +import org.apache.cassandra.gms.GossipDigestSynVerbHandler; +import org.apache.cassandra.gms.GossipShutdownVerbHandler; +import org.apache.cassandra.hints.HintMessage; +import org.apache.cassandra.hints.HintVerbHandler; +import org.apache.cassandra.io.IVersionedAsymmetricSerializer; +import org.apache.cassandra.repair.RepairMessageVerbHandler; +import org.apache.cassandra.repair.messages.CleanupMessage; +import org.apache.cassandra.repair.messages.FailSession; +import org.apache.cassandra.repair.messages.FinalizeCommit; +import org.apache.cassandra.repair.messages.FinalizePromise; +import org.apache.cassandra.repair.messages.FinalizePropose; +import org.apache.cassandra.repair.messages.PrepareConsistentRequest; +import org.apache.cassandra.repair.messages.PrepareConsistentResponse; +import org.apache.cassandra.repair.messages.PrepareMessage; +import org.apache.cassandra.repair.messages.SnapshotMessage; +import org.apache.cassandra.repair.messages.StatusRequest; +import org.apache.cassandra.repair.messages.StatusResponse; +import org.apache.cassandra.repair.messages.SyncResponse; +import org.apache.cassandra.repair.messages.SyncRequest; +import org.apache.cassandra.repair.messages.ValidationResponse; +import org.apache.cassandra.repair.messages.ValidationRequest; +import org.apache.cassandra.schema.SchemaPullVerbHandler; +import org.apache.cassandra.schema.SchemaPushVerbHandler; +import org.apache.cassandra.schema.SchemaVersionVerbHandler; +import org.apache.cassandra.utils.BooleanSerializer; +import org.apache.cassandra.service.EchoVerbHandler; +import org.apache.cassandra.service.SnapshotVerbHandler; +import org.apache.cassandra.service.paxos.Commit; +import org.apache.cassandra.service.paxos.CommitVerbHandler; +import org.apache.cassandra.service.paxos.PrepareResponse; +import org.apache.cassandra.service.paxos.PrepareVerbHandler; +import org.apache.cassandra.service.paxos.ProposeVerbHandler; +import org.apache.cassandra.streaming.ReplicationDoneVerbHandler; +import org.apache.cassandra.utils.UUIDSerializer; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.apache.cassandra.concurrent.Stage.*; +import static org.apache.cassandra.concurrent.Stage.INTERNAL_RESPONSE; +import static org.apache.cassandra.concurrent.Stage.MISC; +import static org.apache.cassandra.net.VerbTimeouts.*; +import static org.apache.cassandra.net.Verb.Kind.*; +import static org.apache.cassandra.net.Verb.Priority.*; +import static org.apache.cassandra.schema.MigrationManager.MigrationsSerializer; + +/** + * Note that priorities except P0 are presently unused. P0 corresponds to urgent, i.e. what used to be the "Gossip" connection. + */ +public enum Verb +{ + MUTATION_RSP (60, P1, writeTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ), + MUTATION_REQ (0, P3, writeTimeout, MUTATION, () -> Mutation.serializer, () -> MutationVerbHandler.instance, MUTATION_RSP ), + HINT_RSP (61, P1, writeTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ), + HINT_REQ (1, P4, writeTimeout, MUTATION, () -> HintMessage.serializer, () -> HintVerbHandler.instance, HINT_RSP ), + READ_REPAIR_RSP (62, P1, writeTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ), + READ_REPAIR_REQ (2, P1, writeTimeout, MUTATION, () -> Mutation.serializer, () -> ReadRepairVerbHandler.instance, READ_REPAIR_RSP ), + BATCH_STORE_RSP (65, P1, writeTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ), + BATCH_STORE_REQ (5, P3, writeTimeout, MUTATION, () -> Batch.serializer, () -> BatchStoreVerbHandler.instance, BATCH_STORE_RSP ), + BATCH_REMOVE_RSP (66, P1, writeTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ), + BATCH_REMOVE_REQ (6, P3, writeTimeout, MUTATION, () -> UUIDSerializer.serializer, () -> BatchRemoveVerbHandler.instance, BATCH_REMOVE_RSP ), + + PAXOS_PREPARE_RSP (93, P2, writeTimeout, REQUEST_RESPONSE, () -> PrepareResponse.serializer, () -> ResponseVerbHandler.instance ), + PAXOS_PREPARE_REQ (33, P2, writeTimeout, MUTATION, () -> Commit.serializer, () -> PrepareVerbHandler.instance, PAXOS_PREPARE_RSP ), + PAXOS_PROPOSE_RSP (94, P2, writeTimeout, REQUEST_RESPONSE, () -> BooleanSerializer.serializer, () -> ResponseVerbHandler.instance ), + PAXOS_PROPOSE_REQ (34, P2, writeTimeout, MUTATION, () -> Commit.serializer, () -> ProposeVerbHandler.instance, PAXOS_PROPOSE_RSP ), + PAXOS_COMMIT_RSP (95, P2, writeTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ), + PAXOS_COMMIT_REQ (35, P2, writeTimeout, MUTATION, () -> Commit.serializer, () -> CommitVerbHandler.instance, PAXOS_COMMIT_RSP ), + + TRUNCATE_RSP (79, P0, truncateTimeout, REQUEST_RESPONSE, () -> TruncateResponse.serializer, () -> ResponseVerbHandler.instance ), + TRUNCATE_REQ (19, P0, truncateTimeout, MUTATION, () -> TruncateRequest.serializer, () -> TruncateVerbHandler.instance, TRUNCATE_RSP ), + + COUNTER_MUTATION_RSP (84, P1, counterTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ), + COUNTER_MUTATION_REQ (24, P2, counterTimeout, COUNTER_MUTATION, () -> CounterMutation.serializer, () -> CounterMutationVerbHandler.instance, COUNTER_MUTATION_RSP), + + READ_RSP (63, P2, readTimeout, REQUEST_RESPONSE, () -> ReadResponse.serializer, () -> ResponseVerbHandler.instance ), + READ_REQ (3, P3, readTimeout, READ, () -> ReadCommand.serializer, () -> ReadCommandVerbHandler.instance, READ_RSP ), + RANGE_RSP (69, P2, rangeTimeout, REQUEST_RESPONSE, () -> ReadResponse.serializer, () -> ResponseVerbHandler.instance ), + RANGE_REQ (9, P3, rangeTimeout, READ, () -> ReadCommand.serializer, () -> ReadCommandVerbHandler.instance, RANGE_RSP ), + + GOSSIP_DIGEST_SYN (14, P0, longTimeout, GOSSIP, () -> GossipDigestSyn.serializer, () -> GossipDigestSynVerbHandler.instance ), + GOSSIP_DIGEST_ACK (15, P0, longTimeout, GOSSIP, () -> GossipDigestAck.serializer, () -> GossipDigestAckVerbHandler.instance ), + GOSSIP_DIGEST_ACK2 (16, P0, longTimeout, GOSSIP, () -> GossipDigestAck2.serializer, () -> GossipDigestAck2VerbHandler.instance ), + GOSSIP_SHUTDOWN (29, P0, rpcTimeout, GOSSIP, () -> NoPayload.serializer, () -> GossipShutdownVerbHandler.instance ), + + ECHO_RSP (91, P0, rpcTimeout, GOSSIP, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ), + ECHO_REQ (31, P0, rpcTimeout, GOSSIP, () -> NoPayload.serializer, () -> EchoVerbHandler.instance, ECHO_RSP ), + PING_RSP (97, P1, pingTimeout, GOSSIP, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ), + PING_REQ (37, P1, pingTimeout, GOSSIP, () -> PingRequest.serializer, () -> PingVerbHandler.instance, PING_RSP ), + + // P1 because messages can be arbitrarily large or aren't crucial + SCHEMA_PUSH_RSP (98, P1, rpcTimeout, MIGRATION, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ), + SCHEMA_PUSH_REQ (18, P1, rpcTimeout, MIGRATION, () -> MigrationsSerializer.instance, () -> SchemaPushVerbHandler.instance, SCHEMA_PUSH_RSP ), + SCHEMA_PULL_RSP (88, P1, rpcTimeout, MIGRATION, () -> MigrationsSerializer.instance, () -> ResponseVerbHandler.instance ), + SCHEMA_PULL_REQ (28, P1, rpcTimeout, MIGRATION, () -> NoPayload.serializer, () -> SchemaPullVerbHandler.instance, SCHEMA_PULL_RSP ), + SCHEMA_VERSION_RSP (80, P1, rpcTimeout, MIGRATION, () -> UUIDSerializer.serializer, () -> ResponseVerbHandler.instance ), + SCHEMA_VERSION_REQ (20, P1, rpcTimeout, MIGRATION, () -> NoPayload.serializer, () -> SchemaVersionVerbHandler.instance, SCHEMA_VERSION_RSP ), + + // repair; mostly doesn't use callbacks and sends responses as their own request messages, with matching sessions by uuid; should eventually harmonize and make idiomatic + REPAIR_RSP (100, P1, rpcTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ), + VALIDATION_RSP (102, P1, rpcTimeout, ANTI_ENTROPY, () -> ValidationResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + VALIDATION_REQ (101, P1, rpcTimeout, ANTI_ENTROPY, () -> ValidationRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + SYNC_RSP (104, P1, rpcTimeout, ANTI_ENTROPY, () -> SyncResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + SYNC_REQ (103, P1, rpcTimeout, ANTI_ENTROPY, () -> SyncRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + PREPARE_MSG (105, P1, rpcTimeout, ANTI_ENTROPY, () -> PrepareMessage.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + SNAPSHOT_MSG (106, P1, rpcTimeout, ANTI_ENTROPY, () -> SnapshotMessage.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + CLEANUP_MSG (107, P1, rpcTimeout, ANTI_ENTROPY, () -> CleanupMessage.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + PREPARE_CONSISTENT_RSP (109, P1, rpcTimeout, ANTI_ENTROPY, () -> PrepareConsistentResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + PREPARE_CONSISTENT_REQ (108, P1, rpcTimeout, ANTI_ENTROPY, () -> PrepareConsistentRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + FINALIZE_PROPOSE_MSG (110, P1, rpcTimeout, ANTI_ENTROPY, () -> FinalizePropose.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + FINALIZE_PROMISE_MSG (111, P1, rpcTimeout, ANTI_ENTROPY, () -> FinalizePromise.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + FINALIZE_COMMIT_MSG (112, P1, rpcTimeout, ANTI_ENTROPY, () -> FinalizeCommit.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + FAILED_SESSION_MSG (113, P1, rpcTimeout, ANTI_ENTROPY, () -> FailSession.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + STATUS_RSP (115, P1, rpcTimeout, ANTI_ENTROPY, () -> StatusResponse.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + STATUS_REQ (114, P1, rpcTimeout, ANTI_ENTROPY, () -> StatusRequest.serializer, () -> RepairMessageVerbHandler.instance, REPAIR_RSP ), + + REPLICATION_DONE_RSP (82, P0, rpcTimeout, MISC, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ), + REPLICATION_DONE_REQ (22, P0, rpcTimeout, MISC, () -> NoPayload.serializer, () -> ReplicationDoneVerbHandler.instance, REPLICATION_DONE_RSP), + SNAPSHOT_RSP (87, P0, rpcTimeout, MISC, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ), + SNAPSHOT_REQ (27, P0, rpcTimeout, MISC, () -> SnapshotCommand.serializer, () -> SnapshotVerbHandler.instance, SNAPSHOT_RSP ), + + // generic failure response + FAILURE_RSP (99, P0, noTimeout, REQUEST_RESPONSE, () -> RequestFailureReason.serializer, () -> ResponseVerbHandler.instance ), + + // dummy verbs + _TRACE (30, P1, rpcTimeout, TRACING, () -> NoPayload.serializer, () -> null ), + _SAMPLE (42, P1, rpcTimeout, INTERNAL_RESPONSE, () -> NoPayload.serializer, () -> null ), + _TEST_1 (10, P0, writeTimeout, IMMEDIATE, () -> NoPayload.serializer, () -> null ), + _TEST_2 (11, P1, rpcTimeout, IMMEDIATE, () -> NoPayload.serializer, () -> null ), + + @Deprecated + REQUEST_RSP (4, P1, rpcTimeout, REQUEST_RESPONSE, () -> null, () -> ResponseVerbHandler.instance ), + @Deprecated + INTERNAL_RSP (23, P1, rpcTimeout, INTERNAL_RESPONSE, () -> null, () -> ResponseVerbHandler.instance ), + + // largest used ID: 116 + + // CUSTOM VERBS + UNUSED_CUSTOM_VERB (CUSTOM, + 0, P1, rpcTimeout, INTERNAL_RESPONSE, () -> null, () -> null ), + + ; + + public static final List<Verb> VERBS = ImmutableList.copyOf(Verb.values()); + + public enum Priority + { + P0, // sends on the urgent connection (i.e. for Gossip, Echo) + P1, // small or empty responses + P2, // larger messages that can be dropped but who have a larger impact on system stability (e.g. READ_REPAIR, READ_RSP) + P3, + P4 + } + + public enum Kind + { + NORMAL, + CUSTOM + } + + public final int id; + public final Priority priority; + public final Stage stage; + public final Kind kind; + + /** + * Messages we receive from peers have a Verb that tells us what kind of message it is. + * Most of the time, this is enough to determine how to deserialize the message payload. + * The exception is the REQUEST_RSP verb, which just means "a response to something you told me to do." + * Traditionally, this was fine since each VerbHandler knew what type of payload it expected, and + * handled the deserialization itself. Now that we do that in ITC, to avoid the extra copy to an + * intermediary byte[] (See CASSANDRA-3716), we need to wire that up to the CallbackInfo object + * (see below). + * + * NOTE: we use a Supplier to avoid loading the dependent classes until necessary. + */ + private final Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> serializer; + private final Supplier<? extends IVerbHandler<?>> handler; + + final Verb responseVerb; + + private final ToLongFunction<TimeUnit> expiration; + + + /** + * Verbs it's okay to drop if the request has been queued longer than the request timeout. These + * all correspond to client requests or something triggered by them; we don't want to + * drop internal messages like bootstrap or repair notifications. + */ + Verb(int id, Priority priority, ToLongFunction<TimeUnit> expiration, Stage stage, Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> serializer, Supplier<? extends IVerbHandler<?>> handler) + { + this(id, priority, expiration, stage, serializer, handler, null); + } + + Verb(int id, Priority priority, ToLongFunction<TimeUnit> expiration, Stage stage, Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> serializer, Supplier<? extends IVerbHandler<?>> handler, Verb responseVerb) + { + this(NORMAL, id, priority, expiration, stage, serializer, handler, responseVerb); + } + + Verb(Kind kind, int id, Priority priority, ToLongFunction<TimeUnit> expiration, Stage stage, Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> serializer, Supplier<? extends IVerbHandler<?>> handler) + { + this(kind, id, priority, expiration, stage, serializer, handler, null); + } + + Verb(Kind kind, int id, Priority priority, ToLongFunction<TimeUnit> expiration, Stage stage, Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> serializer, Supplier<? extends IVerbHandler<?>> handler, Verb responseVerb) + { + this.stage = stage; + if (id < 0) + throw new IllegalArgumentException("Verb id must be non-negative, got " + id + " for verb " + name()); + + if (kind == CUSTOM) + { + if (id > MAX_CUSTOM_VERB_ID) + throw new AssertionError("Invalid custom verb id " + id + " - we only allow custom ids between 0 and " + MAX_CUSTOM_VERB_ID); + this.id = idForCustomVerb(id); + } + else + { + if (id > CUSTOM_VERB_START - MAX_CUSTOM_VERB_ID) + throw new AssertionError("Invalid verb id " + id + " - we only allow ids between 0 and " + (CUSTOM_VERB_START - MAX_CUSTOM_VERB_ID)); + this.id = id; + } + this.priority = priority; + this.serializer = serializer; + this.handler = handler; + this.responseVerb = responseVerb; + this.expiration = expiration; + this.kind = kind; + } + + public <In, Out> IVersionedAsymmetricSerializer<In, Out> serializer() + { + return (IVersionedAsymmetricSerializer<In, Out>) serializer.get(); + } + + public <T> IVerbHandler<T> handler() + { + return (IVerbHandler<T>) handler.get(); + } + + public long expiresAtNanos(long nowNanos) + { + return nowNanos + expiresAfterNanos(); + } + + public long expiresAfterNanos() + { + return expiration.applyAsLong(NANOSECONDS); + } + + // this is a little hacky, but reduces the number of parameters up top + public boolean isResponse() + { + return handler.get() == ResponseVerbHandler.instance; + } + + Verb toPre40Verb() + { + if (!isResponse()) + return this; + if (priority == P0) + return INTERNAL_RSP; + return REQUEST_RSP; + } + + @VisibleForTesting + Supplier<? extends IVerbHandler<?>> unsafeSetHandler(Supplier<? extends IVerbHandler<?>> handler) throws NoSuchFieldException, IllegalAccessException + { + Supplier<? extends IVerbHandler<?>> original = this.handler; + Field field = Verb.class.getDeclaredField("handler"); + field.setAccessible(true); + Field modifiers = Field.class.getDeclaredField("modifiers"); + modifiers.setAccessible(true); + modifiers.setInt(field, field.getModifiers() & ~Modifier.FINAL); + field.set(this, handler); + return original; + } + + @VisibleForTesting - Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> unsafeSetSerializer(Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> serializer) throws NoSuchFieldException, IllegalAccessException ++ public Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> unsafeSetSerializer(Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> serializer) throws NoSuchFieldException, IllegalAccessException + { + Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> original = this.serializer; + Field field = Verb.class.getDeclaredField("serializer"); + field.setAccessible(true); + Field modifiers = Field.class.getDeclaredField("modifiers"); + modifiers.setAccessible(true); + modifiers.setInt(field, field.getModifiers() & ~Modifier.FINAL); + field.set(this, serializer); + return original; + } + + @VisibleForTesting + ToLongFunction<TimeUnit> unsafeSetExpiration(ToLongFunction<TimeUnit> expiration) throws NoSuchFieldException, IllegalAccessException + { + ToLongFunction<TimeUnit> original = this.expiration; + Field field = Verb.class.getDeclaredField("expiration"); + field.setAccessible(true); + Field modifiers = Field.class.getDeclaredField("modifiers"); + modifiers.setAccessible(true); + modifiers.setInt(field, field.getModifiers() & ~Modifier.FINAL); + field.set(this, expiration); + return original; + } + + // This is the largest number we can store in 2 bytes using VIntCoding (1 bit per byte is used to indicate if there is more data coming). + // When generating ids we count *down* from this number + private static final int CUSTOM_VERB_START = (1 << (7 * 2)) - 1; + + // Sanity check for the custom verb ids - avoids someone mistakenly adding a custom verb id close to the normal verbs which + // could cause a conflict later when new normal verbs are added. + private static final int MAX_CUSTOM_VERB_ID = 1000; + + private static final Verb[] idToVerbMap; + private static final Verb[] idToCustomVerbMap; + private static final int minCustomId; + + static + { + Verb[] verbs = values(); + int max = -1; + int minCustom = Integer.MAX_VALUE; + for (Verb v : verbs) + { + switch (v.kind) + { + case NORMAL: + max = Math.max(v.id, max); + break; + case CUSTOM: + minCustom = Math.min(v.id, minCustom); + break; + default: + throw new AssertionError("Unsupported Verb Kind: " + v.kind + " for verb " + v); + } + } + minCustomId = minCustom; + + if (minCustom <= max) + throw new IllegalStateException("Overlapping verb ids are not allowed"); + + Verb[] idMap = new Verb[max + 1]; + int customCount = minCustom < Integer.MAX_VALUE ? CUSTOM_VERB_START - minCustom : 0; + Verb[] customIdMap = new Verb[customCount + 1]; + for (Verb v : verbs) + { + switch (v.kind) + { + case NORMAL: + if (idMap[v.id] != null) + throw new IllegalArgumentException("cannot have two verbs that map to the same id: " + v + " and " + idMap[v.id]); + idMap[v.id] = v; + break; + case CUSTOM: + int relativeId = idForCustomVerb(v.id); + if (customIdMap[relativeId] != null) + throw new IllegalArgumentException("cannot have two custom verbs that map to the same id: " + v + " and " + customIdMap[relativeId]); + customIdMap[relativeId] = v; + break; + default: + throw new AssertionError("Unsupported Verb Kind: " + v.kind + " for verb " + v); + } + } + + idToVerbMap = idMap; + idToCustomVerbMap = customIdMap; + } + + public static Verb fromId(int id) + { + Verb[] verbs = idToVerbMap; + if (id >= minCustomId) + { + id = idForCustomVerb(id); + verbs = idToCustomVerbMap; + } + Verb verb = id >= 0 && id < verbs.length ? verbs[id] : null; + if (verb == null) + throw new IllegalArgumentException("Unknown verb id " + id); + return verb; + } + + /** + * calculate an id for a custom verb + */ + private static int idForCustomVerb(int id) + { + return CUSTOM_VERB_START - id; + } +} + +@SuppressWarnings("unused") +class VerbTimeouts +{ + static final ToLongFunction<TimeUnit> rpcTimeout = DatabaseDescriptor::getRpcTimeout; + static final ToLongFunction<TimeUnit> writeTimeout = DatabaseDescriptor::getWriteRpcTimeout; + static final ToLongFunction<TimeUnit> readTimeout = DatabaseDescriptor::getReadRpcTimeout; + static final ToLongFunction<TimeUnit> rangeTimeout = DatabaseDescriptor::getRangeRpcTimeout; + static final ToLongFunction<TimeUnit> counterTimeout = DatabaseDescriptor::getCounterWriteRpcTimeout; + static final ToLongFunction<TimeUnit> truncateTimeout = DatabaseDescriptor::getTruncateRpcTimeout; + static final ToLongFunction<TimeUnit> pingTimeout = DatabaseDescriptor::getPingTimeout; + static final ToLongFunction<TimeUnit> longTimeout = units -> Math.max(DatabaseDescriptor.getRpcTimeout(units), units.convert(5L, TimeUnit.MINUTES)); + static final ToLongFunction<TimeUnit> noTimeout = units -> { throw new IllegalStateException(); }; +} diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 0e18aa3,7784214..b79ccbc --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@@ -84,23 -84,20 +84,25 @@@ import org.apache.cassandra.distributed import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; + import org.apache.cassandra.hints.DTestSerializer; import org.apache.cassandra.hints.HintsService; import org.apache.cassandra.index.SecondaryIndexManager; +import org.apache.cassandra.io.IVersionedAsymmetricSerializer; import org.apache.cassandra.io.sstable.IndexSummaryManager; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.metrics.CassandraMetricsRegistry; -import org.apache.cassandra.net.IMessageSink; -import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.schema.LegacySchemaMigrator; +import org.apache.cassandra.net.NoPayload; ++import org.apache.cassandra.net.Verb; +import org.apache.cassandra.schema.MigrationManager; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.CassandraDaemon; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.DefaultFSErrorHandler; @@@ -463,11 -539,9 +465,13 @@@ public class Instance extends IsolatedE throw new RuntimeException(e); } + // Re-populate token metadata after commit log recover (new peers might be loaded onto system keyspace #10293) + StorageService.instance.populateTokenMetadata(); + ++ Verb.HINT_REQ.unsafeSetSerializer(DTestSerializer::new); ++ if (config.has(NETWORK)) { - registerFilters(cluster); MessagingService.instance().listen(); } else diff --cc test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java index bd09891,be3622d..44f70e4 --- a/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java @@@ -39,12 -36,13 +39,16 @@@ import org.apache.cassandra.distributed import org.apache.cassandra.distributed.api.IMessageFilters; import org.apache.cassandra.distributed.impl.Instance; import org.apache.cassandra.distributed.shared.MessageFilters; + import org.apache.cassandra.hints.HintMessage; -import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.NoPayload; +import org.apache.cassandra.net.Verb; + import static org.apache.cassandra.distributed.api.Feature.GOSSIP; + import static org.apache.cassandra.distributed.api.Feature.NETWORK; + public class MessageFiltersTest extends TestBaseImpl { @Test @@@ -264,6 -216,59 +268,59 @@@ } } + @Test + public void hintSerializationTest() throws Exception + { + try (Cluster cluster = init(builder().withNodes(3) + .withConfig(config -> config.with(GOSSIP) + .with(NETWORK) + .set("hinted_handoff_enabled", true)) + .start())) + { + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (k int PRIMARY KEY, v int)")); + executeWithWriteFailure(cluster, + withKeyspace("INSERT INTO %s.tbl (k, v) VALUES (1,1)"), + ConsistencyLevel.QUORUM, + 1); + CountDownLatch latch = new CountDownLatch(1); - cluster.filters().verbs(MessagingService.Verb.HINT.ordinal()).messagesMatching((a,b,msg) -> { ++ cluster.filters().verbs(Verb.HINT_REQ.id).messagesMatching((a,b,msg) -> { + cluster.get(1).acceptsOnInstance((IIsolatedExecutor.SerializableConsumer<IMessage>) (m) -> { + HintMessage hintMessage = (HintMessage) Instance.deserializeMessage(m).payload; + assert hintMessage != null; + }).accept(msg); + + latch.countDown(); + return false; + }).drop().on(); + cluster.schemaChange(withKeyspace("DROP TABLE %s.tbl")); + latch.await(); + } + } + + public Object[][] executeWithWriteFailure(Cluster cluster, String statement, ConsistencyLevel cl, int coordinator, Object... bindings) + { + IMessageFilters filters = cluster.filters(); + + // Drop exactly one coordinated message - filters.verbs(MessagingService.Verb.MUTATION.ordinal()).from(coordinator).messagesMatching(new IMessageFilters.Matcher() ++ filters.verbs(Verb.MUTATION_REQ.id).from(coordinator).messagesMatching(new IMessageFilters.Matcher() + { + private final AtomicBoolean issued = new AtomicBoolean(); + + public boolean matches(int from, int to, IMessage message) + { - if (from != coordinator || message.verb() != MessagingService.Verb.MUTATION.ordinal()) ++ if (from != coordinator || message.verb() != Verb.MUTATION_REQ.id) + return false; + + return !issued.getAndSet(true); + } + }).drop().on(); + Object[][] res = cluster + .coordinator(coordinator) + .execute(statement, cl, bindings); + filters.reset(); + return res; + } + private static void assertTimeOut(Runnable r) { try diff --cc test/unit/org/apache/cassandra/hints/DTestSerializer.java index 0000000,09b87d3..1e308dc mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/hints/DTestSerializer.java +++ b/test/unit/org/apache/cassandra/hints/DTestSerializer.java @@@ -1,0 -1,87 +1,84 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.hints; + + import java.io.IOException; -import java.util.Objects; + import java.util.UUID; + ++import com.google.common.annotations.VisibleForTesting; + import com.google.common.primitives.Ints; + + import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.db.UnknownColumnFamilyException; -import org.apache.cassandra.io.IVersionedSerializer; ++import org.apache.cassandra.exceptions.UnknownTableException; ++import org.apache.cassandra.io.IVersionedAsymmetricSerializer; + import org.apache.cassandra.io.util.DataInputPlus; + import org.apache.cassandra.io.util.DataOutputPlus; + import org.apache.cassandra.io.util.TrackedDataInputPlus; ++import org.apache.cassandra.schema.TableId; + import org.apache.cassandra.utils.UUIDSerializer; + + // Fake serializer for dtests. Located in hints package to avoid publishing package-private fields. -public class DTestSerializer implements IVersionedSerializer<HintMessage> ++public class DTestSerializer implements IVersionedAsymmetricSerializer<SerializableHintMessage, HintMessage> + { - public long serializedSize(HintMessage message, int version) ++ public void serialize(SerializableHintMessage obj, DataOutputPlus out, int version) throws IOException + { - if (message.hint != null) - return HintMessage.serializer.serializedSize(message, version); - - long size = UUIDSerializer.serializer.serializedSize(message.hostId, version); - size += TypeSizes.sizeofUnsignedVInt(0); - size += UUIDSerializer.serializer.serializedSize(message.unknownTableID, version); - return size; - } - - public void serialize(HintMessage message, DataOutputPlus out, int version) throws IOException - { - if (message.hint != null) ++ HintMessage message; ++ if (!(obj instanceof HintMessage) || (message = (HintMessage) obj).hint != null) + { - HintMessage.serializer.serialize(message, out, version); ++ HintMessage.serializer.serialize(obj, out, version); + return; + } + + UUIDSerializer.serializer.serialize(message.hostId, out, version); + out.writeUnsignedVInt(0); - UUIDSerializer.serializer.serialize(message.unknownTableID, out, version); ++ message.unknownTableID.serialize(out); + } + - /* - * It's not an exceptional scenario to have a hints file streamed that have partition updates for tables - * that don't exist anymore. We want to handle that case gracefully instead of dropping the connection for every - * one of them. - */ + public HintMessage deserialize(DataInputPlus in, int version) throws IOException + { + UUID hostId = UUIDSerializer.serializer.deserialize(in, version); + + long hintSize = in.readUnsignedVInt(); + TrackedDataInputPlus countingIn = new TrackedDataInputPlus(in); - + if (hintSize == 0) - return new HintMessage(hostId, UUIDSerializer.serializer.deserialize(in, version)); ++ return new HintMessage(hostId, TableId.deserialize(countingIn)); + + try + { + return new HintMessage(hostId, Hint.serializer.deserialize(countingIn, version)); + } - catch (UnknownColumnFamilyException e) ++ catch (UnknownTableException e) + { + in.skipBytes(Ints.checkedCast(hintSize - countingIn.getBytesRead())); - return new HintMessage(hostId, e.cfId); ++ return new HintMessage(hostId, e.id); + } + } -} ++ ++ public long serializedSize(SerializableHintMessage obj, int version) ++ { ++ HintMessage message; ++ if (!(obj instanceof HintMessage) || (message = (HintMessage) obj).hint != null) ++ return HintMessage.serializer.serializedSize(obj, version); ++ ++ long size = UUIDSerializer.serializer.serializedSize(message.hostId, version); ++ size += TypeSizes.sizeofUnsignedVInt(0); ++ size += UUIDSerializer.serializer.serializedSize(message.unknownTableID.asUUID(), version); ++ return size; ++ } ++} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
