multithreaded hint replay patch by vijay; reviewed by jbellis for CASSANDRA-4189
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b993eecf Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b993eecf Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b993eecf Branch: refs/heads/trunk Commit: b993eecfb033d01345718d396a2c32c5120b5e0e Parents: 8516bcb Author: Jonathan Ellis <[email protected]> Authored: Thu Jun 14 16:19:34 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Thu Jun 14 16:19:34 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 2 + conf/cassandra.yaml | 8 ++- src/java/org/apache/cassandra/config/Config.java | 3 +- .../cassandra/config/DatabaseDescriptor.java | 9 ++- .../apache/cassandra/db/HintedHandOffManager.java | 49 ++++++++++----- src/java/org/apache/cassandra/net/MessageOut.java | 21 ++++++ 7 files changed, 72 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b993eecf/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0e47132..25993ff 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2-dev + * multithreaded hint replay (CASSANDRA-4189) * add inter-node message compression (CASSANDRA-3127) * enforce 1m min keycache for auto (CASSANDRA-4306) * remove COPP (CASSANDRA-2479) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b993eecf/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 92723fd..6cad6c2 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -33,6 +33,8 @@ Upgrading - The somewhat ill-concieved CollatingOrderPreservingPartitioner has been removed. Use RandomPartitioner (recommended) or ByteOrderedPartitioner instead. + - Global option hinted_handoff_throttle_delay_in_ms has been removed. 
+ hinted_handoff_throttle_in_kb has been added instead. 1.1.1 http://git-wip-us.apache.org/repos/asf/cassandra/blob/b993eecf/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 6969095..cddd491 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -26,8 +26,12 @@ hinted_handoff_enabled: true # this defines the maximum amount of time a dead host will have hints # generated. After it has been dead this long, hints will be dropped. max_hint_window_in_ms: 3600000 # one hour -# Sleep this long after delivering each hint -hinted_handoff_throttle_delay_in_ms: 1 +# Maximum hint delivery throughput in KB per second, shared by all active delivery threads; 0 disables throttling +hinted_handoff_throttle_in_kb: 1024 +# Number of threads with which to deliver hints; +# Consider increasing this number when you have multi-dc deployments, since +# cross-dc handoff tends to be slower +max_hints_delivery_threads: 2 # The following setting populates the page cache on memtable flush and compaction # WARNING: Enable this setting only when the whole node's data fits in memory.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b993eecf/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 8a24400..e1c9a42 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -119,7 +119,8 @@ public class Config public Double flush_largest_memtables_at = 1.0; public Double reduce_cache_sizes_at = 1.0; public double reduce_cache_capacity_to = 0.6; - public int hinted_handoff_throttle_delay_in_ms = 0; + public int hinted_handoff_throttle_in_kb = 1024; + public int max_hints_delivery_threads = 1; public boolean compaction_preheat_key_cache = true; public boolean incremental_backups = false; http://git-wip-us.apache.org/repos/asf/cassandra/blob/b993eecf/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 7d71113..9a073a9 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -954,9 +954,14 @@ public class DatabaseDescriptor return conf.reduce_cache_capacity_to; } - public static int getHintedHandoffThrottleDelay() + public static int getHintedHandoffThrottleInKB() { - return conf.hinted_handoff_throttle_delay_in_ms; + return conf.hinted_handoff_throttle_in_kb; + } + + public static int getMaxHintsThread() + { + return conf.max_hints_delivery_threads; } public static boolean getPreheatKeyCache() http://git-wip-us.apache.org/repos/asf/cassandra/blob/b993eecf/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index 3aed559..8a3b7de 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -33,6 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.filter.*; @@ -44,11 +45,13 @@ import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.*; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Throttle; import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.WrappedRunnable; import org.cliffc.high_scale_lib.NonBlockingHashSet; @@ -94,7 +97,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>(); - private final ExecutorService executor = new JMXEnabledThreadPoolExecutor("HintedHandoff", Thread.MIN_PRIORITY); + private final ThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getMaxHintsThread(), + Integer.MAX_VALUE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), + new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY), "HintedHandoff"); public void start() { @@ -119,20 +126,11 @@ public class HintedHandOffManager implements 
HintedHandOffManagerMBean StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES); } - private static void sendMutation(InetAddress endpoint, RowMutation mutation) throws TimeoutException + private static void sendMutation(InetAddress endpoint, MessageOut<?> message) throws TimeoutException { IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint); - MessagingService.instance().sendRR(mutation.createMessage(), endpoint, responseHandler); + MessagingService.instance().sendRR(message, endpoint, responseHandler); responseHandler.get(); - - try - { - Thread.sleep(DatabaseDescriptor.getHintedHandoffThrottleDelay()); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } } private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer hintId, long timestamp) throws IOException @@ -260,6 +258,21 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean private void deliverHintsToEndpointInternal(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, InterruptedException { + long hintSizes = 0; + Throttle hintThrottle = new Throttle("HintThrottle", new Throttle.ThroughputFunction() + { + public int targetThroughput() + { + int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB(); + if (throttleInKB < 1) + return 0; // throttling disabled + long totalBytesPerMS = throttleInKB * 1024L / 1000; // KB are kilobytes, not kilobits: no divide-by-8 + // split the budget across active delivery threads, but never round down to 0 (the "disabled" value) + long perThreadBytesPerMS = totalBytesPerMS / Math.max(1, executor.getActiveCount()); + return (int) Math.max(1, Math.min(Integer.MAX_VALUE, perThreadBytesPerMS)); + } + }); + ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF); if (hintStore.isEmpty()) return; // nothing to do, don't confuse users by logging a no-op handoff @@ -360,7 +373,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean { if (rm != null) { - sendMutation(endpoint, rm); + MessageOut<RowMutation> message = rm.createMessage(); +
sendMutation(endpoint, message); + // throttle for the messages sent. + hintSizes += message.serializedSize(MessagingService.current_version); + hintThrottle.throttle(hintSizes); rowsReplayed++; } deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp()); @@ -423,10 +440,10 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean */ public void scheduleHintDelivery(final InetAddress to) { - logger.debug("deliverHints to {}", to); - if (!queuedDeliveries.add(to)) + // We should not deliver hints to the same host in 2 different threads + if (queuedDeliveries.contains(to) || !queuedDeliveries.add(to)) return; - + logger.debug("Scheduling delivery of Hints to {}", to); Runnable r = new WrappedRunnable() { public void runMayThrow() throws Exception http://git-wip-us.apache.org/repos/asf/cassandra/blob/b993eecf/src/java/org/apache/cassandra/net/MessageOut.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java index 1d5c34c..fe09c2e 100644 --- a/src/java/org/apache/cassandra/net/MessageOut.java +++ b/src/java/org/apache/cassandra/net/MessageOut.java @@ -27,6 +27,7 @@ import java.util.Map; import com.google.common.collect.ImmutableMap; import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.utils.FBUtilities; @@ -112,4 +113,24 @@ public class MessageOut<T> if (payload != null) serializer.serialize(payload, out, version); } + + public int serializedSize(int version) // expected to equal the bytes serialize() writes for this message at this version; keep the two in sync + { + int size = CompactEndpointSerializationHelper.serializedSize(from); + + size += TypeSizes.NATIVE.sizeof(verb.ordinal()); + size += TypeSizes.NATIVE.sizeof(parameters.size()); + for (Map.Entry<String, byte[]> entry : parameters.entrySet()) + { + size += TypeSizes.NATIVE.sizeof(entry.getKey()); + size += TypeSizes.NATIVE.sizeof(entry.getValue().length); + size
+= entry.getValue().length; + } + + long longSize = payload == null ? 0 : serializer.serializedSize(payload, version); + assert longSize <= Integer.MAX_VALUE; // larger values are supported in sstables but not messages + size += TypeSizes.NATIVE.sizeof((int) longSize); + size += (int) longSize; + return size; + } }
