Updated Branches:
  refs/heads/trunk da2b7d3ac -> a7b2ff65a

Fix TimeoutException when there is a firewall issue.
patch by Vijay; reviewed by jbellis for CASSANDRA-3533


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7b2ff65
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7b2ff65
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7b2ff65

Branch: refs/heads/trunk
Commit: a7b2ff65ac1946ca8e7e52e00374a83ebe922627
Parents: da2b7d3
Author: Vijay Parthasarathy <[email protected]>
Authored: Thu Apr 4 10:24:36 2013 -0700
Committer: Vijay Parthasarathy <[email protected]>
Committed: Thu Apr 4 10:53:53 2013 -0700

----------------------------------------------------------------------
 src/java/org/apache/cassandra/gms/EchoMessage.java |   29 ++++++++++
 src/java/org/apache/cassandra/gms/Gossiper.java    |   44 ++++++++++-----
 .../org/apache/cassandra/net/MessagingService.java |    4 +
 .../apache/cassandra/service/EchoVerbHandler.java  |   22 +++++++
 .../apache/cassandra/service/StorageService.java   |    1 +
 5 files changed, 86 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b2ff65/src/java/org/apache/cassandra/gms/EchoMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EchoMessage.java b/src/java/org/apache/cassandra/gms/EchoMessage.java
new file mode 100644
index 0000000..3f5f566
--- /dev/null
+++ b/src/java/org/apache/cassandra/gms/EchoMessage.java
@@ -0,0 +1,29 @@
+package org.apache.cassandra.gms;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+
+public class EchoMessage
+{
+    public static IVersionedSerializer<EchoMessage> serializer = new EchoMessageSerializer();
+
+    public static class EchoMessageSerializer implements IVersionedSerializer<EchoMessage>
+    {
+        public void serialize(EchoMessage t, DataOutput out, int version) throws IOException
+        {
+        }
+
+        public EchoMessage deserialize(DataInput in, int version) throws IOException
+        {
+            return new EchoMessage();
+        }
+
+        public long serializedSize(EchoMessage t, int version)
+        {
+            return 0;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b2ff65/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index ae920e1..04ece7a 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
@@ -759,21 +761,35 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     }
 
-    private void markAlive(InetAddress addr, EndpointState localState)
+    private void markAlive(final InetAddress addr, final EndpointState localState)
     {
-        if (logger.isTraceEnabled())
-            logger.trace("marking as alive {}", addr);
-        localState.markAlive();
-        localState.updateTimestamp(); // prevents doStatusCheck from racing us and evicting if it was down > aVeryLongTime
-        liveEndpoints.add(addr);
-        unreachableEndpoints.remove(addr);
-        expireTimeEndpointMap.remove(addr);
-        logger.debug("removing expire time for endpoint : " + addr);
-        logger.info("InetAddress {} is now UP", addr);
-        for (IEndpointStateChangeSubscriber subscriber : subscribers)
-            subscriber.onAlive(addr, localState);
-        if (logger.isTraceEnabled())
-            logger.trace("Notified " + subscribers);
+        MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.ECHO, new EchoMessage(), EchoMessage.serializer);
+        logger.trace("Sending a EchoMessage to {}", addr);
+        IAsyncCallback echoHandler = new IAsyncCallback()
+        {
+            public boolean isLatencyForSnitch()
+            {
+                return false;
+            }
+
+            public void response(MessageIn msg)
+            {
+                if (logger.isTraceEnabled())
+                    logger.trace("marking as alive {}", addr);
+                localState.markAlive();
+                localState.updateTimestamp(); // prevents doStatusCheck from racing us and evicting if it was down > aVeryLongTime
+                liveEndpoints.add(addr);
+                unreachableEndpoints.remove(addr);
+                expireTimeEndpointMap.remove(addr);
+                logger.debug("removing expire time for endpoint : " + addr);
+                logger.info("InetAddress {} is now UP", addr);
+                for (IEndpointStateChangeSubscriber subscriber : subscribers)
+                    subscriber.onAlive(addr, localState);
+                if (logger.isTraceEnabled())
+                    logger.trace("Notified " + subscribers);
+            }
+        };
+        MessagingService.instance().sendRR(echoMessage, addr, echoHandler);
     }
 
     private void markDead(InetAddress addr, EndpointState localState)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b2ff65/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 535f54b..d4123f5 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.dht.BootStrapper;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.EchoMessage;
 import org.apache.cassandra.gms.GossipDigestAck;
 import org.apache.cassandra.gms.GossipDigestAck2;
 import org.apache.cassandra.gms.GossipDigestSyn;
@@ -116,6 +117,7 @@ public final class MessagingService implements MessagingServiceMBean
         MIGRATION_REQUEST,
         GOSSIP_SHUTDOWN,
         _TRACE, // dummy verb so we can use MS.droppedMessages
+        ECHO,
         // use as padding for backwards compatability where a previous version needs to validate a verb from the future.
         UNUSED_1,
         UNUSED_2,
@@ -152,6 +154,7 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
         put(Verb.COUNTER_MUTATION, Stage.MUTATION);
         put(Verb.SNAPSHOT, Stage.MISC);
+        put(Verb.ECHO, Stage.GOSSIP);
         put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE);
         put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
         put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
@@ -190,6 +193,7 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.INDEX_SCAN, IndexScanCommand.serializer);
         put(Verb.REPLICATION_FINISHED, null);
         put(Verb.COUNTER_MUTATION, CounterMutation.serializer);
+        put(Verb.ECHO, EchoMessage.serializer);
     }};
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b2ff65/src/java/org/apache/cassandra/service/EchoVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/EchoVerbHandler.java b/src/java/org/apache/cassandra/service/EchoVerbHandler.java
new file mode 100644
index 0000000..4f9e451
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/EchoVerbHandler.java
@@ -0,0 +1,22 @@
+package org.apache.cassandra.service;
+
+import org.apache.cassandra.gms.EchoMessage;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EchoVerbHandler implements IVerbHandler<EchoMessage>
+{
+    private static final Logger logger = LoggerFactory.getLogger(EchoVerbHandler.class);
+
+    public void doVerb(MessageIn<EchoMessage> message, int id)
+    {
+        assert message.payload != null;
+        MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.REQUEST_RESPONSE, new EchoMessage(), EchoMessage.serializer);
+        logger.trace("Sending a EchoMessage reply {}", message.from);
+        MessagingService.instance().sendReply(echoMessage, id, message.from);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b2ff65/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 0620b7d..3c987e1 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -256,6 +256,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MIGRATION_REQUEST,
 new MigrationRequestVerbHandler());
 
         
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SNAPSHOT,
 new SnapshotVerbHandler());
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.ECHO, new EchoVerbHandler());
 
         // spin up the streaming service so it is available for jmx tools.
         if (StreamingService.instance == null)

Reply via email to