Updated Branches: refs/heads/trunk b1f3fc00f -> 4cda3622d
Race condition in detecting version on a mixed 1.1/1.2 cluster patch by Sergio Bossa; reviewed by jasobrown for CASSANDRA-5692 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/296da81f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/296da81f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/296da81f Branch: refs/heads/trunk Commit: 296da81fc809f71e8e08bda612ba89925880fb6c Parents: 9668535 Author: Jason Brown <[email protected]> Authored: Thu Jun 27 12:54:25 2013 -0700 Committer: Jason Brown <[email protected]> Committed: Thu Jun 27 13:32:50 2013 -0700 ---------------------------------------------------------------------- .../cassandra/net/OutboundTcpConnection.java | 60 +++++++++++++++++++- 1 file changed, 57 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/296da81f/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index 7077922..648123b 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -27,7 +27,10 @@ import java.net.SocketException; import java.nio.ByteBuffer; import java.util.UUID; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; @@ -50,6 +53,8 @@ public class OutboundTcpConnection extends Thread private volatile boolean isStopped = false; private static final int OPEN_RETRY_DELAY = 100; // ms between retries + private static final int WAIT_FOR_VERSION_MAX_TIME = 5000; + private static final int NO_VERSION = Integer.MIN_VALUE; // sending thread reads from "active" (one of queue1, queue2) until it is empty. // then it swaps it with "backlog." @@ -288,11 +293,10 @@ public class OutboundTcpConnection extends Thread if (logger.isDebugEnabled()) logger.debug("attempting to connect to " + poolReference.endPoint()); - targetVersion = MessagingService.instance().getVersion(poolReference.endPoint()); - long start = System.currentTimeMillis(); while (System.currentTimeMillis() < start + DatabaseDescriptor.getRpcTimeout()) { + targetVersion = MessagingService.instance().getVersion(poolReference.endPoint()); try { socket = poolReference.newSocket(); @@ -325,7 +329,16 @@ public class OutboundTcpConnection extends Thread out.flush(); DataInputStream in = new DataInputStream(socket.getInputStream()); - int maxTargetVersion = in.readInt(); + int maxTargetVersion = handshakeVersion(in); + if (maxTargetVersion == NO_VERSION) + { + // no version is returned, so disconnect an try again: we will either get + // a different target version (targetVersion < MessagingService.VERSION_12) + // or if the same version the handshake will finally succeed + logger.debug("Target max version is {}; no version information yet, will retry", maxTargetVersion); + disconnect(); + continue; + } if (targetVersion > maxTargetVersion) { logger.debug("Target max version is {}; will reconnect with that version", maxTargetVersion); @@ -371,6 +384,47 @@ public class OutboundTcpConnection extends Thread } return false; } + + private int handshakeVersion(final DataInputStream inputStream) + { + final AtomicInteger version = new AtomicInteger(NO_VERSION); + final CountDownLatch versionLatch = new CountDownLatch(1); + new Thread("HANDSHAKE-" + poolReference.endPoint()) + { + @Override + public void run() + { + try + { + logger.info("Handshaking version with {}", poolReference.endPoint()); + version.set(inputStream.readInt()); + } + catch (IOException ex) + { + final String msg = "Cannot handshake version with " + poolReference.endPoint(); + if (logger.isTraceEnabled()) + logger.trace(msg, ex); + else + logger.info(msg); + } + finally + { + //unblock the waiting thread on either success or fail + versionLatch.countDown(); + } + } + }.start(); + + try + { + versionLatch.await(WAIT_FOR_VERSION_MAX_TIME, TimeUnit.MILLISECONDS); + } + catch (InterruptedException ex) + { + throw new AssertionError(ex); + } + return version.get(); + } private void expireMessages() {
