Author: brandonwilliams Date: Wed Aug 31 19:57:17 2011 New Revision: 1163772
URL: http://svn.apache.org/viewvc?rev=1163772&view=rev Log: Restore getRangeToEndpointMap. Patch by Nick Bailey, reviewed by brandonwilliams for CASSANDRA-3106 Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1163772&r1=1163771&r2=1163772&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Aug 31 19:57:17 2011 @@ -161,7 +161,7 @@ public class StorageService implements I private IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); public VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner); - + public static final StorageService instance = new StorageService(); public static IPartitioner getPartitioner() @@ -249,7 +249,7 @@ public class StorageService implements I MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_SYN, new GossipDigestSynVerbHandler()); MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler()); MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2VerbHandler()); - + MessagingService.instance().registerVerbHandlers(Verb.DEFINITIONS_UPDATE, new DefinitionsUpdateVerbHandler()); MessagingService.instance().registerVerbHandlers(Verb.TRUNCATE, new TruncateVerbHandler()); MessagingService.instance().registerVerbHandlers(Verb.SCHEMA_CHECK, new SchemaCheckVerbHandler()); @@ -325,10 +325,10 @@ public class StorageService implements I try { Thread.sleep(1000L); } catch (InterruptedException e) {} StageManager.shutdownNow(); } - - public boolean isInitialized() - { - return initialized; + + public boolean isInitialized() + { + return initialized; } public synchronized void initClient() throws IOException, ConfigurationException @@ -351,7 +351,7 @@ public class StorageService implements I Gossiper.instance.register(this); Gossiper.instance.start((int)(System.currentTimeMillis() / 1000)); // needed for node-ring gathering. MessagingService.instance().listen(FBUtilities.getLocalAddress()); - + // sleep a while to allow gossip to warm up (the other nodes need to know about this one before they can reply). try { @@ -597,6 +597,27 @@ public class StorageService implements I } /** + * for a keyspace, return the ranges and corresponding listen addresses. + * @param keyspace + * @return + */ + public Map<Range, List<String>> getRangeToEndpointMap(String keyspace) + { + // some people just want to get a visual representation of things. Allow null and set it to the first + // non-system table. + if (keyspace == null) + keyspace = Schema.instance.getNonSystemTables().get(0); + + /* All the ranges for the tokens */ + Map<Range, List<String>> map = new HashMap<Range, List<String>>(); + for (Map.Entry<Range,List<InetAddress>> entry : getRangeToAddressMap(keyspace).entrySet()) + { + map.put(entry.getKey(), stringify(entry.getValue())); + } + return map; + } + + /** * for a keyspace, return the ranges and corresponding RPC addresses for a given keyspace. * @param keyspace * @return @@ -686,7 +707,7 @@ public class StorageService implements I * the token associated with an endpoint, and knowing its operation mode. Nodes can start in either bootstrap or * normal mode, and from bootstrap mode can change mode to normal. A node in bootstrap mode needs to have * pendingranges set in TokenMetadata; a node in normal mode should instead be part of the token ring. - * + * * Normal progression of ApplicationState.STATUS values for a node should be like this: * STATUS_BOOTSTRAPPING,token * if bootstrapping. stays this way until all files are received. @@ -1091,7 +1112,7 @@ public class StorageService implements I * @param ranges the ranges to find sources for * @return multimap of addresses to ranges the address is responsible for */ - private Multimap<InetAddress, Range> getNewSourceRanges(String table, Set<Range> ranges) + private Multimap<InetAddress, Range> getNewSourceRanges(String table, Set<Range> ranges) { InetAddress myAddress = FBUtilities.getBroadcastAddress(); Multimap<Range, InetAddress> rangeAddresses = Table.open(table).getReplicationStrategy().getRangeAddresses(tokenMetadata_); @@ -1114,14 +1135,14 @@ public class StorageService implements I sourceRanges.put(source, range); break; } - } + } } return sourceRanges; } /** * Sends a notification to a node indicating we have finished replicating data. - * + * * @param local the local address * @param remote node to send notification to */ @@ -1135,7 +1156,7 @@ public class StorageService implements I while (failureDetector.isAlive(remote)) { IAsyncResult iar = MessagingService.instance().sendRR(msg, remote); - try + try { iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); return; // done @@ -1166,7 +1187,7 @@ public class StorageService implements I for (String table : Schema.instance.getNonSystemTables()) { - Multimap<Range, InetAddress> changedRanges = getChangedRangesForLeaving(table, endpoint); + Multimap<Range, InetAddress> changedRanges = getChangedRangesForLeaving(table, endpoint); Set<Range> myNewRanges = new HashSet<Range>(); for (Map.Entry<Range, InetAddress> entry : changedRanges.entries()) { @@ -1574,7 +1595,7 @@ public class StorageService implements I */ public void forceTableRepair(final String tableName, final String... columnFamilies) throws IOException { - if (Table.SYSTEM_TABLE.equals(tableName)) + if (Table.SYSTEM_TABLE.equals(tableName)) { return; } @@ -1678,7 +1699,7 @@ public class StorageService implements I if (logger_.isDebugEnabled()) logger_.debug("computing ranges for " + StringUtils.join(sortedTokens, ", ")); - if (sortedTokens.isEmpty()) + if (sortedTokens.isEmpty()) return Collections.emptyList(); List<Range> ranges = new ArrayList<Range>(); int size = sortedTokens.size(); @@ -1760,7 +1781,7 @@ public class StorageService implements I /** * @return list of Tokens (_not_ keys!) breaking up the data this node is responsible for into pieces of roughly keysPerSplit - */ + */ public List<Token> getSplits(String table, String cfName, Range range, int keysPerSplit) { List<Token> tokens = new ArrayList<Token>(); Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1163772&r1=1163771&r2=1163772&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Wed Aug 31 19:57:17 2011 @@ -34,7 +34,7 @@ import org.apache.cassandra.thrift.Unava public interface StorageServiceMBean -{ +{ /** * Retrieve the list of live nodes in the cluster, where "liveness" is * determined by the failure detector of the node being queried. @@ -111,6 +111,14 @@ public interface StorageServiceMBean public String getSavedCachesLocation(); /** + * Retrieve a map of range to end points that describe the ring topology + * of a Cassandra cluster. + * + * @return mapping of ranges to end points + */ + public Map<Range, List<String>> getRangeToEndpointMap(String keyspace); + + /** * Retrieve a map of range to rpc addresses that describe the ring topology * of a Cassandra cluster. * @@ -150,7 +158,7 @@ public interface StorageServiceMBean * @return generation number */ public int getCurrentGenerationNumber(); - + /** * This method returns the N endpoints that are responsible for storing the * specified key i.e for replication.