Author: junrao
Date: Sat Dec 5 18:49:12 2009
New Revision: 887576
URL: http://svn.apache.org/viewvc?rev=887576&view=rev
Log:
add manual repair through NodeProbe; patched by Stu Hood, reviewed by junrao
for CASSANDRA-193
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=887576&r1=887575&r2=887576&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Sat Dec 5 18:49:12 2009
@@ -37,6 +37,7 @@
import org.apache.cassandra.gms.*;
import org.apache.cassandra.locator.*;
import org.apache.cassandra.net.*;
+import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.LogUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -47,6 +48,7 @@
import org.apache.log4j.Level;
import org.apache.commons.lang.StringUtils;
+import com.google.common.base.Function;
import com.google.common.collect.Multimap;
import com.google.common.collect.HashMultimap;
@@ -84,7 +86,7 @@
private static volatile StorageService instance_;
- public static IPartitioner<?> getPartitioner() {
+ public static IPartitioner getPartitioner() {
return partitioner_;
}
@@ -276,6 +278,7 @@
SelectorManager.getSelectorManager().start();
SelectorManager.getUdpSelectorManager().start();
+ AntiEntropyService.instance();
StorageLoadBalancer.instance().startBroadcasting();
// have to start the gossip service before we can see any info on other nodes. this is necessary
@@ -686,12 +689,13 @@
}
/**
- * Flush all memtables for a table and column families.
- * @param tableName
- * @param columnFamilies
+ * Applies the given Function to all matching column families.
+ * @param function Function taking a column family and possibly returning an IOException.
+ * @param tableName Name of matching table.
+ * @param columnFamilies Names of matching column families, or null for all.
* @throws IOException
*/
- public void forceTableFlush(String tableName, String... columnFamilies) throws IOException
+ public void foreachColumnFamily(Function<ColumnFamilyStore, IOException> function, String tableName, String... columnFamilies) throws IOException
{
if (DatabaseDescriptor.getTable(tableName) == null)
{
@@ -709,14 +713,12 @@
for (String columnFamily : columnFamilies)
{
-
if (positiveColumnFamilies.contains(columnFamily))
{
ColumnFamilyStore cfStore = table.getColumnFamilyStore(columnFamily);
- logger_.debug("Forcing binary flush on keyspace " + tableName + ", CF " + columnFamily);
- cfStore.forceFlushBinary();
- logger_.debug("Forcing flush on keyspace " + tableName + ", CF " + columnFamily);
- cfStore.forceFlush();
+ IOException result = function.apply(cfStore);
+ if (result != null)
+ throw result;
}
else
{
@@ -726,6 +728,59 @@
}
}
+ /**
+ * Flush all memtables for a table and column families.
+ * @param tableName
+ * @param columnFamilies
+ * @throws IOException
+ */
+ public void forceTableFlush(final String tableName, final String... columnFamilies) throws IOException
+ {
+ foreachColumnFamily(new Function<ColumnFamilyStore, IOException>()
+ {
+ public IOException apply(ColumnFamilyStore cfStore)
+ {
+ try
+ {
+ logger_.debug("Forcing binary flush on keyspace " + tableName + ", CF " + cfStore.getColumnFamilyName());
+ cfStore.forceFlushBinary();
+ logger_.debug("Forcing flush on keyspace " + tableName + ", CF " + cfStore.getColumnFamilyName());
+ cfStore.forceFlush();
+ }
+ catch(IOException e)
+ {
+ return e;
+ }
+ return null;
+ }
+ }, tableName, columnFamilies);
+ }
+
+ /**
+ * Trigger proactive repair for a table and column families.
+ * @param tableName
+ * @param columnFamilies
+ * @throws IOException
+ */
+ public void forceTableRepair(final String tableName, final String... columnFamilies) throws IOException
+ {
+ // request that all relevant endpoints generate trees
+ final MessagingService ms = MessagingService.instance();
+ final List<InetAddress> endpoints = getNaturalEndpoints(getLocalToken());
+ foreachColumnFamily(new Function<ColumnFamilyStore, IOException>()
+ {
+ public IOException apply(ColumnFamilyStore cfStore)
+ {
+ Message request = TreeRequestVerbHandler.makeVerb(tableName,
+ cfStore.getColumnFamilyName());
+ for (InetAddress endpoint : endpoints)
+ ms.sendOneWay(request, endpoint);
+
+ return null;
+ }
+ }, tableName, columnFamilies);
+ }
+
/* End of MBean interface methods */
/**
@@ -852,10 +907,22 @@
*/
public List<InetAddress> getNaturalEndpoints(String key)
{
- return replicationStrategy_.getNaturalEndpoints(partitioner_.getToken(key));
- }
+ return getNaturalEndpoints(partitioner_.getToken(key));
+ }
/**
+ * This method returns the N endpoints that are responsible for storing the
+ * specified key i.e for replication.
+ *
+ * @param token - token for which we need to find the endpoint return value -
+ * the endpoint responsible for this token
+ */
+ public List<InetAddress> getNaturalEndpoints(Token token)
+ {
+ return replicationStrategy_.getNaturalEndpoints(token);
+ }
+
+ /**
* This method attempts to return N endpoints that are responsible for storing the
* specified key i.e for replication.
*
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=887576&r1=887575&r2=887576&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Sat Dec 5 18:49:12 2009
@@ -115,6 +115,15 @@
public void forceTableFlush(String tableName, String... columnFamilies) throws IOException;
/**
+ * Triggers proactive repair for given column families, or all columnfamilies for the given table
+ * if none are explicitly listed.
+ * @param tableName
+ * @param columnFamilies
+ * @throws IOException
+ */
+ public void forceTableRepair(String tableName, String... columnFamilies) throws IOException;
+
+ /**
* transfer this node's data to other machines and remove it from service.
*/
public void decommission() throws InterruptedException;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=887576&r1=887575&r2=887576&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Sat Dec 5 18:49:12 2009
@@ -186,6 +186,11 @@
ssProxy.forceTableFlush(tableName, columnFamilies);
}
+ public void forceTableRepair(String tableName, String... columnFamilies) throws IOException
+ {
+ ssProxy.forceTableRepair(tableName, columnFamilies);
+ }
+
/**
* Write a textual representation of the Cassandra ring.
*
@@ -493,7 +498,7 @@
HelpFormatter hf = new HelpFormatter();
String header = String.format(
"%nAvailable commands: ring, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, " +
- "tpstats, flush, decommission, move, loadbalance, cancelpending, " +
+ "tpstats, flush, repair, decommission, move, loadbalance, cancelpending, " +
" getcompactionthreshold, setcompactionthreshold [minthreshold] ([maxthreshold])");
String usage = String.format("java %s -host <arg> <command>%n", NodeProbe.class.getName());
hf.printHelp(usage, "", options, header);
@@ -589,7 +594,7 @@
{
probe.printThreadPoolStats(System.out);
}
- else if (cmdName.equals("flush"))
+ else if (cmdName.equals("flush") || cmdName.equals("repair"))
{
if (probe.getArgs().length < 2)
{
@@ -602,8 +607,11 @@
for (int i = 0; i < columnFamilies.length; i++)
{
columnFamilies[i] = probe.getArgs()[i + 2];
- }
- probe.forceTableFlush(probe.getArgs()[1], columnFamilies);
+ }
+ if (cmdName.equals("flush"))
+ probe.forceTableFlush(probe.getArgs()[1], columnFamilies);
+ else // cmdName.equals("repair")
+ probe.forceTableRepair(probe.getArgs()[1], columnFamilies);
}
else if (cmdName.equals("getcompactionthreshold"))
{