Author: jbellis
Date: Thu Dec 9 20:19:40 2010
New Revision: 1044116
URL: http://svn.apache.org/viewvc?rev=1044116&view=rev
Log:
merge from 0.6
Added:
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/net/
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/net/MessageSerializerTest.java
Modified:
cassandra/branches/cassandra-0.7/ (props changed)
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(props changed)
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
(props changed)
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
(props changed)
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
(props changed)
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
(props changed)
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java
Propchange: cassandra/branches/cassandra-0.7/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 9 20:19:40 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-1041242
+/cassandra/branches/cassandra-0.6:922689-1041242,1042824,1043070,1043268
/cassandra/branches/cassandra-0.7:1035666
/cassandra/trunk:1026516-1026734,1028929
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Propchange:
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 9 20:19:40 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1041242
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1041242,1042824,1043070,1043268
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1035666
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1026734,1028929
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange:
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 9 20:19:40 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1041242
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1041242,1042824,1043070,1043268
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1035666
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1026734,1028929
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange:
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 9 20:19:40 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1041242
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1041242,1042824,1043070,1043268
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1035666
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1026734,1028929
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange:
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 9 20:19:40 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1041242
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1041242,1042824,1043070,1043268
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1035666
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1026734,1028929
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange:
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 9 20:19:40 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1041242
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1041242,1042824,1043070,1043268
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1035666
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1026734,1028929
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1044116&r1=1044115&r2=1044116&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Thu Dec 9 20:19:40 2010
@@ -895,7 +895,7 @@ public class ColumnFamilyStore implement
return maxFile;
}
- void forceCleanup() throws ExecutionException, InterruptedException
+ public void forceCleanup() throws ExecutionException, InterruptedException
{
CompactionManager.instance.performCleanup(ColumnFamilyStore.this);
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1044116&r1=1044115&r2=1044116&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
Thu Dec 9 20:19:40 2010
@@ -60,8 +60,7 @@ public class IncomingTcpConnection exten
{
try
{
- MessagingService.validateMagic(input.readInt());
- int header = input.readInt();
+ int header = readHeader(input);
int type = MessagingService.getBits(header, 1, 2);
boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
int version = MessagingService.getBits(header, 15, 8);
@@ -76,11 +75,7 @@ public class IncomingTcpConnection exten
}
else
{
- int size = input.readInt();
- byte[] contentBytes = new byte[size];
- input.readFully(contentBytes);
-
- Message message = Message.serializer().deserialize(new
DataInputStream(new ByteArrayInputStream(contentBytes)));
+ Message message = readMessage(input);
MessagingService.receive(message);
}
}
@@ -101,6 +96,21 @@ public class IncomingTcpConnection exten
close();
}
+ static int readHeader(DataInputStream in) throws IOException
+ {
+ MessagingService.validateMagic(in.readInt());
+ return in.readInt();
+ }
+
+ static Message readMessage(DataInputStream in) throws IOException
+ {
+ int size = in.readInt();
+ byte[] contentBytes = new byte[size];
+ in.readFully(contentBytes);
+
+ return Message.serializer().deserialize(new DataInputStream(new
ByteArrayInputStream(contentBytes)));
+ }
+
private void close()
{
try
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1044116&r1=1044115&r2=1044116&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
Thu Dec 9 20:19:40 2010
@@ -310,6 +310,13 @@ public class MessagingService implements
// get pooled connection (really, connection queue)
OutboundTcpConnection connection = getConnection(to, message);
+ // write it
+ ByteBuffer buffer = serialize(message);
+ connection.write(buffer);
+ }
+
+ static ByteBuffer serialize(Message message)
+ {
// pack message with header in a bytebuffer
byte[] data;
try
@@ -323,12 +330,9 @@ public class MessagingService implements
throw new RuntimeException(e);
}
assert data.length > 0;
- ByteBuffer buffer = packIt(data , false);
-
- // write it
- connection.write(buffer);
+ return packIt(data , false);
}
-
+
public IAsyncResult sendRR(Message message, InetAddress to)
{
IAsyncResult iar = new AsyncResult();
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1044116&r1=1044115&r2=1044116&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
Thu Dec 9 20:19:40 2010
@@ -1175,32 +1175,23 @@ public class StorageService implements I
return
Gossiper.instance.getCurrentGenerationNumber(FBUtilities.getLocalAddress());
}
- public void forceTableCleanup() throws IOException, ExecutionException,
InterruptedException
+ public void forceTableCleanup(String tableName, String... columnFamilies)
throws IOException, ExecutionException, InterruptedException
{
- List<String> tables = DatabaseDescriptor.getNonSystemTables();
- for (String tName : tables)
+ if (tableName.equals("system"))
+ throw new RuntimeException("Cleanup of the system table is neither
necessary nor wise");
+
+ for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName,
columnFamilies))
{
- Table table = Table.open(tName);
- table.forceCleanup();
+ cfStore.forceCleanup();
}
}
- public void forceTableCleanup(String tableName) throws IOException,
ExecutionException, InterruptedException
- {
- Table table = getValidTable(tableName);
- table.forceCleanup();
- }
-
- public void forceTableCompaction() throws IOException, ExecutionException,
InterruptedException
- {
- for (Table table : Table.all())
- table.forceCompaction();
- }
-
- public void forceTableCompaction(String tableName) throws IOException,
ExecutionException, InterruptedException
+ public void forceTableCompaction(String tableName, String...
columnFamilies) throws IOException, ExecutionException, InterruptedException
{
- Table table = getValidTable(tableName);
- table.forceCompaction();
+ for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName,
columnFamilies))
+ {
+ cfStore.forceMajorCompaction();
+ }
}
/**
@@ -2074,4 +2065,10 @@ public class StorageService implements I
return partitioner_.describeOwnership(sortedTokens);
}
+ public List<String> getKeyspaces()
+ {
+ List<String> tableslist = new
ArrayList<String>(DatabaseDescriptor.getTables());
+ return Collections.unmodifiableList(tableslist);
+ }
+
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1044116&r1=1044115&r2=1044116&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java
Thu Dec 9 20:19:40 2010
@@ -131,26 +131,6 @@ public interface StorageServiceMBean
public List<InetAddress> getNaturalEndpoints(String table, byte[] key);
/**
- * Forces major compaction (all sstable files compacted)
- */
- public void forceTableCompaction() throws IOException, ExecutionException,
InterruptedException;
-
- /**
- * Forces major compaction of a single keyspace
- */
- public void forceTableCompaction(String tableName) throws IOException,
ExecutionException, InterruptedException;
-
- /**
- * Trigger a cleanup of keys on all tables.
- */
- public void forceTableCleanup() throws IOException, ExecutionException,
InterruptedException;
-
- /**
- * Trigger a cleanup of keys on a single keyspace
- */
- public void forceTableCleanup(String tableName) throws IOException,
ExecutionException, InterruptedException;
-
- /**
* Takes the snapshot for a given table.
*
* @param tableName the name of the table.
@@ -171,6 +151,16 @@ public interface StorageServiceMBean
public void clearSnapshot() throws IOException;
/**
+ * Forces major compaction of a single keyspace
+ */
+ public void forceTableCompaction(String tableName, String...
columnFamilies) throws IOException, ExecutionException, InterruptedException;
+
+ /**
+ * Trigger a cleanup of keys on a single keyspace
+ */
+ public void forceTableCleanup(String tableName, String... columnFamilies)
throws IOException, ExecutionException, InterruptedException;
+
+ /**
* Flush all memtables for the given column families, or all
columnfamilies for the given table
* if none are explicitly listed.
* @param tableName
@@ -270,4 +260,6 @@ public interface StorageServiceMBean
* a mapping from "token -> %age of cluster owned by that token"
*/
public Map<Token, Float> getOwnership();
+
+ public List<String> getKeyspaces();
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1044116&r1=1044115&r2=1044116&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java
Thu Dec 9 20:19:40 2010
@@ -30,6 +30,8 @@ import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.EstimatedHistogram;
import org.apache.commons.cli.*;
@@ -64,6 +66,14 @@ public class NodeCmd {
{
this.probe = probe;
}
+
+ public enum NodeCommand {
+ RING, INFO, CFSTATS, SNAPSHOT, CLEARSNAPSHOT, VERSION, TPSTATS, FLUSH,
DRAIN,
+ DECOMMISSION, MOVE, LOADBALANCE, REMOVETOKEN, REPAIR, CLEANUP, COMPACT,
+ SETCACHECAPACITY, GETCOMPACTIONTHRESHOLD, SETCOMPACTIONTHRESHOLD,
NETSTATS, CFHISTOGRAMS,
+ COMPACTIONSTATS
+ }
+
/**
* Prints usage information to stdout.
@@ -71,12 +81,37 @@ public class NodeCmd {
private static void printUsage()
{
HelpFormatter hf = new HelpFormatter();
- String header = String.format(
- "%nAvailable commands: ring, info, version, cleanup, compact
[keyspacename], cfstats, snapshot [snapshotname], " +
- "clearsnapshot, tpstats, flush, drain, repair, decommission,
move, loadbalance, removetoken [status|force]|[token], " +
- "setcachecapacity [keyspace] [cfname] [keycachecapacity]
[rowcachecapacity], " +
- "getcompactionthreshold [keyspace] [cfname],
setcompactionthreshold [cfname] [minthreshold] [maxthreshold], " +
- "netstats [host], cfhistograms <keyspace> <column_family>,
compactionstats");
+ String header = "\nAvailable commands:\n"
+ // No args
+ + "ring\n"
+ + "info\n"
+ + "cfstats\n"
+ + "clearsnapshot\n"
+ + "version\n"
+ + "tpstats\n"
+ + "drain\n"
+ + "decommission\n"
+ + "loadbalance\n"
+ + "compactionstats\n"
+
+ // One arg
+ + "snapshot [snapshotname]\n"
+ + "netstats [host]\n"
+ + "move <new token>\n"
+ + "removetoken status|force|<token>\n"
+
+ // Two args
+ + "flush [keyspace] [cfnames]\n"
+ + "repair [keyspace] [cfnames]\n"
+ + "cleanup [keyspace] [cfnames]\n"
+ + "compact [keyspace] [cfnames]\n"
+ + "getcompactionthreshold <keyspace> <cfname>\n"
+ + "cfhistograms <keyspace> <cfname>\n"
+
+ // Four args
+ + "setcachecapacity <keyspace> <cfname>
<keycachecapacity> <rowcachecapacity>\n"
+ + "setcompactionthreshold <keyspace> <cfname>
<minthreshold> <maxthreshold>\n";
+
String usage = String.format("java %s --host <arg> <command>%n",
NodeCmd.class.getName());
hf.printHelp(usage, "", options, header);
}
@@ -419,9 +454,7 @@ public class NodeCmd {
}
catch (ParseException parseExcep)
{
- System.err.println(parseExcep);
- printUsage();
- System.exit(1);
+ badUse(parseExcep.toString());
}
String host = cmd.getOptionValue(HOST_OPT_LONG);
@@ -447,248 +480,177 @@ public class NodeCmd {
}
catch (IOException ioe)
{
- System.err.println("Error connecting to remote JMX agent!");
- ioe.printStackTrace();
- System.exit(3);
+ err(ioe, "Error connection to remote JMX agent!");
}
if (cmd.getArgs().length < 1)
- {
- System.err.println("Missing argument for command.");
- printUsage();
- System.exit(1);
- }
-
+ badUse("Missing argument for command.");
+
NodeCmd nodeCmd = new NodeCmd(probe);
// Execute the requested command.
String[] arguments = cmd.getArgs();
String cmdName = arguments[0];
- if (cmdName.equals("ring"))
- {
- nodeCmd.printRing(System.out);
- }
- else if (cmdName.equals("info"))
- {
- nodeCmd.printInfo(System.out);
- }
- else if (cmdName.equals("cleanup"))
- {
- try
- {
- if (arguments.length > 1)
- probe.forceTableCleanup(arguments[1]);
- else
- probe.forceTableCleanup();
- }
- catch (ExecutionException ee)
- {
- System.err.println("Error occured during Keyspace cleanup");
- ee.printStackTrace();
- System.exit(3);
- }
- }
- else if (cmdName.equals("compact"))
- {
- try
- {
- if (arguments.length > 1)
- probe.forceTableCompaction(arguments[1]);
- else
- probe.forceTableCompaction();
- }
- catch (ExecutionException ee)
- {
- System.err.println("Error occured during Keyspace compaction");
- ee.printStackTrace();
- System.exit(3);
- }
- }
- else if (cmdName.equals("compactionstats"))
- {
- nodeCmd.printCompactionStats(System.out);
- }
- else if (cmdName.equals("cfstats"))
- {
- nodeCmd.printColumnFamilyStats(System.out);
- }
- else if (cmdName.equals("decommission"))
- {
- probe.decommission();
- }
- else if (cmdName.equals("loadbalance"))
- {
- probe.loadBalance();
- }
- else if (cmdName.equals("move"))
- {
- if (arguments.length <= 1)
- {
- System.err.println("missing token argument");
- }
- probe.move(arguments[1]);
- }
- else if (cmdName.equals("removetoken"))
- {
- if (arguments.length <= 1)
- {
- System.err.println("Missing an argument.");
- printUsage();
- }
- else if (arguments[1].equals("status"))
- {
- nodeCmd.printRemovalStatus(System.out);
- }
- else if (arguments[1].equals("force"))
- {
- nodeCmd.printRemovalStatus(System.out);
- probe.forceRemoveCompletion();
- }
- else
- probe.removeToken(arguments[1]);
- }
- else if (cmdName.equals("snapshot"))
- {
- String snapshotName = "";
- if (arguments.length > 1)
- {
- snapshotName = arguments[1];
- }
- probe.takeSnapshot(snapshotName);
- }
- else if (cmdName.equals("clearsnapshot"))
+ boolean validCommand = false;
+ for (NodeCommand n : NodeCommand.values())
{
- probe.clearSnapshot();
+ if (cmdName.toUpperCase().equals(n.name()))
+ validCommand = true;
}
- else if (cmdName.equals("tpstats"))
- {
- nodeCmd.printThreadPoolStats(System.out);
+
+ if (!validCommand)
+ badUse("Unrecognized command: " + cmdName);
+
+ NodeCommand nc = NodeCommand.valueOf(cmdName.toUpperCase());
+ switch (nc)
+ {
+ case RING : nodeCmd.printRing(System.out); break;
+ case INFO : nodeCmd.printInfo(System.out); break;
+ case CFSTATS : nodeCmd.printColumnFamilyStats(System.out);
break;
+ case DECOMMISSION : probe.decommission(); break;
+ case LOADBALANCE : probe.loadBalance(); break;
+ case CLEARSNAPSHOT : probe.clearSnapshot(); break;
+ case TPSTATS : nodeCmd.printThreadPoolStats(System.out);
break;
+ case VERSION : nodeCmd.printReleaseVersion(System.out);
break;
+ case COMPACTIONSTATS : nodeCmd.printCompactionStats(System.out);
break;
+
+ case DRAIN :
+ try { probe.drain(); }
+ catch (ExecutionException ee) { err(ee, "Error occured during
flushing"); }
+ break;
+
+ case NETSTATS :
+ if (arguments.length > 1) {
nodeCmd.printNetworkStats(InetAddress.getByName(arguments[1]), System.out); }
+ else { nodeCmd.printNetworkStats(null,
System.out); }
+ break;
+
+ case SNAPSHOT :
+ if (arguments.length > 1) { probe.takeSnapshot(arguments[1]); }
+ else { probe.takeSnapshot(""); }
+ break;
+
+ case MOVE :
+ if (arguments.length != 2) { badUse("Missing token argument
for move."); }
+ probe.move(arguments[1]);
+ break;
+
+ case REMOVETOKEN :
+ if (arguments.length != 2) { badUse("Missing an argument for
removetoken (either status, force, or a token)"); }
+ else if (arguments[1].equals("status")) {
nodeCmd.printRemovalStatus(System.out); }
+ else if (arguments[1].equals("force")) {
nodeCmd.printRemovalStatus(System.out); probe.forceRemoveCompletion(); }
+ else {
probe.removeToken(arguments[1]); }
+ break;
+
+ case CLEANUP :
+ case COMPACT :
+ case REPAIR :
+ case FLUSH :
+ optionalKSandCFs(nc, arguments, probe);
+ break;
+
+ case GETCOMPACTIONTHRESHOLD :
+ if (arguments.length != 3) { badUse("getcompactionthreshold
requires ks and cf args."); }
+ probe.getCompactionThreshold(System.out, arguments[1],
arguments[2]);
+ break;
+
+ case CFHISTOGRAMS :
+ if (arguments.length != 3) { badUse("cfhistograms requires ks
and cf args"); }
+ nodeCmd.printCfHistograms(arguments[1], arguments[2],
System.out);
+ break;
+
+ case SETCACHECAPACITY :
+ if (arguments.length != 5) { badUse("setcachecapacity requires
ks, cf, keycachecap, and rowcachecap args."); }
+ probe.setCacheCapacities(arguments[1], arguments[2],
Integer.valueOf(arguments[3]), Integer.valueOf(arguments[4]));
+ break;
+
+ case SETCOMPACTIONTHRESHOLD :
+ if (arguments.length != 5) { badUse("setcompactionthreshold
requires ks, cf, min, and max threshold args."); }
+ int minthreshold = Integer.parseInt(arguments[3]);
+ int maxthreshold = Integer.parseInt(arguments[4]);
+ if ((minthreshold < 0) || (maxthreshold < 0)) {
badUse("Thresholds must be positive integers"); }
+ if (minthreshold > maxthreshold) { badUse("Min
threshold cannot be greater than max."); }
+ if (minthreshold < 2 && maxthreshold != 0) { badUse("Min
threshold must be at least 2"); }
+ probe.setCompactionThreshold(arguments[1], arguments[2],
minthreshold, maxthreshold);
+ break;
+
+ default :
+ throw new RuntimeException("Unreachable code.");
+
}
- else if (cmdName.equals("flush") || cmdName.equals("repair"))
- {
- if (cmd.getArgs().length < 2)
- {
- System.err.println("Missing keyspace argument.");
- printUsage();
- System.exit(1);
- }
- String[] columnFamilies = new String[cmd.getArgs().length - 2];
- for (int i = 0; i < columnFamilies.length; i++)
+ System.exit(0);
+ }
+
+ private static void badUse(String useStr)
+ {
+ System.err.println(useStr);
+ printUsage();
+ System.exit(1);
+ }
+
+ private static void err(Exception e, String errStr)
+ {
+ System.err.println(errStr);
+ e.printStackTrace();
+ System.exit(3);
+ }
+
+ private static void optionalKSandCFs(NodeCommand nc, String[] cmdArgs,
NodeProbe probe) throws InterruptedException, IOException
+ {
+ // Per-keyspace
+ if (cmdArgs.length == 1)
+ {
+ for (String keyspace : probe.getKeyspaces())
{
- columnFamilies[i] = cmd.getArgs()[i + 2];
- }
- if (cmdName.equals("flush"))
- try
- {
- probe.forceTableFlush(cmd.getArgs()[1], columnFamilies);
- } catch (ExecutionException ee)
+ switch (nc)
{
- System.err.println("Error occured during flushing");
- ee.printStackTrace();
- System.exit(3);
+ case REPAIR : probe.forceTableRepair(keyspace); break;
+ case FLUSH :
+ try { probe.forceTableFlush(keyspace); }
+ catch (ExecutionException ee) { err(ee, "Error occured
while flushing keyspace " + keyspace); }
+ break;
+ case COMPACT :
+ try { probe.forceTableCompaction(keyspace); }
+ catch (ExecutionException ee) { err(ee, "Error occured
while compacting keyspace " + keyspace); }
+ break;
+ case CLEANUP :
+ if (keyspace.equals("system")) { break; } // Skip
cleanup on system cfs.
+ try { probe.forceTableCleanup(keyspace); }
+ catch (ExecutionException ee) { err(ee, "Error occured
while cleaning up keyspace " + keyspace); }
+ break;
+ default:
+ throw new RuntimeException("Unreachable code.");
}
- else // cmdName.equals("repair")
- probe.forceTableRepair(cmd.getArgs()[1], columnFamilies);
- }
- else if (cmdName.equals("drain"))
- {
- try
- {
- probe.drain();
- } catch (ExecutionException ee)
- {
- System.err.println("Error occured during flushing");
- ee.printStackTrace();
- System.exit(3);
- }
- }
- else if (cmdName.equals("setcachecapacity"))
- {
- if (cmd.getArgs().length != 5) // ks cf keycachecap rowcachecap
- {
- System.err.println("cacheinfo requires: Keyspace name,
ColumnFamily name, key cache capacity (in keys), and row cache capacity (in
rows)");
}
- String tableName = cmd.getArgs()[1];
- String cfName = cmd.getArgs()[2];
- int keyCacheCapacity = Integer.valueOf(cmd.getArgs()[3]);
- int rowCacheCapacity = Integer.valueOf(cmd.getArgs()[4]);
- probe.setCacheCapacities(tableName, cfName, keyCacheCapacity,
rowCacheCapacity);
}
- else if (cmdName.equals("getcompactionthreshold"))
- {
- if (arguments.length < 3) // ks cf
- {
- System.err.println("Missing keyspace/cfname");
- printUsage();
- System.exit(1);
- }
- probe.getCompactionThreshold(System.out, cmd.getArgs()[1],
cmd.getArgs()[2]);
- }
- else if (cmdName.equals("setcompactionthreshold"))
+ // Per-cf (or listed cfs) in given keyspace
+ else
{
- if (cmd.getArgs().length != 5) // ks cf min max
- {
- System.err.println("setcompactionthreshold requires: Keyspace
name, ColumnFamily name, " +
- "min threshold, and max threshold.");
- printUsage();
- System.exit(1);
- }
- String ks = cmd.getArgs()[1];
- String cf = cmd.getArgs()[2];
- int minthreshold = Integer.parseInt(arguments[3]);
- int maxthreshold = Integer.parseInt(arguments[4]);
-
- if ((minthreshold < 0) || (maxthreshold < 0))
- {
- System.err.println("Thresholds must be positive integers.");
- printUsage();
- System.exit(1);
- }
-
- if (minthreshold > maxthreshold)
- {
- System.err.println("Min threshold can't be greater than Max
threshold");
- printUsage();
- System.exit(1);
- }
-
- if (minthreshold < 2 && maxthreshold != 0)
+ String keyspace = cmdArgs[1];
+ String[] columnFamilies = new String[cmdArgs.length - 2];
+ for (int i = 0; i < columnFamilies.length; i++)
{
- System.err.println("Min threshold must be at least 2");
- printUsage();
- System.exit(1);
+ columnFamilies[i] = cmdArgs[i + 2];
}
- probe.setCompactionThreshold(ks, cf, minthreshold, maxthreshold);
- }
- else if (cmdName.equals("netstats"))
- {
- // optional host
- String otherHost = arguments.length > 1 ? arguments[1] : null;
- nodeCmd.printNetworkStats(otherHost == null ? null :
InetAddress.getByName(otherHost), System.out);
- }
- else if (cmdName.equals("cfhistograms"))
- {
- if (arguments.length < 3)
+ switch (nc)
{
- System.err.println("Usage of cfhistograms: <keyspace>
<column_family>.");
- System.exit(1);
+ case REPAIR : probe.forceTableRepair(keyspace,
columnFamilies); break;
+ case FLUSH :
+ try { probe.forceTableFlush(keyspace, columnFamilies); }
+ catch (ExecutionException ee) { err(ee, "Error occured
during flushing"); }
+ break;
+ case COMPACT :
+ try { probe.forceTableCompaction(keyspace,
columnFamilies); }
+ catch (ExecutionException ee) { err(ee, "Error occured
during compaction"); }
+ break;
+ case CLEANUP :
+ try { probe.forceTableCleanup(keyspace, columnFamilies); }
+ catch (ExecutionException ee) { err(ee, "Error occured
during cleanup"); }
+ break;
+ default:
+ throw new RuntimeException("Unreachable code.");
}
-
- nodeCmd.printCfHistograms(arguments[1], arguments[2], System.out);
}
- else if (cmdName.equals("version"))
- {
- nodeCmd.printReleaseVersion(System.out);
- }
- else
- {
- System.err.println("Unrecognized command: " + cmdName + ".");
- printUsage();
- System.exit(1);
- }
-
- System.exit(0);
}
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1044116&r1=1044115&r2=1044116&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java
Thu Dec 9 20:19:40 2010
@@ -143,24 +143,14 @@ public class NodeProbe
jmxc.close();
}
- public void forceTableCleanup() throws IOException, ExecutionException,
InterruptedException
+ public void forceTableCleanup(String tableName, String... columnFamilies)
throws IOException, ExecutionException, InterruptedException
{
- ssProxy.forceTableCleanup();
+ ssProxy.forceTableCleanup(tableName, columnFamilies);
}
- public void forceTableCleanup(String tableName) throws IOException,
ExecutionException, InterruptedException
+ public void forceTableCompaction(String tableName, String...
columnFamilies) throws IOException, ExecutionException, InterruptedException
{
- ssProxy.forceTableCleanup(tableName);
- }
-
- public void forceTableCompaction() throws IOException, ExecutionException,
InterruptedException
- {
- ssProxy.forceTableCompaction();
- }
-
- public void forceTableCompaction(String tableName) throws IOException,
ExecutionException, InterruptedException
- {
- ssProxy.forceTableCompaction(tableName);
+ ssProxy.forceTableCompaction(tableName, columnFamilies);
}
public void forceTableFlush(String tableName, String... columnFamilies)
throws IOException, ExecutionException, InterruptedException
@@ -489,6 +479,11 @@ public class NodeProbe
return cfsProxy;
}
+
+ public List<String> getKeyspaces()
+ {
+ return ssProxy.getKeyspaces();
+ }
}
class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String,
ColumnFamilyStoreMBean>>
@@ -553,5 +548,5 @@ class ThreadPoolProxyMBeanIterator imple
public void remove()
{
throw new UnsupportedOperationException();
- }
+ }
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1044116&r1=1044115&r2=1044116&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java
Thu Dec 9 20:19:40 2010
@@ -342,9 +342,9 @@ public class FBUtilities
public static String bytesToHex(ByteBuffer bytes)
{
StringBuilder sb = new StringBuilder();
- for (int i=bytes.position()+bytes.arrayOffset();
i<bytes.limit()+bytes.arrayOffset(); i++)
+ for (int i = bytes.position(); i < bytes.limit(); i++)
{
- int bint = bytes.array()[i] & 0xff;
+ int bint = bytes.get(i) & 0xff;
if (bint <= 0xF)
// toHexString does not 0 pad its results.
sb.append("0");
Added:
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/net/MessageSerializerTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/net/MessageSerializerTest.java?rev=1044116&view=auto
==============================================================================
---
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/net/MessageSerializerTest.java
(added)
+++
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/net/MessageSerializerTest.java
Thu Dec 9 20:19:40 2010
@@ -0,0 +1,44 @@
+package org.apache.cassandra.net;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+
+public class MessageSerializerTest extends SchemaLoader
+{
+ @Test
+ public void testDeserialize() throws IOException
+ {
+ String wire =
"ca552dfa0000010000000080000131047f00000100000000000000000000004800094b657973706163653100046b65793100000001000003e801000003e8800000008000000000000000000000010007436f6c756d6e310000000000000000000000000461736466000000000000000000000000000000000000000000000000000000000000000000000000";
+ byte[] bytes = FBUtilities.hexToBytes(wire);
+ check(new DataInputStream(new ByteArrayInputStream(bytes)));
+ }
+
+ private void check(DataInputStream in) throws IOException
+ {
+ IncomingTcpConnection.readHeader(in);
+ IncomingTcpConnection.readMessage(in);
+ }
+
+ @Test
+ public void testRoundTrip() throws IOException
+ {
+ RowMutation rm;
+ rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key1"));
+ rm.add(new QueryPath("Standard1", null,
ByteBufferUtil.bytes("Column1")), ByteBufferUtil.bytes("asdf"), 0);
+ Message message = rm.makeRowMutationMessage();
+ ByteBuffer bb = MessagingService.serialize(message);
+ check(new DataInputStream(new ByteArrayInputStream(bb.array(),
bb.position() + bb.arrayOffset(), bb.remaining())));
+ }
+}