Author: jbellis
Date: Mon Sep 6 22:22:50 2010
New Revision: 993166
URL: http://svn.apache.org/viewvc?rev=993166&view=rev
Log:
derive stage from verb instead of transmitting it for each message
patch by jbellis; reviewed by Nate McCall for CASSANDRA-1465
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/NEWS.txt
cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java
cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=993166&r1=993165&r2=993166&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Sep 6 22:22:50 2010
@@ -54,6 +54,8 @@ dev
* remove failed bootstrap attempt from pending ranges when gossip times
it out after 1h (CASSANDRA-1463)
* eager-create tcp connections to other cluster members (CASSANDRA-1465)
+ * enumerate stages and derive stage from message type instead of
+ transmitting separately (CASSANDRA-1465)
* apply reversed flag during collation from different data sources
(CASSANDRA-1450)
Modified: cassandra/trunk/NEWS.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=993166&r1=993165&r2=993166&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Mon Sep 6 22:22:50 2010
@@ -33,9 +33,9 @@ Upgrading
to http://wiki.apache.org/cassandra/ClientOptions for a list of
higher-level clients that have been updated to support the 0.7 API.
- The Cassandra inter-node protocol is incompatible with 0.6.x releases,
- meaning you will have to bring your cluster down prior to upgrading;
- you cannot mix 0.6 and 0.7 nodes.
+ The Cassandra inter-node protocol is incompatible with 0.6.x
+ releases (and with 0.7 beta1), meaning you will have to bring your
+ cluster down prior to upgrading: you cannot mix 0.6 and 0.7 nodes.
The hints schema was changed from 0.6 to 0.7. Cassandra automatically
snapshots and then truncates the hints column family as part of
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java?rev=993166&r1=993165&r2=993166&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java Mon
Sep 6 22:22:50 2010
@@ -68,7 +68,6 @@ public class IndexScanCommand
throw new IOError(e);
}
return new Message(FBUtilities.getLocalAddress(),
- Stage.READ,
StorageService.Verb.INDEX_SCAN,
Arrays.copyOf(dob.getData(), dob.getLength()));
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=993166&r1=993165&r2=993166&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java Mon
Sep 6 22:22:50 2010
@@ -91,7 +91,6 @@ public class RangeSliceCommand
DataOutputBuffer dob = new DataOutputBuffer();
serializer.serialize(this, dob);
return new Message(FBUtilities.getLocalAddress(),
- Stage.READ,
StorageService.Verb.RANGE_SLICE,
Arrays.copyOf(dob.getData(), dob.getLength()));
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=993166&r1=993165&r2=993166&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Mon Sep
6 22:22:50 2010
@@ -53,7 +53,7 @@ public abstract class ReadCommand
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
ReadCommand.serializer().serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(), Stage.READ,
StorageService.Verb.READ, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.READ, bos.toByteArray());
}
public final QueryPath queryPath;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=993166&r1=993165&r2=993166&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Mon Sep
6 22:22:50 2010
@@ -216,7 +216,7 @@ public class RowMutation
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
serializer().serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(), Stage.MUTATION,
verb, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), verb,
bos.toByteArray());
}
public static RowMutation getRowMutationFromMutations(String keyspace,
byte[] key, Map<String, List<Mutation>> cfmap)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java?rev=993166&r1=993165&r2=993166&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
Mon Sep 6 22:22:50 2010
@@ -52,7 +52,7 @@ public class RowMutationMessage
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
RowMutationMessage.serializer().serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(), Stage.MUTATION,
verb, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), verb,
bos.toByteArray());
}
@XmlElement(name="RowMutation")
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java?rev=993166&r1=993165&r2=993166&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java Mon Sep 6
22:22:50 2010
@@ -74,16 +74,7 @@ public class Truncation
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
serializer().serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(), Stage.MUTATION,
StorageService.Verb.TRUNCATE,
- bos.toByteArray());
- }
-
-
- public DataOutputBuffer getSerializedBuffer() throws IOException
- {
- DataOutputBuffer buffer = new DataOutputBuffer();
- Truncation.serializer().serialize(this, buffer);
- return buffer;
+ return new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.TRUNCATE, bos.toByteArray());
}
public String toString()
Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=993166&r1=993165&r2=993166&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Mon Sep
6 22:22:50 2010
@@ -172,7 +172,7 @@ public class BootStrapper
static Token<?> getBootstrapTokenFrom(InetAddress maxEndpoint)
{
- Message message = new Message(FBUtilities.getLocalAddress(),
Stage.MISC, StorageService.Verb.BOOTSTRAP_TOKEN, ArrayUtils.EMPTY_BYTE_ARRAY);
+ Message message = new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.BOOTSTRAP_TOKEN, ArrayUtils.EMPTY_BYTE_ARRAY);
BootstrapTokenCallback btc = new BootstrapTokenCallback();
MessagingService.instance.sendRR(message, maxEndpoint, btc);
return btc.getToken();
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=993166&r1=993165&r2=993166&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Mon Sep 6
22:22:50 2010
@@ -301,7 +301,7 @@ public class Gossiper implements IFailur
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
- return new Message(localEndpoint_, Stage.GOSSIP,
StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray());
+ return new Message(localEndpoint_,
StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray());
}
Message makeGossipDigestAckMessage(GossipDigestAckMessage
gDigestAckMessage) throws IOException
@@ -311,7 +311,7 @@ public class Gossiper implements IFailur
GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
if (logger_.isTraceEnabled())
logger_.trace("@@@@ Size of GossipDigestAckMessage is " +
bos.toByteArray().length);
- return new Message(localEndpoint_, Stage.GOSSIP,
StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray());
+ return new Message(localEndpoint_,
StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray());
}
Message makeGossipDigestAck2Message(GossipDigestAck2Message
gDigestAck2Message) throws IOException
@@ -319,7 +319,7 @@ public class Gossiper implements IFailur
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
GossipDigestAck2Message.serializer().serialize(gDigestAck2Message,
dos);
- return new Message(localEndpoint_, Stage.GOSSIP,
StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray());
+ return new Message(localEndpoint_,
StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray());
}
/**
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java?rev=993166&r1=993165&r2=993166&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Header.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Header.java Mon Sep 6
22:22:50 2010
@@ -46,33 +46,30 @@ public class Header
private InetAddress from_;
// TODO STAGE can be determined from verb
- private Stage type_;
private StorageService.Verb verb_;
private String messageId_;
protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
- Header(String id, InetAddress from, Stage messageType, StorageService.Verb
verb)
+ Header(String id, InetAddress from, StorageService.Verb verb)
{
assert id != null;
assert from != null;
- assert messageType != null;
assert verb != null;
messageId_ = id;
from_ = from;
- type_ = messageType;
- verb_ = verb;
+ verb_ = verb;
}
- Header(String id, InetAddress from, Stage messageType, StorageService.Verb
verb, Map<String, byte[]> details)
+ Header(String id, InetAddress from, StorageService.Verb verb, Map<String,
byte[]> details)
{
- this(id, from, messageType, verb);
+ this(id, from, verb);
details_ = details;
}
- Header(InetAddress from, Stage messageType, StorageService.Verb verb)
+ Header(InetAddress from, StorageService.Verb verb)
{
- this(Integer.toString(idGen_.incrementAndGet()), from, messageType,
verb);
+ this(Integer.toString(idGen_.incrementAndGet()), from, verb);
}
InetAddress getFrom()
@@ -80,11 +77,6 @@ public class Header
return from_;
}
- Stage getMessageType()
- {
- return type_;
- }
-
StorageService.Verb getVerb()
{
return verb_;
@@ -117,7 +109,6 @@ class HeaderSerializer implements ICompa
{
dos.writeUTF(t.getMessageId());
CompactEndpointSerializationHelper.serialize(t.getFrom(), dos);
- dos.writeInt(t.getMessageType().ordinal());
dos.writeInt(t.getVerb().ordinal());
/* Serialize the message header */
@@ -138,7 +129,6 @@ class HeaderSerializer implements ICompa
{
String id = dis.readUTF();
InetAddress from = CompactEndpointSerializationHelper.deserialize(dis);
- int typeOrdinal = dis.readInt();
int verbOrdinal = dis.readInt();
/* Deserializing the message header */
@@ -153,7 +143,7 @@ class HeaderSerializer implements ICompa
details.put(key, bytes);
}
- return new Header(id, from, Stage.values()[typeOrdinal],
StorageService.VERBS[verbOrdinal], details);
+ return new Header(id, from, StorageService.VERBS[verbOrdinal],
details);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=993166&r1=993165&r2=993166&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Mon Sep 6
22:22:50 2010
@@ -24,14 +24,13 @@ import java.io.IOException;
import java.net.InetAddress;
import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.service.StorageService;
public class Message
{
private static MessageSerializer serializer_;
-
+
static
{
serializer_ = new MessageSerializer();
@@ -54,9 +53,9 @@ public class Message
body_ = body;
}
- public Message(InetAddress from, Stage messageType, StorageService.Verb
verb, byte[] body)
+ public Message(InetAddress from, StorageService.Verb verb, byte[] body)
{
- this(new Header(from, messageType, verb), body);
+ this(new Header(from, verb), body);
}
public byte[] getHeader(Object key)
@@ -81,7 +80,7 @@ public class Message
public Stage getMessageType()
{
- return header_.getMessageType();
+ return StorageService.verbStages.get(getVerb());
}
public StorageService.Verb getVerb()
@@ -102,7 +101,7 @@ public class Message
// TODO should take byte[] + length so we don't have to copy to a byte[] of exactly the right len
public Message getReply(InetAddress from, byte[] args)
{
- Header header = new Header(getMessageId(), from, Stage.RESPONSE,
StorageService.Verb.READ_RESPONSE);
+ Header header = new Header(getMessageId(), from,
StorageService.Verb.READ_RESPONSE);
return new Message(header, args);
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=993166&r1=993165&r2=993166&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
Mon Sep 6 22:22:50 2010
@@ -578,10 +578,7 @@ public class AntiEntropyService
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
SERIALIZER.serialize(request, dos);
- return new Message(FBUtilities.getLocalAddress(),
- Stage.AE_SERVICE,
- StorageService.Verb.TREE_REQUEST,
- bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.TREE_REQUEST, bos.toByteArray());
}
catch(IOException e)
{
@@ -644,7 +641,7 @@ public class AntiEntropyService
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
SERIALIZER.serialize(validator, dos);
- return new Message(local, Stage.AE_SERVICE,
StorageService.Verb.TREE_RESPONSE, bos.toByteArray());
+ return new Message(local, StorageService.Verb.TREE_RESPONSE,
bos.toByteArray());
}
catch(IOException e)
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=993166&r1=993165&r2=993166&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
Mon Sep 6 22:22:50 2010
@@ -165,7 +165,7 @@ class ConsistencyChecker implements Runn
ReadResponse.serializer().serialize(readResponse, out);
byte[] bytes = new byte[out.getLength()];
System.arraycopy(out.getData(), 0, bytes, 0, bytes.length);
- responses_.add(new Message(FBUtilities.getLocalAddress(),
Stage.RESPONSE, StorageService.Verb.READ_RESPONSE, bytes));
+ responses_.add(new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.READ_RESPONSE, bytes));
}
// synchronized so the " == majority" is safe
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=993166&r1=993165&r2=993166&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
Mon Sep 6 22:22:50 2010
@@ -180,7 +180,7 @@ public class MigrationManager implements
private static Message makeVersionMessage(UUID version)
{
byte[] body = version.toString().getBytes();
- return new Message(FBUtilities.getLocalAddress(), Stage.READ,
StorageService.Verb.DEFINITIONS_ANNOUNCE, body);
+ return new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.DEFINITIONS_ANNOUNCE, body);
}
// other half of transformation is in DefinitionsUpdateResponseVerbHandler.
@@ -199,7 +199,7 @@ public class MigrationManager implements
}
dout.close();
byte[] body = bout.toByteArray();
- return new Message(FBUtilities.getLocalAddress(), Stage.MUTATION,
StorageService.Verb.DEFINITIONS_UPDATE_RESPONSE, body);
+ return new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.DEFINITIONS_UPDATE_RESPONSE, body);
}
// other half of this transformation is in MigrationManager.
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=993166&r1=993165&r2=993166&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon
Sep 6 22:22:50 2010
@@ -494,7 +494,7 @@ public class StorageProxy implements Sto
final String myVersion =
DatabaseDescriptor.getDefsVersion().toString();
final Map<InetAddress, UUID> versions = new
ConcurrentHashMap<InetAddress, UUID>();
final Set<InetAddress> liveHosts = Gossiper.instance.getLiveMembers();
- final Message msg = new Message(FBUtilities.getLocalAddress(),
Stage.MIGRATION, StorageService.Verb.SCHEMA_CHECK, ArrayUtils.EMPTY_BYTE_ARRAY);
+ final Message msg = new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.SCHEMA_CHECK, ArrayUtils.EMPTY_BYTE_ARRAY);
final CountDownLatch latch = new CountDownLatch(liveHosts.size());
// an empty message acts as a request to the SchemaCheckVerbHandler.
MessagingService.instance.sendRR(msg, liveHosts.toArray(new
InetAddress[]{}), new IAsyncCallback()
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=993166&r1=993165&r2=993166&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Mon Sep 6 22:22:50 2010
@@ -130,6 +130,29 @@ public class StorageService implements I
}
public static final Verb[] VERBS = Verb.values();
+ public static final EnumMap<StorageService.Verb, Stage> verbStages = new
EnumMap<StorageService.Verb, Stage>(StorageService.Verb.class)
+ {{
+ put(Verb.MUTATION, Stage.MUTATION);
+ put(Verb.BINARY, Stage.MUTATION);
+ put(Verb.READ, Stage.READ);
+ put(Verb.READ_RESPONSE, Stage.RESPONSE);
+ put(Verb.STREAM_STATUS, Stage.MISC); // TODO does this really belong on misc? I've just copied old behavior here
+ put(Verb.STREAM_REQUEST, Stage.STREAM);
+ put(Verb.RANGE_SLICE, Stage.READ);
+ put(Verb.BOOTSTRAP_TOKEN, Stage.MISC);
+ put(Verb.TREE_REQUEST, Stage.AE_SERVICE);
+ put(Verb.TREE_RESPONSE, Stage.RESPONSE);
+ put(Verb.GOSSIP_DIGEST_ACK, Stage.GOSSIP);
+ put(Verb.GOSSIP_DIGEST_ACK2, Stage.GOSSIP);
+ put(Verb.GOSSIP_DIGEST_SYN, Stage.GOSSIP);
+ put(Verb.DEFINITIONS_ANNOUNCE, Stage.READ);
+ put(Verb.DEFINITIONS_UPDATE_RESPONSE, Stage.READ);
+ put(Verb.TRUNCATE, Stage.MUTATION);
+ put(Verb.SCHEMA_CHECK, Stage.MIGRATION);
+ put(Verb.INDEX_SCAN, Stage.READ);
+ }};
+
+
private static IPartitioner partitioner_ =
DatabaseDescriptor.getPartitioner();
public static final StorageService instance = new StorageService();
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java?rev=993166&r1=993165&r2=993166&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java Mon
Sep 6 22:22:50 2010
@@ -95,7 +95,7 @@ class FileStatus
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
FileStatus.serializer().serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(), Stage.MISC,
StorageService.Verb.STREAM_STATUS, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.STREAM_STATUS, bos.toByteArray());
}
private static class FileStatusSerializer implements
ICompactSerializer<FileStatus>
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java?rev=993166&r1=993165&r2=993166&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
Mon Sep 6 22:22:50 2010
@@ -97,7 +97,7 @@ class StreamRequestMessage
{
throw new IOError(e);
}
- return new Message(FBUtilities.getLocalAddress(), Stage.STREAM,
StorageService.Verb.STREAM_REQUEST, bos.toByteArray() );
+ return new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.STREAM_REQUEST, bos.toByteArray() );
}
public String toString()