Author: jbellis
Date: Wed Dec 29 18:26:43 2010
New Revision: 1053696

URL: http://svn.apache.org/viewvc?rev=1053696&view=rev
Log:
merge r1053245, r1053247, r1053409 from 0.7

Added:
    
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IMessageCallback.java
      - copied unchanged from r1053245, 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IMessageCallback.java
Modified:
    cassandra/branches/cassandra-0.7.0/   (props changed)
    cassandra/branches/cassandra-0.7.0/CHANGES.txt
    
cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
   (props changed)
    
cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
   (props changed)
    
cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
   (props changed)
    
cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
   (props changed)
    
cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
   (props changed)
    
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliClient.java
    
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliOptions.java
    
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliSessionState.java
    
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
    
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/Table.java
    
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
    
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/AsyncResult.java
    
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IAsyncCallback.java
    
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IAsyncResult.java
    
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/MessagingService.java
    
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
    
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/utils/ExpiringMap.java

Propchange: cassandra/branches/cassandra-0.7.0/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 29 18:26:43 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6:922689-1053218
-/cassandra/branches/cassandra-0.7:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220
+/cassandra/branches/cassandra-0.6:922689-1053244
+/cassandra/branches/cassandra-0.7:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220,1053245,1053247,1053409
 /cassandra/trunk:1026516-1026734,1028929
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350

Modified: cassandra/branches/cassandra-0.7.0/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/CHANGES.txt?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7.0/CHANGES.txt Wed Dec 29 18:26:43 2010
@@ -4,6 +4,11 @@ dev
    histograms in StorageProxyMBean (CASSANDRA-1893)
  * fix CLI get recognition of supercolumns (CASSANDRA-1899)
  * enable keepalive on intra-cluster sockets (CASSANDRA-1766)
+ * count timeouts towards dynamicsnitch latencies (CASSANDRA-1905)
+ * Expose index-building status in JMX + cli schema description
+   (CASSANDRA-1871)
+ * allow [LOCAL|EACH]_QUORUM to be used with non-NetworkTopology 
+   replication Strategies
 
 
 0.7.0-rc3

Propchange: 
cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 29 18:26:43 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1053218
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1053244
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220,1053245,1053247,1053409
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1026734,1028929
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350

Propchange: 
cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 29 18:26:43 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1053218
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1053244
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220,1053245,1053247,1053409
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1026734,1028929
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350

Propchange: 
cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 29 18:26:43 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1053218
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1053244
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220,1053245,1053247,1053409
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1026734,1028929
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350

Propchange: 
cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 29 18:26:43 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1053218
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1053244
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220,1053245,1053247,1053409
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1026734,1028929
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350

Propchange: 
cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 29 18:26:43 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1053218
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1053244
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220,1053245,1053247,1053409
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1026734,1028929
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350

Modified: 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliClient.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliClient.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliClient.java
 (original)
+++ 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliClient.java
 Wed Dec 29 18:26:43 2010
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cli;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
@@ -25,8 +26,11 @@ import com.google.common.base.Charsets;
 import org.antlr.runtime.tree.Tree;
 import org.apache.cassandra.auth.SimpleAuthenticator;
 import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.ColumnFamilyStoreMBean;
+import org.apache.cassandra.db.CompactionManagerMBean;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.tools.NodeProbe;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
@@ -1206,6 +1210,11 @@ public class CliClient extends CliUserHe
 
     private void describeKeySpace(String keySpaceName, KsDef metadata) throws 
TException
     {
+        NodeProbe probe = sessionState.getNodeProbe();
+
+        // get the compaction manager MBean to display index building 
information
+        CompactionManagerMBean compactionManagerMBean = (probe == null) ? null 
: probe.getCompactionManagerProxy();
+
         // Describe and display
         sessionState.out.println("Keyspace: " + keySpaceName + ":");
         try
@@ -1213,10 +1222,12 @@ public class CliClient extends CliUserHe
             KsDef ks_def;
             ks_def = metadata == null ? 
thriftClient.describe_keyspace(keySpaceName) : metadata;
             sessionState.out.println("  Replication Strategy: " + 
ks_def.strategy_class);
+
             if (ks_def.strategy_class.endsWith(".NetworkTopologyStrategy"))
                 sessionState.out.println("    Options: " + 
FBUtilities.toString(ks_def.strategy_options));
             else
                 sessionState.out.println("    Replication Factor: " + 
ks_def.replication_factor);
+
             sessionState.out.println("  Column Families:");
 
             boolean isSuper;
@@ -1224,6 +1235,9 @@ public class CliClient extends CliUserHe
             Collections.sort(ks_def.cf_defs, new CfDefNamesComparator());
             for (CfDef cf_def : ks_def.cf_defs)
             {
+                // fetching bean for current column family store
+                ColumnFamilyStoreMBean cfMBean = (probe == null) ? null : 
probe.getCfsProxy(ks_def.getName(), cf_def.getName());
+
                 isSuper = cf_def.column_type.equals("Super");
                 sessionState.out.printf("    ColumnFamily: %s%s%n", 
cf_def.name, isSuper ? " (Super)" : "");
 
@@ -1241,6 +1255,12 @@ public class CliClient extends CliUserHe
                 sessionState.out.printf("      Compaction min/max thresholds: 
%s/%s%n", cf_def.min_compaction_threshold, cf_def.max_compaction_threshold);
                 sessionState.out.printf("      Read repair chance: %s%n", 
cf_def.read_repair_chance);
 
+                // only if a connection to the cfMBean was established
+                if (cfMBean != null)
+                {
+                    sessionState.out.printf("      Built indexes: %s%n", 
cfMBean.getBuiltIndexes());
+                }
+
                 if (cf_def.getColumn_metadataSize() != 0)
                 {
                     String leftSpace = "      ";
@@ -1281,6 +1301,26 @@ public class CliClient extends CliUserHe
                     }
                 }
             }
+
+            // compaction manager information
+            if (compactionManagerMBean != null)
+            {
+                String compactionType = 
compactionManagerMBean.getCompactionType();
+
+                // if ongoing compaction type is index build
+                if (compactionType != null && compactionType.contains("index 
build"))
+                {
+                    String indexName         = 
compactionManagerMBean.getColumnFamilyInProgress();
+                    long bytesCompacted      = 
compactionManagerMBean.getBytesCompacted();
+                    long totalBytesToProcess = 
compactionManagerMBean.getBytesTotalInProgress();
+
+                    sessionState.out.printf("%nCurrently building index %s, 
completed %d of %d bytes.%n", indexName, bytesCompacted, totalBytesToProcess);
+                }
+            }
+
+            // closing JMX connection
+            if (probe != null)
+                probe.close();
         }
         catch (InvalidRequestException e)
         {
@@ -1290,7 +1330,12 @@ public class CliClient extends CliUserHe
         {
             sessionState.out.println("Keyspace " + keySpaceName + " could not 
be found.");
         }
+        catch (IOException e)
+        {
+            sessionState.out.println("Error while closing JMX connection: " + 
e.getMessage());
+        }
     }
+
     // DESCRIBE KEYSPACE <keyspace_name> 
     private void executeDescribeKeySpace(Tree statement) throws TException, 
InvalidRequestException
     {

Modified: 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliOptions.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliOptions.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliOptions.java
 (original)
+++ 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliOptions.java
 Wed Dec 29 18:26:43 2010
@@ -39,7 +39,8 @@ public class CliOptions {
     private static final String BATCH_OPTION = "batch";
     private static final String HELP_OPTION = "help";
     private static final String FILE_OPTION = "file";
-    
+    private static final String JMX_PORT_OPTION = "jmxport";
+
     // Default values for optional command line arguments
     private static final int    DEFAULT_THRIFT_PORT = 9160;
 
@@ -57,12 +58,13 @@ public class CliOptions {
         options.addOption(KEYSPACE_OPTION, true, "cassandra keyspace user is 
authenticated against");
         options.addOption(BATCH_OPTION, false, "enabled batch mode (supress 
output; errors are fatal)");
         options.addOption(FILE_OPTION, true, "load statements from the 
specific file.");
+        options.addOption(JMX_PORT_OPTION, true, "JMX service port.");
         options.addOption(HELP_OPTION, false, "usage help.");
     }
 
     private static void printUsage()
     {
-        System.err.println("Usage: cassandra-cli --host hostname [--port 
<portname>] [--file <filename>] [--unframed] [--debug]");
+        System.err.println("Usage: cassandra-cli --host hostname [--port 
<port>] [--jmxport <port>] [--file <filename>] [--unframed] [--debug]");
         System.err.println("\t[--username username] [--password password] 
[--keyspace keyspace] [--batch] [--help]");
     }
 
@@ -138,6 +140,11 @@ public class CliOptions {
                 css.filename = cmd.getOptionValue(FILE_OPTION);
             }
 
+            if (cmd.hasOption(JMX_PORT_OPTION))
+            {
+                css.jmxPort = 
Integer.parseInt(cmd.getOptionValue(JMX_PORT_OPTION));
+            }
+
             if (cmd.hasOption(HELP_OPTION))
             {
                 printUsage();

Modified: 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliSessionState.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliSessionState.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliSessionState.java
 (original)
+++ 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliSessionState.java
 Wed Dec 29 18:26:43 2010
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.cli;
 
+import org.apache.cassandra.tools.NodeProbe;
+
 import java.io.InputStream;
 import java.io.PrintStream;
 
@@ -36,6 +38,7 @@ public class CliSessionState
     public String  keyspace;      // cassandra keyspace user is authenticating
     public boolean batch = false; // enable/disable batch processing mode
     public String  filename = ""; // file to read commands from
+    public int     jmxPort = 8080;// JMX service port
 
     /*
      * Streams to read/write from
@@ -65,4 +68,19 @@ public class CliSessionState
     {
         return !this.filename.isEmpty();
     }
+
+    public NodeProbe getNodeProbe()
+    {
+        try
+        {
+            return new NodeProbe(hostName, jmxPort);
+        }
+        catch (Exception e)
+        {
+            err.printf("WARNING: Could not connect to the JMX on %s:%d, 
information won't be shown.%n%n", hostName, jmxPort);
+        }
+
+        return null;
+    }
+
 }

Modified: 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 (original)
+++ 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 Wed Dec 29 18:26:43 2010
@@ -314,7 +314,7 @@ public class ColumnFamilyStore implement
             return;
 
         // if we're just linking in the index to indexedColumns on an 
already-built index post-restart, we're done
-        if (SystemTable.isIndexBuilt(table.name, indexedCfMetadata.cfName))
+        if (indexedCfs.isIndexBuilt())
             return;
 
         // build it asynchronously; addIndex gets called by CFS open and 
schema update, neither of which
@@ -1925,4 +1925,33 @@ public class ColumnFamilyStore implement
 
         return histogram;
     }
+
+    /**
+     * Check if index is already built for current store
+     * @return true if built, false otherwise
+     */
+    public boolean isIndexBuilt()
+    {
+        return SystemTable.isIndexBuilt(table.name, columnFamily);
+    }
+
+    /**
+     * Returns a list of the names of the built column indexes for current 
store
+     * @return list of the index names
+     */
+    public List<String> getBuiltIndexes()
+    {
+        List<String> indexes = new ArrayList<String>();
+
+        for (ColumnFamilyStore cfs : indexedColumns.values())
+        {
+            if (cfs.isIndexBuilt())
+            {
+                indexes.add(cfs.columnFamily); // cfs.columnFamily 
holds the name of the index
+            }
+        }
+
+        return indexes;
+    }
+
 }

Modified: 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
 (original)
+++ 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
 Wed Dec 29 18:26:43 2010
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 
 /**
@@ -213,4 +214,10 @@ public interface ColumnFamilyStoreMBean
 
     public long[] getEstimatedRowSizeHistogram();
     public long[] getEstimatedColumnCountHistogram();
+
+    /**
+     * Returns a list of the names of the built column indexes for current 
store
+     * @return list of the index names
+     */
+    public List<String> getBuiltIndexes();
 }

Modified: 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/Table.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/Table.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/Table.java 
(original)
+++ 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/Table.java 
Wed Dec 29 18:26:43 2010
@@ -643,7 +643,7 @@ public class Table
 
         public String getTaskType()
         {
-            return "Secondary index build";
+            return String.format("Secondary index build %s", cfs.columnFamily);
         }
     }
 

Modified: 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
 (original)
+++ 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
 Wed Dec 29 18:26:43 2010
@@ -224,6 +224,10 @@ public abstract class AbstractReplicatio
 
     public QuorumResponseHandler getQuorumResponseHandler(IResponseResolver 
responseResolver, ConsistencyLevel consistencyLevel)
     {
+        if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) || 
consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))
+        {
+            return new DatacenterQuorumResponseHandler(responseResolver, 
consistencyLevel, table);
+        }
         return new QuorumResponseHandler(responseResolver, consistencyLevel, 
table);
     }
 

Modified: 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
 (original)
+++ 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
 Wed Dec 29 18:26:43 2010
@@ -164,18 +164,4 @@ public class NetworkTopologyStrategy ext
         }
         return super.getWriteResponseHandler(writeEndpoints, hintedEndpoints, 
consistency_level);
     }
-
-    /**
-     * This method will generate the WRH object and returns. If the Consistency
-     * level is LOCAL_QUORUM/EACH_QUORUM then it will return a DCQRH.
-     */
-    @Override
-    public QuorumResponseHandler getQuorumResponseHandler(IResponseResolver 
responseResolver, ConsistencyLevel consistencyLevel)
-    {
-        if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) || 
consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))
-        {
-            return new DatacenterQuorumResponseHandler(responseResolver, 
consistencyLevel, table);
-        }
-        return super.getQuorumResponseHandler(responseResolver, 
consistencyLevel);
-    }
 }

Modified: 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/AsyncResult.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/AsyncResult.java
 (original)
+++ 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/AsyncResult.java
 Wed Dec 29 18:26:43 2010
@@ -94,6 +94,8 @@ class AsyncResult implements IAsyncResul
         {
             lock.unlock();
         }        
+
+        MessagingService.removeRegisteredCallback(response.getMessageId());
     }
 
     public InetAddress getFrom()

Modified: 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IAsyncCallback.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IAsyncCallback.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IAsyncCallback.java
 (original)
+++ 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IAsyncCallback.java
 Wed Dec 29 18:26:43 2010
@@ -24,7 +24,7 @@ package org.apache.cassandra.net;
  * service.  In particular, if any shared state is referenced, making
  * response alone synchronized will not suffice.
  */
-public interface IAsyncCallback 
+public interface IAsyncCallback extends IMessageCallback
 {
        /**
         * @param msg response received.

Modified: 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IAsyncResult.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IAsyncResult.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IAsyncResult.java
 (original)
+++ 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IAsyncResult.java
 Wed Dec 29 18:26:43 2010
@@ -22,7 +22,7 @@ import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-public interface IAsyncResult
+public interface IAsyncResult extends IMessageCallback
 {    
     /**
      * Same operation as the above get() but allows the calling

Modified: 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/MessagingService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/MessagingService.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/MessagingService.java
 (original)
+++ 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/MessagingService.java
 Wed Dec 29 18:26:43 2010
@@ -36,6 +36,9 @@ import java.util.concurrent.atomic.Atomi
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.base.Function;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +46,8 @@ import org.apache.cassandra.concurrent.D
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.ILatencyPublisher;
+import org.apache.cassandra.locator.ILatencySubscriber;
 import org.apache.cassandra.net.io.SerializerType;
 import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.service.GCInspector;
@@ -54,7 +59,7 @@ import org.apache.cassandra.utils.GuidGe
 import org.apache.cassandra.utils.SimpleCondition;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
-public class MessagingService implements MessagingServiceMBean
+public class MessagingService implements MessagingServiceMBean, 
ILatencyPublisher
 {
     private static int version_ = 1;
     //TODO: make this parameter dynamic somehow.  Not sure if config is 
appropriate.
@@ -64,9 +69,9 @@ public class MessagingService implements
     public static final int PROTOCOL_MAGIC = 0xCA552DFA;
 
     /* This records all the results mapped by message Id */
-    private static ExpiringMap<String, IAsyncCallback> callbackMap_;
-    private static ExpiringMap<String, IAsyncResult> taskCompletionMap_;
-    
+    private static ExpiringMap<String, IMessageCallback> callbacks;
+    private static Multimap<String, InetAddress> targets;
+
     /* Lookup table for registering message handlers based on the verb. */
     private static Map<StorageService.Verb, IVerbHandler> verbHandlers_;
 
@@ -83,6 +88,8 @@ public class MessagingService implements
     private SocketThread socketThread;
     private SimpleCondition listenGate;
     private static final Map<StorageService.Verb, AtomicInteger> 
droppedMessages = new EnumMap<StorageService.Verb, 
AtomicInteger>(StorageService.Verb.class);
+    private final List<ILatencySubscriber> subscribers = new 
ArrayList<ILatencySubscriber>();
+
     static
     {
         for (StorageService.Verb verb : StorageService.Verb.values())
@@ -99,15 +106,6 @@ public class MessagingService implements
     {
         listenGate = new SimpleCondition();
         verbHandlers_ = new EnumMap<StorageService.Verb, IVerbHandler>(StorageService.Verb.class);
-        /*
-         * Leave callbacks in the cachetable long enough that any related messages will arrive
-         * before the callback is evicted from the table. The concurrency level is set at 128
-         * which is the sum of the threads in the pool that adds shit into the table and the 
-         * pool that retrives the callback from here.
-        */
-        callbackMap_ = new ExpiringMap<String, IAsyncCallback>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()));
-        taskCompletionMap_ = new ExpiringMap<String, IAsyncResult>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()));
-
         streamExecutor_ = new DebuggableThreadPoolExecutor("Streaming", DatabaseDescriptor.getCompactionThreadPriority());
         Runnable logDropped = new Runnable()
         {
@@ -118,6 +116,26 @@ public class MessagingService implements
         };
         StorageService.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
 
+        Function<String, ?> timeoutReporter = new Function<String, Object>()
+        {
+            public Object apply(String messageId)
+            {
+                Collection<InetAddress> addresses = targets.removeAll(messageId);
+                if (addresses == null)
+                    return null;
+
+                for (InetAddress address : addresses)
+                {
+                    for (ILatencySubscriber subscriber : subscribers)
+                        subscriber.receiveTiming(address, (double) DatabaseDescriptor.getRpcTimeout());
+                }
+
+                return null;
+            }
+        };
+        targets = ArrayListMultimap.create();
+        callbacks = new ExpiringMap<String, IMessageCallback>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()), timeoutReporter);
+
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
         {
@@ -230,6 +248,7 @@ public class MessagingService implements
         addCallback(cb, messageId);
         for (InetAddress endpoint : to)
         {
+            targets.put(messageId, endpoint);
             sendOneWay(message, endpoint);
         }
         return messageId;
@@ -237,7 +256,7 @@ public class MessagingService implements
 
     public void addCallback(IAsyncCallback cb, String messageId)
     {
-        callbackMap_.put(messageId, cb);
+        callbacks.put(messageId, cb);
     }
 
     /**
@@ -254,6 +273,7 @@ public class MessagingService implements
     {        
         String messageId = message.getMessageId();
         addCallback(cb, messageId);
+        targets.put(messageId, to);
         sendOneWay(message, to);
         return messageId;
     }
@@ -280,6 +300,7 @@ public class MessagingService implements
         for ( int i = 0; i < messages.length; ++i )
         {
             messages[i].setMessageId(groupId);
+            targets.put(groupId, to.get(i));
             sendOneWay(messages[i], to.get(i));
         }
         return groupId;
@@ -332,7 +353,8 @@ public class MessagingService implements
     public IAsyncResult sendRR(Message message, InetAddress to)
     {
         IAsyncResult iar = new AsyncResult();
-        taskCompletionMap_.put(message.getMessageId(), iar);
+        callbacks.put(message.getMessageId(), iar);
+        targets.put(message.getMessageId(), to);
         sendOneWay(message, to);
         return iar;
     }
@@ -350,6 +372,11 @@ public class MessagingService implements
         streamExecutor_.execute(new FileStreamTask(header, to));
     }
     
+    public void register(ILatencySubscriber subcriber)
+    {
+        subscribers.add(subcriber);
+    }
+
     /** blocks until the processing pools are empty and done. */
     public static void waitFor() throws InterruptedException
     {
@@ -371,10 +398,7 @@ public class MessagingService implements
         }
 
         streamExecutor_.shutdownNow();
-
-        /* shut down the cachetables */
-        taskCompletionMap_.shutdown();
-        callbackMap_.shutdown();
+        callbacks.shutdown();
 
         logger_.info("Shutdown complete (no further commands will be processed)");
     }
@@ -391,29 +415,25 @@ public class MessagingService implements
         stage.execute(runnable);
     }
 
-    public static IAsyncCallback getRegisteredCallback(String messageId)
-    {
-        return callbackMap_.get(messageId);
-    }
-    
-    public static void removeRegisteredCallback(String messageId)
+    public static IMessageCallback getRegisteredCallback(String messageId)
     {
-        callbackMap_.remove(messageId);
+        return callbacks.get(messageId);
     }
     
-    public static IAsyncResult getAsyncResult(String messageId)
+    public static IMessageCallback removeRegisteredCallback(String messageId)
     {
-        return taskCompletionMap_.remove(messageId);
+        targets.removeAll(messageId); // TODO fix this when we clean up quorum reads to do proper RR
+        return callbacks.remove(messageId);
     }
 
     public static long getRegisteredCallbackAge(String messageId)
     {
-        return callbackMap_.getAge(messageId);
+        return callbacks.getAge(messageId);
     }
 
-    public static long getAsyncResultAge(String messageId)
+    public static void responseReceivedFrom(String messageId, InetAddress from)
     {
-        return taskCompletionMap_.getAge(messageId);
+        targets.remove(messageId, from);
     }
 
     public static void validateMagic(int magic) throws IOException

Modified: 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Wed Dec 29 18:26:43 2010
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.net;
 
-
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
@@ -37,35 +36,28 @@ public class ResponseVerbHandler impleme
 
     public void doVerb(Message message)
     {     
-        String messageId = message.getMessageId();        
-        IAsyncCallback cb = MessagingService.getRegisteredCallback(messageId);
-        double age = 0;
-        if (cb != null)
+        String messageId = message.getMessageId();
+        MessagingService.responseReceivedFrom(messageId, message.getFrom());
+        double age = System.currentTimeMillis() - MessagingService.getRegisteredCallbackAge(messageId);
+        IMessageCallback cb = MessagingService.getRegisteredCallback(messageId);
+        if (cb == null)
+            return;
+
+        // if cb is not null, then age will be valid
+        for (ILatencySubscriber subscriber : subscribers)
+            subscriber.receiveTiming(message.getFrom(), age);
+
+        if (cb instanceof IAsyncCallback)
         {
             if (logger_.isDebugEnabled())
                 logger_.debug("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom());
-            age = System.currentTimeMillis() - MessagingService.getRegisteredCallbackAge(messageId);
-            cb.response(message);
+            ((IAsyncCallback) cb).response(message);
         }
         else
         {
-            IAsyncResult ar = MessagingService.getAsyncResult(messageId);
-            if (ar != null)
-            {
-                if (logger_.isDebugEnabled())
-                    logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
-                age = System.currentTimeMillis() - MessagingService.getAsyncResultAge(messageId);
-                ar.result(message);
-            }
-        }
-        notifySubscribers(message.getFrom(), age);
-    }
-
-    private void notifySubscribers(InetAddress host, double latency)
-    {
-        for (ILatencySubscriber subscriber : subscribers)
-        {
-            subscriber.receiveTiming(host, latency);
+            if (logger_.isDebugEnabled())
+                logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
+            ((IAsyncResult) cb).result(message);
         }
     }
 

Modified: 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
--- 
--- cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/utils/ExpiringMap.java (original)
+++ cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/utils/ExpiringMap.java Wed Dec 29 18:26:43 2010
@@ -22,7 +22,9 @@ import java.util.Enumeration;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.Callable;
 
+import com.google.common.base.Function;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,6 +33,7 @@ import org.cliffc.high_scale_lib.NonBloc
 public class ExpiringMap<K, V>
 {
     private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
+    private final Function<K, ?> postExpireHook;
 
     private static class CacheableObject<T>
     {
@@ -76,6 +79,7 @@ public class ExpiringMap<K, V>
                     if (co != null && co.isReadyToDie(expiration))
                     {
                         cache.remove(key);
+                        postExpireHook.apply(key);
                     }
                 }
             }
@@ -86,12 +90,18 @@ public class ExpiringMap<K, V>
     private final Timer timer;
     private static int counter = 0;
 
-    /*
-    * Specify the TTL for objects in the cache
-    * in milliseconds.
-    */
     public ExpiringMap(long expiration)
     {
+        this(expiration, null);
+    }
+
+    /**
+     *
+     * @param expiration the TTL for objects in the cache in milliseconds
+     */
+    public ExpiringMap(long expiration, Function<K, ?> postExpireHook)
+    {
+        this.postExpireHook = postExpireHook;
         if (expiration <= 0)
         {
             throw new IllegalArgumentException("Argument specified must be a positive number");


Reply via email to