Author: jbellis
Date: Thu Jan 21 21:22:25 2010
New Revision: 901858

URL: http://svn.apache.org/viewvc?rev=901858&view=rev
Log:
mv tree and gossip verb registration into StorageService
patch by jbellis; reviewed by Stu Hood for CASSANDRA-717

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=901858&r1=901857&r2=901858&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java 
(original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java 
Thu Jan 21 21:22:25 2010
@@ -51,15 +51,6 @@
         return serializer_;
     }
     
-       public static Message makeReadResponseMessage(ReadResponse 
readResponse) throws IOException
-    {
-       ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream( bos );
-        ReadResponse.serializer().serialize(readResponse, dos);
-        Message message = new Message(FBUtilities.getLocalAddress(), 
StageManager.RESPONSE_STAGE, MessagingService.responseVerbHandler_, 
bos.toByteArray());
-        return message;
-    }
-       
        private Row row_;
        private byte[] digest_ = ArrayUtils.EMPTY_BYTE_ARRAY;
     private boolean isDigestQuery_ = false;

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=901858&r1=901857&r2=901858&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java 
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java 
Thu Jan 21 21:22:25 2010
@@ -27,6 +27,7 @@
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
 
 import org.apache.log4j.Logger;
 
@@ -98,14 +99,6 @@
     }
 
     final static int MAX_GOSSIP_PACKET_SIZE = 1428;
-    /* GSV - abbreviation for GOSSIP-DIGEST-SYN-VERB */
-    final static String JOIN_VERB_HANDLER = "JVH";
-    /* GSV - abbreviation for GOSSIP-DIGEST-SYN-VERB */
-    final static String GOSSIP_DIGEST_SYN_VERB = "GSV";
-    /* GAV - abbreviation for GOSSIP-DIGEST-ACK-VERB */
-    final static String GOSSIP_DIGEST_ACK_VERB = "GAV";
-    /* GA2V - abbreviation for GOSSIP-DIGEST-ACK2-VERB */
-    final static String GOSSIP_DIGEST_ACK2_VERB = "GA2V";
     public final static int intervalInMillis_ = 1000;
     private static Logger logger_ = Logger.getLogger(Gossiper.class);
     public static final Gossiper instance = new Gossiper();
@@ -136,11 +129,6 @@
         aVeryLongTime_ = 259200 * 1000;
         /* register with the Failure Detector for receiving Failure detector 
events */
         FailureDetector.instance.registerFailureDetectionEventListener(this);
-        /* register the verbs */
-        MessagingService.instance.registerVerbHandlers(JOIN_VERB_HANDLER, new 
JoinVerbHandler());
-        MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_SYN_VERB, 
new GossipDigestSynVerbHandler());
-        MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_ACK_VERB, 
new GossipDigestAckVerbHandler());
-        
MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_ACK2_VERB, new 
GossipDigestAck2VerbHandler());
     }
 
     /** Register with the Gossiper for EndPointState notifications */
@@ -271,7 +259,7 @@
         ByteArrayOutputStream bos = new 
ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
         DataOutputStream dos = new DataOutputStream( bos );
         GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
-        return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, 
GOSSIP_DIGEST_SYN_VERB, bos.toByteArray());
+        return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, 
StorageService.GOSSIP_DIGEST_SYN_VERB, bos.toByteArray());
     }
 
     Message makeGossipDigestAckMessage(GossipDigestAckMessage 
gDigestAckMessage) throws IOException
@@ -281,7 +269,7 @@
         GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
         if (logger_.isTraceEnabled())
             logger_.trace("@@@@ Size of GossipDigestAckMessage is " + 
bos.toByteArray().length);
-        return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, 
GOSSIP_DIGEST_ACK_VERB, bos.toByteArray());
+        return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, 
StorageService.GOSSIP_DIGEST_ACK_VERB, bos.toByteArray());
     }
 
     Message makeGossipDigestAck2Message(GossipDigestAck2Message 
gDigestAck2Message) throws IOException
@@ -289,7 +277,7 @@
         ByteArrayOutputStream bos = new 
ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
         DataOutputStream dos = new DataOutputStream(bos);
         GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, 
dos);
-        return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, 
GOSSIP_DIGEST_ACK2_VERB, bos.toByteArray());
+        return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, 
StorageService.GOSSIP_DIGEST_ACK2_VERB, bos.toByteArray());
     }
 
     /**
@@ -822,197 +810,196 @@
         gossipTimer_.cancel();
         gossipTimer_ = new Timer(false); // makes the Gossiper reentrant.
     }
-}
 
-class JoinVerbHandler implements IVerbHandler
-{
-    private static Logger logger_ = Logger.getLogger( JoinVerbHandler.class);
-
-    public void doVerb(Message message)
+    public static class JoinVerbHandler implements IVerbHandler
     {
-        InetAddress from = message.getFrom();
-        if (logger_.isDebugEnabled())
-          logger_.debug("Received a JoinMessage from " + from);
-
-        byte[] bytes = message.getMessageBody();
-        DataInputStream dis = new DataInputStream( new 
ByteArrayInputStream(bytes) );
+        private static Logger logger_ = Logger.getLogger( 
JoinVerbHandler.class);
 
-        JoinMessage joinMessage;
-        try
-        {
-            joinMessage = JoinMessage.serializer().deserialize(dis);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        if ( joinMessage.clusterId_.equals( 
DatabaseDescriptor.getClusterName() ) )
+        public void doVerb(Message message)
         {
-            Gossiper.instance.join(from);
+            InetAddress from = message.getFrom();
+            if (logger_.isDebugEnabled())
+              logger_.debug("Received a JoinMessage from " + from);
+
+            byte[] bytes = message.getMessageBody();
+            DataInputStream dis = new DataInputStream( new 
ByteArrayInputStream(bytes) );
+
+            JoinMessage joinMessage;
+            try
+            {
+                joinMessage = JoinMessage.serializer().deserialize(dis);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+            if ( joinMessage.clusterId_.equals( 
DatabaseDescriptor.getClusterName() ) )
+            {
+                Gossiper.instance.join(from);
+            }
         }
     }
-}
-
-class GossipDigestSynVerbHandler implements IVerbHandler
-{
-    private static Logger logger_ = Logger.getLogger( 
GossipDigestSynVerbHandler.class);
 
-    public void doVerb(Message message)
+    public static class GossipDigestSynVerbHandler implements IVerbHandler
     {
-        InetAddress from = message.getFrom();
-        if (logger_.isTraceEnabled())
-            logger_.trace("Received a GossipDigestSynMessage from " + from);
-
-        byte[] bytes = message.getMessageBody();
-        DataInputStream dis = new DataInputStream( new 
ByteArrayInputStream(bytes) );
+        private static Logger logger_ = Logger.getLogger( 
GossipDigestSynVerbHandler.class);
 
-        try
+        public void doVerb(Message message)
         {
-            GossipDigestSynMessage gDigestMessage = 
GossipDigestSynMessage.serializer().deserialize(dis);
-            /* If the message is from a different cluster throw it away. */
-            if ( 
!gDigestMessage.clusterId_.equals(DatabaseDescriptor.getClusterName()) )
-                return;
+            InetAddress from = message.getFrom();
+            if (logger_.isTraceEnabled())
+                logger_.trace("Received a GossipDigestSynMessage from " + 
from);
 
-            List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
-            /* Notify the Failure Detector */
-            Gossiper.instance.notifyFailureDetector(gDigestList);
+            byte[] bytes = message.getMessageBody();
+            DataInputStream dis = new DataInputStream( new 
ByteArrayInputStream(bytes) );
 
-            doSort(gDigestList);
+            try
+            {
+                GossipDigestSynMessage gDigestMessage = 
GossipDigestSynMessage.serializer().deserialize(dis);
+                /* If the message is from a different cluster throw it away. */
+                if ( 
!gDigestMessage.clusterId_.equals(DatabaseDescriptor.getClusterName()) )
+                    return;
 
-            List<GossipDigest> deltaGossipDigestList = new 
ArrayList<GossipDigest>();
-            Map<InetAddress, EndPointState> deltaEpStateMap = new 
HashMap<InetAddress, EndPointState>();
-            Gossiper.instance.examineGossiper(gDigestList, 
deltaGossipDigestList, deltaEpStateMap);
+                List<GossipDigest> gDigestList = 
gDigestMessage.getGossipDigests();
+                /* Notify the Failure Detector */
+                Gossiper.instance.notifyFailureDetector(gDigestList);
 
-            GossipDigestAckMessage gDigestAck = new 
GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap);
-            Message gDigestAckMessage = 
Gossiper.instance.makeGossipDigestAckMessage(gDigestAck);
-            if (logger_.isTraceEnabled())
-                logger_.trace("Sending a GossipDigestAckMessage to " + from);
-            MessagingService.instance.sendOneWay(gDigestAckMessage, from);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
+                doSort(gDigestList);
 
-    /*
-     * First construct a map whose key is the endpoint in the GossipDigest and 
the value is the
-     * GossipDigest itself. Then build a list of version differences i.e 
difference between the
-     * version in the GossipDigest and the version in the local state for a 
given InetAddress.
-     * Sort this list. Now loop through the sorted list and retrieve the 
GossipDigest corresponding
-     * to the endpoint from the map that was initially constructed.
-    */
-    private void doSort(List<GossipDigest> gDigestList)
-    {
-        /* Construct a map of endpoint to GossipDigest. */
-        Map<InetAddress, GossipDigest> epToDigestMap = new 
HashMap<InetAddress, GossipDigest>();
-        for ( GossipDigest gDigest : gDigestList )
-        {
-            epToDigestMap.put(gDigest.getEndPoint(), gDigest);
-        }
+                List<GossipDigest> deltaGossipDigestList = new 
ArrayList<GossipDigest>();
+                Map<InetAddress, EndPointState> deltaEpStateMap = new 
HashMap<InetAddress, EndPointState>();
+                Gossiper.instance.examineGossiper(gDigestList, 
deltaGossipDigestList, deltaEpStateMap);
 
-        /*
-         * These digests have their maxVersion set to the difference of the 
version
-         * of the local EndPointState and the version found in the 
GossipDigest.
-        */
-        List<GossipDigest> diffDigests = new ArrayList<GossipDigest>();
-        for ( GossipDigest gDigest : gDigestList )
-        {
-            InetAddress ep = gDigest.getEndPoint();
-            EndPointState epState = 
Gossiper.instance.getEndPointStateForEndPoint(ep);
-            int version = (epState != null) ? 
Gossiper.instance.getMaxEndPointStateVersion( epState ) : 0;
-            int diffVersion = Math.abs(version - gDigest.getMaxVersion() );
-            diffDigests.add( new GossipDigest(ep, gDigest.getGeneration(), 
diffVersion) );
+                GossipDigestAckMessage gDigestAck = new 
GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap);
+                Message gDigestAckMessage = 
Gossiper.instance.makeGossipDigestAckMessage(gDigestAck);
+                if (logger_.isTraceEnabled())
+                    logger_.trace("Sending a GossipDigestAckMessage to " + 
from);
+                MessagingService.instance.sendOneWay(gDigestAckMessage, from);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
         }
 
-        gDigestList.clear();
-        Collections.sort(diffDigests);
-        int size = diffDigests.size();
         /*
-         * Report the digests in descending order. This takes care of the 
endpoints
-         * that are far behind w.r.t this local endpoint
+         * First construct a map whose key is the endpoint in the GossipDigest 
and the value is the
+         * GossipDigest itself. Then build a list of version differences i.e 
difference between the
+         * version in the GossipDigest and the version in the local state for 
a given InetAddress.
+         * Sort this list. Now loop through the sorted list and retrieve the 
GossipDigest corresponding
+         * to the endpoint from the map that was initially constructed.
         */
-        for ( int i = size - 1; i >= 0; --i )
+        private void doSort(List<GossipDigest> gDigestList)
         {
-            gDigestList.add( 
epToDigestMap.get(diffDigests.get(i).getEndPoint()) );
+            /* Construct a map of endpoint to GossipDigest. */
+            Map<InetAddress, GossipDigest> epToDigestMap = new 
HashMap<InetAddress, GossipDigest>();
+            for ( GossipDigest gDigest : gDigestList )
+            {
+                epToDigestMap.put(gDigest.getEndPoint(), gDigest);
+            }
+
+            /*
+             * These digests have their maxVersion set to the difference of 
the version
+             * of the local EndPointState and the version found in the 
GossipDigest.
+            */
+            List<GossipDigest> diffDigests = new ArrayList<GossipDigest>();
+            for ( GossipDigest gDigest : gDigestList )
+            {
+                InetAddress ep = gDigest.getEndPoint();
+                EndPointState epState = 
Gossiper.instance.getEndPointStateForEndPoint(ep);
+                int version = (epState != null) ? 
Gossiper.instance.getMaxEndPointStateVersion( epState ) : 0;
+                int diffVersion = Math.abs(version - gDigest.getMaxVersion() );
+                diffDigests.add( new GossipDigest(ep, gDigest.getGeneration(), 
diffVersion) );
+            }
+
+            gDigestList.clear();
+            Collections.sort(diffDigests);
+            int size = diffDigests.size();
+            /*
+             * Report the digests in descending order. This takes care of the 
endpoints
+             * that are far behind w.r.t this local endpoint
+            */
+            for ( int i = size - 1; i >= 0; --i )
+            {
+                gDigestList.add( 
epToDigestMap.get(diffDigests.get(i).getEndPoint()) );
+            }
         }
     }
-}
-
-class GossipDigestAckVerbHandler implements IVerbHandler
-{
-    private static Logger logger_ = 
Logger.getLogger(GossipDigestAckVerbHandler.class);
 
-    public void doVerb(Message message)
+    public static class GossipDigestAckVerbHandler implements IVerbHandler
     {
-        InetAddress from = message.getFrom();
-        if (logger_.isTraceEnabled())
-            logger_.trace("Received a GossipDigestAckMessage from " + from);
-
-        byte[] bytes = message.getMessageBody();
-        DataInputStream dis = new DataInputStream( new 
ByteArrayInputStream(bytes) );
+        private static Logger logger_ = 
Logger.getLogger(GossipDigestAckVerbHandler.class);
 
-        try
+        public void doVerb(Message message)
         {
-            GossipDigestAckMessage gDigestAckMessage = 
GossipDigestAckMessage.serializer().deserialize(dis);
-            List<GossipDigest> gDigestList = 
gDigestAckMessage.getGossipDigestList();
-            Map<InetAddress, EndPointState> epStateMap = 
gDigestAckMessage.getEndPointStateMap();
+            InetAddress from = message.getFrom();
+            if (logger_.isTraceEnabled())
+                logger_.trace("Received a GossipDigestAckMessage from " + 
from);
+
+            byte[] bytes = message.getMessageBody();
+            DataInputStream dis = new DataInputStream( new 
ByteArrayInputStream(bytes) );
 
-            if ( epStateMap.size() > 0 )
+            try
             {
-                /* Notify the Failure Detector */
-                Gossiper.instance.notifyFailureDetector(epStateMap);
-                Gossiper.instance.applyStateLocally(epStateMap);
-            }
+                GossipDigestAckMessage gDigestAckMessage = 
GossipDigestAckMessage.serializer().deserialize(dis);
+                List<GossipDigest> gDigestList = 
gDigestAckMessage.getGossipDigestList();
+                Map<InetAddress, EndPointState> epStateMap = 
gDigestAckMessage.getEndPointStateMap();
+
+                if ( epStateMap.size() > 0 )
+                {
+                    /* Notify the Failure Detector */
+                    Gossiper.instance.notifyFailureDetector(epStateMap);
+                    Gossiper.instance.applyStateLocally(epStateMap);
+                }
+
+                /* Get the state required to send to this gossipee - construct 
GossipDigestAck2Message */
+                Map<InetAddress, EndPointState> deltaEpStateMap = new 
HashMap<InetAddress, EndPointState>();
+                for( GossipDigest gDigest : gDigestList )
+                {
+                    InetAddress addr = gDigest.getEndPoint();
+                    EndPointState localEpStatePtr = 
Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
+                    if ( localEpStatePtr != null )
+                        deltaEpStateMap.put(addr, localEpStatePtr);
+                }
 
-            /* Get the state required to send to this gossipee - construct 
GossipDigestAck2Message */
-            Map<InetAddress, EndPointState> deltaEpStateMap = new 
HashMap<InetAddress, EndPointState>();
-            for( GossipDigest gDigest : gDigestList )
+                GossipDigestAck2Message gDigestAck2 = new 
GossipDigestAck2Message(deltaEpStateMap);
+                Message gDigestAck2Message = 
Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2);
+                if (logger_.isTraceEnabled())
+                    logger_.trace("Sending a GossipDigestAck2Message to " + 
from);
+                MessagingService.instance.sendOneWay(gDigestAck2Message, from);
+            }
+            catch ( IOException e )
             {
-                InetAddress addr = gDigest.getEndPoint();
-                EndPointState localEpStatePtr = 
Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
-                if ( localEpStatePtr != null )
-                    deltaEpStateMap.put(addr, localEpStatePtr);
+                throw new RuntimeException(e);
             }
-
-            GossipDigestAck2Message gDigestAck2 = new 
GossipDigestAck2Message(deltaEpStateMap);
-            Message gDigestAck2Message = 
Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2);
-            if (logger_.isTraceEnabled())
-                logger_.trace("Sending a GossipDigestAck2Message to " + from);
-            MessagingService.instance.sendOneWay(gDigestAck2Message, from);
-        }
-        catch ( IOException e )
-        {
-            throw new RuntimeException(e);
         }
     }
-}
 
-class GossipDigestAck2VerbHandler implements IVerbHandler
-{
-    private static Logger logger_ = 
Logger.getLogger(GossipDigestAck2VerbHandler.class);
-
-    public void doVerb(Message message)
+    public static class GossipDigestAck2VerbHandler implements IVerbHandler
     {
-        InetAddress from = message.getFrom();
-        if (logger_.isTraceEnabled())
-            logger_.trace("Received a GossipDigestAck2Message from " + from);
+        private static Logger logger_ = 
Logger.getLogger(GossipDigestAck2VerbHandler.class);
 
-        byte[] bytes = message.getMessageBody();
-        DataInputStream dis = new DataInputStream( new 
ByteArrayInputStream(bytes) );
-        GossipDigestAck2Message gDigestAck2Message;
-        try
+        public void doVerb(Message message)
         {
-            gDigestAck2Message = 
GossipDigestAck2Message.serializer().deserialize(dis);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
+            InetAddress from = message.getFrom();
+            if (logger_.isTraceEnabled())
+                logger_.trace("Received a GossipDigestAck2Message from " + 
from);
+
+            byte[] bytes = message.getMessageBody();
+            DataInputStream dis = new DataInputStream( new 
ByteArrayInputStream(bytes) );
+            GossipDigestAck2Message gDigestAck2Message;
+            try
+            {
+                gDigestAck2Message = 
GossipDigestAck2Message.serializer().deserialize(dis);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+            Map<InetAddress, EndPointState> remoteEpStateMap = 
gDigestAck2Message.getEndPointStateMap();
+            /* Notify the Failure Detector */
+            Gossiper.instance.notifyFailureDetector(remoteEpStateMap);
+            Gossiper.instance.applyStateLocally(remoteEpStateMap);
         }
-        Map<InetAddress, EndPointState> remoteEpStateMap = 
gDigestAck2Message.getEndPointStateMap();
-        /* Notify the Failure Detector */
-        Gossiper.instance.notifyFailureDetector(remoteEpStateMap);
-        Gossiper.instance.applyStateLocally(remoteEpStateMap);
     }
-}
-
+}
\ No newline at end of file

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=901858&r1=901857&r2=901858&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java 
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java 
Thu Jan 21 21:22:25 2010
@@ -26,6 +26,7 @@
 
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.service.StorageService;
 
 public class Message
 {
@@ -121,7 +122,7 @@
     // TODO should take byte[] + length so we don't have to copy to a byte[] 
of exactly the right len
     public Message getReply(InetAddress from, byte[] args)
     {
-        Header header = new Header(getMessageId(), from, 
StageManager.RESPONSE_STAGE, MessagingService.responseVerbHandler_);
+        Header header = new Header(getMessageId(), from, 
StageManager.RESPONSE_STAGE, StorageService.responseVerbHandler_);
         return new Message(header, args);
     }
     

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=901858&r1=901857&r2=901858&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
 Thu Jan 21 21:22:25 2010
@@ -105,11 +105,6 @@
                                                                         new 
NamedThreadFactory("MESSAGE-DESERIALIZER-POOL"));
 
         streamExecutor_ = new 
JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
-                
-        /* register the response verb handler */
-        registerVerbHandlers(MessagingService.responseVerbHandler_, new 
ResponseVerbHandler());
-
-        FailureDetector.instance.registerFailureDetectionEventListener(this);
     }
     
     public byte[] hash(String type, byte data[])

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=901858&r1=901857&r2=901858&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
 Thu Jan 21 21:22:25 2010
@@ -20,7 +20,7 @@
 
 import org.apache.log4j.Logger;
 
-class ResponseVerbHandler implements IVerbHandler
+public class ResponseVerbHandler implements IVerbHandler
 {
     private static final Logger logger_ = Logger.getLogger( 
ResponseVerbHandler.class );
     

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=901858&r1=901857&r2=901858&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
 Thu Jan 21 21:22:25 2010
@@ -89,9 +89,6 @@
 {
     private static final Logger logger = 
Logger.getLogger(AntiEntropyService.class);
 
-    public final static String TREE_REQUEST_VERB = "TREE-REQUEST-VERB";
-    public final static String TREE_RESPONSE_VERB = "TREE-RESPONSE-VERB";
-
     // millisecond lifetime to store trees before they become stale
     public final static long TREE_STORE_TIMEOUT = 600000;
     // max millisecond frequency that natural (automatic) repairs should run at
@@ -120,8 +117,6 @@
      */
     protected AntiEntropyService()
     {
-        MessagingService.instance.registerVerbHandlers(TREE_REQUEST_VERB, new 
TreeRequestVerbHandler());
-        MessagingService.instance.registerVerbHandlers(TREE_RESPONSE_VERB, new 
TreeResponseVerbHandler());
         naturalRepairs = new ConcurrentHashMap<CFPair, Long>();
         trees = new HashMap<CFPair, ExpiringMap<InetAddress, TreePair>>();
     }
@@ -662,7 +657,7 @@
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 DataOutputStream dos = new DataOutputStream(bos);
                 SERIALIZER.serialize(new CFPair(table, cf), dos);
-                return new Message(FBUtilities.getLocalAddress(), 
StageManager.AE_SERVICE_STAGE, TREE_REQUEST_VERB, bos.toByteArray());
+                return new Message(FBUtilities.getLocalAddress(), 
StageManager.AE_SERVICE_STAGE, StorageService.TREE_REQUEST_VERB, 
bos.toByteArray());
             }
             catch(IOException e)
             {
@@ -720,7 +715,7 @@
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 DataOutputStream dos = new DataOutputStream(bos);
                 SERIALIZER.serialize(validator, dos);
-                return new Message(local, StageManager.AE_SERVICE_STAGE, 
TREE_RESPONSE_VERB, bos.toByteArray());
+                return new Message(local, StageManager.AE_SERVICE_STAGE, 
StorageService.TREE_RESPONSE_VERB, bos.toByteArray());
             }
             catch(IOException e)
             {

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=901858&r1=901857&r2=901858&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
 Thu Jan 21 21:22:25 2010
@@ -85,6 +85,17 @@
     public final static String streamRequestVerbHandler_ = 
"BS-METADATA-VERB-HANDLER";
     public final static String rangeSliceVerbHandler_ = 
"RANGE-SLICE-VERB-HANDLER";
     public final static String bootstrapTokenVerbHandler_ = 
"SPLITS-VERB-HANDLER";
+    public final static String TREE_REQUEST_VERB = "TREE-REQUEST-VERB";
+    public final static String TREE_RESPONSE_VERB = "TREE-RESPONSE-VERB";
+    public static final String responseVerbHandler_ = "RESPONSE";
+    /* GSV - abbreviation for GOSSIP-DIGEST-SYN-VERB */
+    public final static String JOIN_VERB = "JVH";
+    /* GSV - abbreviation for GOSSIP-DIGEST-SYN-VERB */
+    public final static String GOSSIP_DIGEST_SYN_VERB = "GSV";
+    /* GAV - abbreviation for GOSSIP-DIGEST-ACK-VERB */
+    public final static String GOSSIP_DIGEST_ACK_VERB = "GAV";
+    /* GA2V - abbreviation for GOSSIP-DIGEST-ACK2-VERB */
+    public final static String GOSSIP_DIGEST_ACK2_VERB = "GA2V";
 
     private static IPartitioner partitioner_ = 
DatabaseDescriptor.getPartitioner();
 
@@ -199,6 +210,14 @@
         
MessagingService.instance.registerVerbHandlers(streamInitiateVerbHandler_, new 
Streaming.StreamInitiateVerbHandler());
         
MessagingService.instance.registerVerbHandlers(streamInitiateDoneVerbHandler_, 
new Streaming.StreamInitiateDoneVerbHandler());
         
MessagingService.instance.registerVerbHandlers(streamFinishedVerbHandler_, new 
Streaming.StreamFinishedVerbHandler());
+        MessagingService.instance.registerVerbHandlers(responseVerbHandler_, 
new ResponseVerbHandler());
+        MessagingService.instance.registerVerbHandlers(TREE_REQUEST_VERB, new 
TreeRequestVerbHandler());
+        MessagingService.instance.registerVerbHandlers(TREE_RESPONSE_VERB, new 
AntiEntropyService.TreeResponseVerbHandler());
+
+        MessagingService.instance.registerVerbHandlers(JOIN_VERB, new 
Gossiper.JoinVerbHandler());
+        MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_SYN_VERB, 
new Gossiper.GossipDigestSynVerbHandler());
+        MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_ACK_VERB, 
new Gossiper.GossipDigestAckVerbHandler());
+        
MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_ACK2_VERB, new 
Gossiper.GossipDigestAck2VerbHandler());
 
         replicationStrategy_ = getReplicationStrategy(tokenMetadata_);
     }


Reply via email to