Expose 2I to the rest of nodetool
Patch by Jason Brown, reviewed by brandonwilliams for CASSANDRA-4464


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cef8eb07
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cef8eb07
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cef8eb07

Branch: refs/heads/trunk
Commit: cef8eb07dfb4b81cd4e985bc86c5506602650c93
Parents: f309183
Author: Brandon Williams <[email protected]>
Authored: Wed Feb 6 08:33:33 2013 -0600
Committer: Brandon Williams <[email protected]>
Committed: Wed Feb 6 08:33:33 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/db/compaction/CompactionManager.java |    1 -
 .../apache/cassandra/service/StorageService.java   |   67 ++++++++++++---
 src/java/org/apache/cassandra/tools/NodeCmd.java   |    7 ++-
 src/java/org/apache/cassandra/tools/NodeProbe.java |   69 ++++++++++++--
 5 files changed, 122 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cef8eb07/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bdab1c5..905db57 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@
  * Make sstable directory picking blacklist-aware again (CASSANDRA-5193)
  * Correctly expire gossip states for edge cases (CASSANDRA-5216)
  * Improve handling of directory creation failures (CASSANDRA-5196)
+ * Expose secondary indicies to the rest of nodetool (CASSANDRA-4464)
 
 
 1.2.1

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cef8eb07/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 168a3f3..1d9af16 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -278,7 +278,6 @@ public class CompactionManager implements 
CompactionManagerMBean
         {
             public void perform(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables)
             {
-                assert !cfs.isIndex();
                 for (final SSTableReader sstable : sstables)
                 {
                     // SSTables are marked by the caller

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cef8eb07/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 34b2f12..5f00c88 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -35,6 +35,8 @@ import javax.management.NotificationBroadcasterSupport;
 import javax.management.ObjectName;
 
 import com.google.common.collect.*;
+
+import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.log4j.Level;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -2084,7 +2086,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             throw new RuntimeException("Cleanup of the system table is neither 
necessary nor wise");
 
         CounterId.OneShotRenewer counterIdRenewer = new 
CounterId.OneShotRenewer();
-        for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, 
columnFamilies))
+        for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, 
tableName, columnFamilies))
         {
             cfStore.forceCleanup(counterIdRenewer);
         }
@@ -2092,19 +2094,19 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
     public void scrub(String tableName, String... columnFamilies) throws 
IOException, ExecutionException, InterruptedException
     {
-        for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, 
columnFamilies))
+        for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, 
tableName, columnFamilies))
             cfStore.scrub();
     }
 
     public void upgradeSSTables(String tableName, String... columnFamilies) 
throws IOException, ExecutionException, InterruptedException
     {
-        for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, 
columnFamilies))
+        for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, true, 
tableName, columnFamilies))
             cfStore.sstablesRewrite();
     }
 
     public void forceTableCompaction(String tableName, String... 
columnFamilies) throws IOException, ExecutionException, InterruptedException
     {
-        for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, 
columnFamilies))
+        for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, 
tableName, columnFamilies))
         {
             cfStore.forceMajorCompaction();
         }
@@ -2157,7 +2159,9 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             throw new IOException("You must supply a table name");
 
         if (columnFamilyName == null)
-            throw new IOException("You mus supply a column family name");
+            throw new IOException("You must supply a column family name");
+        if (columnFamilyName.contains("."))
+            throw new IllegalArgumentException("Cannot take a snapshot of a 
secondary index by itself. Run snapshot on the column family that owns the 
index.");
 
         if (tag == null || tag.equals(""))
             throw new IOException("You must supply a snapshot name.");
@@ -2207,7 +2211,13 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             logger.debug("Cleared out snapshot directories");
     }
 
-    public Iterable<ColumnFamilyStore> getValidColumnFamilies(String 
tableName, String... cfNames) throws IOException
+    /**
+     * @param allowIndexes Allow index CF names to be passed in
+     * @param autoAddIndexes Automatically add secondary indexes if a CF has 
them
+     * @param tableName keyspace
+     * @param cfNames CFs
+     */
+    public Iterable<ColumnFamilyStore> getValidColumnFamilies(boolean 
allowIndexes, boolean autoAddIndexes, String tableName, String... cfNames) 
throws IOException
     {
         Table table = getValidTable(tableName);
 
@@ -2219,14 +2229,49 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         Set<ColumnFamilyStore> valid = new HashSet<ColumnFamilyStore>();
         for (String cfName : cfNames)
         {
-            ColumnFamilyStore cfStore = table.getColumnFamilyStore(cfName);
+            //if the CF name is an index, just flush the CF that owns the index
+            String baseCfName = cfName;
+            String idxName = null;
+            if (cfName.contains(".")) // secondary index
+            {
+                if(!allowIndexes)
+                {
+                   logger.warn("Operation not allowed on secondary Index 
column family ({})", cfName);
+                    continue;
+                }
+
+                String[] parts = cfName.split("\\.", 2);
+                baseCfName = parts[0];
+                idxName = parts[1];
+            }
+
+            ColumnFamilyStore cfStore = table.getColumnFamilyStore(baseCfName);
             if (cfStore == null)
             {
                 // this means there was a cf passed in that is not recognized 
in the keyspace. report it and continue.
-                logger.warn(String.format("Invalid column family specified: 
%s. Proceeding with others.", cfName));
+                logger.warn(String.format("Invalid column family specified: 
%s. Proceeding with others.", baseCfName));
                 continue;
             }
-            valid.add(cfStore);
+            if (idxName != null)
+            {
+                Collection< SecondaryIndex > indexes = 
cfStore.indexManager.getIndexesByNames(new 
HashSet<String>(Arrays.asList(cfName)));
+                if (indexes.isEmpty())
+                    logger.warn(String.format("Invalid column family index 
specified: %s/%s. Proceeding with others.", baseCfName, idxName));
+                else
+                    valid.add(Iterables.get(indexes, 0).getIndexCfs());
+            }
+            else
+            {
+                valid.add(cfStore);
+                if(autoAddIndexes)
+                {
+                    for(SecondaryIndex si : cfStore.indexManager.getIndexes())
+                    {
+                        logger.info("adding secondary index {} to operation", 
si.getIndexName());
+                        valid.add(si.getIndexCfs());
+                    }
+                }
+            }
         }
         return valid;
     }
@@ -2240,7 +2285,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
     public void forceTableFlush(final String tableName, final String... 
columnFamilies)
                 throws IOException, ExecutionException, InterruptedException
     {
-        for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, 
columnFamilies))
+        for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, 
tableName, columnFamilies))
         {
             logger.debug("Forcing flush on keyspace " + tableName + ", CF " + 
cfStore.getColumnFamilyName());
             cfStore.forceBlockingFlush();
@@ -2367,7 +2412,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
     public AntiEntropyService.RepairFuture forceTableRepair(final Range<Token> 
range, final String tableName, boolean isSequential, boolean  isLocal, final 
String... columnFamilies) throws IOException
     {
         ArrayList<String> names = new ArrayList<String>();
-        for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, 
columnFamilies))
+        for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, 
tableName, columnFamilies))
         {
             names.add(cfStore.getColumnFamilyName());
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cef8eb07/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java 
b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 4438b3d..b74ffb7 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -748,7 +748,12 @@ public class NodeCmd
             // print out column family statistics for this table
             for (ColumnFamilyStoreMBean cfstore : columnFamilies)
             {
-                outs.println("\t\tColumn Family: " + 
cfstore.getColumnFamilyName());
+                String cfName = cfstore.getColumnFamilyName();
+                if(cfName.contains("."))
+                    outs.println("\t\tColumn Family (index): " + cfName);
+                else
+                    outs.println("\t\tColumn Family: " + cfName);
+
                 outs.println("\t\tSSTable count: " + 
cfstore.getLiveSSTableCount());
                 int[] leveledSStables = cfstore.getSSTableCountPerLevel();
                 if (leveledSStables != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cef8eb07/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 80abb87..45f6e18 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -581,7 +581,10 @@ public class NodeProbe
         ColumnFamilyStoreMBean cfsProxy = null;
         try
         {
-            Set<ObjectName> beans = mbeanServerConn.queryNames(new 
ObjectName("org.apache.cassandra.db:type=*ColumnFamilies,keyspace=" + ks + 
",columnfamily=" + cf), null);
+            String type = cf.contains(".") ? "IndexColumnFamilies" : 
"ColumnFamilies";
+            Set<ObjectName> beans = mbeanServerConn.queryNames(
+                    new ObjectName("org.apache.cassandra.db:type=*" + type 
+",keyspace=" + ks + ",columnfamily=" + cf), null);
+
             if (beans.isEmpty())
                 throw new MalformedObjectNameException("couldn't find that 
bean");
             assert beans.size() == 1;
@@ -792,28 +795,74 @@ public class NodeProbe
 
 class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, 
ColumnFamilyStoreMBean>>
 {
-    private Iterator<ObjectName> resIter;
     private MBeanServerConnection mbeanServerConn;
+    Iterator<Entry<String, ColumnFamilyStoreMBean>> mbeans;
 
     public ColumnFamilyStoreMBeanIterator(MBeanServerConnection 
mbeanServerConn)
-    throws MalformedObjectNameException, NullPointerException, IOException
+        throws MalformedObjectNameException, NullPointerException, IOException
     {
-        ObjectName query = new 
ObjectName("org.apache.cassandra.db:type=ColumnFamilies,*");
-        resIter = mbeanServerConn.queryNames(query, null).iterator();
         this.mbeanServerConn = mbeanServerConn;
+        List<Entry<String, ColumnFamilyStoreMBean>> cfMbeans = 
getCFSMBeans(mbeanServerConn, "ColumnFamilies");
+        cfMbeans.addAll(getCFSMBeans(mbeanServerConn, "IndexColumnFamilies"));
+        Collections.sort(cfMbeans, new Comparator<Entry<String, 
ColumnFamilyStoreMBean>>()
+        {
+            public int compare(Entry<String, ColumnFamilyStoreMBean> e1, 
Entry<String, ColumnFamilyStoreMBean> e2)
+            {
+                //compare keyspace, then CF name, then normal vs. index
+                int tableCmp = e1.getKey().compareTo(e2.getKey());
+                if(tableCmp != 0)
+                    return tableCmp;
+
+                // get CF name and split it for index name
+                String q = e1.getValue().getColumnFamilyName();
+                String h = e2.getValue().getColumnFamilyName();
+                String e1CF[] = 
e1.getValue().getColumnFamilyName().split("\\.");
+                String e2CF[] = 
e1.getValue().getColumnFamilyName().split("\\.");
+                assert e1CF.length <= 2 && e2CF.length <= 2 : "unexpected 
split count for column family name";
+
+                //if neither are indexes, just compare CF names
+                if(e1CF.length == 1 && e2CF.length == 1)
+                    return e1CF[0].compareTo(e2CF[0]);
+
+                //check if it's the same CF
+                int cfNameCmp = e1CF[0].compareTo(e2CF[0]);
+                if(cfNameCmp != 0)
+                    return cfNameCmp;
+
+                // if both are indexes (for the same CF), compare them
+                if(e1CF.length == 2 && e2CF.length == 2)
+                    return e1CF[1].compareTo(e2CF[1]);
+
+                //if length of e1CF is 1, it's not an index, so sort it higher
+                return e1CF.length == 1 ? 1 : -1;
+            }
+        });
+        mbeans = cfMbeans.iterator();
+    }
+
+    private List<Entry<String, ColumnFamilyStoreMBean>> 
getCFSMBeans(MBeanServerConnection mbeanServerConn, String type)
+            throws MalformedObjectNameException, IOException
+    {
+        ObjectName query = new ObjectName("org.apache.cassandra.db:type=" + 
type +",*");
+        Set<ObjectName> cfObjects = mbeanServerConn.queryNames(query, null);
+        List<Entry<String, ColumnFamilyStoreMBean>> mbeans = new 
ArrayList<Entry<String, ColumnFamilyStoreMBean>>(cfObjects.size());
+        for(ObjectName n : cfObjects)
+        {
+            String tableName = n.getKeyProperty("keyspace");
+            ColumnFamilyStoreMBean cfsProxy = 
JMX.newMBeanProxy(mbeanServerConn, n, ColumnFamilyStoreMBean.class);
+            mbeans.add(new AbstractMap.SimpleImmutableEntry<String, 
ColumnFamilyStoreMBean>(tableName, cfsProxy));
+        }
+        return mbeans;
     }
 
     public boolean hasNext()
     {
-        return resIter.hasNext();
+        return mbeans.hasNext();
     }
 
     public Entry<String, ColumnFamilyStoreMBean> next()
     {
-        ObjectName objectName = resIter.next();
-        String tableName = objectName.getKeyProperty("keyspace");
-        ColumnFamilyStoreMBean cfsProxy = JMX.newMBeanProxy(mbeanServerConn, 
objectName, ColumnFamilyStoreMBean.class);
-        return new AbstractMap.SimpleImmutableEntry<String, 
ColumnFamilyStoreMBean>(tableName, cfsProxy);
+        return mbeans.next();
     }
 
     public void remove()

Reply via email to