Merge branch 'cassandra-1.2' into trunk
Conflicts:
src/java/org/apache/cassandra/net/OutboundTcpConnection.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4cda3622
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4cda3622
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4cda3622
Branch: refs/heads/trunk
Commit: 4cda3622d0a86d45d73a31576fc8b39b9e66928d
Parents: b1f3fc0 296da81
Author: Jason Brown <[email protected]>
Authored: Thu Jun 27 13:37:27 2013 -0700
Committer: Jason Brown <[email protected]>
Committed: Thu Jun 27 13:37:27 2013 -0700
----------------------------------------------------------------------
.../cassandra/net/OutboundTcpConnection.java | 59 +++++++++++++++++++-
1 file changed, 56 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cda3622/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 4c6f498,648123b..1bdead2
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@@ -285,12 -293,10 +289,11 @@@ public class OutboundTcpConnection exte
if (logger.isDebugEnabled())
logger.debug("attempting to connect to " +
poolReference.endPoint());
- targetVersion =
MessagingService.instance().getVersion(poolReference.endPoint());
-
- long start = System.currentTimeMillis();
- while (System.currentTimeMillis() < start +
DatabaseDescriptor.getRpcTimeout())
+ long start = System.nanoTime();
+ long timeout =
TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getRpcTimeout());
+ while (System.nanoTime() - start < timeout)
{
+ targetVersion =
MessagingService.instance().getVersion(poolReference.endPoint());
try
{
socket = poolReference.newSocket();
@@@ -316,35 -322,47 +319,44 @@@
}
out = new DataOutputStream(new
BufferedOutputStream(socket.getOutputStream(), 4096));
- if (targetVersion >= MessagingService.VERSION_12)
- {
- out.writeInt(MessagingService.PROTOCOL_MAGIC);
- writeHeader(out, targetVersion,
shouldCompressConnection());
- out.flush();
+ out.writeInt(MessagingService.PROTOCOL_MAGIC);
+ writeHeader(out, targetVersion, shouldCompressConnection());
+ out.flush();
- DataInputStream in = new
DataInputStream(socket.getInputStream());
- int maxTargetVersion = handshakeVersion(in);
- if (maxTargetVersion == NO_VERSION)
- {
- // no version is returned, so disconnect an try
again: we will either get
- // a different target version (targetVersion <
MessagingService.VERSION_12)
- // or if the same version the handshake will finally
succeed
- logger.debug("Target max version is {}; no version
information yet, will retry", maxTargetVersion);
- disconnect();
- continue;
- }
- if (targetVersion > maxTargetVersion)
- {
- logger.debug("Target max version is {}; will
reconnect with that version", maxTargetVersion);
-
MessagingService.instance().setVersion(poolReference.endPoint(),
maxTargetVersion);
- disconnect();
- return false;
- }
+ DataInputStream in = new
DataInputStream(socket.getInputStream());
- int maxTargetVersion = in.readInt();
++ int maxTargetVersion = handshakeVersion(in);
++ if (maxTargetVersion == NO_VERSION)
++ {
++ // no version is returned, so disconnect an try again: we
will either get
++ // a different target version (targetVersion <
MessagingService.VERSION_12)
++ // or if the same version the handshake will finally
succeed
++ logger.debug("Target max version is {}; no version
information yet, will retry", maxTargetVersion);
++ disconnect();
++ continue;
++ }
+ if (targetVersion > maxTargetVersion)
+ {
+ logger.debug("Target max version is {}; will reconnect
with that version", maxTargetVersion);
+
MessagingService.instance().setVersion(poolReference.endPoint(),
maxTargetVersion);
+ disconnect();
+ return false;
+ }
- if (targetVersion < maxTargetVersion && targetVersion <
MessagingService.current_version)
- {
- logger.trace("Detected higher max version {} (using
{}); will reconnect when queued messages are done",
- maxTargetVersion, targetVersion);
-
MessagingService.instance().setVersion(poolReference.endPoint(),
Math.min(MessagingService.current_version, maxTargetVersion));
- softCloseSocket();
- }
+ if (targetVersion < maxTargetVersion && targetVersion <
MessagingService.current_version)
+ {
+ logger.trace("Detected higher max version {} (using {});
will reconnect when queued messages are done",
+ maxTargetVersion, targetVersion);
+
MessagingService.instance().setVersion(poolReference.endPoint(),
Math.min(MessagingService.current_version, maxTargetVersion));
+ softCloseSocket();
+ }
- out.writeInt(MessagingService.current_version);
-
CompactEndpointSerializationHelper.serialize(FBUtilities.getBroadcastAddress(),
out);
- if (shouldCompressConnection())
- {
- out.flush();
- logger.trace("Upgrading OutputStream to be
compressed");
- out = new DataOutputStream(new SnappyOutputStream(new
BufferedOutputStream(socket.getOutputStream())));
- }
+ out.writeInt(MessagingService.current_version);
+
CompactEndpointSerializationHelper.serialize(FBUtilities.getBroadcastAddress(),
out);
+ if (shouldCompressConnection())
+ {
+ out.flush();
+ logger.trace("Upgrading OutputStream to be compressed");
+ out = new DataOutputStream(new SnappyOutputStream(new
BufferedOutputStream(socket.getOutputStream())));
}
return true;