http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java 
b/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
index 82885e3..72242fd 100644
--- a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
+++ b/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
@@ -17,16 +17,16 @@
  */
 package org.apache.cassandra.exceptions;
 
-import java.net.InetAddress;
 import java.util.Map;
 
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 public class ReadFailureException extends RequestFailureException
 {
     public final boolean dataPresent;
 
-    public ReadFailureException(ConsistencyLevel consistency, int received, 
int blockFor, boolean dataPresent, Map<InetAddress, RequestFailureReason> 
failureReasonByEndpoint)
+    public ReadFailureException(ConsistencyLevel consistency, int received, 
int blockFor, boolean dataPresent, Map<InetAddressAndPort, 
RequestFailureReason> failureReasonByEndpoint)
     {
         super(ExceptionCode.READ_FAILURE, consistency, received, blockFor, 
failureReasonByEndpoint);
         this.dataPresent = dataPresent;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/exceptions/RequestFailureException.java 
b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
index 1a5289c..2b57a75 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
+++ b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
@@ -17,20 +17,20 @@
  */
 package org.apache.cassandra.exceptions;
 
-import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 public class RequestFailureException extends RequestExecutionException
 {
     public final ConsistencyLevel consistency;
     public final int received;
     public final int blockFor;
-    public final Map<InetAddress, RequestFailureReason> 
failureReasonByEndpoint;
+    public final Map<InetAddressAndPort, RequestFailureReason> 
failureReasonByEndpoint;
 
-    protected RequestFailureException(ExceptionCode code, ConsistencyLevel 
consistency, int received, int blockFor, Map<InetAddress, RequestFailureReason> 
failureReasonByEndpoint)
+    protected RequestFailureException(ExceptionCode code, ConsistencyLevel 
consistency, int received, int blockFor, Map<InetAddressAndPort, 
RequestFailureReason> failureReasonByEndpoint)
     {
         super(code, String.format("Operation failed - received %d responses 
and %d failures", received, failureReasonByEndpoint.size()));
         this.consistency = consistency;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/exceptions/WriteFailureException.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/exceptions/WriteFailureException.java 
b/src/java/org/apache/cassandra/exceptions/WriteFailureException.java
index 1a857fe..a7dc66a 100644
--- a/src/java/org/apache/cassandra/exceptions/WriteFailureException.java
+++ b/src/java/org/apache/cassandra/exceptions/WriteFailureException.java
@@ -17,17 +17,17 @@
  */
 package org.apache.cassandra.exceptions;
 
-import java.net.InetAddress;
 import java.util.Map;
 
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.WriteType;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 public class WriteFailureException extends RequestFailureException
 {
     public final WriteType writeType;
 
-    public WriteFailureException(ConsistencyLevel consistency, int received, 
int blockFor, WriteType writeType, Map<InetAddress, RequestFailureReason> 
failureReasonByEndpoint)
+    public WriteFailureException(ConsistencyLevel consistency, int received, 
int blockFor, WriteType writeType, Map<InetAddressAndPort, 
RequestFailureReason> failureReasonByEndpoint)
     {
         super(ExceptionCode.WRITE_FAILURE, consistency, received, blockFor, 
failureReasonByEndpoint);
         this.writeType = writeType;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/ApplicationState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/ApplicationState.java 
b/src/java/org/apache/cassandra/gms/ApplicationState.java
index ade9208..211387d 100644
--- a/src/java/org/apache/cassandra/gms/ApplicationState.java
+++ b/src/java/org/apache/cassandra/gms/ApplicationState.java
@@ -19,15 +19,15 @@ package org.apache.cassandra.gms;
 
 public enum ApplicationState
 {
-    STATUS,
+    @Deprecated STATUS, //Deprecated and unused in 4.0, stop publishing in 
5.0, reclaim in 6.0
     LOAD,
     SCHEMA,
     DC,
     RACK,
     RELEASE_VERSION,
     REMOVAL_COORDINATOR,
-    INTERNAL_IP,
-    RPC_ADDRESS,
+    @Deprecated INTERNAL_IP, //Deprecated and unused in 4.0, stop publishing 
in 5.0, reclaim in 6.0
+    @Deprecated RPC_ADDRESS, // ^ Same
     X_11_PADDING, // padding specifically for 1.1
     SEVERITY,
     NET_VERSION,
@@ -35,8 +35,9 @@ public enum ApplicationState
     TOKENS,
     RPC_READY,
     // pad to allow adding new states to existing cluster
-    X1,
-    X2,
+    INTERNAL_ADDRESS_AND_PORT, //Replacement for INTERNAL_IP with up to two 
ports
+    NATIVE_ADDRESS_AND_PORT, //Replacement for RPC_ADDRESS
+    STATUS_WITH_PORT, //Replacement for STATUS
     X3,
     X4,
     X5,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java 
b/src/java/org/apache/cassandra/gms/EndpointState.java
index 847041f..1085447 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -146,9 +146,15 @@ public class EndpointState
 
     public String getStatus()
     {
-        VersionedValue status = getApplicationState(ApplicationState.STATUS);
+        VersionedValue status = 
getApplicationState(ApplicationState.STATUS_WITH_PORT);
         if (status == null)
+        {
+            status = getApplicationState(ApplicationState.STATUS);
+        }
+        if (status == null)
+        {
             return "";
+        }
 
         String[] pieces = status.value.split(VersionedValue.DELIMITER_STR, -1);
         assert (pieces.length > 0);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java 
b/src/java/org/apache/cassandra/gms/FailureDetector.java
index b895082..e567b7b 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -22,7 +22,6 @@ import java.nio.file.StandardOpenOption;
 import java.nio.file.Path;
 import java.io.*;
 import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
@@ -38,7 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -79,7 +78,7 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
     // change.
     private final double PHI_FACTOR = 1.0 / Math.log(10.0); // 0.434...
 
-    private final ConcurrentHashMap<InetAddress, ArrivalWindow> arrivalSamples 
= new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<InetAddressAndPort, ArrivalWindow> 
arrivalSamples = new ConcurrentHashMap<>();
     private final List<IFailureDetectionEventListener> fdEvntListeners = new 
CopyOnWriteArrayList<>();
 
     public FailureDetector()
@@ -112,10 +111,20 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
 
     public String getAllEndpointStates()
     {
+        return getAllEndpointStates(false);
+    }
+
+    public String getAllEndpointStatesWithPort()
+    {
+        return getAllEndpointStates(true);
+    }
+
+    public String getAllEndpointStates(boolean withPort)
+    {
         StringBuilder sb = new StringBuilder();
-        for (Map.Entry<InetAddress, EndpointState> entry : 
Gossiper.instance.endpointStateMap.entrySet())
+        for (Map.Entry<InetAddressAndPort, EndpointState> entry : 
Gossiper.instance.endpointStateMap.entrySet())
         {
-            sb.append(entry.getKey()).append("\n");
+            sb.append(entry.getKey().toString(withPort)).append("\n");
             appendEndpointState(sb, entry.getValue());
         }
         return sb.toString();
@@ -123,13 +132,23 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
 
     public Map<String, String> getSimpleStates()
     {
+        return getSimpleStates(false);
+    }
+
+    public Map<String, String> getSimpleStatesWithPort()
+    {
+        return getSimpleStates(true);
+    }
+
+    private Map<String, String> getSimpleStates(boolean withPort)
+    {
         Map<String, String> nodesStatus = new HashMap<String, 
String>(Gossiper.instance.endpointStateMap.size());
-        for (Map.Entry<InetAddress, EndpointState> entry : 
Gossiper.instance.endpointStateMap.entrySet())
+        for (Map.Entry<InetAddressAndPort, EndpointState> entry : 
Gossiper.instance.endpointStateMap.entrySet())
         {
             if (entry.getValue().isAlive())
-                nodesStatus.put(entry.getKey().toString(), "UP");
+                nodesStatus.put(entry.getKey().toString(withPort), "UP");
             else
-                nodesStatus.put(entry.getKey().toString(), "DOWN");
+                nodesStatus.put(entry.getKey().toString(withPort), "DOWN");
         }
         return nodesStatus;
     }
@@ -137,7 +156,7 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
     public int getDownEndpointCount()
     {
         int count = 0;
-        for (Map.Entry<InetAddress, EndpointState> entry : 
Gossiper.instance.endpointStateMap.entrySet())
+        for (Map.Entry<InetAddressAndPort, EndpointState> entry : 
Gossiper.instance.endpointStateMap.entrySet())
         {
             if (!entry.getValue().isAlive())
                 count++;
@@ -148,7 +167,7 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
     public int getUpEndpointCount()
     {
         int count = 0;
-        for (Map.Entry<InetAddress, EndpointState> entry : 
Gossiper.instance.endpointStateMap.entrySet())
+        for (Map.Entry<InetAddressAndPort, EndpointState> entry : 
Gossiper.instance.endpointStateMap.entrySet())
         {
             if (entry.getValue().isAlive())
                 count++;
@@ -159,13 +178,24 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
     @Override
     public TabularData getPhiValues() throws OpenDataException
     {
+        return getPhiValues(false);
+    }
+
+    @Override
+    public TabularData getPhiValuesWithPort() throws OpenDataException
+    {
+        return getPhiValues(true);
+    }
+
+    private TabularData getPhiValues(boolean withPort) throws OpenDataException
+    {
         final CompositeType ct = new CompositeType("Node", "Node",
                 new String[]{"Endpoint", "PHI"},
                 new String[]{"IP of the endpoint", "PHI value"},
                 new OpenType[]{SimpleType.STRING, SimpleType.DOUBLE});
         final TabularDataSupport results = new TabularDataSupport(new 
TabularType("PhiList", "PhiList", ct, new String[]{"Endpoint"}));
 
-        for (final Map.Entry<InetAddress, ArrivalWindow> entry : 
arrivalSamples.entrySet())
+        for (final Map.Entry<InetAddressAndPort, ArrivalWindow> entry : 
arrivalSamples.entrySet())
         {
             final ArrivalWindow window = entry.getValue();
             if (window.mean() > 0)
@@ -176,7 +206,7 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
                     // returned values are scaled by PHI_FACTOR so that the 
are on the same scale as PhiConvictThreshold
                     final CompositeData data = new CompositeDataSupport(ct,
                             new String[]{"Endpoint", "PHI"},
-                            new Object[]{entry.getKey().toString(), phi * 
PHI_FACTOR});
+                            new Object[]{entry.getKey().toString(withPort), 
phi * PHI_FACTOR});
                     results.put(data);
                 }
             }
@@ -187,7 +217,7 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
     public String getEndpointState(String address) throws UnknownHostException
     {
         StringBuilder sb = new StringBuilder();
-        EndpointState endpointState = 
Gossiper.instance.getEndpointStateForEndpoint(InetAddress.getByName(address));
+        EndpointState endpointState = 
Gossiper.instance.getEndpointStateForEndpoint(InetAddressAndPort.getByName(address));
         appendEndpointState(sb, endpointState);
         return sb.toString();
     }
@@ -243,9 +273,9 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
         return DatabaseDescriptor.getPhiConvictThreshold();
     }
 
-    public boolean isAlive(InetAddress ep)
+    public boolean isAlive(InetAddressAndPort ep)
     {
-        if (ep.equals(FBUtilities.getBroadcastAddress()))
+        if (ep.equals(FBUtilities.getBroadcastAddressAndPort()))
             return true;
 
         EndpointState epState = 
Gossiper.instance.getEndpointStateForEndpoint(ep);
@@ -257,7 +287,7 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
         return epState != null && epState.isAlive();
     }
 
-    public void report(InetAddress ep)
+    public void report(InetAddressAndPort ep)
     {
         long now = Clock.instance.nanoTime();
         ArrivalWindow heartbeatWindow = arrivalSamples.get(ep);
@@ -279,7 +309,7 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
             logger.trace("Average for {} is {}ns", ep, heartbeatWindow.mean());
     }
 
-    public void interpret(InetAddress ep)
+    public void interpret(InetAddressAndPort ep)
     {
         ArrivalWindow hbWnd = arrivalSamples.get(ep);
         if (hbWnd == null)
@@ -324,7 +354,7 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
         }
     }
 
-    public void forceConviction(InetAddress ep)
+    public void forceConviction(InetAddressAndPort ep)
     {
         logger.debug("Forcing conviction of {}", ep);
         for (IFailureDetectionEventListener listener : fdEvntListeners)
@@ -333,7 +363,7 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
         }
     }
 
-    public void remove(InetAddress ep)
+    public void remove(InetAddressAndPort ep)
     {
         arrivalSamples.remove(ep);
     }
@@ -351,10 +381,10 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
     public String toString()
     {
         StringBuilder sb = new StringBuilder();
-        Set<InetAddress> eps = arrivalSamples.keySet();
+        Set<InetAddressAndPort> eps = arrivalSamples.keySet();
 
         
sb.append("-----------------------------------------------------------------------");
-        for (InetAddress ep : eps)
+        for (InetAddressAndPort ep : eps)
         {
             ArrivalWindow hWnd = arrivalSamples.get(ep);
             sb.append(ep).append(" : ");
@@ -447,7 +477,7 @@ class ArrivalWindow
         }
     }
 
-    synchronized void add(long value, InetAddress ep)
+    synchronized void add(long value, InetAddressAndPort ep)
     {
         assert tLast >= 0;
         if (tLast > 0L)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java 
b/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java
index 23fae3a..6be31b0 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java
@@ -31,15 +31,18 @@ public interface FailureDetectorMBean
 
     public double getPhiConvictThreshold();
 
-    public String getAllEndpointStates();
+    @Deprecated public String getAllEndpointStates();
+    public String getAllEndpointStatesWithPort();
 
     public String getEndpointState(String address) throws UnknownHostException;
 
-    public Map<String, String> getSimpleStates();
+    @Deprecated public Map<String, String> getSimpleStates();
+    public Map<String, String> getSimpleStatesWithPort();
 
     public int getDownEndpointCount();
 
     public int getUpEndpointCount();
 
-    public TabularData getPhiValues() throws OpenDataException;
+    @Deprecated public TabularData getPhiValues() throws OpenDataException;
+    public TabularData getPhiValuesWithPort() throws OpenDataException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/GossipDigest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigest.java 
b/src/java/org/apache/cassandra/gms/GossipDigest.java
index 9dfd486..c7e60c4 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigest.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigest.java
@@ -18,12 +18,12 @@
 package org.apache.cassandra.gms;
 
 import java.io.*;
-import java.net.InetAddress;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 
 /**
@@ -34,18 +34,18 @@ public class GossipDigest implements 
Comparable<GossipDigest>
 {
     public static final IVersionedSerializer<GossipDigest> serializer = new 
GossipDigestSerializer();
 
-    final InetAddress endpoint;
+    final InetAddressAndPort endpoint;
     final int generation;
     final int maxVersion;
 
-    GossipDigest(InetAddress ep, int gen, int version)
+    GossipDigest(InetAddressAndPort ep, int gen, int version)
     {
         endpoint = ep;
         generation = gen;
         maxVersion = version;
     }
 
-    InetAddress getEndpoint()
+    InetAddressAndPort getEndpoint()
     {
         return endpoint;
     }
@@ -83,14 +83,14 @@ class GossipDigestSerializer implements 
IVersionedSerializer<GossipDigest>
 {
     public void serialize(GossipDigest gDigest, DataOutputPlus out, int 
version) throws IOException
     {
-        CompactEndpointSerializationHelper.serialize(gDigest.endpoint, out);
+        
CompactEndpointSerializationHelper.instance.serialize(gDigest.endpoint, out, 
version);
         out.writeInt(gDigest.generation);
         out.writeInt(gDigest.maxVersion);
     }
 
     public GossipDigest deserialize(DataInputPlus in, int version) throws 
IOException
     {
-        InetAddress endpoint = 
CompactEndpointSerializationHelper.deserialize(in);
+        InetAddressAndPort endpoint = 
CompactEndpointSerializationHelper.instance.deserialize(in, version);
         int generation = in.readInt();
         int maxVersion = in.readInt();
         return new GossipDigest(endpoint, generation, maxVersion);
@@ -98,7 +98,7 @@ class GossipDigestSerializer implements 
IVersionedSerializer<GossipDigest>
 
     public long serializedSize(GossipDigest gDigest, int version)
     {
-        long size = 
CompactEndpointSerializationHelper.serializedSize(gDigest.endpoint);
+        long size = 
CompactEndpointSerializationHelper.instance.serializedSize(gDigest.endpoint, 
version);
         size += TypeSizes.sizeof(gDigest.generation);
         size += TypeSizes.sizeof(gDigest.maxVersion);
         return size;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/GossipDigestAck.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck.java 
b/src/java/org/apache/cassandra/gms/GossipDigestAck.java
index cf71ae6..a7d5b92 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAck.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAck.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.gms;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -27,6 +26,7 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 
 /**
@@ -38,9 +38,9 @@ public class GossipDigestAck
     public static final IVersionedSerializer<GossipDigestAck> serializer = new 
GossipDigestAckSerializer();
 
     final List<GossipDigest> gDigestList;
-    final Map<InetAddress, EndpointState> epStateMap;
+    final Map<InetAddressAndPort, EndpointState> epStateMap;
 
-    GossipDigestAck(List<GossipDigest> gDigestList, Map<InetAddress, 
EndpointState> epStateMap)
+    GossipDigestAck(List<GossipDigest> gDigestList, Map<InetAddressAndPort, 
EndpointState> epStateMap)
     {
         this.gDigestList = gDigestList;
         this.epStateMap = epStateMap;
@@ -51,7 +51,7 @@ public class GossipDigestAck
         return gDigestList;
     }
 
-    Map<InetAddress, EndpointState> getEndpointStateMap()
+    Map<InetAddressAndPort, EndpointState> getEndpointStateMap()
     {
         return epStateMap;
     }
@@ -63,10 +63,10 @@ class GossipDigestAckSerializer implements 
IVersionedSerializer<GossipDigestAck>
     {
         
GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList, out, 
version);
         out.writeInt(gDigestAckMessage.epStateMap.size());
-        for (Map.Entry<InetAddress, EndpointState> entry : 
gDigestAckMessage.epStateMap.entrySet())
+        for (Map.Entry<InetAddressAndPort, EndpointState> entry : 
gDigestAckMessage.epStateMap.entrySet())
         {
-            InetAddress ep = entry.getKey();
-            CompactEndpointSerializationHelper.serialize(ep, out);
+            InetAddressAndPort ep = entry.getKey();
+            CompactEndpointSerializationHelper.instance.serialize(ep, out, 
version);
             EndpointState.serializer.serialize(entry.getValue(), out, version);
         }
     }
@@ -75,11 +75,11 @@ class GossipDigestAckSerializer implements 
IVersionedSerializer<GossipDigestAck>
     {
         List<GossipDigest> gDigestList = 
GossipDigestSerializationHelper.deserialize(in, version);
         int size = in.readInt();
-        Map<InetAddress, EndpointState> epStateMap = new HashMap<InetAddress, 
EndpointState>(size);
+        Map<InetAddressAndPort, EndpointState> epStateMap = new 
HashMap<InetAddressAndPort, EndpointState>(size);
 
         for (int i = 0; i < size; ++i)
         {
-            InetAddress ep = 
CompactEndpointSerializationHelper.deserialize(in);
+            InetAddressAndPort ep = 
CompactEndpointSerializationHelper.instance.deserialize(in, version);
             EndpointState epState = EndpointState.serializer.deserialize(in, 
version);
             epStateMap.put(ep, epState);
         }
@@ -90,8 +90,8 @@ class GossipDigestAckSerializer implements 
IVersionedSerializer<GossipDigestAck>
     {
         int size = 
GossipDigestSerializationHelper.serializedSize(ack.gDigestList, version);
         size += TypeSizes.sizeof(ack.epStateMap.size());
-        for (Map.Entry<InetAddress, EndpointState> entry : 
ack.epStateMap.entrySet())
-            size += 
CompactEndpointSerializationHelper.serializedSize(entry.getKey())
+        for (Map.Entry<InetAddressAndPort, EndpointState> entry : 
ack.epStateMap.entrySet())
+            size += 
CompactEndpointSerializationHelper.instance.serializedSize(entry.getKey(), 
version)
                     + 
EndpointState.serializer.serializedSize(entry.getValue(), version);
         return size;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/GossipDigestAck2.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck2.java 
b/src/java/org/apache/cassandra/gms/GossipDigestAck2.java
index 9d779fe..a6d1d2b 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAck2.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAck2.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.gms;
 
 import java.io.*;
-import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -26,6 +25,7 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 
 /**
@@ -36,14 +36,14 @@ public class GossipDigestAck2
 {
     public static final IVersionedSerializer<GossipDigestAck2> serializer = 
new GossipDigestAck2Serializer();
 
-    final Map<InetAddress, EndpointState> epStateMap;
+    final Map<InetAddressAndPort, EndpointState> epStateMap;
 
-    GossipDigestAck2(Map<InetAddress, EndpointState> epStateMap)
+    GossipDigestAck2(Map<InetAddressAndPort, EndpointState> epStateMap)
     {
         this.epStateMap = epStateMap;
     }
 
-    Map<InetAddress, EndpointState> getEndpointStateMap()
+    Map<InetAddressAndPort, EndpointState> getEndpointStateMap()
     {
         return epStateMap;
     }
@@ -54,10 +54,10 @@ class GossipDigestAck2Serializer implements 
IVersionedSerializer<GossipDigestAck
     public void serialize(GossipDigestAck2 ack2, DataOutputPlus out, int 
version) throws IOException
     {
         out.writeInt(ack2.epStateMap.size());
-        for (Map.Entry<InetAddress, EndpointState> entry : 
ack2.epStateMap.entrySet())
+        for (Map.Entry<InetAddressAndPort, EndpointState> entry : 
ack2.epStateMap.entrySet())
         {
-            InetAddress ep = entry.getKey();
-            CompactEndpointSerializationHelper.serialize(ep, out);
+            InetAddressAndPort ep = entry.getKey();
+            CompactEndpointSerializationHelper.instance.serialize(ep, out, 
version);
             EndpointState.serializer.serialize(entry.getValue(), out, version);
         }
     }
@@ -65,11 +65,11 @@ class GossipDigestAck2Serializer implements 
IVersionedSerializer<GossipDigestAck
     public GossipDigestAck2 deserialize(DataInputPlus in, int version) throws 
IOException
     {
         int size = in.readInt();
-        Map<InetAddress, EndpointState> epStateMap = new HashMap<InetAddress, 
EndpointState>(size);
+        Map<InetAddressAndPort, EndpointState> epStateMap = new 
HashMap<>(size);
 
         for (int i = 0; i < size; ++i)
         {
-            InetAddress ep = 
CompactEndpointSerializationHelper.deserialize(in);
+            InetAddressAndPort ep = 
CompactEndpointSerializationHelper.instance.deserialize(in, version);
             EndpointState epState = EndpointState.serializer.deserialize(in, 
version);
             epStateMap.put(ep, epState);
         }
@@ -79,8 +79,8 @@ class GossipDigestAck2Serializer implements 
IVersionedSerializer<GossipDigestAck
     public long serializedSize(GossipDigestAck2 ack2, int version)
     {
         long size = TypeSizes.sizeof(ack2.epStateMap.size());
-        for (Map.Entry<InetAddress, EndpointState> entry : 
ack2.epStateMap.entrySet())
-            size += 
CompactEndpointSerializationHelper.serializedSize(entry.getKey())
+        for (Map.Entry<InetAddressAndPort, EndpointState> entry : 
ack2.epStateMap.entrySet())
+            size += 
CompactEndpointSerializationHelper.instance.serializedSize(entry.getKey(), 
version)
                     + 
EndpointState.serializer.serializedSize(entry.getValue(), version);
         return size;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java 
b/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
index 240bb40..fd5d487 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
@@ -17,12 +17,12 @@
  */
 package org.apache.cassandra.gms;
 
-import java.net.InetAddress;
 import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 
@@ -34,7 +34,7 @@ public class GossipDigestAck2VerbHandler implements 
IVerbHandler<GossipDigestAck
     {
         if (logger.isTraceEnabled())
         {
-            InetAddress from = message.from;
+            InetAddressAndPort from = message.from;
             logger.trace("Received a GossipDigestAck2Message from {}", from);
         }
         if (!Gossiper.instance.isEnabled())
@@ -43,7 +43,7 @@ public class GossipDigestAck2VerbHandler implements 
IVerbHandler<GossipDigestAck
                 logger.trace("Ignoring GossipDigestAck2Message because gossip 
is disabled");
             return;
         }
-        Map<InetAddress, EndpointState> remoteEpStateMap = 
message.payload.getEndpointStateMap();
+        Map<InetAddressAndPort, EndpointState> remoteEpStateMap = 
message.payload.getEndpointStateMap();
         /* Notify the Failure Detector */
         Gossiper.instance.notifyFailureDetector(remoteEpStateMap);
         Gossiper.instance.applyStateLocally(remoteEpStateMap);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java 
b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index d6d9dfb..2a12b7c 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.gms;
 
-import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -25,6 +24,7 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
@@ -36,7 +36,7 @@ public class GossipDigestAckVerbHandler implements 
IVerbHandler<GossipDigestAck>
 
     public void doVerb(MessageIn<GossipDigestAck> message, int id)
     {
-        InetAddress from = message.from;
+        InetAddressAndPort from = message.from;
         if (logger.isTraceEnabled())
             logger.trace("Received a GossipDigestAckMessage from {}", from);
         if (!Gossiper.instance.isEnabled() && 
!Gossiper.instance.isInShadowRound())
@@ -48,7 +48,7 @@ public class GossipDigestAckVerbHandler implements 
IVerbHandler<GossipDigestAck>
 
         GossipDigestAck gDigestAckMessage = message.payload;
         List<GossipDigest> gDigestList = 
gDigestAckMessage.getGossipDigestList();
-        Map<InetAddress, EndpointState> epStateMap = 
gDigestAckMessage.getEndpointStateMap();
+        Map<InetAddressAndPort, EndpointState> epStateMap = 
gDigestAckMessage.getEndpointStateMap();
         logger.trace("Received ack with {} digests and {} states", 
gDigestList.size(), epStateMap.size());
 
         if (Gossiper.instance.isInShadowRound())
@@ -79,10 +79,10 @@ public class GossipDigestAckVerbHandler implements 
IVerbHandler<GossipDigestAck>
         }
 
         /* Get the state required to send to this gossipee - construct 
GossipDigestAck2Message */
-        Map<InetAddress, EndpointState> deltaEpStateMap = new 
HashMap<InetAddress, EndpointState>();
+        Map<InetAddressAndPort, EndpointState> deltaEpStateMap = new 
HashMap<InetAddressAndPort, EndpointState>();
         for (GossipDigest gDigest : gDigestList)
         {
-            InetAddress addr = gDigest.getEndpoint();
+            InetAddressAndPort addr = gDigest.getEndpoint();
             EndpointState localEpStatePtr = 
Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
             if (localEpStatePtr != null)
                 deltaEpStateMap.put(addr, localEpStatePtr);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java 
b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
index ddfafc9..9619f4e 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.gms;
 
-import java.net.InetAddress;
 import java.util.*;
 
 import com.google.common.collect.Maps;
@@ -26,6 +25,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
@@ -37,7 +37,7 @@ public class GossipDigestSynVerbHandler implements 
IVerbHandler<GossipDigestSyn>
 
     public void doVerb(MessageIn<GossipDigestSyn> message, int id)
     {
-        InetAddress from = message.from;
+        InetAddressAndPort from = message.from;
         if (logger.isTraceEnabled())
             logger.trace("Received a GossipDigestSynMessage from {}", from);
         if (!Gossiper.instance.isEnabled() && 
!Gossiper.instance.isInShadowRound())
@@ -102,7 +102,7 @@ public class GossipDigestSynVerbHandler implements 
IVerbHandler<GossipDigestSyn>
         doSort(gDigestList);
 
         List<GossipDigest> deltaGossipDigestList = new 
ArrayList<GossipDigest>();
-        Map<InetAddress, EndpointState> deltaEpStateMap = new 
HashMap<InetAddress, EndpointState>();
+        Map<InetAddressAndPort, EndpointState> deltaEpStateMap = new 
HashMap<InetAddressAndPort, EndpointState>();
         Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, 
deltaEpStateMap);
         logger.trace("sending {} digests and {} deltas", 
deltaGossipDigestList.size(), deltaEpStateMap.size());
         MessageOut<GossipDigestAck> gDigestAckMessage = new 
MessageOut<GossipDigestAck>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
@@ -116,14 +116,14 @@ public class GossipDigestSynVerbHandler implements 
IVerbHandler<GossipDigestSyn>
     /*
      * First construct a map whose key is the endpoint in the GossipDigest and 
the value is the
      * GossipDigest itself. Then build a list of version differences i.e 
difference between the
-     * version in the GossipDigest and the version in the local state for a 
given InetAddress.
+     * version in the GossipDigest and the version in the local state for a 
given InetAddressAndPort.
      * Sort this list. Now loop through the sorted list and retrieve the 
GossipDigest corresponding
      * to the endpoint from the map that was initially constructed.
     */
     private void doSort(List<GossipDigest> gDigestList)
     {
         /* Construct a map of endpoint to GossipDigest. */
-        Map<InetAddress, GossipDigest> epToDigestMap = 
Maps.newHashMapWithExpectedSize(gDigestList.size());
+        Map<InetAddressAndPort, GossipDigest> epToDigestMap = 
Maps.newHashMapWithExpectedSize(gDigestList.size());
         for (GossipDigest gDigest : gDigestList)
         {
             epToDigestMap.put(gDigest.getEndpoint(), gDigest);
@@ -136,7 +136,7 @@ public class GossipDigestSynVerbHandler implements 
IVerbHandler<GossipDigestSyn>
         List<GossipDigest> diffDigests = new 
ArrayList<GossipDigest>(gDigestList.size());
         for (GossipDigest gDigest : gDigestList)
         {
-            InetAddress ep = gDigest.getEndpoint();
+            InetAddressAndPort ep = gDigest.getEndpoint();
             EndpointState epState = 
Gossiper.instance.getEndpointStateForEndpoint(ep);
             int version = (epState != null) ? 
Gossiper.instance.getMaxEndpointStateVersion(epState) : 0;
             int diffVersion = Math.abs(version - gDigest.getMaxVersion());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java 
b/src/java/org/apache/cassandra/gms/Gossiper.java
index e675d92..eb6c500 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -18,12 +18,12 @@
 package org.apache.cassandra.gms;
 
 import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
 import javax.management.MBeanServer;
@@ -34,6 +34,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.CassandraVersion;
 import org.apache.cassandra.utils.Pair;
 import org.slf4j.Logger;
@@ -99,43 +100,36 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     static final int MAX_GENERATION_DIFFERENCE = 86400 * 365;
     private long fatClientTimeout;
     private final Random random = new Random();
-    private final Comparator<InetAddress> inetcomparator = new 
Comparator<InetAddress>()
-    {
-        public int compare(InetAddress addr1, InetAddress addr2)
-        {
-            return addr1.getHostAddress().compareTo(addr2.getHostAddress());
-        }
-    };
 
     /* subscribers for interest in EndpointState change */
     private final List<IEndpointStateChangeSubscriber> subscribers = new 
CopyOnWriteArrayList<IEndpointStateChangeSubscriber>();
 
     /* live member set */
-    private final Set<InetAddress> liveEndpoints = new 
ConcurrentSkipListSet<InetAddress>(inetcomparator);
+    private final Set<InetAddressAndPort> liveEndpoints = new 
ConcurrentSkipListSet<>();
 
     /* unreachable member set */
-    private final Map<InetAddress, Long> unreachableEndpoints = new 
ConcurrentHashMap<InetAddress, Long>();
+    private final Map<InetAddressAndPort, Long> unreachableEndpoints = new 
ConcurrentHashMap<>();
 
     /* initial seeds for joining the cluster */
     @VisibleForTesting
-    final Set<InetAddress> seeds = new 
ConcurrentSkipListSet<InetAddress>(inetcomparator);
+    final Set<InetAddressAndPort> seeds = new ConcurrentSkipListSet<>();
 
     /* map where key is the endpoint and value is the state associated with 
the endpoint */
-    final ConcurrentMap<InetAddress, EndpointState> endpointStateMap = new 
ConcurrentHashMap<InetAddress, EndpointState>();
+    final ConcurrentMap<InetAddressAndPort, EndpointState> endpointStateMap = 
new ConcurrentHashMap<>();
 
     /* map where key is endpoint and value is timestamp when this endpoint was 
removed from
      * gossip. We will ignore any gossip regarding these endpoints for 
QUARANTINE_DELAY time
      * after removal to prevent nodes from falsely reincarnating during the 
time when removal
      * gossip gets propagated to all nodes */
-    private final Map<InetAddress, Long> justRemovedEndpoints = new 
ConcurrentHashMap<InetAddress, Long>();
+    private final Map<InetAddressAndPort, Long> justRemovedEndpoints = new 
ConcurrentHashMap<>();
 
-    private final Map<InetAddress, Long> expireTimeEndpointMap = new 
ConcurrentHashMap<InetAddress, Long>();
+    private final Map<InetAddressAndPort, Long> expireTimeEndpointMap = new 
ConcurrentHashMap<>();
 
     private volatile boolean inShadowRound = false;
     // seeds gathered during shadow round that indicated to be in the shadow 
round phase as well
-    private final Set<InetAddress> seedsInShadowRound = new 
ConcurrentSkipListSet<>(inetcomparator);
+    private final Set<InetAddressAndPort> seedsInShadowRound = new 
ConcurrentSkipListSet<>();
     // endpoint states as gathered during shadow round
-    private final Map<InetAddress, EndpointState> endpointShadowStateMap = new 
ConcurrentHashMap<>();
+    private final Map<InetAddressAndPort, EndpointState> 
endpointShadowStateMap = new ConcurrentHashMap<>();
 
     private volatile long lastProcessedMessageAt = System.currentTimeMillis();
 
@@ -151,9 +145,9 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
                 taskLock.lock();
 
                 /* Update the local heartbeat counter. */
-                
endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().updateHeartBeat();
+                
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).getHeartBeatState().updateHeartBeat();
                 if (logger.isTraceEnabled())
-                    logger.trace("My heartbeat is now {}", 
endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().getHeartBeatVersion());
+                    logger.trace("My heartbeat is now {}", 
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).getHeartBeatState().getHeartBeatVersion());
                 final List<GossipDigest> gDigests = new 
ArrayList<GossipDigest>();
                 Gossiper.instance.makeRandomGossipDigest(gDigests);
 
@@ -231,14 +225,24 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
 
     public boolean seenAnySeed()
     {
-        for (Map.Entry<InetAddress, EndpointState> entry : 
endpointStateMap.entrySet())
+        for (Map.Entry<InetAddressAndPort, EndpointState> entry : 
endpointStateMap.entrySet())
         {
             if (seeds.contains(entry.getKey()))
                 return true;
             try
             {
                 VersionedValue internalIp = 
entry.getValue().getApplicationState(ApplicationState.INTERNAL_IP);
-                if (internalIp != null && 
seeds.contains(InetAddress.getByName(internalIp.value)))
+                VersionedValue internalIpAndPort = 
entry.getValue().getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT);
+                InetAddressAndPort endpoint = null;
+                if (internalIpAndPort != null)
+                {
+                    endpoint = 
InetAddressAndPort.getByName(internalIpAndPort.value);
+                }
+                else if (internalIp != null)
+                {
+                    endpoint = InetAddressAndPort.getByName(internalIp.value);
+                }
+                if (endpoint != null && seeds.contains(endpoint))
                     return true;
             }
             catch (UnknownHostException e)
@@ -272,18 +276,18 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     /**
      * @return a list of live gossip participants, including fat clients
      */
-    public Set<InetAddress> getLiveMembers()
+    public Set<InetAddressAndPort> getLiveMembers()
     {
-        Set<InetAddress> liveMembers = new HashSet<>(liveEndpoints);
-        if (!liveMembers.contains(FBUtilities.getBroadcastAddress()))
-            liveMembers.add(FBUtilities.getBroadcastAddress());
+        Set<InetAddressAndPort> liveMembers = new HashSet<>(liveEndpoints);
+        if (!liveMembers.contains(FBUtilities.getBroadcastAddressAndPort()))
+            liveMembers.add(FBUtilities.getBroadcastAddressAndPort());
         return liveMembers;
     }
 
     /**
      * @return a list of live ring members.
      */
-    public Set<InetAddress> getLiveTokenOwners()
+    public Set<InetAddressAndPort> getLiveTokenOwners()
     {
         return StorageService.instance.getLiveRingMembers(true);
     }
@@ -291,7 +295,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     /**
      * @return a list of unreachable gossip participants, including fat clients
      */
-    public Set<InetAddress> getUnreachableMembers()
+    public Set<InetAddressAndPort> getUnreachableMembers()
     {
         return unreachableEndpoints.keySet();
     }
@@ -299,10 +303,10 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     /**
      * @return a list of unreachable token owners
      */
-    public Set<InetAddress> getUnreachableTokenOwners()
+    public Set<InetAddressAndPort> getUnreachableTokenOwners()
     {
-        Set<InetAddress> tokenOwners = new HashSet<>();
-        for (InetAddress endpoint : unreachableEndpoints.keySet())
+        Set<InetAddressAndPort> tokenOwners = new HashSet<>();
+        for (InetAddressAndPort endpoint : unreachableEndpoints.keySet())
         {
             if (StorageService.instance.getTokenMetadata().isMember(endpoint))
                 tokenOwners.add(endpoint);
@@ -311,7 +315,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         return tokenOwners;
     }
 
-    public long getEndpointDowntime(InetAddress ep)
+    public long getEndpointDowntime(InetAddressAndPort ep)
     {
         Long downtime = unreachableEndpoints.get(ep);
         if (downtime != null)
@@ -320,14 +324,25 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
             return 0L;
     }
 
-    private boolean isShutdown(InetAddress endpoint)
+    private boolean isShutdown(InetAddressAndPort endpoint)
     {
         EndpointState epState = endpointStateMap.get(endpoint);
         if (epState == null)
+        {
             return false;
-        if (epState.getApplicationState(ApplicationState.STATUS) == null)
-            return false;
-        String value = 
epState.getApplicationState(ApplicationState.STATUS).value;
+        }
+
+        VersionedValue versionedValue = 
epState.getApplicationState(ApplicationState.STATUS_WITH_PORT);
+        if (versionedValue == null)
+        {
+            versionedValue = 
epState.getApplicationState(ApplicationState.STATUS);
+            if (versionedValue == null)
+            {
+                return false;
+            }
+        }
+
+        String value = versionedValue.value;
         String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1);
         assert (pieces.length > 0);
         String state = pieces[0];
@@ -340,7 +355,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
      *
      * @param endpoint end point that is convicted.
      */
-    public void convict(InetAddress endpoint, double phi)
+    public void convict(InetAddressAndPort endpoint, double phi)
     {
         EndpointState epState = endpointStateMap.get(endpoint);
         if (epState == null)
@@ -366,11 +381,12 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
      * This method is used to mark a node as shutdown; that is it gracefully 
exited on its own and told us about it
      * @param endpoint endpoint that has shut itself down
      */
-    protected void markAsShutdown(InetAddress endpoint)
+    protected void markAsShutdown(InetAddressAndPort endpoint)
     {
         EndpointState epState = endpointStateMap.get(endpoint);
         if (epState == null)
             return;
+        epState.addApplicationState(ApplicationState.STATUS_WITH_PORT, 
StorageService.instance.valueFactory.shutdown(true));
         epState.addApplicationState(ApplicationState.STATUS, 
StorageService.instance.valueFactory.shutdown(true));
         epState.addApplicationState(ApplicationState.RPC_READY, 
StorageService.instance.valueFactory.rpcReady(false));
         epState.getHeartBeatState().forceHighestPossibleVersionUnsafe();
@@ -397,7 +413,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
      *
      * @param endpoint endpoint to be removed from the current membership.
      */
-    private void evictFromMembership(InetAddress endpoint)
+    private void evictFromMembership(InetAddressAndPort endpoint)
     {
         unreachableEndpoints.remove(endpoint);
         endpointStateMap.remove(endpoint);
@@ -411,7 +427,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     /**
      * Removes the endpoint from Gossip but retains endpoint state
      */
-    public void removeEndpoint(InetAddress endpoint)
+    public void removeEndpoint(InetAddressAndPort endpoint)
     {
         // do subscribers first so anything in the subscriber that depends on 
gossiper state won't get confused
         for (IEndpointStateChangeSubscriber subscriber : subscribers)
@@ -438,7 +454,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
      *
      * @param endpoint
      */
-    private void quarantineEndpoint(InetAddress endpoint)
+    private void quarantineEndpoint(InetAddressAndPort endpoint)
     {
         quarantineEndpoint(endpoint, System.currentTimeMillis());
     }
@@ -449,7 +465,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
      * @param endpoint
      * @param quarantineExpiration
      */
-    private void quarantineEndpoint(InetAddress endpoint, long 
quarantineExpiration)
+    private void quarantineEndpoint(InetAddressAndPort endpoint, long 
quarantineExpiration)
     {
         justRemovedEndpoints.put(endpoint, quarantineExpiration);
     }
@@ -458,7 +474,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
      * Quarantine endpoint specifically for replacement purposes.
      * @param endpoint
      */
-    public void replacementQuarantine(InetAddress endpoint)
+    public void replacementQuarantine(InetAddressAndPort endpoint)
     {
         // remember, quarantineEndpoint will effectively already add 
QUARANTINE_DELAY, so this is 2x
         logger.debug("");
@@ -471,7 +487,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
      *
      * @param endpoint The endpoint that has been replaced
      */
-    public void replacedEndpoint(InetAddress endpoint)
+    public void replacedEndpoint(InetAddressAndPort endpoint)
     {
         removeEndpoint(endpoint);
         evictFromMembership(endpoint);
@@ -491,9 +507,9 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         int maxVersion = 0;
 
         // local epstate will be part of endpointStateMap
-        List<InetAddress> endpoints = new 
ArrayList<InetAddress>(endpointStateMap.keySet());
+        List<InetAddressAndPort> endpoints = new 
ArrayList<>(endpointStateMap.keySet());
         Collections.shuffle(endpoints, random);
-        for (InetAddress endpoint : endpoints)
+        for (InetAddressAndPort endpoint : endpoints)
         {
             epState = endpointStateMap.get(endpoint);
             if (epState != null)
@@ -524,7 +540,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
      * @param hostId      - the ID of the host being removed
      * @param localHostId - my own host ID for replication coordination
      */
-    public void advertiseRemoving(InetAddress endpoint, UUID hostId, UUID 
localHostId)
+    public void advertiseRemoving(InetAddressAndPort endpoint, UUID hostId, 
UUID localHostId)
     {
         EndpointState epState = endpointStateMap.get(endpoint);
         // remember this node's generation
@@ -541,6 +557,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         epState.updateTimestamp(); // make sure we don't evict it too soon
         epState.getHeartBeatState().forceNewerGenerationUnsafe();
         Map<ApplicationState, VersionedValue> states = new 
EnumMap<>(ApplicationState.class);
+        states.put(ApplicationState.STATUS_WITH_PORT, 
StorageService.instance.valueFactory.removingNonlocal(hostId));
         states.put(ApplicationState.STATUS, 
StorageService.instance.valueFactory.removingNonlocal(hostId));
         states.put(ApplicationState.REMOVAL_COORDINATOR, 
StorageService.instance.valueFactory.removalCoordinator(localHostId));
         epState.addApplicationStates(states);
@@ -554,12 +571,13 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
      * @param endpoint
      * @param hostId
      */
-    public void advertiseTokenRemoved(InetAddress endpoint, UUID hostId)
+    public void advertiseTokenRemoved(InetAddressAndPort endpoint, UUID hostId)
     {
         EndpointState epState = endpointStateMap.get(endpoint);
         epState.updateTimestamp(); // make sure we don't evict it too soon
         epState.getHeartBeatState().forceNewerGenerationUnsafe();
         long expireTime = computeExpireTime();
+        epState.addApplicationState(ApplicationState.STATUS_WITH_PORT, 
StorageService.instance.valueFactory.removedNonlocal(hostId, expireTime));
         epState.addApplicationState(ApplicationState.STATUS, 
StorageService.instance.valueFactory.removedNonlocal(hostId, expireTime));
         logger.info("Completing removal of {}", endpoint);
         addExpireTimeForEndpoint(endpoint, expireTime);
@@ -584,7 +602,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
      */
     public void assassinateEndpoint(String address) throws UnknownHostException
     {
-        InetAddress endpoint = InetAddress.getByName(address);
+        InetAddressAndPort endpoint = InetAddressAndPort.getByName(address);
         EndpointState epState = endpointStateMap.get(endpoint);
         Collection<Token> tokens = null;
         logger.warn("Assassinating {} via gossip", endpoint);
@@ -624,18 +642,20 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         }
 
         // do not pass go, do not collect 200 dollars, just gtfo
-        epState.addApplicationState(ApplicationState.STATUS, 
StorageService.instance.valueFactory.left(tokens, computeExpireTime()));
+        long expireTime = computeExpireTime();
+        epState.addApplicationState(ApplicationState.STATUS_WITH_PORT, 
StorageService.instance.valueFactory.left(tokens, expireTime));
+        epState.addApplicationState(ApplicationState.STATUS, 
StorageService.instance.valueFactory.left(tokens, expireTime));
         handleMajorStateChange(endpoint, epState);
         Uninterruptibles.sleepUninterruptibly(intervalInMillis * 4, 
TimeUnit.MILLISECONDS);
         logger.warn("Finished assassinating {}", endpoint);
     }
 
-    public boolean isKnownEndpoint(InetAddress endpoint)
+    public boolean isKnownEndpoint(InetAddressAndPort endpoint)
     {
         return endpointStateMap.containsKey(endpoint);
     }
 
-    public int getCurrentGenerationNumber(InetAddress endpoint)
+    public int getCurrentGenerationNumber(InetAddressAndPort endpoint)
     {
         return 
endpointStateMap.get(endpoint).getHeartBeatState().getGeneration();
     }
@@ -647,16 +667,16 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
      * @param epSet   a set of endpoint from which a random endpoint is chosen.
      * @return true if the chosen endpoint is also a seed.
      */
-    private boolean sendGossip(MessageOut<GossipDigestSyn> message, 
Set<InetAddress> epSet)
+    private boolean sendGossip(MessageOut<GossipDigestSyn> message, 
Set<InetAddressAndPort> epSet)
     {
-        List<InetAddress> liveEndpoints = ImmutableList.copyOf(epSet);
+        List<InetAddressAndPort> liveEndpoints = ImmutableList.copyOf(epSet);
 
         int size = liveEndpoints.size();
         if (size < 1)
             return false;
         /* Generate a random number from 0 -> size */
         int index = (size == 1) ? 0 : random.nextInt(size);
-        InetAddress to = liveEndpoints.get(index);
+        InetAddressAndPort to = liveEndpoints.get(index);
         if (logger.isTraceEnabled())
             logger.trace("Sending a GossipDigestSyn to {} ...", to);
         if (firstSynSendAt == 0)
@@ -695,7 +715,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         int size = seeds.size();
         if (size > 0)
         {
-            if (size == 1 && seeds.contains(FBUtilities.getBroadcastAddress()))
+            if (size == 1 && 
seeds.contains(FBUtilities.getBroadcastAddressAndPort()))
             {
                 return;
             }
@@ -715,7 +735,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         }
     }
 
-    public boolean isGossipOnlyMember(InetAddress endpoint)
+    public boolean isGossipOnlyMember(InetAddressAndPort endpoint)
     {
         EndpointState epState = endpointStateMap.get(endpoint);
         if (epState == null)
@@ -740,8 +760,8 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
      * @param epStates - endpoint states in the cluster
      * @return true if it is safe to start the node, false otherwise
      */
-    public boolean isSafeForStartup(InetAddress endpoint, UUID localHostUUID, 
boolean isBootstrapping,
-                                    Map<InetAddress, EndpointState> epStates)
+    public boolean isSafeForStartup(InetAddressAndPort endpoint, UUID 
localHostUUID, boolean isBootstrapping,
+                                    Map<InetAddressAndPort, EndpointState> 
epStates)
     {
         EndpointState epState = epStates.get(endpoint);
         // if there's no previous state, or the node was previously removed 
from the cluster, we're good
@@ -792,10 +812,10 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
             }
         }
 
-        Set<InetAddress> eps = endpointStateMap.keySet();
-        for (InetAddress endpoint : eps)
+        Set<InetAddressAndPort> eps = endpointStateMap.keySet();
+        for (InetAddressAndPort endpoint : eps)
         {
-            if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+            if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
                 continue;
 
             FailureDetector.instance.interpret(endpoint);
@@ -829,7 +849,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
 
         if (!justRemovedEndpoints.isEmpty())
         {
-            for (Entry<InetAddress, Long> entry : 
justRemovedEndpoints.entrySet())
+            for (Entry<InetAddressAndPort, Long> entry : 
justRemovedEndpoints.entrySet())
             {
                 if ((now - entry.getValue()) > QUARANTINE_DELAY)
                 {
@@ -841,34 +861,34 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         }
     }
 
-    protected long getExpireTimeForEndpoint(InetAddress endpoint)
+    protected long getExpireTimeForEndpoint(InetAddressAndPort endpoint)
     {
         /* default expireTime is aVeryLongTime */
         Long storedTime = expireTimeEndpointMap.get(endpoint);
         return storedTime == null ? computeExpireTime() : storedTime;
     }
 
-    public EndpointState getEndpointStateForEndpoint(InetAddress ep)
+    public EndpointState getEndpointStateForEndpoint(InetAddressAndPort ep)
     {
         return endpointStateMap.get(ep);
     }
 
-    public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
+    public Set<Entry<InetAddressAndPort, EndpointState>> getEndpointStates()
     {
         return endpointStateMap.entrySet();
     }
 
-    public UUID getHostId(InetAddress endpoint)
+    public UUID getHostId(InetAddressAndPort endpoint)
     {
         return getHostId(endpoint, endpointStateMap);
     }
 
-    public UUID getHostId(InetAddress endpoint, Map<InetAddress, 
EndpointState> epStates)
+    public UUID getHostId(InetAddressAndPort endpoint, Map<InetAddressAndPort, 
EndpointState> epStates)
     {
         return 
UUID.fromString(epStates.get(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
     }
 
-    EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int 
version)
+    EndpointState getStateForVersionBiggerThan(InetAddressAndPort forEndpoint, 
int version)
     {
         EndpointState epState = endpointStateMap.get(forEndpoint);
         EndpointState reqdEndpointState = null;
@@ -919,7 +939,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     /**
      * determine which endpoint started up earlier
      */
-    public int compareEndpointStartup(InetAddress addr1, InetAddress addr2)
+    public int compareEndpointStartup(InetAddressAndPort addr1, 
InetAddressAndPort addr2)
     {
         EndpointState ep1 = getEndpointStateForEndpoint(addr1);
         EndpointState ep2 = getEndpointStateForEndpoint(addr2);
@@ -927,15 +947,15 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         return ep1.getHeartBeatState().getGeneration() - 
ep2.getHeartBeatState().getGeneration();
     }
 
-    void notifyFailureDetector(Map<InetAddress, EndpointState> 
remoteEpStateMap)
+    void notifyFailureDetector(Map<InetAddressAndPort, EndpointState> 
remoteEpStateMap)
     {
-        for (Entry<InetAddress, EndpointState> entry : 
remoteEpStateMap.entrySet())
+        for (Entry<InetAddressAndPort, EndpointState> entry : 
remoteEpStateMap.entrySet())
         {
             notifyFailureDetector(entry.getKey(), entry.getValue());
         }
     }
 
-    void notifyFailureDetector(InetAddress endpoint, EndpointState 
remoteEndpointState)
+    void notifyFailureDetector(InetAddressAndPort endpoint, EndpointState 
remoteEndpointState)
     {
         EndpointState localEndpointState = endpointStateMap.get(endpoint);
         /*
@@ -976,7 +996,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
 
     }
 
-    private void markAlive(final InetAddress addr, final EndpointState 
localState)
+    private void markAlive(final InetAddressAndPort addr, final EndpointState 
localState)
     {
         localState.markDead();
 
@@ -999,7 +1019,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     }
 
     @VisibleForTesting
-    public void realMarkAlive(final InetAddress addr, final EndpointState 
localState)
+    public void realMarkAlive(final InetAddressAndPort addr, final 
EndpointState localState)
     {
         if (logger.isTraceEnabled())
             logger.trace("marking as alive {}", addr);
@@ -1017,7 +1037,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     }
 
     @VisibleForTesting
-    public void markDead(InetAddress addr, EndpointState localState)
+    public void markDead(InetAddressAndPort addr, EndpointState localState)
     {
         if (logger.isTraceEnabled())
             logger.trace("marking as down {}", addr);
@@ -1037,7 +1057,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
      * @param ep      endpoint
      * @param epState EndpointState for the endpoint
      */
-    private void handleMajorStateChange(InetAddress ep, EndpointState epState)
+    private void handleMajorStateChange(InetAddressAndPort ep, EndpointState 
epState)
     {
         EndpointState localEpState = endpointStateMap.get(ep);
         if (!isDeadState(epState))
@@ -1071,7 +1091,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
             markAsShutdown(ep);
     }
 
-    public boolean isAlive(InetAddress endpoint)
+    public boolean isAlive(InetAddressAndPort endpoint)
     {
         EndpointState epState = getEndpointStateForEndpoint(endpoint);
         if (epState == null)
@@ -1099,21 +1119,33 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
 
     private static String getGossipStatus(EndpointState epState)
     {
-        if (epState == null || 
epState.getApplicationState(ApplicationState.STATUS) == null)
+        if (epState == null)
+        {
             return "";
+        }
+
+        VersionedValue versionedValue = 
epState.getApplicationState(ApplicationState.STATUS_WITH_PORT);
+        if (versionedValue == null)
+        {
+            versionedValue = 
epState.getApplicationState(ApplicationState.STATUS);
+            if (versionedValue == null)
+            {
+                return "";
+            }
+        }
 
-        String value = 
epState.getApplicationState(ApplicationState.STATUS).value;
+        String value = versionedValue.value;
         String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1);
         assert (pieces.length > 0);
         return pieces[0];
     }
 
-    void applyStateLocally(Map<InetAddress, EndpointState> epStateMap)
+    void applyStateLocally(Map<InetAddressAndPort, EndpointState> epStateMap)
     {
-        for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet())
+        for (Entry<InetAddressAndPort, EndpointState> entry : 
epStateMap.entrySet())
         {
-            InetAddress ep = entry.getKey();
-            if ( ep.equals(FBUtilities.getBroadcastAddress()) && 
!isInShadowRound())
+            InetAddressAndPort ep = entry.getKey();
+            if ( ep.equals(FBUtilities.getBroadcastAddressAndPort()) && 
!isInShadowRound())
                 continue;
             if (justRemovedEndpoints.containsKey(ep))
             {
@@ -1181,7 +1213,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         }
     }
 
-    private void applyNewStates(InetAddress addr, EndpointState localState, 
EndpointState remoteState)
+    private void applyNewStates(InetAddressAndPort addr, EndpointState 
localState, EndpointState remoteState)
     {
         // don't assert here, since if the node restarts the version will go 
back to zero
         int oldVersion = localState.getHeartBeatState().getHeartBeatVersion();
@@ -1194,12 +1226,27 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         assert remoteState.getHeartBeatState().getGeneration() == 
localState.getHeartBeatState().getGeneration();
         localState.addApplicationStates(remoteStates);
 
-        for (Entry<ApplicationState, VersionedValue> remoteEntry : 
remoteStates)
+        //Filter out pre-4.0 versions of data for more complete 4.0 versions
+        Set<Entry<ApplicationState, VersionedValue>> filtered = 
remoteStates.stream().filter(entry -> {
+           switch (entry.getKey())
+           {
+               case INTERNAL_IP:
+                    return 
remoteState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT) == 
null;
+               case STATUS:
+                   return 
remoteState.getApplicationState(ApplicationState.STATUS_WITH_PORT) == null;
+               case RPC_ADDRESS:
+                   return 
remoteState.getApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT) == 
null;
+               default:
+                   return true;
+           }
+        }).collect(Collectors.toSet());
+
+        for (Entry<ApplicationState, VersionedValue> remoteEntry : filtered)
             doOnChangeNotifications(addr, remoteEntry.getKey(), 
remoteEntry.getValue());
     }
 
     // notify that a local application state is going to change (doesn't get 
triggered for remote changes)
-    private void doBeforeChangeNotifications(InetAddress addr, EndpointState 
epState, ApplicationState apState, VersionedValue newValue)
+    private void doBeforeChangeNotifications(InetAddressAndPort addr, 
EndpointState epState, ApplicationState apState, VersionedValue newValue)
     {
         for (IEndpointStateChangeSubscriber subscriber : subscribers)
         {
@@ -1208,7 +1255,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     }
 
     // notify that an application state has changed
-    private void doOnChangeNotifications(InetAddress addr, ApplicationState 
state, VersionedValue value)
+    private void doOnChangeNotifications(InetAddressAndPort addr, 
ApplicationState state, VersionedValue value)
     {
         for (IEndpointStateChangeSubscriber subscriber : subscribers)
         {
@@ -1226,7 +1273,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     }
 
     /* Send all the data with version greater than maxRemoteVersion */
-    private void sendAll(GossipDigest gDigest, Map<InetAddress, EndpointState> 
deltaEpStateMap, int maxRemoteVersion)
+    private void sendAll(GossipDigest gDigest, Map<InetAddressAndPort, 
EndpointState> deltaEpStateMap, int maxRemoteVersion)
     {
         EndpointState localEpStatePtr = 
getStateForVersionBiggerThan(gDigest.getEndpoint(), maxRemoteVersion);
         if (localEpStatePtr != null)
@@ -1237,7 +1284,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         This method is used to figure the state that the Gossiper has but 
Gossipee doesn't. The delta digests
         and the delta state are built up.
     */
-    void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> 
deltaGossipDigestList, Map<InetAddress, EndpointState> deltaEpStateMap)
+    void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> 
deltaGossipDigestList, Map<InetAddressAndPort, EndpointState> deltaEpStateMap)
     {
         if (gDigestList.size() == 0)
         {
@@ -1245,7 +1292,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
               If this is happening then the node is attempting shadow gossip, 
and we should reply with everything we know.
             */
             logger.debug("Shadow request received, adding all states");
-            for (Map.Entry<InetAddress, EndpointState> entry : 
endpointStateMap.entrySet())
+            for (Map.Entry<InetAddressAndPort, EndpointState> entry : 
endpointStateMap.entrySet())
             {
                 gDigestList.add(new GossipDigest(entry.getKey(), 0, 0));
             }
@@ -1320,7 +1367,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         buildSeedsList();
         /* initialize the heartbeat state for this localEndpoint */
         maybeInitializeLocalState(generationNbr);
-        EndpointState localState = 
endpointStateMap.get(FBUtilities.getBroadcastAddress());
+        EndpointState localState = 
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort());
         localState.addApplicationStates(preloadLocalStates);
 
         //notify snitches that Gossiper is about to start
@@ -1345,14 +1392,14 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
      * </ul>
      *
      * Method is synchronized, as we use an in-progress flag to indicate that 
shadow round must be cleared
-     * again by calling {@link Gossiper#maybeFinishShadowRound(InetAddress, 
boolean, Map)}. This will update
+     * again by calling {@link 
Gossiper#maybeFinishShadowRound(InetAddressAndPort, boolean, Map)}. This will 
update
      * {@link Gossiper#endpointShadowStateMap} with received values, in order 
to return an immutable copy to the
      * caller of {@link Gossiper#doShadowRound()}. Therefor only a single 
shadow round execution is permitted at
      * the same time.
      *
      * @return endpoint states gathered during shadow round or empty map
      */
-    public synchronized Map<InetAddress, EndpointState> doShadowRound()
+    public synchronized Map<InetAddressAndPort, EndpointState> doShadowRound()
     {
         buildSeedsList();
         // it may be that the local address is the only entry in the seed
@@ -1381,7 +1428,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
                 { // CASSANDRA-8072, retry at the beginning and every 5 seconds
                     logger.trace("Sending shadow round GOSSIP DIGEST SYN to 
seeds {}", seeds);
 
-                    for (InetAddress seed : seeds)
+                    for (InetAddressAndPort seed : seeds)
                         MessagingService.instance().sendOneWay(message, seed);
                 }
 
@@ -1393,8 +1440,8 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
                 if (slept > StorageService.RING_DELAY)
                 {
                     // if we don't consider ourself to be a seed, fail out
-                    if 
(!DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()))
-                        throw new RuntimeException("Unable to gossip with any 
seeds");
+                    if 
(!DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddressAndPort()))
+                        throw new RuntimeException("Unable to gossip with any 
seeds " + DatabaseDescriptor.getSeeds() + " and " + 
FBUtilities.getBroadcastAddressAndPort());
 
                     logger.warn("Unable to gossip with any seeds but 
continuing since node is in its own seed list");
                     inShadowRound = false;
@@ -1413,9 +1460,9 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     @VisibleForTesting
     void buildSeedsList()
     {
-        for (InetAddress seed : DatabaseDescriptor.getSeeds())
+        for (InetAddressAndPort seed : DatabaseDescriptor.getSeeds())
         {
-            if (seed.equals(FBUtilities.getBroadcastAddress()))
+            if (seed.equals(FBUtilities.getBroadcastAddressAndPort()))
                 continue;
             seeds.add(seed);
         }
@@ -1427,12 +1474,12 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         HeartBeatState hbState = new HeartBeatState(generationNbr);
         EndpointState localState = new EndpointState(hbState);
         localState.markAlive();
-        endpointStateMap.putIfAbsent(FBUtilities.getBroadcastAddress(), 
localState);
+        endpointStateMap.putIfAbsent(FBUtilities.getBroadcastAddressAndPort(), 
localState);
     }
 
     public void forceNewerGeneration()
     {
-        EndpointState epstate = 
endpointStateMap.get(FBUtilities.getBroadcastAddress());
+        EndpointState epstate = 
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort());
         epstate.getHeartBeatState().forceNewerGenerationUnsafe();
     }
 
@@ -1440,9 +1487,9 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     /**
      * Add an endpoint we knew about previously, but whose state is unknown
      */
-    public void addSavedEndpoint(InetAddress ep)
+    public void addSavedEndpoint(InetAddressAndPort ep)
     {
-        if (ep.equals(FBUtilities.getBroadcastAddress()))
+        if (ep.equals(FBUtilities.getBroadcastAddressAndPort()))
         {
             logger.debug("Attempt to add self as saved endpoint");
             return;
@@ -1470,8 +1517,8 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     private void addLocalApplicationStateInternal(ApplicationState state, 
VersionedValue value)
     {
         assert taskLock.isHeldByCurrentThread();
-        EndpointState epState = 
endpointStateMap.get(FBUtilities.getBroadcastAddress());
-        InetAddress epAddr = FBUtilities.getBroadcastAddress();
+        EndpointState epState = 
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort());
+        InetAddressAndPort epAddr = FBUtilities.getBroadcastAddressAndPort();
         assert epState != null;
         // Fire "before change" notifications:
         doBeforeChangeNotifications(epAddr, epState, state, value);
@@ -1508,13 +1555,14 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
 
     public void stop()
     {
-        EndpointState mystate = 
endpointStateMap.get(FBUtilities.getBroadcastAddress());
+        EndpointState mystate = 
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort());
         if (mystate != null && !isSilentShutdownState(mystate) && 
StorageService.instance.isJoined())
         {
             logger.info("Announcing shutdown");
+            addLocalApplicationState(ApplicationState.STATUS_WITH_PORT, 
StorageService.instance.valueFactory.shutdown(true));
             addLocalApplicationState(ApplicationState.STATUS, 
StorageService.instance.valueFactory.shutdown(true));
             MessageOut message = new 
MessageOut(MessagingService.Verb.GOSSIP_SHUTDOWN);
-            for (InetAddress ep : liveEndpoints)
+            for (InetAddressAndPort ep : liveEndpoints)
                 MessagingService.instance().sendOneWay(message, ep);
             
Uninterruptibles.sleepUninterruptibly(Integer.getInteger("cassandra.shutdown_announce_in_ms",
 2000), TimeUnit.MILLISECONDS);
         }
@@ -1529,7 +1577,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         return (scheduledGossipTask != null) && 
(!scheduledGossipTask.isCancelled());
     }
 
-    protected void maybeFinishShadowRound(InetAddress respondent, boolean 
isInShadowRound, Map<InetAddress, EndpointState> epStateMap)
+    protected void maybeFinishShadowRound(InetAddressAndPort respondent, 
boolean isInShadowRound, Map<InetAddressAndPort, EndpointState> epStateMap)
     {
         if (inShadowRound)
         {
@@ -1565,7 +1613,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     }
 
     @VisibleForTesting
-    public void initializeNodeUnsafe(InetAddress addr, UUID uuid, int 
generationNbr)
+    public void initializeNodeUnsafe(InetAddressAndPort addr, UUID uuid, int 
generationNbr)
     {
         HeartBeatState hbState = new HeartBeatState(generationNbr);
         EndpointState newState = new EndpointState(hbState);
@@ -1581,7 +1629,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     }
 
     @VisibleForTesting
-    public void injectApplicationState(InetAddress endpoint, ApplicationState 
state, VersionedValue value)
+    public void injectApplicationState(InetAddressAndPort endpoint, 
ApplicationState state, VersionedValue value)
     {
         EndpointState localState = endpointStateMap.get(endpoint);
         localState.addApplicationState(state, value);
@@ -1589,15 +1637,15 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
 
     public long getEndpointDowntime(String address) throws UnknownHostException
     {
-        return getEndpointDowntime(InetAddress.getByName(address));
+        return getEndpointDowntime(InetAddressAndPort.getByName(address));
     }
 
     public int getCurrentGenerationNumber(String address) throws 
UnknownHostException
     {
-        return getCurrentGenerationNumber(InetAddress.getByName(address));
+        return 
getCurrentGenerationNumber(InetAddressAndPort.getByName(address));
     }
 
-    public void addExpireTimeForEndpoint(InetAddress endpoint, long expireTime)
+    public void addExpireTimeForEndpoint(InetAddressAndPort endpoint, long 
expireTime)
     {
         if (logger.isDebugEnabled())
         {
@@ -1612,14 +1660,14 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     }
 
     @Nullable
-    public CassandraVersion getReleaseVersion(InetAddress ep)
+    public CassandraVersion getReleaseVersion(InetAddressAndPort ep)
     {
         EndpointState state = getEndpointStateForEndpoint(ep);
         return state != null ? state.getReleaseVersion() : null;
     }
 
     @Nullable
-    public UUID getSchemaVersion(InetAddress ep)
+    public UUID getSchemaVersion(InetAddressAndPort ep)
     {
         EndpointState state = getEndpointStateForEndpoint(ep);
         return state != null ? state.getSchemaVersion() : null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java b/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
index 1bfd678..dc81650 100644
--- a/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
+++ b/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.gms;
 
-import java.net.InetAddress;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 /**
  * This is called by an instance of the IEndpointStateChangePublisher to notify
@@ -36,17 +36,17 @@ public interface IEndpointStateChangeSubscriber
      * @param endpoint endpoint for which the state change occurred.
      * @param epState  state that actually changed for the above endpoint.
      */
-    public void onJoin(InetAddress endpoint, EndpointState epState);
+    public void onJoin(InetAddressAndPort endpoint, EndpointState epState);
     
-    public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue);
+    public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue);
 
-    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value);
+    public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value);
 
-    public void onAlive(InetAddress endpoint, EndpointState state);
+    public void onAlive(InetAddressAndPort endpoint, EndpointState state);
 
-    public void onDead(InetAddress endpoint, EndpointState state);
+    public void onDead(InetAddressAndPort endpoint, EndpointState state);
 
-    public void onRemove(InetAddress endpoint);
+    public void onRemove(InetAddressAndPort endpoint);
 
     /**
      * Called whenever a node is restarted.
@@ -54,5 +54,5 @@ public interface IEndpointStateChangeSubscriber
      * previously marked down. It will have only if {@code state.isAlive() == false}
      * as {@code state} is from before the restarted node is marked up.
      */
-    public void onRestart(InetAddress endpoint, EndpointState state);
+    public void onRestart(InetAddressAndPort endpoint, EndpointState state);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java b/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java
index 8b274b6..4e0c663 100644
--- a/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java
+++ b/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.gms;
 
-import java.net.InetAddress;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 /**
  * Implemented by the Gossiper to convict an endpoint
@@ -33,5 +33,5 @@ public interface IFailureDetectionEventListener
      * @param ep  endpoint to be convicted
      * @param phi the value of phi with with ep was convicted
      */
-    public void convict(InetAddress ep, double phi);
+    public void convict(InetAddressAndPort ep, double phi);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/IFailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/IFailureDetector.java b/src/java/org/apache/cassandra/gms/IFailureDetector.java
index a860c7c..62fc97d 100644
--- a/src/java/org/apache/cassandra/gms/IFailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/IFailureDetector.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.gms;
 
-import java.net.InetAddress;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 /**
  * An interface that provides an application with the ability
@@ -35,7 +35,7 @@ public interface IFailureDetector
      * @param ep endpoint in question.
      * @return true if UP and false if DOWN.
      */
-    public boolean isAlive(InetAddress ep);
+    public boolean isAlive(InetAddressAndPort ep);
 
     /**
      * This method is invoked by any entity wanting to interrogate the status of an endpoint.
@@ -44,7 +44,7 @@ public interface IFailureDetector
      *
      * param ep endpoint for which we interpret the inter arrival times.
      */
-    public void interpret(InetAddress ep);
+    public void interpret(InetAddressAndPort ep);
 
     /**
      * This method is invoked by the receiver of the heartbeat. In our case it would be
@@ -53,17 +53,17 @@ public interface IFailureDetector
      *
      * param ep endpoint being reported.
      */
-    public void report(InetAddress ep);
+    public void report(InetAddressAndPort ep);
 
     /**
      * remove endpoint from failure detector
      */
-    public void remove(InetAddress ep);
+    public void remove(InetAddressAndPort ep);
 
     /**
      * force conviction of endpoint in the failure detector
      */
-    public void forceConviction(InetAddress ep);
+    public void forceConviction(InetAddressAndPort ep);
 
     /**
      * Register interest for Failure Detector events.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index d9c8d0b..691f544 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.commons.lang3.StringUtils;
@@ -133,11 +134,17 @@ public class VersionedValue implements Comparable<VersionedValue>
             return new VersionedValue(value.value);
         }
 
+        @Deprecated
         public VersionedValue bootReplacing(InetAddress oldNode)
         {
             return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE, oldNode.getHostAddress()));
         }
 
+        public VersionedValue bootReplacingWithPort(InetAddressAndPort oldNode)
+        {
+            return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE, oldNode.toString()));
+        }
+
         public VersionedValue bootstrapping(Collection<Token> tokens)
         {
             return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING,
@@ -248,6 +255,11 @@ public class VersionedValue implements Comparable<VersionedValue>
             return new VersionedValue(endpoint.getHostAddress());
         }
 
+        public VersionedValue nativeaddressAndPort(InetAddressAndPort address)
+        {
+            return new VersionedValue(address.toString());
+        }
+
         public VersionedValue releaseVersion()
         {
             return new VersionedValue(FBUtilities.getReleaseVersionString());
@@ -263,6 +275,11 @@ public class VersionedValue implements Comparable<VersionedValue>
             return new VersionedValue(private_ip);
         }
 
+        public VersionedValue internalAddressAndPort(InetAddressAndPort address)
+        {
+            return new VersionedValue(address.toString());
+        }
+
         public VersionedValue severity(double value)
         {
             return new VersionedValue(String.valueOf(value));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to