Expire OTC messages by a single Thread

patch by Christian Esken; reviewed by Ariel Weisberg for CASSANDRA-13265


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/617c8eba
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/617c8eba
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/617c8eba

Branch: refs/heads/trunk
Commit: 617c8ebadb6c4df99c35a913184760e82172b1f5
Parents: b77e754
Author: Christian Esken <christian.es...@trivago.com>
Authored: Wed Mar 1 15:56:36 2017 +0100
Committer: Ariel Weisberg <aweisb...@apple.com>
Committed: Wed May 3 16:30:43 2017 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   9 +
 .../org/apache/cassandra/config/Config.java     |   6 +
 .../cassandra/config/DatabaseDescriptor.java    |  10 ++
 .../cassandra/net/OutboundTcpConnection.java    | 113 +++++++++---
 .../apache/cassandra/service/StorageProxy.java  |  10 +-
 .../cassandra/service/StorageProxyMBean.java    |   3 +
 .../net/OutboundTcpConnectionTest.java          | 170 +++++++++++++++++++
 8 files changed, 295 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/617c8eba/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 19cd39c..9ec6c95 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.14
+ * Expire OutboundTcpConnection messages by a single Thread (CASSANDRA-13265)
  * Fail repair if insufficient responses received (CASSANDRA-13397)
  * Fix SSTableLoader fail when the loaded table contains dropped columns 
(CASSANDRA-13276)
  * Avoid name clashes in CassandraIndexTest (CASSANDRA-13427)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/617c8eba/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 61d3844..22491c6 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -989,3 +989,12 @@ windows_timer_interval: 1
 
 # Do not try to coalesce messages if we already got that many messages. This 
should be more than 2 and less than 128.
 # otc_coalescing_enough_coalesced_messages: 8
+
+# How many milliseconds to wait between two expiration runs on the backlog 
(queue) of the OutboundTcpConnection.
+# Expiration is done if messages are piling up in the backlog. Droppable 
messages are expired to free the memory
+# taken by expired messages. The interval should be between 0 and 1000, and in 
most installations the default value
+# will be appropriate. A smaller value could potentially expire messages 
slightly sooner at the expense of more CPU
+# time and queue contention while iterating the backlog of messages.
+# An interval of 0 disables any wait time, which is the behavior of former 
Cassandra versions.
+#
+# otc_backlog_expiration_interval_ms: 200

http://git-wip-us.apache.org/repos/asf/cassandra/blob/617c8eba/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 9aaf7ae..6a99cd3 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -298,6 +298,12 @@ public class Config
     public int otc_coalescing_window_us = otc_coalescing_window_us_default;
     public int otc_coalescing_enough_coalesced_messages = 8;
 
+    /**
+     * Backlog expiration interval in milliseconds for the 
OutboundTcpConnection.
+     */
+    public static final int otc_backlog_expiration_interval_ms_default = 200;
+    public volatile int otc_backlog_expiration_interval_ms = 
otc_backlog_expiration_interval_ms_default;
+ 
     public int windows_timer_interval = 0;
 
     public boolean enable_user_defined_functions = false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/617c8eba/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 602214f..e9e54c3 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1967,6 +1967,16 @@ public class DatabaseDescriptor
         conf.otc_coalescing_enough_coalesced_messages = 
otc_coalescing_enough_coalesced_messages;
     }
 
+    public static int getOtcBacklogExpirationInterval()
+    {
+        return conf.otc_backlog_expiration_interval_ms;
+    }
+
+    public static void setOtcBacklogExpirationInterval(int intervalInMillis)
+    {
+        conf.otc_backlog_expiration_interval_ms = intervalInMillis;
+    }
+ 
     public static int getWindowsTimerInterval()
     {
         return conf.windows_timer_interval;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/617c8eba/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java 
b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 4608399..99ad194 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -31,6 +31,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.Checksum;
@@ -62,6 +63,7 @@ import org.xerial.snappy.SnappyOutputStream;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.Uninterruptibles;
 
 public class OutboundTcpConnection extends Thread
@@ -116,9 +118,14 @@ public class OutboundTcpConnection extends Thread
         if (coalescingWindow < 0)
             throw new ExceptionInInitializerError(
                     "Value provided for coalescing window must be greather 
than 0: " + coalescingWindow);
+
+        int otc_backlog_expiration_interval_in_ms = 
DatabaseDescriptor.getOtcBacklogExpirationInterval();
+        if (otc_backlog_expiration_interval_in_ms != 
Config.otc_backlog_expiration_interval_ms_default)
+            logger.info("OutboundTcpConnection backlog expiration interval set 
to to {}ms", otc_backlog_expiration_interval_in_ms);
+
     }
 
-    private static final MessageOut CLOSE_SENTINEL = new 
MessageOut(MessagingService.Verb.INTERNAL_RESPONSE);
+    private static final MessageOut<?> CLOSE_SENTINEL = new 
MessageOut<MessagingService.Verb>(MessagingService.Verb.INTERNAL_RESPONSE);
     private volatile boolean isStopped = false;
 
     private static final int OPEN_RETRY_DELAY = 100; // ms between retries
@@ -128,6 +135,11 @@ public class OutboundTcpConnection extends Thread
     static final int LZ4_HASH_SEED = 0x9747b28c;
 
     private final BlockingQueue<QueuedMessage> backlog = new 
LinkedBlockingQueue<>();
+    private static final String BACKLOG_PURGE_SIZE_PROPERTY = PREFIX + 
"otc_backlog_purge_size";
+    @VisibleForTesting
+    static final int BACKLOG_PURGE_SIZE = 
Integer.getInteger(BACKLOG_PURGE_SIZE_PROPERTY, 1024);
+    private final AtomicBoolean backlogExpirationActive = new 
AtomicBoolean(false);
+    private volatile long backlogNextExpirationTime;
 
     private final OutboundTcpConnectionPool poolReference;
 
@@ -164,11 +176,11 @@ public class OutboundTcpConnection extends Thread
 
     public void enqueue(MessageOut<?> message, int id)
     {
-        if (backlog.size() > 1024)
-            expireMessages();
+        long nanoTime = System.nanoTime();
+        expireMessages(nanoTime);
         try
         {
-            backlog.put(new QueuedMessage(message, id));
+            backlog.put(new QueuedMessage(message, id, nanoTime));
         }
         catch (InterruptedException e)
         {
@@ -176,6 +188,18 @@ public class OutboundTcpConnection extends Thread
         }
     }
 
+    /**
+     * This is a helper method for unit testing. Disclaimer: Do not use this 
method outside unit tests, as
+     * this method is iterating the queue which can be an expensive operation 
(CPU time, queue locking).
+     * 
+     * @return true, if the queue contains at least one expired element
+     */
+    @VisibleForTesting // (otherwise = VisibleForTesting.NONE)
+    boolean backlogContainsExpiredMessages(long nowNanos)
+    {
+        return backlog.stream().anyMatch(entry -> entry.isTimedOut(nowNanos));
+    }
+
     void closeSocket(boolean destroyThread)
     {
         isStopped = destroyThread; // Exit loop to stop the thread
@@ -214,9 +238,8 @@ public class OutboundTcpConnection extends Thread
                 throw new AssertionError(e);
             }
 
-            currentMsgBufferCount = drainedMessages.size();
+            int count = currentMsgBufferCount = drainedMessages.size();
 
-            int count = drainedMessages.size();
             //The timestamp of the first message has already been provided to 
the coalescing strategy
             //so skip logging it.
             inner:
@@ -233,14 +256,16 @@ public class OutboundTcpConnection extends Thread
                         continue;
                     }
 
-                    if (qm.isTimedOut())
+                    if (qm.isTimedOut(System.nanoTime()))
                         dropped.incrementAndGet();
                     else if (socket != null || connect())
                         writeConnected(qm, count == 1 && backlog.isEmpty());
                     else
                     {
-                        // clear out the queue, else gossip messages back up.
-                        drainedMessages.clear();
+                        // Not connected! Clear out the queue, else gossip 
messages back up. Update dropped
+                        // statistics accordingly. Hint: The statistics may be 
slightly too low, if messages
+                        // are added between the calls of backlog.size() and 
backlog.clear()
+                        dropped.addAndGet(backlog.size());
                         backlog.clear();
                         break inner;
                     }
@@ -254,6 +279,8 @@ public class OutboundTcpConnection extends Thread
                 }
                 currentMsgBufferCount = --count;
             }
+            // Update dropped statistics by the number of unprocessed 
drainedMessages
+            dropped.addAndGet(currentMsgBufferCount);
             drainedMessages.clear();
         }
     }
@@ -343,7 +370,7 @@ public class OutboundTcpConnection extends Thread
         }
     }
 
-    private void writeInternal(MessageOut message, int id, long timestamp) 
throws IOException
+    private void writeInternal(MessageOut<?> message, int id, long timestamp) 
throws IOException
     {
         out.writeInt(MessagingService.PROTOCOL_MAGIC);
 
@@ -563,18 +590,53 @@ public class OutboundTcpConnection extends Thread
         return version.get();
     }
 
-    private void expireMessages()
+    /**
+     * Expire elements from the queue if the queue is pretty full and 
expiration is not already in progress.
+     * This method will only remove droppable expired entries. If no such 
element exists, nothing is removed from the queue.
+     * 
+     * @param timestampNanos The current time as from System.nanoTime()
+     */
+    @VisibleForTesting
+    void expireMessages(long timestampNanos)
     {
-        Iterator<QueuedMessage> iter = backlog.iterator();
-        while (iter.hasNext())
+        if (backlog.size() <= BACKLOG_PURGE_SIZE)
+            return; // Plenty of space
+
+        if (backlogNextExpirationTime - timestampNanos > 0)
+            return; // Expiration is not due.
+
+        /**
+         * Expiration is an expensive process. Iterating the queue locks the 
queue for both writes and
+         * reads during iter.next() and iter.remove(). Thus letting only a 
single Thread do expiration.
+         */
+        if (backlogExpirationActive.compareAndSet(false, true))
         {
-            QueuedMessage qm = iter.next();
-            if (!qm.droppable)
-                continue;
-            if (!qm.isTimedOut())
-                return;
-            iter.remove();
-            dropped.incrementAndGet();
+            try
+            {
+                Iterator<QueuedMessage> iter = backlog.iterator();
+                while (iter.hasNext())
+                {
+                    QueuedMessage qm = iter.next();
+                    if (!qm.droppable)
+                        continue;
+                    if (!qm.isTimedOut(timestampNanos))
+                        continue;
+                    iter.remove();
+                    dropped.incrementAndGet();
+                }
+
+                if (logger.isTraceEnabled())
+                {
+                    long duration = 
TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - timestampNanos);
+                    logger.trace("Expiration of {} took {}μs", getName(), 
duration);
+                }
+            }
+            finally
+            {
+                long backlogExpirationIntervalNanos = 
TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getOtcBacklogExpirationInterval());
+                backlogNextExpirationTime = timestampNanos + 
backlogExpirationIntervalNanos;
+                backlogExpirationActive.set(false);
+            }
         }
     }
 
@@ -586,18 +648,19 @@ public class OutboundTcpConnection extends Thread
         final long timestampNanos;
         final boolean droppable;
 
-        QueuedMessage(MessageOut<?> message, int id)
+        QueuedMessage(MessageOut<?> message, int id, long timestampNanos)
         {
             this.message = message;
             this.id = id;
-            this.timestampNanos = System.nanoTime();
+            this.timestampNanos = timestampNanos;
             this.droppable = 
MessagingService.DROPPABLE_VERBS.contains(message.verb);
         }
 
         /** don't drop a non-droppable message just because it's timestamp is 
expired */
-        boolean isTimedOut()
+        boolean isTimedOut(long nowNanos)
         {
-            return droppable && timestampNanos < System.nanoTime() - 
TimeUnit.MILLISECONDS.toNanos(message.getTimeout());
+            long messageTimeoutNanos = 
TimeUnit.MILLISECONDS.toNanos(message.getTimeout());
+            return droppable && nowNanos - timestampNanos  > 
messageTimeoutNanos;
         }
 
         boolean shouldRetry()
@@ -615,7 +678,7 @@ public class OutboundTcpConnection extends Thread
     {
         RetriedQueuedMessage(QueuedMessage msg)
         {
-            super(msg.message, msg.id);
+            super(msg.message, msg.id, msg.timestampNanos);
         }
 
         boolean shouldRetry()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/617c8eba/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index cffd63c..ea082d5 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -72,8 +72,6 @@ import org.apache.cassandra.triggers.TriggerExecutor;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.AbstractIterator;
 
-import static com.google.common.collect.Iterables.contains;
-
 public class StorageProxy implements StorageProxyMBean
 {
     public static final String MBEAN_NAME = 
"org.apache.cassandra.db:type=StorageProxy";
@@ -2683,4 +2681,12 @@ public class StorageProxy implements StorageProxyMBean
     public long getReadRepairRepairedBackground() {
         return ReadRepairMetrics.repairedBackground.getCount();
     }
+
+    public int getOtcBacklogExpirationInterval() {
+        return DatabaseDescriptor.getOtcBacklogExpirationInterval();
+    }
+
+    public void setOtcBacklogExpirationInterval(int intervalInMillis) {
+        DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/617c8eba/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java 
b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 0db0ca6..ee82a5b 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -59,6 +59,9 @@ public interface StorageProxyMBean
     public long getReadRepairRepairedBlocking();
     public long getReadRepairRepairedBackground();
 
+    public int getOtcBacklogExpirationInterval();
+    public void setOtcBacklogExpirationInterval(int intervalInMillis);
+
     /** Returns each live node's schema version */
     public Map<String, List<String>> getSchemaVersions();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/617c8eba/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java 
b/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java
new file mode 100644
index 0000000..c09ae0f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.MessagingService.Verb;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The tests check whether Queue expiration in the OutboundTcpConnection 
behaves properly for droppable and
+ * non-droppable messages.
+ */
+public class OutboundTcpConnectionTest
+{
+    AtomicInteger messageId = new AtomicInteger(0);
+
+    final static Verb VERB_DROPPABLE = Verb.MUTATION; // Droppable, 2s timeout
+    final static Verb VERB_NONDROPPABLE = Verb.GOSSIP_DIGEST_ACK; // Not 
droppable
+
+    final static long NANOS_FOR_TIMEOUT = 
TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getTimeout(VERB_DROPPABLE)*2);
+
+    
+    /**
+     * Verifies our assumptions whether a Verb can be dropped or not. The 
tests make use of droppabilty, and
+     * may produce wrong test results if their droppabilty is changed. 
+     */
+    @BeforeClass
+    public static void assertDroppability()
+    {
+        if (!MessagingService.DROPPABLE_VERBS.contains(VERB_DROPPABLE))
+            throw new AssertionError("Expected " + VERB_DROPPABLE + " to be 
droppable");
+        if (MessagingService.DROPPABLE_VERBS.contains(VERB_NONDROPPABLE))
+            throw new AssertionError("Expected " + VERB_NONDROPPABLE + " not 
to be droppable");
+    }
+
+    /**
+     * Tests that non-droppable messages are never expired
+     */
+    @Test
+    public void testNondroppable() throws UnknownHostException
+    {
+        OutboundTcpConnection otc = getOutboundTcpConnectionForLocalhost();
+        long nanoTimeBeforeEnqueue = System.nanoTime();
+
+        assertFalse("Fresh OutboundTcpConnection contains expired messages",
+                otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue));
+
+        fillToPurgeSize(otc, VERB_NONDROPPABLE);
+        fillToPurgeSize(otc, VERB_NONDROPPABLE);
+        otc.expireMessages(expirationTimeNanos());
+
+        assertFalse("OutboundTcpConnection with non-droppable verbs should not 
expire",
+                otc.backlogContainsExpiredMessages(expirationTimeNanos()));
+    }
+
+    /**
+     * Tests that droppable messages will be dropped after they expire, but 
not before.
+     * 
+     * @throws UnknownHostException
+     */
+    @Test
+    public void testDroppable() throws UnknownHostException
+    {
+        OutboundTcpConnection otc = getOutboundTcpConnectionForLocalhost();
+        long nanoTimeBeforeEnqueue = System.nanoTime();
+
+        initialFill(otc, VERB_DROPPABLE);
+        assertFalse("OutboundTcpConnection with droppable verbs should not 
expire immediately",
+                otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue));
+
+        otc.expireMessages(nanoTimeBeforeEnqueue);
+        assertFalse("OutboundTcpConnection with droppable verbs should not 
expire with enqueue-time expiration",
+                otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue));
+
+        // Lets presume, expiration time have passed => At that time there 
shall be expired messages in the Queue
+        long nanoTimeWhenExpired = expirationTimeNanos();
+        assertTrue("OutboundTcpConnection with droppable verbs should have 
expired",
+                otc.backlogContainsExpiredMessages(nanoTimeWhenExpired));
+
+        // Using the same timestamp, lets expire them and check whether they 
have gone
+        otc.expireMessages(nanoTimeWhenExpired);
+        assertFalse("OutboundTcpConnection should not have expired entries",
+                otc.backlogContainsExpiredMessages(nanoTimeWhenExpired));
+
+        // Actually the previous test can be done in a harder way: As 
expireMessages() has run, we cannot have
+        // ANY expired values, thus lets test also against 
nanoTimeBeforeEnqueue
+        assertFalse("OutboundTcpConnection should not have any expired 
entries",
+                otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue));
+
+    }
+
+    /**
+     * Fills the given OutboundTcpConnection with (1 + BACKLOG_PURGE_SIZE), 
elements. The first
+     * BACKLOG_PURGE_SIZE elements are non-droppable, the last one is a 
message with the given Verb and can be
+     * droppable or non-droppable.
+     */
+    private void initialFill(OutboundTcpConnection otc, Verb verb)
+    {
+        assertFalse("Fresh OutboundTcpConnection contains expired messages",
+                otc.backlogContainsExpiredMessages(System.nanoTime()));
+
+        fillToPurgeSize(otc, VERB_NONDROPPABLE);
+        MessageOut<?> messageDroppable10s = new MessageOut<>(verb);
+        otc.enqueue(messageDroppable10s, nextMessageId());
+        otc.expireMessages(System.nanoTime());
+    }
+
+    /**
+     * Returns a nano timestamp in the far future, when expiration should have 
been performed for VERB_DROPPABLE.
+     * The offset is chosen as 2 times of the expiration time of 
VERB_DROPPABLE.
+     * 
+     * @return The future nano timestamp
+     */
+    private long expirationTimeNanos()
+    {
+        return System.nanoTime() + NANOS_FOR_TIMEOUT;
+    }
+
+    private int nextMessageId()
+    {
+        return messageId.incrementAndGet();
+    }
+
+    /**
+     * Adds BACKLOG_PURGE_SIZE messages to the queue. Hint: At 
BACKLOG_PURGE_SIZE expiration starts to work.
+     * 
+     * @param otc
+     *            The OutboundTcpConnection
+     * @param verb
+     *            The verb that defines the message type
+     */
+    private void fillToPurgeSize(OutboundTcpConnection otc, Verb verb)
+    {
+        for (int i = 0; i < OutboundTcpConnection.BACKLOG_PURGE_SIZE; i++)
+        {
+            otc.enqueue(new MessageOut<>(verb), nextMessageId());
+        }
+    }
+
+    private OutboundTcpConnection getOutboundTcpConnectionForLocalhost() 
throws UnknownHostException
+    {
+        InetAddress lo = InetAddress.getByName("127.0.0.1");
+        OutboundTcpConnectionPool otcPool = new OutboundTcpConnectionPool(lo);
+        OutboundTcpConnection otc = new OutboundTcpConnection(otcPool);
+        return otc;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to