Author: jbellis
Date: Mon Feb 7 20:28:10 2011
New Revision: 1068101
URL: http://svn.apache.org/viewvc?rev=1068101&view=rev
Log:
move id out of Message so we can do cross-DC forwarding again
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1530
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.RangeSliceCommand.bin
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.Row.bin
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.RowMutation.bin
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.SliceByNamesReadCommand.bin
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.SliceFromReadCommand.bin
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.Truncation.bin
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.migration.Keyspace1.bin
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.migration.Keyspace2.bin
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.migration.Keyspace3.bin
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.migration.Keyspace4.bin
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.migration.Keyspace5.bin
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/gms.EndpointState.bin
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/service.TreeRequest.bin
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/service.TreeResponse.bin
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/streaming.StreamReply.bin
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/streaming.StreamRequestMessage.bin
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/utils.BloomFilter.bin
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/utils.LegacyBloomFilter.bin
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/BinaryVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
Mon Feb 7 20:28:10 2011
@@ -32,7 +32,7 @@ public class BinaryVerbHandler implement
{
private static Logger logger_ =
LoggerFactory.getLogger(BinaryVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
byte[] bytes = message.getMessageBody();
ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
@@ -45,8 +45,8 @@ public class BinaryVerbHandler implement
WriteResponse response = new WriteResponse(rm.getTable(),
rm.key(), true);
Message responseMessage =
WriteResponse.makeWriteResponseMessage(message, response);
if (logger_.isDebugEnabled())
- logger_.debug("binary " + rm + " applied. Sending response to "
+ message.getMessageId() + "@" + message.getFrom());
- MessagingService.instance().sendOneWay(responseMessage,
message.getFrom());
+ logger_.debug("binary " + rm + " applied. Sending response to "
+ id + "@" + message.getFrom());
+ MessagingService.instance().sendReply(responseMessage, id,
message.getFrom());
}
catch (Exception e)
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java
Mon Feb 7 20:28:10 2011
@@ -28,7 +28,7 @@ public class DefinitionsAnnounceVerbHand
{
/** someone is announcing their schema version. */
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
UUID theirVersion = UUID.fromString(new
String(message.getMessageBody()));
MigrationManager.rectify(theirVersion, message.getFrom());
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
Mon Feb 7 20:28:10 2011
@@ -42,7 +42,7 @@ public class DefinitionsUpdateResponseVe
private static final Logger logger =
LoggerFactory.getLogger(DefinitionsUpdateResponseVerbHandler.class);
/** someone sent me their data definitions */
- public void doVerb(final Message message)
+ public void doVerb(final Message message, String id)
{
try
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
Mon Feb 7 20:28:10 2011
@@ -33,7 +33,7 @@ public class ReadRepairVerbHandler imple
{
private static Logger logger_ =
LoggerFactory.getLogger(ReadRepairVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
byte[] body = message.getMessageBody();
ByteArrayInputStream buffer = new ByteArrayInputStream(body);
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java
Mon Feb 7 20:28:10 2011
@@ -45,7 +45,7 @@ public class ReadVerbHandler implements
/* We use this so that we can reuse readcontext objects */
private static ThreadLocal<ReadVerbHandler.ReadContext> tls_ = new
InheritableThreadLocal<ReadVerbHandler.ReadContext>();
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
byte[] body = message.getMessageBody();
/* Obtain a Read Context from TLS */
@@ -79,8 +79,8 @@ public class ReadVerbHandler implements
Message response = message.getReply(FBUtilities.getLocalAddress(),
bytes);
if (logger_.isDebugEnabled())
logger_.debug(String.format("Read key %s; sending response to
%s@%s",
-
ByteBufferUtil.bytesToHex(command.key), message.getMessageId(),
message.getFrom()));
- MessagingService.instance().sendOneWay(response,
message.getFrom());
+
ByteBufferUtil.bytesToHex(command.key), id, message.getFrom()));
+ MessagingService.instance().sendReply(response, id,
message.getFrom());
}
catch (IOException ex)
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
Mon Feb 7 20:28:10 2011
@@ -40,7 +40,7 @@ public class RowMutationVerbHandler impl
{
private static Logger logger_ =
LoggerFactory.getLogger(RowMutationVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
byte[] bytes = message.getMessageBody();
ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
@@ -78,8 +78,8 @@ public class RowMutationVerbHandler impl
WriteResponse response = new WriteResponse(rm.getTable(),
rm.key(), true);
Message responseMessage =
WriteResponse.makeWriteResponseMessage(message, response);
if (logger_.isDebugEnabled())
- logger_.debug(rm + " applied. Sending response to " +
message.getMessageId() + "@" + message.getFrom());
- MessagingService.instance().sendOneWay(responseMessage,
message.getFrom());
+ logger_.debug(rm + " applied. Sending response to " + id + "@"
+ message.getFrom());
+ MessagingService.instance().sendReply(responseMessage, id,
message.getFrom());
}
catch (IOException e)
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
Mon Feb 7 20:28:10 2011
@@ -31,10 +31,10 @@ public class SchemaCheckVerbHandler impl
{
private final Logger logger =
LoggerFactory.getLogger(SchemaCheckVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
logger.debug("Received schema check request.");
Message response =
message.getInternalReply(DatabaseDescriptor.getDefsVersion().toString().getBytes());
- MessagingService.instance().sendOneWay(response, message.getFrom());
+ MessagingService.instance().sendReply(response, id, message.getFrom());
}
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/TruncateVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
Mon Feb 7 20:28:10 2011
@@ -40,7 +40,7 @@ public class TruncateVerbHandler impleme
{
private static Logger logger =
LoggerFactory.getLogger(TruncateVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
byte[] bytes = message.getMessageBody();
ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
@@ -76,9 +76,8 @@ public class TruncateVerbHandler impleme
TruncateResponse response = new TruncateResponse(t.keyspace,
t.columnFamily, true);
Message responseMessage =
TruncateResponse.makeTruncateResponseMessage(message, response);
- logger.debug("{} applied. Sending response to {}@{} ",
- new Object[]{t, message.getMessageId(),
message.getFrom()});
- MessagingService.instance().sendOneWay(responseMessage,
message.getFrom());
+ logger.debug("{} applied. Sending response to {}@{} ", new
Object[]{ t, id, message.getFrom()});
+ MessagingService.instance().sendReply(responseMessage, id,
message.getFrom());
}
catch (IOException e)
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
Mon Feb 7 20:28:10 2011
@@ -250,12 +250,12 @@ public class BootStrapper
public static class BootstrapTokenVerbHandler implements IVerbHandler
{
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
StorageService ss = StorageService.instance;
String tokenString =
StorageService.getPartitioner().getTokenFactory().toString(ss.getBootstrapToken());
Message response =
message.getInternalReply(tokenString.getBytes(Charsets.UTF_8));
- MessagingService.instance().sendOneWay(response,
message.getFrom());
+ MessagingService.instance().sendReply(response, id,
message.getFrom());
}
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
Mon Feb 7 20:28:10 2011
@@ -37,7 +37,7 @@ public class GossipDigestAck2VerbHandler
{
private static Logger logger_ =
LoggerFactory.getLogger(GossipDigestAck2VerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
InetAddress from = message.getFrom();
if (logger_.isTraceEnabled())
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
Mon Feb 7 20:28:10 2011
@@ -40,7 +40,7 @@ public class GossipDigestAckVerbHandler
{
private static Logger logger_ =
LoggerFactory.getLogger(GossipDigestAckVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
InetAddress from = message.getFrom();
if (logger_.isTraceEnabled())
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
Mon Feb 7 20:28:10 2011
@@ -39,7 +39,7 @@ public class GossipDigestSynVerbHandler
{
private static Logger logger_ = LoggerFactory.getLogger(
GossipDigestSynVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
InetAddress from = message.getFrom();
if (logger_.isTraceEnabled())
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java
Mon Feb 7 20:28:10 2011
@@ -33,8 +33,7 @@ import org.apache.cassandra.service.Stor
public class Header
{
private static ICompactSerializer<Header> serializer_;
- private static AtomicInteger idGen_ = new AtomicInteger(0);
-
+
static
{
serializer_ = new HeaderSerializer();
@@ -48,31 +47,23 @@ public class Header
private final InetAddress from_;
// TODO STAGE can be determined from verb
private final StorageService.Verb verb_;
- private final String messageId_;
protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
- Header(String id, InetAddress from, StorageService.Verb verb)
+ Header(InetAddress from, StorageService.Verb verb)
{
- assert id != null;
assert from != null;
assert verb != null;
- messageId_ = id;
from_ = from;
verb_ = verb;
}
-
- Header(String id, InetAddress from, StorageService.Verb verb, Map<String,
byte[]> details)
+
+ Header(InetAddress from, StorageService.Verb verb, Map<String, byte[]>
details)
{
- this(id, from, verb);
+ this(from, verb);
details_ = details;
}
- Header(InetAddress from, StorageService.Verb verb)
- {
- this(Integer.toString(idGen_.incrementAndGet()), from, verb);
- }
-
InetAddress getFrom()
{
return from_;
@@ -83,11 +74,6 @@ public class Header
return verb_;
}
- String getMessageId()
- {
- return messageId_;
- }
-
byte[] getDetail(String key)
{
return details_.get(key);
@@ -108,7 +94,6 @@ class HeaderSerializer implements ICompa
{
public void serialize(Header t, DataOutputStream dos) throws IOException
{
- dos.writeUTF(t.getMessageId());
CompactEndpointSerializationHelper.serialize(t.getFrom(), dos);
dos.writeInt(t.getVerb().ordinal());
@@ -128,7 +113,6 @@ class HeaderSerializer implements ICompa
public Header deserialize(DataInputStream dis) throws IOException
{
- String id = dis.readUTF();
InetAddress from = CompactEndpointSerializationHelper.deserialize(dis);
int verbOrdinal = dis.readInt();
@@ -144,7 +128,7 @@ class HeaderSerializer implements ICompa
details.put(key, bytes);
}
- return new Header(id, from, StorageService.VERBS[verbOrdinal],
details);
+ return new Header(from, StorageService.VERBS[verbOrdinal], details);
}
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IVerbHandler.java
Mon Feb 7 20:28:10 2011
@@ -31,8 +31,9 @@ public interface IVerbHandler
* class was registered by a call to
MessagingService.registerVerbHandlers).
* Note that the caller should not be holding any locks when calling this
method
* because the implementation may be synchronized.
- *
- * @param message - incoming message that needs handling.
+ *
+ * @param message - incoming message that needs handling.
+ * @param id
*/
- public void doVerb(Message message);
+ public void doVerb(Message message, String id);
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
Mon Feb 7 20:28:10 2011
@@ -101,8 +101,10 @@ public class IncomingTcpConnection exten
logger.info("Received connection from newer protocol
version. Ignorning message.");
else
{
- Message message = Message.serializer().deserialize(new
DataInputStream(new ByteArrayInputStream(contentBytes)));
- MessagingService.instance().receive(message);
+ DataInputStream dis = new DataInputStream(new
ByteArrayInputStream(contentBytes));
+ String id = dis.readUTF();
+ Message message =
Message.serializer().deserialize(dis);
+ MessagingService.instance().receive(message, id);
}
}
// prepare to read the next message
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
Mon Feb 7 20:28:10 2011
@@ -94,21 +94,17 @@ public class Message
return header_.getVerb();
}
- public String getMessageId()
- {
- return header_.getMessageId();
- }
-
// TODO should take byte[] + length so we don't have to copy to a byte[]
of exactly the right len
+ // TODO make static
public Message getReply(InetAddress from, byte[] args)
{
- Header header = new Header(getMessageId(), from,
StorageService.Verb.REQUEST_RESPONSE);
+ Header header = new Header(from, StorageService.Verb.REQUEST_RESPONSE);
return new Message(header, args);
}
public Message getInternalReply(byte[] body)
{
- Header header = new Header(getMessageId(),
FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE);
+ Header header = new Header(FBUtilities.getLocalAddress(),
StorageService.Verb.INTERNAL_RESPONSE);
return new Message(header, body);
}
@@ -116,9 +112,7 @@ public class Message
{
StringBuilder sbuf = new StringBuilder("");
String separator = System.getProperty("line.separator");
- sbuf.append("ID:" + getMessageId())
- .append(separator)
- .append("FROM:" + getFrom())
+ sbuf.append("FROM:" + getFrom())
.append(separator)
.append("TYPE:" + getMessageType())
.append(separator)
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
Mon Feb 7 20:28:10 2011
@@ -28,18 +28,20 @@ public class MessageDeliveryTask impleme
{
private static final Logger logger_ =
LoggerFactory.getLogger(MessageDeliveryTask.class);
- private Message message_;
- private final long constructionTime_ = System.currentTimeMillis();
+ private Message message;
+ private final long constructionTime = System.currentTimeMillis();
+ private final String id;
- public MessageDeliveryTask(Message message)
+ public MessageDeliveryTask(Message message, String id)
{
assert message != null;
- message_ = message;
+ this.message = message;
+ this.id = id;
}
public void run()
{
- StorageService.Verb verb = message_.getVerb();
+ StorageService.Verb verb = message.getVerb();
switch (verb)
{
case BINARY:
@@ -48,7 +50,7 @@ public class MessageDeliveryTask impleme
case RANGE_SLICE:
case READ_REPAIR:
case REQUEST_RESPONSE:
- if (System.currentTimeMillis() > constructionTime_ +
DatabaseDescriptor.getRpcTimeout())
+ if (System.currentTimeMillis() > constructionTime +
DatabaseDescriptor.getRpcTimeout())
{
MessagingService.instance().incrementDroppedMessages(verb);
return;
@@ -67,6 +69,6 @@ public class MessageDeliveryTask impleme
IVerbHandler verbHandler =
MessagingService.instance().getVerbHandler(verb);
assert verbHandler != null : "unknown verb " + verb;
- verbHandler.doVerb(message_);
+ verbHandler.doVerb(message, id);
}
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
Mon Feb 7 20:28:10 2011
@@ -34,8 +34,6 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.base.Function;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,11 +51,10 @@ import org.apache.cassandra.service.Stor
import org.apache.cassandra.streaming.FileStreamTask;
import org.apache.cassandra.streaming.StreamHeader;
import org.apache.cassandra.utils.ExpiringMap;
-import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.SimpleCondition;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
public final class MessagingService implements MessagingServiceMBean
{
@@ -262,6 +259,13 @@ public final class MessagingService impl
assert previous == null;
}
+ private static AtomicInteger idGen = new AtomicInteger(0);
+ // TODO make these integers to avoid unnecessary int -> string -> int
conversions
+ private static String nextId()
+ {
+ return Integer.toString(idGen.incrementAndGet());
+ }
+
/**
* Send a message to a given endpoint. This method specifies a callback
* which is invoked with the actual response.
@@ -272,12 +276,24 @@ public final class MessagingService impl
* suggest that a timeout occurred to the invoker of the send().
* @return an reference to message id used to match with the result
*/
- public String sendRR(Message message, InetAddress to, IAsyncCallback cb)
+ public String sendRR(Message message, InetAddress to, IMessageCallback cb)
{
- String messageId = message.getMessageId();
- addCallback(cb, messageId, to);
- sendOneWay(message, to);
- return messageId;
+ String id = nextId();
+ if (logger_.isDebugEnabled())
+ logger_.debug("Sending " + message.getVerb() + " to " + id + "@" +
to);
+ addCallback(cb, id, to);
+ sendOneWay(message, id, to);
+ return id;
+ }
+
+ public void sendOneWay(Message message, InetAddress to)
+ {
+ sendOneWay(message, nextId(), to);
+ }
+
+ public void sendReply(Message message, String id, InetAddress to)
+ {
+ sendOneWay(message, id, to);
}
/**
@@ -286,12 +302,12 @@ public final class MessagingService impl
* @param message messages to be sent.
* @param to endpoint to which the message needs to be sent
*/
- public void sendOneWay(Message message, InetAddress to)
+ private void sendOneWay(Message message, String id, InetAddress to)
{
// do local deliveries
if ( message.getFrom().equals(to) )
{
- receive(message);
+ receive(message, id);
return;
}
@@ -310,6 +326,7 @@ public final class MessagingService impl
try
{
DataOutputBuffer buffer = new DataOutputBuffer();
+ buffer.writeUTF(id);
Message.serializer().serialize(message, buffer);
data = buffer.getData();
}
@@ -327,8 +344,7 @@ public final class MessagingService impl
public IAsyncResult sendRR(Message message, InetAddress to)
{
IAsyncResult iar = new AsyncResult();
- addCallback(iar, message.getMessageId(), to);
- sendOneWay(message, to);
+ sendRR(message, to, iar);
return iar;
}
@@ -376,13 +392,13 @@ public final class MessagingService impl
logger_.info("Shutdown complete (no further commands will be
processed)");
}
- public void receive(Message message)
+ public void receive(Message message, String id)
{
message = SinkManager.processServerMessage(message);
if (message == null)
return;
- Runnable runnable = new MessageDeliveryTask(message);
+ Runnable runnable = new MessageDeliveryTask(message, id);
ExecutorService stage =
StageManager.getStage(message.getMessageType());
assert stage != null : "No stage for message type " +
message.getMessageType();
stage.execute(runnable);
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
Mon Feb 7 20:28:10 2011
@@ -19,8 +19,6 @@
package org.apache.cassandra.net;
import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,14 +29,13 @@ public class ResponseVerbHandler impleme
{
private static final Logger logger_ = LoggerFactory.getLogger(
ResponseVerbHandler.class );
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
- String messageId = message.getMessageId();
- double age = System.currentTimeMillis() -
MessagingService.instance().getRegisteredCallbackAge(messageId);
- Pair<InetAddress, IMessageCallback> pair =
MessagingService.instance().removeRegisteredCallback(messageId);
+ double age = System.currentTimeMillis() -
MessagingService.instance().getRegisteredCallbackAge(id);
+ Pair<InetAddress, IMessageCallback> pair =
MessagingService.instance().removeRegisteredCallback(id);
if (pair == null)
{
- logger_.debug("Callback already removed for {}", messageId);
+ logger_.debug("Callback already removed for {}", id);
return;
}
@@ -48,13 +45,13 @@ public class ResponseVerbHandler impleme
if (cb instanceof IAsyncCallback)
{
if (logger_.isDebugEnabled())
- logger_.debug("Processing response on a callback from " +
message.getMessageId() + "@" + message.getFrom());
+ logger_.debug("Processing response on a callback from " + id +
"@" + message.getFrom());
((IAsyncCallback) cb).response(message);
}
else
{
if (logger_.isDebugEnabled())
- logger_.debug("Processing response on an async result from " +
message.getMessageId() + "@" + message.getFrom());
+ logger_.debug("Processing response on an async result from " +
id + "@" + message.getFrom());
((IAsyncResult) cb).result(message);
}
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AntiEntropyService.java
Mon Feb 7 20:28:10 2011
@@ -23,7 +23,6 @@ import java.net.InetAddress;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -578,7 +577,7 @@ public class AntiEntropyService
/**
* Trigger a validation compaction which will return the tree upon
completion.
*/
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
byte[] bytes = message.getMessageBody();
@@ -645,7 +644,7 @@ public class AntiEntropyService
}
}
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
byte[] bytes = message.getMessageBody();
DataInputStream buffer = new DataInputStream(new
ByteArrayInputStream(bytes));
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
Mon Feb 7 20:28:10 2011
@@ -33,7 +33,7 @@ public class IndexScanVerbHandler implem
{
private static final Logger logger =
LoggerFactory.getLogger(IndexScanVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
try
{
@@ -43,8 +43,8 @@ public class IndexScanVerbHandler implem
RangeSliceReply reply = new RangeSliceReply(rows);
Message response = reply.getReply(message);
if (logger.isDebugEnabled())
- logger.debug("Sending " + reply+ " to " +
message.getMessageId() + "@" + message.getFrom());
- MessagingService.instance().sendOneWay(response,
message.getFrom());
+ logger.debug("Sending " + reply+ " to " + id + "@" +
message.getFrom());
+ MessagingService.instance().sendReply(response, id,
message.getFrom());
}
catch (Exception ex)
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
Mon Feb 7 20:28:10 2011
@@ -35,7 +35,7 @@ public class RangeSliceVerbHandler imple
private static final Logger logger =
LoggerFactory.getLogger(RangeSliceVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
try
{
@@ -52,8 +52,8 @@ public class RangeSliceVerbHandler imple
QueryFilter.getFilter(command.predicate, cfs.getComparator())));
Message response = reply.getReply(message);
if (logger.isDebugEnabled())
- logger.debug("Sending " + reply+ " to " +
message.getMessageId() + "@" + message.getFrom());
- MessagingService.instance().sendOneWay(response,
message.getFrom());
+ logger.debug("Sending " + reply+ " to " + id + "@" +
message.getFrom());
+ MessagingService.instance().sendReply(response, id,
message.getFrom());
}
catch (Exception ex)
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
Mon Feb 7 20:28:10 2011
@@ -144,21 +144,6 @@ public class StorageLoadBalancer impleme
*/
}
- class MoveMessageVerbHandler implements IVerbHandler
- {
- public void doVerb(Message message)
- {
- Message reply = message.getInternalReply(new byte[]
{(byte)(isMoveable_.get() ? 1 : 0)});
- MessagingService.instance().sendOneWay(reply, message.getFrom());
- if ( isMoveable_.get() )
- {
- // MoveMessage moveMessage =
(MoveMessage)message.getMessageBody()[0];
- /* Start the leave operation and join the ring at the position
specified */
- isMoveable_.set(false);
- }
- }
- }
-
private static final int BROADCAST_INTERVAL = 60 * 1000;
public static final StorageLoadBalancer instance = new
StorageLoadBalancer();
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
Mon Feb 7 20:28:10 2011
@@ -131,6 +131,7 @@ public class StorageProxy implements Sto
// Multimap that holds onto all the messages and addresses
meant for a specific datacenter
Map<String, Multimap<Message, InetAddress>> dcMessages = new
HashMap<String, Multimap<Message, InetAddress>>(hintedEndpoints.size());
+ Message unhintedMessage = null;
for (Map.Entry<InetAddress, Collection<InetAddress>> entry :
hintedEndpoints.asMap().entrySet())
{
@@ -149,9 +150,10 @@ public class StorageProxy implements Sto
else
{
// belongs on a different server
- Message unhintedMessage =
rm.makeRowMutationMessage();
+ if (unhintedMessage == null)
+ unhintedMessage = rm.makeRowMutationMessage();
if (logger.isDebugEnabled())
- logger.debug("insert writing key " +
ByteBufferUtil.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() +
"@" + destination);
+ logger.debug("insert writing key " +
ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination);
Multimap<Message, InetAddress> messages =
dcMessages.get(dc);
if (messages == null)
@@ -165,7 +167,8 @@ public class StorageProxy implements Sto
}
else
{
- // hinted
+ // hinted messages are unique, so there is no point to
adding a hop by forwarding via another node.
+ // thus, we use sendRR/sendOneWay directly here.
Message hintedMessage = rm.makeRowMutationMessage();
for (InetAddress target : targets)
{
@@ -173,25 +176,14 @@ public class StorageProxy implements Sto
{
addHintHeader(hintedMessage, target);
if (logger.isDebugEnabled())
- logger.debug("insert writing key " +
ByteBufferUtil.bytesToHex(rm.key()) + " to " + hintedMessage.getMessageId() +
"@" + destination + " for " + target);
+ logger.debug("insert writing key " +
ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination + " for " + target);
}
}
- // (non-destination hints are part of the callback and
count towards consistency only under CL.ANY)
- // (non-destination hints are part of the callback and
count towards consistency only under CL.ANY)
+ // non-destination hints are part of the callback and
count towards consistency only under CL.ANY
if (writeEndpoints.contains(destination) ||
consistency_level == ConsistencyLevel.ANY)
MessagingService.instance().sendRR(hintedMessage,
destination, responseHandler);
else
MessagingService.instance().sendOneWay(hintedMessage, destination);
-
- Multimap<Message, InetAddress> messages =
dcMessages.get(dc);
-
- if (messages == null)
- {
- messages = HashMultimap.create();
- dcMessages.put(dc, messages);
- }
-
- messages.put(hintedMessage, destination);
}
}
@@ -229,6 +221,9 @@ public class StorageProxy implements Sto
for (Map.Entry<Message, Collection<InetAddress>> messages:
entry.getValue().asMap().entrySet())
{
Message message = messages.getKey();
+ // a single message object is used for unhinted writes, so
clean out any forwards
+ // from previous loop iterations
+ message.removeHeader(RowMutation.FORWARD_HEADER);
if (dataCenter.equals(localDataCenter))
{
@@ -375,12 +370,13 @@ public class StorageProxy implements Sto
{
Message message = command.makeReadMessage();
if (logger.isDebugEnabled())
- logger.debug("reading data for " + command + " from " +
message.getMessageId() + "@" + dataPoint);
+ logger.debug("reading data for " + command + " from " +
dataPoint);
MessagingService.instance().sendRR(message, dataPoint,
handler);
}
// We lazy-construct the digest Message object since it may not be
necessary if we
// are doing a local digest read, or no digest reads at all.
+ Message digestMessage = null;
for (InetAddress digestPoint : endpoints.subList(1,
endpoints.size()))
{
if (digestPoint.equals(FBUtilities.getLocalAddress()))
@@ -391,9 +387,10 @@ public class StorageProxy implements Sto
}
else
{
- Message digestMessage = digestCommand.makeReadMessage();
+ if (digestMessage == null)
+ digestMessage = digestCommand.makeReadMessage();
if (logger.isDebugEnabled())
- logger.debug("reading digest for " + command + " from
" + digestMessage.getMessageId() + "@" + digestPoint);
+ logger.debug("reading digest for " + command + " from
" + digestPoint);
MessagingService.instance().sendRR(digestMessage,
digestPoint, handler);
}
}
@@ -493,11 +490,9 @@ public class StorageProxy implements Sto
{
ReadResponseResolver resolver = new
ReadResponseResolver(command.table, command.key);
RepairCallback<Row> handler = new RepairCallback<Row>(resolver,
endpoints);
+ Message messageRepair = command.makeReadMessage();
for (InetAddress endpoint : endpoints)
- {
- Message messageRepair = command.makeReadMessage();
MessagingService.instance().sendRR(messageRepair, endpoint,
handler);
- }
return handler;
}
@@ -546,18 +541,18 @@ public class StorageProxy implements Sto
{
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(),
liveEndpoints);
RangeSliceCommand c2 = new
RangeSliceCommand(command.keyspace, command.column_family,
command.super_column, command.predicate, range, command.max_keys);
+ Message message = c2.getMessage();
// collect replies and resolve according to consistency
level
RangeSliceResponseResolver resolver = new
RangeSliceResponseResolver(command.keyspace, liveEndpoints);
AbstractReplicationStrategy rs =
Table.open(command.keyspace).getReplicationStrategy();
ReadCallback<List<Row>> handler =
getReadCallback(resolver, command.keyspace, consistency_level);
// TODO bail early if live endpoints can't satisfy
requested consistency level
- for (InetAddress endpoint : liveEndpoints)
+ for (InetAddress endpoint : liveEndpoints)
{
- Message message = c2.getMessage();
MessagingService.instance().sendRR(message, endpoint,
handler);
if (logger.isDebugEnabled())
- logger.debug("reading " + c2 + " from " +
message.getMessageId() + "@" + endpoint);
+ logger.debug("reading " + c2 + " from " +
endpoint);
}
// TODO read repair on remaining replicas?
@@ -811,12 +806,12 @@ public class StorageProxy implements Sto
throw new UnavailableException();
IndexScanCommand command = new IndexScanCommand(keyspace,
column_family, index_clause, column_predicate, range);
+ Message message = command.getMessage();
for (InetAddress endpoint : liveEndpoints)
{
- Message message = command.getMessage();
MessagingService.instance().sendRR(message, endpoint, handler);
if (logger.isDebugEnabled())
- logger.debug("reading " + command + " from " +
message.getMessageId() + "@" + endpoint);
+ logger.debug("reading " + command + " from " + endpoint);
}
List<Row> theseRows;
@@ -899,11 +894,9 @@ public class StorageProxy implements Sto
// Send out the truncate calls and track the responses with the
callbacks.
logger.debug("Starting to send truncate messages to hosts {}",
allEndpoints);
Truncation truncation = new Truncation(keyspace, cfname);
+ Message message = truncation.makeTruncationMessage();
for (InetAddress endpoint : allEndpoints)
- {
- Message message = truncation.makeTruncationMessage();
MessagingService.instance().sendRR(message, endpoint,
responseHandler);
- }
// Wait for all
logger.debug("Sent all truncate messages, now waiting for {}
responses", blockFor);
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
Mon Feb 7 20:28:10 2011
@@ -33,12 +33,12 @@ public class ReplicationFinishedVerbHand
{
private static Logger logger =
LoggerFactory.getLogger(ReplicationFinishedVerbHandler.class);
- public void doVerb(Message msg)
+ public void doVerb(Message msg, String id)
{
StorageService.instance.confirmReplication(msg.getFrom());
Message response = msg.getInternalReply(ArrayUtils.EMPTY_BYTE_ARRAY);
if (logger.isDebugEnabled())
- logger.debug("Replying to " + msg.getMessageId() + "@" +
msg.getFrom());
- MessagingService.instance().sendOneWay(response, msg.getFrom());
+ logger.debug("Replying to " + id + "@" + msg.getFrom());
+ MessagingService.instance().sendReply(response, id, msg.getFrom());
}
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
Mon Feb 7 20:28:10 2011
@@ -36,7 +36,7 @@ public class StreamReplyVerbHandler impl
{
private static Logger logger =
LoggerFactory.getLogger(StreamReplyVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
byte[] body = message.getMessageBody();
ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
Mon Feb 7 20:28:10 2011
@@ -37,7 +37,7 @@ public class StreamRequestVerbHandler im
{
private static Logger logger =
LoggerFactory.getLogger(StreamRequestVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
if (logger.isDebugEnabled())
logger.debug("Received a StreamRequestMessage from {}",
message.getFrom());
Modified:
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.RangeSliceCommand.bin
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.RangeSliceCommand.bin?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
Binary files - no diff available.
Modified:
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.Row.bin
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.Row.bin?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
Binary files - no diff available.
Modified:
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.RowMutation.bin
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.RowMutation.bin?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
Binary files - no diff available.
Modified:
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.SliceByNamesReadCommand.bin
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.SliceByNamesReadCommand.bin?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
Binary files - no diff available.
Modified:
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.SliceFromReadCommand.bin
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.SliceFromReadCommand.bin?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
Binary files - no diff available.
Modified:
cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.Truncation.bin
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.Truncation.bin?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
Binary files - no diff available.