http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 4469384..91a1bff 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -35,6 +35,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
 import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,6 +53,7 @@ import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.RestorableMeter;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.*;
@@ -91,47 +93,52 @@ public final class SystemKeyspace
     public static final String PAXOS = "paxos";
     public static final String BUILT_INDEXES = "IndexInfo";
     public static final String LOCAL = "local";
-    public static final String PEERS = "peers";
-    public static final String PEER_EVENTS = "peer_events";
+    public static final String PEERS_V2 = "peers_v2";
+    public static final String PEER_EVENTS_V2 = "peer_events_v2";
     public static final String RANGE_XFERS = "range_xfers";
     public static final String COMPACTION_HISTORY = "compaction_history";
     public static final String SSTABLE_ACTIVITY = "sstable_activity";
     public static final String SIZE_ESTIMATES = "size_estimates";
     public static final String AVAILABLE_RANGES = "available_ranges";
     public static final String TRANSFERRED_RANGES = "transferred_ranges";
+    public static final String TRANSFERRED_RANGES_V2 = "transferred_ranges_v2";
     public static final String VIEW_BUILDS_IN_PROGRESS = "view_builds_in_progress";
     public static final String BUILT_VIEWS = "built_views";
     public static final String PREPARED_STATEMENTS = "prepared_statements";
     public static final String REPAIRS = "repairs";
 
+    @Deprecated public static final String LEGACY_PEERS = "peers";
+    @Deprecated public static final String LEGACY_PEER_EVENTS = "peer_events";
+    @Deprecated public static final String LEGACY_TRANSFERRED_RANGES = "transferred_ranges";
+
     public static final TableMetadata Batches =
         parse(BATCHES,
-              "batches awaiting replay",
-              "CREATE TABLE %s ("
-              + "id timeuuid,"
-              + "mutations list<blob>,"
-              + "version int,"
-              + "PRIMARY KEY ((id)))")
-              .partitioner(new LocalPartitioner(TimeUUIDType.instance))
-              .compaction(CompactionParams.scts(singletonMap("min_threshold", "2")))
-              .build();
+                "batches awaiting replay",
+                "CREATE TABLE %s ("
+                + "id timeuuid,"
+                + "mutations list<blob>,"
+                + "version int,"
+                + "PRIMARY KEY ((id)))")
+                .partitioner(new LocalPartitioner(TimeUUIDType.instance))
+                .compaction(CompactionParams.scts(singletonMap("min_threshold", "2")))
+                .build();
 
     private static final TableMetadata Paxos =
         parse(PAXOS,
-              "in-progress paxos proposals",
-              "CREATE TABLE %s ("
-              + "row_key blob,"
-              + "cf_id UUID,"
-              + "in_progress_ballot timeuuid,"
-              + "most_recent_commit blob,"
-              + "most_recent_commit_at timeuuid,"
-              + "most_recent_commit_version int,"
-              + "proposal blob,"
-              + "proposal_ballot timeuuid,"
-              + "proposal_version int,"
-              + "PRIMARY KEY ((row_key), cf_id))")
-              .compaction(CompactionParams.lcs(emptyMap()))
-              .build();
+                "in-progress paxos proposals",
+                "CREATE TABLE %s ("
+                + "row_key blob,"
+                + "cf_id UUID,"
+                + "in_progress_ballot timeuuid,"
+                + "most_recent_commit blob,"
+                + "most_recent_commit_at timeuuid,"
+                + "most_recent_commit_version int,"
+                + "proposal blob,"
+                + "proposal_ballot timeuuid,"
+                + "proposal_version int,"
+                + "PRIMARY KEY ((row_key), cf_id))")
+                .compaction(CompactionParams.lcs(emptyMap()))
+                .build();
 
     private static final TableMetadata BuiltIndexes =
         parse(BUILT_INDEXES,
@@ -145,122 +152,130 @@ public final class SystemKeyspace
 
     private static final TableMetadata Local =
         parse(LOCAL,
-              "information about the local node",
-              "CREATE TABLE %s ("
-              + "key text,"
-              + "bootstrapped text,"
-              + "broadcast_address inet,"
-              + "cluster_name text,"
-              + "cql_version text,"
-              + "data_center text,"
-              + "gossip_generation int,"
-              + "host_id uuid,"
-              + "listen_address inet,"
-              + "native_protocol_version text,"
-              + "partitioner text,"
-              + "rack text,"
-              + "release_version text,"
-              + "rpc_address inet,"
-              + "schema_version uuid,"
-              + "tokens set<varchar>,"
-              + "truncated_at map<uuid, blob>,"
-              + "PRIMARY KEY ((key)))")
-              .recordDeprecatedSystemColumn("thrift_version", UTF8Type.instance)
-              .build();
-
-    private static final TableMetadata Peers =
-        parse(PEERS,
-              "information about known peers in the cluster",
-              "CREATE TABLE %s ("
-              + "peer inet,"
-              + "data_center text,"
-              + "host_id uuid,"
-              + "preferred_ip inet,"
-              + "rack text,"
-              + "release_version text,"
-              + "rpc_address inet,"
-              + "schema_version uuid,"
-              + "tokens set<varchar>,"
-              + "PRIMARY KEY ((peer)))")
-              .build();
-
-    private static final TableMetadata PeerEvents =
-        parse(PEER_EVENTS,
-              "events related to peers",
-              "CREATE TABLE %s ("
-              + "peer inet,"
-              + "hints_dropped map<uuid, int>,"
-              + "PRIMARY KEY ((peer)))")
-              .build();
+                "information about the local node",
+                "CREATE TABLE %s ("
+                + "key text,"
+                + "bootstrapped text,"
+                + "broadcast_address inet,"
+                + "broadcast_port int,"
+                + "cluster_name text,"
+                + "cql_version text,"
+                + "data_center text,"
+                + "gossip_generation int,"
+                + "host_id uuid,"
+                + "listen_address inet,"
+                + "listen_port int,"
+                + "native_protocol_version text,"
+                + "partitioner text,"
+                + "rack text,"
+                + "release_version text,"
+                + "rpc_address inet,"
+                + "rpc_port int,"
+                + "schema_version uuid,"
+                + "tokens set<varchar>,"
+                + "truncated_at map<uuid, blob>,"
+                + "PRIMARY KEY ((key)))"
+                ).recordDeprecatedSystemColumn("thrift_version", UTF8Type.instance)
+                .build();
+
+    private static final TableMetadata PeersV2 =
+        parse(PEERS_V2,
+                "information about known peers in the cluster",
+                "CREATE TABLE %s ("
+                + "peer inet,"
+                + "peer_port int,"
+                + "data_center text,"
+                + "host_id uuid,"
+                + "preferred_ip inet,"
+                + "preferred_port int,"
+                + "rack text,"
+                + "release_version text,"
+                + "native_address inet,"
+                + "native_port int,"
+                + "schema_version uuid,"
+                + "tokens set<varchar>,"
+                + "PRIMARY KEY ((peer), peer_port))")
+                .build();
+
+    private static final TableMetadata PeerEventsV2 =
+        parse(PEER_EVENTS_V2,
+                "events related to peers",
+                "CREATE TABLE %s ("
+                + "peer inet,"
+                + "peer_port int,"
+                + "hints_dropped map<uuid, int>,"
+                + "PRIMARY KEY ((peer), peer_port))")
+                .build();
 
     private static final TableMetadata RangeXfers =
         parse(RANGE_XFERS,
-              "ranges requested for transfer",
-              "CREATE TABLE %s ("
-              + "token_bytes blob,"
-              + "requested_at timestamp,"
-              + "PRIMARY KEY ((token_bytes)))")
-              .build();
+                "ranges requested for transfer",
+                "CREATE TABLE %s ("
+                + "token_bytes blob,"
+                + "requested_at timestamp,"
+                + "PRIMARY KEY ((token_bytes)))")
+                .build();
 
     private static final TableMetadata CompactionHistory =
         parse(COMPACTION_HISTORY,
-              "week-long compaction history",
-              "CREATE TABLE %s ("
-              + "id uuid,"
-              + "bytes_in bigint,"
-              + "bytes_out bigint,"
-              + "columnfamily_name text,"
-              + "compacted_at timestamp,"
-              + "keyspace_name text,"
-              + "rows_merged map<int, bigint>,"
-              + "PRIMARY KEY ((id)))")
-              .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(7))
-              .build();
+                "week-long compaction history",
+                "CREATE TABLE %s ("
+                + "id uuid,"
+                + "bytes_in bigint,"
+                + "bytes_out bigint,"
+                + "columnfamily_name text,"
+                + "compacted_at timestamp,"
+                + "keyspace_name text,"
+                + "rows_merged map<int, bigint>,"
+                + "PRIMARY KEY ((id)))")
+                .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(7))
+                .build();
 
     private static final TableMetadata SSTableActivity =
         parse(SSTABLE_ACTIVITY,
-              "historic sstable read rates",
-              "CREATE TABLE %s ("
-              + "keyspace_name text,"
-              + "columnfamily_name text,"
-              + "generation int,"
-              + "rate_120m double,"
-              + "rate_15m double,"
-              + "PRIMARY KEY ((keyspace_name, columnfamily_name, generation)))")
-              .build();
+                "historic sstable read rates",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "columnfamily_name text,"
+                + "generation int,"
+                + "rate_120m double,"
+                + "rate_15m double,"
+                + "PRIMARY KEY ((keyspace_name, columnfamily_name, generation)))")
+                .build();
 
     private static final TableMetadata SizeEstimates =
         parse(SIZE_ESTIMATES,
-              "per-table primary range size estimates",
-              "CREATE TABLE %s ("
-              + "keyspace_name text,"
-              + "table_name text,"
-              + "range_start text,"
-              + "range_end text,"
-              + "mean_partition_size bigint,"
-              + "partitions_count bigint,"
-              + "PRIMARY KEY ((keyspace_name), table_name, range_start, range_end))")
-              .build();
+                "per-table primary range size estimates",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "table_name text,"
+                + "range_start text,"
+                + "range_end text,"
+                + "mean_partition_size bigint,"
+                + "partitions_count bigint,"
+                + "PRIMARY KEY ((keyspace_name), table_name, range_start, range_end))")
+                .build();
 
     private static final TableMetadata AvailableRanges =
         parse(AVAILABLE_RANGES,
-              "available keyspace/ranges during bootstrap/replace that are ready to be served",
-              "CREATE TABLE %s ("
-              + "keyspace_name text,"
-              + "ranges set<blob>,"
-              + "PRIMARY KEY ((keyspace_name)))")
-              .build();
-
-    private static final TableMetadata TransferredRanges =
-        parse(TRANSFERRED_RANGES,
-              "record of transferred ranges for streaming operation",
-              "CREATE TABLE %s ("
-              + "operation text,"
-              + "peer inet,"
-              + "keyspace_name text,"
-              + "ranges set<blob>,"
-              + "PRIMARY KEY ((operation, keyspace_name), peer))")
-              .build();
+                "available keyspace/ranges during bootstrap/replace that are ready to be served",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "ranges set<blob>,"
+                + "PRIMARY KEY ((keyspace_name)))")
+                .build();
+
+    private static final TableMetadata TransferredRangesV2 =
+        parse(TRANSFERRED_RANGES_V2,
+                "record of transferred ranges for streaming operation",
+                "CREATE TABLE %s ("
+                + "operation text,"
+                + "peer inet,"
+                + "peer_port int,"
+                + "keyspace_name text,"
+                + "ranges set<blob>,"
+                + "PRIMARY KEY ((operation, keyspace_name), peer, peer_port))")
+                .build();
 
     private static final TableMetadata ViewBuildsInProgress =
         parse(VIEW_BUILDS_IN_PROGRESS,
@@ -277,38 +292,79 @@ public final class SystemKeyspace
 
     private static final TableMetadata BuiltViews =
         parse(BUILT_VIEWS,
-              "built views",
-              "CREATE TABLE %s ("
-              + "keyspace_name text,"
-              + "view_name text,"
-              + "status_replicated boolean,"
-              + "PRIMARY KEY ((keyspace_name), view_name))")
-              .build();
+                "built views",
+                "CREATE TABLE %s ("
+                + "keyspace_name text,"
+                + "view_name text,"
+                + "status_replicated boolean,"
+                + "PRIMARY KEY ((keyspace_name), view_name))")
+                .build();
 
     private static final TableMetadata PreparedStatements =
         parse(PREPARED_STATEMENTS,
-              "prepared statements",
-              "CREATE TABLE %s ("
-              + "prepared_id blob,"
-              + "logged_keyspace text,"
-              + "query_string text,"
-              + "PRIMARY KEY ((prepared_id)))")
-              .build();
+                "prepared statements",
+                "CREATE TABLE %s ("
+                + "prepared_id blob,"
+                + "logged_keyspace text,"
+                + "query_string text,"
+                + "PRIMARY KEY ((prepared_id)))")
+                .build();
 
     private static final TableMetadata Repairs =
         parse(REPAIRS,
-              "repairs",
-              "CREATE TABLE %s ("
-              + "parent_id timeuuid, "
-              + "started_at timestamp, "
-              + "last_update timestamp, "
-              + "repaired_at timestamp, "
-              + "state int, "
-              + "coordinator inet, "
-              + "participants set<inet>, "
-              + "ranges set<blob>, "
-              + "cfids set<uuid>, "
-              + "PRIMARY KEY (parent_id))").build();
+          "repairs",
+          "CREATE TABLE %s ("
+          + "parent_id timeuuid, "
+          + "started_at timestamp, "
+          + "last_update timestamp, "
+          + "repaired_at timestamp, "
+          + "state int, "
+          + "coordinator inet, "
+          + "coordinator_port int,"
+          + "participants set<inet>,"
+          + "participants_wp set<text>,"
+          + "ranges set<blob>, "
+          + "cfids set<uuid>, "
+          + "PRIMARY KEY (parent_id))").build();
+
+    @Deprecated
+    private static final TableMetadata LegacyPeers =
+        parse(LEGACY_PEERS,
+            "information about known peers in the cluster",
+            "CREATE TABLE %s ("
+            + "peer inet,"
+            + "data_center text,"
+            + "host_id uuid,"
+            + "preferred_ip inet,"
+            + "rack text,"
+            + "release_version text,"
+            + "rpc_address inet,"
+            + "schema_version uuid,"
+            + "tokens set<varchar>,"
+            + "PRIMARY KEY ((peer)))")
+            .build();
+
+    @Deprecated
+    private static final TableMetadata LegacyPeerEvents =
+        parse(LEGACY_PEER_EVENTS,
+            "events related to peers",
+            "CREATE TABLE %s ("
+            + "peer inet,"
+            + "hints_dropped map<uuid, int>,"
+            + "PRIMARY KEY ((peer)))")
+            .build();
+
+    @Deprecated
+    private static final TableMetadata LegacyTransferredRanges =
+        parse(LEGACY_TRANSFERRED_RANGES,
+            "record of transferred ranges for streaming operation",
+            "CREATE TABLE %s ("
+            + "operation text,"
+            + "peer inet,"
+            + "keyspace_name text,"
+            + "ranges set<blob>,"
+            + "PRIMARY KEY ((operation, keyspace_name), peer))")
+            .build();
 
     private static TableMetadata.Builder parse(String table, String description, String cql)
     {
@@ -331,14 +387,17 @@ public final class SystemKeyspace
                          Batches,
                          Paxos,
                          Local,
-                         Peers,
-                         PeerEvents,
+                         PeersV2,
+                         LegacyPeers,
+                         PeerEventsV2,
+                         LegacyPeerEvents,
                          RangeXfers,
                          CompactionHistory,
                          SSTableActivity,
                          SizeEstimates,
                          AvailableRanges,
-                         TransferredRanges,
+                         TransferredRangesV2,
+                         LegacyTransferredRanges,
                          ViewBuildsInProgress,
                          BuiltViews,
                          PreparedStatements,
@@ -384,9 +443,12 @@ public final class SystemKeyspace
                      "rack," +
                      "partitioner," +
                      "rpc_address," +
+                     "rpc_port," +
                      "broadcast_address," +
-                     "listen_address" +
-                     ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+                     "broadcast_port," +
+                     "listen_address," +
+                     "listen_port" +
+                     ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
         executeOnceInternal(format(req, LOCAL),
                             LOCAL,
@@ -394,12 +456,15 @@ public final class SystemKeyspace
                             FBUtilities.getReleaseVersionString(),
                             QueryProcessor.CQL_VERSION.toString(),
                             String.valueOf(ProtocolVersion.CURRENT.asInt()),
-                            snitch.getDatacenter(FBUtilities.getBroadcastAddress()),
-                            snitch.getRack(FBUtilities.getBroadcastAddress()),
+                            snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()),
+                            snitch.getRack(FBUtilities.getBroadcastAddressAndPort()),
                             DatabaseDescriptor.getPartitioner().getClass().getName(),
                             DatabaseDescriptor.getRpcAddress(),
-                            FBUtilities.getBroadcastAddress(),
-                            FBUtilities.getLocalAddress());
+                            DatabaseDescriptor.getNativeTransportPort(),
+                            FBUtilities.getJustBroadcastAddress(),
+                            DatabaseDescriptor.getStoragePort(),
+                            FBUtilities.getJustLocalAddress(),
+                            DatabaseDescriptor.getStoragePort());
     }
 
     public static void updateCompactionHistory(String ksname,
@@ -461,11 +526,10 @@ public final class SystemKeyspace
     {
         String buildReq = "DELETE FROM %s.%s WHERE keyspace_name = ? AND view_name = ?";
         executeInternal(String.format(buildReq, SchemaConstants.SYSTEM_KEYSPACE_NAME, VIEW_BUILDS_IN_PROGRESS), keyspaceName, viewName);
-        forceBlockingFlush(VIEW_BUILDS_IN_PROGRESS);
 
         String builtReq = "DELETE FROM %s.\"%s\" WHERE keyspace_name = ? AND view_name = ? IF EXISTS";
         executeInternal(String.format(builtReq, SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_VIEWS), keyspaceName, viewName);
-        forceBlockingFlush(BUILT_VIEWS);
+        forceBlockingFlush(VIEW_BUILDS_IN_PROGRESS, BUILT_VIEWS);
     }
 
     public static void finishViewBuildStatus(String ksname, String viewName)
@@ -609,39 +673,64 @@ public final class SystemKeyspace
     /**
      * Record tokens being used by another node
      */
-    public static synchronized void updateTokens(InetAddress ep, Collection<Token> tokens)
+    public static synchronized void updateTokens(InetAddressAndPort ep, Collection<Token> tokens)
     {
-        if (ep.equals(FBUtilities.getBroadcastAddress()))
+        if (ep.equals(FBUtilities.getBroadcastAddressAndPort()))
             return;
 
         String req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)";
-        executeInternal(format(req, PEERS), ep, tokensAsSet(tokens));
+        executeInternal(String.format(req, LEGACY_PEERS), ep.address, tokensAsSet(tokens));
+        req = "INSERT INTO system.%s (peer, peer_port, tokens) VALUES (?, ?, ?)";
+        executeInternal(String.format(req, PEERS_V2), ep.address, ep.port, tokensAsSet(tokens));
     }
 
-    public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
+    public static synchronized void updatePreferredIP(InetAddressAndPort ep, InetAddressAndPort preferred_ip)
     {
         if (getPreferredIP(ep) == preferred_ip)
             return;
 
         String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)";
-        executeInternal(format(req, PEERS), ep, preferred_ip);
-        forceBlockingFlush(PEERS);
+        executeInternal(String.format(req, LEGACY_PEERS), ep.address, preferred_ip.address);
+        req = "INSERT INTO system.%s (peer, peer_port, preferred_ip, preferred_port) VALUES (?, ?, ?, ?)";
+        executeInternal(String.format(req, PEERS_V2), ep.address, ep.port, preferred_ip.address, preferred_ip.port);
+        forceBlockingFlush(LEGACY_PEERS, PEERS_V2);
     }
 
-    public static synchronized void updatePeerInfo(InetAddress ep, String columnName, Object value)
+    public static synchronized void updatePeerInfo(InetAddressAndPort ep, String columnName, Object value)
     {
-        if (ep.equals(FBUtilities.getBroadcastAddress()))
+        if (ep.equals(FBUtilities.getBroadcastAddressAndPort()))
             return;
 
         String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)";
-        executeInternal(format(req, PEERS, columnName), ep, value);
+        executeInternal(String.format(req, LEGACY_PEERS, columnName), ep.address, value);
+        // This column's name differs between the two tables
+        if (columnName.equals("rpc_address"))
+        {
+            columnName = "native_address";
+        }
+        req = "INSERT INTO system.%s (peer, peer_port, %s) VALUES (?, ?, ?)";
+        executeInternal(String.format(req, PEERS_V2, columnName), ep.address, ep.port, value);
     }
 
-    public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value)
+    public static synchronized void updatePeerNativeAddress(InetAddressAndPort ep, InetAddressAndPort address)
+    {
+        if (ep.equals(FBUtilities.getBroadcastAddressAndPort()))
+            return;
+
+        String req = "INSERT INTO system.%s (peer, rpc_address) VALUES (?, ?)";
+        executeInternal(String.format(req, LEGACY_PEERS), ep.address, address.address);
+        req = "INSERT INTO system.%s (peer, peer_port, native_address, native_port) VALUES (?, ?, ?, ?)";
+        executeInternal(String.format(req, PEERS_V2), ep.address, ep.port, address.address, address.port);
+    }
+
+    public static synchronized void updateHintsDropped(InetAddressAndPort ep, UUID timePeriod, int value)
     {
         // with 30 day TTL
         String req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ ? ] = ? WHERE peer = ?";
-        executeInternal(format(req, PEER_EVENTS), timePeriod, value, ep);
+        executeInternal(String.format(req, LEGACY_PEER_EVENTS), timePeriod, value, ep.address);
+        req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ ? ] = ? WHERE peer = ? AND peer_port = ?";
+        executeInternal(String.format(req, PEER_EVENTS_V2), timePeriod, value, ep.address, ep.port);
     }
 
     public static synchronized void updateSchemaVersion(UUID version)
@@ -673,11 +762,13 @@ public final class SystemKeyspace
     /**
      * Remove stored tokens being used by another node
      */
-    public static synchronized void removeEndpoint(InetAddress ep)
+    public static synchronized void removeEndpoint(InetAddressAndPort ep)
     {
         String req = "DELETE FROM system.%s WHERE peer = ?";
-        executeInternal(format(req, PEERS), ep);
-        forceBlockingFlush(PEERS);
+        executeInternal(String.format(req, LEGACY_PEERS), ep.address);
+        req = String.format("DELETE FROM system.%s WHERE peer = ? AND peer_port = ?", PEERS_V2);
+        executeInternal(req, ep.address, ep.port);
+        forceBlockingFlush(LEGACY_PEERS, PEERS_V2);
     }
 
     /**
@@ -696,22 +787,32 @@ public final class SystemKeyspace
         forceBlockingFlush(LOCAL);
     }
 
-    public static void forceBlockingFlush(String cfname)
+    public static void forceBlockingFlush(String ...cfnames)
     {
         if (!DatabaseDescriptor.isUnsafeSystem())
-            FBUtilities.waitOnFuture(Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(cfname).forceFlush());
+        {
+            List<ListenableFuture<CommitLogPosition>> futures = new ArrayList<>();
+
+            for (String cfname : cfnames)
+            {
+                futures.add(Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(cfname).forceFlush());
+            }
+            FBUtilities.waitOnFutures(futures);
+        }
     }
 
     /**
      * Return a map of stored tokens to IP addresses
      *
      */
-    public static SetMultimap<InetAddress, Token> loadTokens()
+    public static SetMultimap<InetAddressAndPort, Token> loadTokens()
     {
-        SetMultimap<InetAddress, Token> tokenMap = HashMultimap.create();
-        for (UntypedResultSet.Row row : executeInternal("SELECT peer, tokens FROM system." + PEERS))
+        SetMultimap<InetAddressAndPort, Token> tokenMap = HashMultimap.create();
+        for (UntypedResultSet.Row row : executeInternal("SELECT peer, peer_port, tokens FROM system." + PEERS_V2))
         {
-            InetAddress peer = row.getInetAddress("peer");
+            InetAddress address = row.getInetAddress("peer");
+            Integer port = row.getInt("peer_port");
+            InetAddressAndPort peer = InetAddressAndPort.getByAddressOverrideDefaults(address, port);
             if (row.has("tokens"))
                 tokenMap.putAll(peer, deserializeTokens(row.getSet("tokens", UTF8Type.instance)));
         }
@@ -723,12 +824,14 @@ public final class SystemKeyspace
     * Return a map of stored host_ids to IP addresses
      *
      */
-    public static Map<InetAddress, UUID> loadHostIds()
+    public static Map<InetAddressAndPort, UUID> loadHostIds()
     {
-        Map<InetAddress, UUID> hostIdMap = new HashMap<>();
-        for (UntypedResultSet.Row row : executeInternal("SELECT peer, host_id FROM system." + PEERS))
+        Map<InetAddressAndPort, UUID> hostIdMap = new HashMap<>();
+        for (UntypedResultSet.Row row : executeInternal("SELECT peer, peer_port, host_id FROM system." + PEERS_V2))
         {
-            InetAddress peer = row.getInetAddress("peer");
+            InetAddress address = row.getInetAddress("peer");
+            Integer port = row.getInt("peer_port");
+            InetAddressAndPort peer = InetAddressAndPort.getByAddressOverrideDefaults(address, port);
             if (row.has("host_id"))
             {
                 hostIdMap.put(peer, row.getUUID("host_id"));
@@ -743,24 +846,29 @@ public final class SystemKeyspace
      * @param ep endpoint address to check
     * @return Preferred IP for given endpoint if present, otherwise returns given ep
      */
-    public static InetAddress getPreferredIP(InetAddress ep)
+    public static InetAddressAndPort getPreferredIP(InetAddressAndPort ep)
     {
-        String req = "SELECT preferred_ip FROM system.%s WHERE peer=?";
-        UntypedResultSet result = executeInternal(format(req, PEERS), ep);
+        String req = "SELECT preferred_ip, preferred_port FROM system.%s WHERE peer=? AND peer_port = ?";
+        UntypedResultSet result = executeInternal(String.format(req, PEERS_V2), ep.address, ep.port);
         if (!result.isEmpty() && result.one().has("preferred_ip"))
-            return result.one().getInetAddress("preferred_ip");
+        {
+            UntypedResultSet.Row row = result.one();
+            return InetAddressAndPort.getByAddressOverrideDefaults(row.getInetAddress("preferred_ip"), row.getInt("preferred_port"));
+        }
         return ep;
     }
 
     /**
      * Return a map of IP addresses containing a map of dc and rack info
      */
-    public static Map<InetAddress, Map<String,String>> loadDcRackInfo()
+    public static Map<InetAddressAndPort, Map<String,String>> loadDcRackInfo()
     {
-        Map<InetAddress, Map<String, String>> result = new HashMap<>();
-        for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack from system." + PEERS))
+        Map<InetAddressAndPort, Map<String, String>> result = new HashMap<>();
+        for (UntypedResultSet.Row row : executeInternal("SELECT peer, peer_port, data_center, rack from system." + PEERS_V2))
         {
-            InetAddress peer = row.getInetAddress("peer");
+            InetAddress address = row.getInetAddress("peer");
+            Integer port = row.getInt("peer_port");
+            InetAddressAndPort peer = InetAddressAndPort.getByAddressOverrideDefaults(address, port);
             if (row.has("data_center") && row.has("rack"))
             {
                 Map<String, String> dcRack = new HashMap<>();
@@ -779,16 +887,16 @@ public final class SystemKeyspace
      * @param ep endpoint address to check
      * @return Release version or null if version is unknown.
      */
-    public static CassandraVersion getReleaseVersion(InetAddress ep)
+    public static CassandraVersion getReleaseVersion(InetAddressAndPort ep)
     {
         try
         {
-            if (FBUtilities.getBroadcastAddress().equals(ep))
+            if (FBUtilities.getBroadcastAddressAndPort().equals(ep))
             {
                 return new CassandraVersion(FBUtilities.getReleaseVersionString());
             }
-            String req = "SELECT release_version FROM system.%s WHERE peer=?";
-            UntypedResultSet result = executeInternal(format(req, PEERS), ep);
+            String req = "SELECT release_version FROM system.%s WHERE peer=? AND peer_port=?";
+            UntypedResultSet result = executeInternal(String.format(req, PEERS_V2), ep.address, ep.port);
             if (result != null && result.one().has("release_version"))
             {
                 return new CassandraVersion(result.one().getString("release_version"));
@@ -1197,7 +1305,7 @@ public final class SystemKeyspace
     }
 
     public static synchronized void updateTransferredRanges(StreamOperation streamOperation,
-                                                         InetAddress peer,
+                                                         InetAddressAndPort peer,
                                                          String keyspace,
                                                          Collection<Range<Token>> streamedRanges)
     {
@@ -1207,17 +1315,21 @@ public final class SystemKeyspace
         {
             rangesToUpdate.add(rangeToBytes(range));
         }
-        executeInternal(format(cql, TRANSFERRED_RANGES), rangesToUpdate, streamOperation.getDescription(), peer, keyspace);
+        executeInternal(format(cql, LEGACY_TRANSFERRED_RANGES), rangesToUpdate, streamOperation.getDescription(), peer.address, keyspace);
+        cql = "UPDATE system.%s SET ranges = ranges + ? WHERE operation = ? AND peer = ? AND peer_port = ? AND keyspace_name = ?";
+        executeInternal(String.format(cql, TRANSFERRED_RANGES_V2), rangesToUpdate, streamOperation.getDescription(), peer.address, peer.port, keyspace);
     }
 
-    public static synchronized Map<InetAddress, Set<Range<Token>>> getTransferredRanges(String description, String keyspace, IPartitioner partitioner)
+    public static synchronized Map<InetAddressAndPort, Set<Range<Token>>> getTransferredRanges(String description, String keyspace, IPartitioner partitioner)
     {
-        Map<InetAddress, Set<Range<Token>>> result = new HashMap<>();
+        Map<InetAddressAndPort, Set<Range<Token>>> result = new HashMap<>();
         String query = "SELECT * FROM system.%s WHERE operation = ? AND keyspace_name = ?";
-        UntypedResultSet rs = executeInternal(format(query, TRANSFERRED_RANGES), description, keyspace);
+        UntypedResultSet rs = executeInternal(String.format(query, TRANSFERRED_RANGES_V2), description, keyspace);
         for (UntypedResultSet.Row row : rs)
         {
-            InetAddress peer = row.getInetAddress("peer");
+            InetAddress peerAddress = row.getInetAddress("peer");
+            int port = row.getInt("peer_port");
+            InetAddressAndPort peer = InetAddressAndPort.getByAddressOverrideDefaults(peerAddress, port);
             Set<ByteBuffer> rawRanges = row.getSet("ranges", BytesType.instance);
             Set<Range<Token>> ranges = Sets.newHashSetWithExpectedSize(rawRanges.size());
             for (ByteBuffer rawRange : rawRanges)

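[editor's note] Two patterns recur throughout the SystemKeyspace diff above: every peer
mutation is written to both the legacy address-keyed table and its v2 (address, port)-keyed
replacement, and forceBlockingFlush becomes variadic so callers touching both tables start
all flushes before blocking once. Below is a minimal standalone sketch of that
collect-then-wait idiom; the ExecutorService plumbing is illustrative, not Cassandra's
flush API.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;

    final class FlushAllSketch
    {
        // Start every flush before waiting on any of them: total latency is
        // bounded by the slowest flush rather than the sum of sequential waits.
        static void flushAll(ExecutorService pool, List<Runnable> flushes)
                throws InterruptedException, ExecutionException
        {
            List<Future<?>> futures = new ArrayList<>();
            for (Runnable flush : flushes)
                futures.add(pool.submit(flush)); // kick off each flush
            for (Future<?> f : futures)
                f.get();                         // then block once per future
        }
    }
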
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java b/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java
new file mode 100644
index 0000000..ea5ff59
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UUIDType;
+
+/**
+ * Migrate 3.0 versions of some tables to 4.0. In this case it's just extra columns and some keys
+ * that are changed.
+ *
+ * We can't simply add the additional columns, because they are primary key columns and C* doesn't
+ * support changing key columns, even if they are only clustering columns.
+public class SystemKeyspaceMigrator40
+{
+    static final String legacyPeersName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_PEERS);
+    static final String peersName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.PEERS_V2);
+    static final String legacyPeerEventsName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_PEER_EVENTS);
+    static final String peerEventsName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.PEER_EVENTS_V2);
+    static final String legacyTransferredRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_TRANSFERRED_RANGES);
+    static final String transferredRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.TRANSFERRED_RANGES_V2);
+
+    private static final Logger logger = LoggerFactory.getLogger(SystemKeyspaceMigrator40.class);
+
+    private SystemKeyspaceMigrator40() {}
+
+    public static void migrate()
+    {
+        migratePeers();
+        migratePeerEvents();
+        migrateTransferredRanges();
+    }
+
+    private static void migratePeers()
+    {
+        ColumnFamilyStore newPeers = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.PEERS_V2);
+
+        if (!newPeers.isEmpty())
+            return;
+
+        logger.info("{} table was empty, migrating legacy {}; if this fails, fix the issue and truncate {} to have it try again.",
+                    peersName, legacyPeersName, peersName);
+
+        String query = String.format("SELECT * FROM %s",
+                                     legacyPeersName);
+
+        String insert = String.format("INSERT INTO %s ( "
+                                      + "peer, "
+                                      + "peer_port, "
+                                      + "data_center, "
+                                      + "host_id, "
+                                      + "preferred_ip, "
+                                      + "preferred_port, "
+                                      + "rack, "
+                                      + "release_version, "
+                                      + "native_address, "
+                                      + "native_port, "
+                                      + "schema_version, "
+                                      + "tokens) "
+                                      + " values ( ?, ?, ? , ? , ?, ?, ?, ?, ?, ?, ?, ?)",
+                                      peersName);
+
+        UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, 1000);
+        int transferred = 0;
+        logger.info("Migrating rows from legacy {} to {}", legacyPeersName, peersName);
+        for (UntypedResultSet.Row row : rows)
+        {
+            logger.debug("Transferring row {}", transferred);
+            QueryProcessor.executeInternal(insert,
+                                           row.has("peer") ? row.getInetAddress("peer") : null,
+                                           DatabaseDescriptor.getStoragePort(),
+                                           row.has("data_center") ? row.getString("data_center") : null,
+                                           row.has("host_id") ? row.getUUID("host_id") : null,
+                                           row.has("preferred_ip") ? row.getInetAddress("preferred_ip") : null,
+                                           DatabaseDescriptor.getStoragePort(),
+                                           row.has("rack") ? row.getString("rack") : null,
+                                           row.has("release_version") ? row.getString("release_version") : null,
+                                           row.has("rpc_address") ? row.getInetAddress("rpc_address") : null,
+                                           DatabaseDescriptor.getNativeTransportPort(),
+                                           row.has("schema_version") ? row.getUUID("schema_version") : null,
+                                           row.has("tokens") ? row.getSet("tokens", UTF8Type.instance) : null);
+            transferred++;
+        }
+        logger.info("Migrated {} rows from legacy {} to {}", transferred, legacyPeersName, peersName);
+    }
+
+    private static void migratePeerEvents()
+    {
+        ColumnFamilyStore newPeerEvents = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.PEER_EVENTS_V2);
+
+        if (!newPeerEvents.isEmpty())
+            return;
+
+        logger.info("{} table was empty, migrating legacy {} to {}", peerEventsName, legacyPeerEventsName, peerEventsName);
+
+        String query = String.format("SELECT * FROM %s",
+                                     legacyPeerEventsName);
+
+        String insert = String.format("INSERT INTO %s ( "
+                                      + "peer, "
+                                      + "peer_port, "
+                                      + "hints_dropped) "
+                                      + " values ( ?, ?, ? )",
+                                      peerEventsName);
+
+        UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, 1000);
+        int transferred = 0;
+        for (UntypedResultSet.Row row : rows)
+        {
+            logger.debug("Transferring row {}", transferred);
+            QueryProcessor.executeInternal(insert,
+                                           row.has("peer") ? row.getInetAddress("peer") : null,
+                                           DatabaseDescriptor.getStoragePort(),
+                                           row.has("hints_dropped") ? row.getMap("hints_dropped", UUIDType.instance, Int32Type.instance) : null);
+            transferred++;
+        }
+        logger.info("Migrated {} rows from legacy {} to {}", transferred, legacyPeerEventsName, peerEventsName);
+    }
+
+    static void migrateTransferredRanges()
+    {
+        ColumnFamilyStore newTransferredRanges = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.TRANSFERRED_RANGES_V2);
+
+        if (!newTransferredRanges.isEmpty())
+            return;
+
+        logger.info("{} table was empty, migrating legacy {} to {}", transferredRangesName, legacyTransferredRangesName, transferredRangesName);
+
+        String query = String.format("SELECT * FROM %s",
+                                     legacyTransferredRangesName);
+
+        String insert = String.format("INSERT INTO %s ("
+                                      + "operation, "
+                                      + "peer, "
+                                      + "peer_port, "
+                                      + "keyspace_name, "
+                                      + "ranges) "
+                                      + " values ( ?, ?, ? , ?, ?)",
+                                      transferredRangesName);
+
+        UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, 1000);
+        int transferred = 0;
+        for (UntypedResultSet.Row row : rows)
+        {
+            logger.debug("Transferring row {}", transferred);
+            QueryProcessor.executeInternal(insert,
+                                           row.has("operation") ? row.getString("operation") : null,
+                                           row.has("peer") ? row.getInetAddress("peer") : null,
+                                           DatabaseDescriptor.getStoragePort(),
+                                           row.has("keyspace_name") ? row.getString("keyspace_name") : null,
+                                           row.has("ranges") ? row.getSet("ranges", BytesType.instance) : null);
+            transferred++;
+        }
+
+        logger.info("Migrated {} rows from legacy {} to {}", transferred, legacyTransferredRangesName, transferredRangesName);
+    }
+
+}

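[editor's note] Each migration above follows the same shape: run only when the v2 table is
empty (so the step is idempotent and can be re-armed by truncating the target), page through
the legacy table, and fill the new port columns from the node's configured defaults. Below
is a reduced sketch of that shape; Table and its methods are illustrative stand-ins, not
Cassandra APIs.

    import java.util.Iterator;
    import java.util.Map;
    import java.util.function.Function;

    final class MigrateIfEmptySketch
    {
        interface Table
        {
            boolean isEmpty();
            Iterator<Map<String, Object>> pagedScan(int pageSize);
            void insert(Map<String, Object> row);
        }

        // Idempotent by construction: a non-empty target means a previous run
        // (or normal operation) already populated it, so do nothing.
        static int migrate(Table legacy, Table target,
                           Function<Map<String, Object>, Map<String, Object>> addPortDefaults)
        {
            if (!target.isEmpty())
                return 0;

            int transferred = 0;
            Iterator<Map<String, Object>> rows = legacy.pagedScan(1000); // page to bound memory
            while (rows.hasNext())
            {
                // e.g. fill peer_port with the configured storage port
                target.insert(addPortDefaults.apply(rows.next()));
                transferred++;
            }
            return transferred;
        }
    }
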
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/db/view/ViewUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewUtils.java b/src/java/org/apache/cassandra/db/view/ViewUtils.java
index 4dc1766..df16943 100644
--- a/src/java/org/apache/cassandra/db/view/ViewUtils.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.db.view;
 
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -27,6 +26,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -58,14 +58,14 @@ public final class ViewUtils
      *
      * @return Optional.empty() if this method is called using a base token which does not belong to this replica
      */
-    public static Optional<InetAddress> getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken)
+    public static Optional<InetAddressAndPort> getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken)
     {
         AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy();
 
-        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
-        List<InetAddress> baseEndpoints = new ArrayList<>();
-        List<InetAddress> viewEndpoints = new ArrayList<>();
-        for (InetAddress baseEndpoint : replicationStrategy.getNaturalEndpoints(baseToken))
+        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+        List<InetAddressAndPort> baseEndpoints = new ArrayList<>();
+        List<InetAddressAndPort> viewEndpoints = new ArrayList<>();
+        for (InetAddressAndPort baseEndpoint : replicationStrategy.getNaturalEndpoints(baseToken))
         {
             // An endpoint is local if we're not using NetworkTopologyStrategy, or if it's in the local datacenter
             if (!(replicationStrategy instanceof NetworkTopologyStrategy) ||
@@ -73,10 +73,10 @@ public final class ViewUtils
                 baseEndpoints.add(baseEndpoint);
         }
 
-        for (InetAddress viewEndpoint : replicationStrategy.getNaturalEndpoints(viewToken))
+        for (InetAddressAndPort viewEndpoint : replicationStrategy.getNaturalEndpoints(viewToken))
         {
             // If we are a base endpoint which is also a view replica, we use ourselves as our view replica
-            if (viewEndpoint.equals(FBUtilities.getBroadcastAddress()))
+            if (viewEndpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
                 return Optional.of(viewEndpoint);
 
             // We have to remove any endpoint which is shared between the base and the view, as it will select itself
@@ -92,7 +92,7 @@ public final class ViewUtils
         // Since the same replication strategy is used, the same placement should be used and we should get the same
         // number of replicas for all of the tokens in the ring.
         assert baseEndpoints.size() == viewEndpoints.size() : "Replication strategy should have the same number of endpoints for the base and the view";
-        int baseIdx = baseEndpoints.indexOf(FBUtilities.getBroadcastAddress());
+        int baseIdx = baseEndpoints.indexOf(FBUtilities.getBroadcastAddressAndPort());
 
         if (baseIdx < 0)
             //This node is not a base replica of this key, so we return empty

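[editor's note] getViewNaturalEndpoint pairs replicas positionally: after filtering both
lists to the local datacenter and removing endpoints shared between base and view, the
replica at index i of the base list pushes view updates to the replica at index i of the
view list. Below is a compact illustration of that pairing rule under those simplifying
assumptions (generic types; the real lists hold InetAddressAndPort).

    import java.util.List;
    import java.util.Optional;

    final class PairedViewReplicaSketch
    {
        // Both lists come from the same replication strategy, so their sizes
        // match (the assert in the diff) and the index mapping is total.
        static <E> Optional<E> viewReplicaFor(List<E> baseEndpoints, List<E> viewEndpoints, E self)
        {
            int baseIdx = baseEndpoints.indexOf(self);
            if (baseIdx < 0)
                return Optional.empty(); // not a base replica for this token
            return Optional.of(viewEndpoints.get(baseIdx));
        }
    }
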
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index a25f867..432586b 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.dht;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -38,6 +37,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.*;
@@ -51,12 +51,12 @@ public class BootStrapper extends ProgressEventNotifierSupport
     private static final Logger logger = LoggerFactory.getLogger(BootStrapper.class);
 
     /* endpoint that needs to be bootstrapped */
-    protected final InetAddress address;
+    protected final InetAddressAndPort address;
     /* token of the node being bootstrapped. */
     protected final Collection<Token> tokens;
     protected final TokenMetadata tokenMetadata;
 
-    public BootStrapper(InetAddress address, Collection<Token> tokens, TokenMetadata tmd)
+    public BootStrapper(InetAddressAndPort address, Collection<Token> tokens, TokenMetadata tmd)
     {
         assert address != null;
         assert tokens != null && !tokens.isEmpty();
@@ -159,7 +159,7 @@ public class BootStrapper extends ProgressEventNotifierSupport
      * otherwise, if allocationKeyspace is specified use the token allocation algorithm to generate suitable tokens
      * else choose num_tokens tokens at random
      */
-    public static Collection<Token> getBootstrapTokens(final TokenMetadata metadata, InetAddress address, int schemaWaitDelay) throws ConfigurationException
+    public static Collection<Token> getBootstrapTokens(final TokenMetadata metadata, InetAddressAndPort address, int schemaWaitDelay) throws ConfigurationException
     {
         String allocationKeyspace = DatabaseDescriptor.getAllocateTokensForKeyspace();
         Collection<String> initialTokens = DatabaseDescriptor.getInitialTokens();
@@ -199,13 +199,13 @@ public class BootStrapper extends ProgressEventNotifierSupport
     }
 
     static Collection<Token> allocateTokens(final TokenMetadata metadata,
-                                            InetAddress address,
+                                            InetAddressAndPort address,
                                             String allocationKeyspace,
                                             int numTokens,
                                             int schemaWaitDelay)
     {
         StorageService.instance.waitForSchema(schemaWaitDelay);
-        if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
+        if (!FBUtilities.getBroadcastAddressAndPort().equals(InetAddressAndPort.getLoopbackAddress()))
             Gossiper.waitToSettle();
 
         Keyspace ks = Keyspace.open(allocationKeyspace);

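[editor's note] The getBootstrapTokens javadoc above describes a three-way choice: explicit
initial tokens win, an allocation keyspace triggers the balanced token allocator, and random
tokens are the fallback. Below is a sketch of that decision order only; Long stands in for
Token, and allocateForKeyspace is a hypothetical placeholder for the allocation algorithm.

    import java.util.Collection;
    import java.util.HashSet;
    import java.util.Random;
    import java.util.Set;

    final class BootstrapTokenChoiceSketch
    {
        static Collection<Long> chooseTokens(Collection<Long> initialTokens,
                                             String allocationKeyspace,
                                             int numTokens,
                                             Random random)
        {
            if (!initialTokens.isEmpty())
                return initialTokens;                // operator-specified tokens
            if (allocationKeyspace != null)
                return allocateForKeyspace(allocationKeyspace, numTokens); // balanced allocation
            Set<Long> tokens = new HashSet<>();
            while (tokens.size() < numTokens)
                tokens.add(random.nextLong());       // random fallback
            return tokens;
        }

        private static Collection<Long> allocateForKeyspace(String keyspace, int numTokens)
        {
            // stand-in for the token allocation algorithm
            throw new UnsupportedOperationException();
        }
    }
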
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
index d407212..b90bc96 100644
--- a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
+++ b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
@@ -32,6 +32,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.FBUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,7 +73,7 @@ public class RangeFetchMapCalculator
 {
     private static final Logger logger = LoggerFactory.getLogger(RangeFetchMapCalculator.class);
     private static final long TRIVIAL_RANGE_LIMIT = 1000;
-    private final Multimap<Range<Token>, InetAddress> rangesWithSources;
+    private final Multimap<Range<Token>, InetAddressAndPort> rangesWithSources;
     private final Collection<RangeStreamer.ISourceFilter> sourceFilters;
     private final String keyspace;
     //We need two Vertices to act as source and destination in the algorithm
@@ -80,7 +81,7 @@ public class RangeFetchMapCalculator
     private final Vertex destinationVertex = OuterVertex.getDestinationVertex();
     private final Set<Range<Token>> trivialRanges;
 
-    public RangeFetchMapCalculator(Multimap<Range<Token>, InetAddress> rangesWithSources,
+    public RangeFetchMapCalculator(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources,
                                    Collection<RangeStreamer.ISourceFilter> sourceFilters,
                                    String keyspace)
     {
@@ -108,16 +109,16 @@ public class RangeFetchMapCalculator
         return false;
     }
 
-    public Multimap<InetAddress, Range<Token>> getRangeFetchMap()
+    public Multimap<InetAddressAndPort, Range<Token>> getRangeFetchMap()
     {
-        Multimap<InetAddress, Range<Token>> fetchMap = HashMultimap.create();
+        Multimap<InetAddressAndPort, Range<Token>> fetchMap = HashMultimap.create();
         fetchMap.putAll(getRangeFetchMapForNonTrivialRanges());
         fetchMap.putAll(getRangeFetchMapForTrivialRanges(fetchMap));
         return fetchMap;
     }
 
     @VisibleForTesting
-    Multimap<InetAddress, Range<Token>> getRangeFetchMapForNonTrivialRanges()
+    Multimap<InetAddressAndPort, Range<Token>> getRangeFetchMapForNonTrivialRanges()
     {
         //Get the graph with edges between ranges and their source endpoints
         MutableCapacityGraph<Vertex, Integer> graph = getGraph();
@@ -148,19 +149,19 @@ public class RangeFetchMapCalculator
     }
 
     @VisibleForTesting
-    Multimap<InetAddress, Range<Token>> getRangeFetchMapForTrivialRanges(Multimap<InetAddress, Range<Token>> optimisedMap)
+    Multimap<InetAddressAndPort, Range<Token>> getRangeFetchMapForTrivialRanges(Multimap<InetAddressAndPort, Range<Token>> optimisedMap)
     {
-        Multimap<InetAddress, Range<Token>> fetchMap = HashMultimap.create();
+        Multimap<InetAddressAndPort, Range<Token>> fetchMap = HashMultimap.create();
         for (Range<Token> trivialRange : trivialRanges)
         {
             boolean added = false;
             boolean localDCCheck = true;
             while (!added)
             {
-                List<InetAddress> srcs = new ArrayList<>(rangesWithSources.get(trivialRange));
+                List<InetAddressAndPort> srcs = new ArrayList<>(rangesWithSources.get(trivialRange));
                 // sort with the endpoint having the least number of streams first:
                 srcs.sort(Comparator.comparingInt(o -> optimisedMap.get(o).size()));
-                for (InetAddress src : srcs)
+                for (InetAddressAndPort src : srcs)
                 {
                     if (passFilters(src, localDCCheck))
                     {
@@ -202,9 +203,9 @@ public class RangeFetchMapCalculator
      * @param result Flow algorithm result
      * @return  Multi Map of Machine to Ranges
      */
-    private Multimap<InetAddress, Range<Token>> getRangeFetchMapFromGraphResult(MutableCapacityGraph<Vertex, Integer> graph, MaximumFlowAlgorithmResult<Integer, CapacityEdge<Vertex, Integer>> result)
+    private Multimap<InetAddressAndPort, Range<Token>> getRangeFetchMapFromGraphResult(MutableCapacityGraph<Vertex, Integer> graph, MaximumFlowAlgorithmResult<Integer, CapacityEdge<Vertex, Integer>> result)
     {
-        final Multimap<InetAddress, Range<Token>> rangeFetchMapMap = HashMultimap.create();
+        final Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap = HashMultimap.create();
         if(result == null)
             return rangeFetchMapMap;
         final Function<CapacityEdge<Vertex, Integer>, Integer> flowFunction = result.calcFlowFunction();
@@ -346,13 +347,13 @@ public class RangeFetchMapCalculator
     private boolean addEndpoints(MutableCapacityGraph<Vertex, Integer> capacityGraph, RangeVertex rangeVertex, boolean localDCCheck)
     {
         boolean sourceFound = false;
-        for (InetAddress endpoint : rangesWithSources.get(rangeVertex.getRange()))
+        for (InetAddressAndPort endpoint : rangesWithSources.get(rangeVertex.getRange()))
         {
             if (passFilters(endpoint, localDCCheck))
             {
                 sourceFound = true;
                 // if we pass filters, it means that we don't filter away localhost and we can count it as a source:
-                if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+                if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
                     continue; // but don't add localhost to the graph to avoid streaming locally
                 final Vertex endpointVertex = new EndpointVertex(endpoint);
                 capacityGraph.insertVertex(rangeVertex);
@@ -363,7 +364,7 @@ public class RangeFetchMapCalculator
         return sourceFound;
     }
 
-    private boolean isInLocalDC(InetAddress endpoint)
+    private boolean isInLocalDC(InetAddressAndPort endpoint)
     {
         return DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint));
     }
@@ -374,7 +375,7 @@ public class RangeFetchMapCalculator
      * @param localDCCheck Allow endpoints with local DC
      * @return   True if filters pass this endpoint
      */
-    private boolean passFilters(final InetAddress endpoint, boolean localDCCheck)
+    private boolean passFilters(final InetAddressAndPort endpoint, boolean localDCCheck)
     {
         for (RangeStreamer.ISourceFilter filter : sourceFilters)
         {
@@ -410,15 +411,15 @@ public class RangeFetchMapCalculator
      */
     private static class EndpointVertex extends Vertex
     {
-        private final InetAddress endpoint;
+        private final InetAddressAndPort endpoint;
 
-        public EndpointVertex(InetAddress endpoint)
+        public EndpointVertex(InetAddressAndPort endpoint)
         {
             assert endpoint != null;
             this.endpoint = endpoint;
         }
 
-        public InetAddress getEndpoint()
+        public InetAddressAndPort getEndpoint()
         {
             return endpoint;
         }

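The net effect of the RangeFetchMapCalculator changes is that the fetch map is now keyed by the port-aware InetAddressAndPort rather than java.net.InetAddress. A minimal consumer sketch (illustrative only; the helper method and its logging are not part of this patch):

    import java.util.Collection;
    import java.util.Map;
    import com.google.common.collect.Multimap;
    import org.apache.cassandra.dht.Range;
    import org.apache.cassandra.dht.Token;
    import org.apache.cassandra.locator.InetAddressAndPort;

    // Sketch: walk the map returned by getRangeFetchMap(); each key now
    // carries both the source IP and its port.
    static void logFetchMap(Multimap<InetAddressAndPort, Range<Token>> fetchMap)
    {
        for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> e : fetchMap.asMap().entrySet())
            System.out.printf("fetch %d ranges from %s%n", e.getValue().size(), e.getKey());
    }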
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index eabb212..439ebc6 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.dht;
 
-import java.net.InetAddress;
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -26,8 +25,9 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.LocalStrategy;
-import org.apache.cassandra.service.ActiveRepairService;
+
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,10 +59,10 @@ public class RangeStreamer
     /* current token ring */
     private final TokenMetadata metadata;
     /* address of this node */
-    private final InetAddress address;
+    private final InetAddressAndPort address;
     /* streaming description */
     private final String description;
-    private final Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch = HashMultimap.create();
+    private final Multimap<String, Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> toFetch = HashMultimap.create();
     private final Set<ISourceFilter> sourceFilters = new HashSet<>();
     private final StreamPlan streamPlan;
     private final boolean useStrictConsistency;
@@ -74,7 +74,7 @@ public class RangeStreamer
      */
     public static interface ISourceFilter
     {
-        public boolean shouldInclude(InetAddress endpoint);
+        public boolean shouldInclude(InetAddressAndPort endpoint);
     }
 
     /**
@@ -90,7 +90,7 @@ public class RangeStreamer
             this.fd = fd;
         }
 
-        public boolean shouldInclude(InetAddress endpoint)
+        public boolean shouldInclude(InetAddressAndPort endpoint)
         {
             return fd.isAlive(endpoint);
         }
@@ -110,7 +110,7 @@ public class RangeStreamer
             this.snitch = snitch;
         }
 
-        public boolean shouldInclude(InetAddress endpoint)
+        public boolean shouldInclude(InetAddressAndPort endpoint)
         {
             return snitch.getDatacenter(endpoint).equals(sourceDc);
         }
@@ -121,9 +121,9 @@ public class RangeStreamer
      */
     public static class ExcludeLocalNodeFilter implements ISourceFilter
     {
-        public boolean shouldInclude(InetAddress endpoint)
+        public boolean shouldInclude(InetAddressAndPort endpoint)
         {
-            return !FBUtilities.getBroadcastAddress().equals(endpoint);
+            return !FBUtilities.getBroadcastAddressAndPort().equals(endpoint);
         }
     }
 
@@ -132,14 +132,14 @@ public class RangeStreamer
      */
     public static class WhitelistedSourcesFilter implements ISourceFilter
     {
-        private final Set<InetAddress> whitelistedSources;
+        private final Set<InetAddressAndPort> whitelistedSources;
 
-        public WhitelistedSourcesFilter(Set<InetAddress> whitelistedSources)
+        public WhitelistedSourcesFilter(Set<InetAddressAndPort> whitelistedSources)
         {
             this.whitelistedSources = whitelistedSources;
         }
 
-        public boolean shouldInclude(InetAddress endpoint)
+        public boolean shouldInclude(InetAddressAndPort endpoint)
         {
             return whitelistedSources.contains(endpoint);
         }
@@ -147,7 +147,7 @@ public class RangeStreamer
 
     public RangeStreamer(TokenMetadata metadata,
                          Collection<Token> tokens,
-                         InetAddress address,
+                         InetAddressAndPort address,
                          StreamOperation streamOperation,
                          boolean useStrictConsistency,
                          IEndpointSnitch snitch,
@@ -186,18 +186,18 @@ public class RangeStreamer
         }
 
         boolean useStrictSource = useStrictSourcesForRanges(keyspaceName);
-        Multimap<Range<Token>, InetAddress> rangesForKeyspace = useStrictSource
+        Multimap<Range<Token>, InetAddressAndPort> rangesForKeyspace = useStrictSource
                 ? getAllRangesWithStrictSourcesFor(keyspaceName, ranges) : getAllRangesWithSourcesFor(keyspaceName, ranges);
 
-        for (Map.Entry<Range<Token>, InetAddress> entry : rangesForKeyspace.entries())
+        for (Map.Entry<Range<Token>, InetAddressAndPort> entry : rangesForKeyspace.entries())
             logger.info("{}: range {} exists on {} for keyspace {}", description, entry.getKey(), entry.getValue(), keyspaceName);
 
         AbstractReplicationStrategy strat = Keyspace.open(keyspaceName).getReplicationStrategy();
-        Multimap<InetAddress, Range<Token>> rangeFetchMap = useStrictSource || strat == null || strat.getReplicationFactor() == 1
+        Multimap<InetAddressAndPort, Range<Token>> rangeFetchMap = useStrictSource || strat == null || strat.getReplicationFactor() == 1
                                                             ? getRangeFetchMap(rangesForKeyspace, sourceFilters, keyspaceName, useStrictConsistency)
                                                             : getOptimizedRangeFetchMap(rangesForKeyspace, sourceFilters, keyspaceName);
 
-        for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : rangeFetchMap.asMap().entrySet())
+        for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : rangeFetchMap.asMap().entrySet())
         {
             if (logger.isTraceEnabled())
             {
@@ -226,19 +226,19 @@ public class RangeStreamer
      *
      * @throws java.lang.IllegalStateException when there is no source to get data streamed
      */
-    private Multimap<Range<Token>, InetAddress> getAllRangesWithSourcesFor(String keyspaceName, Collection<Range<Token>> desiredRanges)
+    private Multimap<Range<Token>, InetAddressAndPort> getAllRangesWithSourcesFor(String keyspaceName, Collection<Range<Token>> desiredRanges)
     {
         AbstractReplicationStrategy strat = Keyspace.open(keyspaceName).getReplicationStrategy();
-        Multimap<Range<Token>, InetAddress> rangeAddresses = strat.getRangeAddresses(metadata.cloneOnlyTokenMap());
+        Multimap<Range<Token>, InetAddressAndPort> rangeAddresses = strat.getRangeAddresses(metadata.cloneOnlyTokenMap());
 
-        Multimap<Range<Token>, InetAddress> rangeSources = ArrayListMultimap.create();
+        Multimap<Range<Token>, InetAddressAndPort> rangeSources = ArrayListMultimap.create();
         for (Range<Token> desiredRange : desiredRanges)
         {
             for (Range<Token> range : rangeAddresses.keySet())
             {
                 if (range.contains(desiredRange))
                 {
-                    List<InetAddress> preferred = snitch.getSortedListByProximity(address, rangeAddresses.get(range));
+                    List<InetAddressAndPort> preferred = snitch.getSortedListByProximity(address, rangeAddresses.get(range));
                     rangeSources.putAll(desiredRange, preferred);
                     break;
                 }
@@ -258,30 +258,30 @@ public class RangeStreamer
      *
      * @throws java.lang.IllegalStateException when there is no source to get data streamed, or more than 1 source found.
      */
-    private Multimap<Range<Token>, InetAddress> getAllRangesWithStrictSourcesFor(String keyspace, Collection<Range<Token>> desiredRanges)
+    private Multimap<Range<Token>, InetAddressAndPort> getAllRangesWithStrictSourcesFor(String keyspace, Collection<Range<Token>> desiredRanges)
     {
         assert tokens != null;
         AbstractReplicationStrategy strat = Keyspace.open(keyspace).getReplicationStrategy();
 
         // Active ranges
         TokenMetadata metadataClone = metadata.cloneOnlyTokenMap();
-        Multimap<Range<Token>, InetAddress> addressRanges = strat.getRangeAddresses(metadataClone);
+        Multimap<Range<Token>, InetAddressAndPort> addressRanges = strat.getRangeAddresses(metadataClone);
 
         // Pending ranges
         metadataClone.updateNormalTokens(tokens, address);
-        Multimap<Range<Token>, InetAddress> pendingRangeAddresses = strat.getRangeAddresses(metadataClone);
+        Multimap<Range<Token>, InetAddressAndPort> pendingRangeAddresses = strat.getRangeAddresses(metadataClone);
 
         // Collects the source that will have its range moved to the new node
-        Multimap<Range<Token>, InetAddress> rangeSources = ArrayListMultimap.create();
+        Multimap<Range<Token>, InetAddressAndPort> rangeSources = ArrayListMultimap.create();
 
         for (Range<Token> desiredRange : desiredRanges)
         {
-            for (Map.Entry<Range<Token>, Collection<InetAddress>> preEntry : addressRanges.asMap().entrySet())
+            for (Map.Entry<Range<Token>, Collection<InetAddressAndPort>> preEntry : addressRanges.asMap().entrySet())
             {
                 if (preEntry.getKey().contains(desiredRange))
                 {
-                    Set<InetAddress> oldEndpoints = Sets.newHashSet(preEntry.getValue());
-                    Set<InetAddress> newEndpoints = Sets.newHashSet(pendingRangeAddresses.get(desiredRange));
+                    Set<InetAddressAndPort> oldEndpoints = Sets.newHashSet(preEntry.getValue());
+                    Set<InetAddressAndPort> newEndpoints = Sets.newHashSet(pendingRangeAddresses.get(desiredRange));
 
                     // Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
                     // So we need to be careful to only be strict when endpoints == RF
@@ -296,14 +296,14 @@ public class RangeStreamer
             }
 
             // Validate
-            Collection<InetAddress> addressList = rangeSources.get(desiredRange);
+            Collection<InetAddressAndPort> addressList = rangeSources.get(desiredRange);
             if (addressList == null || addressList.isEmpty())
                 throw new IllegalStateException("No sources found for " + desiredRange);
 
             if (addressList.size() > 1)
                 throw new IllegalStateException("Multiple endpoints found for 
" + desiredRange);
 
-            InetAddress sourceIp = addressList.iterator().next();
+            InetAddressAndPort sourceIp = addressList.iterator().next();
             EndpointState sourceState = 
Gossiper.instance.getEndpointStateForEndpoint(sourceIp);
             if (Gossiper.instance.isEnabled() && (sourceState == null || 
!sourceState.isAlive()))
                 throw new RuntimeException("A node required to move the data 
consistently is down (" + sourceIp + "). " +
@@ -320,17 +320,17 @@ public class RangeStreamer
      * @param keyspace keyspace name
      * @return Map of source endpoint to collection of ranges
      */
-    private static Multimap<InetAddress, Range<Token>> getRangeFetchMap(Multimap<Range<Token>, InetAddress> rangesWithSources,
-                                                                        Collection<ISourceFilter> sourceFilters, String keyspace,
-                                                                        boolean useStrictConsistency)
+    private static Multimap<InetAddressAndPort, Range<Token>> getRangeFetchMap(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources,
+                                                                               Collection<ISourceFilter> sourceFilters, String keyspace,
+                                                                               boolean useStrictConsistency)
     {
-        Multimap<InetAddress, Range<Token>> rangeFetchMapMap = HashMultimap.create();
+        Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap = HashMultimap.create();
         for (Range<Token> range : rangesWithSources.keySet())
         {
             boolean foundSource = false;
 
             outer:
-            for (InetAddress address : rangesWithSources.get(range))
+            for (InetAddressAndPort address : rangesWithSources.get(range))
             {
                 for (ISourceFilter filter : sourceFilters)
                 {
@@ -338,7 +338,7 @@ public class RangeStreamer
                         continue outer;
                 }
 
-                if (address.equals(FBUtilities.getBroadcastAddress()))
+                if (address.equals(FBUtilities.getBroadcastAddressAndPort()))
                 {
                     // If localhost is a source, we have found one, but we don't add it to the map to avoid streaming locally
                     foundSource = true;
@@ -371,11 +371,11 @@ public class RangeStreamer
     }
 
 
-    private static Multimap<InetAddress, Range<Token>> getOptimizedRangeFetchMap(Multimap<Range<Token>, InetAddress> rangesWithSources,
-                                                                        Collection<ISourceFilter> sourceFilters, String keyspace)
+    private static Multimap<InetAddressAndPort, Range<Token>> getOptimizedRangeFetchMap(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources,
+                                                                                        Collection<ISourceFilter> sourceFilters, String keyspace)
     {
         RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, sourceFilters, keyspace);
-        Multimap<InetAddress, Range<Token>> rangeFetchMapMap = calculator.getRangeFetchMap();
+        Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap = calculator.getRangeFetchMap();
         logger.info("Output from RangeFetchMapCalculator for keyspace {}", keyspace);
         validateRangeFetchMap(rangesWithSources, rangeFetchMapMap, keyspace);
         return rangeFetchMapMap;
@@ -387,11 +387,11 @@ public class RangeStreamer
      * @param rangeFetchMapMap
      * @param keyspace
      */
-    private static void validateRangeFetchMap(Multimap<Range<Token>, InetAddress> rangesWithSources, Multimap<InetAddress, Range<Token>> rangeFetchMapMap, String keyspace)
+    private static void validateRangeFetchMap(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap, String keyspace)
     {
-        for (Map.Entry<InetAddress, Range<Token>> entry : rangeFetchMapMap.entries())
+        for (Map.Entry<InetAddressAndPort, Range<Token>> entry : rangeFetchMapMap.entries())
         {
-            if(entry.getKey().equals(FBUtilities.getBroadcastAddress()))
+            if(entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort()))
             {
                 throw new IllegalStateException("Trying to stream locally. 
Range: " + entry.getValue()
                                         + " in keyspace " + keyspace);
@@ -407,26 +407,26 @@ public class RangeStreamer
         }
     }
 
-    public static Multimap<InetAddress, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddress> rangesWithSourceTarget, String keyspace,
-                                                                 IFailureDetector fd, boolean useStrictConsistency)
+    public static Multimap<InetAddressAndPort, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddressAndPort> rangesWithSourceTarget, String keyspace,
+                                                                        IFailureDetector fd, boolean useStrictConsistency)
     {
         return getRangeFetchMap(rangesWithSourceTarget, Collections.<ISourceFilter>singleton(new FailureDetectorSourceFilter(fd)), keyspace, useStrictConsistency);
     }
 
     // For testing purposes
     @VisibleForTesting
-    Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch()
+    Multimap<String, Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> toFetch()
     {
         return toFetch;
     }
 
     public StreamResultFuture fetchAsync()
     {
-        for (Map.Entry<String, Map.Entry<InetAddress, Collection<Range<Token>>>> entry : toFetch.entries())
+        for (Map.Entry<String, Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> entry : toFetch.entries())
         {
             String keyspace = entry.getKey();
-            InetAddress source = entry.getValue().getKey();
-            InetAddress preferred = SystemKeyspace.getPreferredIP(source);
+            InetAddressAndPort source = entry.getValue().getKey();
+            InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(source);
             Collection<Range<Token>> ranges = entry.getValue().getValue();
 
             // filter out already streamed ranges

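With ports part of endpoint identity, two sources that share an IP but listen on different ports no longer compare equal, which is what every ISourceFilter above now relies on. A small sketch (not from this patch; it assumes the InetAddressAndPort.getByName(String) factory and its optional ":port" suffix, and the sample addresses are illustrative):

    import java.net.UnknownHostException;
    import org.apache.cassandra.locator.InetAddressAndPort;

    // Sketch: equality is (address, port), not just address.
    static boolean samePeer() throws UnknownHostException
    {
        InetAddressAndPort a = InetAddressAndPort.getByName("127.0.0.1:7000"); // illustrative endpoint
        InetAddressAndPort b = InetAddressAndPort.getByName("127.0.0.1:7001"); // same IP, other port
        return a.equals(b); // false once the ports differ
    }

This is why WhitelistedSourcesFilter and the broadcast-address comparisons switch to InetAddressAndPort in the same step: a whitelist of bare InetAddress values could not distinguish two instances bound to the same interface.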
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
index 7c9d22c..61082df 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.dht.tokenallocator;
 
-import java.net.InetAddress;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -37,6 +36,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
@@ -48,7 +48,7 @@ public class TokenAllocation
 
     public static Collection<Token> allocateTokens(final TokenMetadata tokenMetadata,
                                                    final AbstractReplicationStrategy rs,
-                                                   final InetAddress endpoint,
+                                                   final InetAddressAndPort endpoint,
                                                    int numTokens)
     {
         TokenMetadata tokenMetadataCopy = tokenMetadata.cloneOnlyTokenMap();
@@ -81,7 +81,7 @@ public class TokenAllocation
         {
             while (tokenMetadata.getEndpoint(t) != null)
             {
-                InetAddress other = tokenMetadata.getEndpoint(t);
+                InetAddressAndPort other = tokenMetadata.getEndpoint(t);
                 if (strategy.inAllocationRing(other))
                     throw new ConfigurationException(String.format("Allocated 
token %s already assigned to node %s. Is another node also allocating tokens?", 
t, other));
                 t = t.increaseSlightly();
@@ -92,9 +92,9 @@ public class TokenAllocation
     }
 
     // return the ratio of ownership for each endpoint
-    public static Map<InetAddress, Double> evaluateReplicatedOwnership(TokenMetadata tokenMetadata, AbstractReplicationStrategy rs)
+    public static Map<InetAddressAndPort, Double> evaluateReplicatedOwnership(TokenMetadata tokenMetadata, AbstractReplicationStrategy rs)
     {
-        Map<InetAddress, Double> ownership = Maps.newHashMap();
+        Map<InetAddressAndPort, Double> ownership = Maps.newHashMap();
         List<Token> sortedTokens = tokenMetadata.sortedTokens();
         Iterator<Token> it = sortedTokens.iterator();
         Token current = it.next();
@@ -109,11 +109,11 @@ public class TokenAllocation
         return ownership;
     }
 
-    static void addOwnership(final TokenMetadata tokenMetadata, final AbstractReplicationStrategy rs, Token current, Token next, Map<InetAddress, Double> ownership)
+    static void addOwnership(final TokenMetadata tokenMetadata, final AbstractReplicationStrategy rs, Token current, Token next, Map<InetAddressAndPort, Double> ownership)
     {
         double size = current.size(next);
         Token representative = current.getPartitioner().midpoint(current, next);
-        for (InetAddress n : rs.calculateNaturalEndpoints(representative, tokenMetadata))
+        for (InetAddressAndPort n : rs.calculateNaturalEndpoints(representative, tokenMetadata))
         {
             Double v = ownership.get(n);
             ownership.put(n, v != null ? v + size : size);
@@ -126,11 +126,11 @@ public class TokenAllocation
     }
 
     public static SummaryStatistics replicatedOwnershipStats(TokenMetadata tokenMetadata,
-                                                             AbstractReplicationStrategy rs, InetAddress endpoint)
+                                                             AbstractReplicationStrategy rs, InetAddressAndPort endpoint)
     {
         SummaryStatistics stat = new SummaryStatistics();
         StrategyAdapter strategy = getStrategy(tokenMetadata, rs, endpoint);
-        for (Map.Entry<InetAddress, Double> en : evaluateReplicatedOwnership(tokenMetadata, rs).entrySet())
+        for (Map.Entry<InetAddressAndPort, Double> en : evaluateReplicatedOwnership(tokenMetadata, rs).entrySet())
         {
             // Filter only in the same datacentre.
             if (strategy.inAllocationRing(en.getKey()))
@@ -139,10 +139,10 @@ public class TokenAllocation
         return stat;
     }
 
-    static TokenAllocator<InetAddress> create(TokenMetadata tokenMetadata, StrategyAdapter strategy)
+    static TokenAllocator<InetAddressAndPort> create(TokenMetadata tokenMetadata, StrategyAdapter strategy)
     {
-        NavigableMap<Token, InetAddress> sortedTokens = new TreeMap<>();
-        for (Map.Entry<Token, InetAddress> en : tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap().entrySet())
+        NavigableMap<Token, InetAddressAndPort> sortedTokens = new TreeMap<>();
+        for (Map.Entry<Token, InetAddressAndPort> en : tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap().entrySet())
         {
             if (strategy.inAllocationRing(en.getValue()))
                 sortedTokens.put(en.getKey(), en.getValue());
@@ -150,15 +150,15 @@ public class TokenAllocation
         return TokenAllocatorFactory.createTokenAllocator(sortedTokens, strategy, tokenMetadata.partitioner);
     }
 
-    interface StrategyAdapter extends ReplicationStrategy<InetAddress>
+    interface StrategyAdapter extends ReplicationStrategy<InetAddressAndPort>
     {
         // return true iff the provided endpoint occurs in the same virtual token-ring we are allocating for
         // i.e. the set of the nodes that share ownership with the node we are allocating
         // alternatively: return false if the endpoint's ownership is independent of the node we are allocating tokens for
-        boolean inAllocationRing(InetAddress other);
+        boolean inAllocationRing(InetAddressAndPort other);
     }
 
-    static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final AbstractReplicationStrategy rs, final InetAddress endpoint)
+    static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final AbstractReplicationStrategy rs, final InetAddressAndPort endpoint)
     {
         if (rs instanceof NetworkTopologyStrategy)
             return getStrategy(tokenMetadata, (NetworkTopologyStrategy) rs, rs.snitch, endpoint);
@@ -167,7 +167,7 @@ public class TokenAllocation
         throw new ConfigurationException("Token allocation does not support 
replication strategy " + rs.getClass().getSimpleName());
     }
 
-    static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final SimpleStrategy rs, final InetAddress endpoint)
+    static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final SimpleStrategy rs, final InetAddressAndPort endpoint)
     {
         final int replicas = rs.getReplicationFactor();
 
@@ -180,20 +180,20 @@ public class TokenAllocation
             }
 
             @Override
-            public Object getGroup(InetAddress unit)
+            public Object getGroup(InetAddressAndPort unit)
             {
                 return unit;
             }
 
             @Override
-            public boolean inAllocationRing(InetAddress other)
+            public boolean inAllocationRing(InetAddressAndPort other)
             {
                 return true;
             }
         };
     }
 
-    static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final NetworkTopologyStrategy rs, final IEndpointSnitch snitch, final InetAddress endpoint)
+    static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final NetworkTopologyStrategy rs, final IEndpointSnitch snitch, final InetAddressAndPort endpoint)
     {
         final String dc = snitch.getDatacenter(endpoint);
         final int replicas = rs.getReplicationFactor(dc);
@@ -210,13 +210,13 @@ public class TokenAllocation
                 }
 
                 @Override
-                public Object getGroup(InetAddress unit)
+                public Object getGroup(InetAddressAndPort unit)
                 {
                     return unit;
                 }
 
                 @Override
-                public boolean inAllocationRing(InetAddress other)
+                public boolean inAllocationRing(InetAddressAndPort other)
                 {
                     return dc.equals(snitch.getDatacenter(other));
                 }
@@ -237,13 +237,13 @@ public class TokenAllocation
                 }
 
                 @Override
-                public Object getGroup(InetAddress unit)
+                public Object getGroup(InetAddressAndPort unit)
                 {
                     return snitch.getRack(unit);
                 }
 
                 @Override
-                public boolean inAllocationRing(InetAddress other)
+                public boolean inAllocationRing(InetAddressAndPort other)
                 {
                     return dc.equals(snitch.getDatacenter(other));
                 }
@@ -261,13 +261,13 @@ public class TokenAllocation
                 }
 
                 @Override
-                public Object getGroup(InetAddress unit)
+                public Object getGroup(InetAddressAndPort unit)
                 {
                     return unit;
                 }
 
                 @Override
-                public boolean inAllocationRing(InetAddress other)
+                public boolean inAllocationRing(InetAddressAndPort other)
                 {
                     return dc.equals(snitch.getDatacenter(other));
                 }

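After this change, evaluateReplicatedOwnership reports the ownership ratio per InetAddressAndPort, and every StrategyAdapter groups and filters on the same type. A consumer sketch (illustrative only; the helper and its printing are not part of this patch):

    import java.util.Map;
    import org.apache.cassandra.locator.InetAddressAndPort;

    // Sketch: dump each endpoint's replicated ownership fraction as returned
    // by TokenAllocation.evaluateReplicatedOwnership(tokenMetadata, rs).
    static void printOwnership(Map<InetAddressAndPort, Double> ownership)
    {
        for (Map.Entry<InetAddressAndPort, Double> en : ownership.entrySet())
            System.out.printf("%s owns %.3f%n", en.getKey(), en.getValue());
    }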
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
index 58acb56..5fdba02 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.dht.tokenallocator;
 
-import java.net.InetAddress;
 import java.util.NavigableMap;
 
 import org.slf4j.Logger;
@@ -26,13 +25,14 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 public class TokenAllocatorFactory
 {
     private static final Logger logger = LoggerFactory.getLogger(TokenAllocatorFactory.class);
-    public static TokenAllocator<InetAddress> createTokenAllocator(NavigableMap<Token, InetAddress> sortedTokens,
-                                                     ReplicationStrategy<InetAddress> strategy,
-                                                     IPartitioner partitioner)
+    public static TokenAllocator<InetAddressAndPort> createTokenAllocator(NavigableMap<Token, InetAddressAndPort> sortedTokens,
+                                                                          ReplicationStrategy<InetAddressAndPort> strategy,
+                                                                          IPartitioner partitioner)
     {
         if(strategy.replicas() == 1)
         {


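The factory signature above completes the migration for token allocation: the sorted token map, the strategy and the returned allocator all use the port-aware endpoint type. A call-site sketch (assumed to live in the org.apache.cassandra.dht.tokenallocator package; the empty map stands in for the caller's real token data):

    import java.util.NavigableMap;
    import java.util.TreeMap;
    import org.apache.cassandra.dht.IPartitioner;
    import org.apache.cassandra.dht.Token;
    import org.apache.cassandra.locator.InetAddressAndPort;

    // Sketch: hand the factory a port-aware token map.
    static TokenAllocator<InetAddressAndPort> build(ReplicationStrategy<InetAddressAndPort> strategy,
                                                    IPartitioner partitioner)
    {
        NavigableMap<Token, InetAddressAndPort> sortedTokens = new TreeMap<>();
        return TokenAllocatorFactory.createTokenAllocator(sortedTokens, strategy, partitioner);
    }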