Updated Branches: refs/heads/trunk 8b00f3a25 -> d5ec013ce
Store more information in peers table patch by slebresne; reviewed by jbellis for CASSANDRA-4351 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d5ec013c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d5ec013c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d5ec013c Branch: refs/heads/trunk Commit: d5ec013cee4f3d923d9618694716a265ab04fe1b Parents: 8b00f3a Author: Sylvain Lebresne <[email protected]> Authored: Fri Oct 5 09:34:51 2012 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Fri Oct 5 09:34:51 2012 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/config/CFMetaData.java | 12 +- .../apache/cassandra/cql3/UntypedResultSet.java | 6 + src/java/org/apache/cassandra/db/SystemTable.java | 134 +++++++-------- .../apache/cassandra/service/StorageService.java | 19 ++ 5 files changed, 95 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5ec013c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 868183e..342135f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -20,6 +20,7 @@ * Support repairing only the local DC nodes (CASSANDRA-4747) * Use rpc_address for binary protocol and change default port (CASSANRA-4751) * Fix use of collections in prepared statements (CASSANDRA-4739) + * Store more information into peers table (CASSANDRA-4351) 1.2-beta1 http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5ec013c/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index 6abeb33..ef25d2a 100644 --- 
a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -158,13 +158,19 @@ public final class CFMetaData + "AND COMMENT='hints awaiting delivery'"); public static final CFMetaData PeersCf = compile(12, "CREATE TABLE " + SystemTable.PEERS_CF + " (" - + "token_bytes blob PRIMARY KEY," - + "peer inet" + + "peer inet PRIMARY KEY," + + "ring_id uuid," + + "tokens set<blob>," + + "schema_version uuid," + + "release_version text," + + "rpc_address inet," + + "data_center text," + + "rack text" + ") WITH COMMENT='known peers in the cluster'"); public static final CFMetaData LocalCf = compile(13, "CREATE TABLE " + SystemTable.LOCAL_CF + " (" + "key text PRIMARY KEY," - + "token_bytes blob," + + "tokens set<blob>," + "cluster_name text," + "gossip_generation int," + "bootstrapped text," http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5ec013c/src/java/org/apache/cassandra/cql3/UntypedResultSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java index 203e4c1..ca3acf5 100644 --- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java +++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import com.google.common.collect.AbstractIterator; @@ -130,6 +131,11 @@ public class UntypedResultSet implements Iterable<UntypedResultSet.Row> return DateType.instance.compose(data.get(column)); } + public <T> Set<T> getSet(String column, AbstractType<T> type) + { + return SetType.getInstance(type).compose(data.get(column)); + } + @Override public String toString() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5ec013c/src/java/org/apache/cassandra/db/SystemTable.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java index ead325f..e2ff161 100644 --- a/src/java/org/apache/cassandra/db/SystemTable.java +++ b/src/java/org/apache/cassandra/db/SystemTable.java @@ -25,7 +25,8 @@ import java.util.*; import java.util.concurrent.ExecutionException; import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; +import com.google.common.collect.SetMultimap; +import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -158,9 +159,9 @@ public class SystemTable } // serialize the old token as a collection of (one )tokens. Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(oldColumns.next().value()); - String tokenBytes = ByteBufferUtil.bytesToHex(serializeTokens(Collections.singleton(token))); + String tokenBytes = serializeTokens(Collections.singleton(token)); // (assume that any node getting upgraded was bootstrapped, since that was stored in a separate row for no particular reason) - String req = "INSERT INTO system.%s (key, cluster_name, token_bytes, bootstrapped) VALUES ('%s', '%s', '%s', '%s')"; + String req = "INSERT INTO system.%s (key, cluster_name, tokens, bootstrapped) VALUES ('%s', '%s', '%s', '%s')"; processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, clusterName, tokenBytes, BootstrapState.COMPLETED.name())); oldStatusCfs.truncate(); @@ -185,14 +186,43 @@ public class SystemTable return; } - IPartitioner p = StorageService.getPartitioner(); - for (Token token : tokens) + String req = "INSERT INTO system.%s (peer, tokens) VALUES ('%s', %s)"; + processInternal(String.format(req, PEERS_CF, ep.getHostAddress(), serializeTokens(tokens))); + forceBlockingFlush(PEERS_CF); + } + + public static synchronized void updatePeerInfo(InetAddress ep, String columnName, String value) + { + if 
(ep.equals(FBUtilities.getBroadcastAddress())) + return; + + String req = "INSERT INTO system.%s (peer, %s) VALUES ('%s', '%s')"; + processInternal(String.format(req, PEERS_CF, columnName, ep.getHostAddress(), value)); + } + + private static String serializeTokens(Collection<Token> tokens) + { + Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory(); + StringBuilder sb = new StringBuilder(); + sb.append("{"); + Iterator<Token> iter = tokens.iterator(); + while (iter.hasNext()) { - String req = "INSERT INTO system.%s (token_bytes, peer) VALUES ('%s', '%s')"; - String tokenBytes = ByteBufferUtil.bytesToHex(p.getTokenFactory().toByteArray(token)); - processInternal(String.format(req, PEERS_CF, tokenBytes, ep.getHostAddress())); + sb.append("'").append(ByteBufferUtil.bytesToHex(factory.toByteArray(iter.next()))).append("'"); + if (iter.hasNext()) + sb.append(","); } - forceBlockingFlush(PEERS_CF); + sb.append("}"); + return sb.toString(); + } + + private static Collection<Token> deserializeTokens(Collection<ByteBuffer> tokensBytes) + { + Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory(); + List<Token> tokens = new ArrayList<Token>(tokensBytes.size()); + for (ByteBuffer tk : tokensBytes) + tokens.add(factory.fromByteArray(tk)); + return tokens; } /** @@ -200,13 +230,15 @@ public class SystemTable */ public static synchronized void removeTokens(Collection<Token> tokens) { - IPartitioner p = StorageService.getPartitioner(); - - for (Token token : tokens) + Set<Token> tokenSet = new HashSet<Token>(tokens); + for (Map.Entry<InetAddress, Collection<Token>> entry : loadTokens().asMap().entrySet()) { - String req = "DELETE FROM system.%s WHERE token_bytes = '%s'"; - String tokenBytes = ByteBufferUtil.bytesToHex(p.getTokenFactory().toByteArray(token)); - processInternal(String.format(req, PEERS_CF, tokenBytes)); + Set<Token> toRemove = Sets.intersection(tokenSet, ((Set<Token>)entry.getValue())).immutableCopy(); + if 
(toRemove.isEmpty()) + continue; + + String req = "UPDATE system.%s SET tokens = tokens - %s WHERE peer = '%s'"; + processInternal(String.format(req, PEERS_CF, serializeTokens(toRemove), entry.getKey())); } forceBlockingFlush(PEERS_CF); } @@ -216,9 +248,8 @@ public class SystemTable */ public static synchronized void updateTokens(Collection<Token> tokens) { - String req = "INSERT INTO system.%s (key, token_bytes) VALUES ('%s', '%s')"; - String tokenBytes = ByteBufferUtil.bytesToHex(serializeTokens(tokens)); - processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, tokenBytes)); + String req = "INSERT INTO system.%s (key, tokens) VALUES ('%s', %s)"; + processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, serializeTokens(tokens))); forceBlockingFlush(LOCAL_CF); } @@ -238,53 +269,6 @@ public class SystemTable return tokens; } - /** Serialize a collection of tokens to bytes */ - private static ByteBuffer serializeTokens(Collection<Token> tokens) - { - // Guesstimate the total number of bytes needed - int estCapacity = (tokens.size() * 16) + (tokens.size() * 2); - ByteBuffer toks = ByteBuffer.allocate(estCapacity); - IPartitioner p = StorageService.getPartitioner(); - - for (Token token : tokens) - { - ByteBuffer tokenBytes = p.getTokenFactory().toByteArray(token); - - // If we blow the buffer, grow it by double - if (toks.remaining() < (2 + tokenBytes.remaining())) - { - estCapacity = estCapacity * 2; - ByteBuffer newToks = ByteBuffer.allocate(estCapacity); - toks.flip(); - newToks.put(toks); - toks = newToks; - } - - toks.putShort((short)tokenBytes.remaining()); - toks.put(tokenBytes); - } - - toks.flip(); - return toks; - } - - private static Collection<Token> deserializeTokens(ByteBuffer tokenBytes) - { - List<Token> tokens = new ArrayList<Token>(); - IPartitioner p = StorageService.getPartitioner(); - - while(tokenBytes.hasRemaining()) - { - short len = tokenBytes.getShort(); - ByteBuffer dup = tokenBytes.slice(); - dup.limit(len); - 
tokenBytes.position(tokenBytes.position() + len); - tokens.add(p.getTokenFactory().fromByteArray(dup)); - } - - return tokens; - } - private static void forceBlockingFlush(String cfname) { try @@ -305,13 +289,15 @@ public class SystemTable * Return a map of stored tokens to IP addresses * */ - public static Multimap<InetAddress, Token> loadTokens() + public static SetMultimap<InetAddress, Token> loadTokens() { - IPartitioner p = StorageService.getPartitioner(); - - Multimap<InetAddress, Token> tokenMap = HashMultimap.create(); - for (UntypedResultSet.Row row : processInternal("SELECT * FROM system." + PEERS_CF)) - tokenMap.put(row.getInetAddress("peer"), p.getTokenFactory().fromByteArray(row.getBytes("token_bytes"))); + SetMultimap<InetAddress, Token> tokenMap = HashMultimap.create(); + for (UntypedResultSet.Row row : processInternal("SELECT peer, tokens FROM system." + PEERS_CF)) + { + InetAddress peer = row.getInetAddress("peer"); + if (row.has("tokens")) + tokenMap.putAll(peer, deserializeTokens(row.getSet("tokens", BytesType.instance))); + } return tokenMap; } @@ -361,11 +347,11 @@ public class SystemTable public static Collection<Token> getSavedTokens() { - String req = "SELECT token_bytes FROM system.%s WHERE key='%s'"; + String req = "SELECT tokens FROM system.%s WHERE key='%s'"; UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY)); - return result.isEmpty() || !result.one().has("token_bytes") + return result.isEmpty() || !result.one().has("tokens") ? 
Collections.<Token>emptyList() - : deserializeTokens(result.one().getBytes("token_bytes")); + : deserializeTokens(result.one().<ByteBuffer>getSet("tokens", BytesType.instance)); } public static int incrementAndGetGeneration() http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5ec013c/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 01b9e80..47c4c92 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1120,6 +1120,25 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe handleStateMoving(endpoint, pieces); else if (moveName.equals(VersionedValue.STATUS_RELOCATING)) handleStateRelocating(endpoint, pieces); + break; + case RELEASE_VERSION: + SystemTable.updatePeerInfo(endpoint, "release_version", value.value); + break; + case DC: + SystemTable.updatePeerInfo(endpoint, "data_center", value.value); + break; + case RACK: + SystemTable.updatePeerInfo(endpoint, "rack", value.value); + break; + case RPC_ADDRESS: + SystemTable.updatePeerInfo(endpoint, "rpc_address", value.value); + break; + case SCHEMA: + SystemTable.updatePeerInfo(endpoint, "schema_version", value.value); + break; + case HOST_ID: + SystemTable.updatePeerInfo(endpoint, "ring_id", value.value); + break; } }
