http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java 
b/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
index ce05e93..c7d45bf 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair.asymmetric;
 
-import java.net.InetAddress;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -31,6 +30,7 @@ import com.google.common.collect.ImmutableMap;
 
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 /**
  * Basic idea is that we track incoming ranges instead of blindly just 
exchanging the ranges that mismatch between two nodes
@@ -47,19 +47,19 @@ public class ReduceHelper
     /**
      * Reduces the differences provided by the merkle trees to a minimum set 
of differences
      */
-    public static ImmutableMap<InetAddress, HostDifferences> 
reduce(DifferenceHolder differences, PreferedNodeFilter filter)
+    public static ImmutableMap<InetAddressAndPort, HostDifferences> 
reduce(DifferenceHolder differences, PreferedNodeFilter filter)
     {
-        Map<InetAddress, IncomingRepairStreamTracker> trackers = 
createIncomingRepairStreamTrackers(differences);
-        Map<InetAddress, Integer> outgoingStreamCounts = new HashMap<>();
-        ImmutableMap.Builder<InetAddress, HostDifferences> mapBuilder = 
ImmutableMap.builder();
-        for (Map.Entry<InetAddress, IncomingRepairStreamTracker> trackerEntry 
: trackers.entrySet())
+        Map<InetAddressAndPort, IncomingRepairStreamTracker> trackers = 
createIncomingRepairStreamTrackers(differences);
+        Map<InetAddressAndPort, Integer> outgoingStreamCounts = new 
HashMap<>();
+        ImmutableMap.Builder<InetAddressAndPort, HostDifferences> mapBuilder = 
ImmutableMap.builder();
+        for (Map.Entry<InetAddressAndPort, IncomingRepairStreamTracker> 
trackerEntry : trackers.entrySet())
         {
             IncomingRepairStreamTracker tracker = trackerEntry.getValue();
             HostDifferences rangesToFetch = new HostDifferences();
             for (Map.Entry<Range<Token>, StreamFromOptions> entry : 
tracker.getIncoming().entrySet())
             {
                 Range<Token> rangeToFetch = entry.getKey();
-                for (InetAddress remoteNode : 
pickLeastStreaming(trackerEntry.getKey(), entry.getValue(), 
outgoingStreamCounts, filter))
+                for (InetAddressAndPort remoteNode : 
pickLeastStreaming(trackerEntry.getKey(), entry.getValue(), 
outgoingStreamCounts, filter))
                     rangesToFetch.addSingleRange(remoteNode, rangeToFetch);
             }
             mapBuilder.put(trackerEntry.getKey(), rangesToFetch);
@@ -69,14 +69,14 @@ public class ReduceHelper
     }
 
     @VisibleForTesting
-    static Map<InetAddress, IncomingRepairStreamTracker> 
createIncomingRepairStreamTrackers(DifferenceHolder differences)
+    static Map<InetAddressAndPort, IncomingRepairStreamTracker> 
createIncomingRepairStreamTrackers(DifferenceHolder differences)
     {
-        Map<InetAddress, IncomingRepairStreamTracker> trackers = new 
HashMap<>();
+        Map<InetAddressAndPort, IncomingRepairStreamTracker> trackers = new 
HashMap<>();
 
-        for (InetAddress hostWithDifference : differences.keyHosts())
+        for (InetAddressAndPort hostWithDifference : differences.keyHosts())
         {
             HostDifferences hostDifferences = 
differences.get(hostWithDifference);
-            for (InetAddress differingHost : hostDifferences.hosts())
+            for (InetAddressAndPort differingHost : hostDifferences.hosts())
             {
                 List<Range<Token>> differingRanges = 
hostDifferences.get(differingHost);
                 // hostWithDifference has mismatching ranges differingRanges 
with differingHost:
@@ -93,24 +93,24 @@ public class ReduceHelper
     }
 
     private static IncomingRepairStreamTracker getTracker(DifferenceHolder 
differences,
-                                                          Map<InetAddress, 
IncomingRepairStreamTracker> trackers,
-                                                          InetAddress host)
+                                                          
Map<InetAddressAndPort, IncomingRepairStreamTracker> trackers,
+                                                          InetAddressAndPort 
host)
     {
         return trackers.computeIfAbsent(host, (h) -> new 
IncomingRepairStreamTracker(differences));
     }
 
     // greedily pick the nodes doing the least amount of streaming
-    private static Collection<InetAddress> pickLeastStreaming(InetAddress 
streamingNode,
+    private static Collection<InetAddressAndPort> 
pickLeastStreaming(InetAddressAndPort streamingNode,
                                                               
StreamFromOptions toStreamFrom,
-                                                              Map<InetAddress, 
Integer> outgoingStreamCounts,
+                                                              
Map<InetAddressAndPort, Integer> outgoingStreamCounts,
                                                               
PreferedNodeFilter filter)
     {
-        Set<InetAddress> retSet = new HashSet<>();
-        for (Set<InetAddress> toStream : toStreamFrom.allStreams())
+        Set<InetAddressAndPort> retSet = new HashSet<>();
+        for (Set<InetAddressAndPort> toStream : toStreamFrom.allStreams())
         {
-            InetAddress candidate = null;
-            Set<InetAddress> prefered = filter.apply(streamingNode, toStream);
-            for (InetAddress node : prefered)
+            InetAddressAndPort candidate = null;
+            Set<InetAddressAndPort> prefered = filter.apply(streamingNode, 
toStream);
+            for (InetAddressAndPort node : prefered)
             {
                 if (candidate == null || 
outgoingStreamCounts.getOrDefault(candidate, 0) > 
outgoingStreamCounts.getOrDefault(node, 0))
                 {
@@ -120,7 +120,7 @@ public class ReduceHelper
             // ok, found no prefered hosts, try all of them
             if (candidate == null)
             {
-                for (InetAddress node : toStream)
+                for (InetAddressAndPort node : toStream)
                 {
                     if (candidate == null || 
outgoingStreamCounts.getOrDefault(candidate, 0) > 
outgoingStreamCounts.getOrDefault(node, 0))
                     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java 
b/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java
index 4516f23..6070983 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair.asymmetric;
 
-import java.net.InetAddress;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
@@ -28,6 +27,7 @@ import com.google.common.collect.Sets;
 
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 /**
  * Keeps track of where a node needs to stream a given range from.
@@ -53,18 +53,18 @@ public class StreamFromOptions
     /**
      * Contains the hosts to stream from - if two nodes are in the same inner 
set, they are identical for the range we are handling
      */
-    private final Set<Set<InetAddress>> streamOptions = new HashSet<>();
+    private final Set<Set<InetAddressAndPort>> streamOptions = new HashSet<>();
 
     public StreamFromOptions(DifferenceHolder differences, Range<Token> range)
     {
         this(differences, range, Collections.emptySet());
     }
 
-    private StreamFromOptions(DifferenceHolder differences, Range<Token> 
range, Set<Set<InetAddress>> existing)
+    private StreamFromOptions(DifferenceHolder differences, Range<Token> 
range, Set<Set<InetAddressAndPort>> existing)
     {
         this.differences = differences;
         this.range = range;
-        for (Set<InetAddress> addresses : existing)
+        for (Set<InetAddressAndPort> addresses : existing)
             this.streamOptions.add(Sets.newHashSet(addresses));
     }
 
@@ -75,11 +75,11 @@ public class StreamFromOptions
      * range we are tracking, then just add it to the set with the identical 
remote nodes. Otherwise create a new group
      * of nodes containing this new node.
      */
-    public void add(InetAddress streamFromNode)
+    public void add(InetAddressAndPort streamFromNode)
     {
-        for (Set<InetAddress> options : streamOptions)
+        for (Set<InetAddressAndPort> options : streamOptions)
         {
-            InetAddress first = options.iterator().next();
+            InetAddressAndPort first = options.iterator().next();
             if (!differences.hasDifferenceBetween(first, streamFromNode, 
range))
             {
                 options.add(streamFromNode);
@@ -94,7 +94,7 @@ public class StreamFromOptions
         return new StreamFromOptions(differences, withRange, streamOptions);
     }
 
-    public Iterable<Set<InetAddress>> allStreams()
+    public Iterable<Set<InetAddressAndPort>> allStreams()
     {
         return streamOptions;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java 
b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
index c137346..78057e2 100644
--- a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Iterables;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.messages.FailSession;
 import org.apache.cassandra.repair.messages.FinalizeCommit;
 import org.apache.cassandra.repair.messages.FinalizePromise;
@@ -55,7 +56,7 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin;
  * There are 4 stages to a consistent incremental repair.
  *
  * <h1>Repair prepare</h1>
- *  First, the normal {@link ActiveRepairService#prepareForRepair(UUID, 
InetAddress, Set, RepairOption, List)} stuff
+ *  First, the normal {@link ActiveRepairService#prepareForRepair(UUID, 
InetAddressAndPort, Set, RepairOption, List)} stuff
  *  happens, which sends out {@link PrepareMessage} and creates a {@link 
ActiveRepairService.ParentRepairSession}
  *  on the coordinator and each of the neighbors.
  *
@@ -68,7 +69,7 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin;
  *  coordinator indicating success or failure. If the pending anti-compaction 
fails, the local session state is set
  *  to {@code FAILED}.
  *  <p/>
- *  (see {@link LocalSessions#handlePrepareMessage(InetAddress, 
PrepareConsistentRequest)}
+ *  (see {@link LocalSessions#handlePrepareMessage(InetAddressAndPort, 
PrepareConsistentRequest)}
  *  <p/>
  *  Once the coordinator recieves positive {@code PrepareConsistentResponse} 
messages from all the participants, the
  *  coordinator begins the normal repair process.
@@ -99,8 +100,8 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin;
  *  & {@link CoordinatorSession#finalizeCommit()}
  *  <p/>
  *
- *  On the local session side, see {@link 
LocalSessions#handleFinalizeProposeMessage(InetAddress, FinalizePropose)}
- *  & {@link LocalSessions#handleFinalizeCommitMessage(InetAddress, 
FinalizeCommit)}
+ *  On the local session side, see {@link 
LocalSessions#handleFinalizeProposeMessage(InetAddressAndPort, FinalizePropose)}
+ *  & {@link LocalSessions#handleFinalizeCommitMessage(InetAddressAndPort, 
FinalizeCommit)}
  *
  * <h1>Failure</h1>
  *  If there are any failures or problems during the process above, the 
session will be failed. When a session is failed,
@@ -187,11 +188,11 @@ public abstract class ConsistentSession
 
     private volatile State state;
     public final UUID sessionID;
-    public final InetAddress coordinator;
+    public final InetAddressAndPort coordinator;
     public final ImmutableSet<TableId> tableIds;
     public final long repairedAt;
     public final ImmutableSet<Range<Token>> ranges;
-    public final ImmutableSet<InetAddress> participants;
+    public final ImmutableSet<InetAddressAndPort> participants;
 
     ConsistentSession(AbstractBuilder builder)
     {
@@ -260,11 +261,11 @@ public abstract class ConsistentSession
     {
         private State state;
         private UUID sessionID;
-        private InetAddress coordinator;
+        private InetAddressAndPort coordinator;
         private Set<TableId> ids;
         private long repairedAt;
         private Collection<Range<Token>> ranges;
-        private Set<InetAddress> participants;
+        private Set<InetAddressAndPort> participants;
 
         void withState(State state)
         {
@@ -276,7 +277,7 @@ public abstract class ConsistentSession
             this.sessionID = sessionID;
         }
 
-        void withCoordinator(InetAddress coordinator)
+        void withCoordinator(InetAddressAndPort coordinator)
         {
             this.coordinator = coordinator;
         }
@@ -301,7 +302,7 @@ public abstract class ConsistentSession
             this.ranges = ranges;
         }
 
-        void withParticipants(Set<InetAddress> peers)
+        void withParticipants(Set<InetAddressAndPort> peers)
         {
             this.participants = peers;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java 
b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
index ba0025f..f52a28d 100644
--- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair.consistent;
 
-import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -39,6 +38,7 @@ import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.RepairSessionResult;
@@ -58,7 +58,7 @@ public class CoordinatorSession extends ConsistentSession
 {
     private static final Logger logger = 
LoggerFactory.getLogger(CoordinatorSession.class);
 
-    private final Map<InetAddress, State> participantStates = new HashMap<>();
+    private final Map<InetAddressAndPort, State> participantStates = new 
HashMap<>();
     private final SettableFuture<Boolean> prepareFuture = 
SettableFuture.create();
     private final SettableFuture<Boolean> finalizeProposeFuture = 
SettableFuture.create();
 
@@ -69,7 +69,7 @@ public class CoordinatorSession extends ConsistentSession
     public CoordinatorSession(Builder builder)
     {
         super(builder);
-        for (InetAddress participant : participants)
+        for (InetAddressAndPort participant : participants)
         {
             participantStates.put(participant, State.PREPARING);
         }
@@ -95,7 +95,7 @@ public class CoordinatorSession extends ConsistentSession
         super.setState(state);
     }
 
-    public synchronized void setParticipantState(InetAddress participant, 
State state)
+    public synchronized void setParticipantState(InetAddressAndPort 
participant, State state)
     {
         logger.trace("Setting participant {} to state {} for repair {}", 
participant, state, sessionID);
         Preconditions.checkArgument(participantStates.containsKey(participant),
@@ -115,7 +115,7 @@ public class CoordinatorSession extends ConsistentSession
 
     synchronized void setAll(State state)
     {
-        for (InetAddress participant : participants)
+        for (InetAddressAndPort participant : participants)
         {
             setParticipantState(participant, state);
         }
@@ -131,7 +131,7 @@ public class CoordinatorSession extends ConsistentSession
         return getState() == State.FAILED || 
Iterables.any(participantStates.values(), v -> v == State.FAILED);
     }
 
-    protected void sendMessage(InetAddress destination, RepairMessage message)
+    protected void sendMessage(InetAddressAndPort destination, RepairMessage 
message)
     {
         logger.trace("Sending {} to {}", message, destination);
         MessageOut<RepairMessage> messageOut = new 
MessageOut<RepairMessage>(MessagingService.Verb.REPAIR_MESSAGE, message, 
RepairMessage.serializer);
@@ -144,14 +144,14 @@ public class CoordinatorSession extends ConsistentSession
 
         logger.debug("Beginning prepare phase of incremental repair session 
{}", sessionID);
         PrepareConsistentRequest message = new 
PrepareConsistentRequest(sessionID, coordinator, participants);
-        for (final InetAddress participant : participants)
+        for (final InetAddressAndPort participant : participants)
         {
             sendMessage(participant, message);
         }
         return prepareFuture;
     }
 
-    public synchronized void handlePrepareResponse(InetAddress participant, 
boolean success)
+    public synchronized void handlePrepareResponse(InetAddressAndPort 
participant, boolean success)
     {
         if (getState() == State.FAILED)
         {
@@ -185,14 +185,14 @@ public class CoordinatorSession extends ConsistentSession
         Preconditions.checkArgument(allStates(State.REPAIRING));
         logger.debug("Proposing finalization of repair session {}", sessionID);
         FinalizePropose message = new FinalizePropose(sessionID);
-        for (final InetAddress participant : participants)
+        for (final InetAddressAndPort participant : participants)
         {
             sendMessage(participant, message);
         }
         return finalizeProposeFuture;
     }
 
-    public synchronized void handleFinalizePromise(InetAddress participant, 
boolean success)
+    public synchronized void handleFinalizePromise(InetAddressAndPort 
participant, boolean success)
     {
         if (getState() == State.FAILED)
         {
@@ -221,7 +221,7 @@ public class CoordinatorSession extends ConsistentSession
         Preconditions.checkArgument(allStates(State.FINALIZE_PROMISED));
         logger.debug("Committing finalization of repair session {}", 
sessionID);
         FinalizeCommit message = new FinalizeCommit(sessionID);
-        for (final InetAddress participant : participants)
+        for (final InetAddressAndPort participant : participants)
         {
             sendMessage(participant, message);
         }
@@ -233,7 +233,7 @@ public class CoordinatorSession extends ConsistentSession
     {
         logger.info("Incremental repair session {} failed", sessionID);
         FailSession message = new FailSession(sessionID);
-        for (final InetAddress participant : participants)
+        for (final InetAddressAndPort participant : participants)
         {
             if (participantStates.get(participant) != State.FAILED)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java 
b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
index 211e0c1..bb84d0a 100644
--- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair.consistent;
 
-import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -26,6 +25,7 @@ import java.util.UUID;
 
 import com.google.common.base.Preconditions;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.messages.FailSession;
 import org.apache.cassandra.repair.messages.FinalizePromise;
 import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
@@ -43,7 +43,7 @@ public class CoordinatorSessions
         return new CoordinatorSession(builder);
     }
 
-    public synchronized CoordinatorSession registerSession(UUID sessionId, 
Set<InetAddress> participants)
+    public synchronized CoordinatorSession registerSession(UUID sessionId, 
Set<InetAddressAndPort> participants)
     {
         Preconditions.checkArgument(!sessions.containsKey(sessionId), "A 
coordinator already exists for session %s", sessionId);
         ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(sessionId);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java 
b/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java
index 903aeb5..98b883a 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java
@@ -21,10 +21,12 @@ package org.apache.cassandra.repair.consistent;
 import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Iterables;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
@@ -40,6 +42,7 @@ public class LocalSessionInfo
     public static final String LAST_UPDATE = "LAST_UPDATE";
     public static final String COORDINATOR = "COORDINATOR";
     public static final String PARTICIPANTS = "PARTICIPANTS";
+    public static final String PARTICIPANTS_WP = "PARTICIPANTS_WP";
     public static final String TABLES = "TABLES";
 
 
@@ -59,7 +62,8 @@ public class LocalSessionInfo
         m.put(STARTED, Integer.toString(session.getStartedAt()));
         m.put(LAST_UPDATE, Integer.toString(session.getLastUpdate()));
         m.put(COORDINATOR, session.coordinator.toString());
-        m.put(PARTICIPANTS, 
Joiner.on(',').join(Iterables.transform(session.participants, 
InetAddress::toString)));
+        m.put(PARTICIPANTS, 
Joiner.on(',').join(Iterables.transform(session.participants.stream().map(peer 
-> peer.address).collect(Collectors.toList()), InetAddress::getHostAddress)));
+        m.put(PARTICIPANTS_WP, 
Joiner.on(',').join(Iterables.transform(session.participants, 
InetAddressAndPort::toString)));
         m.put(TABLES, 
Joiner.on(',').join(Iterables.transform(session.tableIds, 
LocalSessionInfo::tableString)));
 
         return m;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java 
b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index 4ef2c2c..e62f6fd 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.repair.consistent;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.Date;
@@ -33,6 +34,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -49,13 +51,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.InetAddressType;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
@@ -141,13 +148,13 @@ public class LocalSessions
     }
 
     @VisibleForTesting
-    protected InetAddress getBroadcastAddress()
+    protected InetAddressAndPort getBroadcastAddressAndPort()
     {
-        return FBUtilities.getBroadcastAddress();
+        return FBUtilities.getBroadcastAddressAndPort();
     }
 
     @VisibleForTesting
-    protected boolean isAlive(InetAddress address)
+    protected boolean isAlive(InetAddressAndPort address)
     {
         return FailureDetector.instance.isAlive(address);
     }
@@ -177,14 +184,14 @@ public class LocalSessions
         logger.info("Cancelling local repair session {}", sessionID);
         LocalSession session = getSession(sessionID);
         Preconditions.checkArgument(session != null, "Session {} does not 
exist", sessionID);
-        Preconditions.checkArgument(force || 
session.coordinator.equals(getBroadcastAddress()),
+        Preconditions.checkArgument(force || 
session.coordinator.equals(getBroadcastAddressAndPort()),
                                     "Cancel session %s from it's coordinator 
(%s) or use --force",
                                     sessionID, session.coordinator);
 
         setStateAndSave(session, FAILED);
-        for (InetAddress participant : session.participants)
+        for (InetAddressAndPort participant : session.participants)
         {
-            if (!participant.equals(getBroadcastAddress()))
+            if (!participant.equals(getBroadcastAddressAndPort()))
                 sendMessage(participant, new FailSession(sessionID));
         }
     }
@@ -335,10 +342,12 @@ public class LocalSessions
                        "repaired_at, " +
                        "state, " +
                        "coordinator, " +
+                       "coordinator_port, " +
                        "participants, " +
+                       "participants_wp," +
                        "ranges, " +
                        "cfids) " +
-                       "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+                       "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
 
         QueryProcessor.executeInternal(String.format(query, keyspace, table),
                                        session.sessionID,
@@ -346,8 +355,10 @@ public class LocalSessions
                                        
Date.from(Instant.ofEpochSecond(session.getLastUpdate())),
                                        
Date.from(Instant.ofEpochMilli(session.repairedAt)),
                                        session.getState().ordinal(),
-                                       session.coordinator,
-                                       session.participants,
+                                       session.coordinator.address,
+                                       session.coordinator.port,
+                                       
session.participants.stream().map(participant -> 
participant.address).collect(Collectors.toSet()),
+                                       
session.participants.stream().map(participant -> 
participant.toString()).collect(Collectors.toSet()),
                                        serializeRanges(session.ranges),
                                        tableIdToUuid(session.tableIds));
     }
@@ -362,12 +373,27 @@ public class LocalSessions
         LocalSession.Builder builder = LocalSession.builder();
         
builder.withState(ConsistentSession.State.valueOf(row.getInt("state")));
         builder.withSessionID(row.getUUID("parent_id"));
-        builder.withCoordinator(row.getInetAddress("coordinator"));
+        InetAddressAndPort coordinator = 
InetAddressAndPort.getByAddressOverrideDefaults(
+            row.getInetAddress("coordinator"),
+            row.getInt("coordinator_port"));
+        builder.withCoordinator(coordinator);
         builder.withTableIds(uuidToTableId(row.getSet("cfids", 
UUIDType.instance)));
         builder.withRepairedAt(row.getTimestamp("repaired_at").getTime());
         builder.withRanges(deserializeRanges(row.getSet("ranges", 
BytesType.instance)));
-        builder.withParticipants(row.getSet("participants", 
InetAddressType.instance));
-
+        //There is no cross version streaming and thus no cross version repair 
so assume that
+        //any valid repair sessions has the participants_wp column and any 
that doesn't is malformed
+        Set<String> participants = row.getSet("participants_wp", 
UTF8Type.instance);
+        builder.withParticipants(participants.stream().map(participant ->
+                                                             {
+                                                                 try
+                                                                 {
+                                                                     return 
InetAddressAndPort.getByName(participant);
+                                                                 }
+                                                                 catch 
(UnknownHostException e)
+                                                                 {
+                                                                     throw new 
RuntimeException(e);
+                                                                 }
+                                                             
}).collect(Collectors.toSet()));
         builder.withStartedAt(dateToSeconds(row.getTimestamp("started_at")));
         builder.withLastUpdate(dateToSeconds(row.getTimestamp("last_update")));
 
@@ -440,7 +466,7 @@ public class LocalSessions
     }
 
     @VisibleForTesting
-    LocalSession createSessionUnsafe(UUID sessionId, 
ActiveRepairService.ParentRepairSession prs, Set<InetAddress> peers)
+    LocalSession createSessionUnsafe(UUID sessionId, 
ActiveRepairService.ParentRepairSession prs, Set<InetAddressAndPort> peers)
     {
         LocalSession.Builder builder = LocalSession.builder();
         builder.withState(ConsistentSession.State.PREPARING);
@@ -464,7 +490,7 @@ public class LocalSessions
         return ActiveRepairService.instance.getParentRepairSession(sessionID);
     }
 
-    protected void sendMessage(InetAddress destination, RepairMessage message)
+    protected void sendMessage(InetAddressAndPort destination, RepairMessage 
message)
     {
         logger.trace("sending {} to {}", message, destination);
         MessageOut<RepairMessage> messageOut = new 
MessageOut<RepairMessage>(MessagingService.Verb.REPAIR_MESSAGE, message, 
RepairMessage.serializer);
@@ -536,12 +562,12 @@ public class LocalSessions
      * successfully. If the pending anti compaction fails, a failure message 
is sent to the coordinator,
      * cancelling the session.
      */
-    public void handlePrepareMessage(InetAddress from, 
PrepareConsistentRequest request)
+    public void handlePrepareMessage(InetAddressAndPort from, 
PrepareConsistentRequest request)
     {
         logger.trace("received {} from {}", request, from);
         UUID sessionID = request.parentSession;
-        InetAddress coordinator = request.coordinator;
-        Set<InetAddress> peers = request.participants;
+        InetAddressAndPort coordinator = request.coordinator;
+        Set<InetAddressAndPort> peers = request.participants;
 
         ActiveRepairService.ParentRepairSession parentSession;
         try
@@ -568,7 +594,7 @@ public class LocalSessions
             {
                 logger.debug("Prepare phase for incremental repair session {} 
completed", sessionID);
                 setStateAndSave(session, PREPARED);
-                sendMessage(coordinator, new 
PrepareConsistentResponse(sessionID, getBroadcastAddress(), true));
+                sendMessage(coordinator, new 
PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), true));
                 executor.shutdown();
             }
 
@@ -587,7 +613,7 @@ public class LocalSessions
                 {
                     logger.error("Prepare phase for incremental repair session 
{} failed", sessionID, t);
                 }
-                sendMessage(coordinator, new 
PrepareConsistentResponse(sessionID, getBroadcastAddress(), false));
+                sendMessage(coordinator, new 
PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false));
                 failSession(sessionID, false);
                 executor.shutdown();
             }
@@ -604,7 +630,7 @@ public class LocalSessions
         }
     }
 
-    public void handleFinalizeProposeMessage(InetAddress from, FinalizePropose 
propose)
+    public void handleFinalizeProposeMessage(InetAddressAndPort from, 
FinalizePropose propose)
     {
         logger.trace("received {} from {}", propose, from);
         UUID sessionID = propose.sessionID;
@@ -629,7 +655,7 @@ public class LocalSessions
              */
             syncTable();
 
-            sendMessage(from, new FinalizePromise(sessionID, 
getBroadcastAddress(), true));
+            sendMessage(from, new FinalizePromise(sessionID, 
getBroadcastAddressAndPort(), true));
             logger.debug("Received FinalizePropose message for incremental 
repair session {}, responded with FinalizePromise", sessionID);
         }
         catch (IllegalArgumentException e)
@@ -659,7 +685,7 @@ public class LocalSessions
      * as part of the compaction process, and avoids having to worry about in 
progress compactions interfering with the
      * promotion.
      */
-    public void handleFinalizeCommitMessage(InetAddress from, FinalizeCommit 
commit)
+    public void handleFinalizeCommitMessage(InetAddressAndPort from, 
FinalizeCommit commit)
     {
         logger.trace("received {} from {}", commit, from);
         UUID sessionID = commit.sessionID;
@@ -674,7 +700,7 @@ public class LocalSessions
         logger.info("Finalized local repair session {}", sessionID);
     }
 
-    public void handleFailSessionMessage(InetAddress from, FailSession msg)
+    public void handleFailSessionMessage(InetAddressAndPort from, FailSession 
msg)
     {
         logger.trace("received {} from {}", msg, from);
         failSession(msg.sessionID, false);
@@ -684,16 +710,16 @@ public class LocalSessions
     {
         logger.debug("Attempting to learn the outcome of unfinished local 
incremental repair session {}", session.sessionID);
         StatusRequest request = new StatusRequest(session.sessionID);
-        for (InetAddress participant : session.participants)
+        for (InetAddressAndPort participant : session.participants)
         {
-            if (!getBroadcastAddress().equals(participant) && 
isAlive(participant))
+            if (!getBroadcastAddressAndPort().equals(participant) && 
isAlive(participant))
             {
                 sendMessage(participant, request);
             }
         }
     }
 
-    public void handleStatusRequest(InetAddress from, StatusRequest request)
+    public void handleStatusRequest(InetAddressAndPort from, StatusRequest 
request)
     {
         logger.trace("received {} from {}", request, from);
         UUID sessionID = request.sessionID;
@@ -710,7 +736,7 @@ public class LocalSessions
        }
     }
 
-    public void handleStatusResponse(InetAddress from, StatusResponse response)
+    public void handleStatusResponse(InetAddressAndPort from, StatusResponse 
response)
     {
         logger.trace("received {} from {}", response, from);
         UUID sessionID = response.sessionID;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java 
b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
index 015b558..156fde7 100644
--- a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
+++ b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair.consistent;
 
-import java.net.InetAddress;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -27,6 +26,7 @@ import java.util.Objects;
 
 import com.google.common.collect.Lists;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.RepairResult;
 import org.apache.cassandra.repair.RepairSessionResult;
 import org.apache.cassandra.repair.SyncStat;
@@ -42,14 +42,14 @@ public class SyncStatSummary
 
     private static class Session
     {
-        final InetAddress src;
-        final InetAddress dst;
+        final InetAddressAndPort src;
+        final InetAddressAndPort dst;
 
         int files = 0;
         long bytes = 0;
         long ranges = 0;
 
-        Session(InetAddress src, InetAddress dst)
+        Session(InetAddressAndPort src, InetAddressAndPort dst)
         {
             this.src = src;
             this.dst = dst;
@@ -84,7 +84,7 @@ public class SyncStatSummary
         int ranges = -1;
         boolean totalsCalculated = false;
 
-        final Map<Pair<InetAddress, InetAddress>, Session> sessions = new 
HashMap<>();
+        final Map<Pair<InetAddressAndPort, InetAddressAndPort>, Session> 
sessions = new HashMap<>();
 
         Table(String keyspace, String table)
         {
@@ -92,9 +92,9 @@ public class SyncStatSummary
             this.table = table;
         }
 
-        Session getOrCreate(InetAddress from, InetAddress to)
+        Session getOrCreate(InetAddressAndPort from, InetAddressAndPort to)
         {
-            Pair<InetAddress, InetAddress> k = Pair.create(from, to);
+            Pair<InetAddressAndPort, InetAddressAndPort> k = Pair.create(from, 
to);
             if (!sessions.containsKey(k))
             {
                 sessions.put(k, new Session(from, to));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java 
b/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
index b75ad7f..6d76269 100644
--- a/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.RepairJobDesc;
@@ -39,13 +40,13 @@ public class AsymmetricSyncRequest extends RepairMessage
 {
     public static MessageSerializer serializer = new SyncRequestSerializer();
 
-    public final InetAddress initiator;
-    public final InetAddress fetchingNode;
-    public final InetAddress fetchFrom;
+    public final InetAddressAndPort initiator;
+    public final InetAddressAndPort fetchingNode;
+    public final InetAddressAndPort fetchFrom;
     public final Collection<Range<Token>> ranges;
     public final PreviewKind previewKind;
 
-    public AsymmetricSyncRequest(RepairJobDesc desc, InetAddress initiator, 
InetAddress fetchingNode, InetAddress fetchFrom, Collection<Range<Token>> 
ranges, PreviewKind previewKind)
+    public AsymmetricSyncRequest(RepairJobDesc desc, InetAddressAndPort 
initiator, InetAddressAndPort fetchingNode, InetAddressAndPort fetchFrom, 
Collection<Range<Token>> ranges, PreviewKind previewKind)
     {
         super(Type.ASYMMETRIC_SYNC_REQUEST, desc);
         this.initiator = initiator;
@@ -80,9 +81,9 @@ public class AsymmetricSyncRequest extends RepairMessage
         public void serialize(AsymmetricSyncRequest message, DataOutputPlus 
out, int version) throws IOException
         {
             RepairJobDesc.serializer.serialize(message.desc, out, version);
-            CompactEndpointSerializationHelper.serialize(message.initiator, 
out);
-            CompactEndpointSerializationHelper.serialize(message.fetchingNode, 
out);
-            CompactEndpointSerializationHelper.serialize(message.fetchFrom, 
out);
+            
CompactEndpointSerializationHelper.instance.serialize(message.initiator, out, 
version);
+            
CompactEndpointSerializationHelper.instance.serialize(message.fetchingNode, 
out, version);
+            
CompactEndpointSerializationHelper.instance.serialize(message.fetchFrom, out, 
version);
             out.writeInt(message.ranges.size());
             for (Range<Token> range : message.ranges)
             {
@@ -95,9 +96,9 @@ public class AsymmetricSyncRequest extends RepairMessage
         public AsymmetricSyncRequest deserialize(DataInputPlus in, int 
version) throws IOException
         {
             RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, 
version);
-            InetAddress owner = 
CompactEndpointSerializationHelper.deserialize(in);
-            InetAddress src = 
CompactEndpointSerializationHelper.deserialize(in);
-            InetAddress dst = 
CompactEndpointSerializationHelper.deserialize(in);
+            InetAddressAndPort owner = 
CompactEndpointSerializationHelper.instance.deserialize(in, version);
+            InetAddressAndPort src = 
CompactEndpointSerializationHelper.instance.deserialize(in, version);
+            InetAddressAndPort dst = 
CompactEndpointSerializationHelper.instance.deserialize(in, version);
             int rangesCount = in.readInt();
             List<Range<Token>> ranges = new ArrayList<>(rangesCount);
             for (int i = 0; i < rangesCount; ++i)
@@ -109,7 +110,9 @@ public class AsymmetricSyncRequest extends RepairMessage
         public long serializedSize(AsymmetricSyncRequest message, int version)
         {
             long size = RepairJobDesc.serializer.serializedSize(message.desc, 
version);
-            size += 3 * 
CompactEndpointSerializationHelper.serializedSize(message.initiator);
+            size += 
CompactEndpointSerializationHelper.instance.serializedSize(message.initiator, 
version);
+            size += 
CompactEndpointSerializationHelper.instance.serializedSize(message.fetchingNode,
 version);
+            size += 
CompactEndpointSerializationHelper.instance.serializedSize(message.fetchFrom, 
version);
             size += TypeSizes.sizeof(message.ranges.size());
             for (Range<Token> range : message.ranges)
                 size += AbstractBounds.tokenSerializer.serializedSize(range, 
version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java 
b/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
index 6c28347..449748a 100644
--- a/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
+++ b/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
@@ -19,24 +19,22 @@
 package org.apache.cassandra.repair.messages;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.UUID;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.serializers.InetAddressSerializer;
-import org.apache.cassandra.serializers.TypeSerializer;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 public class FinalizePromise extends RepairMessage
 {
     public final UUID sessionID;
-    public final InetAddress participant;
+    public final InetAddressAndPort participant;
     public final boolean promised;
 
-    public FinalizePromise(UUID sessionID, InetAddress participant, boolean 
promised)
+    public FinalizePromise(UUID sessionID, InetAddressAndPort participant, 
boolean promised)
     {
         super(Type.FINALIZE_PROMISE, null);
         assert sessionID != null;
@@ -68,26 +66,24 @@ public class FinalizePromise extends RepairMessage
 
     public static MessageSerializer serializer = new 
MessageSerializer<FinalizePromise>()
     {
-        private TypeSerializer<InetAddress> inetSerializer = 
InetAddressSerializer.instance;
-
         public void serialize(FinalizePromise msg, DataOutputPlus out, int 
version) throws IOException
         {
             UUIDSerializer.serializer.serialize(msg.sessionID, out, version);
-            
ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(msg.participant), 
out);
+            
CompactEndpointSerializationHelper.instance.serialize(msg.participant, out, 
version);
             out.writeBoolean(msg.promised);
         }
 
         public FinalizePromise deserialize(DataInputPlus in, int version) 
throws IOException
         {
             return new 
FinalizePromise(UUIDSerializer.serializer.deserialize(in, version),
-                                       
inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in)),
+                                       
CompactEndpointSerializationHelper.instance.deserialize(in, version),
                                        in.readBoolean());
         }
 
         public long serializedSize(FinalizePromise msg, int version)
         {
             long size = 
UUIDSerializer.serializer.serializedSize(msg.sessionID, version);
-            size += 
ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(msg.participant));
+            size += 
CompactEndpointSerializationHelper.instance.serializedSize(msg.participant, 
version);
             size += TypeSizes.sizeof(msg.promised);
             return size;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java 
b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
index 57056ef..9aae256 100644
--- 
a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
+++ 
b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.repair.messages;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
@@ -29,18 +28,17 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.serializers.InetAddressSerializer;
-import org.apache.cassandra.serializers.TypeSerializer;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 public class PrepareConsistentRequest extends RepairMessage
 {
     public final UUID parentSession;
-    public final InetAddress coordinator;
-    public final Set<InetAddress> participants;
+    public final InetAddressAndPort coordinator;
+    public final Set<InetAddressAndPort> participants;
 
-    public PrepareConsistentRequest(UUID parentSession, InetAddress 
coordinator, Set<InetAddress> participants)
+    public PrepareConsistentRequest(UUID parentSession, InetAddressAndPort 
coordinator, Set<InetAddressAndPort> participants)
     {
         super(Type.CONSISTENT_REQUEST, null);
         assert parentSession != null;
@@ -82,28 +80,27 @@ public class PrepareConsistentRequest extends RepairMessage
 
     public static MessageSerializer serializer = new 
MessageSerializer<PrepareConsistentRequest>()
     {
-        private TypeSerializer<InetAddress> inetSerializer = 
InetAddressSerializer.instance;
 
         public void serialize(PrepareConsistentRequest request, DataOutputPlus 
out, int version) throws IOException
         {
             UUIDSerializer.serializer.serialize(request.parentSession, out, 
version);
-            
ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(request.coordinator),
 out);
+            
CompactEndpointSerializationHelper.instance.serialize(request.coordinator, out, 
version);
             out.writeInt(request.participants.size());
-            for (InetAddress peer : request.participants)
+            for (InetAddressAndPort peer : request.participants)
             {
-                
ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(peer), out);
+                CompactEndpointSerializationHelper.instance.serialize(peer, 
out, version);
             }
         }
 
         public PrepareConsistentRequest deserialize(DataInputPlus in, int 
version) throws IOException
         {
             UUID sessionId = UUIDSerializer.serializer.deserialize(in, 
version);
-            InetAddress coordinator = 
inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in));
+            InetAddressAndPort coordinator = 
CompactEndpointSerializationHelper.instance.deserialize(in, version);
             int numPeers = in.readInt();
-            Set<InetAddress> peers = new HashSet<>(numPeers);
+            Set<InetAddressAndPort> peers = new HashSet<>(numPeers);
             for (int i = 0; i < numPeers; i++)
             {
-                InetAddress peer = 
inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in));
+                InetAddressAndPort peer = 
CompactEndpointSerializationHelper.instance.deserialize(in, version);
                 peers.add(peer);
             }
             return new PrepareConsistentRequest(sessionId, coordinator, peers);
@@ -112,11 +109,11 @@ public class PrepareConsistentRequest extends 
RepairMessage
         public long serializedSize(PrepareConsistentRequest request, int 
version)
         {
             long size = 
UUIDSerializer.serializer.serializedSize(request.parentSession, version);
-            size += 
ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(request.coordinator));
+            size += 
CompactEndpointSerializationHelper.instance.serializedSize(request.coordinator, 
version);
             size += TypeSizes.sizeof(request.participants.size());
-            for (InetAddress peer : request.participants)
+            for (InetAddressAndPort peer : request.participants)
             {
-                size += 
ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(peer));
+                size += 
CompactEndpointSerializationHelper.instance.serializedSize(peer, version);
             }
             return size;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java 
b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
index cf4410a..630f18e 100644
--- 
a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
+++ 
b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
@@ -19,24 +19,22 @@
 package org.apache.cassandra.repair.messages;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.UUID;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.serializers.InetAddressSerializer;
-import org.apache.cassandra.serializers.TypeSerializer;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 public class PrepareConsistentResponse extends RepairMessage
 {
     public final UUID parentSession;
-    public final InetAddress participant;
+    public final InetAddressAndPort participant;
     public final boolean success;
 
-    public PrepareConsistentResponse(UUID parentSession, InetAddress 
participant, boolean success)
+    public PrepareConsistentResponse(UUID parentSession, InetAddressAndPort 
participant, boolean success)
     {
         super(Type.CONSISTENT_RESPONSE, null);
         assert parentSession != null;
@@ -68,25 +66,24 @@ public class PrepareConsistentResponse extends RepairMessage
 
     public static MessageSerializer serializer = new 
MessageSerializer<PrepareConsistentResponse>()
     {
-        private TypeSerializer<InetAddress> inetSerializer = 
InetAddressSerializer.instance;
         public void serialize(PrepareConsistentResponse response, 
DataOutputPlus out, int version) throws IOException
         {
             UUIDSerializer.serializer.serialize(response.parentSession, out, 
version);
-            
ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(response.participant),
 out);
+            
CompactEndpointSerializationHelper.instance.serialize(response.participant, 
out, version);
             out.writeBoolean(response.success);
         }
 
         public PrepareConsistentResponse deserialize(DataInputPlus in, int 
version) throws IOException
         {
             return new 
PrepareConsistentResponse(UUIDSerializer.serializer.deserialize(in, version),
-                                                 
inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in)),
+                                                 
CompactEndpointSerializationHelper.instance.deserialize(in, version),
                                                  in.readBoolean());
         }
 
         public long serializedSize(PrepareConsistentResponse response, int 
version)
         {
             long size = 
UUIDSerializer.serializer.serializedSize(response.parentSession, version);
-            size += 
ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(response.participant));
+            size += 
CompactEndpointSerializationHelper.instance.serializedSize(response.participant,
 version);
             size += TypeSizes.sizeof(response.success);
             return size;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java 
b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
index 7b68daf..1f1344d 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.repair.messages;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
@@ -26,6 +25,7 @@ import java.util.Objects;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.NodePair;
 import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.streaming.SessionSummary;
@@ -53,7 +53,7 @@ public class SyncComplete extends RepairMessage
         this.summaries = summaries;
     }
 
-    public SyncComplete(RepairJobDesc desc, InetAddress endpoint1, InetAddress 
endpoint2, boolean success, List<SessionSummary> summaries)
+    public SyncComplete(RepairJobDesc desc, InetAddressAndPort endpoint1, 
InetAddressAndPort endpoint2, boolean success, List<SessionSummary> summaries)
     {
         super(Type.SYNC_COMPLETE, desc);
         this.summaries = summaries;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java 
b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
index 01601e2..a0bf4e2 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.repair.messages;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -30,6 +29,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.RepairJobDesc;
@@ -45,14 +45,14 @@ public class SyncRequest extends RepairMessage
 {
     public static MessageSerializer serializer = new SyncRequestSerializer();
 
-    public final InetAddress initiator;
-    public final InetAddress src;
-    public final InetAddress dst;
+    public final InetAddressAndPort initiator;
+    public final InetAddressAndPort src;
+    public final InetAddressAndPort dst;
     public final Collection<Range<Token>> ranges;
     public final PreviewKind previewKind;
 
-    public SyncRequest(RepairJobDesc desc, InetAddress initiator, InetAddress 
src, InetAddress dst, Collection<Range<Token>> ranges, PreviewKind previewKind)
-    {
+   public SyncRequest(RepairJobDesc desc, InetAddressAndPort initiator, 
InetAddressAndPort src, InetAddressAndPort dst, Collection<Range<Token>> 
ranges, PreviewKind previewKind)
+   {
         super(Type.SYNC_REQUEST, desc);
         this.initiator = initiator;
         this.src = src;
@@ -87,9 +87,9 @@ public class SyncRequest extends RepairMessage
         public void serialize(SyncRequest message, DataOutputPlus out, int 
version) throws IOException
         {
             RepairJobDesc.serializer.serialize(message.desc, out, version);
-            CompactEndpointSerializationHelper.serialize(message.initiator, 
out);
-            CompactEndpointSerializationHelper.serialize(message.src, out);
-            CompactEndpointSerializationHelper.serialize(message.dst, out);
+            
CompactEndpointSerializationHelper.instance.serialize(message.initiator, out, 
version);
+            CompactEndpointSerializationHelper.instance.serialize(message.src, 
out, version);
+            CompactEndpointSerializationHelper.instance.serialize(message.dst, 
out, version);
             out.writeInt(message.ranges.size());
             for (Range<Token> range : message.ranges)
             {
@@ -102,9 +102,9 @@ public class SyncRequest extends RepairMessage
         public SyncRequest deserialize(DataInputPlus in, int version) throws 
IOException
         {
             RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, 
version);
-            InetAddress owner = 
CompactEndpointSerializationHelper.deserialize(in);
-            InetAddress src = 
CompactEndpointSerializationHelper.deserialize(in);
-            InetAddress dst = 
CompactEndpointSerializationHelper.deserialize(in);
+            InetAddressAndPort owner = 
CompactEndpointSerializationHelper.instance.deserialize(in, version);
+            InetAddressAndPort src = 
CompactEndpointSerializationHelper.instance.deserialize(in, version);
+            InetAddressAndPort dst = 
CompactEndpointSerializationHelper.instance.deserialize(in, version);
             int rangesCount = in.readInt();
             List<Range<Token>> ranges = new ArrayList<>(rangesCount);
             for (int i = 0; i < rangesCount; ++i)
@@ -116,7 +116,7 @@ public class SyncRequest extends RepairMessage
         public long serializedSize(SyncRequest message, int version)
         {
             long size = RepairJobDesc.serializer.serializedSize(message.desc, 
version);
-            size += 3 * 
CompactEndpointSerializationHelper.serializedSize(message.initiator);
+            size += 3 * 
CompactEndpointSerializationHelper.instance.serializedSize(message.initiator, 
version);
             size += TypeSizes.sizeof(message.ranges.size());
             for (Range<Token> range : message.ranges)
                 size += AbstractBounds.tokenSerializer.serializedSize(range, 
version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/schema/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java 
b/src/java/org/apache/cassandra/schema/MigrationManager.java
index ef19c25..c8881e5 100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.schema;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.*;
 import java.lang.management.ManagementFactory;
@@ -39,6 +38,7 @@ import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
@@ -59,10 +59,10 @@ public class MigrationManager
 
     private MigrationManager() {}
 
-    public static void scheduleSchemaPull(InetAddress endpoint, EndpointState 
state)
+    public static void scheduleSchemaPull(InetAddressAndPort endpoint, 
EndpointState state)
     {
         UUID schemaVersion = state.getSchemaVersion();
-        if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && 
schemaVersion != null)
+        if (!endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) && 
schemaVersion != null)
             maybeScheduleSchemaPull(schemaVersion, endpoint);
     }
 
@@ -70,7 +70,7 @@ public class MigrationManager
      * If versions differ this node sends request with local migration list to 
the endpoint
      * and expecting to receive a list of migrations to apply locally.
      */
-    private static void maybeScheduleSchemaPull(final UUID theirVersion, final 
InetAddress endpoint)
+    private static void maybeScheduleSchemaPull(final UUID theirVersion, final 
InetAddressAndPort endpoint)
     {
         if (Schema.instance.getVersion() == null)
         {
@@ -130,7 +130,7 @@ public class MigrationManager
         }
     }
 
-    private static Future<?> submitMigrationTask(InetAddress endpoint)
+    private static Future<?> submitMigrationTask(InetAddressAndPort endpoint)
     {
         /*
          * Do not de-ref the future because that causes distributed deadlock 
(CASSANDRA-3832) because we are
@@ -139,7 +139,7 @@ public class MigrationManager
         return StageManager.getStage(Stage.MIGRATION).submit(new 
MigrationTask(endpoint));
     }
 
-    static boolean shouldPullSchemaFrom(InetAddress endpoint)
+    static boolean shouldPullSchemaFrom(InetAddressAndPort endpoint)
     {
         /*
          * Don't request schema from nodes with a differnt or unknonw major 
version (may have incompatible schema)
@@ -427,7 +427,7 @@ public class MigrationManager
             FBUtilities.waitOnFuture(announce(mutations));
     }
 
-    private static void pushSchemaMutation(InetAddress endpoint, 
Collection<Mutation> schema)
+    private static void pushSchemaMutation(InetAddressAndPort endpoint, 
Collection<Mutation> schema)
     {
         MessageOut<Collection<Mutation>> msg = new 
MessageOut<>(MessagingService.Verb.DEFINITIONS_UPDATE,
                                                                 schema,
@@ -446,10 +446,10 @@ public class MigrationManager
             }
         });
 
-        for (InetAddress endpoint : Gossiper.instance.getLiveMembers())
+        for (InetAddressAndPort endpoint : Gossiper.instance.getLiveMembers())
         {
             // only push schema to nodes with known and equal versions
-            if (!endpoint.equals(FBUtilities.getBroadcastAddress()) &&
+            if (!endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) &&
                     MessagingService.instance().knowsVersion(endpoint) &&
                     MessagingService.instance().getRawVersion(endpoint) == 
MessagingService.current_version)
                 pushSchemaMutation(endpoint, schema);
@@ -486,11 +486,11 @@ public class MigrationManager
 
         Schema.instance.clear();
 
-        Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
-        liveEndpoints.remove(FBUtilities.getBroadcastAddress());
+        Set<InetAddressAndPort> liveEndpoints = 
Gossiper.instance.getLiveMembers();
+        liveEndpoints.remove(FBUtilities.getBroadcastAddressAndPort());
 
         // force migration if there are nodes around
-        for (InetAddress node : liveEndpoints)
+        for (InetAddressAndPort node : liveEndpoints)
         {
             if (shouldPullSchemaFrom(node))
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/schema/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MigrationTask.java 
b/src/java/org/apache/cassandra/schema/MigrationTask.java
index 73e396d..6ff206a 100644
--- a/src/java/org/apache/cassandra/schema/MigrationTask.java
+++ b/src/java/org/apache/cassandra/schema/MigrationTask.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.schema;
 
-import java.net.InetAddress;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.Set;
@@ -32,6 +31,7 @@ import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.SystemKeyspace.BootstrapState;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
@@ -46,9 +46,9 @@ final class MigrationTask extends WrappedRunnable
 
     private static final Set<BootstrapState> monitoringBootstrapStates = 
EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS);
 
-    private final InetAddress endpoint;
+    private final InetAddressAndPort endpoint;
 
-    MigrationTask(InetAddress endpoint)
+    MigrationTask(InetAddressAndPort endpoint)
     {
         this.endpoint = endpoint;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java 
b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 7ff0b9b..e06131e 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.service;
 
-import java.net.InetAddress;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
@@ -38,6 +37,7 @@ import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -60,12 +60,12 @@ public abstract class AbstractReadExecutor
     private static final Logger logger = 
LoggerFactory.getLogger(AbstractReadExecutor.class);
 
     protected final ReadCommand command;
-    protected final List<InetAddress> targetReplicas;
+    protected final List<InetAddressAndPort> targetReplicas;
     protected final ReadCallback handler;
     protected final TraceState traceState;
     protected final ColumnFamilyStore cfs;
 
-    AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand 
command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, 
long queryStartNanoTime)
+    AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand 
command, ConsistencyLevel consistencyLevel, List<InetAddressAndPort> 
targetReplicas, long queryStartNanoTime)
     {
         this.command = command;
         this.targetReplicas = targetReplicas;
@@ -78,27 +78,27 @@ public abstract class AbstractReadExecutor
         // TODO: we need this when talking with pre-3.0 nodes. So if we 
preserve the digest format moving forward, we can get rid of this once
         // we stop being compatible with pre-3.0 nodes.
         int digestVersion = MessagingService.current_version;
-        for (InetAddress replica : targetReplicas)
+        for (InetAddressAndPort replica : targetReplicas)
             digestVersion = Math.min(digestVersion, 
MessagingService.instance().getVersion(replica));
         command.setDigestVersion(digestVersion);
     }
 
-    protected void makeDataRequests(Iterable<InetAddress> endpoints)
+    protected void makeDataRequests(Iterable<InetAddressAndPort> endpoints)
     {
         makeRequests(command, endpoints);
 
     }
 
-    protected void makeDigestRequests(Iterable<InetAddress> endpoints)
+    protected void makeDigestRequests(Iterable<InetAddressAndPort> endpoints)
     {
         makeRequests(command.copyAsDigestQuery(), endpoints);
     }
 
-    private void makeRequests(ReadCommand readCommand, Iterable<InetAddress> 
endpoints)
+    private void makeRequests(ReadCommand readCommand, 
Iterable<InetAddressAndPort> endpoints)
     {
         boolean hasLocalEndpoint = false;
 
-        for (InetAddress endpoint : endpoints)
+        for (InetAddressAndPort endpoint : endpoints)
         {
             if (StorageProxy.canDoLocalRequest(endpoint))
             {
@@ -132,7 +132,7 @@ public abstract class AbstractReadExecutor
      *
      * @return target replicas + the extra replica, *IF* we speculated.
      */
-    public abstract Collection<InetAddress> getContactedReplicas();
+    public abstract Collection<InetAddressAndPort> getContactedReplicas();
 
     /**
      * send the initial set of requests
@@ -184,12 +184,12 @@ public abstract class AbstractReadExecutor
     public static AbstractReadExecutor 
getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel 
consistencyLevel, long queryStartNanoTime) throws UnavailableException
     {
         Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
-        List<InetAddress> allReplicas = 
StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey());
+        List<InetAddressAndPort> allReplicas = 
StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey());
         // 11980: Excluding EACH_QUORUM reads from potential RR, so that we do 
not miscount DC responses
         ReadRepairDecision repairDecision = consistencyLevel == 
ConsistencyLevel.EACH_QUORUM
                                             ? ReadRepairDecision.NONE
                                             : 
newReadRepairDecision(command.metadata());
-        List<InetAddress> targetReplicas = 
consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision);
+        List<InetAddressAndPort> targetReplicas = 
consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision);
 
         // Throw UAE early if we don't have enough replicas.
         consistencyLevel.assureSufficientLiveNodes(keyspace, targetReplicas);
@@ -223,12 +223,12 @@ public abstract class AbstractReadExecutor
         }
 
         // RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
-        InetAddress extraReplica = allReplicas.get(targetReplicas.size());
+        InetAddressAndPort extraReplica = 
allReplicas.get(targetReplicas.size());
         // With repair decision DC_LOCAL all replicas/target replicas may be 
in different order, so
         // we might have to find a replacement that's not already in 
targetReplicas.
         if (repairDecision == ReadRepairDecision.DC_LOCAL && 
targetReplicas.contains(extraReplica))
         {
-            for (InetAddress address : allReplicas)
+            for (InetAddressAndPort address : allReplicas)
             {
                 if (!targetReplicas.contains(address))
                 {
@@ -269,7 +269,7 @@ public abstract class AbstractReadExecutor
          */
         private final boolean logFailedSpeculation;
 
-        public NeverSpeculatingReadExecutor(Keyspace keyspace, 
ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, 
List<InetAddress> targetReplicas, long queryStartNanoTime, boolean 
logFailedSpeculation)
+        public NeverSpeculatingReadExecutor(Keyspace keyspace, 
ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, 
List<InetAddressAndPort> targetReplicas, long queryStartNanoTime, boolean 
logFailedSpeculation)
         {
             super(keyspace, cfs, command, consistencyLevel, targetReplicas, 
queryStartNanoTime);
             this.logFailedSpeculation = logFailedSpeculation;
@@ -290,7 +290,7 @@ public abstract class AbstractReadExecutor
             }
         }
 
-        public Collection<InetAddress> getContactedReplicas()
+        public Collection<InetAddressAndPort> getContactedReplicas()
         {
             return targetReplicas;
         }
@@ -304,7 +304,7 @@ public abstract class AbstractReadExecutor
                                        ColumnFamilyStore cfs,
                                        ReadCommand command,
                                        ConsistencyLevel consistencyLevel,
-                                       List<InetAddress> targetReplicas,
+                                       List<InetAddressAndPort> targetReplicas,
                                        long queryStartNanoTime)
         {
             super(keyspace, cfs, command, consistencyLevel, targetReplicas, 
queryStartNanoTime);
@@ -314,7 +314,7 @@ public abstract class AbstractReadExecutor
         {
             // if CL + RR result in covering all replicas, getReadExecutor 
forces AlwaysSpeculating.  So we know
             // that the last replica in our list is "extra."
-            List<InetAddress> initialReplicas = targetReplicas.subList(0, 
targetReplicas.size() - 1);
+            List<InetAddressAndPort> initialReplicas = 
targetReplicas.subList(0, targetReplicas.size() - 1);
 
             if (handler.blockfor < initialReplicas.size())
             {
@@ -347,7 +347,7 @@ public abstract class AbstractReadExecutor
                 if (handler.resolver.isDataPresent())
                     retryCommand = command.copyAsDigestQuery();
 
-                InetAddress extraReplica = Iterables.getLast(targetReplicas);
+                InetAddressAndPort extraReplica = 
Iterables.getLast(targetReplicas);
                 if (traceState != null)
                     traceState.trace("speculating read retry on {}", 
extraReplica);
                 logger.trace("speculating read retry on {}", extraReplica);
@@ -355,7 +355,7 @@ public abstract class AbstractReadExecutor
             }
         }
 
-        public Collection<InetAddress> getContactedReplicas()
+        public Collection<InetAddressAndPort> getContactedReplicas()
         {
             return speculated
                  ? targetReplicas
@@ -378,7 +378,7 @@ public abstract class AbstractReadExecutor
                                              ColumnFamilyStore cfs,
                                              ReadCommand command,
                                              ConsistencyLevel consistencyLevel,
-                                             List<InetAddress> targetReplicas,
+                                             List<InetAddressAndPort> 
targetReplicas,
                                              long queryStartNanoTime)
         {
             super(keyspace, cfs, command, consistencyLevel, targetReplicas, 
queryStartNanoTime);
@@ -389,7 +389,7 @@ public abstract class AbstractReadExecutor
             // no-op
         }
 
-        public Collection<InetAddress> getContactedReplicas()
+        public Collection<InetAddressAndPort> getContactedReplicas()
         {
             return targetReplicas;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index b5eaadb..9d800a0 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.service;
 
-import java.net.InetAddress;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -35,6 +34,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
@@ -47,15 +47,15 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
     private AtomicInteger responsesAndExpirations;
     private final SimpleCondition condition = new SimpleCondition();
     protected final Keyspace keyspace;
-    protected final Collection<InetAddress> naturalEndpoints;
+    protected final Collection<InetAddressAndPort> naturalEndpoints;
     public final ConsistencyLevel consistencyLevel;
     protected final Runnable callback;
-    protected final Collection<InetAddress> pendingEndpoints;
+    protected final Collection<InetAddressAndPort> pendingEndpoints;
     protected final WriteType writeType;
     private static final 
AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater
     = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, 
"failures");
     private volatile int failures = 0;
-    private final Map<InetAddress, RequestFailureReason> 
failureReasonByEndpoint;
+    private final Map<InetAddressAndPort, RequestFailureReason> 
failureReasonByEndpoint;
     private final long queryStartNanoTime;
     private volatile boolean supportsBackPressure = true;
 
@@ -72,8 +72,8 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
      * @param queryStartNanoTime
      */
     protected AbstractWriteResponseHandler(Keyspace keyspace,
-                                           Collection<InetAddress> 
naturalEndpoints,
-                                           Collection<InetAddress> 
pendingEndpoints,
+                                           Collection<InetAddressAndPort> 
naturalEndpoints,
+                                           Collection<InetAddressAndPort> 
pendingEndpoints,
                                            ConsistencyLevel consistencyLevel,
                                            Runnable callback,
                                            WriteType writeType,
@@ -208,7 +208,7 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
     /**
      * @return true if the message counts towards the totalBlockFor() threshold
      */
-    protected boolean waitingFor(InetAddress from)
+    protected boolean waitingFor(InetAddressAndPort from)
     {
         return true;
     }
@@ -236,7 +236,7 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
     }
 
     @Override
-    public void onFailure(InetAddress from, RequestFailureReason failureReason)
+    public void onFailure(InetAddressAndPort from, RequestFailureReason 
failureReason)
     {
         logger.trace("Got failure from {}", from);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to