Author: jbellis
Date: Thu Jan 27 22:28:06 2011
New Revision: 1064340
URL: http://svn.apache.org/viewvc?rev=1064340&view=rev
Log:
merge from 0.6
Removed:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/ILatencyPublisher.java
Modified:
cassandra/branches/cassandra-0.7/ (props changed)
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(props changed)
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
(props changed)
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
(props changed)
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
(props changed)
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
(props changed)
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ExpiringMap.java
Propchange: cassandra/branches/cassandra-0.7/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 27 22:28:06 2011
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-1055311,1056121,1057932
+/cassandra/branches/cassandra-0.6:922689-1055311,1056121,1057932,1064193
/cassandra/branches/cassandra-0.7:1026516,1035666,1050269
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1064340&r1=1064339&r2=1064340&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Thu Jan 27 22:28:06 2011
@@ -4,6 +4,8 @@
* allow nodes to be up without being part of normal traffic (CASSANDRA-1951)
* fix CLI "show keyspaces" with null options on NTS (CASSANDRA-2049)
* fix possible ByteBuffer race conditions (CASSANDRA-2066)
+ * reduce garbage generated by MessagingService to prevent load spikes
+ (CASSANDRA-2058)
0.7.1
Propchange:
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 27 22:28:06 2011
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1055311,1056121,1057932
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1055311,1056121,1057932,1064193
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516,1035666,1050269
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
Propchange:
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 27 22:28:06 2011
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1055311,1056121,1057932
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1055311,1056121,1057932,1064193
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516,1035666,1050269
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
Propchange:
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 27 22:28:06 2011
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1055311,1056121,1057932
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1055311,1056121,1057932,1064193
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516,1035666,1050269
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
Propchange:
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 27 22:28:06 2011
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1055311,1056121,1057932
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1055311,1056121,1057932,1064193
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516,1035666,1050269
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
Propchange:
cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 27 22:28:06 2011
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1055311,1056121,1057932
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1055311,1056121,1057932,1064193
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516,1035666,1050269
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1064340&r1=1064339&r2=1064340&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
Thu Jan 27 22:28:06 2011
@@ -124,7 +124,7 @@ public class HintedHandOffManager
rm.add(cf);
Message message = rm.makeRowMutationMessage();
IWriteResponseHandler responseHandler =
WriteResponseHandler.create(endpoint);
- MessagingService.instance().sendRR(message,
Arrays.asList(endpoint), responseHandler);
+ MessagingService.instance().sendRR(message, endpoint,
responseHandler);
try
{
responseHandler.get();
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=1064340&r1=1064339&r2=1064340&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
Thu Jan 27 22:28:06 2011
@@ -208,10 +208,9 @@ public class DynamicEndpointSnitch exten
return;
if (!registered)
{
- ILatencyPublisher handler = (ILatencyPublisher)
MessagingService.instance().getVerbHandler(StorageService.Verb.REQUEST_RESPONSE);
- if (handler != null)
+ if (MessagingService.instance() != null)
{
- handler.register(this);
+ MessagingService.instance().register(this);
registered = true;
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1064340&r1=1064339&r2=1064340&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java
Thu Jan 27 22:28:06 2011
@@ -94,8 +94,6 @@ class AsyncResult implements IAsyncResul
{
lock.unlock();
}
-
-
MessagingService.instance().removeRegisteredCallback(response.getMessageId());
}
public InetAddress getFrom()
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1064340&r1=1064339&r2=1064340&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
Thu Jan 27 22:28:06 2011
@@ -27,7 +27,6 @@ import java.nio.channels.AsynchronousClo
import java.nio.channels.ServerSocketChannel;
import java.security.MessageDigest;
import java.util.*;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -45,21 +44,22 @@ import org.apache.cassandra.concurrent.S
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.locator.ILatencyPublisher;
import org.apache.cassandra.locator.ILatencySubscriber;
import org.apache.cassandra.net.io.SerializerType;
import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.service.GCInspector;
+import org.apache.cassandra.service.ReadCallback;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.FileStreamTask;
import org.apache.cassandra.streaming.StreamHeader;
import org.apache.cassandra.utils.ExpiringMap;
import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.SimpleCondition;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
-public final class MessagingService implements MessagingServiceMBean,
ILatencyPublisher
+public final class MessagingService implements MessagingServiceMBean
{
private static final int version_ = 1;
//TODO: make this parameter dynamic somehow. Not sure if config is
appropriate.
@@ -69,8 +69,7 @@ public final class MessagingService impl
private static final int PROTOCOL_MAGIC = 0xCA552DFA;
/* This records all the results mapped by message Id */
- private final ExpiringMap<String, IMessageCallback> callbacks;
- private final ConcurrentMap<String, Collection<InetAddress>> targets = new
NonBlockingHashMap<String, Collection<InetAddress>>();
+ private final ExpiringMap<String, Pair<InetAddress, IMessageCallback>>
callbacks;
/* Lookup table for registering message handlers based on the verb. */
private final Map<StorageService.Verb, IVerbHandler> verbHandlers_;
@@ -116,24 +115,16 @@ public final class MessagingService impl
};
StorageService.scheduledTasks.scheduleWithFixedDelay(logDropped,
LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
- Function<String, ?> timeoutReporter = new Function<String, Object>()
+ Function<Pair<String, Pair<InetAddress, IMessageCallback>>, ?>
timeoutReporter = new Function<Pair<String, Pair<InetAddress,
IMessageCallback>>, Object>()
{
- public Object apply(String messageId)
+ public Object apply(Pair<String, Pair<InetAddress,
IMessageCallback>> pair)
{
- Collection<InetAddress> addresses = targets.remove(messageId);
- if (addresses == null)
- return null;
-
- for (InetAddress address : addresses)
- {
- for (ILatencySubscriber subscriber : subscribers)
- subscriber.receiveTiming(address, (double)
DatabaseDescriptor.getRpcTimeout());
- }
-
+ Pair<InetAddress, IMessageCallback> expiredValue = pair.right;
+ maybeAddLatency(expiredValue.right, expiredValue.left,
(double) DatabaseDescriptor.getRpcTimeout());
return null;
}
};
- callbacks = new ExpiringMap<String, IMessageCallback>((long) (1.1 *
DatabaseDescriptor.getRpcTimeout()), timeoutReporter);
+ callbacks = new ExpiringMap<String, Pair<InetAddress,
IMessageCallback>>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()),
timeoutReporter);
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
@@ -146,6 +137,24 @@ public final class MessagingService impl
}
}
+ /**
+ * Track latency information for the dynamic snitch
+ * @param cb: the callback associated with this message -- this lets us
know if it's a message type we're interested in
+ * @param address: the host that replied to the message
+ * @param latency
+ */
+ public void maybeAddLatency(IMessageCallback cb, InetAddress address,
double latency)
+ {
+ if (cb instanceof ReadCallback || cb instanceof AsyncResult)
+ addLatency(address, latency);
+ }
+
+ public void addLatency(InetAddress address, double latency)
+ {
+ for (ILatencySubscriber subscriber : subscribers)
+ subscriber.receiveTiming(address, latency);
+ }
+
public static byte[] hash(String type, byte data[])
{
byte result[];
@@ -247,49 +256,9 @@ public final class MessagingService impl
return verbHandlers_.get(type);
}
- /**
- * Send a message to a given endpoint.
- * @param message message to be sent.
- * @param to endpoint to which the message needs to be sent
- * @return an reference to an IAsyncResult which can be queried for the
- * response
- */
- public String sendRR(Message message, Collection<InetAddress> to,
IAsyncCallback cb)
+ private void addCallback(IMessageCallback cb, String messageId,
InetAddress to)
{
- String messageId = message.getMessageId();
- addCallback(cb, messageId);
- for (InetAddress endpoint : to)
- {
- putTarget(messageId, endpoint);
- sendOneWay(message, endpoint);
- }
- return messageId;
- }
-
- private void putTarget(String messageId, InetAddress endpoint)
- {
- Collection<InetAddress> addresses = targets.get(messageId);
- if (addresses == null)
- {
- addresses = new NonBlockingHashSet<InetAddress>();
- Collection<InetAddress> oldAddresses =
targets.putIfAbsent(messageId, addresses);
- if (oldAddresses != null)
- addresses = oldAddresses;
- }
- addresses.add(endpoint);
- }
-
- private void removeTarget(String messageId, InetAddress from)
- {
- Collection<InetAddress> addresses = targets.get(messageId);
- // null is expected if we removed the callback or we got a reply after
its timeout expired
- if (addresses != null)
- addresses.remove(from);
- }
-
- public void addCallback(IAsyncCallback cb, String messageId)
- {
- callbacks.put(messageId, cb);
+ callbacks.put(messageId, new Pair<InetAddress, IMessageCallback>(to,
cb));
}
/**
@@ -305,8 +274,7 @@ public final class MessagingService impl
public String sendRR(Message message, InetAddress to, IAsyncCallback cb)
{
String messageId = message.getMessageId();
- addCallback(cb, messageId);
- putTarget(messageId, to);
+ addCallback(cb, messageId, to);
sendOneWay(message, to);
return messageId;
}
@@ -358,8 +326,7 @@ public final class MessagingService impl
public IAsyncResult sendRR(Message message, InetAddress to)
{
IAsyncResult iar = new AsyncResult();
- callbacks.put(message.getMessageId(), iar);
- putTarget(message.getMessageId(), to);
+ addCallback(iar, message.getMessageId(), to);
sendOneWay(message, to);
return iar;
}
@@ -420,14 +387,8 @@ public final class MessagingService impl
stage.execute(runnable);
}
- public IMessageCallback getRegisteredCallback(String messageId)
+ public Pair<InetAddress, IMessageCallback> removeRegisteredCallback(String
messageId)
{
- return callbacks.get(messageId);
- }
-
- public IMessageCallback removeRegisteredCallback(String messageId)
- {
- targets.remove(messageId);
return callbacks.remove(messageId);
}
@@ -436,11 +397,6 @@ public final class MessagingService impl
return callbacks.getAge(messageId);
}
- public void responseReceivedFrom(String messageId, InetAddress from)
- {
- removeTarget(messageId, from);
- }
-
public static void validateMagic(int magic) throws IOException
{
if (magic != PROTOCOL_MAGIC)
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=1064340&r1=1064339&r2=1064340&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
Thu Jan 27 22:28:06 2011
@@ -25,27 +25,22 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.locator.ILatencyPublisher;
-import org.apache.cassandra.locator.ILatencySubscriber;
+import org.apache.cassandra.utils.Pair;
-public class ResponseVerbHandler implements IVerbHandler, ILatencyPublisher
+public class ResponseVerbHandler implements IVerbHandler
{
private static final Logger logger_ = LoggerFactory.getLogger(
ResponseVerbHandler.class );
- private List<ILatencySubscriber> subscribers = new
ArrayList<ILatencySubscriber>();
-
public void doVerb(Message message)
{
String messageId = message.getMessageId();
- MessagingService.instance().responseReceivedFrom(messageId,
message.getFrom());
double age = System.currentTimeMillis() -
MessagingService.instance().getRegisteredCallbackAge(messageId);
- IMessageCallback cb =
MessagingService.instance().getRegisteredCallback(messageId);
- if (cb == null)
+ Pair<InetAddress, IMessageCallback> pair =
MessagingService.instance().removeRegisteredCallback(messageId);
+ if (pair == null)
return;
- // if cb is not null, then age will be valid
- for (ILatencySubscriber subscriber : subscribers)
- subscriber.receiveTiming(message.getFrom(), age);
+ IMessageCallback cb = pair.right;
+ MessagingService.instance().maybeAddLatency(cb, message.getFrom(),
age);
if (cb instanceof IAsyncCallback)
{
@@ -60,9 +55,4 @@ public class ResponseVerbHandler impleme
((IAsyncResult) cb).result(message);
}
}
-
- public void register(ILatencySubscriber subscriber)
- {
- subscribers.add(subscriber);
- }
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java?rev=1064340&r1=1064339&r2=1064340&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
Thu Jan 27 22:28:06 2011
@@ -70,13 +70,6 @@ public abstract class AbstractWriteRespo
}
}
- public void addHintCallback(Message hintedMessage, InetAddress destination)
- {
- // (non-destination hints are part of the callback and count towards
consistency only under CL.ANY)
- if (writeEndpoints.contains(destination) || consistencyLevel ==
ConsistencyLevel.ANY)
- MessagingService.instance().addCallback(this,
hintedMessage.getMessageId());
- }
-
/** null message means "response from local write" */
public abstract void response(Message msg);
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IWriteResponseHandler.java?rev=1064340&r1=1064339&r2=1064340&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
Thu Jan 27 22:28:06 2011
@@ -20,16 +20,13 @@ package org.apache.cassandra.service;
*
*/
-import java.net.InetAddress;
import java.util.concurrent.TimeoutException;
import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.Message;
import org.apache.cassandra.thrift.UnavailableException;
public interface IWriteResponseHandler extends IAsyncCallback
{
public void get() throws TimeoutException;
- public void addHintCallback(Message hintedMessage, InetAddress
destination);
public void assureSufficientLiveNodes() throws UnavailableException;
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1064340&r1=1064339&r2=1064340&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
Thu Jan 27 22:28:06 2011
@@ -131,7 +131,6 @@ public class StorageProxy implements Sto
// Multimap that holds onto all the messages and addresses
meant for a specific datacenter
Map<String, Multimap<Message, InetAddress>> dcMessages = new
HashMap<String, Multimap<Message, InetAddress>>(hintedEndpoints.size());
- Message unhintedMessage = null;
for (Map.Entry<InetAddress, Collection<InetAddress>> entry :
hintedEndpoints.asMap().entrySet())
{
@@ -150,15 +149,10 @@ public class StorageProxy implements Sto
else
{
// belongs on a different server
- if (unhintedMessage == null)
- {
- unhintedMessage = rm.makeRowMutationMessage();
-
MessagingService.instance().addCallback(responseHandler,
unhintedMessage.getMessageId());
- }
+ Message unhintedMessage =
rm.makeRowMutationMessage();
if (logger.isDebugEnabled())
logger.debug("insert writing key " +
ByteBufferUtil.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() +
"@" + destination);
-
Multimap<Message, InetAddress> messages =
dcMessages.get(dc);
if (messages == null)
{
@@ -182,7 +176,12 @@ public class StorageProxy implements Sto
logger.debug("insert writing key " +
ByteBufferUtil.bytesToHex(rm.key()) + " to " + hintedMessage.getMessageId() +
"@" + destination + " for " + target);
}
}
- responseHandler.addHintCallback(hintedMessage,
destination);
+ // (non-destination hints are part of the callback and
count towards consistency only under CL.ANY)
+ // (non-destination hints are part of the callback and
count towards consistency only under CL.ANY)
+ if (writeEndpoints.contains(destination) ||
consistency_level == ConsistencyLevel.ANY)
+ MessagingService.instance().sendRR(hintedMessage,
destination, responseHandler);
+ else
+
MessagingService.instance().sendOneWay(hintedMessage, destination);
Multimap<Message, InetAddress> messages =
dcMessages.get(dc);
@@ -196,7 +195,7 @@ public class StorageProxy implements Sto
}
}
- sendMessages(localDataCenter, dcMessages);
+ sendMessages(localDataCenter, dcMessages, responseHandler);
}
// wait for writes. throws timeoutexception if necessary
@@ -219,7 +218,7 @@ public class StorageProxy implements Sto
/**
* for each datacenter, send a message to one node to relay the write to
other replicas
*/
- private static void sendMessages(String localDataCenter, Map<String,
Multimap<Message, InetAddress>> dcMessages)
+ private static void sendMessages(String localDataCenter, Map<String,
Multimap<Message, InetAddress>> dcMessages, IWriteResponseHandler handler)
throws IOException
{
for (Map.Entry<String, Multimap<Message, InetAddress>> entry:
dcMessages.entrySet())
@@ -230,15 +229,12 @@ public class StorageProxy implements Sto
for (Map.Entry<Message, Collection<InetAddress>> messages:
entry.getValue().asMap().entrySet())
{
Message message = messages.getKey();
- // a single message object is used for unhinted writes, so
clean out any forwards
- // from previous loop iterations
- message.removeHeader(RowMutation.FORWARD_HEADER);
if (dataCenter.equals(localDataCenter))
{
// direct writes to local DC
for (InetAddress destination : messages.getValue())
- MessagingService.instance().sendOneWay(message,
destination);
+ MessagingService.instance().sendRR(message,
destination, handler);
}
else
{
@@ -262,7 +258,7 @@ public class StorageProxy implements Sto
message.setHeader(RowMutation.FORWARD_HEADER,
bos.toByteArray());
}
// send the combined message + forward headers
- MessagingService.instance().sendOneWay(message, target);
+ MessagingService.instance().sendRR(message, target,
handler);
}
}
}
@@ -373,7 +369,7 @@ public class StorageProxy implements Sto
{
if (logger.isDebugEnabled())
logger.debug("reading data for " + command + " locally");
- StageManager.getStage(Stage.READ).submit(new
WeakReadLocalRunnable(command, handler));
+ StageManager.getStage(Stage.READ).submit(new
LocalReadRunnable(command, handler));
}
else
{
@@ -392,7 +388,7 @@ public class StorageProxy implements Sto
{
if (logger.isDebugEnabled())
logger.debug("reading digest for " + command + "
locally");
- StageManager.getStage(Stage.READ).submit(new
WeakReadLocalRunnable(digestCommand, handler));
+ StageManager.getStage(Stage.READ).submit(new
LocalReadRunnable(digestCommand, handler));
}
else
{
@@ -461,12 +457,13 @@ public class StorageProxy implements Sto
return rows;
}
- static class WeakReadLocalRunnable extends WrappedRunnable
+ static class LocalReadRunnable extends WrappedRunnable
{
private final ReadCommand command;
private final ReadCallback<Row> handler;
+ private final long start = System.currentTimeMillis();
- WeakReadLocalRunnable(ReadCommand command, ReadCallback<Row> handler)
+ LocalReadRunnable(ReadCommand command, ReadCallback<Row> handler)
{
this.command = command;
this.handler = handler;
@@ -475,10 +472,11 @@ public class StorageProxy implements Sto
protected void runMayThrow() throws IOException
{
if (logger.isDebugEnabled())
- logger.debug("weakreadlocal reading " + command);
+ logger.debug("LocalReadRunnable reading " + command);
Table table = Table.open(command.table);
ReadResponse result = ReadVerbHandler.getResponse(command,
command.getRow(table));
+
MessagingService.instance().addLatency(FBUtilities.getLocalAddress(),
System.currentTimeMillis() - start);
handler.response(result);
}
}
@@ -497,8 +495,11 @@ public class StorageProxy implements Sto
{
ReadResponseResolver resolver = new
ReadResponseResolver(command.table, command.key);
RepairCallback<Row> handler = new RepairCallback<Row>(resolver,
endpoints);
- Message messageRepair = command.makeReadMessage();
- MessagingService.instance().sendRR(messageRepair, endpoints, handler);
+ for (InetAddress endpoint : endpoints)
+ {
+ Message messageRepair = command.makeReadMessage();
+ MessagingService.instance().sendRR(messageRepair, endpoint,
handler);
+ }
return handler;
}
@@ -601,21 +602,26 @@ public class StorageProxy implements Sto
final String myVersion =
DatabaseDescriptor.getDefsVersion().toString();
final Map<InetAddress, UUID> versions = new
ConcurrentHashMap<InetAddress, UUID>();
final Set<InetAddress> liveHosts = Gossiper.instance.getLiveMembers();
- final Message msg = new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.SCHEMA_CHECK, ArrayUtils.EMPTY_BYTE_ARRAY);
final CountDownLatch latch = new CountDownLatch(liveHosts.size());
- // an empty message acts as a request to the SchemaCheckVerbHandler.
- MessagingService.instance().sendRR(msg, liveHosts, new IAsyncCallback()
+
+ IAsyncCallback cb = new IAsyncCallback()
{
- public void response(Message msg)
+ public void response(Message message)
{
// record the response from the remote node.
- logger.debug("Received schema check response from " +
msg.getFrom().getHostAddress());
- UUID theirVersion = UUID.fromString(new
String(msg.getMessageBody()));
- versions.put(msg.getFrom(), theirVersion);
+ logger.debug("Received schema check response from " +
message.getFrom().getHostAddress());
+ UUID theirVersion = UUID.fromString(new
String(message.getMessageBody()));
+ versions.put(message.getFrom(), theirVersion);
latch.countDown();
}
- });
-
+ };
+ // an empty message acts as a request to the SchemaCheckVerbHandler.
+ for (InetAddress endpoint : liveHosts)
+ {
+ Message message = new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.SCHEMA_CHECK, ArrayUtils.EMPTY_BYTE_ARRAY);
+ MessagingService.instance().sendRR(message, endpoint, cb);
+ }
+
try
{
// wait for as long as possible. timeout-1s if possible.
@@ -895,8 +901,11 @@ public class StorageProxy implements Sto
// Send out the truncate calls and track the responses with the
callbacks.
logger.debug("Starting to send truncate messages to hosts {}",
allEndpoints);
Truncation truncation = new Truncation(keyspace, cfname);
- Message message = truncation.makeTruncationMessage();
- MessagingService.instance().sendRR(message, allEndpoints,
responseHandler);
+ for (InetAddress endpoint : allEndpoints)
+ {
+ Message message = truncation.makeTruncationMessage();
+ MessagingService.instance().sendRR(message, endpoint,
responseHandler);
+ }
// Wait for all
logger.debug("Sent all truncate messages, now waiting for {}
responses", blockFor);
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1064340&r1=1064339&r2=1064340&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ExpiringMap.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/ExpiringMap.java
Thu Jan 27 22:28:06 2011
@@ -18,10 +18,7 @@
package org.apache.cassandra.utils;
-import java.util.Enumeration;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
import java.util.concurrent.Callable;
import com.google.common.base.Function;
@@ -33,7 +30,7 @@ import org.cliffc.high_scale_lib.NonBloc
public class ExpiringMap<K, V>
{
private static final Logger logger =
LoggerFactory.getLogger(ExpiringMap.class);
- private final Function<K, ?> postExpireHook;
+ private final Function<Pair<K,V>, ?> postExpireHook;
private static class CacheableObject<T>
{
@@ -69,18 +66,12 @@ public class ExpiringMap<K, V>
@Override
public void run()
{
- synchronized (cache)
+ for (Map.Entry<K, CacheableObject> entry : cache.entrySet())
{
- Enumeration<K> e = cache.keys();
- while (e.hasMoreElements())
+ if (entry.getValue().isReadyToDie(expiration))
{
- K key = e.nextElement();
- CacheableObject co = cache.get(key);
- if (co != null && co.isReadyToDie(expiration))
- {
- cache.remove(key);
- postExpireHook.apply(key);
- }
+ cache.remove(entry.getKey());
+ postExpireHook.apply(new Pair(entry.getKey(),
entry.getValue().getValue()));
}
}
}
@@ -99,7 +90,7 @@ public class ExpiringMap<K, V>
*
* @param expiration the TTL for objects in the cache in milliseconds
*/
- public ExpiringMap(long expiration, Function<K, ?> postExpireHook)
+ public ExpiringMap(long expiration, Function<Pair<K,V>, ?> postExpireHook)
{
this.postExpireHook = postExpireHook;
if (expiration <= 0)