Updated Branches:
  refs/heads/cassandra-1.0 eca0c4856 -> 0d0939582

prevent multiple concurrent hinted handoffs (HH) to the same target
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3681


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d093958
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d093958
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d093958

Branch: refs/heads/cassandra-1.0
Commit: 0d0939582ceda20de2f7765e3496bda3d5318520
Parents: eca0c48
Author: Jonathan Ellis <[email protected]>
Authored: Thu Jan 5 16:39:57 2012 -0600
Committer: Jonathan Ellis <[email protected]>
Committed: Thu Jan 5 17:05:53 2012 -0600

----------------------------------------------------------------------
 .../apache/cassandra/db/HintedHandOffManager.java  |   86 ++++++++++-----
 1 files changed, 57 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d093958/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java 
b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 6661ee3..0b92821 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -198,16 +198,24 @@ public class HintedHandOffManager implements 
HintedHandOffManagerMBean
                || (hintColumnFamily.getSortedColumns().size() == 1 && 
hintColumnFamily.getColumn(startColumn) != null);
     }
 
-    private int waitForSchemaAgreement(InetAddress endpoint) throws 
InterruptedException
+    private int waitForSchemaAgreement(InetAddress endpoint) throws 
TimeoutException
     {
         Gossiper gossiper = Gossiper.instance;
         int waited = 0;
         // first, wait for schema to be gossiped.
-        while 
(gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA)
 == null) {
-            Thread.sleep(1000);
+        while 
(gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA)
 == null)
+        {
+            try
+            {
+                Thread.sleep(1000);
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
             waited += 1000;
             if (waited > 2 * StorageService.RING_DELAY)
-                throw new RuntimeException("Didin't receive gossiped schema 
from " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
+                throw new TimeoutException("Didin't receive gossiped schema 
from " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
         }
         waited = 0;
         // then wait for the correct schema version.
@@ -217,44 +225,65 @@ public class HintedHandOffManager implements 
HintedHandOffManagerMBean
         while 
(!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals(
                 
gossiper.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddress()).getApplicationState(ApplicationState.SCHEMA).value))
         {
-            Thread.sleep(1000);
+            try
+            {
+                Thread.sleep(1000);
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
             waited += 1000;
             if (waited > 2 * StorageService.RING_DELAY)
-                throw new RuntimeException("Could not reach schema agreement 
with " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
+                throw new TimeoutException("Could not reach schema agreement 
with " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
         }
         logger_.debug("schema for {} matches local schema", endpoint);
         return waited;
     }
 
-    private void deliverHintsToEndpoint(InetAddress endpoint) throws 
IOException, DigestMismatchException, InvalidRequestException, 
TimeoutException, InterruptedException
+    private void deliverHintsToEndpoint(InetAddress endpoint) throws 
IOException, DigestMismatchException, InvalidRequestException, 
InterruptedException
     {
-        ColumnFamilyStore hintStore = 
Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
         try
         {
-            if (hintStore.isEmpty())
-                return; // nothing to do, don't confuse users by logging a 
no-op handoff
-
-            logger_.debug("Checking remote({}) schema before delivering 
hints", endpoint);
-            int waited = waitForSchemaAgreement(endpoint);
-            // sleep a random amount to stagger handoff delivery from 
different replicas.
-            // (if we had to wait, then gossiper randomness took care of that 
for us already.)
-            if (waited == 0) {
-                // use a 'rounded' sleep interval because of a strange bug 
with windows: CASSANDRA-3375
-                int sleep = FBUtilities.threadLocalRandom().nextInt(2000) * 30;
-                logger_.debug("Sleeping {}ms to stagger hint delivery", sleep);
-                Thread.sleep(sleep);
-            }
-
-            if (!FailureDetector.instance.isAlive(endpoint))
-            {
-                logger_.info("Endpoint {} died before hint delivery, 
aborting", endpoint);
-                return;
-            }
+            deliverHintsToEndpointInternal(endpoint);
         }
         finally
         {
             queuedDeliveries.remove(endpoint);
         }
+    }
+
+    private void deliverHintsToEndpointInternal(InetAddress endpoint) throws 
IOException, DigestMismatchException, InvalidRequestException, 
InterruptedException
+    {
+        ColumnFamilyStore hintStore = 
Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
+        if (hintStore.isEmpty())
+            return; // nothing to do, don't confuse users by logging a no-op 
handoff
+
+        logger_.debug("Checking remote({}) schema before delivering hints", 
endpoint);
+        int waited;
+        try
+        {
+            waited = waitForSchemaAgreement(endpoint);
+        }
+        catch (TimeoutException e)
+        {
+            return;
+        }
+        // sleep a random amount to stagger handoff delivery from different 
replicas.
+        // (if we had to wait, then gossiper randomness took care of that for 
us already.)
+        if (waited == 0)
+        {
+            // use a 'rounded' sleep interval because of a strange bug with 
windows: CASSANDRA-3375
+            int sleep = FBUtilities.threadLocalRandom().nextInt(2000) * 30;
+            logger_.debug("Sleeping {}ms to stagger hint delivery", sleep);
+            Thread.sleep(sleep);
+        }
+
+        if (!FailureDetector.instance.isAlive(endpoint))
+        {
+            logger_.info("Endpoint {} died before hint delivery, aborting", 
endpoint);
+            return;
+        }
 
         // 1. Get the key of the endpoint we need to handoff
         // 2. For each column, deserialize the mutation and send it to the 
endpoint
@@ -341,8 +370,7 @@ public class HintedHandOffManager implements 
HintedHandOffManagerMBean
             }
         }
 
-        logger_.info(String.format("Finished hinted handoff of %s rows to 
endpoint %s",
-                                   rowsReplayed, endpoint));
+        logger_.info(String.format("Finished hinted handoff of %s rows to 
endpoint %s", rowsReplayed, endpoint));
     }
 
     /**

Reply via email to