Author: jbellis
Date: Tue Dec 28 03:10:26 2010
New Revision: 1053244
URL: http://svn.apache.org/viewvc?rev=1053244&view=rev
Log:
count timeouts towards dynamicsnitch latencies
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-1905
Added:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IMessageCallback.java
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncCallback.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncResult.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1053244&r1=1053243&r2=1053244&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Tue Dec 28 03:10:26 2010
@@ -29,6 +29,7 @@
* manage read repair in coordinator instead of data source, to
provide latency information to dynamic snitch (CASSANDRA-1873)
* enable keepalive on intra-cluster sockets (CASSANDRA-1766)
+ * count timeouts towards dynamicsnitch latencies (CASSANDRA-1905)
0.6.8
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1053244&r1=1053243&r2=1053244&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java Tue Dec 28 03:10:26 2010
@@ -121,6 +121,8 @@ class AsyncResult implements IAsyncResul
{
lock_.unlock();
}
+
+ MessagingService.removeRegisteredCallback(response.getMessageId());
}
public InetAddress getFrom()
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncCallback.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncCallback.java?rev=1053244&r1=1053243&r2=1053244&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncCallback.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncCallback.java Tue Dec 28 03:10:26 2010
@@ -24,7 +24,7 @@ package org.apache.cassandra.net;
* service. In particular, if any shared state is referenced, making
* response alone synchronized will not suffice.
*/
-public interface IAsyncCallback
+public interface IAsyncCallback extends IMessageCallback
{
/**
* @param msg response received.
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncResult.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncResult.java?rev=1053244&r1=1053243&r2=1053244&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncResult.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncResult.java Tue Dec 28 03:10:26 2010
@@ -22,7 +22,7 @@ import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-public interface IAsyncResult
+public interface IAsyncResult extends IMessageCallback
{
/**
* This is used to check if the task has been completed
Added:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IMessageCallback.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IMessageCallback.java?rev=1053244&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IMessageCallback.java (added)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IMessageCallback.java Tue Dec 28 03:10:26 2010
@@ -0,0 +1,10 @@
+package org.apache.cassandra.net;
+
+public interface IMessageCallback
+{
+}
+package org.apache.cassandra.net;
+
+public interface IMessageCallback
+{
+}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=1053244&r1=1053243&r2=1053244&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java Tue Dec 28 03:10:26 2010
@@ -33,12 +33,18 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.base.Function;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
import org.apache.log4j.Logger;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.ILatencyPublisher;
+import org.apache.cassandra.locator.ILatencySubscriber;
import org.apache.cassandra.net.io.SerializerType;
import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.service.GCInspector;
@@ -48,7 +54,7 @@ import org.apache.cassandra.utils.GuidGe
import org.apache.cassandra.utils.SimpleCondition;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
-public class MessagingService
+public class MessagingService implements ILatencyPublisher
{
private static int version_ = 1;
//TODO: make this parameter dynamic somehow. Not sure if config is appropriate.
@@ -58,9 +64,9 @@ public class MessagingService
public static final int PROTOCOL_MAGIC = 0xCA552DFA;
/* This records all the results mapped by message Id */
- private static ExpiringMap<String, IAsyncCallback> callbackMap_;
- private static ExpiringMap<String, IAsyncResult> taskCompletionMap_;
-
+ private static ExpiringMap<String, IMessageCallback> callbacks;
+ private static Multimap<String, InetAddress> targets;
+
/* Lookup table for registering message handlers based on the verb. */
private static Map<StorageService.Verb, IVerbHandler> verbHandlers_;
@@ -80,6 +86,8 @@ public class MessagingService
private SocketThread socketThread;
private SimpleCondition listenGate;
private static final Map<StorageService.Verb, AtomicInteger> droppedMessages = new EnumMap<StorageService.Verb, AtomicInteger>(StorageService.Verb.class);
+ private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
+
static
{
for (StorageService.Verb verb : StorageService.Verb.values())
@@ -96,18 +104,30 @@ public class MessagingService
{
listenGate = new SimpleCondition();
verbHandlers_ = new HashMap<StorageService.Verb, IVerbHandler>();
- /*
- * Leave callbacks in the cachetable long enough that any related messages will arrive
- * before the callback is evicted from the table. The concurrency level is set at 128
- * which is the sum of the threads in the pool that adds shit into the table and the
- * pool that retrives the callback from here.
- */
- callbackMap_ = new ExpiringMap<String, IAsyncCallback>( 2 * DatabaseDescriptor.getRpcTimeout() );
- taskCompletionMap_ = new ExpiringMap<String, IAsyncResult>( 2 * DatabaseDescriptor.getRpcTimeout() );
- defaultExecutor_ = new JMXEnabledThreadPoolExecutor("MISCELLANEOUS-POOL");
+ Function<String, ?> timeoutReporter = new Function<String, Object>()
+ {
+ public Object apply(String messageId)
+ {
+ Collection<InetAddress> addresses = targets.removeAll(messageId);
+ if (addresses == null)
+ return null;
+
+ for (InetAddress address : addresses)
+ {
+ for (ILatencySubscriber subscriber : subscribers)
+ subscriber.receiveTiming(address, (double) DatabaseDescriptor.getRpcTimeout());
+ }
+
+ return null;
+ }
+ };
+ targets = ArrayListMultimap.create();
+ callbacks = new ExpiringMap<String, IMessageCallback>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()), timeoutReporter);
+ defaultExecutor_ = new JMXEnabledThreadPoolExecutor("MISCELLANEOUS-POOL");
streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
+
TimerTask logDropped = new TimerTask()
{
public void run()
@@ -220,6 +240,7 @@ public class MessagingService
addCallback(cb, messageId);
for (InetAddress endpoint : to)
{
+ targets.put(messageId, endpoint);
sendOneWay(message, endpoint);
}
return messageId;
@@ -227,7 +248,7 @@ public class MessagingService
public void addCallback(IAsyncCallback cb, String messageId)
{
- callbackMap_.put(messageId, cb);
+ callbacks.put(messageId, cb);
}
/**
@@ -244,6 +265,7 @@ public class MessagingService
{
String messageId = message.getMessageId();
addCallback(cb, messageId);
+ targets.put(messageId, to);
sendOneWay(message, to);
return messageId;
}
@@ -272,6 +294,7 @@ public class MessagingService
for ( int i = 0; i < messages.length; ++i )
{
messages[i].setMessageId(groupId);
+ targets.put(groupId, to[i]);
sendOneWay(messages[i], to[i]);
}
return groupId;
@@ -324,7 +347,8 @@ public class MessagingService
public IAsyncResult sendRR(Message message, InetAddress to)
{
IAsyncResult iar = new AsyncResult();
- taskCompletionMap_.put(message.getMessageId(), iar);
+ callbacks.put(message.getMessageId(), iar);
+ targets.put(message.getMessageId(), to);
sendOneWay(message, to);
return iar;
}
@@ -345,6 +369,11 @@ public class MessagingService
streamExecutor_.execute(streamingTask);
}
+ public void register(ILatencySubscriber subcriber)
+ {
+ subscribers.add(subcriber);
+ }
+
/** blocks until the processing pools are empty and done. */
public static void waitFor() throws InterruptedException
{
@@ -369,10 +398,7 @@ public class MessagingService
defaultExecutor_.shutdownNow();
streamExecutor_.shutdownNow();
-
- /* shut down the cachetables */
- taskCompletionMap_.shutdown();
- callbackMap_.shutdown();
+ callbacks.shutdown();
logger_.info("Shutdown complete (no further commands will be processed)");
}
@@ -396,29 +422,25 @@ public class MessagingService
}
}
- public static IAsyncCallback getRegisteredCallback(String messageId)
- {
- return callbackMap_.get(messageId);
- }
-
- public static void removeRegisteredCallback(String messageId)
+ public static IMessageCallback getRegisteredCallback(String messageId)
{
- callbackMap_.remove(messageId);
+ return callbacks.get(messageId);
}
- public static IAsyncResult getAsyncResult(String messageId)
+ public static IMessageCallback removeRegisteredCallback(String messageId)
{
- return taskCompletionMap_.remove(messageId);
+ targets.removeAll(messageId); // TODO fix this when we clean up quorum reads to do proper RR
+ return callbacks.remove(messageId);
}
public static long getRegisteredCallbackAge(String messageId)
{
- return callbackMap_.getAge(messageId);
+ return callbacks.getAge(messageId);
}
- public static long getAsyncResultAge(String messageId)
+ public static void responseReceivedFrom(String messageId, InetAddress from)
{
- return taskCompletionMap_.getAge(messageId);
+ targets.remove(messageId, from);
}
public static void validateMagic(int magic) throws IOException
@@ -554,5 +576,4 @@ public class MessagingService
server.close();
}
}
-
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=1053244&r1=1053243&r2=1053244&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Tue Dec 28 03:10:26 2010
@@ -18,7 +18,6 @@
package org.apache.cassandra.net;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
@@ -34,35 +33,28 @@ public class ResponseVerbHandler impleme
public void doVerb(Message message)
{
- String messageId = message.getMessageId();
- IAsyncCallback cb = MessagingService.getRegisteredCallback(messageId);
- double age = 0;
- if (cb != null)
+ String messageId = message.getMessageId();
+ MessagingService.responseReceivedFrom(messageId, message.getFrom());
+ double age = System.currentTimeMillis() - MessagingService.getRegisteredCallbackAge(messageId);
+ IMessageCallback cb = MessagingService.getRegisteredCallback(messageId);
+ if (cb == null)
+ return;
+
+ // if cb is not null, then age will be valid
+ for (ILatencySubscriber subscriber : subscribers)
+ subscriber.receiveTiming(message.getFrom(), age);
+
+ if (cb instanceof IAsyncCallback)
{
if (logger_.isDebugEnabled())
logger_.debug("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom());
- age = System.currentTimeMillis() - MessagingService.getRegisteredCallbackAge(messageId);
- cb.response(message);
+ ((IAsyncCallback) cb).response(message);
}
else
{
- IAsyncResult ar = MessagingService.getAsyncResult(messageId);
- if (ar != null)
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
- age = System.currentTimeMillis() - MessagingService.getAsyncResultAge(messageId);
- ar.result(message);
- }
- }
- notifySubscribers(message.getFrom(), age);
- }
-
- private void notifySubscribers(InetAddress host, double latency)
- {
- for (ILatencySubscriber subscriber : subscribers)
- {
- subscriber.receiveTiming(host, latency);
+ if (logger_.isDebugEnabled())
+ logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
+ ((IAsyncResult) cb).result(message);
}
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1053244&r1=1053243&r2=1053244&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java Tue Dec 28 03:10:26 2010
@@ -22,7 +22,9 @@ import java.util.Enumeration;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.Callable;
+import com.google.common.base.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +33,7 @@ import org.cliffc.high_scale_lib.NonBloc
public class ExpiringMap<K, V>
{
private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
+ private final Function<K, ?> postExpireHook;
private static class CacheableObject<T>
{
@@ -76,6 +79,7 @@ public class ExpiringMap<K, V>
if (co != null && co.isReadyToDie(expiration))
{
cache.remove(key);
+ postExpireHook.apply(key);
}
}
}
@@ -86,12 +90,18 @@ public class ExpiringMap<K, V>
private final Timer timer;
private static int counter = 0;
- /*
- * Specify the TTL for objects in the cache
- * in milliseconds.
- */
public ExpiringMap(long expiration)
{
+ this(expiration, null);
+ }
+
+ /**
+ *
+ * @param expiration the TTL for objects in the cache in milliseconds
+ */
+ public ExpiringMap(long expiration, Function<K, ?> postExpireHook)
+ {
+ this.postExpireHook = postExpireHook;
if (expiration <= 0)
{
throw new IllegalArgumentException("Argument specified must be a positive number");