Author: jbellis
Date: Wed Dec 22 18:47:09 2010
New Revision: 1052027
URL: http://svn.apache.org/viewvc?rev=1052027&view=rev
Log:
count timeouts in storageproxy latencies, and include latency
histograms in StorageProxyMBean
patch by Stu Hood; reviewed by jbellis for CASSANDRA-1893
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxyMBean.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1052027&r1=1052026&r2=1052027&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed Dec 22 18:47:09 2010
@@ -1,5 +1,7 @@
dev
* fix cli crash after backgrounding (CASSANDRA-1875)
+ * count timeouts in storageproxy latencies, and include latency
+ histograms in StorageProxyMBean (CASSANDRA-1893)
0.7.0-rc3
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1052027&r1=1052026&r2=1052027&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Wed Dec 22 18:47:09 2010
@@ -176,7 +176,6 @@ public class StorageProxy implements Sto
{
writeStats.addNano(System.nanoTime() - startTime);
}
-
}
private static void addHintHeader(Message message, InetAddress target)
throws IOException
@@ -217,19 +216,23 @@ public class StorageProxy implements Sto
if (StorageService.instance.isBootstrapMode())
throw new UnavailableException();
long startTime = System.nanoTime();
-
List<Row> rows;
- if (consistency_level == ConsistencyLevel.ONE)
+ try
{
- rows = weakRead(commands);
+ if (consistency_level == ConsistencyLevel.ONE)
+ {
+ rows = weakRead(commands);
+ }
+ else
+ {
+ assert consistency_level.getValue() >=
ConsistencyLevel.QUORUM.getValue();
+ rows = strongRead(commands, consistency_level);
+ }
}
- else
+ finally
{
- assert consistency_level.getValue() >=
ConsistencyLevel.QUORUM.getValue();
- rows = strongRead(commands, consistency_level);
+ readStats.addNano(System.nanoTime() - startTime);
}
-
- readStats.addNano(System.nanoTime() - startTime);
return rows;
}
@@ -415,77 +418,82 @@ public class StorageProxy implements Sto
if (logger.isDebugEnabled())
logger.debug(command.toString());
long startTime = System.nanoTime();
-
- List<AbstractBounds> ranges = getRestrictedRanges(command.range);
+ List<Row> rows;
// now scan until we have enough results
- List<Row> rows = new ArrayList<Row>(command.max_keys);
- for (AbstractBounds range : ranges)
+ try
{
- List<InetAddress> liveEndpoints =
StorageService.instance.getLiveNaturalEndpoints(command.keyspace, range.right);
-
- if (consistency_level == ConsistencyLevel.ONE &&
liveEndpoints.contains(FBUtilities.getLocalAddress()))
+ rows = new ArrayList<Row>(command.max_keys);
+ List<AbstractBounds> ranges = getRestrictedRanges(command.range);
+ for (AbstractBounds range : ranges)
{
- if (logger.isDebugEnabled())
- logger.debug("local range slice");
- ColumnFamilyStore cfs =
Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
- try
- {
- rows.addAll(cfs.getRangeSlice(command.super_column,
- range,
- command.max_keys,
-
QueryFilter.getFilter(command.predicate, cfs.getComparator())));
- }
- catch (ExecutionException e)
- {
- throw new RuntimeException(e.getCause());
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- }
- else
- {
-
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(),
liveEndpoints);
- RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace,
command.column_family, command.super_column, command.predicate, range,
command.max_keys);
- Message message = c2.getMessage();
-
- // collect replies and resolve according to consistency level
- RangeSliceResponseResolver resolver = new
RangeSliceResponseResolver(command.keyspace, liveEndpoints);
- AbstractReplicationStrategy rs =
Table.open(command.keyspace).getReplicationStrategy();
- QuorumResponseHandler<List<Row>> handler =
rs.getQuorumResponseHandler(resolver, consistency_level);
- // TODO bail early if live endpoints can't satisfy requested consistency level
- for (InetAddress endpoint : liveEndpoints)
+ List<InetAddress> liveEndpoints =
StorageService.instance.getLiveNaturalEndpoints(command.keyspace, range.right);
+
+ if (consistency_level == ConsistencyLevel.ONE &&
liveEndpoints.contains(FBUtilities.getLocalAddress()))
{
- MessagingService.instance.sendRR(message, endpoint,
handler);
if (logger.isDebugEnabled())
- logger.debug("reading " + c2 + " from " +
message.getMessageId() + "@" + endpoint);
+ logger.debug("local range slice");
+ ColumnFamilyStore cfs =
Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
+ try
+ {
+ rows.addAll(cfs.getRangeSlice(command.super_column,
+ range,
+ command.max_keys,
+
QueryFilter.getFilter(command.predicate, cfs.getComparator())));
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e.getCause());
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
}
- // TODO read repair on remaining replicas?
-
- // if we're done, great, otherwise, move to the next range
- try
+ else
{
- if (logger.isDebugEnabled())
+
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(),
liveEndpoints);
+ RangeSliceCommand c2 = new
RangeSliceCommand(command.keyspace, command.column_family,
command.super_column, command.predicate, range, command.max_keys);
+ Message message = c2.getMessage();
+
+ // collect replies and resolve according to consistency level
+ RangeSliceResponseResolver resolver = new
RangeSliceResponseResolver(command.keyspace, liveEndpoints);
+ AbstractReplicationStrategy rs =
Table.open(command.keyspace).getReplicationStrategy();
+ QuorumResponseHandler<List<Row>> handler =
rs.getQuorumResponseHandler(resolver, consistency_level);
+ // TODO bail early if live endpoints can't satisfy requested consistency level
+ for (InetAddress endpoint : liveEndpoints)
{
- for (Row row : handler.get())
+ MessagingService.instance.sendRR(message, endpoint,
handler);
+ if (logger.isDebugEnabled())
+ logger.debug("reading " + c2 + " from " +
message.getMessageId() + "@" + endpoint);
+ }
+ // TODO read repair on remaining replicas?
+
+ // if we're done, great, otherwise, move to the next range
+ try
+ {
+ if (logger.isDebugEnabled())
{
- logger.debug("range slices read " + row.key);
+ for (Row row : handler.get())
+ {
+ logger.debug("range slices read " + row.key);
+ }
}
+ rows.addAll(handler.get());
+ }
+ catch (DigestMismatchException e)
+ {
+ throw new AssertionError(e); // no digests in range slices yet
}
- rows.addAll(handler.get());
- }
- catch (DigestMismatchException e)
- {
- throw new AssertionError(e); // no digests in range slices yet
}
+
+ if (rows.size() >= command.max_keys)
+ break;
}
-
- if (rows.size() >= command.max_keys)
- break;
}
-
- rangeStats.addNano(System.nanoTime() - startTime);
+ finally
+ {
+ rangeStats.addNano(System.nanoTime() - startTime);
+ }
return rows.size() > command.max_keys ? rows.subList(0,
command.max_keys) : rows;
}
@@ -620,6 +628,16 @@ public class StorageProxy implements Sto
return readStats.getRecentLatencyMicros();
}
+ public long[] getTotalReadLatencyHistogramMicros()
+ {
+ return readStats.getTotalLatencyHistogramMicros();
+ }
+
+ public long[] getRecentReadLatencyHistogramMicros()
+ {
+ return readStats.getRecentLatencyHistogramMicros();
+ }
+
public long getRangeOperations()
{
return rangeStats.getOpCount();
@@ -635,6 +653,16 @@ public class StorageProxy implements Sto
return rangeStats.getRecentLatencyMicros();
}
+ public long[] getTotalRangeLatencyHistogramMicros()
+ {
+ return rangeStats.getTotalLatencyHistogramMicros();
+ }
+
+ public long[] getRecentRangeLatencyHistogramMicros()
+ {
+ return rangeStats.getRecentLatencyHistogramMicros();
+ }
+
public long getWriteOperations()
{
return writeStats.getOpCount();
@@ -650,6 +678,16 @@ public class StorageProxy implements Sto
return writeStats.getRecentLatencyMicros();
}
+ public long[] getTotalWriteLatencyHistogramMicros()
+ {
+ return writeStats.getTotalLatencyHistogramMicros();
+ }
+
+ public long[] getRecentWriteLatencyHistogramMicros()
+ {
+ return writeStats.getRecentLatencyHistogramMicros();
+ }
+
public static List<Row> scan(String keyspace, String column_family,
IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel
consistency_level)
throws IOException, TimeoutException, UnavailableException
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxyMBean.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxyMBean.java?rev=1052027&r1=1052026&r2=1052027&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxyMBean.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxyMBean.java Wed Dec 22 18:47:09 2010
@@ -23,14 +23,20 @@ public interface StorageProxyMBean
public long getReadOperations();
public long getTotalReadLatencyMicros();
public double getRecentReadLatencyMicros();
+ public long[] getTotalReadLatencyHistogramMicros();
+ public long[] getRecentReadLatencyHistogramMicros();
public long getRangeOperations();
public long getTotalRangeLatencyMicros();
public double getRecentRangeLatencyMicros();
+ public long[] getTotalRangeLatencyHistogramMicros();
+ public long[] getRecentRangeLatencyHistogramMicros();
public long getWriteOperations();
public long getTotalWriteLatencyMicros();
public double getRecentWriteLatencyMicros();
+ public long[] getTotalWriteLatencyHistogramMicros();
+ public long[] getRecentWriteLatencyHistogramMicros();
public boolean getHintedHandoffEnabled();
public void setHintedHandoffEnabled(boolean b);