Support optional backpressure strategies at the coordinator

patch by Sergio Bossa; reviewed by Stefania Alborghetti for CASSANDRA-9318


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d43b9ce5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d43b9ce5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d43b9ce5

Branch: refs/heads/trunk
Commit: d43b9ce5092f8879a1a66afebab74d86e9e127fb
Parents: 560faba
Author: Sergio Bossa <sergio.bo...@gmail.com>
Authored: Mon Sep 19 10:42:50 2016 +0800
Committer: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Committed: Tue Sep 20 09:07:36 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |  22 +
 .../org/apache/cassandra/config/Config.java     |   4 +-
 .../cassandra/config/DatabaseDescriptor.java    |  46 +++
 .../apache/cassandra/hints/HintsDispatcher.java |   6 +
 .../apache/cassandra/net/BackPressureState.java |  51 +++
 .../cassandra/net/BackPressureStrategy.java     |  42 ++
 .../apache/cassandra/net/IAsyncCallback.java    |   5 +
 .../apache/cassandra/net/MessagingService.java  | 124 +++++-
 .../cassandra/net/MessagingServiceMBean.java    |  18 +-
 .../net/OutboundTcpConnectionPool.java          |  12 +-
 .../cassandra/net/RateBasedBackPressure.java    | 296 ++++++++++++++
 .../net/RateBasedBackPressureState.java         | 133 ++++++
 .../cassandra/net/ResponseVerbHandler.java      |   5 +
 .../service/AbstractWriteResponseHandler.java   |  27 +-
 .../apache/cassandra/service/StorageProxy.java  |  37 +-
 .../apache/cassandra/utils/SlidingTimeRate.java | 167 ++++++++
 .../cassandra/utils/SystemTimeSource.java       |  54 +++
 .../apache/cassandra/utils/TestRateLimiter.java |  58 +++
 .../org/apache/cassandra/utils/TimeSource.java  |  58 +++
 .../utils/concurrent/IntervalLock.java          |  69 ++++
 test/resources/byteman/mutation_limiter.btm     |   8 +
 .../config/DatabaseDescriptorRefTest.java       |   1 +
 .../cassandra/net/MessagingServiceTest.java     | 247 ++++++++++-
 .../net/RateBasedBackPressureTest.java          | 409 +++++++++++++++++++
 .../cassandra/utils/SlidingTimeRateTest.java    | 146 +++++++
 .../apache/cassandra/utils/TestTimeSource.java  |  72 ++++
 27 files changed, 2072 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b625a58..e9e8ccf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
  * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
  * Fix cassandra-stress graphing (CASSANDRA-12237)
  * Allow filtering on partition key columns for queries without secondary 
indexes (CASSANDRA-11031)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index a25e084..8a0b3ee 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1172,3 +1172,25 @@ gc_warn_threshold_in_ms: 1000
 # early. Any value size larger than this threshold will result into marking an 
SSTable
 # as corrupted.
 # max_value_size_in_mb: 256
+
+# Back-pressure settings #
+# If enabled, the coordinator will apply the back-pressure strategy specified 
below to each mutation
+# sent to replicas, with the aim of reducing pressure on overloaded replicas.
+back_pressure_enabled: false
+# The back-pressure strategy applied.
+# The default implementation, RateBasedBackPressure, takes three arguments:
+# high ratio, factor, and flow type, and uses the ratio between incoming 
mutation responses and outgoing mutation requests.
+# If below high ratio, outgoing mutations are rate limited according to the 
incoming rate decreased by the given factor;
+# if above high ratio, the rate limiting is increased by the given factor;
+# such factor is usually best configured between 1 and 10, use larger values 
for a faster recovery
+# at the expense of potentially more dropped mutations;
+# the rate limiting is applied according to the flow type: if FAST, it's rate 
limited at the speed of the fastest replica,
+# if SLOW at the speed of the slowest one.
+# New strategies can be added. Implementors need to implement 
org.apache.cassandra.net.BackPressureStrategy and
+# provide a public constructor accepting a Map<String, Object>.
+back_pressure_strategy:
+    - class_name: org.apache.cassandra.net.RateBasedBackPressure
+      parameters:
+        - high_ratio: 0.90
+          factor: 5
+          flow: FAST

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index f2f21ad..e6b3638 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -46,7 +46,6 @@ public class Config
      */
     public static final String PROPERTY_PREFIX = "cassandra.";
 
-
     public String cluster_name = "Test Cluster";
     public String authenticator;
     public String authorizer;
@@ -355,6 +354,9 @@ public class Config
      */
     public UserFunctionTimeoutPolicy user_function_timeout_policy = 
UserFunctionTimeoutPolicy.die;
 
+    public volatile boolean back_pressure_enabled = false;
+    public volatile ParameterizedClass back_pressure_strategy;
+
     public static boolean getOutboundBindAny()
     {
         return outboundBindAny;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index dc4cc36..ce889ff 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.config;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.net.*;
 import java.nio.file.FileStore;
 import java.nio.file.Files;
@@ -53,6 +54,8 @@ import org.apache.cassandra.locator.DynamicEndpointSnitch;
 import org.apache.cassandra.locator.EndpointSnitchInfo;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.SeedProvider;
+import org.apache.cassandra.net.BackPressureStrategy;
+import org.apache.cassandra.net.RateBasedBackPressure;
 import org.apache.cassandra.scheduler.IRequestScheduler;
 import org.apache.cassandra.scheduler.NoScheduler;
 import org.apache.cassandra.security.EncryptionContext;
@@ -110,6 +113,7 @@ public class DatabaseDescriptor
     private static EncryptionContext encryptionContext;
     private static boolean hasLoggedConfig;
 
+    private static BackPressureStrategy backPressureStrategy;
     private static DiskOptimizationStrategy diskOptimizationStrategy;
 
     private static boolean clientInitialized;
@@ -706,6 +710,27 @@ public class DatabaseDescriptor
                 diskOptimizationStrategy = new 
SpinningDiskOptimizationStrategy();
                 break;
         }
+
+        try
+        {
+            ParameterizedClass strategy = conf.back_pressure_strategy != null 
? conf.back_pressure_strategy : RateBasedBackPressure.withDefaultParams();
+            Class<?> clazz = Class.forName(strategy.class_name);
+            if (!BackPressureStrategy.class.isAssignableFrom(clazz))
+                throw new ConfigurationException(strategy + " is not an 
instance of " + BackPressureStrategy.class.getCanonicalName(), false);
+
+            Constructor<?> ctor = clazz.getConstructor(Map.class);
+            BackPressureStrategy instance = (BackPressureStrategy) 
ctor.newInstance(strategy.parameters);
+            logger.info("Back-pressure is {} with strategy {}.", 
backPressureEnabled() ? "enabled" : "disabled", conf.back_pressure_strategy);
+            backPressureStrategy = instance;
+        }
+        catch (ConfigurationException ex)
+        {
+            throw ex;
+        }
+        catch (Exception ex)
+        {
+            throw new ConfigurationException("Error configuring back-pressure 
strategy: " + conf.back_pressure_strategy, ex);
+        }
     }
 
     public static void applyAddressConfig() throws ConfigurationException
@@ -2342,4 +2367,25 @@ public class DatabaseDescriptor
     {
         return 
Integer.parseInt(System.getProperty("cassandra.search_concurrency_factor", 
"1"));
     }
+
+    public static void setBackPressureEnabled(boolean backPressureEnabled)
+    {
+        conf.back_pressure_enabled = backPressureEnabled;
+    }
+
+    public static boolean backPressureEnabled()
+    {
+        return conf.back_pressure_enabled;
+    }
+
+    @VisibleForTesting
+    public static void setBackPressureStrategy(BackPressureStrategy strategy)
+    {
+        backPressureStrategy = strategy;
+    }
+
+    public static BackPressureStrategy getBackPressureStrategy()
+    {
+        return backPressureStrategy;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/hints/HintsDispatcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java 
b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
index 29bab80..6940e1e 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -209,5 +209,11 @@ final class HintsDispatcher implements AutoCloseable
         {
             return false;
         }
+
+        @Override
+        public boolean supportsBackPressure()
+        {
+            return true;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/BackPressureState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/BackPressureState.java 
b/src/java/org/apache/cassandra/net/BackPressureState.java
new file mode 100644
index 0000000..34fd0dd
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/BackPressureState.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+
+/**
+ * Interface meant to track the back-pressure state per replica host.
+ */
+public interface BackPressureState
+{
+    /**
+     * Called when a message is sent to a replica.
+     */
+    void onMessageSent(MessageOut<?> message);
+
+    /**
+     * Called when a response is received from a replica.
+     */
+    void onResponseReceived();
+
+    /**
+     * Called when no response is received from a replica.
+     */
+    void onResponseTimeout();
+
+    /**
+     * Gets the current back-pressure rate limit.
+     */
+    double getBackPressureRateLimit();
+
+    /**
+     * Returns the host this state refers to.
+     */
+    InetAddress getHost();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/BackPressureStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/BackPressureStrategy.java 
b/src/java/org/apache/cassandra/net/BackPressureStrategy.java
new file mode 100644
index 0000000..b61a0a1
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/BackPressureStrategy.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Back-pressure algorithm interface.
+ * <br/>
+ * For expert use only. Implementors must provide a constructor accepting a 
single {@code Map<String, Object>} argument,
+ * representing any parameters that may be required by the specific 
implementation.
+ */
+public interface BackPressureStrategy<S extends BackPressureState>
+{
+    /**
+     * Applies the back-pressure algorithm, based and acting on the given 
{@link BackPressureState}s, and up to the given
+     * timeout.
+     */
+    void apply(Set<S> states, long timeout, TimeUnit unit);
+
+    /**
+     * Creates a new {@link BackPressureState} initialized as needed by the 
specific implementation.
+     */
+    S newState(InetAddress host);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/IAsyncCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallback.java 
b/src/java/org/apache/cassandra/net/IAsyncCallback.java
index d159e0c..7835079 100644
--- a/src/java/org/apache/cassandra/net/IAsyncCallback.java
+++ b/src/java/org/apache/cassandra/net/IAsyncCallback.java
@@ -49,4 +49,9 @@ public interface IAsyncCallback<T>
      * given as input to the dynamic snitch.
      */
     boolean isLatencyForSnitch();
+
+    default boolean supportsBackPressure()
+    {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index e72a9a2..7632ebd 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -28,6 +28,8 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -39,8 +41,10 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.concurrent.ExecutorLocals;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
@@ -419,7 +423,6 @@ public final class MessagingService implements 
MessagingServiceMBean
                                                                    
Verb.BATCH_STORE,
                                                                    
Verb.BATCH_REMOVE);
 
-
     private static final class DroppedMessages
     {
         final DroppedMessageMetrics metrics;
@@ -445,20 +448,8 @@ public final class MessagingService implements 
MessagingServiceMBean
     // message sinks are a testing hook
     private final Set<IMessageSink> messageSinks = new CopyOnWriteArraySet<>();
 
-    public void addMessageSink(IMessageSink sink)
-    {
-        messageSinks.add(sink);
-    }
-
-    public void removeMessageSink(IMessageSink sink)
-    {
-        messageSinks.remove(sink);
-    }
-
-    public void clearMessageSinks()
-    {
-        messageSinks.clear();
-    }
+    // back-pressure implementation
+    private final BackPressureStrategy backPressure = 
DatabaseDescriptor.getBackPressureStrategy();
 
     private static class MSHandle
     {
@@ -504,9 +495,17 @@ public final class MessagingService implements 
MessagingServiceMBean
             public Object apply(Pair<Integer, 
ExpiringMap.CacheableObject<CallbackInfo>> pair)
             {
                 final CallbackInfo expiredCallbackInfo = pair.right.value;
+
                 maybeAddLatency(expiredCallbackInfo.callback, 
expiredCallbackInfo.target, pair.right.timeout);
+
                 ConnectionMetrics.totalTimeouts.mark();
                 
getConnectionPool(expiredCallbackInfo.target).incrementTimeout();
+
+                if (expiredCallbackInfo.callback.supportsBackPressure())
+                {
+                    updateBackPressureOnReceive(expiredCallbackInfo.target, 
expiredCallbackInfo.callback, true);
+                }
+
                 if (expiredCallbackInfo.isFailureCallback())
                 {
                     StageManager.getStage(Stage.INTERNAL_RESPONSE).submit(new 
Runnable()
@@ -545,6 +544,76 @@ public final class MessagingService implements 
MessagingServiceMBean
         }
     }
 
+    public void addMessageSink(IMessageSink sink)
+    {
+        messageSinks.add(sink);
+    }
+
+    public void removeMessageSink(IMessageSink sink)
+    {
+        messageSinks.remove(sink);
+    }
+
+    public void clearMessageSinks()
+    {
+        messageSinks.clear();
+    }
+
+    /**
+     * Updates the back-pressure state on sending to the given host if enabled 
and the given message callback supports it.
+     *
+     * @param host The replica host the back-pressure state refers to.
+     * @param callback The message callback.
+     * @param message The actual message.
+     */
+    public void updateBackPressureOnSend(InetAddress host, IAsyncCallback 
callback, MessageOut<?> message)
+    {
+        if (DatabaseDescriptor.backPressureEnabled() && 
callback.supportsBackPressure())
+        {
+            BackPressureState backPressureState = 
getConnectionPool(host).getBackPressureState();
+            backPressureState.onMessageSent(message);
+        }
+    }
+
+    /**
+     * Updates the back-pressure state on reception from the given host if 
enabled and the given message callback supports it.
+     *
+     * @param host The replica host the back-pressure state refers to.
+     * @param callback The message callback.
+     * @param timeout True if updated following a timeout, false otherwise.
+     */
+    public void updateBackPressureOnReceive(InetAddress host, IAsyncCallback 
callback, boolean timeout)
+    {
+        if (DatabaseDescriptor.backPressureEnabled() && 
callback.supportsBackPressure())
+        {
+            BackPressureState backPressureState = 
getConnectionPool(host).getBackPressureState();
+            if (!timeout)
+                backPressureState.onResponseReceived();
+            else
+                backPressureState.onResponseTimeout();
+        }
+    }
+
+    /**
+     * Applies back-pressure for the given hosts, according to the configured 
strategy.
+     *
+     * If the local host is present, it is removed from the pool, as 
back-pressure is only applied
+     * to remote hosts.
+     *
+     * @param hosts The hosts to apply back-pressure to.
+     * @param timeoutInNanos The max back-pressure timeout.
+     */
+    public void applyBackPressure(Iterable<InetAddress> hosts, long 
timeoutInNanos)
+    {
+        if (DatabaseDescriptor.backPressureEnabled())
+        {
+            backPressure.apply(StreamSupport.stream(hosts.spliterator(), false)
+                    .filter(h -> !h.equals(FBUtilities.getBroadcastAddress()))
+                    .map(h -> getConnectionPool(h).getBackPressureState())
+                    .collect(Collectors.toSet()), timeoutInNanos, 
TimeUnit.NANOSECONDS);
+        }
+    }
+
     /**
      * Track latency information for the dynamic snitch
      *
@@ -699,7 +768,7 @@ public final class MessagingService implements 
MessagingServiceMBean
         OutboundTcpConnectionPool cp = connectionManagers.get(to);
         if (cp == null)
         {
-            cp = new OutboundTcpConnectionPool(to);
+            cp = new OutboundTcpConnectionPool(to, backPressure.newState(to));
             OutboundTcpConnectionPool existingPool = 
connectionManagers.putIfAbsent(to, cp);
             if (existingPool != null)
                 cp = existingPool;
@@ -805,6 +874,7 @@ public final class MessagingService implements 
MessagingServiceMBean
     public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, 
long timeout, boolean failureCallback)
     {
         int id = addCallback(cb, message, to, timeout, failureCallback);
+        updateBackPressureOnSend(to, cb, message);
         sendOneWay(failureCallback ? 
message.withParameter(FAILURE_CALLBACK_PARAM, ONE_BYTE) : message, id, to);
         return id;
     }
@@ -827,6 +897,7 @@ public final class MessagingService implements 
MessagingServiceMBean
                       boolean allowHints)
     {
         int id = addCallback(handler, message, to, message.getTimeout(), 
handler.consistencyLevel, allowHints);
+        updateBackPressureOnSend(to, handler, message);
         sendOneWay(message.withParameter(FAILURE_CALLBACK_PARAM, ONE_BYTE), 
id, to);
         return id;
     }
@@ -1379,6 +1450,27 @@ public final class MessagingService implements 
MessagingServiceMBean
         return result;
     }
 
+    public Map<String, Double> getBackPressurePerHost()
+    {
+        Map<String, Double> map = new HashMap<>(connectionManagers.size());
+        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : 
connectionManagers.entrySet())
+            map.put(entry.getKey().getHostAddress(), 
entry.getValue().getBackPressureState().getBackPressureRateLimit());
+
+        return map;
+    }
+
+    @Override
+    public void setBackPressureEnabled(boolean enabled)
+    {
+        DatabaseDescriptor.setBackPressureEnabled(enabled);
+    }
+
+    @Override
+    public boolean isBackPressureEnabled()
+    {
+        return DatabaseDescriptor.backPressureEnabled();
+    }
+
     public static IPartitioner globalPartitioner()
     {
         return StorageService.instance.getTokenMetadata().partitioner;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java 
b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
index 3bcb0d5..b2e79e0 100644
--- a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
+++ b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
@@ -23,8 +23,7 @@ import java.net.UnknownHostException;
 import java.util.Map;
 
 /**
- * MBean exposing MessagingService metrics.
- * - OutboundConnectionPools - Command/Response - Pending/Completed Tasks
+ * MBean exposing MessagingService metrics and allowing back-pressure to be 
enabled or disabled.
  */
 public interface MessagingServiceMBean
 {
@@ -88,5 +87,20 @@ public interface MessagingServiceMBean
      */
     public Map<String, Long> getTimeoutsPerHost();
 
+    /**
+     * Back-pressure rate limiting per host
+     */
+    public Map<String, Double> getBackPressurePerHost();
+
+    /**
+     * Enable/Disable back-pressure
+     */
+    public void setBackPressureEnabled(boolean enabled);
+
+    /**
+     * Get back-pressure enabled state
+     */
+    public boolean isBackPressureEnabled();
+
     public int getVersion(String address) throws UnknownHostException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java 
b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index 0418ff6..b0391ba 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -50,7 +50,10 @@ public class OutboundTcpConnectionPool
     private InetAddress resetEndpoint;
     private ConnectionMetrics metrics;
 
-    OutboundTcpConnectionPool(InetAddress remoteEp)
+    // back-pressure state linked to this connection:
+    private final BackPressureState backPressureState;
+
+    OutboundTcpConnectionPool(InetAddress remoteEp, BackPressureState 
backPressureState)
     {
         id = remoteEp;
         resetEndpoint = SystemKeyspace.getPreferredIP(remoteEp);
@@ -59,6 +62,8 @@ public class OutboundTcpConnectionPool
         smallMessages = new OutboundTcpConnection(this, "Small");
         largeMessages = new OutboundTcpConnection(this, "Large");
         gossipMessages = new OutboundTcpConnection(this, "Gossip");
+
+        this.backPressureState = backPressureState;
     }
 
     /**
@@ -74,6 +79,11 @@ public class OutboundTcpConnectionPool
                : smallMessages;
     }
 
+    public BackPressureState getBackPressureState()
+    {
+        return backPressureState;
+    }
+
     void reset()
     {
         for (OutboundTcpConnection conn : new OutboundTcpConnection[] { 
smallMessages, largeMessages, gossipMessages })

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/RateBasedBackPressure.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/RateBasedBackPressure.java 
b/src/java/org/apache/cassandra/net/RateBasedBackPressure.java
new file mode 100644
index 0000000..1dae243
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/RateBasedBackPressure.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.SystemTimeSource;
+import org.apache.cassandra.utils.TimeSource;
+import org.apache.cassandra.utils.concurrent.IntervalLock;
+
+/**
+ * Back-pressure algorithm based on rate limiting according to the ratio 
between incoming and outgoing rates, computed
+ * over a sliding time window with size equal to write RPC timeout.
+ */
+public class RateBasedBackPressure implements 
BackPressureStrategy<RateBasedBackPressureState>
+{
+    static final String HIGH_RATIO = "high_ratio";
+    static final String FACTOR = "factor";
+    static final String FLOW = "flow";
+    private static final String BACK_PRESSURE_HIGH_RATIO = "0.90";
+    private static final String BACK_PRESSURE_FACTOR = "5";
+    private static final String BACK_PRESSURE_FLOW = "FAST";
+
+    private static final Logger logger = 
LoggerFactory.getLogger(RateBasedBackPressure.class);
+    private static final NoSpamLogger tenSecsNoSpamLogger = 
NoSpamLogger.getLogger(logger, 10, TimeUnit.SECONDS);
+    private static final NoSpamLogger oneMinNoSpamLogger = 
NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+
+    protected final TimeSource timeSource;
+    protected final double highRatio;
+    protected final int factor;
+    protected final Flow flow;
+    protected final long windowSize;
+
+    private final Cache<Set<RateBasedBackPressureState>, IntervalRateLimiter> 
rateLimiters =
+            CacheBuilder.newBuilder().expireAfterAccess(1, 
TimeUnit.HOURS).build();
+
+    enum Flow
+    {
+        FAST,
+        SLOW
+    }
+
+    public static ParameterizedClass withDefaultParams()
+    {
+        return new ParameterizedClass(RateBasedBackPressure.class.getName(),
+                                      ImmutableMap.of(HIGH_RATIO, 
BACK_PRESSURE_HIGH_RATIO,
+                                                      FACTOR, 
BACK_PRESSURE_FACTOR,
+                                                      FLOW, 
BACK_PRESSURE_FLOW));
+    }
+
+    public RateBasedBackPressure(Map<String, Object> args)
+    {
+        this(args, new SystemTimeSource(), 
DatabaseDescriptor.getWriteRpcTimeout());
+    }
+
+    @VisibleForTesting
+    public RateBasedBackPressure(Map<String, Object> args, TimeSource 
timeSource, long windowSize)
+    {
+        if (args.size() != 3)
+        {
+            throw new 
IllegalArgumentException(RateBasedBackPressure.class.getCanonicalName()
+                    + " requires 3 arguments: high ratio, back-pressure factor 
and flow type.");
+        }
+
+        try
+        {
+            highRatio = Double.parseDouble(args.getOrDefault(HIGH_RATIO, 
"").toString().trim());
+            factor = Integer.parseInt(args.getOrDefault(FACTOR, 
"").toString().trim());
+            flow = Flow.valueOf(args.getOrDefault(FLOW, 
"").toString().trim().toUpperCase());
+        }
+        catch (Exception ex)
+        {
+            throw new IllegalArgumentException(ex.getMessage(), ex);
+        }
+
+        if (highRatio <= 0 || highRatio > 1)
+        {
+            throw new IllegalArgumentException("Back-pressure high ratio must 
be > 0 and <= 1");
+        }
+        if (factor < 1)
+        {
+            throw new IllegalArgumentException("Back-pressure factor must be 
>= 1");
+        }
+        if (windowSize < 10)
+        {
+            throw new IllegalArgumentException("Back-pressure window size must 
be >= 10");
+        }
+
+        this.timeSource = timeSource;
+        this.windowSize = windowSize;
+
+        logger.info("Initialized back-pressure with high ratio: {}, factor: 
{}, flow: {}, window size: {}.",
+                    highRatio, factor, flow, windowSize);
+    }
+
+    @Override
+    public void apply(Set<RateBasedBackPressureState> states, long timeout, 
TimeUnit unit)
+    {
+        // Go through the back-pressure states, try updating each of them and 
collect min/max rates:
+        boolean isUpdated = false;
+        double minRateLimit = Double.POSITIVE_INFINITY;
+        double maxRateLimit = Double.NEGATIVE_INFINITY;
+        double minIncomingRate = Double.POSITIVE_INFINITY;
+        RateLimiter currentMin = null;
+        RateLimiter currentMax = null;
+        for (RateBasedBackPressureState backPressure : states)
+        {
+            // Get the incoming/outgoing rates:
+            double incomingRate = 
backPressure.incomingRate.get(TimeUnit.SECONDS);
+            double outgoingRate = 
backPressure.outgoingRate.get(TimeUnit.SECONDS);
+            // Compute the min incoming rate:
+            if (incomingRate < minIncomingRate)
+                minIncomingRate = incomingRate;
+
+            // Try acquiring the interval lock:
+            if (backPressure.tryIntervalLock(windowSize))
+            {
+                // If acquired, proceed updating this back-pressure state rate 
limit:
+                isUpdated = true;
+                try
+                {
+                    RateLimiter limiter = backPressure.rateLimiter;
+
+                    // If we have sent any outgoing requests during this time 
window, go ahead with rate limiting
+                    // (this is safe against concurrent back-pressure state 
updates thanks to the rw-locking in
+                    // RateBasedBackPressureState):
+                    if (outgoingRate > 0)
+                    {
+                        // Compute the incoming/outgoing ratio:
+                        double actualRatio = incomingRate / outgoingRate;
+
+                        // If the ratio is at or above the high mark, try growing by 
the back-pressure factor:
+                        if (actualRatio >= highRatio)
+                        {
+                            // Only if the outgoing rate is able to keep up 
with the rate increase:
+                            if (limiter.getRate() <= outgoingRate)
+                            {
+                                double newRate = limiter.getRate() + 
((limiter.getRate() * factor) / 100);
+                                if (newRate > 0 && newRate != 
Double.POSITIVE_INFINITY)
+                                {
+                                    limiter.setRate(newRate);
+                                }
+                            }
+                        }
+                        // If below, set the rate limiter at the incoming 
rate, decreased by factor:
+                        else
+                        {
+                            // Only if the new rate is actually less than the 
current rate limit:
+                            double newRate = incomingRate - ((incomingRate * 
factor) / 100);
+                            if (newRate > 0 && newRate < limiter.getRate())
+                            {
+                                limiter.setRate(newRate);
+                            }
+                        }
+
+                        logger.trace("Back-pressure state for {}: incoming 
rate {}, outgoing rate {}, ratio {}, rate limiting {}",
+                                     backPressure.getHost(), incomingRate, 
outgoingRate, actualRatio, limiter.getRate());
+                    }
+                    // Otherwise reset the rate limiter:
+                    else
+                    {
+                        limiter.setRate(Double.POSITIVE_INFINITY);
+                    }
+
+                    // Housekeeping: pruning windows and resetting the last 
check timestamp!
+                    backPressure.incomingRate.prune();
+                    backPressure.outgoingRate.prune();
+                }
+                finally
+                {
+                    backPressure.releaseIntervalLock();
+                }
+            }
+            if (backPressure.rateLimiter.getRate() <= minRateLimit)
+            {
+                minRateLimit = backPressure.rateLimiter.getRate();
+                currentMin = backPressure.rateLimiter;
+            }
+            if (backPressure.rateLimiter.getRate() >= maxRateLimit)
+            {
+                maxRateLimit = backPressure.rateLimiter.getRate();
+                currentMax = backPressure.rateLimiter;
+            }
+        }
+
+        // Now find the rate limiter corresponding to the replica group 
represented by these back-pressure states:
+        if (!states.isEmpty())
+        {
+            try
+            {
+                // Get the rate limiter:
+                IntervalRateLimiter rateLimiter = rateLimiters.get(states, () 
-> new IntervalRateLimiter(timeSource));
+
+                // If the back-pressure was updated and we acquire the 
interval lock for the rate limiter of this group:
+                if (isUpdated && rateLimiter.tryIntervalLock(windowSize))
+                {
+                    try
+                    {
+                        // Update the rate limiter value based on the 
configured flow:
+                        if (flow.equals(Flow.FAST))
+                            rateLimiter.limiter = currentMax;
+                        else
+                            rateLimiter.limiter = currentMin;
+
+                        tenSecsNoSpamLogger.info("{} currently applied for 
remote replicas: {}", rateLimiter.limiter, states);
+                    }
+                    finally
+                    {
+                        rateLimiter.releaseIntervalLock();
+                    }
+                }
+                // Assigning a single rate limiter per replica group once per 
window size allows the back-pressure rate
+                // limiting to be stable within the group itself.
+
+                // Finally apply the rate limit with a max pause time equal to 
the provided timeout minus the
+                // response time computed from the incoming rate, to reduce 
the number of client timeouts by taking into
+                // account how long it could take to process responses after 
back-pressure:
+                long responseTimeInNanos = (long) 
(TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS) / minIncomingRate);
+                doRateLimit(rateLimiter.limiter, Math.max(0, 
TimeUnit.NANOSECONDS.convert(timeout, unit) - responseTimeInNanos));
+            }
+            catch (ExecutionException ex)
+            {
+                throw new IllegalStateException(ex);
+            }
+        }
+    }
+
+    @Override
+    public RateBasedBackPressureState newState(InetAddress host)
+    {
+        return new RateBasedBackPressureState(host, timeSource, windowSize);
+    }
+
+    @VisibleForTesting
+    RateLimiter getRateLimiterForReplicaGroup(Set<RateBasedBackPressureState> 
states)
+    {
+        IntervalRateLimiter rateLimiter = rateLimiters.getIfPresent(states);
+        return rateLimiter != null ? rateLimiter.limiter : 
RateLimiter.create(Double.POSITIVE_INFINITY);
+    }
+
+    @VisibleForTesting
+    boolean doRateLimit(RateLimiter rateLimiter, long timeoutInNanos)
+    {
+        if (!rateLimiter.tryAcquire(1, timeoutInNanos, TimeUnit.NANOSECONDS))
+        {
+            timeSource.sleepUninterruptibly(timeoutInNanos, 
TimeUnit.NANOSECONDS);
+            oneMinNoSpamLogger.info("Cannot apply {} due to exceeding write 
timeout, pausing {} nanoseconds instead.",
+                                    rateLimiter, timeoutInNanos);
+
+            return false;
+        }
+
+        return true;
+    }
+
+    private static class IntervalRateLimiter extends IntervalLock
+    {
+        public volatile RateLimiter limiter = 
RateLimiter.create(Double.POSITIVE_INFINITY);
+
+        IntervalRateLimiter(TimeSource timeSource)
+        {
+            super(timeSource);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java 
b/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java
new file mode 100644
index 0000000..c19f277
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.utils.SlidingTimeRate;
+import org.apache.cassandra.utils.TimeSource;
+import org.apache.cassandra.utils.concurrent.IntervalLock;
+
+/**
+ * The rate-based back-pressure state, tracked per replica host.
+ * <br/><br/>
+ *
+ * This back-pressure state is made up of the following attributes:
+ * <ul>
+ * <li>windowSize: the length of the back-pressure window in milliseconds.</li>
+ * <li>incomingRate: the rate of back-pressure supporting incoming 
messages.</li>
+ * <li>outgoingRate: the rate of back-pressure supporting outgoing 
messages.</li>
+ * <li>rateLimiter: the rate limiter to eventually apply to outgoing 
messages.</li>
+ * </ul>
+ * <br/>
+ * The incomingRate and outgoingRate are updated together when a response is 
received to guarantee consistency between
+ * the two.
+ * <br/>
+ * It also provides methods to exclusively lock/release back-pressure windows 
at given intervals;
+ * this allows back-pressure to be applied even under concurrent modifications. 
Please also note a read lock is acquired
+ * during response processing so that no concurrent rate updates can skew 
rate computations.
+ */
+class RateBasedBackPressureState extends IntervalLock implements 
BackPressureState
+{
+    private final InetAddress host;
+    private final long windowSize;
+    final SlidingTimeRate incomingRate;
+    final SlidingTimeRate outgoingRate;
+    final RateLimiter rateLimiter;
+
+    RateBasedBackPressureState(InetAddress host, TimeSource timeSource, long 
windowSize)
+    {
+        super(timeSource);
+        this.host = host;
+        this.windowSize = windowSize;
+        this.incomingRate = new SlidingTimeRate(timeSource, this.windowSize, 
this.windowSize / 10, TimeUnit.MILLISECONDS);
+        this.outgoingRate = new SlidingTimeRate(timeSource, this.windowSize, 
this.windowSize / 10, TimeUnit.MILLISECONDS);
+        this.rateLimiter = RateLimiter.create(Double.POSITIVE_INFINITY);
+    }
+
+    @Override
+    public void onMessageSent(MessageOut<?> message) {}
+
+    @Override
+    public void onResponseReceived()
+    {
+        readLock().lock();
+        try
+        {
+            incomingRate.update(1);
+            outgoingRate.update(1);
+        }
+        finally
+        {
+            readLock().unlock();
+        }
+    }
+
+    @Override
+    public void onResponseTimeout()
+    {
+        readLock().lock();
+        try
+        {
+            outgoingRate.update(1);
+        }
+        finally
+        {
+            readLock().unlock();
+        }
+    }
+
+    @Override
+    public double getBackPressureRateLimit()
+    {
+        return rateLimiter.getRate();
+    }
+
+    @Override
+    public InetAddress getHost()
+    {
+        return host;
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (obj instanceof RateBasedBackPressureState)
+        {
+            RateBasedBackPressureState other = (RateBasedBackPressureState) 
obj;
+            return this.host.equals(other.host);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return this.host.hashCode();
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("[host: %s, incoming rate: %.3f, outgoing rate: 
%.3f, rate limit: %.3f]",
+                             host, incomingRate.get(TimeUnit.SECONDS), 
outgoingRate.get(TimeUnit.SECONDS), rateLimiter.getRate());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java 
b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 89e1051..fe22e42 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -52,5 +52,10 @@ public class ResponseVerbHandler implements IVerbHandler
             MessagingService.instance().maybeAddLatency(cb, message.from, 
latency);
             cb.response(message);
         }
+
+        if (callbackInfo.callback.supportsBackPressure())
+        {
+            
MessagingService.instance().updateBackPressureOnReceive(message.from, cb, 
false);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 7cc854a..8c30b89 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import com.google.common.collect.Iterables;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +54,7 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
     private volatile int failures = 0;
     private final Map<InetAddress, RequestFailureReason> 
failureReasonByEndpoint;
     private final long queryStartNanoTime;
+    private volatile boolean supportsBackPressure = true;
 
     /**
      * @param callback A callback to be called when the write is successful.
@@ -78,11 +80,7 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
 
     public void get() throws WriteTimeoutException, WriteFailureException
     {
-        long requestTimeout = writeType == WriteType.COUNTER
-                            ? DatabaseDescriptor.getCounterWriteRpcTimeout()
-                            : DatabaseDescriptor.getWriteRpcTimeout();
-
-        long timeout = TimeUnit.MILLISECONDS.toNanos(requestTimeout) - 
(System.nanoTime() - queryStartNanoTime);
+        long timeout = currentTimeout();
 
         boolean success;
         try
@@ -112,6 +110,14 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
         }
     }
 
+    public final long currentTimeout()
+    {
+        long requestTimeout = writeType == WriteType.COUNTER
+                              ? DatabaseDescriptor.getCounterWriteRpcTimeout()
+                              : DatabaseDescriptor.getWriteRpcTimeout();
+        return TimeUnit.MILLISECONDS.toNanos(requestTimeout) - 
(System.nanoTime() - queryStartNanoTime);
+    }
+
     /**
      * @return the minimum number of endpoints that must reply.
      */
@@ -172,4 +178,15 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
         if (totalBlockFor() + n > totalEndpoints())
             signal();
     }
+
+    @Override
+    public boolean supportsBackPressure()
+    {
+        return supportsBackPressure;
+    }
+
+    public void setSupportsBackPressure(boolean supportsBackPressure)
+    {
+        this.supportsBackPressure = supportsBackPressure;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index ca8a9c6..241aa4f 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -25,6 +25,7 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -33,7 +34,9 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.collect.*;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.Uninterruptibles;
+
 import org.apache.commons.lang3.StringUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -527,6 +530,7 @@ public class StorageProxy implements StorageProxyMBean
         {
             AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
             responseHandler = rs.getWriteResponseHandler(naturalEndpoints, 
pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE, queryStartNanoTime);
+            responseHandler.setSupportsBackPressure(false);
         }
 
         MessageOut<Commit> message = new 
MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, 
Commit.serializer);
@@ -1224,6 +1228,10 @@ public class StorageProxy implements StorageProxyMBean
                                              Stage stage)
     throws OverloadedException
     {
+        int targetsSize = Iterables.size(targets);
+
+        // replicas in the local datacenter:
+        Collection<InetAddress> localDc = null;
         // extra-datacenter replicas, grouped by dc
         Map<String, Collection<InetAddress>> dcGroups = null;
         // only need to create a Message for non-local writes
@@ -1232,6 +1240,8 @@ public class StorageProxy implements StorageProxyMBean
         boolean insertLocal = false;
         ArrayList<InetAddress> endpointsToHint = null;
 
+        List<InetAddress> backPressureHosts = null;
+
         for (InetAddress destination : targets)
         {
             checkHintOverload(destination);
@@ -1247,12 +1257,17 @@ public class StorageProxy implements StorageProxyMBean
                     // belongs on a different server
                     if (message == null)
                         message = mutation.createMessage();
+
                     String dc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
+
                     // direct writes to local DC or old Cassandra versions
                     // (1.1 knows how to forward old-style String message IDs; 
updated to int in 2.0)
                     if (localDataCenter.equals(dc))
                     {
-                        MessagingService.instance().sendRR(message, 
destination, responseHandler, true);
+                        if (localDc == null)
+                            localDc = new ArrayList<>(targetsSize);
+
+                        localDc.add(destination);
                     }
                     else
                     {
@@ -1264,8 +1279,14 @@ public class StorageProxy implements StorageProxyMBean
                                 dcGroups = new HashMap<>();
                             dcGroups.put(dc, messages);
                         }
+
                         messages.add(destination);
                     }
+
+                    if (backPressureHosts == null)
+                        backPressureHosts = new ArrayList<>(targetsSize);
+
+                    backPressureHosts.add(destination);
                 }
             }
             else
@@ -1273,24 +1294,30 @@ public class StorageProxy implements StorageProxyMBean
                 if (shouldHint(destination))
                 {
                     if (endpointsToHint == null)
-                        endpointsToHint = new 
ArrayList<>(Iterables.size(targets));
+                        endpointsToHint = new ArrayList<>(targetsSize);
+
                     endpointsToHint.add(destination);
                 }
             }
         }
 
+        if (backPressureHosts != null)
+            MessagingService.instance().applyBackPressure(backPressureHosts, 
responseHandler.currentTimeout());
+
         if (endpointsToHint != null)
             submitHint(mutation, endpointsToHint, responseHandler);
 
         if (insertLocal)
             performLocally(stage, Optional.of(mutation), mutation::apply, 
responseHandler);
 
+        if (localDc != null)
+        {
+            for (InetAddress destination : localDc)
+                MessagingService.instance().sendRR(message, destination, 
responseHandler, true);
+        }
         if (dcGroups != null)
         {
             // for each datacenter, send the message to one node to relay the 
write to other replicas
-            if (message == null)
-                message = mutation.createMessage();
-
             for (Collection<InetAddress> dcTargets : dcGroups.values())
                 sendMessagesToNonlocalDC(message, dcTargets, responseHandler);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/utils/SlidingTimeRate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/SlidingTimeRate.java 
b/src/java/org/apache/cassandra/utils/SlidingTimeRate.java
new file mode 100644
index 0000000..3053a05
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/SlidingTimeRate.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * Concurrent rate computation over a sliding time window.
+ */
+public class SlidingTimeRate
+{
+    private final ConcurrentSkipListMap<Long, AtomicInteger> counters = new 
ConcurrentSkipListMap<>();
+    private final AtomicLong lastCounterTimestamp = new AtomicLong(0);
+    private final ReadWriteLock pruneLock = new ReentrantReadWriteLock();
+    private final long sizeInMillis;
+    private final long precisionInMillis;
+    private final TimeSource timeSource;
+
+    /**
+     * Creates a sliding rate whose time window is of the given size, with the 
given precision and time unit.
+     * <br/>
+     * The precision defines how accurate the rate computation is, as it will 
be computed over window size +/-
+     * precision.
+     */
+    public SlidingTimeRate(TimeSource timeSource, long size, long precision, 
TimeUnit unit)
+    {
+        Preconditions.checkArgument(size > precision, "Size should be greater 
than precision.");
+        Preconditions.checkArgument(TimeUnit.MILLISECONDS.convert(precision, 
unit) >= 1, "Precision must be greater than or equal to 1 millisecond.");
+        this.sizeInMillis = TimeUnit.MILLISECONDS.convert(size, unit);
+        this.precisionInMillis = TimeUnit.MILLISECONDS.convert(precision, 
unit);
+        this.timeSource = timeSource;
+    }
+
+    /**
+     * Updates the rate.
+     */
+    public void update(int delta)
+    {
+        pruneLock.readLock().lock();
+        try
+        {
+            while (true)
+            {
+                long now = timeSource.currentTimeMillis();
+                long lastTimestamp = lastCounterTimestamp.get();
+                boolean isWithinPrecisionRange = (now - lastTimestamp) < 
precisionInMillis;
+                AtomicInteger lastCounter = counters.get(lastTimestamp);
+                // If there's a valid counter for the current last timestamp, 
and we're in the precision range,
+                // update such counter:
+                if (lastCounter != null && isWithinPrecisionRange)
+                {
+                    lastCounter.addAndGet(delta);
+
+                    break;
+                }
+                // Else if there's no counter or we're past the precision 
range, try to create a new counter,
+                // but only the thread updating the last timestamp will create 
a new counter:
+                else if (lastCounterTimestamp.compareAndSet(lastTimestamp, 
now))
+                {
+                    AtomicInteger existing = counters.putIfAbsent(now, new 
AtomicInteger(delta));
+                    if (existing != null)
+                    {
+                        existing.addAndGet(delta);
+                    }
+
+                    break;
+                }
+            }
+        }
+        finally
+        {
+            pruneLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Gets the rate, in the given time unit, computed from the beginning of the 
time window up to the
+     * provided amount of time ago.
+     */
+    public double get(long toAgo, TimeUnit unit)
+    {
+        pruneLock.readLock().lock();
+        try
+        {
+            long toAgoInMillis = TimeUnit.MILLISECONDS.convert(toAgo, unit);
+            Preconditions.checkArgument(toAgoInMillis < sizeInMillis, "Cannot 
get rate in the past!");
+
+            long now = timeSource.currentTimeMillis();
+            long sum = 0;
+            ConcurrentNavigableMap<Long, AtomicInteger> tailCounters = counters
+                    .tailMap(now - sizeInMillis, true)
+                    .headMap(now - toAgoInMillis, true);
+            for (AtomicInteger i : tailCounters.values())
+            {
+                sum += i.get();
+            }
+
+            double rateInMillis = sum == 0
+                                  ? sum
+                                  : sum / (double) Math.max(1000, (now - 
toAgoInMillis) - tailCounters.firstKey());
+            double multiplier = TimeUnit.MILLISECONDS.convert(1, unit);
+            return rateInMillis * multiplier;
+        }
+        finally
+        {
+            pruneLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Gets the current rate in the given time unit.
+     */
+    public double get(TimeUnit unit)
+    {
+        return get(0, unit);
+    }
+
+    /**
+     * Prunes the time window of old unused updates.
+     */
+    public void prune()
+    {
+        pruneLock.writeLock().lock();
+        try
+        {
+            long now = timeSource.currentTimeMillis();
+            counters.headMap(now - sizeInMillis, false).clear();
+        }
+        finally
+        {
+            pruneLock.writeLock().unlock();
+        }
+    }
+
+    @VisibleForTesting
+    public int size()
+    {
+        return counters.values().stream().reduce(new AtomicInteger(), (v1, v2) 
-> {
+            v1.addAndGet(v2.get());
+            return v1;
+        }).get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/utils/SystemTimeSource.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/SystemTimeSource.java 
b/src/java/org/apache/cassandra/utils/SystemTimeSource.java
new file mode 100644
index 0000000..fef525e
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/SystemTimeSource.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+/**
+ * Time source backed by JVM clock.
+ * <p>
+ * Clock reads delegate to {@link System#currentTimeMillis()} and {@link System#nanoTime()};
+ * both sleep methods return this instance once the sleep call has returned.
+ */
+public class SystemTimeSource implements TimeSource
+{
+    @Override
+    public long currentTimeMillis()
+    {
+        return System.currentTimeMillis();
+    }
+
+    @Override
+    public long nanoTime()
+    {
+        return System.nanoTime();
+    }
+
+    @Override
+    public TimeSource sleepUninterruptibly(long sleepFor, TimeUnit unit)
+    {
+        Uninterruptibles.sleepUninterruptibly(sleepFor, unit);
+        return this;
+    }
+
+    @Override
+    public TimeSource sleep(long sleepFor, TimeUnit unit) throws 
InterruptedException
+    {
+        TimeUnit.NANOSECONDS.sleep(TimeUnit.NANOSECONDS.convert(sleepFor, 
unit)); // the requested duration is converted to nanoseconds before sleeping
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/utils/TestRateLimiter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/TestRateLimiter.java 
b/src/java/org/apache/cassandra/utils/TestRateLimiter.java
new file mode 100644
index 0000000..a9eb871
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/TestRateLimiter.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.jboss.byteman.rule.Rule;
+import org.jboss.byteman.rule.helper.Helper;
+
+/**
+ * Helper class to apply rate limiting during fault injection testing;
+ * for an example script, see test/resources/byteman/mutation_limiter.btm.
+ */
+@VisibleForTesting
+public class TestRateLimiter extends Helper
+{
+    private static final AtomicReference<RateLimiter> ref = new 
AtomicReference<>();
+
+    protected TestRateLimiter(Rule rule)
+    {
+        super(rule);
+    }
+
+    /**
+     * Acquires a single unit at the given rate. If the rate changes between calls,
+     * a new rate limiter is created and the old one is discarded.
+     * <p>
+     * The limiter is held in a static reference, so it is shared by every instance of
+     * this helper. A replacement is installed with compareAndSet and the reference is
+     * then re-read; if the swap fails, the limiter acquired from is whatever the
+     * reference holds at that point, which may have been created for a different rate.
+     *
+     * @param rate the rate handed to {@code RateLimiter.create} when a new limiter is needed
+     */
+    public void acquire(double rate)
+    {
+        RateLimiter limiter = ref.get();
+        if (limiter == null || limiter.getRate() != rate)
+        {
+            ref.compareAndSet(limiter, RateLimiter.create(rate));
+            limiter = ref.get();
+        }
+        limiter.acquire(1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/utils/TimeSource.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/TimeSource.java 
b/src/java/org/apache/cassandra/utils/TimeSource.java
new file mode 100644
index 0000000..5d8acec
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/TimeSource.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+
+public interface TimeSource
+{
+    /**
+     * Returns the current time in milliseconds.
+     *
+     * @return the current time in milliseconds
+     */
+    long currentTimeMillis();
+
+    /**
+     * Returns the current time value in nanoseconds.
+     *
+     * <p>This method can only be used to measure elapsed time and is
+     * not related to any other notion of system or wall-clock time.
+     *
+     * @return the current time value in nanoseconds
+     */
+    long nanoTime();
+
+    /**
+     * Sleep for the given amount of time uninterruptibly.
+     *
+     * @param  sleepFor the amount of time to sleep, expressed in {@code unit}.
+     * @param  unit time unit
+     * @return The time source itself after the given sleep period.
+     */
+    TimeSource sleepUninterruptibly(long sleepFor, TimeUnit unit);
+
+    /**
+     * Sleep for the given amount of time. This operation could be interrupted.
+     * Hence after returning from this method, it is not guaranteed
+     * that the requested amount of time has passed.
+     *
+     * @param  sleepFor the amount of time to sleep, expressed in {@code unit}.
+     * @param  unit time unit
+     * @return The time source itself after the given sleep period.
+     * @throws InterruptedException if the sleep is interrupted
+     */
+    TimeSource sleep(long sleepFor, TimeUnit unit) throws InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/utils/concurrent/IntervalLock.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/IntervalLock.java 
b/src/java/org/apache/cassandra/utils/concurrent/IntervalLock.java
new file mode 100644
index 0000000..382a2dc
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/IntervalLock.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.utils.TimeSource;
+
+/**
+ * This class extends ReentrantReadWriteLock to provide a write lock that can
+ * only be acquired at provided intervals.
+ * <p>
+ * The interval is measured with the supplied {@link TimeSource}, in milliseconds.
+ */
+public class IntervalLock extends ReentrantReadWriteLock
+{
+    private final AtomicLong lastAcquire = new AtomicLong();
+    private final TimeSource timeSource;
+
+    public IntervalLock(TimeSource timeSource)
+    {
+        this.timeSource = timeSource;
+    }
+
+    /**
+     * Try acquiring the write lock if the given interval has passed since the last
+     * successful acquisition through this method.
+     * <p>
+     * The acquisition time is only recorded when the lock is actually obtained, so calls
+     * that return false do not restart the interval. The attempt is made with
+     * {@code tryLock()} and therefore does not wait for the lock. When this method
+     * returns true the caller is expected to release the lock with
+     * {@link #releaseIntervalLock()}.
+     *
+     * @param interval In millis.
+     * @return True if acquired and locked, false otherwise.
+     */
+    public boolean tryIntervalLock(long interval)
+    {
+        long now = timeSource.currentTimeMillis();
+        boolean acquired = (now - lastAcquire.get() >= interval) && 
writeLock().tryLock();
+        if (acquired)
+            lastAcquire.set(now);
+
+        return acquired;
+    }
+
+    /**
+     * Release the last acquired interval lock.
+     * <p>
+     * This simply unlocks the write lock, so it must be called by the thread that
+     * obtained it through a successful {@link #tryIntervalLock(long)}.
+     */
+    public void releaseIntervalLock()
+    {
+        writeLock().unlock();
+    }
+
+    @VisibleForTesting
+    public long getLastIntervalAcquire() // time recorded by the last successful tryIntervalLock; 0 until the first one
+    {
+        return lastAcquire.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/resources/byteman/mutation_limiter.btm
----------------------------------------------------------------------
diff --git a/test/resources/byteman/mutation_limiter.btm 
b/test/resources/byteman/mutation_limiter.btm
new file mode 100644
index 0000000..ca936fc
--- /dev/null
+++ b/test/resources/byteman/mutation_limiter.btm
@@ -0,0 +1,8 @@
+RULE mutation_limiter
+CLASS org.apache.cassandra.db.MutationVerbHandler
+METHOD doVerb
+HELPER org.apache.cassandra.utils.TestRateLimiter
+AT ENTRY
+IF TRUE
+DO acquire(1000.0)
+ENDRULE

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java 
b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index 63c44e0..2dcfbd1 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -101,6 +101,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.io.util.DiskOptimizationStrategy",
     "org.apache.cassandra.locator.SimpleSeedProvider",
     "org.apache.cassandra.locator.SeedProvider",
+    "org.apache.cassandra.net.BackPressureStrategy",
     "org.apache.cassandra.scheduler.IRequestScheduler",
     "org.apache.cassandra.security.EncryptionContext",
     "org.apache.cassandra.service.CacheService$CacheType",

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java 
b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index 0e0e4ba..2a3ecbe 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -24,34 +24,53 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Iterables;
+
 import com.codahale.metrics.Timer;
 
 import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
 import org.caffinitas.ohc.histo.EstimatedHistogram;
+
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
 
+import static org.junit.Assert.*;
+
 public class MessagingServiceTest
 {
+    private final static long ONE_SECOND = TimeUnit.NANOSECONDS.convert(1, 
TimeUnit.SECONDS);
+    private final static long[] bucketOffsets = new 
EstimatedHistogram(160).getBucketOffsets();
+    private final MessagingService messagingService = MessagingService.test();
+
     @BeforeClass
-    public static void initDD()
+    public static void beforeClass() throws UnknownHostException
     {
         DatabaseDescriptor.daemonInitialization();
+        DatabaseDescriptor.setBackPressureStrategy(new 
MockBackPressureStrategy(Collections.emptyMap()));
+        
DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.0.0.1"));
     }
 
-    private final MessagingService messagingService = MessagingService.test();
-    private final static long[] bucketOffsets = new 
EstimatedHistogram(160).getBucketOffsets();
+    @Before
+    public void before() throws UnknownHostException
+    {
+        MockBackPressureStrategy.applied = false;
+        
messagingService.destroyConnectionPool(InetAddress.getByName("127.0.0.2"));
+        
messagingService.destroyConnectionPool(InetAddress.getByName("127.0.0.3"));
+    }
 
     @Test
     public void testDroppedMessages()
@@ -77,17 +96,6 @@ public class MessagingServiceTest
         assertEquals(7500, 
(int)messagingService.getDroppedMessages().get(verb.toString()));
     }
 
-    private static void addDCLatency(long sentAt, long now) throws IOException
-    {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (DataOutputStreamPlus out = new WrappedDataOutputStreamPlus(baos))
-        {
-            out.writeInt((int) sentAt);
-        }
-        DataInputStreamPlus in = new DataInputStreamPlus(new 
ByteArrayInputStream(baos.toByteArray()));
-        MessageIn.readTimestamp(InetAddress.getLocalHost(), in, now);
-    }
-
     @Test
     public void testDCLatency() throws Exception
     {
@@ -123,4 +131,211 @@ public class MessagingServiceTest
         addDCLatency(sentAt, now);
         assertNull(dcLatency.get("datacenter1"));
     }
+
+    @Test
+    public void 
testUpdatesBackPressureOnSendWhenEnabledAndWithSupportedCallback() throws 
UnknownHostException
+    {
+        MockBackPressureStrategy.MockBackPressureState backPressureState = 
(MockBackPressureStrategy.MockBackPressureState) 
messagingService.getConnectionPool(InetAddress.getByName("127.0.0.2")).getBackPressureState();
+        IAsyncCallback bpCallback = new BackPressureCallback();
+        IAsyncCallback noCallback = new NoBackPressureCallback();
+        MessageOut<?> ignored = null;
+
+        DatabaseDescriptor.setBackPressureEnabled(true);
+        
messagingService.updateBackPressureOnSend(InetAddress.getByName("127.0.0.2"), 
noCallback, ignored);
+        assertFalse(backPressureState.onSend);
+
+        DatabaseDescriptor.setBackPressureEnabled(false);
+        
messagingService.updateBackPressureOnSend(InetAddress.getByName("127.0.0.2"), 
bpCallback, ignored);
+        assertFalse(backPressureState.onSend);
+
+        DatabaseDescriptor.setBackPressureEnabled(true);
+        
messagingService.updateBackPressureOnSend(InetAddress.getByName("127.0.0.2"), 
bpCallback, ignored);
+        assertTrue(backPressureState.onSend);
+    }
+
+    @Test
+    public void 
testUpdatesBackPressureOnReceiveWhenEnabledAndWithSupportedCallback() throws 
UnknownHostException
+    {
+        MockBackPressureStrategy.MockBackPressureState backPressureState = 
(MockBackPressureStrategy.MockBackPressureState) 
messagingService.getConnectionPool(InetAddress.getByName("127.0.0.2")).getBackPressureState();
+        IAsyncCallback bpCallback = new BackPressureCallback();
+        IAsyncCallback noCallback = new NoBackPressureCallback();
+        boolean timeout = false;
+
+        DatabaseDescriptor.setBackPressureEnabled(true);
+        
messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"),
 noCallback, timeout);
+        assertFalse(backPressureState.onReceive);
+        assertFalse(backPressureState.onTimeout);
+
+        DatabaseDescriptor.setBackPressureEnabled(false);
+        
messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"),
 bpCallback, timeout);
+        assertFalse(backPressureState.onReceive);
+        assertFalse(backPressureState.onTimeout);
+
+        DatabaseDescriptor.setBackPressureEnabled(true);
+        
messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"),
 bpCallback, timeout);
+        assertTrue(backPressureState.onReceive);
+        assertFalse(backPressureState.onTimeout);
+    }
+
+    @Test
+    public void 
testUpdatesBackPressureOnTimeoutWhenEnabledAndWithSupportedCallback() throws 
UnknownHostException
+    {
+        MockBackPressureStrategy.MockBackPressureState backPressureState = 
(MockBackPressureStrategy.MockBackPressureState) 
messagingService.getConnectionPool(InetAddress.getByName("127.0.0.2")).getBackPressureState();
+        IAsyncCallback bpCallback = new BackPressureCallback();
+        IAsyncCallback noCallback = new NoBackPressureCallback();
+        boolean timeout = true;
+
+        DatabaseDescriptor.setBackPressureEnabled(true);
+        
messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"),
 noCallback, timeout);
+        assertFalse(backPressureState.onReceive);
+        assertFalse(backPressureState.onTimeout);
+
+        DatabaseDescriptor.setBackPressureEnabled(false);
+        
messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"),
 bpCallback, timeout);
+        assertFalse(backPressureState.onReceive);
+        assertFalse(backPressureState.onTimeout);
+
+        DatabaseDescriptor.setBackPressureEnabled(true);
+        
messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"),
 bpCallback, timeout);
+        assertFalse(backPressureState.onReceive);
+        assertTrue(backPressureState.onTimeout);
+    }
+
+    @Test
+    public void testAppliesBackPressureWhenEnabled() throws 
UnknownHostException
+    {
+        DatabaseDescriptor.setBackPressureEnabled(false);
+        
messagingService.applyBackPressure(Arrays.asList(InetAddress.getByName("127.0.0.2")),
 ONE_SECOND);
+        assertFalse(MockBackPressureStrategy.applied);
+
+        DatabaseDescriptor.setBackPressureEnabled(true);
+        
messagingService.applyBackPressure(Arrays.asList(InetAddress.getByName("127.0.0.2")),
 ONE_SECOND);
+        assertTrue(MockBackPressureStrategy.applied);
+    }
+
+    @Test
+    public void testDoesntApplyBackPressureToBroadcastAddress() throws 
UnknownHostException
+    {
+        DatabaseDescriptor.setBackPressureEnabled(true);
+        
messagingService.applyBackPressure(Arrays.asList(InetAddress.getByName("127.0.0.1")),
 ONE_SECOND);
+        assertFalse(MockBackPressureStrategy.applied);
+    }
+
+    private static void addDCLatency(long sentAt, long now) throws IOException
+    {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (DataOutputStreamPlus out = new WrappedDataOutputStreamPlus(baos))
+        {
+            out.writeInt((int) sentAt);
+        }
+        DataInputStreamPlus in = new DataInputStreamPlus(new 
ByteArrayInputStream(baos.toByteArray()));
+        MessageIn.readTimestamp(InetAddress.getLocalHost(), in, now);
+    }
+
+    public static class MockBackPressureStrategy implements 
BackPressureStrategy<MockBackPressureStrategy.MockBackPressureState>
+    {
+        public static volatile boolean applied = false;
+
+        public MockBackPressureStrategy(Map<String, Object> args)
+        {
+        }
+
+        @Override
+        public void apply(Set<MockBackPressureState> states, long timeout, 
TimeUnit unit)
+        {
+            if (!Iterables.isEmpty(states))
+                applied = true;
+        }
+
+        @Override
+        public MockBackPressureState newState(InetAddress host)
+        {
+            return new MockBackPressureState(host);
+        }
+
+        public static class MockBackPressureState implements BackPressureState
+        {
+            private final InetAddress host;
+            public volatile boolean onSend = false;
+            public volatile boolean onReceive = false;
+            public volatile boolean onTimeout = false;
+
+            private MockBackPressureState(InetAddress host)
+            {
+                this.host = host;
+            }
+
+            @Override
+            public void onMessageSent(MessageOut<?> message)
+            {
+                onSend = true;
+            }
+
+            @Override
+            public void onResponseReceived()
+            {
+                onReceive = true;
+            }
+
+            @Override
+            public void onResponseTimeout()
+            {
+                onTimeout = true;
+            }
+
+            @Override
+            public double getBackPressureRateLimit()
+            {
+                throw new UnsupportedOperationException("Not supported yet.");
+            }
+
+            @Override
+            public InetAddress getHost()
+            {
+                return host;
+            }
+        }
+    }
+
+    private static class BackPressureCallback implements IAsyncCallback
+    {
+        @Override
+        public boolean supportsBackPressure()
+        {
+            return true;
+        }
+
+        @Override
+        public boolean isLatencyForSnitch()
+        {
+            return false;
+        }
+
+        @Override
+        public void response(MessageIn msg)
+        {
+            throw new UnsupportedOperationException("Not supported.");
+        }
+    }
+
+    private static class NoBackPressureCallback implements IAsyncCallback
+    {
+        @Override
+        public boolean supportsBackPressure()
+        {
+            return false;
+        }
+
+        @Override
+        public boolean isLatencyForSnitch()
+        {
+            return false;
+        }
+
+        @Override
+        public void response(MessageIn msg)
+        {
+            throw new UnsupportedOperationException("Not supported.");
+        }
+    }
 }

Reply via email to