Updated Branches: refs/heads/trunk 93685a478 -> 203b3ad04
Replace Throttle with Guava's RateLimiter for HintedHandOff patch by Aleksey Yeschenko; reviewed by Pavel Yaskevich for CASSANDRA-4541 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/203b3ad0 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/203b3ad0 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/203b3ad0 Branch: refs/heads/trunk Commit: 203b3ad04b63345373f9ad5473b2b30b8b467b03 Parents: 93685a4 Author: Pavel Yaskevich <xe...@apache.org> Authored: Tue Sep 11 19:27:56 2012 +0300 Committer: Pavel Yaskevich <xe...@apache.org> Committed: Tue Sep 11 19:27:56 2012 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/HintedHandOffManager.java | 27 +++----------- 2 files changed, 7 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/203b3ad0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 58b3272..1af1a4a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -58,6 +58,7 @@ * Make CQL3 the default for CQL (CASSANDRA-4640) * update stress tool to be able to use CQL3 (CASSANDRA-4406) * Accept all thrift update on CQL3 cf but don't expose their metadata (CASSANDRA-4377) + * Replace Throttle with Guava's RateLimiter for HintedHandOff (CASSANDRA-4541) 1.1.6 http://git-wip-us.apache.org/repos/asf/cassandra/blob/203b3ad0/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index 22052ad..064033b 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -29,6 
+29,7 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import com.google.common.collect.ImmutableSortedSet; +import com.google.common.util.concurrent.RateLimiter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,13 +57,10 @@ import org.apache.cassandra.service.*; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Throttle; import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.WrappedRunnable; import org.cliffc.high_scale_lib.NonBlockingHashSet; - - /** * The hint schema looks like this: * @@ -258,21 +256,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean private void deliverHintsToEndpointInternal(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, InterruptedException { - long hintSizes = 0; - Throttle hintThrottle = new Throttle("HintThrottle", new Throttle.ThroughputFunction() - { - public int targetThroughput() - { - if (DatabaseDescriptor.getHintedHandoffThrottleInKB() < 1) - // throttling disabled - return 0; - // total throughput - int totalBytesPerMS = (DatabaseDescriptor.getHintedHandoffThrottleInKB() * 1024) / 8 / 1000; - // per hint throughput (target bytes per MS) - return totalBytesPerMS / Math.max(1, executor.getActiveCount()); - } - }); - ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.HINTS_CF); if (hintStore.isEmpty()) return; // nothing to do, don't confuse users by logging a no-op handoff @@ -318,6 +301,10 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean logger.debug("average hinted-row column size is {}; using pageSize of {}", averageColumnSize, pageSize); } + // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml). 
+        int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB();
+        RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
+
         delivery:
         while (true)
         {
@@ -357,10 +344,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                 if (rm != null)
                 {
                     MessageOut<RowMutation> message = rm.createMessage();
+                    rateLimiter.acquire(message.serializedSize(MessagingService.current_version));
                     sendMutation(endpoint, message);
-                    // throttle for the messages sent.
-                    hintSizes += message.serializedSize(MessagingService.current_version);
-                    hintThrottle.throttle(hintSizes);
                     rowsReplayed++;
                 }
                 deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());