Author: xedin
Date: Thu Sep 1 13:28:38 2011
New Revision: 1164075
URL: http://svn.apache.org/viewvc?rev=1164075&view=rev
Log:
Add throttling for internode streaming
patch by Stu Hood; reviewed by Pavel Yaskevich for CASSANDRA-3080
Added:
cassandra/trunk/src/java/org/apache/cassandra/utils/Throttle.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/conf/cassandra.yaml
cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionController.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1164075&r1=1164074&r2=1164075&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Sep 1 13:28:38 2011
@@ -54,6 +54,7 @@
* generate hints for replicas that timeout, not just replicas that are known
to be down before starting (CASSANDRA-2034)
* Make the compression algorithm and chunk length configurable
(CASSANDRA-3001)
+ * Add throttling for internode streaming (CASSANDRA-3080)
0.8.5
* fix NPE when encryption_options is unspecified (CASSANDRA-3007)
Modified: cassandra/trunk/conf/cassandra.yaml
URL:
http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1164075&r1=1164074&r2=1164075&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Thu Sep 1 13:28:38 2011
@@ -292,6 +292,13 @@ compaction_throughput_mb_per_sec: 16
# key caches.
compaction_preheat_key_cache: true
+# Throttles all outbound streaming file transfers on this node to the
+# given total throughput in Mbps. This is necessary because Cassandra does
+# mostly sequential IO when streaming data during bootstrap or repair, which
+# can lead to saturating the network connection and degrading rpc performance.
+# When unset, the default is 400 Mbps (equivalent to 50 MB/s). Values below 1 disable throttling.
+# stream_throughput_outbound_megabits_per_sec: 400
+
# Time to wait for a reply from other nodes before failing the command
rpc_timeout_in_ms: 10000
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1164075&r1=1164074&r2=1164075&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Thu Sep 1
13:28:38 2011
@@ -83,6 +83,8 @@ public class Config
public Integer in_memory_compaction_limit_in_mb = 256;
public Integer concurrent_compactors =
Runtime.getRuntime().availableProcessors();
public Integer compaction_throughput_mb_per_sec = 16;
+
+ public Integer stream_throughput_outbound_megabits_per_sec;
public String[] data_file_directories;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1164075&r1=1164074&r2=1164075&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Thu Sep 1 13:28:38 2011
@@ -357,6 +357,9 @@ public class DatabaseDescriptor
if (conf.compaction_throughput_mb_per_sec == null)
conf.compaction_throughput_mb_per_sec = 16;
+ if (conf.stream_throughput_outbound_megabits_per_sec == null)
+ conf.stream_throughput_outbound_megabits_per_sec = 400;
+
if
(!CassandraDaemon.rpc_server_types.contains(conf.rpc_server_type.toLowerCase()))
throw new ConfigurationException("Unknown rpc_server_type: " +
conf.rpc_server_type);
@@ -676,6 +679,16 @@ public class DatabaseDescriptor
conf.compaction_throughput_mb_per_sec = value;
}
+ public static int getStreamThroughputOutboundMegabitsPerSec() // configured cap for outbound streaming, in megabits per second
+ {
+ return conf.stream_throughput_outbound_megabits_per_sec; // config loading fills in 400 when the yaml leaves this unset
+ }
+
+ public static void setStreamThroughputOutboundMegabitsPerSec(int value) // value is in megabits per second; FileStreamTask treats values below 1 as "throttling disabled"
+ {
+ conf.stream_throughput_outbound_megabits_per_sec = value; // overwrites the loaded setting; FileStreamTask re-reads it on every throttle check
+ }
+
public static String[] getAllDataFileLocations()
{
return conf.data_file_directories;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionController.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionController.java?rev=1164075&r1=1164074&r2=1164075&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionController.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionController.java
Thu Sep 1 13:28:38 2011
@@ -46,7 +46,6 @@ public class CompactionController
public final boolean isMajor;
public final int gcBefore;
- private int throttleResolution;
public CompactionController(ColumnFamilyStore cfs,
Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
{
@@ -56,19 +55,6 @@ public class CompactionController
this.gcBefore = gcBefore;
this.forceDeserialize = forceDeserialize;
isMajor = cfs.isCompleteSSTables(this.sstables);
- // how many rows we expect to compact in 100ms
- long rowSize = cfs.getMeanRowSize();
- int rowsPerSecond = rowSize > 0
- ? (int)
(DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / rowSize)
- : 1000;
- throttleResolution = rowsPerSecond / 10;
- if (throttleResolution <= 0)
- throttleResolution = 1;
- }
-
- public int getThrottleResolution()
- {
- return throttleResolution;
}
public String getKeyspace()
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java?rev=1164075&r1=1164074&r2=1164075&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
Thu Sep 1 13:28:38 2011
@@ -37,9 +37,9 @@ import org.apache.cassandra.io.sstable.S
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.utils.Throttle;
-public class CompactionIterable
-implements Iterable<AbstractCompactedRow>, CompactionInfo.Holder
+public class CompactionIterable implements Iterable<AbstractCompactedRow>,
CompactionInfo.Holder
{
private static Logger logger =
LoggerFactory.getLogger(CompactionIterable.class);
@@ -49,19 +49,12 @@ implements Iterable<AbstractCompactedRow
protected final CompactionType type;
private final List<SSTableScanner> scanners;
protected final CompactionController controller;
+ private final Throttle throttle;
private long totalBytes;
private long bytesRead;
private long row;
- // the bytes that had been compacted the last time we delayed to throttle,
- // and the time in milliseconds when we last throttled
- private long bytesAtLastDelay;
- private long timeAtLastDelay;
-
- // current target bytes to compact per millisecond
- private int targetBytesPerMS = -1;
-
public CompactionIterable(CompactionType type, Iterable<SSTableReader>
sstables, CompactionController controller) throws IOException
{
this(type, getScanners(sstables), controller);
@@ -76,6 +69,20 @@ implements Iterable<AbstractCompactedRow
totalBytes = bytesRead = 0;
for (SSTableScanner scanner : scanners)
totalBytes += scanner.getFileLength();
+ this.throttle = new Throttle(toString(), new
Throttle.ThroughputFunction()
+ {
+ /** @return Instantaneous throughput target in bytes per
millisecond. */
+ public int targetThroughput()
+ {
+ if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1
|| StorageService.instance.isBootstrapMode())
+ // throttling disabled
+ return 0;
+ // total throughput
+ int totalBytesPerMS =
DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000;
+ // per stream throughput (target bytes per MS)
+ return totalBytesPerMS / Math.max(1,
CompactionManager.instance.getActiveCompactions());
+ }
+ });
}
protected static List<SSTableScanner> getScanners(Iterable<SSTableReader>
sstables) throws IOException
@@ -101,47 +108,6 @@ implements Iterable<AbstractCompactedRow
return MergeIterator.get(scanners, ICOMP, new Reducer());
}
- private void throttle()
- {
- if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 ||
StorageService.instance.isBootstrapMode())
- // throttling disabled
- return;
- int totalBytesPerMS =
DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000;
-
- // bytes compacted and time passed since last delay
- long bytesSinceLast = bytesRead - bytesAtLastDelay;
- long msSinceLast = System.currentTimeMillis() - timeAtLastDelay;
-
- // determine the current target
- int newTarget = totalBytesPerMS /
- Math.max(1, CompactionManager.instance.getActiveCompactions());
- if (newTarget != targetBytesPerMS)
- logger.debug("{} now compacting at {} bytes/ms.", this, newTarget);
- targetBytesPerMS = newTarget;
-
- // the excess bytes that were compacted in this period
- long excessBytes = bytesSinceLast - msSinceLast * targetBytesPerMS;
-
- // the time to delay to recap the deficit
- long timeToDelay = excessBytes / Math.max(1, targetBytesPerMS);
- if (timeToDelay > 0)
- {
- if (logger.isTraceEnabled())
- logger.trace(String.format("Compacted %d bytes in %d ms:
throttling for %d ms",
- bytesSinceLast, msSinceLast,
timeToDelay));
- try
- {
- Thread.sleep(timeToDelay);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- }
- bytesAtLastDelay = bytesRead;
- timeAtLastDelay = System.currentTimeMillis();
- }
-
public String toString()
{
return this.getCompactionInfo().toString();
@@ -186,7 +152,7 @@ implements Iterable<AbstractCompactedRow
{
bytesRead += scanner.getFilePointer();
}
- throttle();
+ throttle.throttle(bytesRead);
}
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1164075&r1=1164074&r2=1164075&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Thu
Sep 1 13:28:38 2011
@@ -79,7 +79,7 @@ public final class MessagingService impl
private final Map<StorageService.Verb, IVerbHandler> verbHandlers_;
/* Thread pool to handle messaging write activities */
- private final ExecutorService streamExecutor_;
+ private final DebuggableThreadPoolExecutor streamExecutor_;
private final NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>
connectionManagers_ = new NonBlockingHashMap<InetAddress,
OutboundTcpConnectionPool>();
@@ -454,6 +454,12 @@ public final class MessagingService impl
streamExecutor_.execute(new FileStreamTask(header, to,
DatabaseDescriptor.getEncryptionOptions()));
}
+ /** @return The count of active outbound stream tasks, as reported by the stream executor's getActiveCount(). */
+ public int getActiveStreamsOutbound()
+ {
+ return streamExecutor_.getActiveCount(); // FileStreamTask divides the outbound throughput budget by this count
+ }
+
public void register(ILatencySubscriber subcriber)
{
subscribers.add(subcriber);
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java?rev=1164075&r1=1164074&r2=1164075&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
Thu Sep 1 13:28:38 2011
@@ -37,6 +37,7 @@ import org.apache.cassandra.net.Messagin
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.Throttle;
import org.apache.cassandra.utils.WrappedRunnable;
import org.slf4j.Logger;
@@ -62,12 +63,28 @@ public class FileStreamTask extends Wrap
private final EncryptionOptions encryptionOptions;
// allocate buffer to use for transfers only once
private final byte[] transferBuffer = new byte[CHUNK_SIZE];
+ // outbound global throughput limiter
+ private final Throttle throttle;
public FileStreamTask(StreamHeader header, InetAddress to,
EncryptionOptions encryptionOptions)
{
this.header = header;
this.to = to;
this.encryptionOptions = encryptionOptions;
+ // Each task owns its limiter; the node-wide outbound budget is re-read and divided
+ // by the number of active outbound streams every time the throttle is consulted.
+ this.throttle = new Throttle(toString(), new Throttle.ThroughputFunction()
+ {
+ /** @return Instantaneous throughput target in bytes per millisecond. */
+ public int targetThroughput()
+ {
+ int megabitsPerSec = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec();
+ if (megabitsPerSec < 1)
+ // throttling disabled
+ return 0;
+ // total throughput, computed as a long: megabits * 1024 * 1024 overflows
+ // an int for settings of 2048 Mbps and above
+ long totalBytesPerMS = megabitsPerSec * 1024L * 1024L / 8 / 1000;
+ // per stream throughput (target bytes per MS)
+ long perStreamBytesPerMS = totalBytesPerMS
+ / Math.max(1, MessagingService.instance().getActiveStreamsOutbound());
+ // keep the result in [1, Integer.MAX_VALUE]: a share that truncates to 0
+ // would otherwise be interpreted by Throttle as "throttling disabled"
+ return (int) Math.max(1L, Math.min((long) Integer.MAX_VALUE, perStreamBytesPerMS));
+ }
+ });
}
public void runMayThrow() throws IOException
@@ -163,8 +180,8 @@ public class FileStreamTask extends Wrap
int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
reader.readFully(transferBuffer, 0, toTransfer);
-
output.write(transferBuffer, 0, toTransfer);
+ throttle.throttleDelta(toTransfer);
return toTransfer;
}
@@ -224,4 +241,9 @@ public class FileStreamTask extends Wrap
{
socket.close();
}
+
+ public String toString() // names the task by stream session id and destination; the constructor passes this string to Throttle as its name
+ {
+ return String.format("FileStreamTask(session=%s, to=%s)",
header.sessionId, to);
+ }
}
Added: cassandra/trunk/src/java/org/apache/cassandra/utils/Throttle.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/Throttle.java?rev=1164075&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/Throttle.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/Throttle.java Thu Sep
1 13:28:38 2011
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates the timing/state required to throttle a caller to a target throughput in
+ * bytes per millisecond. The caller periodically reports progress, either as an absolute
+ * count of bytes ({@link #throttle(long)}) or as an increment ({@link #throttleDelta(long)});
+ * when the reported progress is ahead of the target, the calling thread sleeps long enough
+ * to fall back to it.
+ *
+ * The bookkeeping fields are plain, unsynchronized state, so each instance should be
+ * driven by one thread at a time.
+ */
+public class Throttle
+{
+    private static final Logger logger = LoggerFactory.getLogger(Throttle.class);
+
+    private final String name;
+    private final ThroughputFunction fun;
+
+    // the total bytes reported as of the last call to throttle*(),
+    // and the time in milliseconds at which that call finished
+    private long bytesAtLastDelay;
+    private long timeAtLastDelay;
+
+    // current target bytes of throughput per millisecond
+    private int targetBytesPerMS = -1;
+
+    /**
+     * @param name Label used to identify this throttle in log output.
+     * @param fun Supplies the current target throughput; consulted on every throttle*() call.
+     */
+    public Throttle(String name, ThroughputFunction fun)
+    {
+        this.name = name;
+        this.fun = fun;
+    }
+
+    /** @param currentBytes Bytes of throughput since the beginning of the task. */
+    public void throttle(long currentBytes)
+    {
+        throttleDelta(currentBytes - bytesAtLastDelay);
+    }
+
+    /** @param bytesDelta Bytes of throughput since the last call to throttle*(). */
+    public void throttleDelta(long bytesDelta)
+    {
+        int newTargetBytesPerMS = fun.targetThroughput();
+        if (newTargetBytesPerMS < 1)
+        {
+            // throttling disabled: still advance the bookkeeping, so that bytes handled
+            // while unthrottled are not charged as excess (and slept off in one long
+            // delay) when throttling is enabled again
+            bytesAtLastDelay += bytesDelta;
+            timeAtLastDelay = System.currentTimeMillis();
+            return;
+        }
+
+        // if the target changed, log
+        if (newTargetBytesPerMS != targetBytesPerMS)
+            logger.debug("{} target throughput now {} bytes/ms.", this, newTargetBytesPerMS);
+        targetBytesPerMS = newTargetBytesPerMS;
+
+        // time passed since last delay
+        long msSinceLast = System.currentTimeMillis() - timeAtLastDelay;
+        // the excess bytes in this period
+        long excessBytes = bytesDelta - msSinceLast * targetBytesPerMS;
+
+        // the time to delay to recoup the deficit (the target is known to be >= 1 here)
+        long timeToDelay = excessBytes / targetBytesPerMS;
+        if (timeToDelay > 0)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace(String.format("%s actual throughput was %d bytes in %d ms: throttling for %d ms",
+                                           this, bytesDelta, msSinceLast, timeToDelay));
+            try
+            {
+                Thread.sleep(timeToDelay);
+            }
+            catch (InterruptedException e)
+            {
+                // restore the interrupt status so code further up the stack can still
+                // observe it, then fail the same way as before
+                Thread.currentThread().interrupt();
+                throw new AssertionError(e);
+            }
+        }
+        bytesAtLastDelay += bytesDelta;
+        timeAtLastDelay = System.currentTimeMillis();
+    }
+
+    @Override
+    public String toString()
+    {
+        return "Throttle(for=" + name + ")";
+    }
+
+    /** Callback through which a Throttle obtains its current target throughput. */
+    public interface ThroughputFunction
+    {
+        /**
+         * @return The instantaneous target throughput in bytes per millisecond. Targets less
+         * than or equal to zero will disable throttling.
+         */
+        public int targetThroughput();
+    }
+}