Updated Branches: refs/heads/cassandra-1.0 eca0c4856 -> 0d0939582
prevent multiple concurrent HH to the same target patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3681 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d093958 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d093958 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d093958 Branch: refs/heads/cassandra-1.0 Commit: 0d0939582ceda20de2f7765e3496bda3d5318520 Parents: eca0c48 Author: Jonathan Ellis <[email protected]> Authored: Thu Jan 5 16:39:57 2012 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Thu Jan 5 17:05:53 2012 -0600 ---------------------------------------------------------------------- .../apache/cassandra/db/HintedHandOffManager.java | 86 ++++++++++----- 1 files changed, 57 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d093958/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index 6661ee3..0b92821 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -198,16 +198,24 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean || (hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn(startColumn) != null); } - private int waitForSchemaAgreement(InetAddress endpoint) throws InterruptedException + private int waitForSchemaAgreement(InetAddress endpoint) throws TimeoutException { Gossiper gossiper = Gossiper.instance; int waited = 0; // first, wait for schema to be gossiped. - while (gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA) == null) { - Thread.sleep(1000); + while (gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA) == null) + { + try + { + Thread.sleep(1000); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } waited += 1000; if (waited > 2 * StorageService.RING_DELAY) - throw new RuntimeException("Didin't receive gossiped schema from " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms"); + throw new TimeoutException("Didin't receive gossiped schema from " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms"); } waited = 0; // then wait for the correct schema version. @@ -217,44 +225,65 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean while (!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals( gossiper.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddress()).getApplicationState(ApplicationState.SCHEMA).value)) { - Thread.sleep(1000); + try + { + Thread.sleep(1000); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } waited += 1000; if (waited > 2 * StorageService.RING_DELAY) - throw new RuntimeException("Could not reach schema agreement with " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms"); + throw new TimeoutException("Could not reach schema agreement with " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms"); } logger_.debug("schema for {} matches local schema", endpoint); return waited; } - private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException, InterruptedException + private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, InterruptedException { - ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF); try { - if (hintStore.isEmpty()) - return; // nothing to do, don't confuse users by logging a no-op handoff - - logger_.debug("Checking remote({}) schema before delivering hints", endpoint); - int waited = waitForSchemaAgreement(endpoint); - // sleep a random amount to stagger handoff delivery from different replicas. - // (if we had to wait, then gossiper randomness took care of that for us already.) - if (waited == 0) { - // use a 'rounded' sleep interval because of a strange bug with windows: CASSANDRA-3375 - int sleep = FBUtilities.threadLocalRandom().nextInt(2000) * 30; - logger_.debug("Sleeping {}ms to stagger hint delivery", sleep); - Thread.sleep(sleep); - } - - if (!FailureDetector.instance.isAlive(endpoint)) - { - logger_.info("Endpoint {} died before hint delivery, aborting", endpoint); - return; - } + deliverHintsToEndpointInternal(endpoint); } finally { queuedDeliveries.remove(endpoint); } + } + + private void deliverHintsToEndpointInternal(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, InterruptedException + { + ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF); + if (hintStore.isEmpty()) + return; // nothing to do, don't confuse users by logging a no-op handoff + + logger_.debug("Checking remote({}) schema before delivering hints", endpoint); + int waited; + try + { + waited = waitForSchemaAgreement(endpoint); + } + catch (TimeoutException e) + { + return; + } + // sleep a random amount to stagger handoff delivery from different replicas. + // (if we had to wait, then gossiper randomness took care of that for us already.) + if (waited == 0) + { + // use a 'rounded' sleep interval because of a strange bug with windows: CASSANDRA-3375 + int sleep = FBUtilities.threadLocalRandom().nextInt(2000) * 30; + logger_.debug("Sleeping {}ms to stagger hint delivery", sleep); + Thread.sleep(sleep); + } + + if (!FailureDetector.instance.isAlive(endpoint)) + { + logger_.info("Endpoint {} died before hint delivery, aborting", endpoint); + return; + } // 1. Get the key of the endpoint we need to handoff // 2. For each column, deserialize the mutation and send it to the endpoint @@ -341,8 +370,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean } } - logger_.info(String.format("Finished hinted handoff of %s rows to endpoint %s", - rowsReplayed, endpoint)); + logger_.info(String.format("Finished hinted handoff of %s rows to endpoint %s", rowsReplayed, endpoint)); } /**
