multithreaded hint replay
patch by vijay; reviewed by jbellis for CASSANDRA-4189


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b993eecf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b993eecf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b993eecf

Branch: refs/heads/trunk
Commit: b993eecfb033d01345718d396a2c32c5120b5e0e
Parents: 8516bcb
Author: Jonathan Ellis <[email protected]>
Authored: Thu Jun 14 16:19:34 2012 -0500
Committer: Jonathan Ellis <[email protected]>
Committed: Thu Jun 14 16:19:34 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 NEWS.txt                                           |    2 +
 conf/cassandra.yaml                                |    8 ++-
 src/java/org/apache/cassandra/config/Config.java   |    3 +-
 .../cassandra/config/DatabaseDescriptor.java       |    9 ++-
 .../apache/cassandra/db/HintedHandOffManager.java  |   49 ++++++++++-----
 src/java/org/apache/cassandra/net/MessageOut.java  |   21 ++++++
 7 files changed, 72 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b993eecf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0e47132..25993ff 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-dev
+ * multithreaded hint replay (CASSANDRA-4189)
  * add inter-node message compression (CASSANDRA-3127)
  * enforce 1m min keycache for auto (CASSANDRA-4306)
  * remove COPP (CASSANDRA-2479)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b993eecf/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 92723fd..6cad6c2 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -33,6 +33,8 @@ Upgrading
     - The somewhat ill-concieved CollatingOrderPreservingPartitioner
       has been removed.  Use RandomPartitioner (recommended) or
       ByteOrderedPartitioner instead.
+    - Global option hinted_handoff_throttle_delay_in_ms has been replaced by
+      hinted_handoff_throttle_in_kb; see also max_hints_delivery_threads.
 
 
 1.1.1

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b993eecf/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 6969095..cddd491 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -26,8 +26,12 @@ hinted_handoff_enabled: true
 # this defines the maximum amount of time a dead host will have hints
 # generated.  After it has been dead this long, hints will be dropped.
 max_hint_window_in_ms: 3600000 # one hour
-# Sleep this long after delivering each hint
-hinted_handoff_throttle_delay_in_ms: 1
+# throttle in KBs per second, shared evenly among active delivery threads; 0 disables throttling
+hinted_handoff_throttle_in_kb: 1024
+# Number of threads with which to deliver hints;
+# Consider increasing this number when you have multi-dc deployments, since
+# cross-dc handoff tends to be slower
+max_hints_delivery_threads: 2
 
 # The following setting populates the page cache on memtable flush and compaction
 # WARNING: Enable this setting only when the whole node's data fits in memory.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b993eecf/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 8a24400..e1c9a42 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -119,7 +119,8 @@ public class Config
     public Double flush_largest_memtables_at = 1.0;
     public Double reduce_cache_sizes_at = 1.0;
     public double reduce_cache_capacity_to = 0.6;
-    public int hinted_handoff_throttle_delay_in_ms = 0;
+    public int hinted_handoff_throttle_in_kb = 1024;
+    public int max_hints_delivery_threads = 1;
     public boolean compaction_preheat_key_cache = true;
 
     public boolean incremental_backups = false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b993eecf/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 7d71113..9a073a9 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -954,9 +954,14 @@ public class DatabaseDescriptor
         return conf.reduce_cache_capacity_to;
     }
 
-    public static int getHintedHandoffThrottleDelay()
+    public static int getHintedHandoffThrottleInKB()
     {
-        return conf.hinted_handoff_throttle_delay_in_ms;
+        return conf.hinted_handoff_throttle_in_kb;
+    }
+
+    public static int getMaxHintsThread()
+    {
+        return conf.max_hints_delivery_threads;
     }
 
     public static boolean getPreheatKeyCache()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b993eecf/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 3aed559..8a3b7de 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.*;
@@ -44,11 +45,13 @@ import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Throttle;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
@@ -94,7 +97,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
     private final NonBlockingHashSet<InetAddress> queuedDeliveries = new 
NonBlockingHashSet<InetAddress>();
 
-    private final ExecutorService executor = new 
JMXEnabledThreadPoolExecutor("HintedHandoff", Thread.MIN_PRIORITY);
+    private final ThreadPoolExecutor executor = new 
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getMaxHintsThread(),
+                                                                               
  Integer.MAX_VALUE,
+                                                                               
  TimeUnit.SECONDS,
+                                                                               
  new LinkedBlockingQueue<Runnable>(),
+                                                                               
  new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY), 
"HintedHandoff");
 
     public void start()
     {
@@ -119,20 +126,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, 
TimeUnit.MINUTES);
     }
 
-    private static void sendMutation(InetAddress endpoint, RowMutation 
mutation) throws TimeoutException
+    private static void sendMutation(InetAddress endpoint, MessageOut<?> 
message) throws TimeoutException
     {
         IWriteResponseHandler responseHandler = 
WriteResponseHandler.create(endpoint);
-        MessagingService.instance().sendRR(mutation.createMessage(), endpoint, 
responseHandler);
+        MessagingService.instance().sendRR(message, endpoint, responseHandler);
         responseHandler.get();
-
-        try
-        {
-            Thread.sleep(DatabaseDescriptor.getHintedHandoffThrottleDelay());
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
     }
 
     private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer hintId, 
long timestamp) throws IOException
@@ -260,6 +258,21 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
     private void deliverHintsToEndpointInternal(InetAddress endpoint) throws 
IOException, DigestMismatchException, InvalidRequestException, 
InterruptedException
     {
+        long hintSizes = 0;
+        Throttle hintThrottle = new Throttle("HintThrottle", new 
Throttle.ThroughputFunction()
+        {
+            public int targetThroughput() // bytes per ms for each active delivery thread; 0 = unthrottled
+            {
+                if (DatabaseDescriptor.getHintedHandoffThrottleInKB() < 1)
+                    return 0; // throttling disabled
+                // setting is KB per second: KB * 1024 / 1000 = bytes per ms (no bits-to-bytes "/ 8"); long avoids overflow
+                long totalBytesPerMS = DatabaseDescriptor.getHintedHandoffThrottleInKB() * 1024L / 1000;
+                // split among active delivery threads, never rounding down to 0 (which would disable throttling)
+                long perThreadBytesPerMS = totalBytesPerMS / Math.max(1, executor.getActiveCount());
+                return (int) Math.max(1, Math.min(Integer.MAX_VALUE, perThreadBytesPerMS));
+            }
+        });
+
         ColumnFamilyStore hintStore = 
Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
         if (hintStore.isEmpty())
             return; // nothing to do, don't confuse users by logging a no-op 
handoff
@@ -360,7 +373,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                 {
                     if (rm != null)
                     {
-                        sendMutation(endpoint, rm);
+                        MessageOut<RowMutation> message = rm.createMessage();
+                        sendMutation(endpoint, message);
+                        // throttle for the messages sent.
+                        hintSizes += 
message.serializedSize(MessagingService.current_version);
+                        hintThrottle.throttle(hintSizes);
                         rowsReplayed++;
                     }
                     deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
@@ -423,10 +440,10 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
     */
     public void scheduleHintDelivery(final InetAddress to)
     {
-        logger.debug("deliverHints to {}", to);
-        if (!queuedDeliveries.add(to))
+        // We should not deliver hints to the same host in 2 different threads
+        if (queuedDeliveries.contains(to) || !queuedDeliveries.add(to))
             return;
-
+        logger.debug("Scheduling delivery of Hints to {}", to);
         Runnable r = new WrappedRunnable()
         {
             public void runMayThrow() throws Exception

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b993eecf/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java
index 1d5c34c..fe09c2e 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import com.google.common.collect.ImmutableMap;
 
 import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -112,4 +113,24 @@ public class MessageOut<T>
         if (payload != null)
             serializer.serialize(payload, out, version);
     }
+    /** Computes the serialized size of this message for the given messaging {@code version}. */
+    public int serializedSize(int version)
+    {
+        int size = CompactEndpointSerializationHelper.serializedSize(from);
+        // verb id and parameter count, then each parameter's key, value length and value bytes
+        size += TypeSizes.NATIVE.sizeof(verb.ordinal());
+        size += TypeSizes.NATIVE.sizeof(parameters.size());
+        for (Map.Entry<String, byte[]> entry : parameters.entrySet())
+        {
+            size += TypeSizes.NATIVE.sizeof(entry.getKey());
+            size += TypeSizes.NATIVE.sizeof(entry.getValue().length);
+            size += entry.getValue().length;
+        }
+        // payload: an int-sized length prefix followed by the serialized payload itself
+        long longSize = payload == null ? 0 : serializer.serializedSize(payload, version);
+        assert longSize <= Integer.MAX_VALUE; // larger values are supported in sstables but not messages
+        size += TypeSizes.NATIVE.sizeof((int) longSize);
+        size += longSize;
+        return size;
+    }
 }

Reply via email to