http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/ForwardToSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ForwardToSerializer.java 
b/src/java/org/apache/cassandra/net/ForwardToSerializer.java
new file mode 100644
index 0000000..c4e8843
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/ForwardToSerializer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+/**
+ * Serializer for {@link ForwardToContainer}.
+ *
+ * The wire format is an int count followed by one (endpoint, int message id) pair
+ * per forwarding target. Targets and message ids are paired by iteration order of
+ * {@code targets} and index into {@code messageIds}.
+ */
+public class ForwardToSerializer implements IVersionedSerializer<ForwardToContainer>
+{
+    public static final ForwardToSerializer instance = new ForwardToSerializer();
+
+    private ForwardToSerializer() {}
+
+    /**
+     * Writes the number of targets, then each target endpoint followed by its message id.
+     */
+    public void serialize(ForwardToContainer forwardToContainer, DataOutputPlus out, int version) throws IOException
+    {
+        // The count written below comes from targets while the loop is driven by messageIds;
+        // if the two ever disagree the output no longer matches what deserialize() reads back.
+        assert forwardToContainer.targets.size() == forwardToContainer.messageIds.length
+               : "forward targets and message ids must have the same length";
+
+        out.writeInt(forwardToContainer.targets.size());
+        Iterator<InetAddressAndPort> iter = forwardToContainer.targets.iterator();
+        for (int ii = 0; ii < forwardToContainer.messageIds.length; ii++)
+        {
+            CompactEndpointSerializationHelper.instance.serialize(iter.next(), out, version);
+            out.writeInt(forwardToContainer.messageIds[ii]);
+        }
+    }
+
+    /**
+     * Reads the count, then that many (endpoint, message id) pairs, and rebuilds the container.
+     */
+    public ForwardToContainer deserialize(DataInputPlus in, int version) throws IOException
+    {
+        int[] ids = new int[in.readInt()];
+        List<InetAddressAndPort> hosts = new ArrayList<>(ids.length);
+        for (int ii = 0; ii < ids.length; ii++)
+        {
+            hosts.add(CompactEndpointSerializationHelper.instance.deserialize(in, version));
+            ids[ii] = in.readInt();
+        }
+        return new ForwardToContainer(hosts, ids);
+    }
+
+    /**
+     * Computes the number of bytes {@link #serialize} writes for the given container.
+     */
+    public long serializedSize(ForwardToContainer forwardToContainer, int version)
+    {
+        // 4 bytes for the target count, plus 4 bytes for the message id of each target
+        long size = 4 +
+                    (4 * forwardToContainer.targets.size());
+        // The serialized address size varies per endpoint (e.g. ipv4 vs ipv6), so ask the helper.
+        for (InetAddressAndPort forwardTo : forwardToContainer.targets)
+        {
+            size += CompactEndpointSerializationHelper.instance.serializedSize(forwardTo, version);
+        }
+
+        return size;
+    }
+
+    /**
+     * Convenience wrapper that deserializes a container from a byte array.
+     *
+     * @throws RuntimeException wrapping any IOException raised while reading
+     */
+    public static ForwardToContainer fromBytes(byte[] bytes, int version)
+    {
+        try (DataInputBuffer input = new DataInputBuffer(bytes))
+        {
+            return instance.deserialize(input, version);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/IAsyncCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallback.java 
b/src/java/org/apache/cassandra/net/IAsyncCallback.java
index 7835079..251d263 100644
--- a/src/java/org/apache/cassandra/net/IAsyncCallback.java
+++ b/src/java/org/apache/cassandra/net/IAsyncCallback.java
@@ -17,11 +17,10 @@
  */
 package org.apache.cassandra.net;
 
-import java.net.InetAddress;
-
 import com.google.common.base.Predicate;
 
 import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 /**
  * implementors of IAsyncCallback need to make sure that any public methods
@@ -31,9 +30,9 @@ import org.apache.cassandra.gms.FailureDetector;
  */
 public interface IAsyncCallback<T>
 {
-    Predicate<InetAddress> isAlive = new Predicate<InetAddress>()
+    Predicate<InetAddressAndPort> isAlive = new Predicate<InetAddressAndPort>()
     {
-        public boolean apply(InetAddress endpoint)
+        public boolean apply(InetAddressAndPort endpoint)
         {
             return FailureDetector.instance.isAlive(endpoint);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java 
b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
index 1cd27b6..2b91f20 100644
--- a/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
+++ b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
@@ -17,9 +17,8 @@
  */
 package org.apache.cassandra.net;
 
-import java.net.InetAddress;
-
 import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 public interface IAsyncCallbackWithFailure<T> extends IAsyncCallback<T>
 {
@@ -27,5 +26,5 @@ public interface IAsyncCallbackWithFailure<T> extends 
IAsyncCallback<T>
     /**
      * Called when there is an exception on the remote node or timeout happens
      */
-    void onFailure(InetAddress from, RequestFailureReason failureReason);
+    void onFailure(InetAddressAndPort from, RequestFailureReason 
failureReason);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/IMessageSink.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IMessageSink.java 
b/src/java/org/apache/cassandra/net/IMessageSink.java
index 5150901..090d2c2 100644
--- a/src/java/org/apache/cassandra/net/IMessageSink.java
+++ b/src/java/org/apache/cassandra/net/IMessageSink.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.net;
 
-import java.net.InetAddress;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 public interface IMessageSink
 {
@@ -26,7 +26,7 @@ public interface IMessageSink
      *
      * @return true if the message is allowed, false if it should be dropped
      */
-    boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to);
+    boolean allowOutgoingMessage(MessageOut message, int id, 
InetAddressAndPort to);
 
     /**
      * Allow or drop an incoming message

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java 
b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index c7fc991..6e132a8 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.net;
 import java.io.IOException;
 import java.util.EnumSet;
 
+import com.google.common.primitives.Shorts;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,6 +29,7 @@ import org.apache.cassandra.db.monitoring.ApproximateTime;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.index.IndexNotAvailableException;
+import org.apache.cassandra.io.DummyByteVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 
 public class MessageDeliveryTask implements Runnable
@@ -96,19 +98,11 @@ public class MessageDeliveryTask implements Runnable
         if (message.doCallbackOnFailure())
         {
             MessageOut response = new 
MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
-                                                
.withParameter(MessagingService.FAILURE_RESPONSE_PARAM, 
MessagingService.ONE_BYTE);
+                                                
.withParameter(ParameterType.FAILURE_RESPONSE, MessagingService.ONE_BYTE);
 
             if (t instanceof TombstoneOverwhelmingException)
             {
-                try (DataOutputBuffer out = new DataOutputBuffer())
-                {
-                    
out.writeShort(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code);
-                    response = 
response.withParameter(MessagingService.FAILURE_REASON_PARAM, out.getData());
-                }
-                catch (IOException ex)
-                {
-                    throw new RuntimeException(ex);
-                }
+                response = 
response.withParameter(ParameterType.FAILURE_REASON, 
Shorts.checkedCast(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code));
             }
 
             MessagingService.instance().sendReply(response, id, message.from);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java 
b/src/java/org/apache/cassandra/net/MessageIn.java
index d520fa9..a426ef0 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.net;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.Collections;
 import java.util.Map;
 
@@ -31,8 +30,8 @@ import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService.Verb;
-
 /**
  * The receiving node's view of a {@link MessageOut}. See documentation on 
{@link MessageOut} for details on the
  * serialization format.
@@ -41,16 +40,16 @@ import org.apache.cassandra.net.MessagingService.Verb;
  */
 public class MessageIn<T>
 {
-    public final InetAddress from;
+    public final InetAddressAndPort from;
     public final T payload;
-    public final Map<String, byte[]> parameters;
-    public final Verb verb;
+    public final Map<ParameterType, Object> parameters;
+    public final MessagingService.Verb verb;
     public final int version;
     public final long constructionTime;
 
-    private MessageIn(InetAddress from,
+    private MessageIn(InetAddressAndPort from,
                       T payload,
-                      Map<String, byte[]> parameters,
+                      Map<ParameterType, Object> parameters,
                       Verb verb,
                       int version,
                       long constructionTime)
@@ -63,9 +62,9 @@ public class MessageIn<T>
         this.constructionTime = constructionTime;
     }
 
-    public static <T> MessageIn<T> create(InetAddress from,
+    public static <T> MessageIn<T> create(InetAddressAndPort from,
                                           T payload,
-                                          Map<String, byte[]> parameters,
+                                          Map<ParameterType, Object> 
parameters,
                                           Verb verb,
                                           int version,
                                           long constructionTime)
@@ -73,9 +72,9 @@ public class MessageIn<T>
         return new MessageIn<>(from, payload, parameters, verb, version, 
constructionTime);
     }
 
-    public static <T> MessageIn<T> create(InetAddress from,
+    public static <T> MessageIn<T> create(InetAddressAndPort from,
                                           T payload,
-                                          Map<String, byte[]> parameters,
+                                          Map<ParameterType, Object> 
parameters,
                                           MessagingService.Verb verb,
                                           int version)
     {
@@ -89,37 +88,46 @@ public class MessageIn<T>
 
     public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int 
id, long constructionTime) throws IOException
     {
-        InetAddress from = CompactEndpointSerializationHelper.deserialize(in);
+        InetAddressAndPort from = 
CompactEndpointSerializationHelper.instance.deserialize(in, version);
 
         MessagingService.Verb verb = 
MessagingService.Verb.fromId(in.readInt());
-        Map<String, byte[]> parameters = readParameters(in);
+        Map<ParameterType, Object> parameters = readParameters(in, version);
         int payloadSize = in.readInt();
         return read(in, version, id, constructionTime, from, payloadSize, 
verb, parameters);
     }
 
+    /**
+     * Reads the parameter section of a message: an int count followed by, for each parameter,
+     * a UTF key, an int length and that many bytes of serialized value.
+     * A key with no entry in {@code ParameterType.byName} has its value skipped instead of
+     * failing the read.
+     *
+     * @return the recognised parameters keyed by type; an empty map if the count is zero
+     */
-    public static Map<String, byte[]> readParameters(DataInputPlus in) throws IOException
+    public static Map<ParameterType, Object> readParameters(DataInputPlus in, int version) throws IOException
     {
         int parameterCount = in.readInt();
         if (parameterCount == 0)
         {
             return Collections.emptyMap();
         }
         else
         {
-            ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
+            ImmutableMap.Builder<ParameterType, Object> builder = ImmutableMap.builder();
             for (int i = 0; i < parameterCount; i++)
             {
                 String key = in.readUTF();
-                byte[] value = new byte[in.readInt()];
-                in.readFully(value);
-                builder.put(key, value);
+                ParameterType type = ParameterType.byName.get(key);
+                if (type != null)
+                {
+                    byte[] value = new byte[in.readInt()];
+                    in.readFully(value);
+                    builder.put(type, type.serializer.deserialize(new DataInputBuffer(value), version));
+                }
+                else
+                {
+                    // skipBytes() is allowed to skip fewer bytes than requested, which would leave
+                    // the stream misaligned for the next key; skipBytesFully() fails loudly instead.
+                    in.skipBytesFully(in.readInt());
+                }
             }
             return builder.build();
         }
     }
 
     public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int 
id, long constructionTime,
-                                          InetAddress from, int payloadSize, 
Verb verb, Map<String, byte[]> parameters) throws IOException
+                                          InetAddressAndPort from, int 
payloadSize, Verb verb, Map<ParameterType, Object> parameters) throws 
IOException
     {
         IVersionedSerializer<T2> serializer = (IVersionedSerializer<T2>) 
MessagingService.verbSerializers.get(verb);
         if (serializer instanceof 
MessagingService.CallbackDeterminedSerializer)
@@ -140,7 +148,7 @@ public class MessageIn<T>
         return MessageIn.create(from, payload, parameters, verb, version, 
constructionTime);
     }
 
-    public static long deriveConstructionTime(InetAddress from, int 
messageTimestamp, long currentTime)
+    public static long deriveConstructionTime(InetAddressAndPort from, int 
messageTimestamp, long currentTime)
     {
         // Reconstruct the message construction time sent by the remote host 
(we sent only the lower 4 bytes, assuming the
         // higher 4 bytes wouldn't change between the sender and receiver)
@@ -182,36 +190,18 @@ public class MessageIn<T>
 
     public boolean doCallbackOnFailure()
     {
-        return parameters.containsKey(MessagingService.FAILURE_CALLBACK_PARAM);
+        return parameters.containsKey(ParameterType.FAILURE_CALLBACK);
     }
 
     public boolean isFailureResponse()
     {
-        return parameters.containsKey(MessagingService.FAILURE_RESPONSE_PARAM);
-    }
-
-    public boolean containsFailureReason()
-    {
-        return parameters.containsKey(MessagingService.FAILURE_REASON_PARAM);
+        return parameters.containsKey(ParameterType.FAILURE_RESPONSE);
     }
 
     public RequestFailureReason getFailureReason()
     {
-        if (containsFailureReason())
-        {
-            try (DataInputBuffer in = new 
DataInputBuffer(parameters.get(MessagingService.FAILURE_REASON_PARAM)))
-            {
-                return RequestFailureReason.fromCode(in.readUnsignedShort());
-            }
-            catch (IOException ex)
-            {
-                throw new RuntimeException(ex);
-            }
-        }
-        else
-        {
-            return RequestFailureReason.UNKNOWN;
-        }
+        Short code = (Short)parameters.get(ParameterType.FAILURE_REASON);
+        return code != null ? RequestFailureReason.fromCode(code) : 
RequestFailureReason.UNKNOWN;
     }
 
     public long getTimeout()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java 
b/src/java/org/apache/cassandra/net/MessageOut.java
index 379aff5..7d3c0af 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -19,17 +19,18 @@
 package org.apache.cassandra.net;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.util.Collections;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -80,12 +81,20 @@ import static 
org.apache.cassandra.tracing.Tracing.isTracing;
 public class MessageOut<T>
 {
     private static final int SERIALIZED_SIZE_VERSION_UNDEFINED = -1;
+    //Parameters are stored in an object array as tuples of size two
+    public static final int PARAMETER_TUPLE_SIZE = 2;
+    //Offset in a parameter tuple containing the type of the parameter
+    public static final int PARAMETER_TUPLE_TYPE_OFFSET = 0;
+    //Offset in a parameter tuple containing the actual parameter represented 
as a POJO
+    public static final int PARAMETER_TUPLE_PARAMETER_OFFSET = 1;
 
-    public final InetAddress from;
+    public final InetAddressAndPort from;
     public final MessagingService.Verb verb;
     public final T payload;
     public final IVersionedSerializer<T> serializer;
-    public final Map<String, byte[]> parameters;
+    //A list of tuples, first object is the ParameterType enum value,
+    //the second object is the POJO to serialize
+    public final List<Object> parameters;
 
     /**
      * Memoization of the serialized size of the just the payload.
@@ -115,16 +124,16 @@ public class MessageOut<T>
              serializer,
              isTracing()
                  ? Tracing.instance.getTraceHeaders()
-                 : Collections.<String, byte[]>emptyMap());
+                 : ImmutableList.of());
     }
 
-    private MessageOut(MessagingService.Verb verb, T payload, 
IVersionedSerializer<T> serializer, Map<String, byte[]> parameters)
+    private MessageOut(MessagingService.Verb verb, T payload, 
IVersionedSerializer<T> serializer, List<Object> parameters)
     {
-        this(FBUtilities.getBroadcastAddress(), verb, payload, serializer, 
parameters);
+        this(FBUtilities.getBroadcastAddressAndPort(), verb, payload, 
serializer, parameters);
     }
 
     @VisibleForTesting
-    public MessageOut(InetAddress from, MessagingService.Verb verb, T payload, 
IVersionedSerializer<T> serializer, Map<String, byte[]> parameters)
+    public MessageOut(InetAddressAndPort from, MessagingService.Verb verb, T 
payload, IVersionedSerializer<T> serializer, List<Object> parameters)
     {
         this.from = from;
         this.verb = verb;
@@ -133,11 +142,13 @@ public class MessageOut<T>
         this.parameters = parameters;
     }
 
+    /**
+     * Returns a new message carrying this message's verb, payload, serializer and parameters
+     * plus one extra (type, value) parameter tuple appended at the end. This message is not
+     * modified.
+     *
+     * @param type  the parameter type; stored at the tuple's type offset
+     * @param value the parameter value; stored at the tuple's parameter offset
+     */
-    public MessageOut<T> withParameter(String key, byte[] value)
+    public <VT> MessageOut<T> withParameter(ParameterType type, VT value)
     {
-        ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
-        builder.putAll(parameters).put(key, value);
-        return new MessageOut<T>(verb, payload, serializer, builder.build());
+        // Room for the existing tuples plus the one being appended.
+        List<Object> newParameters = new ArrayList<>(parameters.size() + PARAMETER_TUPLE_SIZE);
+        newParameters.addAll(parameters);
+        newParameters.add(type);
+        newParameters.add(value);
+        return new MessageOut<T>(verb, payload, serializer, newParameters);
     }
 
     public Stage getStage()
@@ -159,15 +170,19 @@ public class MessageOut<T>
 
     public void serialize(DataOutputPlus out, int version) throws IOException
     {
-        CompactEndpointSerializationHelper.serialize(from, out);
+        CompactEndpointSerializationHelper.instance.serialize(from, out, 
version);
 
         out.writeInt(verb.getId());
-        out.writeInt(parameters.size());
-        for (Map.Entry<String, byte[]> entry : parameters.entrySet())
+        assert parameters.size() % PARAMETER_TUPLE_SIZE == 0;
+        out.writeInt(parameters.size() / PARAMETER_TUPLE_SIZE);
+        for (int ii = 0; ii < parameters.size(); ii += PARAMETER_TUPLE_SIZE)
         {
-            out.writeUTF(entry.getKey());
-            out.writeInt(entry.getValue().length);
-            out.write(entry.getValue());
+            ParameterType type = (ParameterType)parameters.get(ii + 
PARAMETER_TUPLE_TYPE_OFFSET);
+            out.writeUTF(type.key);
+            IVersionedSerializer serializer = type.serializer;
+            Object parameter = parameters.get(ii + 
PARAMETER_TUPLE_PARAMETER_OFFSET);
+            out.writeInt(Ints.checkedCast(serializer.serializedSize(parameter, 
version)));
+            serializer.serialize(parameter, out, version);
         }
 
         if (payload != null)
@@ -187,15 +202,19 @@ public class MessageOut<T>
 
     private Pair<Long, Long> calculateSerializedSize(int version)
     {
-        long size = CompactEndpointSerializationHelper.serializedSize(from);
+        long size = 0;
+        size += 
CompactEndpointSerializationHelper.instance.serializedSize(from, version);
 
         size += TypeSizes.sizeof(verb.getId());
         size += TypeSizes.sizeof(parameters.size());
-        for (Map.Entry<String, byte[]> entry : parameters.entrySet())
+        for (int ii = 0; ii < parameters.size(); ii += PARAMETER_TUPLE_SIZE)
         {
-            size += TypeSizes.sizeof(entry.getKey());
-            size += TypeSizes.sizeof(entry.getValue().length);
-            size += entry.getValue().length;
+            ParameterType type = (ParameterType)parameters.get(ii + 
PARAMETER_TUPLE_TYPE_OFFSET);
+            size += TypeSizes.sizeof(type.key());
+            size += 4;//length prefix
+            IVersionedSerializer serializer = type.serializer;
+            Object parameter = parameters.get(ii + 
PARAMETER_TUPLE_PARAMETER_OFFSET);
+            size += serializer.serializedSize(parameter, version);
         }
 
         long payloadSize = payload == null ? 0 : 
serializer.serializedSize(payload, version);
@@ -237,4 +256,16 @@ public class MessageOut<T>
 
         return sizes.left.intValue();
     }
+
+    /**
+     * Scans the flat parameter list tuple by tuple and returns the value of the first
+     * tuple whose type equals {@code type}.
+     *
+     * @return the value stored with that type, or null if no tuple of that type is found
+     */
+    public Object getParameter(ParameterType type)
+    {
+        int index = 0;
+        while (index < parameters.size())
+        {
+            ParameterType candidate = (ParameterType) parameters.get(index + PARAMETER_TUPLE_TYPE_OFFSET);
+            if (candidate.equals(type))
+                return parameters.get(index + PARAMETER_TUPLE_PARAMETER_OFFSET);
+
+            index += PARAMETER_TUPLE_SIZE;
+        }
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index 4e6fe1c..9f00d27 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.net;
 import java.io.IOError;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -37,6 +35,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -91,6 +90,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.ILatencySubscriber;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 import org.apache.cassandra.metrics.ConnectionMetrics;
 import org.apache.cassandra.metrics.DroppedMessageMetrics;
@@ -127,10 +127,7 @@ public final class MessagingService implements 
MessagingServiceMBean
     public static final int VERSION_40 = 12;
     public static final int current_version = VERSION_40;
 
-    public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC";
     public static final byte[] ONE_BYTE = new byte[1];
-    public static final String FAILURE_RESPONSE_PARAM = "FAIL";
-    public static final String FAILURE_REASON_PARAM = "FAIL_REASON";
 
     /**
      * we preface every message with this number so the recipient can validate 
the sender is sane
@@ -447,7 +444,7 @@ public final class MessagingService implements 
MessagingServiceMBean
     private final Map<Verb, IVerbHandler> verbHandlers;
 
     @VisibleForTesting
-    public final ConcurrentMap<InetAddress, OutboundMessagingPool> 
channelManagers = new NonBlockingHashMap<>();
+    public final ConcurrentMap<InetAddressAndPort, OutboundMessagingPool> 
channelManagers = new NonBlockingHashMap<>();
     final List<ServerChannel> serverChannels = Lists.newArrayList();
 
     private static final Logger logger = 
LoggerFactory.getLogger(MessagingService.class);
@@ -506,7 +503,7 @@ public final class MessagingService implements 
MessagingServiceMBean
     private final List<ILatencySubscriber> subscribers = new 
ArrayList<ILatencySubscriber>();
 
     // protocol versions of the other nodes in the cluster
-    private final ConcurrentMap<InetAddress, Integer> versions = new 
NonBlockingHashMap<InetAddress, Integer>();
+    private final ConcurrentMap<InetAddressAndPort, Integer> versions = new 
NonBlockingHashMap<>();
 
     // message sinks are a testing hook
     private final Set<IMessageSink> messageSinks = new CopyOnWriteArraySet<>();
@@ -629,7 +626,7 @@ public final class MessagingService implements 
MessagingServiceMBean
      * @param callback The message callback.
      * @param message The actual message.
      */
-    public void updateBackPressureOnSend(InetAddress host, IAsyncCallback 
callback, MessageOut<?> message)
+    public void updateBackPressureOnSend(InetAddressAndPort host, 
IAsyncCallback callback, MessageOut<?> message)
     {
         if (DatabaseDescriptor.backPressureEnabled() && 
callback.supportsBackPressure())
         {
@@ -646,7 +643,7 @@ public final class MessagingService implements 
MessagingServiceMBean
      * @param callback The message callback.
      * @param timeout True if updated following a timeout, false otherwise.
      */
-    public void updateBackPressureOnReceive(InetAddress host, IAsyncCallback 
callback, boolean timeout)
+    public void updateBackPressureOnReceive(InetAddressAndPort host, 
IAsyncCallback callback, boolean timeout)
     {
         if (DatabaseDescriptor.backPressureEnabled() && 
callback.supportsBackPressure())
         {
@@ -669,14 +666,14 @@ public final class MessagingService implements 
MessagingServiceMBean
      * @param hosts The hosts to apply back-pressure to.
      * @param timeoutInNanos The max back-pressure timeout.
      */
-    public void applyBackPressure(Iterable<InetAddress> hosts, long 
timeoutInNanos)
+    public void applyBackPressure(Iterable<InetAddressAndPort> hosts, long 
timeoutInNanos)
     {
         if (DatabaseDescriptor.backPressureEnabled())
         {
             Set<BackPressureState> states = new HashSet<BackPressureState>();
-            for (InetAddress host : hosts)
+            for (InetAddressAndPort host : hosts)
             {
-                if (host.equals(FBUtilities.getBroadcastAddress()))
+                if (host.equals(FBUtilities.getBroadcastAddressAndPort()))
                     continue;
                 OutboundMessagingPool pool = getMessagingConnection(host);
                 if (pool != null)
@@ -686,13 +683,13 @@ public final class MessagingService implements 
MessagingServiceMBean
         }
     }
 
-    BackPressureState getBackPressureState(InetAddress host)
+    BackPressureState getBackPressureState(InetAddressAndPort host)
     {
         OutboundMessagingPool messagingConnection = 
getMessagingConnection(host);
         return messagingConnection != null ? 
messagingConnection.getBackPressureState() : null;
     }
 
-    void markTimeout(InetAddress addr)
+    void markTimeout(InetAddressAndPort addr)
     {
         OutboundMessagingPool conn = channelManagers.get(addr);
         if (conn != null)
@@ -706,13 +703,13 @@ public final class MessagingService implements 
MessagingServiceMBean
      * @param address the host that replied to the message
      * @param latency
      */
-    public void maybeAddLatency(IAsyncCallback cb, InetAddress address, long 
latency)
+    public void maybeAddLatency(IAsyncCallback cb, InetAddressAndPort address, 
long latency)
     {
         if (cb.isLatencyForSnitch())
             addLatency(address, latency);
     }
 
-    public void addLatency(InetAddress address, long latency)
+    public void addLatency(InetAddressAndPort address, long latency)
     {
         for (ILatencySubscriber subscriber : subscribers)
             subscriber.receiveTiming(address, latency);
@@ -721,7 +718,7 @@ public final class MessagingService implements 
MessagingServiceMBean
     /**
      * called from gossiper when it notices a node is not responding.
      */
-    public void convict(InetAddress ep)
+    public void convict(InetAddressAndPort ep)
     {
         logger.trace("Resetting pool for {}", ep);
         reset(ep);
@@ -735,24 +732,24 @@ public final class MessagingService implements 
MessagingServiceMBean
     public void listen(ServerEncryptionOptions serverEncryptionOptions)
     {
         callbacks.reset(); // hack to allow tests to stop/restart MS
-        listen(FBUtilities.getLocalAddress(), serverEncryptionOptions);
+        listen(FBUtilities.getLocalAddressAndPort(), serverEncryptionOptions);
         if (shouldListenOnBroadcastAddress())
-            listen(FBUtilities.getBroadcastAddress(), serverEncryptionOptions);
+            listen(FBUtilities.getBroadcastAddressAndPort(), 
serverEncryptionOptions);
         listenGate.signalAll();
     }
 
     public static boolean shouldListenOnBroadcastAddress()
     {
         return DatabaseDescriptor.shouldListenOnBroadcastAddress()
-               && 
!FBUtilities.getLocalAddress().equals(FBUtilities.getBroadcastAddress());
+               && 
!FBUtilities.getLocalAddressAndPort().equals(FBUtilities.getBroadcastAddressAndPort());
     }
 
     /**
      * Listen on the specified port.
      *
-     * @param localEp InetAddress whose port to listen on.
+     * @param localEp InetAddressAndPort whose port to listen on.
      */
-    private void listen(InetAddress localEp, ServerEncryptionOptions 
serverEncryptionOptions) throws ConfigurationException
+    private void listen(InetAddressAndPort localEp, ServerEncryptionOptions 
serverEncryptionOptions) throws ConfigurationException
     {
         IInternodeAuthenticator authenticator = 
DatabaseDescriptor.getInternodeAuthenticator();
         int receiveBufferSize = 
DatabaseDescriptor.getInternodeRecvBufferSize();
@@ -766,7 +763,7 @@ public final class MessagingService implements 
MessagingServiceMBean
             ServerEncryptionOptions legacyEncOptions = new 
ServerEncryptionOptions(serverEncryptionOptions);
             legacyEncOptions.optional = false;
 
-            InetSocketAddress localAddr = new InetSocketAddress(localEp, 
DatabaseDescriptor.getSSLStoragePort());
+            InetAddressAndPort localAddr = 
InetAddressAndPort.getByAddressOverrideDefaults(localEp.address, 
DatabaseDescriptor.getSSLStoragePort());
             ChannelGroup channelGroup = new 
DefaultChannelGroup("LegacyEncryptedInternodeMessagingGroup", 
NettyFactory.executorForChannelGroups());
             InboundInitializer initializer = new 
InboundInitializer(authenticator, legacyEncOptions, channelGroup);
             Channel encryptedChannel = 
NettyFactory.instance.createInboundChannel(localAddr, initializer, 
receiveBufferSize);
@@ -774,7 +771,8 @@ public final class MessagingService implements 
MessagingServiceMBean
         }
 
         // this is for the socket that can be plain, only ssl, or optional 
plain/ssl
-        InetSocketAddress localAddr = new InetSocketAddress(localEp, 
DatabaseDescriptor.getStoragePort());
+        assert localEp.port == DatabaseDescriptor.getStoragePort() : 
String.format("Local endpoint port %d doesn't match YAML configured port %d%n", 
localEp.port, DatabaseDescriptor.getStoragePort());
+        InetAddressAndPort localAddr = 
InetAddressAndPort.getByAddressOverrideDefaults(localEp.address, 
DatabaseDescriptor.getStoragePort());
         ChannelGroup channelGroup = new 
DefaultChannelGroup("InternodeMessagingGroup", 
NettyFactory.executorForChannelGroups());
         InboundInitializer initializer = new InboundInitializer(authenticator, 
serverEncryptionOptions, channelGroup);
         Channel channel = 
NettyFactory.instance.createInboundChannel(localAddr, initializer, 
receiveBufferSize);
@@ -809,10 +807,10 @@ public final class MessagingService implements 
MessagingServiceMBean
          * the inbound connections/channels can be closed when the listening 
socket itself is being closed.
          */
         private final ChannelGroup connectedChannels;
-        private final InetSocketAddress address;
+        private final InetAddressAndPort address;
         private final SecurityLevel securityLevel;
 
-        private ServerChannel(Channel channel, ChannelGroup channelGroup, 
InetSocketAddress address, SecurityLevel securityLevel)
+        private ServerChannel(Channel channel, ChannelGroup channelGroup, 
InetAddressAndPort address, SecurityLevel securityLevel)
         {
             this.channel = channel;
             this.connectedChannels = channelGroup;
@@ -840,7 +838,7 @@ public final class MessagingService implements 
MessagingServiceMBean
             return channel;
         }
 
-        InetSocketAddress getAddress()
+        InetAddressAndPort getAddress()
         {
             return address;
         }
@@ -869,7 +867,7 @@ public final class MessagingService implements 
MessagingServiceMBean
     }
 
 
-    public void destroyConnectionPool(InetAddress to)
+    public void destroyConnectionPool(InetAddressAndPort to)
     {
         OutboundMessagingPool pool = channelManagers.remove(to);
         if (pool != null)
@@ -884,26 +882,26 @@ public final class MessagingService implements 
MessagingServiceMBean
      * @param address IP Address to identify the peer
      * @param preferredAddress IP Address to use (and prefer) going forward 
for connecting to the peer
      */
-    public void reconnectWithNewIp(InetAddress address, InetAddress 
preferredAddress)
+    public void reconnectWithNewIp(InetAddressAndPort address, 
InetAddressAndPort preferredAddress)
     {
         SystemKeyspace.updatePreferredIP(address, preferredAddress);
 
         OutboundMessagingPool messagingPool = channelManagers.get(address);
         if (messagingPool != null)
-            messagingPool.reconnectWithNewIp(new 
InetSocketAddress(preferredAddress, portFor(address)));
+            
messagingPool.reconnectWithNewIp(InetAddressAndPort.getByAddressOverrideDefaults(preferredAddress.address,
 portFor(address)));
     }
 
-    private void reset(InetAddress address)
+    private void reset(InetAddressAndPort address)
     {
         OutboundMessagingPool messagingPool = channelManagers.remove(address);
         if (messagingPool != null)
             messagingPool.close(false);
     }
 
-    public InetAddress getCurrentEndpoint(InetAddress publicAddress)
+    public InetAddressAndPort getCurrentEndpoint(InetAddressAndPort 
publicAddress)
     {
         OutboundMessagingPool messagingPool = 
getMessagingConnection(publicAddress);
-        return messagingPool != null ? 
messagingPool.getPreferredRemoteAddr().getAddress() : null;
+        return messagingPool != null ? messagingPool.getPreferredRemoteAddr() 
: null;
     }
 
     /**
@@ -931,7 +929,7 @@ public final class MessagingService implements 
MessagingServiceMBean
         return verbHandlers.get(type);
     }
 
-    public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress 
to, long timeout, boolean failureCallback)
+    public int addCallback(IAsyncCallback cb, MessageOut message, 
InetAddressAndPort to, long timeout, boolean failureCallback)
     {
         assert message.verb != Verb.MUTATION; // mutations need to call the 
overload with a ConsistencyLevel
         int messageId = nextId();
@@ -942,7 +940,7 @@ public final class MessagingService implements 
MessagingServiceMBean
 
     public int addCallback(IAsyncCallback cb,
                            MessageOut<?> message,
-                           InetAddress to,
+                           InetAddressAndPort to,
                            long timeout,
                            ConsistencyLevel consistencyLevel,
                            boolean allowHints)
@@ -971,12 +969,12 @@ public final class MessagingService implements 
MessagingServiceMBean
         return idGen.incrementAndGet();
     }
 
-    public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb)
+    public int sendRR(MessageOut message, InetAddressAndPort to, 
IAsyncCallback cb)
     {
         return sendRR(message, to, cb, message.getTimeout(), false);
     }
 
-    public int sendRRWithFailure(MessageOut message, InetAddress to, 
IAsyncCallbackWithFailure cb)
+    public int sendRRWithFailure(MessageOut message, InetAddressAndPort to, 
IAsyncCallbackWithFailure cb)
     {
         return sendRR(message, to, cb, message.getTimeout(), true);
     }
@@ -992,11 +990,11 @@ public final class MessagingService implements 
MessagingServiceMBean
      * @param timeout the timeout used for expiration
      * @return an reference to message id used to match with the result
      */
-    public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, 
long timeout, boolean failureCallback)
+    public int sendRR(MessageOut message, InetAddressAndPort to, 
IAsyncCallback cb, long timeout, boolean failureCallback)
     {
         int id = addCallback(cb, message, to, timeout, failureCallback);
         updateBackPressureOnSend(to, cb, message);
-        sendOneWay(failureCallback ? 
message.withParameter(FAILURE_CALLBACK_PARAM, ONE_BYTE) : message, id, to);
+        sendOneWay(failureCallback ? 
message.withParameter(ParameterType.FAILURE_CALLBACK, ONE_BYTE) : message, id, 
to);
         return id;
     }
 
@@ -1013,22 +1011,22 @@ public final class MessagingService implements 
MessagingServiceMBean
      * @return an reference to message id used to match with the result
      */
     public int sendRR(MessageOut<?> message,
-                      InetAddress to,
+                      InetAddressAndPort to,
                       AbstractWriteResponseHandler<?> handler,
                       boolean allowHints)
     {
         int id = addCallback(handler, message, to, message.getTimeout(), 
handler.consistencyLevel, allowHints);
         updateBackPressureOnSend(to, handler, message);
-        sendOneWay(message.withParameter(FAILURE_CALLBACK_PARAM, ONE_BYTE), 
id, to);
+        sendOneWay(message.withParameter(ParameterType.FAILURE_CALLBACK, 
ONE_BYTE), id, to);
         return id;
     }
 
-    public void sendOneWay(MessageOut message, InetAddress to)
+    public void sendOneWay(MessageOut message, InetAddressAndPort to)
     {
         sendOneWay(message, nextId(), to);
     }
 
-    public void sendReply(MessageOut message, int id, InetAddress to)
+    public void sendReply(MessageOut message, int id, InetAddressAndPort to)
     {
         sendOneWay(message, id, to);
     }
@@ -1040,12 +1038,12 @@ public final class MessagingService implements 
MessagingServiceMBean
      * @param message messages to be sent.
      * @param to      endpoint to which the message needs to be sent
      */
-    public void sendOneWay(MessageOut message, int id, InetAddress to)
+    public void sendOneWay(MessageOut message, int id, InetAddressAndPort to)
     {
         if (logger.isTraceEnabled())
-            logger.trace("{} sending {} to {}@{}", 
FBUtilities.getBroadcastAddress(), message.verb, id, to);
+            logger.trace("{} sending {} to {}@{}", 
FBUtilities.getBroadcastAddressAndPort(), message.verb, id, to);
 
-        if (to.equals(FBUtilities.getBroadcastAddress()))
+        if (to.equals(FBUtilities.getBroadcastAddressAndPort()))
             logger.trace("Message-to-self {} going over MessagingService", 
message);
 
         // message sinks are a testing hook
@@ -1058,7 +1056,7 @@ public final class MessagingService implements 
MessagingServiceMBean
             outboundMessagingPool.sendMessage(message, id);
     }
 
-    public <T> AsyncOneResponse<T> sendRR(MessageOut message, InetAddress to)
+    public <T> AsyncOneResponse<T> sendRR(MessageOut message, 
InetAddressAndPort to)
     {
         AsyncOneResponse<T> iar = new AsyncOneResponse<T>();
         sendRR(message, to, iar);
@@ -1176,7 +1174,7 @@ public final class MessagingService implements 
MessagingServiceMBean
     /**
      * @return the last version associated with address, or @param version if 
this is the first such version
      */
-    public int setVersion(InetAddress endpoint, int version)
+    public int setVersion(InetAddressAndPort endpoint, int version)
     {
         logger.trace("Setting version {} for {}", version, endpoint);
 
@@ -1184,7 +1182,7 @@ public final class MessagingService implements 
MessagingServiceMBean
         return v == null ? version : v;
     }
 
-    public void resetVersion(InetAddress endpoint)
+    public void resetVersion(InetAddressAndPort endpoint)
     {
         logger.trace("Resetting version for {}", endpoint);
         versions.remove(endpoint);
@@ -1194,7 +1192,7 @@ public final class MessagingService implements 
MessagingServiceMBean
      * Returns the messaging-version as announced by the given node but capped
      * to the min of the version as announced by the node and {@link 
#current_version}.
      */
-    public int getVersion(InetAddress endpoint)
+    public int getVersion(InetAddressAndPort endpoint)
     {
         Integer v = versions.get(endpoint);
         if (v == null)
@@ -1209,13 +1207,13 @@ public final class MessagingService implements 
MessagingServiceMBean
 
     public int getVersion(String endpoint) throws UnknownHostException
     {
-        return getVersion(InetAddress.getByName(endpoint));
+        return getVersion(InetAddressAndPort.getByName(endpoint));
     }
 
     /**
      * Returns the messaging-version exactly as announced by the given 
endpoint.
      */
-    public int getRawVersion(InetAddress endpoint)
+    public int getRawVersion(InetAddressAndPort endpoint)
     {
         Integer v = versions.get(endpoint);
         if (v == null)
@@ -1223,7 +1221,7 @@ public final class MessagingService implements 
MessagingServiceMBean
         return v;
     }
 
-    public boolean knowsVersion(InetAddress endpoint)
+    public boolean knowsVersion(InetAddressAndPort endpoint)
     {
         return versions.containsKey(endpoint);
     }
@@ -1358,72 +1356,144 @@ public final class MessagingService implements 
MessagingServiceMBean
     public Map<String, Integer> getLargeMessagePendingTasks()
     {
         Map<String, Integer> pendingTasks = new HashMap<String, 
Integer>(channelManagers.size());
-        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
-            pendingTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().largeMessageChannel.getPendingMessages());
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            pendingTasks.put(entry.getKey().toString(false), 
entry.getValue().largeMessageChannel.getPendingMessages());
         return pendingTasks;
     }
 
     public Map<String, Long> getLargeMessageCompletedTasks()
     {
         Map<String, Long> completedTasks = new HashMap<String, 
Long>(channelManagers.size());
-        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
-            completedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().largeMessageChannel.getCompletedMessages());
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            completedTasks.put(entry.getKey().toString(false), 
entry.getValue().largeMessageChannel.getCompletedMessages());
         return completedTasks;
     }
 
     public Map<String, Long> getLargeMessageDroppedTasks()
     {
         Map<String, Long> droppedTasks = new HashMap<String, 
Long>(channelManagers.size());
-        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
-            droppedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().largeMessageChannel.getDroppedMessages());
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            droppedTasks.put(entry.getKey().toString(false), 
entry.getValue().largeMessageChannel.getDroppedMessages());
         return droppedTasks;
     }
 
     public Map<String, Integer> getSmallMessagePendingTasks()
     {
         Map<String, Integer> pendingTasks = new HashMap<String, 
Integer>(channelManagers.size());
-        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
-            pendingTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().smallMessageChannel.getPendingMessages());
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            pendingTasks.put(entry.getKey().toString(false), 
entry.getValue().smallMessageChannel.getPendingMessages());
         return pendingTasks;
     }
 
     public Map<String, Long> getSmallMessageCompletedTasks()
     {
         Map<String, Long> completedTasks = new HashMap<String, 
Long>(channelManagers.size());
-        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
-            completedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().smallMessageChannel.getCompletedMessages());
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            completedTasks.put(entry.getKey().toString(false), 
entry.getValue().smallMessageChannel.getCompletedMessages());
         return completedTasks;
     }
 
     public Map<String, Long> getSmallMessageDroppedTasks()
     {
         Map<String, Long> droppedTasks = new HashMap<String, 
Long>(channelManagers.size());
-        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
-            droppedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().smallMessageChannel.getDroppedMessages());
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            droppedTasks.put(entry.getKey().toString(false), 
entry.getValue().smallMessageChannel.getDroppedMessages());
         return droppedTasks;
     }
 
     public Map<String, Integer> getGossipMessagePendingTasks()
     {
         Map<String, Integer> pendingTasks = new HashMap<String, 
Integer>(channelManagers.size());
-        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
-            pendingTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().gossipChannel.getPendingMessages());
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            pendingTasks.put(entry.getKey().toString(false), 
entry.getValue().gossipChannel.getPendingMessages());
         return pendingTasks;
     }
 
     public Map<String, Long> getGossipMessageCompletedTasks()
     {
         Map<String, Long> completedTasks = new HashMap<String, 
Long>(channelManagers.size());
-        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
-            completedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().gossipChannel.getCompletedMessages());
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            completedTasks.put(entry.getKey().toString(false), 
entry.getValue().gossipChannel.getCompletedMessages());
         return completedTasks;
     }
 
     public Map<String, Long> getGossipMessageDroppedTasks()
     {
         Map<String, Long> droppedTasks = new HashMap<String, 
Long>(channelManagers.size());
-        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
-            droppedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().gossipChannel.getDroppedMessages());
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            droppedTasks.put(entry.getKey().toString(false), 
entry.getValue().gossipChannel.getDroppedMessages());
+        return droppedTasks;
+    }
+
+    public Map<String, Integer> getLargeMessagePendingTasksWithPort()
+    {
+        Map<String, Integer> pendingTasks = new HashMap<String, 
Integer>(channelManagers.size());
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            pendingTasks.put(entry.getKey().toString(), 
entry.getValue().largeMessageChannel.getPendingMessages());
+        return pendingTasks;
+    }
+
+    public Map<String, Long> getLargeMessageCompletedTasksWithPort()
+    {
+        Map<String, Long> completedTasks = new HashMap<String, 
Long>(channelManagers.size());
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            completedTasks.put(entry.getKey().toString(), 
entry.getValue().largeMessageChannel.getCompletedMessages());
+        return completedTasks;
+    }
+
+    public Map<String, Long> getLargeMessageDroppedTasksWithPort()
+    {
+        Map<String, Long> droppedTasks = new HashMap<String, 
Long>(channelManagers.size());
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            droppedTasks.put(entry.getKey().toString(), 
entry.getValue().largeMessageChannel.getDroppedMessages());
+        return droppedTasks;
+    }
+
+    public Map<String, Integer> getSmallMessagePendingTasksWithPort()
+    {
+        Map<String, Integer> pendingTasks = new HashMap<String, 
Integer>(channelManagers.size());
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            pendingTasks.put(entry.getKey().toString(), 
entry.getValue().smallMessageChannel.getPendingMessages());
+        return pendingTasks;
+    }
+
+    public Map<String, Long> getSmallMessageCompletedTasksWithPort()
+    {
+        Map<String, Long> completedTasks = new HashMap<String, 
Long>(channelManagers.size());
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            completedTasks.put(entry.getKey().toString(), 
entry.getValue().smallMessageChannel.getCompletedMessages());
+        return completedTasks;
+    }
+
+    public Map<String, Long> getSmallMessageDroppedTasksWithPort()
+    {
+        Map<String, Long> droppedTasks = new HashMap<String, 
Long>(channelManagers.size());
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            droppedTasks.put(entry.getKey().toString(), 
entry.getValue().smallMessageChannel.getDroppedMessages());
+        return droppedTasks;
+    }
+
+    public Map<String, Integer> getGossipMessagePendingTasksWithPort()
+    {
+        Map<String, Integer> pendingTasks = new HashMap<String, 
Integer>(channelManagers.size());
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            pendingTasks.put(entry.getKey().toString(), 
entry.getValue().gossipChannel.getPendingMessages());
+        return pendingTasks;
+    }
+
+    public Map<String, Long> getGossipMessageCompletedTasksWithPort()
+    {
+        Map<String, Long> completedTasks = new HashMap<String, 
Long>(channelManagers.size());
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            completedTasks.put(entry.getKey().toString(), 
entry.getValue().gossipChannel.getCompletedMessages());
+        return completedTasks;
+    }
+
+    public Map<String, Long> getGossipMessageDroppedTasksWithPort()
+    {
+        Map<String, Long> droppedTasks = new HashMap<String, 
Long>(channelManagers.size());
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            droppedTasks.put(entry.getKey().toString(), 
entry.getValue().gossipChannel.getDroppedMessages());
         return droppedTasks;
     }
 
@@ -1435,7 +1505,6 @@ public final class MessagingService implements 
MessagingServiceMBean
         return map;
     }
 
-
     public long getTotalTimeouts()
     {
         return ConnectionMetrics.totalTimeouts.getCount();
@@ -1444,9 +1513,21 @@ public final class MessagingService implements 
MessagingServiceMBean
     public Map<String, Long> getTimeoutsPerHost()
     {
         Map<String, Long> result = new HashMap<String, 
Long>(channelManagers.size());
-        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
         {
-            String ip = entry.getKey().getHostAddress();
+            String ip = entry.getKey().toString(false);
+            long recent = entry.getValue().getTimeouts();
+            result.put(ip, recent);
+        }
+        return result;
+    }
+
+    public Map<String, Long> getTimeoutsPerHostWithPort()
+    {
+        Map<String, Long> result = new HashMap<String, 
Long>(channelManagers.size());
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+        {
+            String ip = entry.getKey().toString();
             long recent = entry.getValue().getTimeouts();
             result.put(ip, recent);
         }
@@ -1456,8 +1537,17 @@ public final class MessagingService implements 
MessagingServiceMBean
     public Map<String, Double> getBackPressurePerHost()
     {
         Map<String, Double> map = new HashMap<>(channelManagers.size());
-        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
-            map.put(entry.getKey().getHostAddress(), 
entry.getValue().getBackPressureState().getBackPressureRateLimit());
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            map.put(entry.getKey().toString(false), 
entry.getValue().getBackPressureState().getBackPressureRateLimit());
+
+        return map;
+    }
+
+    public Map<String, Double> getBackPressurePerHostWithPort()
+    {
+        Map<String, Double> map = new HashMap<>(channelManagers.size());
+        for (Map.Entry<InetAddressAndPort, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            map.put(entry.getKey().toString(), 
entry.getValue().getBackPressureState().getBackPressureRateLimit());
 
         return map;
     }
@@ -1493,18 +1583,18 @@ public final class MessagingService implements 
MessagingServiceMBean
                                                    
bounds.left.getPartitioner().getClass().getName()));
     }
 
-    private OutboundMessagingPool getMessagingConnection(InetAddress to)
+    private OutboundMessagingPool getMessagingConnection(InetAddressAndPort to)
     {
         OutboundMessagingPool pool = channelManagers.get(to);
         if (pool == null)
         {
             final boolean secure = isEncryptedConnection(to);
             final int port = portFor(to, secure);
-            if 
(!DatabaseDescriptor.getInternodeAuthenticator().authenticate(to, port))
+            if 
(!DatabaseDescriptor.getInternodeAuthenticator().authenticate(to.address, port))
                 return null;
 
-            InetSocketAddress preferredRemote = new 
InetSocketAddress(SystemKeyspace.getPreferredIP(to), port);
-            InetSocketAddress local = new 
InetSocketAddress(FBUtilities.getLocalAddress(), 0);
+            InetAddressAndPort preferredRemote = 
SystemKeyspace.getPreferredIP(to);
+            InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
             ServerEncryptionOptions encryptionOptions = secure ? 
DatabaseDescriptor.getServerEncryptionOptions() : null;
             IInternodeAuthenticator authenticator = 
DatabaseDescriptor.getInternodeAuthenticator();
 
@@ -1519,16 +1609,16 @@ public final class MessagingService implements 
MessagingServiceMBean
         return pool;
     }
 
-    public int portFor(InetAddress addr)
+    public int portFor(InetAddressAndPort addr)
     {
         final boolean secure = isEncryptedConnection(addr);
         return portFor(addr, secure);
     }
 
-    private int portFor(InetAddress address, boolean secure)
+    private int portFor(InetAddressAndPort address, boolean secure)
     {
         if (!secure)
-            return DatabaseDescriptor.getStoragePort();
+            return address.port;
 
         Integer v = versions.get(address);
         // if we don't know the version of the peer, assume it is 4.0 (or 
higher) as the only time is would be lower
@@ -1536,12 +1626,15 @@ public final class MessagingService implements 
MessagingServiceMBean
         // unfortunately fail - however the peer should connect to this node 
(at some point), and once we learn it's version, it'll be
         // in versions map. thus, when we attempt to reconnect to that node, 
we'll have the version and we can get the correct port.
         // we will be able to remove this logic at 5.0.
+        // Also, as of 4.0 we propagate the "regular" port (which supports 
both SSL and non-SSL) via gossip. So for
+        // SSL peers at version 4.0 or higher we always connect to the gossiped port, 
because a peer with SSL enabled
+        // should ALWAYS listen for SSL on the "regular" port.
         int version = v != null ? v.intValue() : VERSION_40;
-        return version < VERSION_40 ? DatabaseDescriptor.getSSLStoragePort() : 
DatabaseDescriptor.getStoragePort();
+        return version < VERSION_40 ? DatabaseDescriptor.getSSLStoragePort() : 
address.port;
     }
 
     @VisibleForTesting
-    boolean isConnected(InetAddress address, MessageOut messageOut)
+    boolean isConnected(InetAddressAndPort address, MessageOut messageOut)
     {
         OutboundMessagingPool pool = channelManagers.get(address);
         if (pool == null)
@@ -1549,7 +1642,7 @@ public final class MessagingService implements 
MessagingServiceMBean
         return pool.getConnection(messageOut).isConnected();
     }
 
-    public static boolean isEncryptedConnection(InetAddress address)
+    public static boolean isEncryptedConnection(InetAddressAndPort address)
     {
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
         switch 
(DatabaseDescriptor.getServerEncryptionOptions().internode_encryption)
@@ -1559,13 +1652,13 @@ public final class MessagingService implements 
MessagingServiceMBean
             case all:
                 break;
             case dc:
-                if 
(snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddress())))
+                if 
(snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort())))
                     return false;
                 break;
             case rack:
                 // for rack then check if the DC's are the same.
-                if 
(snitch.getRack(address).equals(snitch.getRack(FBUtilities.getBroadcastAddress()))
-                    && 
snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddress())))
+                if 
(snitch.getRack(address).equals(snitch.getRack(FBUtilities.getBroadcastAddressAndPort()))
+                    && 
snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort())))
                     return false;
                 break;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java 
b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
index b2e79e0..f4a0c43 100644
--- a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
+++ b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
@@ -30,47 +30,69 @@ public interface MessagingServiceMBean
     /**
      * Pending tasks for large message TCP Connections
      */
+    @Deprecated
     public Map<String, Integer> getLargeMessagePendingTasks();
+    public Map<String, Integer> getLargeMessagePendingTasksWithPort();
 
     /**
      * Completed tasks for large message TCP Connections
      */
+    @Deprecated
     public Map<String, Long> getLargeMessageCompletedTasks();
+    public Map<String, Long> getLargeMessageCompletedTasksWithPort();
 
     /**
      * Dropped tasks for large message TCP Connections
      */
+    @Deprecated
     public Map<String, Long> getLargeMessageDroppedTasks();
+    public Map<String, Long> getLargeMessageDroppedTasksWithPort();
+
 
     /**
      * Pending tasks for small message TCP Connections
      */
+    @Deprecated
     public Map<String, Integer> getSmallMessagePendingTasks();
+    public Map<String, Integer> getSmallMessagePendingTasksWithPort();
+
 
     /**
      * Completed tasks for small message TCP Connections
      */
+    @Deprecated
     public Map<String, Long> getSmallMessageCompletedTasks();
+    public Map<String, Long> getSmallMessageCompletedTasksWithPort();
+
 
     /**
      * Dropped tasks for small message TCP Connections
      */
+    @Deprecated
     public Map<String, Long> getSmallMessageDroppedTasks();
+    public Map<String, Long> getSmallMessageDroppedTasksWithPort();
+
 
     /**
      * Pending tasks for gossip message TCP Connections
      */
+    @Deprecated
     public Map<String, Integer> getGossipMessagePendingTasks();
+    public Map<String, Integer> getGossipMessagePendingTasksWithPort();
 
     /**
      * Completed tasks for gossip message TCP Connections
      */
+    @Deprecated
     public Map<String, Long> getGossipMessageCompletedTasks();
+    public Map<String, Long> getGossipMessageCompletedTasksWithPort();
 
     /**
      * Dropped tasks for gossip message TCP Connections
      */
+    @Deprecated
     public Map<String, Long> getGossipMessageDroppedTasks();
+    public Map<String, Long> getGossipMessageDroppedTasksWithPort();
 
     /**
      * dropped message counts for server lifetime
@@ -85,12 +107,16 @@ public interface MessagingServiceMBean
     /**
      * Number of timeouts per host
      */
+    @Deprecated
     public Map<String, Long> getTimeoutsPerHost();
+    public Map<String, Long> getTimeoutsPerHostWithPort();
 
     /**
      * Back-pressure rate limiting per host
      */
+    @Deprecated
     public Map<String, Double> getBackPressurePerHost();
+    public Map<String, Double> getBackPressurePerHostWithPort();
 
     /**
      * Enable/Disable back-pressure

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/ParameterType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ParameterType.java 
b/src/java/org/apache/cassandra/net/ParameterType.java
new file mode 100644
index 0000000..0a1f73f
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/ParameterType.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.io.DummyByteVersionedSerializer;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.ShortVersionedSerializer;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+/**
+ * Type names and serializers for the various parameters that can be attached to a message
+ */
+public enum ParameterType
+{
+    FORWARD_TO("FORWARD_TO", ForwardToSerializer.instance),
+    FORWARD_FROM("FORWARD_FROM", CompactEndpointSerializationHelper.instance),
+    FAILURE_RESPONSE("FAIL", DummyByteVersionedSerializer.instance),
+    FAILURE_REASON("FAIL_REASON", ShortVersionedSerializer.instance),
+    FAILURE_CALLBACK("CAL_BAC", DummyByteVersionedSerializer.instance),
+    TRACE_SESSION("TraceSession", UUIDSerializer.serializer),
+    TRACE_TYPE("TraceType", Tracing.traceTypeSerializer);
+
+    public static final Map<String, ParameterType> byName;
+    public final String key;
+    public final IVersionedSerializer serializer;
+
+    static
+    {
+        ImmutableMap.Builder<String, ParameterType> builder = 
ImmutableMap.builder();
+        for (ParameterType type : values())
+        {
+            builder.put(type.key, type);
+        }
+        byName = builder.build();
+    }
+
+    ParameterType(String key, IVersionedSerializer serializer)
+    {
+        this.key = key;
+        this.serializer = serializer;
+    }
+
+    public String key()
+    {
+        return key;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/RateBasedBackPressure.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/RateBasedBackPressure.java 
b/src/java/org/apache/cassandra/net/RateBasedBackPressure.java
index 64685b0..b951bc0 100644
--- a/src/java/org/apache/cassandra/net/RateBasedBackPressure.java
+++ b/src/java/org/apache/cassandra/net/RateBasedBackPressure.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.net;
 
-import java.net.InetAddress;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -34,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.SystemTimeSource;
 import org.apache.cassandra.utils.TimeSource;
@@ -253,7 +253,7 @@ public class RateBasedBackPressure implements 
BackPressureStrategy<RateBasedBack
     }
 
     @Override
-    public RateBasedBackPressureState newState(InetAddress host)
+    public RateBasedBackPressureState newState(InetAddressAndPort host)
     {
         return new RateBasedBackPressureState(host, timeSource, windowSize);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java 
b/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java
index 541d7a6..9df056e 100644
--- a/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java
+++ b/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java
@@ -17,11 +17,11 @@
  */
 package org.apache.cassandra.net;
 
-import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.util.concurrent.RateLimiter;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.SlidingTimeRate;
 import org.apache.cassandra.utils.TimeSource;
 import org.apache.cassandra.utils.concurrent.IntervalLock;
@@ -46,12 +46,12 @@ import org.apache.cassandra.utils.concurrent.IntervalLock;
  */
 class RateBasedBackPressureState extends IntervalLock implements 
BackPressureState
 {
-    private final InetAddress host;
+    private final InetAddressAndPort host;
     final SlidingTimeRate incomingRate;
     final SlidingTimeRate outgoingRate;
     final RateLimiter rateLimiter;
 
-    RateBasedBackPressureState(InetAddress host, TimeSource timeSource, long 
windowSize)
+    RateBasedBackPressureState(InetAddressAndPort host, TimeSource timeSource, 
long windowSize)
     {
         super(timeSource);
         this.host = host;
@@ -99,7 +99,7 @@ class RateBasedBackPressureState extends IntervalLock 
implements BackPressureSta
     }
 
     @Override
-    public InetAddress getHost()
+    public InetAddressAndPort getHost()
     {
         return host;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java 
b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
index 9ecc385..41ac31b 100644
--- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
@@ -18,11 +18,10 @@
  */
 package org.apache.cassandra.net;
 
-import java.net.InetAddress;
-
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.utils.FBUtilities;
@@ -32,7 +31,7 @@ public class WriteCallbackInfo extends CallbackInfo
     // either a Mutation, or a Paxos Commit (MessageOut)
     private final Object mutation;
 
-    public WriteCallbackInfo(InetAddress target,
+    public WriteCallbackInfo(InetAddressAndPort target,
                              IAsyncCallback callback,
                              MessageOut message,
                              IVersionedSerializer<?> serializer,
@@ -43,7 +42,7 @@ public class WriteCallbackInfo extends CallbackInfo
         assert message != null;
         this.mutation = shouldHint(allowHints, message, consistencyLevel);
         //Local writes shouldn't go through messaging service 
(https://issues.apache.org/jira/browse/CASSANDRA-10477)
-        assert (!target.equals(FBUtilities.getBroadcastAddress()));
+        assert (!target.equals(FBUtilities.getBroadcastAddressAndPort()));
     }
 
     public boolean shouldHint()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/async/HandshakeProtocol.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/HandshakeProtocol.java 
b/src/java/org/apache/cassandra/net/async/HandshakeProtocol.java
index 9b8df80..327b20e 100644
--- a/src/java/org/apache/cassandra/net/async/HandshakeProtocol.java
+++ b/src/java/org/apache/cassandra/net/async/HandshakeProtocol.java
@@ -24,11 +24,15 @@ import java.net.InetAddress;
 import java.util.Objects;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.ByteBufInputStream;
 import io.netty.buffer.ByteBufOutputStream;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessagingService;
 
@@ -227,9 +231,9 @@ public class HandshakeProtocol
         private static final int MIN_LENGTH = 9;
 
         final int messagingVersion;
-        final InetAddress address;
+        final InetAddressAndPort address;
 
-        ThirdHandshakeMessage(int messagingVersion, InetAddress address)
+        ThirdHandshakeMessage(int messagingVersion, InetAddressAndPort address)
         {
             this.messagingVersion = messagingVersion;
             this.address = address;
@@ -238,14 +242,14 @@ public class HandshakeProtocol
         @SuppressWarnings("resource")
         public ByteBuf encode(ByteBufAllocator allocator)
         {
-            int bufLength = Integer.BYTES + 
CompactEndpointSerializationHelper.serializedSize(address);
+            int bufLength = Ints.checkedCast(Integer.BYTES + 
CompactEndpointSerializationHelper.instance.serializedSize(address, 
messagingVersion));
             ByteBuf buffer = allocator.directBuffer(bufLength, bufLength);
             buffer.writerIndex(0);
             buffer.writeInt(messagingVersion);
             try
             {
-                DataOutput bbos = new ByteBufOutputStream(buffer);
-                CompactEndpointSerializationHelper.serialize(address, bbos);
+                DataOutputPlus dop = new ByteBufDataOutputPlus(buffer);
+                CompactEndpointSerializationHelper.instance.serialize(address, 
dop, messagingVersion);
                 return buffer;
             }
             catch (IOException e)
@@ -263,10 +267,10 @@ public class HandshakeProtocol
 
             in.markReaderIndex();
             int version = in.readInt();
-            DataInput inputStream = new ByteBufInputStream(in);
+            DataInputPlus input = new ByteBufDataInputPlus(in);
             try
             {
-                InetAddress address = 
CompactEndpointSerializationHelper.deserialize(inputStream);
+                InetAddressAndPort address = 
CompactEndpointSerializationHelper.instance.deserialize(input, version);
                 return new ThirdHandshakeMessage(version, address);
             }
             catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/async/InboundHandshakeHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/net/async/InboundHandshakeHandler.java 
b/src/java/org/apache/cassandra/net/async/InboundHandshakeHandler.java
index 625f03d..a84112e 100644
--- a/src/java/org/apache/cassandra/net/async/InboundHandshakeHandler.java
+++ b/src/java/org/apache/cassandra/net/async/InboundHandshakeHandler.java
@@ -22,6 +22,7 @@ import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.ssl.SslHandler;
 import org.apache.cassandra.auth.IInternodeAuthenticator;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.async.HandshakeProtocol.FirstHandshakeMessage;
 import org.apache.cassandra.net.async.HandshakeProtocol.SecondHandshakeMessage;
@@ -209,7 +210,7 @@ class InboundHandshakeHandler extends ByteToMessageDecoder
     {
         ChannelPipeline pipeline = ctx.pipeline();
         InetSocketAddress address = (InetSocketAddress) 
ctx.channel().remoteAddress();
-        pipeline.addLast(NettyFactory.instance.streamingGroup, 
"streamInbound", new StreamingInboundHandler(address, protocolVersion, null));
+        pipeline.addLast(NettyFactory.instance.streamingGroup, 
"streamInbound", new 
StreamingInboundHandler(InetAddressAndPort.getByAddressOverrideDefaults(address.getAddress(),
 address.getPort()), protocolVersion, null));
         pipeline.remove(this);
 
         // pass a custom recv ByteBuf allocator to the channel. the default 
recv ByteBuf size is 1k, but in streaming we're
@@ -244,7 +245,7 @@ class InboundHandshakeHandler extends ByteToMessageDecoder
         }
 
         // record the (true) version of the endpoint
-        InetAddress from = msg.address;
+        InetAddressAndPort from = msg.address;
         MessagingService.instance().setVersion(from, maxVersion);
         logger.trace("Set version for {} to {} (will use {})", from, 
maxVersion, MessagingService.instance().getVersion(from));
 
@@ -253,7 +254,7 @@ class InboundHandshakeHandler extends ByteToMessageDecoder
     }
 
     @VisibleForTesting
-    void setupMessagingPipeline(ChannelPipeline pipeline, InetAddress peer, 
boolean compressed, int messagingVersion)
+    void setupMessagingPipeline(ChannelPipeline pipeline, InetAddressAndPort 
peer, boolean compressed, int messagingVersion)
     {
         if (compressed)
             pipeline.addLast(NettyFactory.INBOUND_COMPRESSOR_HANDLER_NAME, 
NettyFactory.createLz4Decoder(messagingVersion));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/async/MessageInHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/MessageInHandler.java 
b/src/java/org/apache/cassandra/net/async/MessageInHandler.java
index b400512..0423b80 100644
--- a/src/java/org/apache/cassandra/net/async/MessageInHandler.java
+++ b/src/java/org/apache/cassandra/net/async/MessageInHandler.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.net.async;
 import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -37,9 +36,12 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import org.apache.cassandra.db.monitoring.ApproximateTime;
 import org.apache.cassandra.exceptions.UnknownTableException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
 
 /**
  * Parses out individual messages from the incoming buffers. Each message, 
both header and payload, is incrementally built up
@@ -79,7 +81,7 @@ class MessageInHandler extends ByteToMessageDecoder
      */
     private static final int SECOND_SECTION_BYTE_COUNT = 8;
 
-    private final InetAddress peer;
+    private final InetAddressAndPort peer;
     private final int messagingVersion;
 
     /**
@@ -91,12 +93,12 @@ class MessageInHandler extends ByteToMessageDecoder
     private State state;
     private MessageHeader messageHeader;
 
-    MessageInHandler(InetAddress peer, int messagingVersion)
+    MessageInHandler(InetAddressAndPort peer, int messagingVersion)
     {
         this (peer, messagingVersion, MESSAGING_SERVICE_CONSUMER);
     }
 
-    MessageInHandler(InetAddress peer, int messagingVersion, 
BiConsumer<MessageIn, Integer> messageConsumer)
+    MessageInHandler(InetAddressAndPort peer, int messagingVersion, 
BiConsumer<MessageIn, Integer> messageConsumer)
     {
         this.peer = peer;
         this.messagingVersion = messagingVersion;
@@ -140,7 +142,7 @@ class MessageInHandler extends ByteToMessageDecoder
                         int serializedAddrSize;
                         if (readableBytes < 1 || readableBytes < 
(serializedAddrSize = in.getByte(in.readerIndex()) + 1))
                             return;
-                        messageHeader.from = 
CompactEndpointSerializationHelper.deserialize(inputPlus);
+                        messageHeader.from = 
CompactEndpointSerializationHelper.instance.deserialize(inputPlus, 
messagingVersion);
                         state = State.READ_SECOND_CHUNK;
                         readableBytes -= serializedAddrSize;
                         // fall-through
@@ -199,7 +201,7 @@ class MessageInHandler extends ByteToMessageDecoder
     /**
      * @return <code>true</code> if all the parameters have been read from the 
{@link ByteBuf}; else, <code>false</code>.
      */
-    private boolean readParameters(ByteBuf in, ByteBufDataInputPlus inputPlus, 
int parameterCount, Map<String, byte[]> parameters) throws IOException
+    private boolean readParameters(ByteBuf in, ByteBufDataInputPlus inputPlus, 
int parameterCount, Map<ParameterType, Object> parameters) throws IOException
     {
         // makes the assumption that map.size() is a constant time function 
(HashMap.size() is)
         while (parameters.size() < parameterCount)
@@ -208,9 +210,10 @@ class MessageInHandler extends ByteToMessageDecoder
                 return false;
 
             String key = DataInputStream.readUTF(inputPlus);
+            ParameterType parameterType = ParameterType.byName.get(key);
             byte[] value = new byte[in.readInt()];
             in.readBytes(value);
-            parameters.put(key, value);
+            parameters.put(parameterType, 
parameterType.serializer.deserialize(new DataInputBuffer(value), 
messagingVersion));
         }
 
         return true;
@@ -300,11 +303,11 @@ class MessageInHandler extends ByteToMessageDecoder
     {
         int messageId;
         long constructionTime;
-        InetAddress from;
+        InetAddressAndPort from;
         MessagingService.Verb verb;
         int payloadSize;
 
-        Map<String, byte[]> parameters = Collections.emptyMap();
+        Map<ParameterType, Object> parameters = Collections.emptyMap();
 
         /**
          * Total number of incoming parameters.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/async/MessageOutHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/MessageOutHandler.java 
b/src/java/org/apache/cassandra/net/async/MessageOutHandler.java
index e88b56a..f1647ab 100644
--- a/src/java/org/apache/cassandra/net/async/MessageOutHandler.java
+++ b/src/java/org/apache/cassandra/net/async/MessageOutHandler.java
@@ -40,6 +40,7 @@ import io.netty.handler.timeout.IdleStateEvent;
 
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
 import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis;
@@ -196,10 +197,9 @@ class MessageOutHandler extends ChannelDuplexHandler
     {
         try
         {
-            byte[] sessionBytes = 
msg.message.parameters.get(Tracing.TRACE_HEADER);
-            if (sessionBytes != null)
+            UUID sessionId =  
(UUID)msg.message.getParameter(ParameterType.TRACE_SESSION);
+            if (sessionId != null)
             {
-                UUID sessionId = 
UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
                 TraceState state = Tracing.instance.get(sessionId);
                 String message = String.format("Sending %s message to %s, size 
= %d bytes",
                                                msg.message.verb, 
connectionId.connectionAddress(),
@@ -207,9 +207,9 @@ class MessageOutHandler extends ChannelDuplexHandler
                 // session may have already finished; see CASSANDRA-5668
                 if (state == null)
                 {
-                    byte[] traceTypeBytes = 
msg.message.parameters.get(Tracing.TRACE_TYPE);
-                    Tracing.TraceType traceType = traceTypeBytes == null ? 
Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]);
-                    Tracing.instance.trace(ByteBuffer.wrap(sessionBytes), 
message, traceType.getTTL());
+                    Tracing.TraceType traceType = 
(Tracing.TraceType)msg.message.getParameter(ParameterType.TRACE_TYPE);
+                    traceType = traceType == null ? Tracing.TraceType.QUERY : 
traceType;
+                    
Tracing.instance.trace(ByteBuffer.wrap(UUIDGen.decompose(sessionId)), message, 
traceType.getTTL());
                 }
                 else
                 {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to