Author: jbellis
Date: Sat Oct 30 23:48:20 2010
New Revision: 1029224
URL: http://svn.apache.org/viewvc?rev=1029224&view=rev
Log:
add INTERNAL_RESPONSE verb to differentiate from responses related to client
requests. patch by jbellis; reviewed by Stu Hood for CASSANDRA-1685
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/Stage.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Sat Oct 30 23:48:20 2010
@@ -6,6 +6,8 @@ dev
* fix IntegerType.getString (CASSANDRA-1681)
* log tpstats when dropping messages (CASSANDRA-1660)
* make -Djava.net.preferIPv4Stack=true the default (CASSANDRA-628)
+ * add INTERNAL_RESPONSE verb to differentiate from responses related
+ to client requests (CASSANDRA-1685)
0.7.0-beta3
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/Stage.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/Stage.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/Stage.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/Stage.java
Sat Oct 30 23:48:20 2010
@@ -27,10 +27,11 @@ public enum Stage
MUTATION,
STREAM,
GOSSIP,
- RESPONSE,
+ REQUEST_RESPONSE,
ANTIENTROPY,
MIGRATION,
- MISC;
+ MISC,
+ INTERNAL_RESPONSE;
public String getJmxType()
{
@@ -41,10 +42,11 @@ public enum Stage
case MIGRATION:
case MISC:
case STREAM:
+ case INTERNAL_RESPONSE:
return "internal";
case MUTATION:
case READ:
- case RESPONSE:
+ case REQUEST_RESPONSE:
return "request";
default:
throw new AssertionError("Unknown stage " + this);
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java
Sat Oct 30 23:48:20 2010
@@ -42,7 +42,8 @@ public class StageManager
{
stages.put(Stage.MUTATION,
multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters()));
stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ,
getConcurrentReaders()));
- stages.put(Stage.RESPONSE, multiThreadedStage(Stage.RESPONSE,
Math.max(2, Runtime.getRuntime().availableProcessors())));
+ stages.put(Stage.REQUEST_RESPONSE,
multiThreadedStage(Stage.REQUEST_RESPONSE, Math.max(2,
Runtime.getRuntime().availableProcessors())));
+ stages.put(Stage.INTERNAL_RESPONSE,
multiThreadedStage(Stage.INTERNAL_RESPONSE, Math.max(1,
Runtime.getRuntime().availableProcessors())));
// the rest are all single-threaded
stages.put(Stage.STREAM, new
JMXEnabledThreadPoolExecutor(Stage.STREAM));
stages.put(Stage.GOSSIP, new
JMXEnabledThreadPoolExecutor(Stage.GOSSIP));
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
Sat Oct 30 23:48:20 2010
@@ -31,11 +31,10 @@ public class SchemaCheckVerbHandler impl
{
private final Logger logger =
LoggerFactory.getLogger(SchemaCheckVerbHandler.class);
- @Override
public void doVerb(Message message)
{
logger.debug("Received schema check request.");
- Message response = message.getReply(FBUtilities.getLocalAddress(),
DatabaseDescriptor.getDefsVersion().toString().getBytes());
+ Message response =
message.getInternalReply(DatabaseDescriptor.getDefsVersion().toString().getBytes());
MessagingService.instance.sendOneWay(response, message.getFrom());
}
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
Sat Oct 30 23:48:20 2010
@@ -254,7 +254,7 @@ public class BootStrapper
{
StorageService ss = StorageService.instance;
String tokenString =
StorageService.getPartitioner().getTokenFactory().toString(ss.getBootstrapToken());
- Message response = message.getReply(FBUtilities.getLocalAddress(),
tokenString.getBytes(Charsets.UTF_8));
+ Message response =
message.getInternalReply(tokenString.getBytes(Charsets.UTF_8));
MessagingService.instance.sendOneWay(response, message.getFrom());
}
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
Sat Oct 30 23:48:20 2010
@@ -29,7 +29,6 @@ import java.lang.management.ManagementFa
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.AbstractStatsDeque;
@@ -181,7 +180,7 @@ public class DynamicEndpointSnitch exten
{
if (!registered)
{
- ILatencyPublisher handler =
(ILatencyPublisher)MessagingService.instance.getVerbHandler(StorageService.Verb.READ_RESPONSE);
+ ILatencyPublisher handler =
(ILatencyPublisher)MessagingService.instance.getVerbHandler(StorageService.Verb.REQUEST_RESPONSE);
if (handler != null)
{
handler.register(this);
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
Sat Oct 30 23:48:20 2010
@@ -26,6 +26,7 @@ import java.net.InetAddress;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
public class Message
{
@@ -101,10 +102,16 @@ public class Message
// TODO should take byte[] + length so we don't have to copy to a byte[]
of exactly the right len
public Message getReply(InetAddress from, byte[] args)
{
- Header header = new Header(getMessageId(), from,
StorageService.Verb.READ_RESPONSE);
+ Header header = new Header(getMessageId(), from,
StorageService.Verb.REQUEST_RESPONSE);
return new Message(header, args);
}
-
+
+ public Message getInternalReply(byte[] body)
+ {
+ Header header = new Header(getMessageId(),
FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE);
+ return new Message(header, body);
+ }
+
public String toString()
{
StringBuilder sbuf = new StringBuilder("");
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
Sat Oct 30 23:48:20 2010
@@ -42,7 +42,9 @@ class OutboundTcpConnectionPool
OutboundTcpConnection getConnection(Message msg)
{
Stage stage = msg.getMessageType();
- return stage == Stage.RESPONSE || stage == Stage.GOSSIP ? ackCon :
cmdCon;
+ return stage == Stage.REQUEST_RESPONSE || stage ==
Stage.INTERNAL_RESPONSE || stage == Stage.GOSSIP
+ ? ackCon
+ : cmdCon;
}
synchronized void reset()
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
Sat Oct 30 23:48:20 2010
@@ -23,7 +23,6 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
@@ -33,8 +32,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ReadCommand;
@@ -166,7 +163,7 @@ class ConsistencyChecker implements Runn
ReadResponse.serializer().serialize(readResponse, out);
byte[] bytes = new byte[out.getLength()];
System.arraycopy(out.getData(), 0, bytes, 0, bytes.length);
- responses_.add(new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.READ_RESPONSE, bytes));
+ responses_.add(new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.INTERNAL_RESPONSE, bytes));
}
// synchronized so the " == majority" is safe
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
Sat Oct 30 23:48:20 2010
@@ -148,7 +148,7 @@ public class StorageLoadBalancer impleme
{
public void doVerb(Message message)
{
- Message reply = message.getReply(FBUtilities.getLocalAddress(),
new byte[] {(byte)(isMoveable_.get() ? 1 : 0)});
+ Message reply = message.getInternalReply(new byte[]
{(byte)(isMoveable_.get() ? 1 : 0)});
MessagingService.instance.sendOneWay(reply, message.getFrom());
if ( isMoveable_.get() )
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
Sat Oct 30 23:48:20 2010
@@ -145,7 +145,7 @@ public class StorageService implements I
BINARY,
READ_REPAIR,
READ,
- READ_RESPONSE,
+ REQUEST_RESPONSE, // client-initiated reads and writes
STREAM_INITIATE, // Deprecated
STREAM_INITIATE_DONE, // Deprecated
STREAM_REPLY,
@@ -164,6 +164,7 @@ public class StorageService implements I
SCHEMA_CHECK,
INDEX_SCAN,
REPLICATION_FINISHED,
+ INTERNAL_RESPONSE, // responses to internal calls
;
// remember to add new verbs at the end, since we serialize by ordinal
}
@@ -175,7 +176,7 @@ public class StorageService implements I
put(Verb.BINARY, Stage.MUTATION);
put(Verb.READ_REPAIR, Stage.MUTATION);
put(Verb.READ, Stage.READ);
- put(Verb.READ_RESPONSE, Stage.RESPONSE);
+ put(Verb.REQUEST_RESPONSE, Stage.REQUEST_RESPONSE);
put(Verb.STREAM_REPLY, Stage.MISC); // TODO does this really belong on
misc? I've just copied old behavior here
put(Verb.STREAM_REQUEST, Stage.STREAM);
put(Verb.RANGE_SLICE, Stage.READ);
@@ -191,6 +192,7 @@ public class StorageService implements I
put(Verb.SCHEMA_CHECK, Stage.MIGRATION);
put(Verb.INDEX_SCAN, Stage.READ);
put(Verb.REPLICATION_FINISHED, Stage.MISC);
+ put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
}};
@@ -284,7 +286,8 @@ public class StorageService implements I
MessagingService.instance.registerVerbHandlers(Verb.STREAM_REQUEST,
new StreamRequestVerbHandler() );
MessagingService.instance.registerVerbHandlers(Verb.STREAM_REPLY, new
StreamReplyVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.REPLICATION_FINISHED, new
ReplicationFinishedVerbHandler());
- MessagingService.instance.registerVerbHandlers(Verb.READ_RESPONSE, new
ResponseVerbHandler());
+ MessagingService.instance.registerVerbHandlers(Verb.REQUEST_RESPONSE,
new ResponseVerbHandler());
+ MessagingService.instance.registerVerbHandlers(Verb.INTERNAL_RESPONSE,
new ResponseVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.TREE_REQUEST, new
TreeRequestVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.TREE_RESPONSE, new
AntiEntropyService.TreeResponseVerbHandler());
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
Sat Oct 30 23:48:20 2010
@@ -20,6 +20,7 @@ package org.apache.cassandra.streaming;
*
*/
+import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +37,7 @@ public class ReplicationFinishedVerbHand
public void doVerb(Message msg)
{
StorageService.instance.confirmReplication(msg.getFrom());
- Message response = msg.getReply(FBUtilities.getLocalAddress(), new
byte[]{});
+ Message response = msg.getInternalReply(ArrayUtils.EMPTY_BYTE_ARRAY);
if (logger.isDebugEnabled())
logger.debug("Replying to " + msg.getMessageId() + "@" +
msg.getFrom());
MessagingService.instance.sendOneWay(response, msg.getFrom());