Expose secondary indexes (2I) to the rest of nodetool. Patch by Jason Brown; reviewed by brandonwilliams for CASSANDRA-4464.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cef8eb07 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cef8eb07 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cef8eb07 Branch: refs/heads/trunk Commit: cef8eb07dfb4b81cd4e985bc86c5506602650c93 Parents: f309183 Author: Brandon Williams <[email protected]> Authored: Wed Feb 6 08:33:33 2013 -0600 Committer: Brandon Williams <[email protected]> Committed: Wed Feb 6 08:33:33 2013 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/compaction/CompactionManager.java | 1 - .../apache/cassandra/service/StorageService.java | 67 ++++++++++++--- src/java/org/apache/cassandra/tools/NodeCmd.java | 7 ++- src/java/org/apache/cassandra/tools/NodeProbe.java | 69 ++++++++++++-- 5 files changed, 122 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/cef8eb07/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bdab1c5..905db57 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,7 @@ * Make sstable directory picking blacklist-aware again (CASSANDRA-5193) * Correctly expire gossip states for edge cases (CASSANDRA-5216) * Improve handling of directory creation failures (CASSANDRA-5196) + * Expose secondary indicies to the rest of nodetool (CASSANDRA-4464) 1.2.1 http://git-wip-us.apache.org/repos/asf/cassandra/blob/cef8eb07/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 168a3f3..1d9af16 100644 --- 
a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -278,7 +278,6 @@ public class CompactionManager implements CompactionManagerMBean { public void perform(ColumnFamilyStore cfs, Collection<SSTableReader> sstables) { - assert !cfs.isIndex(); for (final SSTableReader sstable : sstables) { // SSTables are marked by the caller http://git-wip-us.apache.org/repos/asf/cassandra/blob/cef8eb07/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 34b2f12..5f00c88 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -35,6 +35,8 @@ import javax.management.NotificationBroadcasterSupport; import javax.management.ObjectName; import com.google.common.collect.*; + +import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.log4j.Level; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -2084,7 +2086,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE throw new RuntimeException("Cleanup of the system table is neither necessary nor wise"); CounterId.OneShotRenewer counterIdRenewer = new CounterId.OneShotRenewer(); - for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies)) + for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, tableName, columnFamilies)) { cfStore.forceCleanup(counterIdRenewer); } @@ -2092,19 +2094,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void scrub(String tableName, String... 
columnFamilies) throws IOException, ExecutionException, InterruptedException { - for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies)) + for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, tableName, columnFamilies)) cfStore.scrub(); } public void upgradeSSTables(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies)) + for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, true, tableName, columnFamilies)) cfStore.sstablesRewrite(); } public void forceTableCompaction(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies)) + for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, tableName, columnFamilies)) { cfStore.forceMajorCompaction(); } @@ -2157,7 +2159,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE throw new IOException("You must supply a table name"); if (columnFamilyName == null) - throw new IOException("You mus supply a column family name"); + throw new IOException("You must supply a column family name"); + if (columnFamilyName.contains(".")) + throw new IllegalArgumentException("Cannot take a snapshot of a secondary index by itself. Run snapshot on the column family that owns the index."); if (tag == null || tag.equals("")) throw new IOException("You must supply a snapshot name."); @@ -2207,7 +2211,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logger.debug("Cleared out snapshot directories"); } - public Iterable<ColumnFamilyStore> getValidColumnFamilies(String tableName, String... 
cfNames) throws IOException + /** + * @param allowIndexes Allow index CF names to be passed in + * @param autoAddIndexes Automatically add secondary indexes if a CF has them + * @param tableName keyspace + * @param cfNames CFs + */ + public Iterable<ColumnFamilyStore> getValidColumnFamilies(boolean allowIndexes, boolean autoAddIndexes, String tableName, String... cfNames) throws IOException { Table table = getValidTable(tableName); @@ -2219,14 +2229,49 @@ public class StorageService extends NotificationBroadcasterSupport implements IE Set<ColumnFamilyStore> valid = new HashSet<ColumnFamilyStore>(); for (String cfName : cfNames) { - ColumnFamilyStore cfStore = table.getColumnFamilyStore(cfName); + //if the CF name is an index, just flush the CF that owns the index + String baseCfName = cfName; + String idxName = null; + if (cfName.contains(".")) // secondary index + { + if(!allowIndexes) + { + logger.warn("Operation not allowed on secondary Index column family ({})", cfName); + continue; + } + + String[] parts = cfName.split("\\.", 2); + baseCfName = parts[0]; + idxName = parts[1]; + } + + ColumnFamilyStore cfStore = table.getColumnFamilyStore(baseCfName); if (cfStore == null) { // this means there was a cf passed in that is not recognized in the keyspace. report it and continue. - logger.warn(String.format("Invalid column family specified: %s. Proceeding with others.", cfName)); + logger.warn(String.format("Invalid column family specified: %s. Proceeding with others.", baseCfName)); continue; } - valid.add(cfStore); + if (idxName != null) + { + Collection< SecondaryIndex > indexes = cfStore.indexManager.getIndexesByNames(new HashSet<String>(Arrays.asList(cfName))); + if (indexes.isEmpty()) + logger.warn(String.format("Invalid column family index specified: %s/%s. 
Proceeding with others.", baseCfName, idxName)); + else + valid.add(Iterables.get(indexes, 0).getIndexCfs()); + } + else + { + valid.add(cfStore); + if(autoAddIndexes) + { + for(SecondaryIndex si : cfStore.indexManager.getIndexes()) + { + logger.info("adding secondary index {} to operation", si.getIndexName()); + valid.add(si.getIndexCfs()); + } + } + } } return valid; } @@ -2240,7 +2285,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void forceTableFlush(final String tableName, final String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies)) + for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, tableName, columnFamilies)) { logger.debug("Forcing flush on keyspace " + tableName + ", CF " + cfStore.getColumnFamilyName()); cfStore.forceBlockingFlush(); @@ -2367,7 +2412,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public AntiEntropyService.RepairFuture forceTableRepair(final Range<Token> range, final String tableName, boolean isSequential, boolean isLocal, final String... 
columnFamilies) throws IOException { ArrayList<String> names = new ArrayList<String>(); - for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies)) + for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, tableName, columnFamilies)) { names.add(cfStore.getColumnFamilyName()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cef8eb07/src/java/org/apache/cassandra/tools/NodeCmd.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java index 4438b3d..b74ffb7 100644 --- a/src/java/org/apache/cassandra/tools/NodeCmd.java +++ b/src/java/org/apache/cassandra/tools/NodeCmd.java @@ -748,7 +748,12 @@ public class NodeCmd // print out column family statistics for this table for (ColumnFamilyStoreMBean cfstore : columnFamilies) { - outs.println("\t\tColumn Family: " + cfstore.getColumnFamilyName()); + String cfName = cfstore.getColumnFamilyName(); + if(cfName.contains(".")) + outs.println("\t\tColumn Family (index): " + cfName); + else + outs.println("\t\tColumn Family: " + cfName); + outs.println("\t\tSSTable count: " + cfstore.getLiveSSTableCount()); int[] leveledSStables = cfstore.getSSTableCountPerLevel(); if (leveledSStables != null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/cef8eb07/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 80abb87..45f6e18 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -581,7 +581,10 @@ public class NodeProbe ColumnFamilyStoreMBean cfsProxy = null; try { - Set<ObjectName> beans = mbeanServerConn.queryNames(new ObjectName("org.apache.cassandra.db:type=*ColumnFamilies,keyspace=" + ks + 
",columnfamily=" + cf), null); + String type = cf.contains(".") ? "IndexColumnFamilies" : "ColumnFamilies"; + Set<ObjectName> beans = mbeanServerConn.queryNames( + new ObjectName("org.apache.cassandra.db:type=*" + type +",keyspace=" + ks + ",columnfamily=" + cf), null); + if (beans.isEmpty()) throw new MalformedObjectNameException("couldn't find that bean"); assert beans.size() == 1; @@ -792,28 +795,74 @@ public class NodeProbe class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>> { - private Iterator<ObjectName> resIter; private MBeanServerConnection mbeanServerConn; + Iterator<Entry<String, ColumnFamilyStoreMBean>> mbeans; public ColumnFamilyStoreMBeanIterator(MBeanServerConnection mbeanServerConn) - throws MalformedObjectNameException, NullPointerException, IOException + throws MalformedObjectNameException, NullPointerException, IOException { - ObjectName query = new ObjectName("org.apache.cassandra.db:type=ColumnFamilies,*"); - resIter = mbeanServerConn.queryNames(query, null).iterator(); this.mbeanServerConn = mbeanServerConn; + List<Entry<String, ColumnFamilyStoreMBean>> cfMbeans = getCFSMBeans(mbeanServerConn, "ColumnFamilies"); + cfMbeans.addAll(getCFSMBeans(mbeanServerConn, "IndexColumnFamilies")); + Collections.sort(cfMbeans, new Comparator<Entry<String, ColumnFamilyStoreMBean>>() + { + public int compare(Entry<String, ColumnFamilyStoreMBean> e1, Entry<String, ColumnFamilyStoreMBean> e2) + { + //compare keyspace, then CF name, then normal vs. 
index + int tableCmp = e1.getKey().compareTo(e2.getKey()); + if(tableCmp != 0) + return tableCmp; + + // get CF name and split it for index name + String q = e1.getValue().getColumnFamilyName(); + String h = e2.getValue().getColumnFamilyName(); + String e1CF[] = e1.getValue().getColumnFamilyName().split("\\."); + String e2CF[] = e1.getValue().getColumnFamilyName().split("\\."); + assert e1CF.length <= 2 && e2CF.length <= 2 : "unexpected split count for column family name"; + + //if neither are indexes, just compare CF names + if(e1CF.length == 1 && e2CF.length == 1) + return e1CF[0].compareTo(e2CF[0]); + + //check if it's the same CF + int cfNameCmp = e1CF[0].compareTo(e2CF[0]); + if(cfNameCmp != 0) + return cfNameCmp; + + // if both are indexes (for the same CF), compare them + if(e1CF.length == 2 && e2CF.length == 2) + return e1CF[1].compareTo(e2CF[1]); + + //if length of e1CF is 1, it's not an index, so sort it higher + return e1CF.length == 1 ? 1 : -1; + } + }); + mbeans = cfMbeans.iterator(); + } + + private List<Entry<String, ColumnFamilyStoreMBean>> getCFSMBeans(MBeanServerConnection mbeanServerConn, String type) + throws MalformedObjectNameException, IOException + { + ObjectName query = new ObjectName("org.apache.cassandra.db:type=" + type +",*"); + Set<ObjectName> cfObjects = mbeanServerConn.queryNames(query, null); + List<Entry<String, ColumnFamilyStoreMBean>> mbeans = new ArrayList<Entry<String, ColumnFamilyStoreMBean>>(cfObjects.size()); + for(ObjectName n : cfObjects) + { + String tableName = n.getKeyProperty("keyspace"); + ColumnFamilyStoreMBean cfsProxy = JMX.newMBeanProxy(mbeanServerConn, n, ColumnFamilyStoreMBean.class); + mbeans.add(new AbstractMap.SimpleImmutableEntry<String, ColumnFamilyStoreMBean>(tableName, cfsProxy)); + } + return mbeans; } public boolean hasNext() { - return resIter.hasNext(); + return mbeans.hasNext(); } public Entry<String, ColumnFamilyStoreMBean> next() { - ObjectName objectName = resIter.next(); - String tableName = 
objectName.getKeyProperty("keyspace"); - ColumnFamilyStoreMBean cfsProxy = JMX.newMBeanProxy(mbeanServerConn, objectName, ColumnFamilyStoreMBean.class); - return new AbstractMap.SimpleImmutableEntry<String, ColumnFamilyStoreMBean>(tableName, cfsProxy); + return mbeans.next(); } public void remove()
