Author: jbellis
Date: Wed Dec 29 18:26:43 2010
New Revision: 1053696
URL: http://svn.apache.org/viewvc?rev=1053696&view=rev
Log:
merge r1053245, r1053247, r1053409 from 0.7
Added:
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IMessageCallback.java
- copied unchanged from r1053245,
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IMessageCallback.java
Modified:
cassandra/branches/cassandra-0.7.0/ (props changed)
cassandra/branches/cassandra-0.7.0/CHANGES.txt
cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(props changed)
cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
(props changed)
cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
(props changed)
cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
(props changed)
cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
(props changed)
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliClient.java
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliOptions.java
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliSessionState.java
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/Table.java
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/AsyncResult.java
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IAsyncCallback.java
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IAsyncResult.java
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/utils/ExpiringMap.java
Propchange: cassandra/branches/cassandra-0.7.0/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 29 18:26:43 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6:922689-1053218
-/cassandra/branches/cassandra-0.7:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220
+/cassandra/branches/cassandra-0.6:922689-1053244
+/cassandra/branches/cassandra-0.7:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220,1053245,1053247,1053409
/cassandra/trunk:1026516-1026734,1028929
/incubator/cassandra/branches/cassandra-0.3:774578-796573
/incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
Modified: cassandra/branches/cassandra-0.7.0/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/CHANGES.txt?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7.0/CHANGES.txt Wed Dec 29 18:26:43 2010
@@ -4,6 +4,11 @@ dev
histograms in StorageProxyMBean (CASSANDRA-1893)
* fix CLI get recognition of supercolumns (CASSANDRA-1899)
* enable keepalive on intra-cluster sockets (CASSANDRA-1766)
+ * count timeouts towards dynamicsnitch latencies (CASSANDRA-1905)
+ * Expose index-building status in JMX + cli schema description
+ (CASSANDRA-1871)
+ * allow [LOCAL|EACH]_QUORUM to be used with non-NetworkTopology
+ replication Strategies
0.7.0-rc3
Propchange:
cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 29 18:26:43 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1053218
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1053244
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220,1053245,1053247,1053409
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1026734,1028929
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
Propchange:
cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 29 18:26:43 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1053218
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1053244
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220,1053245,1053247,1053409
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1026734,1028929
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
Propchange:
cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 29 18:26:43 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1053218
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1053244
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220,1053245,1053247,1053409
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1026734,1028929
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
Propchange:
cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 29 18:26:43 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1053218
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1053244
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220,1053245,1053247,1053409
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1026734,1028929
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
Propchange:
cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 29 18:26:43 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1053218
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1053244
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1052002,1052021,1052027,1052355,1052358,1052545,1053219-1053220,1053245,1053247,1053409
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1026734,1028929
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
Modified:
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliClient.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliClient.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliClient.java
(original)
+++
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliClient.java
Wed Dec 29 18:26:43 2010
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.cli;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
@@ -25,8 +26,11 @@ import com.google.common.base.Charsets;
import org.antlr.runtime.tree.Tree;
import org.apache.cassandra.auth.SimpleAuthenticator;
import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.ColumnFamilyStoreMBean;
+import org.apache.cassandra.db.CompactionManagerMBean;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.tools.NodeProbe;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
@@ -1206,6 +1210,11 @@ public class CliClient extends CliUserHe
private void describeKeySpace(String keySpaceName, KsDef metadata) throws
TException
{
+ NodeProbe probe = sessionState.getNodeProbe();
+
+ // getting compaction manager MBean to displaying index building
information
+ CompactionManagerMBean compactionManagerMBean = (probe == null) ? null
: probe.getCompactionManagerProxy();
+
// Describe and display
sessionState.out.println("Keyspace: " + keySpaceName + ":");
try
@@ -1213,10 +1222,12 @@ public class CliClient extends CliUserHe
KsDef ks_def;
ks_def = metadata == null ?
thriftClient.describe_keyspace(keySpaceName) : metadata;
sessionState.out.println(" Replication Strategy: " +
ks_def.strategy_class);
+
if (ks_def.strategy_class.endsWith(".NetworkTopologyStrategy"))
sessionState.out.println(" Options: " +
FBUtilities.toString(ks_def.strategy_options));
else
sessionState.out.println(" Replication Factor: " +
ks_def.replication_factor);
+
sessionState.out.println(" Column Families:");
boolean isSuper;
@@ -1224,6 +1235,9 @@ public class CliClient extends CliUserHe
Collections.sort(ks_def.cf_defs, new CfDefNamesComparator());
for (CfDef cf_def : ks_def.cf_defs)
{
+ // fetching bean for current column family store
+ ColumnFamilyStoreMBean cfMBean = (probe == null) ? null :
probe.getCfsProxy(ks_def.getName(), cf_def.getName());
+
isSuper = cf_def.column_type.equals("Super");
sessionState.out.printf(" ColumnFamily: %s%s%n",
cf_def.name, isSuper ? " (Super)" : "");
@@ -1241,6 +1255,12 @@ public class CliClient extends CliUserHe
sessionState.out.printf(" Compaction min/max thresholds:
%s/%s%n", cf_def.min_compaction_threshold, cf_def.max_compaction_threshold);
sessionState.out.printf(" Read repair chance: %s%n",
cf_def.read_repair_chance);
+ // if we have connection to the cfMBean established
+ if (cfMBean != null)
+ {
+ sessionState.out.printf(" Built indexes: %s%n",
cfMBean.getBuiltIndexes());
+ }
+
if (cf_def.getColumn_metadataSize() != 0)
{
String leftSpace = " ";
@@ -1281,6 +1301,26 @@ public class CliClient extends CliUserHe
}
}
}
+
+ // compaction manager information
+ if (compactionManagerMBean != null)
+ {
+ String compactionType =
compactionManagerMBean.getCompactionType();
+
+ // if ongoing compaction type is index build
+ if (compactionType != null && compactionType.contains("index
build"))
+ {
+ String indexName =
compactionManagerMBean.getColumnFamilyInProgress();
+ long bytesCompacted =
compactionManagerMBean.getBytesCompacted();
+ long totalBytesToProcess =
compactionManagerMBean.getBytesTotalInProgress();
+
+ sessionState.out.printf("%nCurrently building index %s,
completed %d of %d bytes.%n", indexName, bytesCompacted, totalBytesToProcess);
+ }
+ }
+
+ // closing JMX connection
+ if (probe != null)
+ probe.close();
}
catch (InvalidRequestException e)
{
@@ -1290,7 +1330,12 @@ public class CliClient extends CliUserHe
{
sessionState.out.println("Keyspace " + keySpaceName + " could not
be found.");
}
+ catch (IOException e)
+ {
+ sessionState.out.println("Error while closing JMX connection: " +
e.getMessage());
+ }
}
+
// DESCRIBE KEYSPACE <keyspace_name>
private void executeDescribeKeySpace(Tree statement) throws TException,
InvalidRequestException
{
Modified:
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliOptions.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliOptions.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliOptions.java
(original)
+++
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliOptions.java
Wed Dec 29 18:26:43 2010
@@ -39,7 +39,8 @@ public class CliOptions {
private static final String BATCH_OPTION = "batch";
private static final String HELP_OPTION = "help";
private static final String FILE_OPTION = "file";
-
+ private static final String JMX_PORT_OPTION = "jmxport";
+
// Default values for optional command line arguments
private static final int DEFAULT_THRIFT_PORT = 9160;
@@ -57,12 +58,13 @@ public class CliOptions {
options.addOption(KEYSPACE_OPTION, true, "cassandra keyspace user is
authenticated against");
options.addOption(BATCH_OPTION, false, "enabled batch mode (supress
output; errors are fatal)");
options.addOption(FILE_OPTION, true, "load statements from the
specific file.");
+ options.addOption(JMX_PORT_OPTION, true, "JMX service port.");
options.addOption(HELP_OPTION, false, "usage help.");
}
private static void printUsage()
{
- System.err.println("Usage: cassandra-cli --host hostname [--port
<portname>] [--file <filename>] [--unframed] [--debug]");
+ System.err.println("Usage: cassandra-cli --host hostname [--port
<port>] [--jmxport <port>] [--file <filename>] [--unframed] [--debug]");
System.err.println("\t[--username username] [--password password]
[--keyspace keyspace] [--batch] [--help]");
}
@@ -138,6 +140,11 @@ public class CliOptions {
css.filename = cmd.getOptionValue(FILE_OPTION);
}
+ if (cmd.hasOption(JMX_PORT_OPTION))
+ {
+ css.jmxPort =
Integer.parseInt(cmd.getOptionValue(JMX_PORT_OPTION));
+ }
+
if (cmd.hasOption(HELP_OPTION))
{
printUsage();
Modified:
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliSessionState.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliSessionState.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliSessionState.java
(original)
+++
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/cli/CliSessionState.java
Wed Dec 29 18:26:43 2010
@@ -18,6 +18,8 @@
package org.apache.cassandra.cli;
+import org.apache.cassandra.tools.NodeProbe;
+
import java.io.InputStream;
import java.io.PrintStream;
@@ -36,6 +38,7 @@ public class CliSessionState
public String keyspace; // cassandra keyspace user is authenticating
public boolean batch = false; // enable/disable batch processing mode
public String filename = ""; // file to read commands from
+ public int jmxPort = 8080;// JMX service port
/*
* Streams to read/write from
@@ -65,4 +68,19 @@ public class CliSessionState
{
return !this.filename.isEmpty();
}
+
+ public NodeProbe getNodeProbe()
+ {
+ try
+ {
+ return new NodeProbe(hostName, jmxPort);
+ }
+ catch (Exception e)
+ {
+ err.printf("WARNING: Could not connect to the JMX on %s:%d,
information won't be shown.%n%n", hostName, jmxPort);
+ }
+
+ return null;
+ }
+
}
Modified:
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Wed Dec 29 18:26:43 2010
@@ -314,7 +314,7 @@ public class ColumnFamilyStore implement
return;
// if we're just linking in the index to indexedColumns on an
already-built index post-restart, we're done
- if (SystemTable.isIndexBuilt(table.name, indexedCfMetadata.cfName))
+ if (indexedCfs.isIndexBuilt())
return;
// build it asynchronously; addIndex gets called by CFS open and
schema update, neither of which
@@ -1925,4 +1925,33 @@ public class ColumnFamilyStore implement
return histogram;
}
+
+ /**
+ * Check if index is already built for current store
+ * @return true if built, false otherwise
+ */
+ public boolean isIndexBuilt()
+ {
+ return SystemTable.isIndexBuilt(table.name, columnFamily);
+ }
+
+ /**
+ * Returns a list of the names of the built column indexes for current
store
+ * @return list of the index names
+ */
+ public List<String> getBuiltIndexes()
+ {
+ List<String> indexes = new ArrayList<String>();
+
+ for (ColumnFamilyStore cfs : indexedColumns.values())
+ {
+ if (cfs.isIndexBuilt())
+ {
+ indexes.add(cfs.columnFamily); // store.columnFamily
represents a name of the index
+ }
+ }
+
+ return indexes;
+ }
+
}
Modified:
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
(original)
+++
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
Wed Dec 29 18:26:43 2010
@@ -19,6 +19,7 @@
package org.apache.cassandra.db;
import java.io.IOException;
+import java.util.List;
import java.util.concurrent.ExecutionException;
/**
@@ -213,4 +214,10 @@ public interface ColumnFamilyStoreMBean
public long[] getEstimatedRowSizeHistogram();
public long[] getEstimatedColumnCountHistogram();
+
+ /**
+ * Returns a list of the names of the built column indexes for current
store
+ * @return list of the index names
+ */
+ public List<String> getBuiltIndexes();
}
Modified:
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/Table.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/Table.java
(original)
+++
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/Table.java
Wed Dec 29 18:26:43 2010
@@ -643,7 +643,7 @@ public class Table
public String getTaskType()
{
- return "Secondary index build";
+ return String.format("Secondary index build %s", cfs.columnFamily);
}
}
Modified:
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
(original)
+++
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
Wed Dec 29 18:26:43 2010
@@ -224,6 +224,10 @@ public abstract class AbstractReplicatio
public QuorumResponseHandler getQuorumResponseHandler(IResponseResolver
responseResolver, ConsistencyLevel consistencyLevel)
{
+ if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) ||
consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))
+ {
+ return new DatacenterQuorumResponseHandler(responseResolver,
consistencyLevel, table);
+ }
return new QuorumResponseHandler(responseResolver, consistencyLevel,
table);
}
Modified:
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
(original)
+++
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
Wed Dec 29 18:26:43 2010
@@ -164,18 +164,4 @@ public class NetworkTopologyStrategy ext
}
return super.getWriteResponseHandler(writeEndpoints, hintedEndpoints,
consistency_level);
}
-
- /**
- * This method will generate the WRH object and returns. If the Consistency
- * level is LOCAL_QUORUM/EACH_QUORUM then it will return a DCQRH.
- */
- @Override
- public QuorumResponseHandler getQuorumResponseHandler(IResponseResolver
responseResolver, ConsistencyLevel consistencyLevel)
- {
- if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) ||
consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))
- {
- return new DatacenterQuorumResponseHandler(responseResolver,
consistencyLevel, table);
- }
- return super.getQuorumResponseHandler(responseResolver,
consistencyLevel);
- }
}
Modified:
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/AsyncResult.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/AsyncResult.java
(original)
+++
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/AsyncResult.java
Wed Dec 29 18:26:43 2010
@@ -94,6 +94,8 @@ class AsyncResult implements IAsyncResul
{
lock.unlock();
}
+
+ MessagingService.removeRegisteredCallback(response.getMessageId());
}
public InetAddress getFrom()
Modified:
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IAsyncCallback.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IAsyncCallback.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IAsyncCallback.java
(original)
+++
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IAsyncCallback.java
Wed Dec 29 18:26:43 2010
@@ -24,7 +24,7 @@ package org.apache.cassandra.net;
* service. In particular, if any shared state is referenced, making
* response alone synchronized will not suffice.
*/
-public interface IAsyncCallback
+public interface IAsyncCallback extends IMessageCallback
{
/**
* @param msg response received.
Modified:
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IAsyncResult.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IAsyncResult.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IAsyncResult.java
(original)
+++
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/IAsyncResult.java
Wed Dec 29 18:26:43 2010
@@ -22,7 +22,7 @@ import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-public interface IAsyncResult
+public interface IAsyncResult extends IMessageCallback
{
/**
* Same operation as the above get() but allows the calling
Modified:
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/MessagingService.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/MessagingService.java
Wed Dec 29 18:26:43 2010
@@ -36,6 +36,9 @@ import java.util.concurrent.atomic.Atomi
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import com.google.common.base.Function;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +46,8 @@ import org.apache.cassandra.concurrent.D
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.ILatencyPublisher;
+import org.apache.cassandra.locator.ILatencySubscriber;
import org.apache.cassandra.net.io.SerializerType;
import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.service.GCInspector;
@@ -54,7 +59,7 @@ import org.apache.cassandra.utils.GuidGe
import org.apache.cassandra.utils.SimpleCondition;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
-public class MessagingService implements MessagingServiceMBean
+public class MessagingService implements MessagingServiceMBean,
ILatencyPublisher
{
private static int version_ = 1;
//TODO: make this parameter dynamic somehow. Not sure if config is
appropriate.
@@ -64,9 +69,9 @@ public class MessagingService implements
public static final int PROTOCOL_MAGIC = 0xCA552DFA;
/* This records all the results mapped by message Id */
- private static ExpiringMap<String, IAsyncCallback> callbackMap_;
- private static ExpiringMap<String, IAsyncResult> taskCompletionMap_;
-
+ private static ExpiringMap<String, IMessageCallback> callbacks;
+ private static Multimap<String, InetAddress> targets;
+
/* Lookup table for registering message handlers based on the verb. */
private static Map<StorageService.Verb, IVerbHandler> verbHandlers_;
@@ -83,6 +88,8 @@ public class MessagingService implements
private SocketThread socketThread;
private SimpleCondition listenGate;
private static final Map<StorageService.Verb, AtomicInteger>
droppedMessages = new EnumMap<StorageService.Verb,
AtomicInteger>(StorageService.Verb.class);
+ private final List<ILatencySubscriber> subscribers = new
ArrayList<ILatencySubscriber>();
+
static
{
for (StorageService.Verb verb : StorageService.Verb.values())
@@ -99,15 +106,6 @@ public class MessagingService implements
{
listenGate = new SimpleCondition();
verbHandlers_ = new EnumMap<StorageService.Verb,
IVerbHandler>(StorageService.Verb.class);
- /*
- * Leave callbacks in the cachetable long enough that any related
messages will arrive
- * before the callback is evicted from the table. The concurrency
level is set at 128
- * which is the sum of the threads in the pool that adds shit into the
table and the
- * pool that retrives the callback from here.
- */
- callbackMap_ = new ExpiringMap<String, IAsyncCallback>((long) (1.1 *
DatabaseDescriptor.getRpcTimeout()));
- taskCompletionMap_ = new ExpiringMap<String, IAsyncResult>((long) (1.1
* DatabaseDescriptor.getRpcTimeout()));
-
streamExecutor_ = new DebuggableThreadPoolExecutor("Streaming",
DatabaseDescriptor.getCompactionThreadPriority());
Runnable logDropped = new Runnable()
{
@@ -118,6 +116,26 @@ public class MessagingService implements
};
StorageService.scheduledTasks.scheduleWithFixedDelay(logDropped,
LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+ Function<String, ?> timeoutReporter = new Function<String, Object>()
+ {
+ public Object apply(String messageId)
+ {
+ Collection<InetAddress> addresses =
targets.removeAll(messageId);
+ if (addresses == null)
+ return null;
+
+ for (InetAddress address : addresses)
+ {
+ for (ILatencySubscriber subscriber : subscribers)
+ subscriber.receiveTiming(address, (double)
DatabaseDescriptor.getRpcTimeout());
+ }
+
+ return null;
+ }
+ };
+ targets = ArrayListMultimap.create();
+ callbacks = new ExpiringMap<String, IMessageCallback>((long) (1.1 *
DatabaseDescriptor.getRpcTimeout()), timeoutReporter);
+
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
{
@@ -230,6 +248,7 @@ public class MessagingService implements
addCallback(cb, messageId);
for (InetAddress endpoint : to)
{
+ targets.put(messageId, endpoint);
sendOneWay(message, endpoint);
}
return messageId;
@@ -237,7 +256,7 @@ public class MessagingService implements
public void addCallback(IAsyncCallback cb, String messageId)
{
- callbackMap_.put(messageId, cb);
+ callbacks.put(messageId, cb);
}
/**
@@ -254,6 +273,7 @@ public class MessagingService implements
{
String messageId = message.getMessageId();
addCallback(cb, messageId);
+ targets.put(messageId, to);
sendOneWay(message, to);
return messageId;
}
@@ -280,6 +300,7 @@ public class MessagingService implements
for ( int i = 0; i < messages.length; ++i )
{
messages[i].setMessageId(groupId);
+ targets.put(groupId, to.get(i));
sendOneWay(messages[i], to.get(i));
}
return groupId;
@@ -332,7 +353,8 @@ public class MessagingService implements
public IAsyncResult sendRR(Message message, InetAddress to)
{
IAsyncResult iar = new AsyncResult();
- taskCompletionMap_.put(message.getMessageId(), iar);
+ callbacks.put(message.getMessageId(), iar);
+ targets.put(message.getMessageId(), to);
sendOneWay(message, to);
return iar;
}
@@ -350,6 +372,11 @@ public class MessagingService implements
streamExecutor_.execute(new FileStreamTask(header, to));
}
+ public void register(ILatencySubscriber subcriber)
+ {
+ subscribers.add(subcriber);
+ }
+
/** blocks until the processing pools are empty and done. */
public static void waitFor() throws InterruptedException
{
@@ -371,10 +398,7 @@ public class MessagingService implements
}
streamExecutor_.shutdownNow();
-
- /* shut down the cachetables */
- taskCompletionMap_.shutdown();
- callbackMap_.shutdown();
+ callbacks.shutdown();
logger_.info("Shutdown complete (no further commands will be processed)");
}
@@ -391,29 +415,25 @@ public class MessagingService implements
stage.execute(runnable);
}
- public static IAsyncCallback getRegisteredCallback(String messageId)
- {
- return callbackMap_.get(messageId);
- }
-
- public static void removeRegisteredCallback(String messageId)
+ public static IMessageCallback getRegisteredCallback(String messageId)
{
- callbackMap_.remove(messageId);
+ return callbacks.get(messageId);
}
- public static IAsyncResult getAsyncResult(String messageId)
+ public static IMessageCallback removeRegisteredCallback(String messageId)
{
- return taskCompletionMap_.remove(messageId);
+ targets.removeAll(messageId); // TODO fix this when we clean up quorum reads to do proper RR
+ return callbacks.remove(messageId);
}
public static long getRegisteredCallbackAge(String messageId)
{
- return callbackMap_.getAge(messageId);
+ return callbacks.getAge(messageId);
}
- public static long getAsyncResultAge(String messageId)
+ public static void responseReceivedFrom(String messageId, InetAddress from)
{
- return taskCompletionMap_.getAge(messageId);
+ targets.remove(messageId, from);
}
public static void validateMagic(int magic) throws IOException
Modified: cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Wed Dec 29 18:26:43 2010
@@ -18,7 +18,6 @@
package org.apache.cassandra.net;
-
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
@@ -37,35 +36,28 @@ public class ResponseVerbHandler impleme
public void doVerb(Message message)
{
- String messageId = message.getMessageId();
- IAsyncCallback cb = MessagingService.getRegisteredCallback(messageId);
- double age = 0;
- if (cb != null)
+ String messageId = message.getMessageId();
+ MessagingService.responseReceivedFrom(messageId, message.getFrom());
+ double age = System.currentTimeMillis() - MessagingService.getRegisteredCallbackAge(messageId);
+ IMessageCallback cb = MessagingService.getRegisteredCallback(messageId);
+ if (cb == null)
+ return;
+
+ // if cb is not null, then age will be valid
+ for (ILatencySubscriber subscriber : subscribers)
+ subscriber.receiveTiming(message.getFrom(), age);
+
+ if (cb instanceof IAsyncCallback)
{
if (logger_.isDebugEnabled())
logger_.debug("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom());
- age = System.currentTimeMillis() - MessagingService.getRegisteredCallbackAge(messageId);
- cb.response(message);
+ ((IAsyncCallback) cb).response(message);
}
else
{
- IAsyncResult ar = MessagingService.getAsyncResult(messageId);
- if (ar != null)
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
- age = System.currentTimeMillis() - MessagingService.getAsyncResultAge(messageId);
- ar.result(message);
- }
- }
- notifySubscribers(message.getFrom(), age);
- }
-
- private void notifySubscribers(InetAddress host, double latency)
- {
- for (ILatencySubscriber subscriber : subscribers)
- {
- subscriber.receiveTiming(host, latency);
+ if (logger_.isDebugEnabled())
+ logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
+ ((IAsyncResult) cb).result(message);
}
}
Modified: cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1053696&r1=1053695&r2=1053696&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/utils/ExpiringMap.java (original)
+++ cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/utils/ExpiringMap.java Wed Dec 29 18:26:43 2010
@@ -22,7 +22,9 @@ import java.util.Enumeration;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.Callable;
+import com.google.common.base.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +33,7 @@ import org.cliffc.high_scale_lib.NonBloc
public class ExpiringMap<K, V>
{
private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
+ private final Function<K, ?> postExpireHook;
private static class CacheableObject<T>
{
@@ -76,6 +79,7 @@ public class ExpiringMap<K, V>
if (co != null && co.isReadyToDie(expiration))
{
cache.remove(key);
+ postExpireHook.apply(key);
}
}
}
@@ -86,12 +90,18 @@ public class ExpiringMap<K, V>
private final Timer timer;
private static int counter = 0;
- /*
- * Specify the TTL for objects in the cache
- * in milliseconds.
- */
public ExpiringMap(long expiration)
{
+ this(expiration, null);
+ }
+
+ /**
+ *
+ * @param expiration the TTL for objects in the cache in milliseconds
+ */
+ public ExpiringMap(long expiration, Function<K, ?> postExpireHook)
+ {
+ this.postExpireHook = postExpireHook;
if (expiration <= 0)
{
throw new IllegalArgumentException("Argument specified must be a positive number");