http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 4469384..91a1bff 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -35,6 +35,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.SetMultimap; import com.google.common.collect.Sets; import com.google.common.io.ByteStreams; +import com.google.common.util.concurrent.ListenableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +53,7 @@ import org.apache.cassandra.dht.*; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.util.*; import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.metrics.RestorableMeter; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.*; @@ -91,47 +93,52 @@ public final class SystemKeyspace public static final String PAXOS = "paxos"; public static final String BUILT_INDEXES = "IndexInfo"; public static final String LOCAL = "local"; - public static final String PEERS = "peers"; - public static final String PEER_EVENTS = "peer_events"; + public static final String PEERS_V2 = "peers_v2"; + public static final String PEER_EVENTS_V2 = "peer_events_v2"; public static final String RANGE_XFERS = "range_xfers"; public static final String COMPACTION_HISTORY = "compaction_history"; public static final String SSTABLE_ACTIVITY = "sstable_activity"; public static final String SIZE_ESTIMATES = "size_estimates"; public static final String AVAILABLE_RANGES = "available_ranges"; public static final String TRANSFERRED_RANGES = "transferred_ranges"; + public static final 
String TRANSFERRED_RANGES_V2 = "transferred_ranges_v2"; public static final String VIEW_BUILDS_IN_PROGRESS = "view_builds_in_progress"; public static final String BUILT_VIEWS = "built_views"; public static final String PREPARED_STATEMENTS = "prepared_statements"; public static final String REPAIRS = "repairs"; + @Deprecated public static final String LEGACY_PEERS = "peers"; + @Deprecated public static final String LEGACY_PEER_EVENTS = "peer_events"; + @Deprecated public static final String LEGACY_TRANSFERRED_RANGES = "transferred_ranges"; + public static final TableMetadata Batches = parse(BATCHES, - "batches awaiting replay", - "CREATE TABLE %s (" - + "id timeuuid," - + "mutations list<blob>," - + "version int," - + "PRIMARY KEY ((id)))") - .partitioner(new LocalPartitioner(TimeUUIDType.instance)) - .compaction(CompactionParams.scts(singletonMap("min_threshold", "2"))) - .build(); + "batches awaiting replay", + "CREATE TABLE %s (" + + "id timeuuid," + + "mutations list<blob>," + + "version int," + + "PRIMARY KEY ((id)))") + .partitioner(new LocalPartitioner(TimeUUIDType.instance)) + .compaction(CompactionParams.scts(singletonMap("min_threshold", "2"))) + .build(); private static final TableMetadata Paxos = parse(PAXOS, - "in-progress paxos proposals", - "CREATE TABLE %s (" - + "row_key blob," - + "cf_id UUID," - + "in_progress_ballot timeuuid," - + "most_recent_commit blob," - + "most_recent_commit_at timeuuid," - + "most_recent_commit_version int," - + "proposal blob," - + "proposal_ballot timeuuid," - + "proposal_version int," - + "PRIMARY KEY ((row_key), cf_id))") - .compaction(CompactionParams.lcs(emptyMap())) - .build(); + "in-progress paxos proposals", + "CREATE TABLE %s (" + + "row_key blob," + + "cf_id UUID," + + "in_progress_ballot timeuuid," + + "most_recent_commit blob," + + "most_recent_commit_at timeuuid," + + "most_recent_commit_version int," + + "proposal blob," + + "proposal_ballot timeuuid," + + "proposal_version int," + + "PRIMARY KEY ((row_key), 
cf_id))") + .compaction(CompactionParams.lcs(emptyMap())) + .build(); private static final TableMetadata BuiltIndexes = parse(BUILT_INDEXES, @@ -145,122 +152,130 @@ public final class SystemKeyspace private static final TableMetadata Local = parse(LOCAL, - "information about the local node", - "CREATE TABLE %s (" - + "key text," - + "bootstrapped text," - + "broadcast_address inet," - + "cluster_name text," - + "cql_version text," - + "data_center text," - + "gossip_generation int," - + "host_id uuid," - + "listen_address inet," - + "native_protocol_version text," - + "partitioner text," - + "rack text," - + "release_version text," - + "rpc_address inet," - + "schema_version uuid," - + "tokens set<varchar>," - + "truncated_at map<uuid, blob>," - + "PRIMARY KEY ((key)))") - .recordDeprecatedSystemColumn("thrift_version", UTF8Type.instance) - .build(); - - private static final TableMetadata Peers = - parse(PEERS, - "information about known peers in the cluster", - "CREATE TABLE %s (" - + "peer inet," - + "data_center text," - + "host_id uuid," - + "preferred_ip inet," - + "rack text," - + "release_version text," - + "rpc_address inet," - + "schema_version uuid," - + "tokens set<varchar>," - + "PRIMARY KEY ((peer)))") - .build(); - - private static final TableMetadata PeerEvents = - parse(PEER_EVENTS, - "events related to peers", - "CREATE TABLE %s (" - + "peer inet," - + "hints_dropped map<uuid, int>," - + "PRIMARY KEY ((peer)))") - .build(); + "information about the local node", + "CREATE TABLE %s (" + + "key text," + + "bootstrapped text," + + "broadcast_address inet," + + "broadcast_port int," + + "cluster_name text," + + "cql_version text," + + "data_center text," + + "gossip_generation int," + + "host_id uuid," + + "listen_address inet," + + "listen_port int," + + "native_protocol_version text," + + "partitioner text," + + "rack text," + + "release_version text," + + "rpc_address inet," + + "rpc_port int," + + "schema_version uuid," + + "tokens set<varchar>," + 
+ "truncated_at map<uuid, blob>," + + "PRIMARY KEY ((key)))" + ).recordDeprecatedSystemColumn("thrift_version", UTF8Type.instance) + .build(); + + private static final TableMetadata PeersV2 = + parse(PEERS_V2, + "information about known peers in the cluster", + "CREATE TABLE %s (" + + "peer inet," + + "peer_port int," + + "data_center text," + + "host_id uuid," + + "preferred_ip inet," + + "preferred_port int," + + "rack text," + + "release_version text," + + "native_address inet," + + "native_port int," + + "schema_version uuid," + + "tokens set<varchar>," + + "PRIMARY KEY ((peer), peer_port))") + .build(); + + private static final TableMetadata PeerEventsV2 = + parse(PEER_EVENTS_V2, + "events related to peers", + "CREATE TABLE %s (" + + "peer inet," + + "peer_port int," + + "hints_dropped map<uuid, int>," + + "PRIMARY KEY ((peer), peer_port))") + .build(); private static final TableMetadata RangeXfers = parse(RANGE_XFERS, - "ranges requested for transfer", - "CREATE TABLE %s (" - + "token_bytes blob," - + "requested_at timestamp," - + "PRIMARY KEY ((token_bytes)))") - .build(); + "ranges requested for transfer", + "CREATE TABLE %s (" + + "token_bytes blob," + + "requested_at timestamp," + + "PRIMARY KEY ((token_bytes)))") + .build(); private static final TableMetadata CompactionHistory = parse(COMPACTION_HISTORY, - "week-long compaction history", - "CREATE TABLE %s (" - + "id uuid," - + "bytes_in bigint," - + "bytes_out bigint," - + "columnfamily_name text," - + "compacted_at timestamp," - + "keyspace_name text," - + "rows_merged map<int, bigint>," - + "PRIMARY KEY ((id)))") - .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(7)) - .build(); + "week-long compaction history", + "CREATE TABLE %s (" + + "id uuid," + + "bytes_in bigint," + + "bytes_out bigint," + + "columnfamily_name text," + + "compacted_at timestamp," + + "keyspace_name text," + + "rows_merged map<int, bigint>," + + "PRIMARY KEY ((id)))") + .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(7)) + 
.build(); private static final TableMetadata SSTableActivity = parse(SSTABLE_ACTIVITY, - "historic sstable read rates", - "CREATE TABLE %s (" - + "keyspace_name text," - + "columnfamily_name text," - + "generation int," - + "rate_120m double," - + "rate_15m double," - + "PRIMARY KEY ((keyspace_name, columnfamily_name, generation)))") - .build(); + "historic sstable read rates", + "CREATE TABLE %s (" + + "keyspace_name text," + + "columnfamily_name text," + + "generation int," + + "rate_120m double," + + "rate_15m double," + + "PRIMARY KEY ((keyspace_name, columnfamily_name, generation)))") + .build(); private static final TableMetadata SizeEstimates = parse(SIZE_ESTIMATES, - "per-table primary range size estimates", - "CREATE TABLE %s (" - + "keyspace_name text," - + "table_name text," - + "range_start text," - + "range_end text," - + "mean_partition_size bigint," - + "partitions_count bigint," - + "PRIMARY KEY ((keyspace_name), table_name, range_start, range_end))") - .build(); + "per-table primary range size estimates", + "CREATE TABLE %s (" + + "keyspace_name text," + + "table_name text," + + "range_start text," + + "range_end text," + + "mean_partition_size bigint," + + "partitions_count bigint," + + "PRIMARY KEY ((keyspace_name), table_name, range_start, range_end))") + .build(); private static final TableMetadata AvailableRanges = parse(AVAILABLE_RANGES, - "available keyspace/ranges during bootstrap/replace that are ready to be served", - "CREATE TABLE %s (" - + "keyspace_name text," - + "ranges set<blob>," - + "PRIMARY KEY ((keyspace_name)))") - .build(); - - private static final TableMetadata TransferredRanges = - parse(TRANSFERRED_RANGES, - "record of transferred ranges for streaming operation", - "CREATE TABLE %s (" - + "operation text," - + "peer inet," - + "keyspace_name text," - + "ranges set<blob>," - + "PRIMARY KEY ((operation, keyspace_name), peer))") - .build(); + "available keyspace/ranges during bootstrap/replace that are ready to be served", + 
"CREATE TABLE %s (" + + "keyspace_name text," + + "ranges set<blob>," + + "PRIMARY KEY ((keyspace_name)))") + .build(); + + private static final TableMetadata TransferredRangesV2 = + parse(TRANSFERRED_RANGES_V2, + "record of transferred ranges for streaming operation", + "CREATE TABLE %s (" + + "operation text," + + "peer inet," + + "peer_port int," + + "keyspace_name text," + + "ranges set<blob>," + + "PRIMARY KEY ((operation, keyspace_name), peer, peer_port))") + .build(); private static final TableMetadata ViewBuildsInProgress = parse(VIEW_BUILDS_IN_PROGRESS, @@ -277,38 +292,79 @@ public final class SystemKeyspace private static final TableMetadata BuiltViews = parse(BUILT_VIEWS, - "built views", - "CREATE TABLE %s (" - + "keyspace_name text," - + "view_name text," - + "status_replicated boolean," - + "PRIMARY KEY ((keyspace_name), view_name))") - .build(); + "built views", + "CREATE TABLE %s (" + + "keyspace_name text," + + "view_name text," + + "status_replicated boolean," + + "PRIMARY KEY ((keyspace_name), view_name))") + .build(); private static final TableMetadata PreparedStatements = parse(PREPARED_STATEMENTS, - "prepared statements", - "CREATE TABLE %s (" - + "prepared_id blob," - + "logged_keyspace text," - + "query_string text," - + "PRIMARY KEY ((prepared_id)))") - .build(); + "prepared statements", + "CREATE TABLE %s (" + + "prepared_id blob," + + "logged_keyspace text," + + "query_string text," + + "PRIMARY KEY ((prepared_id)))") + .build(); private static final TableMetadata Repairs = parse(REPAIRS, - "repairs", - "CREATE TABLE %s (" - + "parent_id timeuuid, " - + "started_at timestamp, " - + "last_update timestamp, " - + "repaired_at timestamp, " - + "state int, " - + "coordinator inet, " - + "participants set<inet>, " - + "ranges set<blob>, " - + "cfids set<uuid>, " - + "PRIMARY KEY (parent_id))").build(); + "repairs", + "CREATE TABLE %s (" + + "parent_id timeuuid, " + + "started_at timestamp, " + + "last_update timestamp, " + + "repaired_at 
timestamp, " + + "state int, " + + "coordinator inet, " + + "coordinator_port int," + + "participants set<inet>," + + "participants_wp set<text>," + + "ranges set<blob>, " + + "cfids set<uuid>, " + + "PRIMARY KEY (parent_id))").build(); + + @Deprecated + private static final TableMetadata LegacyPeers = + parse(LEGACY_PEERS, + "information about known peers in the cluster", + "CREATE TABLE %s (" + + "peer inet," + + "data_center text," + + "host_id uuid," + + "preferred_ip inet," + + "rack text," + + "release_version text," + + "rpc_address inet," + + "schema_version uuid," + + "tokens set<varchar>," + + "PRIMARY KEY ((peer)))") + .build(); + + @Deprecated + private static final TableMetadata LegacyPeerEvents = + parse(LEGACY_PEER_EVENTS, + "events related to peers", + "CREATE TABLE %s (" + + "peer inet," + + "hints_dropped map<uuid, int>," + + "PRIMARY KEY ((peer)))") + .build(); + + @Deprecated + private static final TableMetadata LegacyTransferredRanges = + parse(LEGACY_TRANSFERRED_RANGES, + "record of transferred ranges for streaming operation", + "CREATE TABLE %s (" + + "operation text," + + "peer inet," + + "keyspace_name text," + + "ranges set<blob>," + + "PRIMARY KEY ((operation, keyspace_name), peer))") + .build(); private static TableMetadata.Builder parse(String table, String description, String cql) { @@ -331,14 +387,17 @@ public final class SystemKeyspace Batches, Paxos, Local, - Peers, - PeerEvents, + PeersV2, + LegacyPeers, + PeerEventsV2, + LegacyPeerEvents, RangeXfers, CompactionHistory, SSTableActivity, SizeEstimates, AvailableRanges, - TransferredRanges, + TransferredRangesV2, + LegacyTransferredRanges, ViewBuildsInProgress, BuiltViews, PreparedStatements, @@ -384,9 +443,12 @@ public final class SystemKeyspace "rack," + "partitioner," + "rpc_address," + + "rpc_port," + "broadcast_address," + - "listen_address" + - ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + "broadcast_port," + + "listen_address," + + "listen_port" + + ") VALUES (?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?)"; IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); executeOnceInternal(format(req, LOCAL), LOCAL, @@ -394,12 +456,15 @@ public final class SystemKeyspace FBUtilities.getReleaseVersionString(), QueryProcessor.CQL_VERSION.toString(), String.valueOf(ProtocolVersion.CURRENT.asInt()), - snitch.getDatacenter(FBUtilities.getBroadcastAddress()), - snitch.getRack(FBUtilities.getBroadcastAddress()), + snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()), + snitch.getRack(FBUtilities.getBroadcastAddressAndPort()), DatabaseDescriptor.getPartitioner().getClass().getName(), DatabaseDescriptor.getRpcAddress(), - FBUtilities.getBroadcastAddress(), - FBUtilities.getLocalAddress()); + DatabaseDescriptor.getNativeTransportPort(), + FBUtilities.getJustBroadcastAddress(), + DatabaseDescriptor.getStoragePort(), + FBUtilities.getJustLocalAddress(), + DatabaseDescriptor.getStoragePort()); } public static void updateCompactionHistory(String ksname, @@ -461,11 +526,10 @@ public final class SystemKeyspace { String buildReq = "DELETE FROM %s.%s WHERE keyspace_name = ? AND view_name = ?"; executeInternal(String.format(buildReq, SchemaConstants.SYSTEM_KEYSPACE_NAME, VIEW_BUILDS_IN_PROGRESS), keyspaceName, viewName); - forceBlockingFlush(VIEW_BUILDS_IN_PROGRESS); String builtReq = "DELETE FROM %s.\"%s\" WHERE keyspace_name = ? AND view_name = ? 
IF EXISTS"; executeInternal(String.format(builtReq, SchemaConstants.SYSTEM_KEYSPACE_NAME, BUILT_VIEWS), keyspaceName, viewName); - forceBlockingFlush(BUILT_VIEWS); + forceBlockingFlush(VIEW_BUILDS_IN_PROGRESS, BUILT_VIEWS); } public static void finishViewBuildStatus(String ksname, String viewName) @@ -609,39 +673,64 @@ public final class SystemKeyspace /** * Record tokens being used by another node */ - public static synchronized void updateTokens(InetAddress ep, Collection<Token> tokens) + public static synchronized void updateTokens(InetAddressAndPort ep, Collection<Token> tokens) { - if (ep.equals(FBUtilities.getBroadcastAddress())) + if (ep.equals(FBUtilities.getBroadcastAddressAndPort())) return; String req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)"; - executeInternal(format(req, PEERS), ep, tokensAsSet(tokens)); + executeInternal(String.format(req, LEGACY_PEERS), ep.address, tokensAsSet(tokens)); + req = "INSERT INTO system.%s (peer, peer_port, tokens) VALUES (?, ?, ?)"; + executeInternal(String.format(req, PEERS_V2), ep.address, ep.port, tokensAsSet(tokens)); } - public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip) + public static synchronized void updatePreferredIP(InetAddressAndPort ep, InetAddressAndPort preferred_ip) { if (getPreferredIP(ep) == preferred_ip) return; String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)"; - executeInternal(format(req, PEERS), ep, preferred_ip); - forceBlockingFlush(PEERS); + executeInternal(String.format(req, LEGACY_PEERS), ep.address, preferred_ip.address); + req = "INSERT INTO system.%s (peer, peer_port, preferred_ip, preferred_port) VALUES (?, ?, ?, ?)"; + executeInternal(String.format(req, PEERS_V2), ep.address, ep.port, preferred_ip.address, preferred_ip.port); + forceBlockingFlush(LEGACY_PEERS, PEERS_V2); } - public static synchronized void updatePeerInfo(InetAddress ep, String columnName, Object value) + public static synchronized void 
updatePeerInfo(InetAddressAndPort ep, String columnName, Object value) { - if (ep.equals(FBUtilities.getBroadcastAddress())) + if (ep.equals(FBUtilities.getBroadcastAddressAndPort())) return; String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)"; - executeInternal(format(req, PEERS, columnName), ep, value); + executeInternal(String.format(req, LEGACY_PEERS, columnName), ep.address, value); + //This column doesn't match across the two tables + if (columnName.equals("rpc_address")) + { + columnName = "native_address"; + } + req = "INSERT INTO system.%s (peer, peer_port, %s) VALUES (?, ?, ?)"; + executeInternal(String.format(req, PEERS_V2, columnName), ep.address, ep.port, value); } - public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value) + public static synchronized void updatePeerNativeAddress(InetAddressAndPort ep, InetAddressAndPort address) + { + if (ep.equals(FBUtilities.getBroadcastAddressAndPort())) + return; + + String req = "INSERT INTO system.%s (peer, rpc_address) VALUES (?, ?)"; + executeInternal(String.format(req, LEGACY_PEERS), ep.address, address.address); + req = "INSERT INTO system.%s (peer, peer_port, native_address, native_port) VALUES (?, ?, ?, ?)"; + executeInternal(String.format(req, PEERS_V2), ep.address, ep.port, address.address, address.port); + } + + + public static synchronized void updateHintsDropped(InetAddressAndPort ep, UUID timePeriod, int value) { // with 30 day TTL String req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ ? ] = ? WHERE peer = ?"; - executeInternal(format(req, PEER_EVENTS), timePeriod, value, ep); + executeInternal(String.format(req, LEGACY_PEER_EVENTS), timePeriod, value, ep.address); + req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ ? ] = ? WHERE peer = ? 
AND peer_port = ?"; + executeInternal(String.format(req, PEER_EVENTS_V2), timePeriod, value, ep.address, ep.port); } public static synchronized void updateSchemaVersion(UUID version) @@ -673,11 +762,13 @@ public final class SystemKeyspace /** * Remove stored tokens being used by another node */ - public static synchronized void removeEndpoint(InetAddress ep) + public static synchronized void removeEndpoint(InetAddressAndPort ep) { String req = "DELETE FROM system.%s WHERE peer = ?"; - executeInternal(format(req, PEERS), ep); - forceBlockingFlush(PEERS); + executeInternal(String.format(req, LEGACY_PEERS), ep.address); + req = String.format("DELETE FROM system.%s WHERE peer = ? AND peer_port = ?", PEERS_V2); + executeInternal(req, ep.address, ep.port); + forceBlockingFlush(LEGACY_PEERS, PEERS_V2); } /** @@ -696,22 +787,32 @@ public final class SystemKeyspace forceBlockingFlush(LOCAL); } - public static void forceBlockingFlush(String cfname) + public static void forceBlockingFlush(String ...cfnames) { if (!DatabaseDescriptor.isUnsafeSystem()) - FBUtilities.waitOnFuture(Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(cfname).forceFlush()); + { + List<ListenableFuture<CommitLogPosition>> futures = new ArrayList<>(); + + for (String cfname : cfnames) + { + futures.add(Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(cfname).forceFlush()); + } + FBUtilities.waitOnFutures(futures); + } } /** * Return a map of stored tokens to IP addresses * */ - public static SetMultimap<InetAddress, Token> loadTokens() + public static SetMultimap<InetAddressAndPort, Token> loadTokens() { - SetMultimap<InetAddress, Token> tokenMap = HashMultimap.create(); - for (UntypedResultSet.Row row : executeInternal("SELECT peer, tokens FROM system." + PEERS)) + SetMultimap<InetAddressAndPort, Token> tokenMap = HashMultimap.create(); + for (UntypedResultSet.Row row : executeInternal("SELECT peer, peer_port, tokens FROM system." 
+ PEERS_V2)) { - InetAddress peer = row.getInetAddress("peer"); + InetAddress address = row.getInetAddress("peer"); + Integer port = row.getInt("peer_port"); + InetAddressAndPort peer = InetAddressAndPort.getByAddressOverrideDefaults(address, port); if (row.has("tokens")) tokenMap.putAll(peer, deserializeTokens(row.getSet("tokens", UTF8Type.instance))); } @@ -723,12 +824,14 @@ public final class SystemKeyspace * Return a map of store host_ids to IP addresses * */ - public static Map<InetAddress, UUID> loadHostIds() + public static Map<InetAddressAndPort, UUID> loadHostIds() { - Map<InetAddress, UUID> hostIdMap = new HashMap<>(); - for (UntypedResultSet.Row row : executeInternal("SELECT peer, host_id FROM system." + PEERS)) + Map<InetAddressAndPort, UUID> hostIdMap = new HashMap<>(); + for (UntypedResultSet.Row row : executeInternal("SELECT peer, peer_port, host_id FROM system." + PEERS_V2)) { - InetAddress peer = row.getInetAddress("peer"); + InetAddress address = row.getInetAddress("peer"); + Integer port = row.getInt("peer_port"); + InetAddressAndPort peer = InetAddressAndPort.getByAddressOverrideDefaults(address, port); if (row.has("host_id")) { hostIdMap.put(peer, row.getUUID("host_id")); @@ -743,24 +846,29 @@ public final class SystemKeyspace * @param ep endpoint address to check * @return Preferred IP for given endpoint if present, otherwise returns given ep */ - public static InetAddress getPreferredIP(InetAddress ep) + public static InetAddressAndPort getPreferredIP(InetAddressAndPort ep) { - String req = "SELECT preferred_ip FROM system.%s WHERE peer=?"; - UntypedResultSet result = executeInternal(format(req, PEERS), ep); + String req = "SELECT preferred_ip, preferred_port FROM system.%s WHERE peer=? 
AND peer_port = ?"; + UntypedResultSet result = executeInternal(String.format(req, PEERS_V2), ep.address, ep.port); if (!result.isEmpty() && result.one().has("preferred_ip")) - return result.one().getInetAddress("preferred_ip"); + { + UntypedResultSet.Row row = result.one(); + return InetAddressAndPort.getByAddressOverrideDefaults(row.getInetAddress("preferred_ip"), row.getInt("preferred_port")); + } return ep; } /** * Return a map of IP addresses containing a map of dc and rack info */ - public static Map<InetAddress, Map<String,String>> loadDcRackInfo() + public static Map<InetAddressAndPort, Map<String,String>> loadDcRackInfo() { - Map<InetAddress, Map<String, String>> result = new HashMap<>(); - for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack from system." + PEERS)) + Map<InetAddressAndPort, Map<String, String>> result = new HashMap<>(); + for (UntypedResultSet.Row row : executeInternal("SELECT peer, peer_port, data_center, rack from system." + PEERS_V2)) { - InetAddress peer = row.getInetAddress("peer"); + InetAddress address = row.getInetAddress("peer"); + Integer port = row.getInt("peer_port"); + InetAddressAndPort peer = InetAddressAndPort.getByAddressOverrideDefaults(address, port); if (row.has("data_center") && row.has("rack")) { Map<String, String> dcRack = new HashMap<>(); @@ -779,16 +887,16 @@ public final class SystemKeyspace * @param ep endpoint address to check * @return Release version or null if version is unknown. 
*/ - public static CassandraVersion getReleaseVersion(InetAddress ep) + public static CassandraVersion getReleaseVersion(InetAddressAndPort ep) { try { - if (FBUtilities.getBroadcastAddress().equals(ep)) + if (FBUtilities.getBroadcastAddressAndPort().equals(ep)) { return new CassandraVersion(FBUtilities.getReleaseVersionString()); } - String req = "SELECT release_version FROM system.%s WHERE peer=?"; - UntypedResultSet result = executeInternal(format(req, PEERS), ep); + String req = "SELECT release_version FROM system.%s WHERE peer=? AND peer_port=?"; + UntypedResultSet result = executeInternal(String.format(req, PEERS_V2), ep.address, ep.port); if (result != null && result.one().has("release_version")) { return new CassandraVersion(result.one().getString("release_version")); @@ -1197,7 +1305,7 @@ public final class SystemKeyspace } public static synchronized void updateTransferredRanges(StreamOperation streamOperation, - InetAddress peer, + InetAddressAndPort peer, String keyspace, Collection<Range<Token>> streamedRanges) { @@ -1207,17 +1315,21 @@ public final class SystemKeyspace { rangesToUpdate.add(rangeToBytes(range)); } - executeInternal(format(cql, TRANSFERRED_RANGES), rangesToUpdate, streamOperation.getDescription(), peer, keyspace); + executeInternal(format(cql, LEGACY_TRANSFERRED_RANGES), rangesToUpdate, streamOperation.getDescription(), peer.address, keyspace); + cql = "UPDATE system.%s SET ranges = ranges + ? WHERE operation = ? AND peer = ? AND peer_port = ? 
AND keyspace_name = ?"; + executeInternal(String.format(cql, TRANSFERRED_RANGES_V2), rangesToUpdate, streamOperation.getDescription(), peer.address, peer.port, keyspace); } - public static synchronized Map<InetAddress, Set<Range<Token>>> getTransferredRanges(String description, String keyspace, IPartitioner partitioner) + public static synchronized Map<InetAddressAndPort, Set<Range<Token>>> getTransferredRanges(String description, String keyspace, IPartitioner partitioner) { - Map<InetAddress, Set<Range<Token>>> result = new HashMap<>(); + Map<InetAddressAndPort, Set<Range<Token>>> result = new HashMap<>(); String query = "SELECT * FROM system.%s WHERE operation = ? AND keyspace_name = ?"; - UntypedResultSet rs = executeInternal(format(query, TRANSFERRED_RANGES), description, keyspace); + UntypedResultSet rs = executeInternal(String.format(query, TRANSFERRED_RANGES_V2), description, keyspace); for (UntypedResultSet.Row row : rs) { - InetAddress peer = row.getInetAddress("peer"); + InetAddress peerAddress = row.getInetAddress("peer"); + int port = row.getInt("peer_port"); + InetAddressAndPort peer = InetAddressAndPort.getByAddressOverrideDefaults(peerAddress, port); Set<ByteBuffer> rawRanges = row.getSet("ranges", BytesType.instance); Set<Range<Token>> ranges = Sets.newHashSetWithExpectedSize(rawRanges.size()); for (ByteBuffer rawRange : rawRanges)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java b/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java new file mode 100644 index 0000000..ea5ff59 --- /dev/null +++ b/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.marshal.UUIDType; + +/** + * Migrate 3.0 versions of some tables to 4.0. In this case it's just extra columns and some keys + * that are changed. 
+ * + * Can't just add the additional columns because they are primary key columns and C* doesn't support changing + * key columns even if it's just clustering columns. + */ +public class SystemKeyspaceMigrator40 +{ + /* Fully qualified keyspace.table names of each legacy table and of the table that replaces it; used below to build the CQL statements and the log messages. */ static final String legacyPeersName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_PEERS); + static final String peersName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.PEERS_V2); + static final String legacyPeerEventsName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_PEER_EVENTS); + static final String peerEventsName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.PEER_EVENTS_V2); + static final String legacyTransferredRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_TRANSFERRED_RANGES); + static final String transferredRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.TRANSFERRED_RANGES_V2); + + private static final Logger logger = LoggerFactory.getLogger(SystemKeyspaceMigrator40.class); + + /* Private constructor: the class is only used through its static methods. */ private SystemKeyspaceMigrator40() {} + + /** Runs the peers, peer events and transferred ranges migrations, in that order. Each step does nothing when its destination table already contains data. */ public static void migrate() + { + migratePeers(); + migratePeerEvents(); + migrateTransferredRanges(); + } + + /** Copies the legacy peers table into SystemKeyspace.PEERS_V2 when the new table is empty. Every legacy row is re-inserted with the extra port columns filled from the local configuration: peer_port and preferred_port get DatabaseDescriptor.getStoragePort(), native_port gets DatabaseDescriptor.getNativeTransportPort(). The legacy rpc_address value is written to native_address. Columns missing from a legacy row are inserted as null. */ private static void migratePeers() + { + ColumnFamilyStore newPeers = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.PEERS_V2); + + /* Only migrate into an empty table; existing content is never overwritten. */ if (!newPeers.isEmpty()) + return; + + logger.info("{} table was empty, migrating legacy {}, if this fails you should fix the issue and then truncate {} to have it try again.", + peersName, legacyPeersName, peersName); + + String query = String.format("SELECT * FROM %s", + legacyPeersName); + + String insert = String.format("INSERT INTO %s ( " + + "peer, " + + "peer_port, " + + "data_center, " + + "host_id, " + + "preferred_ip, " + + "preferred_port, " + + "rack, " + + "release_version, " + + "native_address, " + + 
"native_port, " + + "schema_version, " + + "tokens) " + + " values ( ?, ?, ? , ? , ?, ?, ?, ?, ?, ?, ?, ?)", + peersName); + + /* NOTE(review): 1000 is presumably the page size used to read the legacy table - confirm against QueryProcessor. */ UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, 1000); + int transferred = 0; + logger.info("Migrating rows from legacy {} to {}", legacyPeersName, peersName); + for (UntypedResultSet.Row row : rows) + { + logger.debug("Transferring row {}", transferred); + /* Bind values follow the column order of the INSERT statement built above. */ QueryProcessor.executeInternal(insert, + row.has("peer") ? row.getInetAddress("peer") : null, + /* peer_port: not read from the legacy row, the locally configured storage port is written */ DatabaseDescriptor.getStoragePort(), + row.has("data_center") ? row.getString("data_center") : null, + row.has("host_id") ? row.getUUID("host_id") : null, + row.has("preferred_ip") ? row.getInetAddress("preferred_ip") : null, + /* preferred_port: written even when preferred_ip is null */ DatabaseDescriptor.getStoragePort(), + row.has("rack") ? row.getString("rack") : null, + row.has("release_version") ? row.getString("release_version") : null, + /* native_address: taken from the legacy rpc_address column */ row.has("rpc_address") ? row.getInetAddress("rpc_address") : null, + /* native_port: the locally configured native transport port */ DatabaseDescriptor.getNativeTransportPort(), + row.has("schema_version") ? row.getUUID("schema_version") : null, + row.has("tokens") ? row.getSet("tokens", UTF8Type.instance) : null); + transferred++; + } + logger.info("Migrated {} rows from legacy {} to {}", transferred, legacyPeersName, peersName); + } + + /** Copies the legacy peer events table into SystemKeyspace.PEER_EVENTS_V2 when the new table is empty. peer and hints_dropped are carried over (null when absent) and peer_port is set to DatabaseDescriptor.getStoragePort(). */ private static void migratePeerEvents() + { + ColumnFamilyStore newPeerEvents = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.PEER_EVENTS_V2); + + /* Only migrate into an empty table; existing content is never overwritten. */ if (!newPeerEvents.isEmpty()) + return; + + logger.info("{} table was empty, migrating legacy {} to {}", peerEventsName, legacyPeerEventsName, peerEventsName); + + String query = String.format("SELECT * FROM %s", + legacyPeerEventsName); + + String insert = String.format("INSERT INTO %s ( " + + "peer, " + + "peer_port, " + + "hints_dropped) " + + " values ( ?, ?, ? 
)", + peerEventsName); + + UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, 1000); + int transferred = 0; + for (UntypedResultSet.Row row : rows) + { + logger.debug("Transferring row {}", transferred); + QueryProcessor.executeInternal(insert, + row.has("peer") ? row.getInetAddress("peer") : null, + /* peer_port: not read from the legacy row, the locally configured storage port is written */ DatabaseDescriptor.getStoragePort(), + row.has("hints_dropped") ? row.getMap("hints_dropped", UUIDType.instance, Int32Type.instance) : null); + transferred++; + } + logger.info("Migrated {} rows from legacy {} to {}", transferred, legacyPeerEventsName, peerEventsName); + } + + /** Copies the legacy transferred ranges table into SystemKeyspace.TRANSFERRED_RANGES_V2 when the new table is empty. operation, peer, keyspace_name and ranges are carried over (null when absent) and peer_port is set to DatabaseDescriptor.getStoragePort(). NOTE(review): unlike the two sibling migrations this method is package-private, presumably so tests can call it directly - confirm. */ static void migrateTransferredRanges() + { + ColumnFamilyStore newTransferredRanges = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.TRANSFERRED_RANGES_V2); + + /* Only migrate into an empty table; existing content is never overwritten. */ if (!newTransferredRanges.isEmpty()) + return; + + logger.info("{} table was empty, migrating legacy {} to {}", transferredRangesName, legacyTransferredRangesName, transferredRangesName); + + String query = String.format("SELECT * FROM %s", + legacyTransferredRangesName); + + String insert = String.format("INSERT INTO %s (" + + "operation, " + + "peer, " + + "peer_port, " + + "keyspace_name, " + + "ranges) " + + " values ( ?, ?, ? , ?, ?)", + transferredRangesName); + + UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, 1000); + int transferred = 0; + for (UntypedResultSet.Row row : rows) + { + logger.debug("Transferring row {}", transferred); + QueryProcessor.executeInternal(insert, + row.has("operation") ? row.getString("operation") : null, + row.has("peer") ? row.getInetAddress("peer") : null, + /* peer_port: not read from the legacy row, the locally configured storage port is written */ DatabaseDescriptor.getStoragePort(), + row.has("keyspace_name") ? row.getString("keyspace_name") : null, + row.has("ranges") ? 
row.getSet("ranges", BytesType.instance) : null); + transferred++; + } + + logger.info("Migrated {} rows from legacy {} to {}", transferred, legacyTransferredRangesName, transferredRangesName); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/db/view/ViewUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewUtils.java b/src/java/org/apache/cassandra/db/view/ViewUtils.java index 4dc1766..df16943 100644 --- a/src/java/org/apache/cassandra/db/view/ViewUtils.java +++ b/src/java/org/apache/cassandra/db/view/ViewUtils.java @@ -18,7 +18,6 @@ package org.apache.cassandra.db.view; -import java.net.InetAddress; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -27,6 +26,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.NetworkTopologyStrategy; import org.apache.cassandra.utils.FBUtilities; @@ -58,14 +58,14 @@ public final class ViewUtils * * @return Optional.empty() if this method is called using a base token which does not belong to this replica */ - public static Optional<InetAddress> getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken) + public static Optional<InetAddressAndPort> getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken) { AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy(); - String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); - List<InetAddress> baseEndpoints = new ArrayList<>(); - List<InetAddress> viewEndpoints = new ArrayList<>(); - for (InetAddress baseEndpoint : 
replicationStrategy.getNaturalEndpoints(baseToken)) + String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); + List<InetAddressAndPort> baseEndpoints = new ArrayList<>(); + List<InetAddressAndPort> viewEndpoints = new ArrayList<>(); + for (InetAddressAndPort baseEndpoint : replicationStrategy.getNaturalEndpoints(baseToken)) { // An endpoint is local if we're not using Net if (!(replicationStrategy instanceof NetworkTopologyStrategy) || @@ -73,10 +73,10 @@ public final class ViewUtils baseEndpoints.add(baseEndpoint); } - for (InetAddress viewEndpoint : replicationStrategy.getNaturalEndpoints(viewToken)) + for (InetAddressAndPort viewEndpoint : replicationStrategy.getNaturalEndpoints(viewToken)) { // If we are a base endpoint which is also a view replica, we use ourselves as our view replica - if (viewEndpoint.equals(FBUtilities.getBroadcastAddress())) + if (viewEndpoint.equals(FBUtilities.getBroadcastAddressAndPort())) return Optional.of(viewEndpoint); // We have to remove any endpoint which is shared between the base and the view, as it will select itself @@ -92,7 +92,7 @@ public final class ViewUtils // Since the same replication strategy is used, the same placement should be used and we should get the same // number of replicas for all of the tokens in the ring. 
assert baseEndpoints.size() == viewEndpoints.size() : "Replication strategy should have the same number of endpoints for the base and the view"; - int baseIdx = baseEndpoints.indexOf(FBUtilities.getBroadcastAddress()); + int baseIdx = baseEndpoints.indexOf(FBUtilities.getBroadcastAddressAndPort()); if (baseIdx < 0) //This node is not a base replica of this key, so we return empty http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/dht/BootStrapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java index a25f867..432586b 100644 --- a/src/java/org/apache/cassandra/dht/BootStrapper.java +++ b/src/java/org/apache/cassandra/dht/BootStrapper.java @@ -18,7 +18,6 @@ package org.apache.cassandra.dht; import java.io.IOException; -import java.net.InetAddress; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; @@ -38,6 +37,7 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.*; @@ -51,12 +51,12 @@ public class BootStrapper extends ProgressEventNotifierSupport private static final Logger logger = LoggerFactory.getLogger(BootStrapper.class); /* endpoint that needs to be bootstrapped */ - protected final InetAddress address; + protected final InetAddressAndPort address; /* token of the node being bootstrapped. 
*/ protected final Collection<Token> tokens; protected final TokenMetadata tokenMetadata; - public BootStrapper(InetAddress address, Collection<Token> tokens, TokenMetadata tmd) + public BootStrapper(InetAddressAndPort address, Collection<Token> tokens, TokenMetadata tmd) { assert address != null; assert tokens != null && !tokens.isEmpty(); @@ -159,7 +159,7 @@ public class BootStrapper extends ProgressEventNotifierSupport * otherwise, if allocationKeyspace is specified use the token allocation algorithm to generate suitable tokens * else choose num_tokens tokens at random */ - public static Collection<Token> getBootstrapTokens(final TokenMetadata metadata, InetAddress address, int schemaWaitDelay) throws ConfigurationException + public static Collection<Token> getBootstrapTokens(final TokenMetadata metadata, InetAddressAndPort address, int schemaWaitDelay) throws ConfigurationException { String allocationKeyspace = DatabaseDescriptor.getAllocateTokensForKeyspace(); Collection<String> initialTokens = DatabaseDescriptor.getInitialTokens(); @@ -199,13 +199,13 @@ public class BootStrapper extends ProgressEventNotifierSupport } static Collection<Token> allocateTokens(final TokenMetadata metadata, - InetAddress address, + InetAddressAndPort address, String allocationKeyspace, int numTokens, int schemaWaitDelay) { StorageService.instance.waitForSchema(schemaWaitDelay); - if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress())) + if (!FBUtilities.getBroadcastAddressAndPort().equals(InetAddressAndPort.getLoopbackAddress())) Gossiper.waitToSettle(); Keyspace ks = Keyspace.open(allocationKeyspace); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java index d407212..b90bc96 
100644 --- a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java +++ b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java @@ -32,6 +32,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.FBUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +73,7 @@ public class RangeFetchMapCalculator { private static final Logger logger = LoggerFactory.getLogger(RangeFetchMapCalculator.class); private static final long TRIVIAL_RANGE_LIMIT = 1000; - private final Multimap<Range<Token>, InetAddress> rangesWithSources; + private final Multimap<Range<Token>, InetAddressAndPort> rangesWithSources; private final Collection<RangeStreamer.ISourceFilter> sourceFilters; private final String keyspace; //We need two Vertices to act as source and destination in the algorithm @@ -80,7 +81,7 @@ public class RangeFetchMapCalculator private final Vertex destinationVertex = OuterVertex.getDestinationVertex(); private final Set<Range<Token>> trivialRanges; - public RangeFetchMapCalculator(Multimap<Range<Token>, InetAddress> rangesWithSources, + public RangeFetchMapCalculator(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources, Collection<RangeStreamer.ISourceFilter> sourceFilters, String keyspace) { @@ -108,16 +109,16 @@ public class RangeFetchMapCalculator return false; } - public Multimap<InetAddress, Range<Token>> getRangeFetchMap() + public Multimap<InetAddressAndPort, Range<Token>> getRangeFetchMap() { - Multimap<InetAddress, Range<Token>> fetchMap = HashMultimap.create(); + Multimap<InetAddressAndPort, Range<Token>> fetchMap = HashMultimap.create(); fetchMap.putAll(getRangeFetchMapForNonTrivialRanges()); fetchMap.putAll(getRangeFetchMapForTrivialRanges(fetchMap)); return fetchMap; } @VisibleForTesting - Multimap<InetAddress, Range<Token>> 
getRangeFetchMapForNonTrivialRanges() + Multimap<InetAddressAndPort, Range<Token>> getRangeFetchMapForNonTrivialRanges() { //Get the graph with edges between ranges and their source endpoints MutableCapacityGraph<Vertex, Integer> graph = getGraph(); @@ -148,19 +149,19 @@ public class RangeFetchMapCalculator } @VisibleForTesting - Multimap<InetAddress, Range<Token>> getRangeFetchMapForTrivialRanges(Multimap<InetAddress, Range<Token>> optimisedMap) + Multimap<InetAddressAndPort, Range<Token>> getRangeFetchMapForTrivialRanges(Multimap<InetAddressAndPort, Range<Token>> optimisedMap) { - Multimap<InetAddress, Range<Token>> fetchMap = HashMultimap.create(); + Multimap<InetAddressAndPort, Range<Token>> fetchMap = HashMultimap.create(); for (Range<Token> trivialRange : trivialRanges) { boolean added = false; boolean localDCCheck = true; while (!added) { - List<InetAddress> srcs = new ArrayList<>(rangesWithSources.get(trivialRange)); + List<InetAddressAndPort> srcs = new ArrayList<>(rangesWithSources.get(trivialRange)); // sort with the endpoint having the least number of streams first: srcs.sort(Comparator.comparingInt(o -> optimisedMap.get(o).size())); - for (InetAddress src : srcs) + for (InetAddressAndPort src : srcs) { if (passFilters(src, localDCCheck)) { @@ -202,9 +203,9 @@ public class RangeFetchMapCalculator * @param result Flow algorithm result * @return Multi Map of Machine to Ranges */ - private Multimap<InetAddress, Range<Token>> getRangeFetchMapFromGraphResult(MutableCapacityGraph<Vertex, Integer> graph, MaximumFlowAlgorithmResult<Integer, CapacityEdge<Vertex, Integer>> result) + private Multimap<InetAddressAndPort, Range<Token>> getRangeFetchMapFromGraphResult(MutableCapacityGraph<Vertex, Integer> graph, MaximumFlowAlgorithmResult<Integer, CapacityEdge<Vertex, Integer>> result) { - final Multimap<InetAddress, Range<Token>> rangeFetchMapMap = HashMultimap.create(); + final Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap = HashMultimap.create(); 
if(result == null) return rangeFetchMapMap; final Function<CapacityEdge<Vertex, Integer>, Integer> flowFunction = result.calcFlowFunction(); @@ -346,13 +347,13 @@ public class RangeFetchMapCalculator private boolean addEndpoints(MutableCapacityGraph<Vertex, Integer> capacityGraph, RangeVertex rangeVertex, boolean localDCCheck) { boolean sourceFound = false; - for (InetAddress endpoint : rangesWithSources.get(rangeVertex.getRange())) + for (InetAddressAndPort endpoint : rangesWithSources.get(rangeVertex.getRange())) { if (passFilters(endpoint, localDCCheck)) { sourceFound = true; // if we pass filters, it means that we don't filter away localhost and we can count it as a source: - if (endpoint.equals(FBUtilities.getBroadcastAddress())) + if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort())) continue; // but don't add localhost to the graph to avoid streaming locally final Vertex endpointVertex = new EndpointVertex(endpoint); capacityGraph.insertVertex(rangeVertex); @@ -363,7 +364,7 @@ public class RangeFetchMapCalculator return sourceFound; } - private boolean isInLocalDC(InetAddress endpoint) + private boolean isInLocalDC(InetAddressAndPort endpoint) { return DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint)); } @@ -374,7 +375,7 @@ public class RangeFetchMapCalculator * @param localDCCheck Allow endpoints with local DC * @return True if filters pass this endpoint */ - private boolean passFilters(final InetAddress endpoint, boolean localDCCheck) + private boolean passFilters(final InetAddressAndPort endpoint, boolean localDCCheck) { for (RangeStreamer.ISourceFilter filter : sourceFilters) { @@ -410,15 +411,15 @@ public class RangeFetchMapCalculator */ private static class EndpointVertex extends Vertex { - private final InetAddress endpoint; + private final InetAddressAndPort endpoint; - public EndpointVertex(InetAddress endpoint) + public EndpointVertex(InetAddressAndPort endpoint) { assert 
endpoint != null; this.endpoint = endpoint; } - public InetAddress getEndpoint() + public InetAddressAndPort getEndpoint() { return endpoint; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index eabb212..439ebc6 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.dht; -import java.net.InetAddress; import java.util.*; import com.google.common.annotations.VisibleForTesting; @@ -26,8 +25,9 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.LocalStrategy; -import org.apache.cassandra.service.ActiveRepairService; + import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,10 +59,10 @@ public class RangeStreamer /* current token ring */ private final TokenMetadata metadata; /* address of this node */ - private final InetAddress address; + private final InetAddressAndPort address; /* streaming description */ private final String description; - private final Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch = HashMultimap.create(); + private final Multimap<String, Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> toFetch = HashMultimap.create(); private final Set<ISourceFilter> sourceFilters = new HashSet<>(); private final StreamPlan streamPlan; private final boolean useStrictConsistency; @@ -74,7 +74,7 @@ public class RangeStreamer */ public static interface ISourceFilter { - public boolean shouldInclude(InetAddress endpoint); + public 
boolean shouldInclude(InetAddressAndPort endpoint); } /** @@ -90,7 +90,7 @@ public class RangeStreamer this.fd = fd; } - public boolean shouldInclude(InetAddress endpoint) + public boolean shouldInclude(InetAddressAndPort endpoint) { return fd.isAlive(endpoint); } @@ -110,7 +110,7 @@ public class RangeStreamer this.snitch = snitch; } - public boolean shouldInclude(InetAddress endpoint) + public boolean shouldInclude(InetAddressAndPort endpoint) { return snitch.getDatacenter(endpoint).equals(sourceDc); } @@ -121,9 +121,9 @@ public class RangeStreamer */ public static class ExcludeLocalNodeFilter implements ISourceFilter { - public boolean shouldInclude(InetAddress endpoint) + public boolean shouldInclude(InetAddressAndPort endpoint) { - return !FBUtilities.getBroadcastAddress().equals(endpoint); + return !FBUtilities.getBroadcastAddressAndPort().equals(endpoint); } } @@ -132,14 +132,14 @@ public class RangeStreamer */ public static class WhitelistedSourcesFilter implements ISourceFilter { - private final Set<InetAddress> whitelistedSources; + private final Set<InetAddressAndPort> whitelistedSources; - public WhitelistedSourcesFilter(Set<InetAddress> whitelistedSources) + public WhitelistedSourcesFilter(Set<InetAddressAndPort> whitelistedSources) { this.whitelistedSources = whitelistedSources; } - public boolean shouldInclude(InetAddress endpoint) + public boolean shouldInclude(InetAddressAndPort endpoint) { return whitelistedSources.contains(endpoint); } @@ -147,7 +147,7 @@ public class RangeStreamer public RangeStreamer(TokenMetadata metadata, Collection<Token> tokens, - InetAddress address, + InetAddressAndPort address, StreamOperation streamOperation, boolean useStrictConsistency, IEndpointSnitch snitch, @@ -186,18 +186,18 @@ public class RangeStreamer } boolean useStrictSource = useStrictSourcesForRanges(keyspaceName); - Multimap<Range<Token>, InetAddress> rangesForKeyspace = useStrictSource + Multimap<Range<Token>, InetAddressAndPort> rangesForKeyspace = 
useStrictSource ? getAllRangesWithStrictSourcesFor(keyspaceName, ranges) : getAllRangesWithSourcesFor(keyspaceName, ranges); - for (Map.Entry<Range<Token>, InetAddress> entry : rangesForKeyspace.entries()) + for (Map.Entry<Range<Token>, InetAddressAndPort> entry : rangesForKeyspace.entries()) logger.info("{}: range {} exists on {} for keyspace {}", description, entry.getKey(), entry.getValue(), keyspaceName); AbstractReplicationStrategy strat = Keyspace.open(keyspaceName).getReplicationStrategy(); - Multimap<InetAddress, Range<Token>> rangeFetchMap = useStrictSource || strat == null || strat.getReplicationFactor() == 1 + Multimap<InetAddressAndPort, Range<Token>> rangeFetchMap = useStrictSource || strat == null || strat.getReplicationFactor() == 1 ? getRangeFetchMap(rangesForKeyspace, sourceFilters, keyspaceName, useStrictConsistency) : getOptimizedRangeFetchMap(rangesForKeyspace, sourceFilters, keyspaceName); - for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : rangeFetchMap.asMap().entrySet()) + for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : rangeFetchMap.asMap().entrySet()) { if (logger.isTraceEnabled()) { @@ -226,19 +226,19 @@ public class RangeStreamer * * @throws java.lang.IllegalStateException when there is no source to get data streamed */ - private Multimap<Range<Token>, InetAddress> getAllRangesWithSourcesFor(String keyspaceName, Collection<Range<Token>> desiredRanges) + private Multimap<Range<Token>, InetAddressAndPort> getAllRangesWithSourcesFor(String keyspaceName, Collection<Range<Token>> desiredRanges) { AbstractReplicationStrategy strat = Keyspace.open(keyspaceName).getReplicationStrategy(); - Multimap<Range<Token>, InetAddress> rangeAddresses = strat.getRangeAddresses(metadata.cloneOnlyTokenMap()); + Multimap<Range<Token>, InetAddressAndPort> rangeAddresses = strat.getRangeAddresses(metadata.cloneOnlyTokenMap()); - Multimap<Range<Token>, InetAddress> rangeSources = ArrayListMultimap.create(); + 
Multimap<Range<Token>, InetAddressAndPort> rangeSources = ArrayListMultimap.create(); for (Range<Token> desiredRange : desiredRanges) { for (Range<Token> range : rangeAddresses.keySet()) { if (range.contains(desiredRange)) { - List<InetAddress> preferred = snitch.getSortedListByProximity(address, rangeAddresses.get(range)); + List<InetAddressAndPort> preferred = snitch.getSortedListByProximity(address, rangeAddresses.get(range)); rangeSources.putAll(desiredRange, preferred); break; } @@ -258,30 +258,30 @@ public class RangeStreamer * * @throws java.lang.IllegalStateException when there is no source to get data streamed, or more than 1 source found. */ - private Multimap<Range<Token>, InetAddress> getAllRangesWithStrictSourcesFor(String keyspace, Collection<Range<Token>> desiredRanges) + private Multimap<Range<Token>, InetAddressAndPort> getAllRangesWithStrictSourcesFor(String keyspace, Collection<Range<Token>> desiredRanges) { assert tokens != null; AbstractReplicationStrategy strat = Keyspace.open(keyspace).getReplicationStrategy(); // Active ranges TokenMetadata metadataClone = metadata.cloneOnlyTokenMap(); - Multimap<Range<Token>, InetAddress> addressRanges = strat.getRangeAddresses(metadataClone); + Multimap<Range<Token>, InetAddressAndPort> addressRanges = strat.getRangeAddresses(metadataClone); // Pending ranges metadataClone.updateNormalTokens(tokens, address); - Multimap<Range<Token>, InetAddress> pendingRangeAddresses = strat.getRangeAddresses(metadataClone); + Multimap<Range<Token>, InetAddressAndPort> pendingRangeAddresses = strat.getRangeAddresses(metadataClone); // Collects the source that will have its range moved to the new node - Multimap<Range<Token>, InetAddress> rangeSources = ArrayListMultimap.create(); + Multimap<Range<Token>, InetAddressAndPort> rangeSources = ArrayListMultimap.create(); for (Range<Token> desiredRange : desiredRanges) { - for (Map.Entry<Range<Token>, Collection<InetAddress>> preEntry : addressRanges.asMap().entrySet()) + for 
(Map.Entry<Range<Token>, Collection<InetAddressAndPort>> preEntry : addressRanges.asMap().entrySet()) { if (preEntry.getKey().contains(desiredRange)) { - Set<InetAddress> oldEndpoints = Sets.newHashSet(preEntry.getValue()); - Set<InetAddress> newEndpoints = Sets.newHashSet(pendingRangeAddresses.get(desiredRange)); + Set<InetAddressAndPort> oldEndpoints = Sets.newHashSet(preEntry.getValue()); + Set<InetAddressAndPort> newEndpoints = Sets.newHashSet(pendingRangeAddresses.get(desiredRange)); // Due to CASSANDRA-5953 we can have a higher RF then we have endpoints. // So we need to be careful to only be strict when endpoints == RF @@ -296,14 +296,14 @@ public class RangeStreamer } // Validate - Collection<InetAddress> addressList = rangeSources.get(desiredRange); + Collection<InetAddressAndPort> addressList = rangeSources.get(desiredRange); if (addressList == null || addressList.isEmpty()) throw new IllegalStateException("No sources found for " + desiredRange); if (addressList.size() > 1) throw new IllegalStateException("Multiple endpoints found for " + desiredRange); - InetAddress sourceIp = addressList.iterator().next(); + InetAddressAndPort sourceIp = addressList.iterator().next(); EndpointState sourceState = Gossiper.instance.getEndpointStateForEndpoint(sourceIp); if (Gossiper.instance.isEnabled() && (sourceState == null || !sourceState.isAlive())) throw new RuntimeException("A node required to move the data consistently is down (" + sourceIp + "). 
" + @@ -320,17 +320,17 @@ public class RangeStreamer * @param keyspace keyspace name * @return Map of source endpoint to collection of ranges */ - private static Multimap<InetAddress, Range<Token>> getRangeFetchMap(Multimap<Range<Token>, InetAddress> rangesWithSources, - Collection<ISourceFilter> sourceFilters, String keyspace, - boolean useStrictConsistency) + private static Multimap<InetAddressAndPort, Range<Token>> getRangeFetchMap(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources, + Collection<ISourceFilter> sourceFilters, String keyspace, + boolean useStrictConsistency) { - Multimap<InetAddress, Range<Token>> rangeFetchMapMap = HashMultimap.create(); + Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap = HashMultimap.create(); for (Range<Token> range : rangesWithSources.keySet()) { boolean foundSource = false; outer: - for (InetAddress address : rangesWithSources.get(range)) + for (InetAddressAndPort address : rangesWithSources.get(range)) { for (ISourceFilter filter : sourceFilters) { @@ -338,7 +338,7 @@ public class RangeStreamer continue outer; } - if (address.equals(FBUtilities.getBroadcastAddress())) + if (address.equals(FBUtilities.getBroadcastAddressAndPort())) { // If localhost is a source, we have found one, but we don't add it to the map to avoid streaming locally foundSource = true; @@ -371,11 +371,11 @@ public class RangeStreamer } - private static Multimap<InetAddress, Range<Token>> getOptimizedRangeFetchMap(Multimap<Range<Token>, InetAddress> rangesWithSources, - Collection<ISourceFilter> sourceFilters, String keyspace) + private static Multimap<InetAddressAndPort, Range<Token>> getOptimizedRangeFetchMap(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources, + Collection<ISourceFilter> sourceFilters, String keyspace) { RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, sourceFilters, keyspace); - Multimap<InetAddress, Range<Token>> rangeFetchMapMap = calculator.getRangeFetchMap(); + 
Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap = calculator.getRangeFetchMap(); logger.info("Output from RangeFetchMapCalculator for keyspace {}", keyspace); validateRangeFetchMap(rangesWithSources, rangeFetchMapMap, keyspace); return rangeFetchMapMap; @@ -387,11 +387,11 @@ public class RangeStreamer * @param rangeFetchMapMap * @param keyspace */ - private static void validateRangeFetchMap(Multimap<Range<Token>, InetAddress> rangesWithSources, Multimap<InetAddress, Range<Token>> rangeFetchMapMap, String keyspace) + private static void validateRangeFetchMap(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap, String keyspace) { - for (Map.Entry<InetAddress, Range<Token>> entry : rangeFetchMapMap.entries()) + for (Map.Entry<InetAddressAndPort, Range<Token>> entry : rangeFetchMapMap.entries()) { - if(entry.getKey().equals(FBUtilities.getBroadcastAddress())) + if(entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort())) { throw new IllegalStateException("Trying to stream locally. 
Range: " + entry.getValue() + " in keyspace " + keyspace); @@ -407,26 +407,26 @@ public class RangeStreamer } } - public static Multimap<InetAddress, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddress> rangesWithSourceTarget, String keyspace, - IFailureDetector fd, boolean useStrictConsistency) + public static Multimap<InetAddressAndPort, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddressAndPort> rangesWithSourceTarget, String keyspace, + IFailureDetector fd, boolean useStrictConsistency) { return getRangeFetchMap(rangesWithSourceTarget, Collections.<ISourceFilter>singleton(new FailureDetectorSourceFilter(fd)), keyspace, useStrictConsistency); } // For testing purposes @VisibleForTesting - Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch() + Multimap<String, Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> toFetch() { return toFetch; } public StreamResultFuture fetchAsync() { - for (Map.Entry<String, Map.Entry<InetAddress, Collection<Range<Token>>>> entry : toFetch.entries()) + for (Map.Entry<String, Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> entry : toFetch.entries()) { String keyspace = entry.getKey(); - InetAddress source = entry.getValue().getKey(); - InetAddress preferred = SystemKeyspace.getPreferredIP(source); + InetAddressAndPort source = entry.getValue().getKey(); + InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(source); Collection<Range<Token>> ranges = entry.getValue().getValue(); // filter out already streamed ranges http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java index 7c9d22c..61082df 100644 --- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java 
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.dht.tokenallocator; -import java.net.InetAddress; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -37,6 +36,7 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.NetworkTopologyStrategy; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.locator.TokenMetadata; @@ -48,7 +48,7 @@ public class TokenAllocation public static Collection<Token> allocateTokens(final TokenMetadata tokenMetadata, final AbstractReplicationStrategy rs, - final InetAddress endpoint, + final InetAddressAndPort endpoint, int numTokens) { TokenMetadata tokenMetadataCopy = tokenMetadata.cloneOnlyTokenMap(); @@ -81,7 +81,7 @@ public class TokenAllocation { while (tokenMetadata.getEndpoint(t) != null) { - InetAddress other = tokenMetadata.getEndpoint(t); + InetAddressAndPort other = tokenMetadata.getEndpoint(t); if (strategy.inAllocationRing(other)) throw new ConfigurationException(String.format("Allocated token %s already assigned to node %s. 
Is another node also allocating tokens?", t, other)); t = t.increaseSlightly(); @@ -92,9 +92,9 @@ public class TokenAllocation } // return the ratio of ownership for each endpoint - public static Map<InetAddress, Double> evaluateReplicatedOwnership(TokenMetadata tokenMetadata, AbstractReplicationStrategy rs) + public static Map<InetAddressAndPort, Double> evaluateReplicatedOwnership(TokenMetadata tokenMetadata, AbstractReplicationStrategy rs) { - Map<InetAddress, Double> ownership = Maps.newHashMap(); + Map<InetAddressAndPort, Double> ownership = Maps.newHashMap(); List<Token> sortedTokens = tokenMetadata.sortedTokens(); Iterator<Token> it = sortedTokens.iterator(); Token current = it.next(); @@ -109,11 +109,11 @@ public class TokenAllocation return ownership; } - static void addOwnership(final TokenMetadata tokenMetadata, final AbstractReplicationStrategy rs, Token current, Token next, Map<InetAddress, Double> ownership) + static void addOwnership(final TokenMetadata tokenMetadata, final AbstractReplicationStrategy rs, Token current, Token next, Map<InetAddressAndPort, Double> ownership) { double size = current.size(next); Token representative = current.getPartitioner().midpoint(current, next); - for (InetAddress n : rs.calculateNaturalEndpoints(representative, tokenMetadata)) + for (InetAddressAndPort n : rs.calculateNaturalEndpoints(representative, tokenMetadata)) { Double v = ownership.get(n); ownership.put(n, v != null ? 
v + size : size); @@ -126,11 +126,11 @@ public class TokenAllocation } public static SummaryStatistics replicatedOwnershipStats(TokenMetadata tokenMetadata, - AbstractReplicationStrategy rs, InetAddress endpoint) + AbstractReplicationStrategy rs, InetAddressAndPort endpoint) { SummaryStatistics stat = new SummaryStatistics(); StrategyAdapter strategy = getStrategy(tokenMetadata, rs, endpoint); - for (Map.Entry<InetAddress, Double> en : evaluateReplicatedOwnership(tokenMetadata, rs).entrySet()) + for (Map.Entry<InetAddressAndPort, Double> en : evaluateReplicatedOwnership(tokenMetadata, rs).entrySet()) { // Filter only in the same datacentre. if (strategy.inAllocationRing(en.getKey())) @@ -139,10 +139,10 @@ public class TokenAllocation return stat; } - static TokenAllocator<InetAddress> create(TokenMetadata tokenMetadata, StrategyAdapter strategy) + static TokenAllocator<InetAddressAndPort> create(TokenMetadata tokenMetadata, StrategyAdapter strategy) { - NavigableMap<Token, InetAddress> sortedTokens = new TreeMap<>(); - for (Map.Entry<Token, InetAddress> en : tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap().entrySet()) + NavigableMap<Token, InetAddressAndPort> sortedTokens = new TreeMap<>(); + for (Map.Entry<Token, InetAddressAndPort> en : tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap().entrySet()) { if (strategy.inAllocationRing(en.getValue())) sortedTokens.put(en.getKey(), en.getValue()); @@ -150,15 +150,15 @@ public class TokenAllocation return TokenAllocatorFactory.createTokenAllocator(sortedTokens, strategy, tokenMetadata.partitioner); } - interface StrategyAdapter extends ReplicationStrategy<InetAddress> + interface StrategyAdapter extends ReplicationStrategy<InetAddressAndPort> { // return true iff the provided endpoint occurs in the same virtual token-ring we are allocating for // i.e. 
the set of the nodes that share ownership with the node we are allocating // alternatively: return false if the endpoint's ownership is independent of the node we are allocating tokens for - boolean inAllocationRing(InetAddress other); + boolean inAllocationRing(InetAddressAndPort other); } - static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final AbstractReplicationStrategy rs, final InetAddress endpoint) + static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final AbstractReplicationStrategy rs, final InetAddressAndPort endpoint) { if (rs instanceof NetworkTopologyStrategy) return getStrategy(tokenMetadata, (NetworkTopologyStrategy) rs, rs.snitch, endpoint); @@ -167,7 +167,7 @@ public class TokenAllocation throw new ConfigurationException("Token allocation does not support replication strategy " + rs.getClass().getSimpleName()); } - static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final SimpleStrategy rs, final InetAddress endpoint) + static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final SimpleStrategy rs, final InetAddressAndPort endpoint) { final int replicas = rs.getReplicationFactor(); @@ -180,20 +180,20 @@ public class TokenAllocation } @Override - public Object getGroup(InetAddress unit) + public Object getGroup(InetAddressAndPort unit) { return unit; } @Override - public boolean inAllocationRing(InetAddress other) + public boolean inAllocationRing(InetAddressAndPort other) { return true; } }; } - static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final NetworkTopologyStrategy rs, final IEndpointSnitch snitch, final InetAddress endpoint) + static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final NetworkTopologyStrategy rs, final IEndpointSnitch snitch, final InetAddressAndPort endpoint) { final String dc = snitch.getDatacenter(endpoint); final int replicas = rs.getReplicationFactor(dc); @@ -210,13 +210,13 @@ public class TokenAllocation } 
@Override - public Object getGroup(InetAddress unit) + public Object getGroup(InetAddressAndPort unit) { return unit; } @Override - public boolean inAllocationRing(InetAddress other) + public boolean inAllocationRing(InetAddressAndPort other) { return dc.equals(snitch.getDatacenter(other)); } @@ -237,13 +237,13 @@ public class TokenAllocation } @Override - public Object getGroup(InetAddress unit) + public Object getGroup(InetAddressAndPort unit) { return snitch.getRack(unit); } @Override - public boolean inAllocationRing(InetAddress other) + public boolean inAllocationRing(InetAddressAndPort other) { return dc.equals(snitch.getDatacenter(other)); } @@ -261,13 +261,13 @@ public class TokenAllocation } @Override - public Object getGroup(InetAddress unit) + public Object getGroup(InetAddressAndPort unit) { return unit; } @Override - public boolean inAllocationRing(InetAddress other) + public boolean inAllocationRing(InetAddressAndPort other) { return dc.equals(snitch.getDatacenter(other)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java index 58acb56..5fdba02 100644 --- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java +++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java @@ -18,7 +18,6 @@ package org.apache.cassandra.dht.tokenallocator; -import java.net.InetAddress; import java.util.NavigableMap; import org.slf4j.Logger; @@ -26,13 +25,14 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; public class TokenAllocatorFactory { private static final Logger logger = 
LoggerFactory.getLogger(TokenAllocatorFactory.class); - public static TokenAllocator<InetAddress> createTokenAllocator(NavigableMap<Token, InetAddress> sortedTokens, - ReplicationStrategy<InetAddress> strategy, - IPartitioner partitioner) + public static TokenAllocator<InetAddressAndPort> createTokenAllocator(NavigableMap<Token, InetAddressAndPort> sortedTokens, + ReplicationStrategy<InetAddressAndPort> strategy, + IPartitioner partitioner) { if(strategy.replicas() == 1) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
