Author: xedin
Date: Thu Sep  1 13:28:38 2011
New Revision: 1164075

URL: http://svn.apache.org/viewvc?rev=1164075&view=rev
Log:
Add throttling for internode streaming
patch by Stu Hood; reviewed by Pavel Yaskevich for CASSANDRA-3080

Added:
    cassandra/trunk/src/java/org/apache/cassandra/utils/Throttle.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/conf/cassandra.yaml
    cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionController.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1164075&r1=1164074&r2=1164075&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Sep  1 13:28:38 2011
@@ -54,6 +54,7 @@
  * generate hints for replicas that timeout, not just replicas that are known
    to be down before starting (CASSANDRA-2034)
  * Make the compression algorithm and chunk length configurable 
(CASSANDRA-3001)
+ * Add throttling for internode streaming (CASSANDRA-3080)
 
 0.8.5
  * fix NPE when encryption_options is unspecified (CASSANDRA-3007)

Modified: cassandra/trunk/conf/cassandra.yaml
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1164075&r1=1164074&r2=1164075&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Thu Sep  1 13:28:38 2011
@@ -292,6 +292,13 @@ compaction_throughput_mb_per_sec: 16
 # key caches.
 compaction_preheat_key_cache: true
 
+# Throttles all outbound streaming file transfers on this node to the
+# given total throughput in Mbps. This is necessary because Cassandra does
+# mostly sequential IO when streaming data during bootstrap or repair, which
+# can lead to saturating the network connection and degrading rpc performance.
+# When unset, the default is 400 Mbps or 50 MB/s. Setting it to 0 disables
+# outbound stream throttling.
+# stream_throughput_outbound_megabits_per_sec: 400
+
 # Time to wait for a reply from other nodes before failing the command 
 rpc_timeout_in_ms: 10000
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1164075&r1=1164074&r2=1164075&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Thu Sep  1 
13:28:38 2011
@@ -83,6 +83,8 @@ public class Config
     public Integer in_memory_compaction_limit_in_mb = 256;
     public Integer concurrent_compactors = 
Runtime.getRuntime().availableProcessors();
     public Integer compaction_throughput_mb_per_sec = 16;
+
+    public Integer stream_throughput_outbound_megabits_per_sec;
     
     public String[] data_file_directories;
 

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1164075&r1=1164074&r2=1164075&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
Thu Sep  1 13:28:38 2011
@@ -357,6 +357,9 @@ public class DatabaseDescriptor
             if (conf.compaction_throughput_mb_per_sec == null)
                 conf.compaction_throughput_mb_per_sec = 16;
 
+            if (conf.stream_throughput_outbound_megabits_per_sec == null)
+                conf.stream_throughput_outbound_megabits_per_sec = 400;
+
             if 
(!CassandraDaemon.rpc_server_types.contains(conf.rpc_server_type.toLowerCase()))
                 throw new ConfigurationException("Unknown rpc_server_type: " + 
conf.rpc_server_type);
             
@@ -676,6 +679,16 @@ public class DatabaseDescriptor
         conf.compaction_throughput_mb_per_sec = value;
     }
 
+    /**
+     * @return The configured outbound streaming throughput limit, in megabits per second.
+     * A missing setting is replaced by 400 when the configuration is loaded.
+     */
+    public static int getStreamThroughputOutboundMegabitsPerSec()
+    {
+        return conf.stream_throughput_outbound_megabits_per_sec.intValue();
+    }
+
+    /**
+     * Overrides the outbound streaming throughput limit.
+     *
+     * @param value The new limit, in megabits per second.
+     */
+    public static void setStreamThroughputOutboundMegabitsPerSec(int value)
+    {
+        conf.stream_throughput_outbound_megabits_per_sec = Integer.valueOf(value);
+    }
+
     public static String[] getAllDataFileLocations()
     {
         return conf.data_file_directories;

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionController.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionController.java?rev=1164075&r1=1164074&r2=1164075&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionController.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionController.java
 Thu Sep  1 13:28:38 2011
@@ -46,7 +46,6 @@ public class CompactionController
 
     public final boolean isMajor;
     public final int gcBefore;
-    private int throttleResolution;
 
     public CompactionController(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
     {
@@ -56,19 +55,6 @@ public class CompactionController
         this.gcBefore = gcBefore;
         this.forceDeserialize = forceDeserialize;
         isMajor = cfs.isCompleteSSTables(this.sstables);
-        // how many rows we expect to compact in 100ms
-        long rowSize = cfs.getMeanRowSize();
-        int rowsPerSecond = rowSize > 0
-                          ? (int) 
(DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / rowSize)
-                          : 1000;
-        throttleResolution = rowsPerSecond / 10;
-        if (throttleResolution <= 0)
-            throttleResolution = 1;
-    }
-
-    public int getThrottleResolution()
-    {
-        return throttleResolution;
     }
 
     public String getKeyspace()

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java?rev=1164075&r1=1164074&r2=1164075&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
 Thu Sep  1 13:28:38 2011
@@ -37,9 +37,9 @@ import org.apache.cassandra.io.sstable.S
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.utils.Throttle;
 
-public class CompactionIterable
-implements Iterable<AbstractCompactedRow>, CompactionInfo.Holder
+public class CompactionIterable implements Iterable<AbstractCompactedRow>, 
CompactionInfo.Holder
 {
     private static Logger logger = 
LoggerFactory.getLogger(CompactionIterable.class);
 
@@ -49,19 +49,12 @@ implements Iterable<AbstractCompactedRow
     protected final CompactionType type;
     private final List<SSTableScanner> scanners;
     protected final CompactionController controller;
+    private final Throttle throttle;
 
     private long totalBytes;
     private long bytesRead;
     private long row;
 
-    // the bytes that had been compacted the last time we delayed to throttle,
-    // and the time in milliseconds when we last throttled
-    private long bytesAtLastDelay;
-    private long timeAtLastDelay;
-
-    // current target bytes to compact per millisecond
-    private int targetBytesPerMS = -1;
-
     public CompactionIterable(CompactionType type, Iterable<SSTableReader> 
sstables, CompactionController controller) throws IOException
     {
         this(type, getScanners(sstables), controller);
@@ -76,6 +69,20 @@ implements Iterable<AbstractCompactedRow
         totalBytes = bytesRead = 0;
         for (SSTableScanner scanner : scanners)
             totalBytes += scanner.getFileLength();
+        this.throttle = new Throttle(toString(), new 
Throttle.ThroughputFunction()
+        {
+            /** @return Instantaneous throughput target in bytes per 
millisecond. */
+            public int targetThroughput()
+            {
+                if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 
|| StorageService.instance.isBootstrapMode())
+                    // throttling disabled
+                    return 0;
+                // total throughput
+                int totalBytesPerMS = 
DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000;
+                // per stream throughput (target bytes per MS)
+                return totalBytesPerMS / Math.max(1, 
CompactionManager.instance.getActiveCompactions());
+            }
+        });
     }
 
     protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> 
sstables) throws IOException
@@ -101,47 +108,6 @@ implements Iterable<AbstractCompactedRow
         return MergeIterator.get(scanners, ICOMP, new Reducer());
     }
 
-    private void throttle()
-    {
-        if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || 
StorageService.instance.isBootstrapMode())
-            // throttling disabled
-            return;
-        int totalBytesPerMS = 
DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000;
-
-        // bytes compacted and time passed since last delay
-        long bytesSinceLast = bytesRead - bytesAtLastDelay;
-        long msSinceLast = System.currentTimeMillis() - timeAtLastDelay;
-
-        // determine the current target
-        int newTarget = totalBytesPerMS /
-            Math.max(1, CompactionManager.instance.getActiveCompactions());
-        if (newTarget != targetBytesPerMS)
-            logger.debug("{} now compacting at {} bytes/ms.", this, newTarget);
-        targetBytesPerMS = newTarget;
-
-        // the excess bytes that were compacted in this period
-        long excessBytes = bytesSinceLast - msSinceLast * targetBytesPerMS;
-
-        // the time to delay to recap the deficit
-        long timeToDelay = excessBytes / Math.max(1, targetBytesPerMS);
-        if (timeToDelay > 0)
-        {
-            if (logger.isTraceEnabled())
-                logger.trace(String.format("Compacted %d bytes in %d ms: 
throttling for %d ms",
-                                           bytesSinceLast, msSinceLast, 
timeToDelay));
-            try
-            {
-                Thread.sleep(timeToDelay);
-            }
-            catch (InterruptedException e)
-            {
-                throw new AssertionError(e);
-            }
-        }
-        bytesAtLastDelay = bytesRead;
-        timeAtLastDelay = System.currentTimeMillis();
-    }
-
     public String toString()
     {
         return this.getCompactionInfo().toString();
@@ -186,7 +152,7 @@ implements Iterable<AbstractCompactedRow
                     {
                         bytesRead += scanner.getFilePointer();
                     }
-                    throttle();
+                    throttle.throttle(bytesRead);
                 }
             }
         }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1164075&r1=1164074&r2=1164075&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Thu 
Sep  1 13:28:38 2011
@@ -79,7 +79,7 @@ public final class MessagingService impl
     private final Map<StorageService.Verb, IVerbHandler> verbHandlers_;
 
     /* Thread pool to handle messaging write activities */
-    private final ExecutorService streamExecutor_;
+    private final DebuggableThreadPoolExecutor streamExecutor_;
 
     private final NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> 
connectionManagers_ = new NonBlockingHashMap<InetAddress, 
OutboundTcpConnectionPool>();
 
@@ -454,6 +454,12 @@ public final class MessagingService impl
         streamExecutor_.execute(new FileStreamTask(header, to, 
DatabaseDescriptor.getEncryptionOptions()));
     }
 
+    /**
+     * @return The number of outbound stream tasks the stream executor is currently running,
+     * as reported by its getActiveCount() (presumably approximate, like
+     * ThreadPoolExecutor.getActiveCount() -- TODO confirm).
+     */
+    public int getActiveStreamsOutbound()
+    {
+        final int activeTasks = streamExecutor_.getActiveCount();
+        return activeTasks;
+    }
+
     public void register(ILatencySubscriber subcriber)
     {
         subscribers.add(subcriber);

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java?rev=1164075&r1=1164074&r2=1164075&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java 
Thu Sep  1 13:28:38 2011
@@ -37,6 +37,7 @@ import org.apache.cassandra.net.Messagin
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.Throttle;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 import org.slf4j.Logger;
@@ -62,12 +63,28 @@ public class FileStreamTask extends Wrap
     private final EncryptionOptions encryptionOptions;
     // allocate buffer to use for transfers only once
     private final byte[] transferBuffer = new byte[CHUNK_SIZE];
+    // outbound global throughput limiter
+    private final Throttle throttle;
 
     public FileStreamTask(StreamHeader header, InetAddress to, EncryptionOptions encryptionOptions)
     {
         this.header = header;
         this.to = to;
         this.encryptionOptions = encryptionOptions;
+        // header and to are assigned above, so toString() can safely be used as the throttle name
+        this.throttle = new Throttle(toString(), new Throttle.ThroughputFunction()
+        {
+            /**
+             * @return Instantaneous throughput target in bytes per millisecond: the configured
+             * outbound limit shared evenly between the active outbound streams, or 0 (throttling
+             * disabled) when the configured limit is below 1 megabit per second.
+             */
+            public int targetThroughput()
+            {
+                // read the setting once so both uses below see the same value
+                int megabitsPerSec = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec();
+                if (megabitsPerSec < 1)
+                    // throttling disabled
+                    return 0;
+                // total throughput in bytes per ms; long arithmetic because the int product
+                // megabitsPerSec * 1024 * 1024 overflows for settings of 2048 and above
+                long totalBytesPerMS = (long) megabitsPerSec * 1024 * 1024 / 8 / 1000;
+                // per stream throughput (target bytes per MS)
+                long perStreamBytesPerMS = totalBytesPerMS / Math.max(1, MessagingService.instance().getActiveStreamsOutbound());
+                // stay within int range, and never round down to 0, which would silently disable throttling
+                return (int) Math.max(1L, Math.min(Integer.MAX_VALUE, perStreamBytesPerMS));
+            }
+        });
     }
     
     public void runMayThrow() throws IOException
@@ -163,8 +180,8 @@ public class FileStreamTask extends Wrap
         int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
 
         reader.readFully(transferBuffer, 0, toTransfer);
-
         output.write(transferBuffer, 0, toTransfer);
+        throttle.throttleDelta(toTransfer);
 
         return toTransfer;
     }
@@ -224,4 +241,9 @@ public class FileStreamTask extends Wrap
     {
         socket.close();
     }
+
+    /** @return A short description of this task (stream session and destination), used in logs. */
+    @Override
+    public String toString()
+    {
+        return "FileStreamTask(session=" + header.sessionId + ", to=" + to + ")";
+    }
 }

Added: cassandra/trunk/src/java/org/apache/cassandra/utils/Throttle.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/Throttle.java?rev=1164075&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/Throttle.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/Throttle.java Thu Sep  
1 13:28:38 2011
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates the timing/state required to throttle a caller to a target throughput in
+ * bytes per millisecond. The caller periodically reports either an absolute count of bytes
+ * ({@link #throttle(long)}) or the bytes handled since the previous report
+ * ({@link #throttleDelta(long)}), and is put to sleep when it runs ahead of the target.
+ *
+ * Instances hold mutable per-caller state without any synchronization; presumably each
+ * instance is used by a single thread (each task in this patch creates its own) -- TODO confirm.
+ */
+public class Throttle
+{
+    private static final Logger logger = LoggerFactory.getLogger(Throttle.class);
+
+    private final String name;
+    private final ThroughputFunction fun;
+
+    // the bytes that had been handled as of the previous call to throttle*(), and the
+    // monotonic time in milliseconds (see nowMillis()) at which that call finished
+    private long bytesAtLastDelay;
+    private long timeAtLastDelay;
+
+    // current target bytes of throughput per millisecond (-1 until first computed)
+    private int targetBytesPerMS = -1;
+
+    /**
+     * @param name Description of the throttled task, used in log messages.
+     * @param fun  Supplies the target throughput each time the caller reports progress.
+     */
+    public Throttle(String name, ThroughputFunction fun)
+    {
+        this.name = name;
+        this.fun = fun;
+        // start the first measurement window now: System.nanoTime() has an arbitrary origin,
+        // so the field's default of 0 would not be a meaningful timestamp
+        this.timeAtLastDelay = nowMillis();
+    }
+
+    /** @param currentBytes Bytes of throughput since the beginning of the task. */
+    public void throttle(long currentBytes)
+    {
+        throttleDelta(currentBytes - bytesAtLastDelay);
+    }
+
+    /**
+     * Sleeps the calling thread long enough that the bytes handled since the previous call do
+     * not exceed the current target throughput.
+     *
+     * @param bytesDelta Bytes of throughput since the last call to throttle*().
+     */
+    public void throttleDelta(long bytesDelta)
+    {
+        int newTargetBytesPerMS = fun.targetThroughput();
+        if (newTargetBytesPerMS < 1)
+        {
+            // throttling disabled: still start a new window, so that bytes handled while disabled
+            // are not "repaid" with a long sleep once throttling is enabled again
+            advanceWindow(bytesDelta);
+            return;
+        }
+
+        // if the target changed, log
+        if (newTargetBytesPerMS != targetBytesPerMS)
+            logger.debug("{} target throughput now {} bytes/ms.", this, newTargetBytesPerMS);
+        targetBytesPerMS = newTargetBytesPerMS;
+
+        // time passed since last delay (monotonic, so never negative)
+        long msSinceLast = nowMillis() - timeAtLastDelay;
+        // the excess bytes in this period
+        long excessBytes = bytesDelta - msSinceLast * targetBytesPerMS;
+
+        // the time to delay to recap the deficit (targetBytesPerMS >= 1 here)
+        long timeToDelay = excessBytes / targetBytesPerMS;
+        if (timeToDelay > 0)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace(String.format("%s actual throughput was %d bytes in %d ms: throttling for %d ms",
+                                           this, bytesDelta, msSinceLast, timeToDelay));
+            try
+            {
+                Thread.sleep(timeToDelay);
+            }
+            catch (InterruptedException e)
+            {
+                // keep the interrupt visible to code further up the stack before failing
+                Thread.currentThread().interrupt();
+                throw new AssertionError(e);
+            }
+        }
+        advanceWindow(bytesDelta);
+    }
+
+    /** Accounts for bytesDelta more handled bytes and starts a new measurement window. */
+    private void advanceWindow(long bytesDelta)
+    {
+        bytesAtLastDelay += bytesDelta;
+        timeAtLastDelay = nowMillis();
+    }
+
+    /**
+     * @return A monotonic timestamp in milliseconds. Unlike System.currentTimeMillis(), it does
+     * not jump backwards when the wall clock is adjusted; such a jump would otherwise become a
+     * sleep roughly as long as the adjustment.
+     */
+    private static long nowMillis()
+    {
+        return System.nanoTime() / 1000000L;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "Throttle(for=" + name + ")";
+    }
+
+    /** Supplies the target throughput of a {@link Throttle}; consulted on every throttle*() call. */
+    public interface ThroughputFunction
+    {
+        /**
+         * @return The instantaneous target throughput in bytes per millisecond. Targets less
+         * than or equal to zero will disable throttling.
+         */
+        public int targetThroughput();
+    }
+}


Reply via email to