[ https://issues.apache.org/jira/browse/CASSANDRA-13265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Christian Esken updated CASSANDRA-13265:
----------------------------------------
    Status: Patch Available  (was: Open)

From 6bd3f3fc3b2da3a66b53a94a819446a9ea8ea2cf Mon Sep 17 00:00:00 2001
From: Christian Esken <christian.es...@trivago.com>
Date: Wed, 1 Mar 2017 15:56:36 +0100
Subject: [PATCH] Expire OTC messages by a single Thread

This patch consists of the following aspects related to OutboundTcpConnection:
- Backlog queue expiration by a single Thread
- Drop count statistics
- QueuedMessage.isTimedOut() fix

When backlog queue expiration is done, a single Thread is elected to do the
work. Previously, all Threads would go in and do the same work,
producing high lock contention. The Thread reading from the Queue could
even be starved by not being able to acquire the read lock.
The backlog queue is inspected every otc_backlog_expiration_interval_ms
milliseconds if its size exceeds BACKLOG_PURGE_SIZE. Added unit tests
for OutboundTcpConnection.

Timed out messages are counted in the dropped statistics. Additionally,
dropped messages are now counted when it is not possible to write to the
socket, e.g. if there is no connection because a target node is down.

Fix QueuedMessage.isTimedOut(), which used an "a < b" comparison on
nano time values; this can be wrong due to wrapping of System.nanoTime().

CASSANDRA-13265
---
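
Note on the isTimedOut() fix: the following is a minimal, standalone sketch
(not part of the patch) of why comparing a difference of System.nanoTime()
values is safe where comparing absolute values is not. The class and method
names (NanoTimeComparison, isTimedOutNaive, isTimedOutSafe) are hypothetical,
and the timestamps are hand-picked to sit near Long.MAX_VALUE so that the
numeric overflow becomes visible.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class, not part of the patch. */
final class NanoTimeComparison
{
    /** Old style: compares absolute values, which breaks when "now - timeout" or "now" overflows. */
    static boolean isTimedOutNaive(long enqueuedNanos, long nowNanos, long timeoutNanos)
    {
        return enqueuedNanos < nowNanos - timeoutNanos;
    }

    /** Patch style: compares the elapsed time (a difference) against the timeout. */
    static boolean isTimedOutSafe(long enqueuedNanos, long nowNanos, long timeoutNanos)
    {
        return nowNanos - enqueuedNanos > timeoutNanos;
    }

    public static void main(String[] args)
    {
        long timeout = TimeUnit.MILLISECONDS.toNanos(2000);

        // Assumed scenario: the message was enqueued 0.5s before the long value wraps,
        // and 3s have elapsed since then, so "now" has wrapped to a negative value.
        long enqueued = Long.MAX_VALUE - 500_000_000L;
        long now = enqueued + 3_000_000_000L; // plain long addition wraps silently

        System.out.println("naive: " + isTimedOutNaive(enqueued, now, timeout));
        System.out.println("safe:  " + isTimedOutSafe(enqueued, now, timeout));
    }
}
```

With these values, 3s have elapsed against a 2s timeout, yet the naive
comparison reports "not timed out" while the difference-based one reports
"timed out". The Javadoc of System.nanoTime() recommends the difference form
for the same reason.
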
 conf/cassandra.yaml                                |   9 ++
 src/java/org/apache/cassandra/config/Config.java   |   6 +
 .../cassandra/config/DatabaseDescriptor.java       |  10 ++
 .../cassandra/net/OutboundTcpConnection.java       | 113 +++++++++++---
 .../org/apache/cassandra/service/StorageProxy.java |  10 +-
 .../cassandra/service/StorageProxyMBean.java       |   3 +
 .../cassandra/net/OutboundTcpConnectionTest.java   | 170 +++++++++++++++++++++
 7 files changed, 294 insertions(+), 27 deletions(-)
 create mode 100644 test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java

diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 790dfd743b..9c1510b66a 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -985,3 +985,12 @@ windows_timer_interval: 1
 
 # Do not try to coalesce messages if we already got that many messages. This should be more than 2 and less than 128.
 # otc_coalescing_enough_coalesced_messages: 8
+
+# How many milliseconds to wait between two expiration runs on the backlog (queue) of the OutboundTcpConnection.
+# Expiration is done if messages are piling up in the backlog. Droppable messages are expired to free the memory
+# taken by expired messages. The interval should be between 0 and 1000, and in most installations the default value
+# will be appropriate. A smaller value could potentially expire messages slightly sooner at the expense of more CPU
+# time and queue contention while iterating the backlog of messages.
+# An interval of 0 disables any wait time, which is the behavior of former Cassandra versions.
+#
+# otc_backlog_expiration_interval_ms: 200
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 9aaf7ae33e..6a99cd3cbd 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -298,6 +298,12 @@ public class Config
     public int otc_coalescing_window_us = otc_coalescing_window_us_default;
     public int otc_coalescing_enough_coalesced_messages = 8;
 
+    /**
+     * Backlog expiration interval in milliseconds for the OutboundTcpConnection.
+     */
+    public static final int otc_backlog_expiration_interval_ms_default = 200;
+    public volatile int otc_backlog_expiration_interval_ms = otc_backlog_expiration_interval_ms_default;
+ 
     public int windows_timer_interval = 0;
 
     public boolean enable_user_defined_functions = false;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 602214f3c6..e9e54c3e20 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1967,6 +1967,16 @@ public class DatabaseDescriptor
         conf.otc_coalescing_enough_coalesced_messages = otc_coalescing_enough_coalesced_messages;
     }
 
+    public static int getOtcBacklogExpirationInterval()
+    {
+        return conf.otc_backlog_expiration_interval_ms;
+    }
+
+    public static void setOtcBacklogExpirationInterval(int intervalInMillis)
+    {
+        conf.otc_backlog_expiration_interval_ms = intervalInMillis;
+    }
+ 
     public static int getWindowsTimerInterval()
     {
         return conf.windows_timer_interval;
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 46083994df..99ad194b94 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -31,6 +31,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.Checksum;
@@ -62,6 +63,7 @@ import org.xerial.snappy.SnappyOutputStream;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.Uninterruptibles;
 
 public class OutboundTcpConnection extends Thread
@@ -116,9 +118,14 @@ public class OutboundTcpConnection extends Thread
         if (coalescingWindow < 0)
             throw new ExceptionInInitializerError(
                     "Value provided for coalescing window must be greather than 0: " + coalescingWindow);
+
+        int otc_backlog_expiration_interval_in_ms = DatabaseDescriptor.getOtcBacklogExpirationInterval();
+        if (otc_backlog_expiration_interval_in_ms != Config.otc_backlog_expiration_interval_ms_default)
+            logger.info("OutboundTcpConnection backlog expiration interval set to {}ms", otc_backlog_expiration_interval_in_ms);
+
     }
 
-    private static final MessageOut CLOSE_SENTINEL = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE);
+    private static final MessageOut<?> CLOSE_SENTINEL = new MessageOut<MessagingService.Verb>(MessagingService.Verb.INTERNAL_RESPONSE);
     private volatile boolean isStopped = false;
 
     private static final int OPEN_RETRY_DELAY = 100; // ms between retries
@@ -128,6 +135,11 @@ public class OutboundTcpConnection extends Thread
     static final int LZ4_HASH_SEED = 0x9747b28c;
 
     private final BlockingQueue<QueuedMessage> backlog = new LinkedBlockingQueue<>();
+    private static final String BACKLOG_PURGE_SIZE_PROPERTY = PREFIX + "otc_backlog_purge_size";
+    @VisibleForTesting
+    static final int BACKLOG_PURGE_SIZE = Integer.getInteger(BACKLOG_PURGE_SIZE_PROPERTY, 1024);
+    private final AtomicBoolean backlogExpirationActive = new AtomicBoolean(false);
+    private volatile long backlogNextExpirationTime;
 
     private final OutboundTcpConnectionPool poolReference;
 
@@ -164,11 +176,11 @@ public class OutboundTcpConnection extends Thread
 
     public void enqueue(MessageOut<?> message, int id)
     {
-        if (backlog.size() > 1024)
-            expireMessages();
+        long nanoTime = System.nanoTime();
+        expireMessages(nanoTime);
         try
         {
-            backlog.put(new QueuedMessage(message, id));
+            backlog.put(new QueuedMessage(message, id, nanoTime));
         }
         catch (InterruptedException e)
         {
@@ -176,6 +188,18 @@ public class OutboundTcpConnection extends Thread
         }
     }
 
+    /**
+     * This is a helper method for unit testing. Disclaimer: Do not use this method outside unit tests, as
+     * this method is iterating the queue which can be an expensive operation (CPU time, queue locking).
+     * 
+     * @return true, if the queue contains at least one expired element
+     */
+    @VisibleForTesting // (otherwise = VisibleForTesting.NONE)
+    boolean backlogContainsExpiredMessages(long nowNanos)
+    {
+        return backlog.stream().anyMatch(entry -> entry.isTimedOut(nowNanos));
+    }
+
     void closeSocket(boolean destroyThread)
     {
         isStopped = destroyThread; // Exit loop to stop the thread
@@ -214,9 +238,8 @@ public class OutboundTcpConnection extends Thread
                 throw new AssertionError(e);
             }
 
-            currentMsgBufferCount = drainedMessages.size();
+            int count = currentMsgBufferCount = drainedMessages.size();
 
-            int count = drainedMessages.size();
             //The timestamp of the first message has already been provided to the coalescing strategy
             //so skip logging it.
             inner:
@@ -233,14 +256,16 @@ public class OutboundTcpConnection extends Thread
                         continue;
                     }
 
-                    if (qm.isTimedOut())
+                    if (qm.isTimedOut(System.nanoTime()))
                         dropped.incrementAndGet();
                     else if (socket != null || connect())
                         writeConnected(qm, count == 1 && backlog.isEmpty());
                     else
                     {
-                        // clear out the queue, else gossip messages back up.
-                        drainedMessages.clear();
+                        // Not connected! Clear out the queue, else gossip messages back up. Update dropped
+                        // statistics accordingly. Hint: The statistics may be slightly too low, if messages
+                        // are added between the calls of backlog.size() and backlog.clear()
+                        dropped.addAndGet(backlog.size());
                         backlog.clear();
                         break inner;
                     }
@@ -254,6 +279,8 @@ public class OutboundTcpConnection extends Thread
                 }
                 currentMsgBufferCount = --count;
             }
+            // Update dropped statistics by the number of unprocessed drainedMessages
+            dropped.addAndGet(currentMsgBufferCount);
             drainedMessages.clear();
         }
     }
@@ -343,7 +370,7 @@ public class OutboundTcpConnection extends Thread
         }
     }
 
-    private void writeInternal(MessageOut message, int id, long timestamp) throws IOException
+    private void writeInternal(MessageOut<?> message, int id, long timestamp) throws IOException
     {
         out.writeInt(MessagingService.PROTOCOL_MAGIC);
 
@@ -563,18 +590,53 @@ public class OutboundTcpConnection extends Thread
         return version.get();
     }
 
-    private void expireMessages()
+    /**
+     * Expire elements from the queue if the queue is pretty full and expiration is not already in progress.
+     * This method will only remove droppable expired entries. If no such element exists, nothing is removed from the queue.
+     * 
+     * @param timestampNanos The current time as from System.nanoTime()
+     */
+    @VisibleForTesting
+    void expireMessages(long timestampNanos)
     {
-        Iterator<QueuedMessage> iter = backlog.iterator();
-        while (iter.hasNext())
+        if (backlog.size() <= BACKLOG_PURGE_SIZE)
+            return; // Plenty of space
+
+        if (backlogNextExpirationTime - timestampNanos > 0)
+            return; // Expiration is not due.
+
+        /**
+         * Expiration is an expensive process. Iterating the queue locks the queue for both writes and
+         * reads during iter.next() and iter.remove(). Thus letting only a single Thread do expiration.
+         */
+        if (backlogExpirationActive.compareAndSet(false, true))
         {
-            QueuedMessage qm = iter.next();
-            if (!qm.droppable)
-                continue;
-            if (!qm.isTimedOut())
-                return;
-            iter.remove();
-            dropped.incrementAndGet();
+            try
+            {
+                Iterator<QueuedMessage> iter = backlog.iterator();
+                while (iter.hasNext())
+                {
+                    QueuedMessage qm = iter.next();
+                    if (!qm.droppable)
+                        continue;
+                    if (!qm.isTimedOut(timestampNanos))
+                        continue;
+                    iter.remove();
+                    dropped.incrementAndGet();
+                }
+
+                if (logger.isTraceEnabled())
+                {
+                    long duration = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - timestampNanos);
+                    logger.trace("Expiration of {} took {}μs", getName(), duration);
+                }
+            }
+            finally
+            {
+                long backlogExpirationIntervalNanos = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getOtcBacklogExpirationInterval());
+                backlogNextExpirationTime = timestampNanos + backlogExpirationIntervalNanos;
+                backlogExpirationActive.set(false);
+            }
         }
     }
 
@@ -586,18 +648,19 @@ public class OutboundTcpConnection extends Thread
         final long timestampNanos;
         final boolean droppable;
 
-        QueuedMessage(MessageOut<?> message, int id)
+        QueuedMessage(MessageOut<?> message, int id, long timestampNanos)
         {
             this.message = message;
             this.id = id;
-            this.timestampNanos = System.nanoTime();
+            this.timestampNanos = timestampNanos;
             this.droppable = MessagingService.DROPPABLE_VERBS.contains(message.verb);
         }
 
         /** don't drop a non-droppable message just because it's timestamp is expired */
-        boolean isTimedOut()
+        boolean isTimedOut(long nowNanos)
         {
-            return droppable && timestampNanos < System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(message.getTimeout());
+            long messageTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(message.getTimeout());
+            return droppable && nowNanos - timestampNanos > messageTimeoutNanos;
         }
 
         boolean shouldRetry()
@@ -615,7 +678,7 @@ public class OutboundTcpConnection extends Thread
     {
         RetriedQueuedMessage(QueuedMessage msg)
         {
-            super(msg.message, msg.id);
+            super(msg.message, msg.id, msg.timestampNanos);
         }
 
         boolean shouldRetry()
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index cffd63cd8d..ea082d5f20 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -72,8 +72,6 @@ import org.apache.cassandra.triggers.TriggerExecutor;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.AbstractIterator;
 
-import static com.google.common.collect.Iterables.contains;
-
 public class StorageProxy implements StorageProxyMBean
 {
     public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy";
@@ -2683,4 +2681,12 @@ public class StorageProxy implements StorageProxyMBean
     public long getReadRepairRepairedBackground() {
         return ReadRepairMetrics.repairedBackground.getCount();
     }
+
+    public int getOtcBacklogExpirationInterval() {
+        return DatabaseDescriptor.getOtcBacklogExpirationInterval();
+    }
+
+    public void setOtcBacklogExpirationInterval(int intervalInMillis) {
+        DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis);
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 0db0ca60ff..ee82a5b1dd 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -59,6 +59,9 @@ public interface StorageProxyMBean
     public long getReadRepairRepairedBlocking();
     public long getReadRepairRepairedBackground();
 
+    public int getOtcBacklogExpirationInterval();
+    public void setOtcBacklogExpirationInterval(int intervalInMillis);
+
     /** Returns each live node's schema version */
     public Map<String, List<String>> getSchemaVersions();
 }
diff --git a/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java b/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java
new file mode 100644
index 0000000000..c09ae0f07e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.MessagingService.Verb;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The tests check whether Queue expiration in the OutboundTcpConnection behaves properly for droppable and
+ * non-droppable messages.
+ */
+public class OutboundTcpConnectionTest
+{
+    AtomicInteger messageId = new AtomicInteger(0);
+
+    final static Verb VERB_DROPPABLE = Verb.MUTATION; // Droppable, 2s timeout
+    final static Verb VERB_NONDROPPABLE = Verb.GOSSIP_DIGEST_ACK; // Not droppable
+
+    final static long NANOS_FOR_TIMEOUT = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getTimeout(VERB_DROPPABLE)*2);
+
+    
+    /**
+     * Verifies our assumptions whether a Verb can be dropped or not. The tests make use of droppability, and
+     * may produce wrong test results if their droppability is changed.
+     */
+    @BeforeClass
+    public static void assertDroppability()
+    {
+        if (!MessagingService.DROPPABLE_VERBS.contains(VERB_DROPPABLE))
+            throw new AssertionError("Expected " + VERB_DROPPABLE + " to be droppable");
+        if (MessagingService.DROPPABLE_VERBS.contains(VERB_NONDROPPABLE))
+            throw new AssertionError("Expected " + VERB_NONDROPPABLE + " not to be droppable");
+    }
+
+    /**
+     * Tests that non-droppable messages are never expired
+     */
+    @Test
+    public void testNondroppable() throws UnknownHostException
+    {
+        OutboundTcpConnection otc = getOutboundTcpConnectionForLocalhost();
+        long nanoTimeBeforeEnqueue = System.nanoTime();
+
+        assertFalse("Fresh OutboundTcpConnection contains expired messages",
+                otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue));
+
+        fillToPurgeSize(otc, VERB_NONDROPPABLE);
+        fillToPurgeSize(otc, VERB_NONDROPPABLE);
+        otc.expireMessages(expirationTimeNanos());
+
+        assertFalse("OutboundTcpConnection with non-droppable verbs should not expire",
+                otc.backlogContainsExpiredMessages(expirationTimeNanos()));
+    }
+
+    /**
+     * Tests that droppable messages will be dropped after they expire, but not before.
+     * 
+     * @throws UnknownHostException
+     */
+    @Test
+    public void testDroppable() throws UnknownHostException
+    {
+        OutboundTcpConnection otc = getOutboundTcpConnectionForLocalhost();
+        long nanoTimeBeforeEnqueue = System.nanoTime();
+
+        initialFill(otc, VERB_DROPPABLE);
+        assertFalse("OutboundTcpConnection with droppable verbs should not expire immediately",
+                otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue));
+
+        otc.expireMessages(nanoTimeBeforeEnqueue);
+        assertFalse("OutboundTcpConnection with droppable verbs should not expire with enqueue-time expiration",
+                otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue));
+
+        // Let's presume the expiration time has passed => At that time there shall be expired messages in the Queue
+        long nanoTimeWhenExpired = expirationTimeNanos();
+        assertTrue("OutboundTcpConnection with droppable verbs should have expired",
+                otc.backlogContainsExpiredMessages(nanoTimeWhenExpired));
+
+        // Using the same timestamp, let's expire them and check whether they are gone
+        otc.expireMessages(nanoTimeWhenExpired);
+        assertFalse("OutboundTcpConnection should not have expired entries",
+                otc.backlogContainsExpiredMessages(nanoTimeWhenExpired));
+
+        // Actually the previous test can be done in a harder way: As expireMessages() has run, we cannot have
+        // ANY expired values, thus let's also test against nanoTimeBeforeEnqueue
+        assertFalse("OutboundTcpConnection should not have any expired entries",
+                otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue));
+
+    }
+
+    /**
+     * Fills the given OutboundTcpConnection with (1 + BACKLOG_PURGE_SIZE) elements. The first
+     * BACKLOG_PURGE_SIZE elements are non-droppable, the last one is a message with the given Verb and can be
+     * droppable or non-droppable.
+     */
+    private void initialFill(OutboundTcpConnection otc, Verb verb)
+    {
+        assertFalse("Fresh OutboundTcpConnection contains expired messages",
+                otc.backlogContainsExpiredMessages(System.nanoTime()));
+
+        fillToPurgeSize(otc, VERB_NONDROPPABLE);
+        MessageOut<?> messageDroppable10s = new MessageOut<>(verb);
+        otc.enqueue(messageDroppable10s, nextMessageId());
+        otc.expireMessages(System.nanoTime());
+    }
+
+    /**
+     * Returns a nano timestamp in the far future, when expiration should have been performed for VERB_DROPPABLE.
+     * The offset is chosen as 2 times the expiration time of VERB_DROPPABLE.
+     * 
+     * @return The future nano timestamp
+     */
+    private long expirationTimeNanos()
+    {
+        return System.nanoTime() + NANOS_FOR_TIMEOUT;
+    }
+
+    private int nextMessageId()
+    {
+        return messageId.incrementAndGet();
+    }
+
+    /**
+     * Adds BACKLOG_PURGE_SIZE messages to the queue. Hint: At BACKLOG_PURGE_SIZE expiration starts to work.
+     * 
+     * @param otc
+     *            The OutboundTcpConnection
+     * @param verb
+     *            The verb that defines the message type
+     */
+    private void fillToPurgeSize(OutboundTcpConnection otc, Verb verb)
+    {
+        for (int i = 0; i < OutboundTcpConnection.BACKLOG_PURGE_SIZE; i++)
+        {
+            otc.enqueue(new MessageOut<>(verb), nextMessageId());
+        }
+    }
+
+    private OutboundTcpConnection getOutboundTcpConnectionForLocalhost() throws UnknownHostException
+    {
+        InetAddress lo = InetAddress.getByName("127.0.0.1");
+        OutboundTcpConnectionPool otcPool = new OutboundTcpConnectionPool(lo);
+        OutboundTcpConnection otc = new OutboundTcpConnection(otcPool);
+        return otc;
+    }
+}
-- 
2.12.0



> Expiration in OutboundTcpConnection can block the reader Thread
> ---------------------------------------------------------------
>
>                 Key: CASSANDRA-13265
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-13265
>             Project: Cassandra
>          Issue Type: Bug
>         Environment: Cassandra 3.0.9
> Java HotSpot(TM) 64-Bit Server VM version 25.112-b15 (Java version 
> 1.8.0_112-b15)
> Linux 3.16
>            Reporter: Christian Esken
>            Assignee: Christian Esken
>             Fix For: 3.0.x
>
>         Attachments: cassandra.pb-cache4-dus.2017-02-17-19-36-26.chist.xz, 
> cassandra.pb-cache4-dus.2017-02-17-19-36-26.td.xz
>
>
> I observed that sometimes a single node in a Cassandra cluster fails to 
> communicate with the other nodes. This can happen at any time, during peak load 
> or low load. Restarting that single node fixes the issue.
> Before going into details, I want to state that I have analyzed the 
> situation and am already developing a possible fix. Here is the analysis so 
> far:
> - A thread dump in this situation showed 324 Threads in the 
> OutboundTcpConnection class that want to lock the backlog queue for doing 
> expiration.
> - A class histogram shows 262508 instances of 
> OutboundTcpConnection$QueuedMessage.
> What is the effect of it? As soon as the Cassandra node has reached a certain 
> amount of queued messages, it starts thrashing itself to death. Each of the 
> Threads fully locks the Queue for reading and writing by calling 
> iterator.next(), making the situation worse and worse.
> - Writing: Only after 262508 locking operations can it progress with actually 
> writing to the Queue.
> - Reading: Is also blocked, as 324 Threads try to do iterator.next(), and 
> fully lock the Queue.
> This means: Writing blocks the Queue for reading, and readers might even be 
> starved, which makes the situation even worse.
> -----
> The setup is:
>  - 3-node cluster
>  - replication factor 2
>  - Consistency LOCAL_ONE
>  - No remote DCs
>  - high write throughput (100000 INSERT statements per second and more during 
> peak times).
>  
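
The contention described in the report, and the single-Thread election the
patch uses against it, can be illustrated with a small standalone sketch. It
is a simplification under stated assumptions, not the patched
OutboundTcpConnection: the class name SingleThreadExpiry is hypothetical, and
each queue entry is reduced to a bare deadline in nanoseconds instead of a
QueuedMessage. Only the AtomicBoolean.compareAndSet() guard around the
iteration mirrors the patch.

```java
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo class: at most one caller at a time iterates the backlog. */
final class SingleThreadExpiry
{
    // Simplification: each entry is just its expiration deadline in nanoseconds.
    private final BlockingQueue<Long> backlog = new LinkedBlockingQueue<>();
    private final AtomicBoolean expirationActive = new AtomicBoolean(false);
    private final AtomicInteger dropped = new AtomicInteger();

    void enqueue(long deadlineNanos)
    {
        backlog.add(deadlineNanos);
    }

    /** @return true if this caller won the election and performed the expiration pass */
    boolean expire(long nowNanos)
    {
        // Losers return immediately instead of queuing up on the queue's locks.
        if (!expirationActive.compareAndSet(false, true))
            return false;
        try
        {
            Iterator<Long> iter = backlog.iterator();
            while (iter.hasNext())
            {
                long deadline = iter.next();
                if (nowNanos - deadline > 0) // difference form, safe against nanoTime wrapping
                {
                    iter.remove();
                    dropped.incrementAndGet();
                }
            }
            return true;
        }
        finally
        {
            expirationActive.set(false); // always release, even if iteration throws
        }
    }

    int size() { return backlog.size(); }

    int droppedCount() { return dropped.get(); }

    public static void main(String[] args)
    {
        SingleThreadExpiry q = new SingleThreadExpiry();
        q.enqueue(10L);
        q.enqueue(20L);
        q.enqueue(1000L);
        q.expire(100L);
        System.out.println("remaining=" + q.size() + " dropped=" + q.droppedCount());
    }
}
```

A caller that loses the compareAndSet() race skips expiration and goes
straight on to enqueue its message, so in this sketch at most one Thread
holds the queue's locks for expiration at any time.
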



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
