This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 20a315099753ae5220b31f07e74dd3745d02d21b
Merge: 5e2a72d 6dbf34b
Author: Alex Petrov <[email protected]>
AuthorDate: Thu Feb 25 14:09:10 2021 +0100

    Merge branch 'cassandra-3.11' into trunk

 .../org/apache/cassandra/hints/HintMessage.java    |  7 +-
 src/java/org/apache/cassandra/net/Verb.java        |  2 +-
 .../cassandra/distributed/impl/Instance.java       |  4 ++
 .../distributed/test/MessageFiltersTest.java       | 57 +++++++++++++++
 .../apache/cassandra/hints/DTestSerializer.java    | 84 ++++++++++++++++++++++
 5 files changed, 151 insertions(+), 3 deletions(-)

diff --cc src/java/org/apache/cassandra/hints/HintMessage.java
index 333af84,723ab6d..60adb85
--- a/src/java/org/apache/cassandra/hints/HintMessage.java
+++ b/src/java/org/apache/cassandra/hints/HintMessage.java
@@@ -19,12 -19,11 +19,12 @@@
  package org.apache.cassandra.hints;
  
  import java.io.IOException;
 +import java.nio.ByteBuffer;
  import java.util.Objects;
  import java.util.UUID;
--
  import javax.annotation.Nullable;
  
++import com.google.common.annotations.VisibleForTesting;
  import com.google.common.primitives.Ints;
  
  import org.apache.cassandra.db.TypeSizes;
@@@ -62,6 -62,6 +62,7 @@@ public final class HintMessage implemen
  
      HintMessage(UUID hostId, Hint hint)
      {
++        assert hint != null;
          this.hostId = hostId;
          this.hint = hint;
          this.unknownTableID = null;
@@@ -74,72 -74,37 +75,74 @@@
          this.unknownTableID = unknownTableID;
      }
  
 -    public MessageOut<HintMessage> createMessageOut()
 -    {
 -        return new MessageOut<>(MessagingService.Verb.HINT, this, serializer);
 -    }
 -
 -    public static class Serializer implements 
IVersionedSerializer<HintMessage>
 +    public static class Serializer implements 
IVersionedAsymmetricSerializer<SerializableHintMessage, HintMessage>
      {
 -        public long serializedSize(HintMessage message, int version)
 +        public long serializedSize(SerializableHintMessage obj, int version)
          {
 -            long size = 
UUIDSerializer.serializer.serializedSize(message.hostId, version);
 +            if (obj instanceof HintMessage)
 +            {
 +                HintMessage message = (HintMessage) obj;
-                 long size = 
UUIDSerializer.serializer.serializedSize(message.hostId, version);
 +
++                Objects.requireNonNull(message.hint); // we should never 
*send* a HintMessage with null hint
++
++                long size = 
UUIDSerializer.serializer.serializedSize(message.hostId, version);
 +                long hintSize = Hint.serializer.serializedSize(message.hint, 
version);
 +                size += TypeSizes.sizeofUnsignedVInt(hintSize);
 +                size += hintSize;
 +
 +                return size;
 +            }
 +            else if (obj instanceof Encoded)
 +            {
 +                Encoded message = (Encoded) obj;
  
 -            long hintSize = Hint.serializer.serializedSize(message.hint, 
version);
 -            size += TypeSizes.sizeofUnsignedVInt(hintSize);
 -            size += hintSize;
 +                if (version != message.version)
 +                    throw new IllegalArgumentException("serializedSize() 
called with non-matching version " + version);
  
 -            return size;
 +                long size = 
UUIDSerializer.serializer.serializedSize(message.hostId, version);
 +                size += 
TypeSizes.sizeofUnsignedVInt(message.hint.remaining());
 +                size += message.hint.remaining();
 +                return size;
 +            }
 +            else
 +            {
 +                throw new IllegalStateException("Unexpected type: " + obj);
 +            }
          }
  
 -        public void serialize(HintMessage message, DataOutputPlus out, int 
version) throws IOException
 +        public void serialize(SerializableHintMessage obj, DataOutputPlus 
out, int version) throws IOException
          {
 -            Objects.requireNonNull(message.hint); // we should never *send* a 
HintMessage with null hint
 +            if (obj instanceof HintMessage)
 +            {
 +                HintMessage message = (HintMessage) obj;
  
 -            UUIDSerializer.serializer.serialize(message.hostId, out, version);
 +                Objects.requireNonNull(message.hint); // we should never 
*send* a HintMessage with null hint
  
 -            /*
 -             * We are serializing the hint size so that the receiver of the 
message could gracefully handle
 -             * deserialize failure when a table had been dropped, by simply 
skipping the unread bytes.
 -             */
 -            
out.writeUnsignedVInt(Hint.serializer.serializedSize(message.hint, version));
 +                UUIDSerializer.serializer.serialize(message.hostId, out, 
version);
  
 -            Hint.serializer.serialize(message.hint, out, version);
 +                /*
 +                 * We are serializing the hint size so that the receiver of 
the message could gracefully handle
 +                 * deserialize failure when a table had been dropped, by 
simply skipping the unread bytes.
 +                 */
 +                
out.writeUnsignedVInt(Hint.serializer.serializedSize(message.hint, version));
 +
 +                Hint.serializer.serialize(message.hint, out, version);
 +            }
 +            else if (obj instanceof Encoded)
 +            {
 +                Encoded message = (Encoded) obj;
 +
 +                if (version != message.version)
 +                    throw new IllegalArgumentException("serialize() called 
with non-matching version " + version);
 +
 +                UUIDSerializer.serializer.serialize(message.hostId, out, 
version);
 +                out.writeUnsignedVInt(message.hint.remaining());
 +                out.write(message.hint);
 +            }
 +            else
 +            {
 +                throw new IllegalStateException("Unexpected type: " + obj);
 +            }
          }
  
          /*
diff --cc src/java/org/apache/cassandra/net/Verb.java
index 750a85e,0000000..fad2fbf
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@@ -1,453 -1,0 +1,453 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.net;
 +
 +import java.lang.reflect.Field;
 +import java.lang.reflect.Modifier;
 +import java.util.List;
 +import java.util.concurrent.TimeUnit;
 +import java.util.function.Supplier;
 +import java.util.function.ToLongFunction;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.ImmutableList;
 +
 +import org.apache.cassandra.batchlog.Batch;
 +import org.apache.cassandra.batchlog.BatchRemoveVerbHandler;
 +import org.apache.cassandra.batchlog.BatchStoreVerbHandler;
 +import org.apache.cassandra.concurrent.Stage;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.CounterMutation;
 +import org.apache.cassandra.db.CounterMutationVerbHandler;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.MutationVerbHandler;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.ReadCommandVerbHandler;
 +import org.apache.cassandra.db.ReadRepairVerbHandler;
 +import org.apache.cassandra.db.ReadResponse;
 +import org.apache.cassandra.db.SnapshotCommand;
 +import org.apache.cassandra.db.TruncateResponse;
 +import org.apache.cassandra.db.TruncateVerbHandler;
 +import org.apache.cassandra.db.TruncateRequest;
 +import org.apache.cassandra.exceptions.RequestFailureReason;
 +import org.apache.cassandra.gms.GossipDigestAck;
 +import org.apache.cassandra.gms.GossipDigestAck2;
 +import org.apache.cassandra.gms.GossipDigestAck2VerbHandler;
 +import org.apache.cassandra.gms.GossipDigestAckVerbHandler;
 +import org.apache.cassandra.gms.GossipDigestSyn;
 +import org.apache.cassandra.gms.GossipDigestSynVerbHandler;
 +import org.apache.cassandra.gms.GossipShutdownVerbHandler;
 +import org.apache.cassandra.hints.HintMessage;
 +import org.apache.cassandra.hints.HintVerbHandler;
 +import org.apache.cassandra.io.IVersionedAsymmetricSerializer;
 +import org.apache.cassandra.repair.RepairMessageVerbHandler;
 +import org.apache.cassandra.repair.messages.CleanupMessage;
 +import org.apache.cassandra.repair.messages.FailSession;
 +import org.apache.cassandra.repair.messages.FinalizeCommit;
 +import org.apache.cassandra.repair.messages.FinalizePromise;
 +import org.apache.cassandra.repair.messages.FinalizePropose;
 +import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
 +import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
 +import org.apache.cassandra.repair.messages.PrepareMessage;
 +import org.apache.cassandra.repair.messages.SnapshotMessage;
 +import org.apache.cassandra.repair.messages.StatusRequest;
 +import org.apache.cassandra.repair.messages.StatusResponse;
 +import org.apache.cassandra.repair.messages.SyncResponse;
 +import org.apache.cassandra.repair.messages.SyncRequest;
 +import org.apache.cassandra.repair.messages.ValidationResponse;
 +import org.apache.cassandra.repair.messages.ValidationRequest;
 +import org.apache.cassandra.schema.SchemaPullVerbHandler;
 +import org.apache.cassandra.schema.SchemaPushVerbHandler;
 +import org.apache.cassandra.schema.SchemaVersionVerbHandler;
 +import org.apache.cassandra.utils.BooleanSerializer;
 +import org.apache.cassandra.service.EchoVerbHandler;
 +import org.apache.cassandra.service.SnapshotVerbHandler;
 +import org.apache.cassandra.service.paxos.Commit;
 +import org.apache.cassandra.service.paxos.CommitVerbHandler;
 +import org.apache.cassandra.service.paxos.PrepareResponse;
 +import org.apache.cassandra.service.paxos.PrepareVerbHandler;
 +import org.apache.cassandra.service.paxos.ProposeVerbHandler;
 +import org.apache.cassandra.streaming.ReplicationDoneVerbHandler;
 +import org.apache.cassandra.utils.UUIDSerializer;
 +
 +import static java.util.concurrent.TimeUnit.NANOSECONDS;
 +import static org.apache.cassandra.concurrent.Stage.*;
 +import static org.apache.cassandra.concurrent.Stage.INTERNAL_RESPONSE;
 +import static org.apache.cassandra.concurrent.Stage.MISC;
 +import static org.apache.cassandra.net.VerbTimeouts.*;
 +import static org.apache.cassandra.net.Verb.Kind.*;
 +import static org.apache.cassandra.net.Verb.Priority.*;
 +import static 
org.apache.cassandra.schema.MigrationManager.MigrationsSerializer;
 +
 +/**
 + * Note that priorities except P0 are presently unused.  P0 corresponds to 
urgent, i.e. what used to be the "Gossip" connection.
 + */
 +public enum Verb
 +{
 +    MUTATION_RSP           (60,  P1, writeTimeout,    REQUEST_RESPONSE,  () 
-> NoPayload.serializer,                 () -> ResponseVerbHandler.instance     
                        ),
 +    MUTATION_REQ           (0,   P3, writeTimeout,    MUTATION,          () 
-> Mutation.serializer,                  () -> MutationVerbHandler.instance,    
    MUTATION_RSP        ),
 +    HINT_RSP               (61,  P1, writeTimeout,    REQUEST_RESPONSE,  () 
-> NoPayload.serializer,                 () -> ResponseVerbHandler.instance     
                        ),
 +    HINT_REQ               (1,   P4, writeTimeout,    MUTATION,          () 
-> HintMessage.serializer,               () -> HintVerbHandler.instance,        
    HINT_RSP            ),
 +    READ_REPAIR_RSP        (62,  P1, writeTimeout,    REQUEST_RESPONSE,  () 
-> NoPayload.serializer,                 () -> ResponseVerbHandler.instance     
                        ),
 +    READ_REPAIR_REQ        (2,   P1, writeTimeout,    MUTATION,          () 
-> Mutation.serializer,                  () -> ReadRepairVerbHandler.instance,  
    READ_REPAIR_RSP     ),
 +    BATCH_STORE_RSP        (65,  P1, writeTimeout,    REQUEST_RESPONSE,  () 
-> NoPayload.serializer,                 () -> ResponseVerbHandler.instance     
                        ),
 +    BATCH_STORE_REQ        (5,   P3, writeTimeout,    MUTATION,          () 
-> Batch.serializer,                     () -> BatchStoreVerbHandler.instance,  
    BATCH_STORE_RSP     ),
 +    BATCH_REMOVE_RSP       (66,  P1, writeTimeout,    REQUEST_RESPONSE,  () 
-> NoPayload.serializer,                 () -> ResponseVerbHandler.instance     
                        ),
 +    BATCH_REMOVE_REQ       (6,   P3, writeTimeout,    MUTATION,          () 
-> UUIDSerializer.serializer,            () -> BatchRemoveVerbHandler.instance, 
    BATCH_REMOVE_RSP    ),
 +
 +    PAXOS_PREPARE_RSP      (93,  P2, writeTimeout,    REQUEST_RESPONSE,  () 
-> PrepareResponse.serializer,           () -> ResponseVerbHandler.instance     
                        ),
 +    PAXOS_PREPARE_REQ      (33,  P2, writeTimeout,    MUTATION,          () 
-> Commit.serializer,                    () -> PrepareVerbHandler.instance,     
    PAXOS_PREPARE_RSP   ),
 +    PAXOS_PROPOSE_RSP      (94,  P2, writeTimeout,    REQUEST_RESPONSE,  () 
-> BooleanSerializer.serializer,         () -> ResponseVerbHandler.instance     
                        ),
 +    PAXOS_PROPOSE_REQ      (34,  P2, writeTimeout,    MUTATION,          () 
-> Commit.serializer,                    () -> ProposeVerbHandler.instance,     
    PAXOS_PROPOSE_RSP   ),
 +    PAXOS_COMMIT_RSP       (95,  P2, writeTimeout,    REQUEST_RESPONSE,  () 
-> NoPayload.serializer,                 () -> ResponseVerbHandler.instance     
                        ),
 +    PAXOS_COMMIT_REQ       (35,  P2, writeTimeout,    MUTATION,          () 
-> Commit.serializer,                    () -> CommitVerbHandler.instance,      
    PAXOS_COMMIT_RSP    ),
 +
 +    TRUNCATE_RSP           (79,  P0, truncateTimeout, REQUEST_RESPONSE,  () 
-> TruncateResponse.serializer,          () -> ResponseVerbHandler.instance     
                        ),
 +    TRUNCATE_REQ           (19,  P0, truncateTimeout, MUTATION,          () 
-> TruncateRequest.serializer,           () -> TruncateVerbHandler.instance,    
    TRUNCATE_RSP        ),
 +
 +    COUNTER_MUTATION_RSP   (84,  P1, counterTimeout,  REQUEST_RESPONSE,  () 
-> NoPayload.serializer,                 () -> ResponseVerbHandler.instance     
                        ),
 +    COUNTER_MUTATION_REQ   (24,  P2, counterTimeout,  COUNTER_MUTATION,  () 
-> CounterMutation.serializer,           () -> 
CounterMutationVerbHandler.instance, COUNTER_MUTATION_RSP),
 +
 +    READ_RSP               (63,  P2, readTimeout,     REQUEST_RESPONSE,  () 
-> ReadResponse.serializer,              () -> ResponseVerbHandler.instance     
                        ),
 +    READ_REQ               (3,   P3, readTimeout,     READ,              () 
-> ReadCommand.serializer,               () -> ReadCommandVerbHandler.instance, 
    READ_RSP            ),
 +    RANGE_RSP              (69,  P2, rangeTimeout,    REQUEST_RESPONSE,  () 
-> ReadResponse.serializer,              () -> ResponseVerbHandler.instance     
                        ),
 +    RANGE_REQ              (9,   P3, rangeTimeout,    READ,              () 
-> ReadCommand.serializer,               () -> ReadCommandVerbHandler.instance, 
    RANGE_RSP           ),
 +
 +    GOSSIP_DIGEST_SYN      (14,  P0, longTimeout,     GOSSIP,            () 
-> GossipDigestSyn.serializer,           () -> 
GossipDigestSynVerbHandler.instance                      ),
 +    GOSSIP_DIGEST_ACK      (15,  P0, longTimeout,     GOSSIP,            () 
-> GossipDigestAck.serializer,           () -> 
GossipDigestAckVerbHandler.instance                      ),
 +    GOSSIP_DIGEST_ACK2     (16,  P0, longTimeout,     GOSSIP,            () 
-> GossipDigestAck2.serializer,          () -> 
GossipDigestAck2VerbHandler.instance                     ),
 +    GOSSIP_SHUTDOWN        (29,  P0, rpcTimeout,      GOSSIP,            () 
-> NoPayload.serializer,                 () -> 
GossipShutdownVerbHandler.instance                       ),
 +
 +    ECHO_RSP               (91,  P0, rpcTimeout,      GOSSIP,            () 
-> NoPayload.serializer,                 () -> ResponseVerbHandler.instance     
                        ),
 +    ECHO_REQ               (31,  P0, rpcTimeout,      GOSSIP,            () 
-> NoPayload.serializer,                 () -> EchoVerbHandler.instance,        
    ECHO_RSP            ),
 +    PING_RSP               (97,  P1, pingTimeout,     GOSSIP,            () 
-> NoPayload.serializer,                 () -> ResponseVerbHandler.instance     
                        ),
 +    PING_REQ               (37,  P1, pingTimeout,     GOSSIP,            () 
-> PingRequest.serializer,               () -> PingVerbHandler.instance,        
    PING_RSP            ),
 +
 +    // P1 because messages can be arbitrarily large or aren't crucial
 +    SCHEMA_PUSH_RSP        (98,  P1, rpcTimeout,      MIGRATION,         () 
-> NoPayload.serializer,                 () -> ResponseVerbHandler.instance     
                        ),
 +    SCHEMA_PUSH_REQ        (18,  P1, rpcTimeout,      MIGRATION,         () 
-> MigrationsSerializer.instance,        () -> SchemaPushVerbHandler.instance,  
    SCHEMA_PUSH_RSP     ),
 +    SCHEMA_PULL_RSP        (88,  P1, rpcTimeout,      MIGRATION,         () 
-> MigrationsSerializer.instance,        () -> ResponseVerbHandler.instance     
                        ),
 +    SCHEMA_PULL_REQ        (28,  P1, rpcTimeout,      MIGRATION,         () 
-> NoPayload.serializer,                 () -> SchemaPullVerbHandler.instance,  
    SCHEMA_PULL_RSP     ),
 +    SCHEMA_VERSION_RSP     (80,  P1, rpcTimeout,      MIGRATION,         () 
-> UUIDSerializer.serializer,            () -> ResponseVerbHandler.instance     
                        ),
 +    SCHEMA_VERSION_REQ     (20,  P1, rpcTimeout,      MIGRATION,         () 
-> NoPayload.serializer,                 () -> 
SchemaVersionVerbHandler.instance,   SCHEMA_VERSION_RSP  ),
 +
 +    // repair; mostly doesn't use callbacks and sends responses as their own 
request messages, with matching sessions by uuid; should eventually harmonize 
and make idiomatic
 +    REPAIR_RSP             (100, P1, rpcTimeout,      REQUEST_RESPONSE,  () 
-> NoPayload.serializer,                 () -> ResponseVerbHandler.instance     
                        ),
 +    VALIDATION_RSP         (102, P1, rpcTimeout,      ANTI_ENTROPY,      () 
-> ValidationResponse.serializer,        () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    VALIDATION_REQ         (101, P1, rpcTimeout,      ANTI_ENTROPY,      () 
-> ValidationRequest.serializer,         () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    SYNC_RSP               (104, P1, rpcTimeout,      ANTI_ENTROPY,      () 
-> SyncResponse.serializer,              () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    SYNC_REQ               (103, P1, rpcTimeout,      ANTI_ENTROPY,      () 
-> SyncRequest.serializer,               () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    PREPARE_MSG            (105, P1, rpcTimeout,      ANTI_ENTROPY,      () 
-> PrepareMessage.serializer,            () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    SNAPSHOT_MSG           (106, P1, rpcTimeout,      ANTI_ENTROPY,      () 
-> SnapshotMessage.serializer,           () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    CLEANUP_MSG            (107, P1, rpcTimeout,      ANTI_ENTROPY,      () 
-> CleanupMessage.serializer,            () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    PREPARE_CONSISTENT_RSP (109, P1, rpcTimeout,      ANTI_ENTROPY,      () 
-> PrepareConsistentResponse.serializer, () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    PREPARE_CONSISTENT_REQ (108, P1, rpcTimeout,      ANTI_ENTROPY,      () 
-> PrepareConsistentRequest.serializer,  () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    FINALIZE_PROPOSE_MSG   (110, P1, rpcTimeout,      ANTI_ENTROPY,      () 
-> FinalizePropose.serializer,           () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    FINALIZE_PROMISE_MSG   (111, P1, rpcTimeout,      ANTI_ENTROPY,      () 
-> FinalizePromise.serializer,           () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    FINALIZE_COMMIT_MSG    (112, P1, rpcTimeout,      ANTI_ENTROPY,      () 
-> FinalizeCommit.serializer,            () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    FAILED_SESSION_MSG     (113, P1, rpcTimeout,      ANTI_ENTROPY,      () 
-> FailSession.serializer,               () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    STATUS_RSP             (115, P1, rpcTimeout,      ANTI_ENTROPY,      () 
-> StatusResponse.serializer,            () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +    STATUS_REQ             (114, P1, rpcTimeout,      ANTI_ENTROPY,      () 
-> StatusRequest.serializer,             () -> 
RepairMessageVerbHandler.instance,   REPAIR_RSP          ),
 +
 +    REPLICATION_DONE_RSP   (82,  P0, rpcTimeout,      MISC,              () 
-> NoPayload.serializer,                 () -> ResponseVerbHandler.instance     
                        ),
 +    REPLICATION_DONE_REQ   (22,  P0, rpcTimeout,      MISC,              () 
-> NoPayload.serializer,                 () -> 
ReplicationDoneVerbHandler.instance, REPLICATION_DONE_RSP),
 +    SNAPSHOT_RSP           (87,  P0, rpcTimeout,      MISC,              () 
-> NoPayload.serializer,                 () -> ResponseVerbHandler.instance     
                        ),
 +    SNAPSHOT_REQ           (27,  P0, rpcTimeout,      MISC,              () 
-> SnapshotCommand.serializer,           () -> SnapshotVerbHandler.instance,    
    SNAPSHOT_RSP        ),
 +
 +    // generic failure response
 +    FAILURE_RSP            (99,  P0, noTimeout,       REQUEST_RESPONSE,  () 
-> RequestFailureReason.serializer,      () -> ResponseVerbHandler.instance     
                        ),
 +
 +    // dummy verbs
 +    _TRACE                 (30,  P1, rpcTimeout,      TRACING,           () 
-> NoPayload.serializer,                 () -> null                             
                        ),
 +    _SAMPLE                (42,  P1, rpcTimeout,      INTERNAL_RESPONSE, () 
-> NoPayload.serializer,                 () -> null                             
                        ),
 +    _TEST_1                (10,  P0, writeTimeout,    IMMEDIATE,         () 
-> NoPayload.serializer,                 () -> null                             
                        ),
 +    _TEST_2                (11,  P1, rpcTimeout,      IMMEDIATE,         () 
-> NoPayload.serializer,                 () -> null                             
                        ),
 +
 +    @Deprecated
 +    REQUEST_RSP            (4,   P1, rpcTimeout,      REQUEST_RESPONSE,  () 
-> null,                                 () -> ResponseVerbHandler.instance     
                        ),
 +    @Deprecated
 +    INTERNAL_RSP           (23,  P1, rpcTimeout,      INTERNAL_RESPONSE, () 
-> null,                                 () -> ResponseVerbHandler.instance     
                        ),
 +
 +    // largest used ID: 116
 +
 +    // CUSTOM VERBS
 +    UNUSED_CUSTOM_VERB     (CUSTOM,
 +                            0,   P1, rpcTimeout,      INTERNAL_RESPONSE, () 
-> null,                                 () -> null                             
                        ),
 +
 +    ;
 +
 +    public static final List<Verb> VERBS = 
ImmutableList.copyOf(Verb.values());
 +
 +    public enum Priority
 +    {
 +        P0,  // sends on the urgent connection (i.e. for Gossip, Echo)
 +        P1,  // small or empty responses
 +        P2,  // larger messages that can be dropped but who have a larger 
impact on system stability (e.g. READ_REPAIR, READ_RSP)
 +        P3,
 +        P4
 +    }
 +
 +    public enum Kind
 +    {
 +        NORMAL,
 +        CUSTOM
 +    }
 +
 +    public final int id;
 +    public final Priority priority;
 +    public final Stage stage;
 +    public final Kind kind;
 +
 +    /**
 +     * Messages we receive from peers have a Verb that tells us what kind of 
message it is.
 +     * Most of the time, this is enough to determine how to deserialize the 
message payload.
 +     * The exception is the REQUEST_RSP verb, which just means "a response to 
something you told me to do."
 +     * Traditionally, this was fine since each VerbHandler knew what type of 
payload it expected, and
 +     * handled the deserialization itself.  Now that we do that in ITC, to 
avoid the extra copy to an
 +     * intermediary byte[] (See CASSANDRA-3716), we need to wire that up to 
the CallbackInfo object
 +     * (see below).
 +     *
 +     * NOTE: we use a Supplier to avoid loading the dependent classes until 
necessary.
 +     */
 +    private final Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> 
serializer;
 +    private final Supplier<? extends IVerbHandler<?>> handler;
 +
 +    final Verb responseVerb;
 +
 +    private final ToLongFunction<TimeUnit> expiration;
 +
 +
 +    /**
 +     * Verbs it's okay to drop if the request has been queued longer than the 
request timeout.  These
 +     * all correspond to client requests or something triggered by them; we 
don't want to
 +     * drop internal messages like bootstrap or repair notifications.
 +     */
 +    Verb(int id, Priority priority, ToLongFunction<TimeUnit> expiration, 
Stage stage, Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> 
serializer, Supplier<? extends IVerbHandler<?>> handler)
 +    {
 +        this(id, priority, expiration, stage, serializer, handler, null);
 +    }
 +
 +    Verb(int id, Priority priority, ToLongFunction<TimeUnit> expiration, 
Stage stage, Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> 
serializer, Supplier<? extends IVerbHandler<?>> handler, Verb responseVerb)
 +    {
 +        this(NORMAL, id, priority, expiration, stage, serializer, handler, 
responseVerb);
 +    }
 +
 +    Verb(Kind kind, int id, Priority priority, ToLongFunction<TimeUnit> 
expiration, Stage stage, Supplier<? extends IVersionedAsymmetricSerializer<?, 
?>> serializer, Supplier<? extends IVerbHandler<?>> handler)
 +    {
 +        this(kind, id, priority, expiration, stage, serializer, handler, 
null);
 +    }
 +
 +    Verb(Kind kind, int id, Priority priority, ToLongFunction<TimeUnit> 
expiration, Stage stage, Supplier<? extends IVersionedAsymmetricSerializer<?, 
?>> serializer, Supplier<? extends IVerbHandler<?>> handler, Verb responseVerb)
 +    {
 +        this.stage = stage;
 +        if (id < 0)
 +            throw new IllegalArgumentException("Verb id must be non-negative, 
got " + id + " for verb " + name());
 +
 +        if (kind == CUSTOM)
 +        {
 +            if (id > MAX_CUSTOM_VERB_ID)
 +                throw new AssertionError("Invalid custom verb id " + id + " - 
we only allow custom ids between 0 and " + MAX_CUSTOM_VERB_ID);
 +            this.id = idForCustomVerb(id);
 +        }
 +        else
 +        {
 +            if (id > CUSTOM_VERB_START - MAX_CUSTOM_VERB_ID)
 +                throw new AssertionError("Invalid verb id " + id + " - we 
only allow ids between 0 and " + (CUSTOM_VERB_START - MAX_CUSTOM_VERB_ID));
 +            this.id = id;
 +        }
 +        this.priority = priority;
 +        this.serializer = serializer;
 +        this.handler = handler;
 +        this.responseVerb = responseVerb;
 +        this.expiration = expiration;
 +        this.kind = kind;
 +    }
 +
 +    public <In, Out> IVersionedAsymmetricSerializer<In, Out> serializer()
 +    {
 +        return (IVersionedAsymmetricSerializer<In, Out>) serializer.get();
 +    }
 +
 +    public <T> IVerbHandler<T> handler()
 +    {
 +        return (IVerbHandler<T>) handler.get();
 +    }
 +
 +    public long expiresAtNanos(long nowNanos)
 +    {
 +        return nowNanos + expiresAfterNanos();
 +    }
 +
 +    public long expiresAfterNanos()
 +    {
 +        return expiration.applyAsLong(NANOSECONDS);
 +    }
 +
 +    // this is a little hacky, but reduces the number of parameters up top
 +    public boolean isResponse()
 +    {
 +        return handler.get() == ResponseVerbHandler.instance;
 +    }
 +
 +    Verb toPre40Verb()
 +    {
 +        if (!isResponse())
 +            return this;
 +        if (priority == P0)
 +            return INTERNAL_RSP;
 +        return REQUEST_RSP;
 +    }
 +
 +    @VisibleForTesting
 +    Supplier<? extends IVerbHandler<?>> unsafeSetHandler(Supplier<? extends 
IVerbHandler<?>> handler) throws NoSuchFieldException, IllegalAccessException
 +    {
 +        Supplier<? extends IVerbHandler<?>> original = this.handler;
 +        Field field = Verb.class.getDeclaredField("handler");
 +        field.setAccessible(true);
 +        Field modifiers = Field.class.getDeclaredField("modifiers");
 +        modifiers.setAccessible(true);
 +        modifiers.setInt(field, field.getModifiers() & ~Modifier.FINAL);
 +        field.set(this, handler);
 +        return original;
 +    }
 +
 +    @VisibleForTesting
-     Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> 
unsafeSetSerializer(Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> 
serializer) throws NoSuchFieldException, IllegalAccessException
++    public Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> 
unsafeSetSerializer(Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> 
serializer) throws NoSuchFieldException, IllegalAccessException
 +    {
 +        Supplier<? extends IVersionedAsymmetricSerializer<?, ?>> original = 
this.serializer;
 +        Field field = Verb.class.getDeclaredField("serializer");
 +        field.setAccessible(true);
 +        Field modifiers = Field.class.getDeclaredField("modifiers");
 +        modifiers.setAccessible(true);
 +        modifiers.setInt(field, field.getModifiers() & ~Modifier.FINAL);
 +        field.set(this, serializer);
 +        return original;
 +    }
 +
 +    @VisibleForTesting
 +    ToLongFunction<TimeUnit> unsafeSetExpiration(ToLongFunction<TimeUnit> 
expiration) throws NoSuchFieldException, IllegalAccessException
 +    {
 +        ToLongFunction<TimeUnit> original = this.expiration;
 +        Field field = Verb.class.getDeclaredField("expiration");
 +        field.setAccessible(true);
 +        Field modifiers = Field.class.getDeclaredField("modifiers");
 +        modifiers.setAccessible(true);
 +        modifiers.setInt(field, field.getModifiers() & ~Modifier.FINAL);
 +        field.set(this, expiration);
 +        return original;
 +    }
 +
 +    // This is the largest number we can store in 2 bytes using VIntCoding (1 
bit per byte is used to indicate if there is more data coming).
 +    // When generating ids we count *down* from this number
 +    private static final int CUSTOM_VERB_START = (1 << (7 * 2)) - 1;
 +
 +    // Sanity check for the custom verb ids - avoids someone mistakenly 
adding a custom verb id close to the normal verbs which
 +    // could cause a conflict later when new normal verbs are added.
 +    private static final int MAX_CUSTOM_VERB_ID = 1000;
 +
 +    private static final Verb[] idToVerbMap;
 +    private static final Verb[] idToCustomVerbMap;
 +    private static final int minCustomId;
 +
 +    static
 +    {
 +        Verb[] verbs = values();
 +        int max = -1;
 +        int minCustom = Integer.MAX_VALUE;
 +        for (Verb v : verbs)
 +        {
 +            switch (v.kind)
 +            {
 +                case NORMAL:
 +                    max = Math.max(v.id, max);
 +                    break;
 +                case CUSTOM:
 +                    minCustom = Math.min(v.id, minCustom);
 +                    break;
 +                default:
 +                    throw new AssertionError("Unsupported Verb Kind: " + 
v.kind + " for verb " + v);
 +            }
 +        }
 +        minCustomId = minCustom;
 +
 +        if (minCustom <= max)
 +            throw new IllegalStateException("Overlapping verb ids are not 
allowed");
 +
 +        Verb[] idMap = new Verb[max + 1];
 +        int customCount = minCustom < Integer.MAX_VALUE ? CUSTOM_VERB_START - 
minCustom : 0;
 +        Verb[] customIdMap = new Verb[customCount + 1];
 +        for (Verb v : verbs)
 +        {
 +            switch (v.kind)
 +            {
 +                case NORMAL:
 +                    if (idMap[v.id] != null)
 +                        throw new IllegalArgumentException("cannot have two 
verbs that map to the same id: " + v + " and " + idMap[v.id]);
 +                    idMap[v.id] = v;
 +                    break;
 +                case CUSTOM:
 +                    int relativeId = idForCustomVerb(v.id);
 +                    if (customIdMap[relativeId] != null)
 +                        throw new IllegalArgumentException("cannot have two 
custom verbs that map to the same id: " + v + " and " + 
customIdMap[relativeId]);
 +                    customIdMap[relativeId] = v;
 +                    break;
 +                default:
 +                    throw new AssertionError("Unsupported Verb Kind: " + 
v.kind + " for verb " + v);
 +            }
 +        }
 +
 +        idToVerbMap = idMap;
 +        idToCustomVerbMap = customIdMap;
 +    }
 +
 +    public static Verb fromId(int id)
 +    {
 +        Verb[] verbs = idToVerbMap;
 +        if (id >= minCustomId)
 +        {
 +            id = idForCustomVerb(id);
 +            verbs = idToCustomVerbMap;
 +        }
 +        Verb verb = id >= 0 && id < verbs.length ? verbs[id] : null;
 +        if (verb == null)
 +            throw new IllegalArgumentException("Unknown verb id " + id);
 +        return verb;
 +    }
 +
 +    /**
 +     * calculate an id for a custom verb
 +     */
 +    private static int idForCustomVerb(int id)
 +    {
 +        return CUSTOM_VERB_START - id;
 +    }
 +}
 +
 +@SuppressWarnings("unused")
 +class VerbTimeouts
 +{
 +    static final ToLongFunction<TimeUnit> rpcTimeout      = 
DatabaseDescriptor::getRpcTimeout;
 +    static final ToLongFunction<TimeUnit> writeTimeout    = 
DatabaseDescriptor::getWriteRpcTimeout;
 +    static final ToLongFunction<TimeUnit> readTimeout     = 
DatabaseDescriptor::getReadRpcTimeout;
 +    static final ToLongFunction<TimeUnit> rangeTimeout    = 
DatabaseDescriptor::getRangeRpcTimeout;
 +    static final ToLongFunction<TimeUnit> counterTimeout  = 
DatabaseDescriptor::getCounterWriteRpcTimeout;
 +    static final ToLongFunction<TimeUnit> truncateTimeout = 
DatabaseDescriptor::getTruncateRpcTimeout;
 +    static final ToLongFunction<TimeUnit> pingTimeout     = 
DatabaseDescriptor::getPingTimeout;
 +    static final ToLongFunction<TimeUnit> longTimeout     = units -> 
Math.max(DatabaseDescriptor.getRpcTimeout(units), units.convert(5L, 
TimeUnit.MINUTES));
 +    static final ToLongFunction<TimeUnit> noTimeout       = units -> { throw 
new IllegalStateException(); };
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 0e18aa3,7784214..b79ccbc
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -84,23 -84,20 +84,25 @@@ import org.apache.cassandra.distributed
  import org.apache.cassandra.gms.ApplicationState;
  import org.apache.cassandra.gms.Gossiper;
  import org.apache.cassandra.gms.VersionedValue;
+ import org.apache.cassandra.hints.DTestSerializer;
  import org.apache.cassandra.hints.HintsService;
  import org.apache.cassandra.index.SecondaryIndexManager;
 +import org.apache.cassandra.io.IVersionedAsymmetricSerializer;
  import org.apache.cassandra.io.sstable.IndexSummaryManager;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.io.util.DataInputBuffer;
  import org.apache.cassandra.io.util.DataOutputBuffer;
  import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.locator.InetAddressAndPort;
  import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 -import org.apache.cassandra.net.IMessageSink;
 -import org.apache.cassandra.net.MessageIn;
 -import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.Message;
  import org.apache.cassandra.net.MessagingService;
 -import org.apache.cassandra.schema.LegacySchemaMigrator;
 +import org.apache.cassandra.net.NoPayload;
++import org.apache.cassandra.net.Verb;
 +import org.apache.cassandra.schema.MigrationManager;
 +import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.SchemaConstants;
 +import org.apache.cassandra.service.ActiveRepairService;
  import org.apache.cassandra.service.CassandraDaemon;
  import org.apache.cassandra.service.ClientState;
  import org.apache.cassandra.service.DefaultFSErrorHandler;
@@@ -463,11 -539,9 +465,13 @@@ public class Instance extends IsolatedE
                      throw new RuntimeException(e);
                  }
  
 +                // Re-populate token metadata after commit log recover (new 
peers might be loaded onto system keyspace #10293)
 +                StorageService.instance.populateTokenMetadata();
 +
++                Verb.HINT_REQ.unsafeSetSerializer(DTestSerializer::new);
++
                  if (config.has(NETWORK))
                  {
 -                    registerFilters(cluster);
                      MessagingService.instance().listen();
                  }
                  else
diff --cc 
test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
index bd09891,be3622d..44f70e4
--- 
a/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
@@@ -39,12 -36,13 +39,16 @@@ import org.apache.cassandra.distributed
  import org.apache.cassandra.distributed.api.IMessageFilters;
  import org.apache.cassandra.distributed.impl.Instance;
  import org.apache.cassandra.distributed.shared.MessageFilters;
+ import org.apache.cassandra.hints.HintMessage;
 -import org.apache.cassandra.net.MessageIn;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.net.Message;
  import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.net.NoPayload;
 +import org.apache.cassandra.net.Verb;
  
+ import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+ import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+ 
  public class MessageFiltersTest extends TestBaseImpl
  {
      @Test
@@@ -264,6 -216,59 +268,59 @@@
          }
      }
  
+     @Test
+     public void hintSerializationTest() throws Exception
+     {
+         try (Cluster cluster = init(builder().withNodes(3)
+                                              .withConfig(config -> 
config.with(GOSSIP)
+                                                                          
.with(NETWORK)
+                                                                          
.set("hinted_handoff_enabled", true))
+                                              .start()))
+         {
+             cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (k int 
PRIMARY KEY, v int)"));
+             executeWithWriteFailure(cluster,
+                                     withKeyspace("INSERT INTO %s.tbl (k, v) 
VALUES (1,1)"),
+                                     ConsistencyLevel.QUORUM,
+                                     1);
+             CountDownLatch latch = new CountDownLatch(1);
 -            
cluster.filters().verbs(MessagingService.Verb.HINT.ordinal()).messagesMatching((a,b,msg)
 -> {
++            
cluster.filters().verbs(Verb.HINT_REQ.id).messagesMatching((a,b,msg) -> {
+                 
cluster.get(1).acceptsOnInstance((IIsolatedExecutor.SerializableConsumer<IMessage>)
 (m) -> {
+                     HintMessage hintMessage = (HintMessage) 
Instance.deserializeMessage(m).payload;
+                     assert hintMessage != null;
+                 }).accept(msg);
+ 
+                 latch.countDown();
+                 return false;
+             }).drop().on();
+             cluster.schemaChange(withKeyspace("DROP TABLE %s.tbl"));
+             latch.await();
+         }
+     }
+ 
+     public Object[][] executeWithWriteFailure(Cluster cluster, String 
statement, ConsistencyLevel cl, int coordinator, Object... bindings)
+     {
+         IMessageFilters filters = cluster.filters();
+ 
+         // Drop exactly one coordinated message
 -        
filters.verbs(MessagingService.Verb.MUTATION.ordinal()).from(coordinator).messagesMatching(new
 IMessageFilters.Matcher()
++        
filters.verbs(Verb.MUTATION_REQ.id).from(coordinator).messagesMatching(new 
IMessageFilters.Matcher()
+         {
+             private final AtomicBoolean issued = new AtomicBoolean();
+ 
+             public boolean matches(int from, int to, IMessage message)
+             {
 -                if (from != coordinator || message.verb() != 
MessagingService.Verb.MUTATION.ordinal())
++                if (from != coordinator || message.verb() != 
Verb.MUTATION_REQ.id)
+                     return false;
+ 
+                 return !issued.getAndSet(true);
+             }
+         }).drop().on();
+         Object[][] res = cluster
+                          .coordinator(coordinator)
+                          .execute(statement, cl, bindings);
+         filters.reset();
+         return res;
+     }
+ 
      private static void assertTimeOut(Runnable r)
      {
          try
diff --cc test/unit/org/apache/cassandra/hints/DTestSerializer.java
index 0000000,09b87d3..1e308dc
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/hints/DTestSerializer.java
+++ b/test/unit/org/apache/cassandra/hints/DTestSerializer.java
@@@ -1,0 -1,87 +1,84 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.hints;
+ 
+ import java.io.IOException;
 -import java.util.Objects;
+ import java.util.UUID;
+ 
++import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.primitives.Ints;
+ 
+ import org.apache.cassandra.db.TypeSizes;
 -import org.apache.cassandra.db.UnknownColumnFamilyException;
 -import org.apache.cassandra.io.IVersionedSerializer;
++import org.apache.cassandra.exceptions.UnknownTableException;
++import org.apache.cassandra.io.IVersionedAsymmetricSerializer;
+ import org.apache.cassandra.io.util.DataInputPlus;
+ import org.apache.cassandra.io.util.DataOutputPlus;
+ import org.apache.cassandra.io.util.TrackedDataInputPlus;
++import org.apache.cassandra.schema.TableId;
+ import org.apache.cassandra.utils.UUIDSerializer;
+ 
+ // Fake serializer for dtests. Located in hints package to avoid publishing 
package-private fields.
 -public class DTestSerializer implements IVersionedSerializer<HintMessage>
++public class DTestSerializer implements 
IVersionedAsymmetricSerializer<SerializableHintMessage, HintMessage>
+ {
 -    public long serializedSize(HintMessage message, int version)
++    public void serialize(SerializableHintMessage obj, DataOutputPlus out, 
int version) throws IOException
+     {
 -        if (message.hint != null)
 -            return HintMessage.serializer.serializedSize(message, version);
 -
 -        long size = UUIDSerializer.serializer.serializedSize(message.hostId, 
version);
 -        size += TypeSizes.sizeofUnsignedVInt(0);
 -        size += 
UUIDSerializer.serializer.serializedSize(message.unknownTableID, version);
 -        return size;
 -    }
 -
 -    public void serialize(HintMessage message, DataOutputPlus out, int 
version) throws IOException
 -    {
 -        if (message.hint != null)
++        HintMessage message;
++        if (!(obj instanceof HintMessage) || (message = (HintMessage) 
obj).hint != null)
+         {
 -            HintMessage.serializer.serialize(message, out, version);
++            HintMessage.serializer.serialize(obj, out, version);
+             return;
+         }
+ 
+         UUIDSerializer.serializer.serialize(message.hostId, out, version);
+         out.writeUnsignedVInt(0);
 -        UUIDSerializer.serializer.serialize(message.unknownTableID, out, 
version);
++        message.unknownTableID.serialize(out);
+     }
+ 
 -    /*
 -     * It's not an exceptional scenario to have a hints file streamed that 
have partition updates for tables
 -     * that don't exist anymore. We want to handle that case gracefully 
instead of dropping the connection for every
 -     * one of them.
 -     */
+     public HintMessage deserialize(DataInputPlus in, int version) throws 
IOException
+     {
+         UUID hostId = UUIDSerializer.serializer.deserialize(in, version);
+ 
+         long hintSize = in.readUnsignedVInt();
+         TrackedDataInputPlus countingIn = new TrackedDataInputPlus(in);
 -
+         if (hintSize == 0)
 -            return new HintMessage(hostId, 
UUIDSerializer.serializer.deserialize(in, version));
++            return new HintMessage(hostId, TableId.deserialize(countingIn));
+ 
+         try
+         {
+             return new HintMessage(hostId, 
Hint.serializer.deserialize(countingIn, version));
+         }
 -        catch (UnknownColumnFamilyException e)
++        catch (UnknownTableException e)
+         {
+             in.skipBytes(Ints.checkedCast(hintSize - 
countingIn.getBytesRead()));
 -            return new HintMessage(hostId, e.cfId);
++            return new HintMessage(hostId, e.id);
+         }
+     }
 -}
++
++    public long serializedSize(SerializableHintMessage obj, int version)
++    {
++        HintMessage message;
++        if (!(obj instanceof HintMessage) || (message = (HintMessage) 
obj).hint != null)
++            return HintMessage.serializer.serializedSize(obj, version);
++
++        long size = UUIDSerializer.serializer.serializedSize(message.hostId, 
version);
++        size += TypeSizes.sizeofUnsignedVInt(0);
++        size += 
UUIDSerializer.serializer.serializedSize(message.unknownTableID.asUUID(), 
version);
++        return size;
++    }
++}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to