Author: junrao
Date: Sat Dec  5 18:49:12 2009
New Revision: 887576

URL: http://svn.apache.org/viewvc?rev=887576&view=rev
Log:
add manual repair through NodeProbe; patched by Stu Hood, reviewed by junrao for CASSANDRA-193

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=887576&r1=887575&r2=887576&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Sat Dec  5 18:49:12 2009
@@ -37,6 +37,7 @@
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.locator.*;
 import org.apache.cassandra.net.*;
+import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -47,6 +48,7 @@
 import org.apache.log4j.Level;
 import org.apache.commons.lang.StringUtils;
 
+import com.google.common.base.Function;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.HashMultimap;
 
@@ -84,7 +86,7 @@
 
     private static volatile StorageService instance_;
 
-    public static IPartitioner<?> getPartitioner() {
+    public static IPartitioner getPartitioner() {
         return partitioner_;
     }
 
@@ -276,6 +278,7 @@
         SelectorManager.getSelectorManager().start();
         SelectorManager.getUdpSelectorManager().start();
 
+        AntiEntropyService.instance();
         StorageLoadBalancer.instance().startBroadcasting();
 
         // have to start the gossip service before we can see any info on 
other nodes.  this is necessary
@@ -686,12 +689,13 @@
     }
 
     /**
-     * Flush all memtables for a table and column families.
-     * @param tableName
-     * @param columnFamilies
+     * Applies the given Function to all matching column families.
+     * @param function Function taking a column family and possibly returning 
an IOException.
+     * @param tableName Name of matching table.
+     * @param columnFamilies Names of matching column families, or null for 
all.
      * @throws IOException
      */
-    public void forceTableFlush(String tableName, String... columnFamilies) 
throws IOException
+    public void foreachColumnFamily(Function<ColumnFamilyStore, IOException> 
function, String tableName, String... columnFamilies) throws IOException
     {
         if (DatabaseDescriptor.getTable(tableName) == null)
         {
@@ -709,14 +713,12 @@
 
         for (String columnFamily : columnFamilies)
         {
-
             if (positiveColumnFamilies.contains(columnFamily))
             {
                 ColumnFamilyStore cfStore = 
table.getColumnFamilyStore(columnFamily);
-                logger_.debug("Forcing binary flush on keyspace " + tableName 
+ ", CF " + columnFamily);
-                cfStore.forceFlushBinary();
-                logger_.debug("Forcing flush on keyspace " + tableName + ", CF 
" + columnFamily);
-                cfStore.forceFlush();
+                IOException result = function.apply(cfStore);
+                if (result != null)
+                    throw result;
             }
             else
             {
@@ -726,6 +728,59 @@
         }
     }
 
+    /**
+     * Flush all memtables for a table and column families.
+     * @param tableName
+     * @param columnFamilies
+     * @throws IOException
+     */
+    public void forceTableFlush(final String tableName, final String... 
columnFamilies) throws IOException
+    {
+        foreachColumnFamily(new Function<ColumnFamilyStore, IOException>()
+            {
+                public IOException apply(ColumnFamilyStore cfStore)
+                {
+                    try
+                    {
+                        logger_.debug("Forcing binary flush on keyspace " + 
tableName + ", CF " + cfStore.getColumnFamilyName());
+                        cfStore.forceFlushBinary();
+                        logger_.debug("Forcing flush on keyspace " + tableName 
+ ", CF " + cfStore.getColumnFamilyName());
+                        cfStore.forceFlush();
+                    }
+                    catch(IOException e)
+                    {
+                        return e;
+                    }
+                    return null;
+                }
+            }, tableName, columnFamilies);
+    }
+
+    /**
+     * Trigger proactive repair for a table and column families.
+     * @param tableName
+     * @param columnFamilies
+     * @throws IOException
+     */
+    public void forceTableRepair(final String tableName, final String... 
columnFamilies) throws IOException
+    {
+        // request that all relevant endpoints generate trees
+        final MessagingService ms = MessagingService.instance();
+        final List<InetAddress> endpoints = 
getNaturalEndpoints(getLocalToken());
+        foreachColumnFamily(new Function<ColumnFamilyStore, IOException>()
+            {
+                public IOException apply(ColumnFamilyStore cfStore)
+                {
+                    Message request = 
TreeRequestVerbHandler.makeVerb(tableName,
+                                                                      
cfStore.getColumnFamilyName());
+                    for (InetAddress endpoint : endpoints)
+                        ms.sendOneWay(request, endpoint);
+
+                    return null;
+                }
+            }, tableName, columnFamilies);
+    }
+
     /* End of MBean interface methods */
     
     /**
@@ -852,10 +907,22 @@
      */
     public List<InetAddress> getNaturalEndpoints(String key)
     {
-        return 
replicationStrategy_.getNaturalEndpoints(partitioner_.getToken(key));
-    }
+        return getNaturalEndpoints(partitioner_.getToken(key));
+    }    
 
     /**
+     * This method returns the N endpoints that are responsible for storing the
+     * specified key i.e for replication.
+     *
+     * @param token - token for which we need to find the endpoint return 
value -
+     * the endpoint responsible for this token
+     */
+    public List<InetAddress> getNaturalEndpoints(Token token)
+    {
+        return replicationStrategy_.getNaturalEndpoints(token);
+    }    
+    
+    /**
      * This method attempts to return N endpoints that are responsible for 
storing the
      * specified key i.e for replication.
      *

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=887576&r1=887575&r2=887576&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Sat Dec  5 18:49:12 2009
@@ -115,6 +115,15 @@
     public void forceTableFlush(String tableName, String... columnFamilies) 
throws IOException;
 
     /**
+     * Triggers proactive repair for given column families, or all 
columnfamilies for the given table
+     * if none are explicitly listed.
+     * @param tableName
+     * @param columnFamilies
+     * @throws IOException
+     */
+    public void forceTableRepair(String tableName, String... columnFamilies) 
throws IOException;
+
+    /**
      * transfer this node's data to other machines and remove it from service.
      */
     public void decommission() throws InterruptedException;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=887576&r1=887575&r2=887576&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Sat Dec  5 18:49:12 2009
@@ -186,6 +186,11 @@
         ssProxy.forceTableFlush(tableName, columnFamilies);
     }
 
+    public void forceTableRepair(String tableName, String... columnFamilies) 
throws IOException
+    {
+        ssProxy.forceTableRepair(tableName, columnFamilies);
+    }
+
     /**
      * Write a textual representation of the Cassandra ring.
      * 
@@ -493,7 +498,7 @@
         HelpFormatter hf = new HelpFormatter();
         String header = String.format(
                 "%nAvailable commands: ring, info, cleanup, compact, cfstats, 
snapshot [name], clearsnapshot, " +
-                "tpstats, flush, decommission, move, loadbalance, 
cancelpending, " +
+                "tpstats, flush, repair, decommission, move, loadbalance, 
cancelpending, " +
                 " getcompactionthreshold, setcompactionthreshold 
[minthreshold] ([maxthreshold])");
         String usage = String.format("java %s -host <arg> <command>%n", 
NodeProbe.class.getName());
         hf.printHelp(usage, "", options, header);
@@ -589,7 +594,7 @@
         {
             probe.printThreadPoolStats(System.out);
         }
-        else if (cmdName.equals("flush"))
+        else if (cmdName.equals("flush") || cmdName.equals("repair"))
         {
             if (probe.getArgs().length < 2)
             {
@@ -602,8 +607,11 @@
             for (int i = 0; i < columnFamilies.length; i++)
             {
                 columnFamilies[i] = probe.getArgs()[i + 2];
-            }   
-            probe.forceTableFlush(probe.getArgs()[1], columnFamilies);
+            }
+            if (cmdName.equals("flush"))
+                probe.forceTableFlush(probe.getArgs()[1], columnFamilies);
+            else // cmdName.equals("repair")
+                probe.forceTableRepair(probe.getArgs()[1], columnFamilies);
         }
         else if (cmdName.equals("getcompactionthreshold"))
         {


Reply via email to