Updated Branches: refs/heads/trunk 15e3f142a -> a28a2ba93
Require enabling cross-node timeouts patch by Vijay; reviewed by jbellis for CASSANDRA-4812 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a28a2ba9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a28a2ba9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a28a2ba9 Branch: refs/heads/trunk Commit: a28a2ba93764c602268d17ce4e5604ba179428f4 Parents: 15e3f14 Author: Vijay Parthasarathy <[email protected]> Authored: Fri Oct 19 15:58:37 2012 -0700 Committer: Vijay Parthasarathy <[email protected]> Committed: Fri Oct 19 15:58:37 2012 -0700 ---------------------------------------------------------------------- conf/cassandra.yaml | 8 ++++++++ src/java/org/apache/cassandra/config/Config.java | 2 ++ .../cassandra/config/DatabaseDescriptor.java | 5 +++++ .../cassandra/net/IncomingTcpConnection.java | 12 +++++++++--- 4 files changed, 24 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a28a2ba9/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 0a261c8..37fc572 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -462,6 +462,14 @@ truncate_rpc_timeout_in_ms: 300000 # The default timeout for other, miscellaneous operations rpc_timeout_in_ms: 10000 +# Enable operation timeout information exchange between nodes to accurately +# measure request timeouts. If disabled, Cassandra will assume the request +# was forwarded to the replica instantly by the coordinator. +# +# Warning: before enabling this property make sure ntp is installed +# and the times are synchronized between the nodes. +cross_node_timeout: false + # Enable socket timeout for streaming operation. # When a timeout occurs during streaming, streaming is retried from the start # of the current file. 
This *can* involve re-streaming an important amount of http://git-wip-us.apache.org/repos/asf/cassandra/blob/a28a2ba9/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 732760b..c605a3a 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -58,6 +58,8 @@ public class Config public Integer streaming_socket_timeout_in_ms = new Integer(0); + public boolean cross_node_timeout = false; + public volatile Double phi_convict_threshold = 8.0; public Integer concurrent_reads = 8; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a28a2ba9/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 7d87c23..e615887 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -786,6 +786,11 @@ public class DatabaseDescriptor conf.truncate_rpc_timeout_in_ms = timeOutInMillis; } + public static boolean hasCrossNodeTimeout() + { + return conf.cross_node_timeout; + } + // not part of the Verb enum so we can change timeouts easily via JMX public static long getTimeout(MessagingService.Verb verb) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/a28a2ba9/src/java/org/apache/cassandra/net/IncomingTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index 949c5b6..cb989c2 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ 
b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -24,6 +24,7 @@ import java.net.Socket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.streaming.IncomingStreamReader; @@ -178,9 +179,14 @@ public class IncomingTcpConnection extends Thread input.readInt(); // size of entire message. in 1.0+ this is just a placeholder String id = input.readUTF(); - long timestamp = version >= MessagingService.VERSION_12 - ? (System.currentTimeMillis() & 0xFFFFFFFF00000000L) | (((input.readInt() & 0xFFFFFFFFL) << 2) >> 2) - : System.currentTimeMillis(); + long timestamp = System.currentTimeMillis();; + if (version >= MessagingService.VERSION_12) + { + // make sure to readInt, even if cross_node_timeout is not enabled + int partial = input.readInt(); + if (DatabaseDescriptor.hasCrossNodeTimeout()) + timestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2); + } MessageIn message = MessageIn.read(input, version, id); if (message == null)
