Updated Branches: refs/heads/trunk 978d7bb7d -> 0e3a9a55e
include message initiation time to replicas
patch by jbellis; reviewed by vijay for CASSANDRA-2858

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e3a9a55
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e3a9a55
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e3a9a55

Branch: refs/heads/trunk
Commit: 0e3a9a55e484707addf32f73d18014df61c8d8f9
Parents: 978d7bb
Author: Jonathan Ellis <[email protected]>
Authored: Mon Sep 3 10:49:49 2012 -0500
Committer: Jonathan Ellis <[email protected]>
Committed: Wed Sep 5 23:07:45 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +
 NEWS.txt                                           |    4 ++
 .../cassandra/net/IncomingTcpConnection.java       |    8 +++-
 .../apache/cassandra/net/MessageDeliveryTask.java  |    5 ++-
 .../org/apache/cassandra/net/MessagingService.java |    4 +-
 .../cassandra/net/OutboundTcpConnection.java       |   25 ++++++++-------
 .../cassandra/streaming/StreamInSession.java       |    2 +
 7 files changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3a9a55/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1ed7941..b08117d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 1.2-dev
+ * include message initiation time to replicas so they can more
+   accurately drop timed-out requests (CASSANDRA-2858)
  * fix clientutil.jar dependencies (CASSANDRA-4566)
  * optimize WriteResponse (CASSANDRA-4548)
  * new metrics (CASSANDRA-4009)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3a9a55/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 731040a..3c9ead7 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -19,6 +19,10 @@ Upgrading
       the ability to read data files from Cassandra versions at least back
to 0.6, so a non-rolling upgrade remains possible with just one step. + - Server clock synchronization is more important in 1.2; replicas + will use a coordinator-provided timestamp to determine when a + request has timed out and is thus not worth proceeding with. + Using a service like NTP is strongly recommended. - The hints schema was changed from 1.1 to 1.2. Cassandra automatically snapshots and then truncates the hints column family as part of starting up 1.2 for the first time. Additionally, upgraded nodes http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3a9a55/src/java/org/apache/cassandra/net/IncomingTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index 1b0ae05..5bf4c5d 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -174,10 +174,14 @@ public class IncomingTcpConnection extends Thread private InetAddress receiveMessage(DataInputStream input, int version) throws IOException { - if (version <= MessagingService.VERSION_11) + if (version < MessagingService.VERSION_12) input.readInt(); // size of entire message. in 1.0+ this is just a placeholder String id = input.readUTF(); + long timestamp = version >= MessagingService.VERSION_12 + ? 
(System.currentTimeMillis() | 0x00000000FFFFFFFFL) & input.readInt() + : System.currentTimeMillis(); + MessageIn message = MessageIn.read(input, version, id); if (message == null) { @@ -186,7 +190,7 @@ public class IncomingTcpConnection extends Thread } if (version <= MessagingService.current_version) { - MessagingService.instance().receive(message, id); + MessagingService.instance().receive(message, id, timestamp); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3a9a55/src/java/org/apache/cassandra/net/MessageDeliveryTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java index 02a9b1c..e6abdda 100644 --- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java +++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java @@ -25,14 +25,15 @@ public class MessageDeliveryTask implements Runnable private static final Logger logger = LoggerFactory.getLogger(MessageDeliveryTask.class); private final MessageIn message; - private final long constructionTime = System.currentTimeMillis(); + private final long constructionTime; private final String id; - public MessageDeliveryTask(MessageIn message, String id) + public MessageDeliveryTask(MessageIn message, String id, long timestamp) { assert message != null; this.message = message; this.id = id; + constructionTime = timestamp; } public void run() http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3a9a55/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 0090f87..258e712 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -655,7 +655,7 @@ public final 
class MessagingService implements MessagingServiceMBean } } - public void receive(MessageIn message, String id) + public void receive(MessageIn message, String id, long timestamp) { Tracing.instance().initializeFromMessage(message); @@ -666,7 +666,7 @@ public final class MessagingService implements MessagingServiceMBean if (message == null) return; - Runnable runnable = new MessageDeliveryTask(message, id); + Runnable runnable = new MessageDeliveryTask(message, id, timestamp); ExecutorService stage = StageManager.getStage(message.getMessageType()); assert stage != null : "No stage for message type " + message.verb; stage.execute(runnable); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3a9a55/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index c0c1bf2..e52b4cc 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -123,7 +123,6 @@ public class OutboundTcpConnection extends Thread } MessageOut<?> m = qm.message; - String id = qm.id; if (m == CLOSE_SENTINEL) { disconnect(); @@ -132,7 +131,7 @@ public class OutboundTcpConnection extends Thread if (qm.timestamp < System.currentTimeMillis() - m.getTimeout()) dropped.incrementAndGet(); else if (socket != null || connect()) - writeConnected(m, id); + writeConnected(qm); else // clear out the queue, else gossip messages back up. 
active.clear(); @@ -161,11 +160,11 @@ public class OutboundTcpConnection extends Thread || (DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.dc && !isLocalDC(poolReference.endPoint())); } - private void writeConnected(MessageOut<?> message, String id) + private void writeConnected(QueuedMessage qm) { try { - write(message, id, out); + write(qm.message, qm.id, qm.timestamp, out, targetVersion); completed++; if (active.peek() == null) { @@ -183,21 +182,23 @@ public class OutboundTcpConnection extends Thread } } - public void write(MessageOut<?> message, String id, DataOutputStream out) throws IOException - { - write(message, id, out, targetVersion); - } - - public static void write(MessageOut message, String id, DataOutputStream out, int version) throws IOException + public static void write(MessageOut message, String id, long timestamp, DataOutputStream out, int version) throws IOException { out.writeInt(MessagingService.PROTOCOL_MAGIC); if (version < MessagingService.VERSION_12) + { writeHeader(out, version, false); - // 0.8 included a total message size int. 1.0 doesn't need it but expects it to be there. - if (version < MessagingService.VERSION_12) + // 0.8 included a total message size int. 1.0 doesn't need it but expects it to be there. out.writeInt(-1); + } out.writeUTF(id); + if (version >= MessagingService.VERSION_12) + { + // int cast cuts off the high-order half of the timestamp, which we can assume remains + // the same between now and when the recipient reconstructs it. 
+ out.writeInt((int) timestamp); + } message.serialize(out, version); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3a9a55/src/java/org/apache/cassandra/streaming/StreamInSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java index 47a8896..5753a15 100644 --- a/src/java/org/apache/cassandra/streaming/StreamInSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java @@ -170,6 +170,7 @@ public class StreamInSession extends AbstractStreamSession DataOutputStream out = new DataOutputStream(socket.getOutputStream()); OutboundTcpConnection.write(message, String.valueOf(getSessionId()), + System.currentTimeMillis(), out, MessagingService.instance().getVersion(getHost())); out.flush(); @@ -221,6 +222,7 @@ public class StreamInSession extends AbstractStreamSession if (socket != null) OutboundTcpConnection.write(reply.createMessage(), context.right.toString(), + System.currentTimeMillis(), new DataOutputStream(socket.getOutputStream()), MessagingService.instance().getVersion(getHost())); else
