Author: jbellis
Date: Thu Aug 5 14:16:02 2010
New Revision: 982630
URL: http://svn.apache.org/viewvc?rev=982630&view=rev
Log:
backport CASSANDRA-1284/r964922 from trunk
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=982630&r1=982629&r2=982630&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Aug 5 14:16:02 2010
@@ -5,6 +5,7 @@
* page within a single row during hinted handoff (CASSANDRA-1327)
* fix compilation on non-sun JKDs (CASSANDRA-1061)
* remove String.trim() call on row keys in batch mutations (CASSANDRA-1235)
+ * Log summary of dropped messages instead of spamming log (CASSANDRA-1284)
0.6.4
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java?rev=982630&r1=982629&r2=982630&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
Thu Aug 5 14:16:02 2010
@@ -44,8 +44,7 @@ class MessageDeserializationTask extends
{
if (System.currentTimeMillis() > constructionTime +
DatabaseDescriptor.getRpcTimeout())
{
- logger.warn(String.format("dropping message (%,dms past timeout)",
- System.currentTimeMillis() -
(constructionTime + DatabaseDescriptor.getRpcTimeout())));
+ MessagingService.incrementDroppedMessages();
return;
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=982630&r1=982629&r2=982630&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
Thu Aug 5 14:16:02 2010
@@ -45,10 +45,13 @@ import java.nio.channels.ServerSocketCha
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
public class MessagingService
{
@@ -75,11 +78,13 @@ public class MessagingService
private static NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>
connectionManagers_ = new NonBlockingHashMap<InetAddress,
OutboundTcpConnectionPool>();
private static Logger logger_ = Logger.getLogger(MessagingService.class);
+ private static int LOG_DROPPED_INTERVAL_IN_MS = 1000;
public static final MessagingService instance = new MessagingService();
private SocketThread socketThread;
private SimpleCondition listenGate;
+ private static AtomicInteger droppedMessages = new AtomicInteger();
public Object clone() throws CloneNotSupportedException
{
@@ -109,6 +114,15 @@ public class MessagingService
new
NamedThreadFactory("MESSAGE-DESERIALIZER-POOL"));
streamExecutor_ = new
JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
+ TimerTask logDropped = new TimerTask()
+ {
+ public void run()
+ {
+ logDroppedMessages();
+ }
+ };
+ Timer timer = new Timer("DroppedMessagesLogger");
+ timer.schedule(logDropped, LOG_DROPPED_INTERVAL_IN_MS,
LOG_DROPPED_INTERVAL_IN_MS);
}
public byte[] hash(String type, byte data[])
@@ -477,7 +491,19 @@ public class MessagingService
buffer.flip();
return buffer;
}
-
+
+ public static int incrementDroppedMessages()
+ {
+ return droppedMessages.incrementAndGet();
+ }
+
+ private static void logDroppedMessages()
+ {
+ if (droppedMessages.get() > 0)
+ logger_.warn("Dropped " + droppedMessages + " messages in the last
" + LOG_DROPPED_INTERVAL_IN_MS + "ms");
+ droppedMessages.set(0);
+ }
+
private class SocketThread extends Thread
{
private final ServerSocket server;