Updated Branches: refs/heads/trunk da2b7d3ac -> a7b2ff65a
Fix TimeoutException when there is a firewall issue. patch by Vijay; reviewed by jbellis for CASSANDRA-3533 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7b2ff65 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7b2ff65 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7b2ff65 Branch: refs/heads/trunk Commit: a7b2ff65ac1946ca8e7e52e00374a83ebe922627 Parents: da2b7d3 Author: Vijay Parthasarathy <[email protected]> Authored: Thu Apr 4 10:24:36 2013 -0700 Committer: Vijay Parthasarathy <[email protected]> Committed: Thu Apr 4 10:53:53 2013 -0700 ---------------------------------------------------------------------- src/java/org/apache/cassandra/gms/EchoMessage.java | 29 ++++++++++ src/java/org/apache/cassandra/gms/Gossiper.java | 44 ++++++++++----- .../org/apache/cassandra/net/MessagingService.java | 4 + .../apache/cassandra/service/EchoVerbHandler.java | 22 +++++++ .../apache/cassandra/service/StorageService.java | 1 + 5 files changed, 86 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b2ff65/src/java/org/apache/cassandra/gms/EchoMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/EchoMessage.java b/src/java/org/apache/cassandra/gms/EchoMessage.java new file mode 100644 index 0000000..3f5f566 --- /dev/null +++ b/src/java/org/apache/cassandra/gms/EchoMessage.java @@ -0,0 +1,29 @@ +package org.apache.cassandra.gms; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.cassandra.io.IVersionedSerializer; + +public class EchoMessage +{ + public static IVersionedSerializer<EchoMessage> serializer = new EchoMessageSerializer(); + + public static class EchoMessageSerializer implements IVersionedSerializer<EchoMessage> + { + public void serialize(EchoMessage t, DataOutput out, int version) throws IOException + { + } + + public EchoMessage deserialize(DataInput in, int version) throws IOException + { + return new EchoMessage(); + } + + public long serializedSize(EchoMessage t, int version) + { + return 0; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b2ff65/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index ae920e1..04ece7a 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.net.IAsyncCallback; +import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; @@ -759,21 +761,35 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } - private void markAlive(InetAddress addr, EndpointState localState) + private void markAlive(final InetAddress addr, final EndpointState localState) { - if (logger.isTraceEnabled()) - logger.trace("marking as alive {}", addr); - localState.markAlive(); - localState.updateTimestamp(); // prevents doStatusCheck from racing us and evicting if it was down > aVeryLongTime - liveEndpoints.add(addr); - unreachableEndpoints.remove(addr); - expireTimeEndpointMap.remove(addr); - logger.debug("removing expire time for endpoint : " + addr); - logger.info("InetAddress {} is now UP", addr); - for (IEndpointStateChangeSubscriber subscriber : subscribers) - subscriber.onAlive(addr, localState); - if (logger.isTraceEnabled()) - logger.trace("Notified " + subscribers); + MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.ECHO, new EchoMessage(), EchoMessage.serializer); + logger.trace("Sending a EchoMessage to {}", addr); + IAsyncCallback echoHandler = new IAsyncCallback() + { + public boolean isLatencyForSnitch() + { + return false; + } + + public void response(MessageIn msg) + { + if (logger.isTraceEnabled()) + logger.trace("marking as alive {}", addr); + localState.markAlive(); + localState.updateTimestamp(); // prevents doStatusCheck from racing us and evicting if it was down > aVeryLongTime + liveEndpoints.add(addr); + unreachableEndpoints.remove(addr); + expireTimeEndpointMap.remove(addr); + logger.debug("removing expire time for endpoint : " + addr); + logger.info("InetAddress {} is now UP", addr); + for (IEndpointStateChangeSubscriber subscriber : subscribers) + subscriber.onAlive(addr, localState); + if (logger.isTraceEnabled()) + logger.trace("Notified " + subscribers); + } + }; + MessagingService.instance().sendRR(echoMessage, addr, echoHandler); } private void markDead(InetAddress addr, EndpointState localState) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b2ff65/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 535f54b..d4123f5 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -48,6 +48,7 @@ import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; import org.apache.cassandra.db.*; import org.apache.cassandra.dht.BootStrapper; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.gms.EchoMessage; import org.apache.cassandra.gms.GossipDigestAck; import org.apache.cassandra.gms.GossipDigestAck2; import org.apache.cassandra.gms.GossipDigestSyn; @@ -116,6 +117,7 @@ public final class MessagingService implements MessagingServiceMBean MIGRATION_REQUEST, GOSSIP_SHUTDOWN, _TRACE, // dummy verb so we can use MS.droppedMessages + ECHO, // use as padding for backwards compatability where a previous version needs to validate a verb from the future. UNUSED_1, UNUSED_2, @@ -152,6 +154,7 @@ public final class MessagingService implements MessagingServiceMBean put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE); put(Verb.COUNTER_MUTATION, Stage.MUTATION); put(Verb.SNAPSHOT, Stage.MISC); + put(Verb.ECHO, Stage.GOSSIP); put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE); put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE); put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE); @@ -190,6 +193,7 @@ public final class MessagingService implements MessagingServiceMBean put(Verb.INDEX_SCAN, IndexScanCommand.serializer); put(Verb.REPLICATION_FINISHED, null); put(Verb.COUNTER_MUTATION, CounterMutation.serializer); + put(Verb.ECHO, EchoMessage.serializer); }}; /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b2ff65/src/java/org/apache/cassandra/service/EchoVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/EchoVerbHandler.java b/src/java/org/apache/cassandra/service/EchoVerbHandler.java new file mode 100644 index 0000000..4f9e451 --- /dev/null +++ b/src/java/org/apache/cassandra/service/EchoVerbHandler.java @@ -0,0 +1,22 @@ +package org.apache.cassandra.service; + +import org.apache.cassandra.gms.EchoMessage; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EchoVerbHandler implements IVerbHandler<EchoMessage> +{ + private static final Logger logger = LoggerFactory.getLogger(EchoVerbHandler.class); + + public void doVerb(MessageIn<EchoMessage> message, int id) + { + assert message.payload != null; + MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.REQUEST_RESPONSE, new EchoMessage(), EchoMessage.serializer); + logger.trace("Sending a EchoMessage reply {}", message.from); + MessagingService.instance().sendReply(echoMessage, id, message.from); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b2ff65/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 0620b7d..3c987e1 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -256,6 +256,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MIGRATION_REQUEST, new MigrationRequestVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SNAPSHOT, new SnapshotVerbHandler()); + MessagingService.instance().registerVerbHandlers(MessagingService.Verb.ECHO, new EchoVerbHandler()); // spin up the streaming service so it is available for jmx tools. if (StreamingService.instance == null)
