Updated Branches: refs/heads/cassandra-1.2 87d936251 -> d224c2beb
Improve asynchronous hint delivery

patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for CASSANDRA-5179

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d224c2be
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d224c2be
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d224c2be

Branch: refs/heads/cassandra-1.2
Commit: d224c2beb6cee67a4ab390026697a2fd7e8c7fd5
Parents: 87d9362
Author: Aleksey Yeschenko <[email protected]>
Authored: Wed Mar 20 20:02:21 2013 +0300
Committer: Aleksey Yeschenko <[email protected]>
Committed: Wed Mar 20 20:02:21 2013 +0300

----------------------------------------------------------------------
 CHANGES.txt | 1 +
 .../apache/cassandra/db/HintedHandOffManager.java | 178 ++++++++-------
 2 files changed, 98 insertions(+), 81 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d224c2be/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 174cca4..3eb85d6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -2,6 +2,7 @@
  * Fix mixing prepared statements between keyspaces (CASSANDRA-5352)
  * Fix consistency level during bootstrap - strike 3 (CASSANDRA-5354)
  * Fix transposed arguments in AlreadyExistsException (CASSANDRA-5362)
+ * Improve asynchronous hint delivery (CASSANDRA-5179)
 
 
 1.2.3

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d224c2be/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 58e6ef0..53411f5 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -31,6 +31,7 @@ import
javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.RateLimiter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,13 +48,13 @@ import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.metrics.HintedHandoffMetrics; -import org.apache.cassandra.net.IAsyncCallback; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.*; @@ -61,7 +62,6 @@ import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; -import org.apache.cassandra.utils.WrappedRunnable; import org.cliffc.high_scale_lib.NonBlockingHashSet; /** @@ -107,7 +107,10 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), - new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY), "internal"); + new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY), + "internal"); + + private final ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.HINTS_CF); public void start() { @@ -133,7 +136,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES); } - private static void 
deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp) throws IOException + private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp) { RowMutation rm = new RowMutation(Table.SYSTEM_KS, tokenBytes); rm.delete(new QueryPath(SystemTable.HINTS_CF, null, columnName), timestamp); @@ -149,8 +152,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean } catch (UnknownHostException e) { - logger.warn("Unable to find "+ipOrHostname+", not a hostname or ipaddr of a node?:"); - e.printStackTrace(); + logger.warn("Unable to find {}, not a hostname or ipaddr of a node", ipOrHostname); throw new RuntimeException(e); } } @@ -171,13 +173,13 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean { try { - logger.info("Deleting any stored hints for " + endpoint); + logger.info("Deleting any stored hints for {}", endpoint); rm.apply(); compact(); } catch (Exception e) { - logger.warn("Could not delete hints for " + endpoint + ": " + e); + logger.warn("Could not delete hints for {}: {}", endpoint, e); } } }; @@ -187,7 +189,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean @VisibleForTesting protected Future<?> compact() throws ExecutionException, InterruptedException { - final ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.HINTS_CF); hintStore.forceBlockingFlush(); ArrayList<Descriptor> descriptors = new ArrayList<Descriptor>(); for (SSTable sstable : hintStore.getSSTables()) @@ -245,21 +246,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean return waited; } - private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, InterruptedException - { - try - { - deliverHintsToEndpointInternal(endpoint); - } - finally - { - queuedDeliveries.remove(endpoint); - } - } - - private void deliverHintsToEndpointInternal(InetAddress endpoint) throws 
IOException, DigestMismatchException, InvalidRequestException, InterruptedException + private void deliverHintsToEndpoint(InetAddress endpoint) { - ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.HINTS_CF); if (hintStore.isEmpty()) return; // nothing to do, don't confuse users by logging a no-op handoff @@ -286,12 +274,18 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean return; } - // 1. Get the key of the endpoint we need to handoff - // 2. For each column, deserialize the mutation and send it to the endpoint - // 3. Delete the subcolumn if the write was successful - // 4. Force a flush - // 5. Do major compaction to clean up all deletes etc. + doDeliverHintsToEndpoint(endpoint); + } + /* + * 1. Get the key of the endpoint we need to handoff + * 2. For each column, deserialize the mutation and send it to the endpoint + * 3. Delete the subcolumn if the write was successful + * 4. Force a flush + * 5. Do major compaction to clean up all deletes etc. + */ + private void doDeliverHintsToEndpoint(InetAddress endpoint) + { // find the hints for the node using its token. 
UUID hostId = Gossiper.instance.getHostId(endpoint); logger.info("Started hinted handoff for host: {} with IP: {}", hostId, endpoint); @@ -301,50 +295,47 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean final AtomicInteger rowsReplayed = new AtomicInteger(0); ByteBuffer startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER; - int pageSize = PAGE_SIZE; - // read less columns (mutations) per page if they are very large - if (hintStore.getMeanColumns() > 0) - { - int averageColumnSize = (int) (hintStore.getMeanRowSize() / hintStore.getMeanColumns()); - pageSize = Math.min(PAGE_SIZE, DatabaseDescriptor.getInMemoryCompactionLimit() / averageColumnSize); - pageSize = Math.max(2, pageSize); // page size of 1 does not allow actual paging b/c of >= behavior on startColumn - logger.debug("average hinted-row column size is {}; using pageSize of {}", averageColumnSize, pageSize); - } + int pageSize = calculatePageSize(); + logger.debug("Using pageSize of {}", pageSize); // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml). int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB(); RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? 
Double.MAX_VALUE : throttleInKB * 1024); + delivery: while (true) { - // check if hints delivery has been paused during the process - if (hintedHandOffPaused) - { - logger.debug("Hints delivery process is paused, aborting"); - break; - } + QueryFilter filter = QueryFilter.getSliceFilter(epkey, + new QueryPath(SystemTable.HINTS_CF), + startColumn, + ByteBufferUtil.EMPTY_BYTE_BUFFER, + false, + pageSize); + + ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), + (int) (System.currentTimeMillis() / 1000)); - QueryFilter filter = QueryFilter.getSliceFilter(epkey, new QueryPath(SystemTable.HINTS_CF), startColumn, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, pageSize); - ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), (int)(System.currentTimeMillis() / 1000)); if (pagingFinished(hintsPage, startColumn)) - { - if (ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(startColumn)) - { - // we've started from the beginning and could not find anything (only maybe tombstones) - break; - } - else - { - // restart query from the first column until we read an empty row; - // that will tell us everything was delivered successfully with no timeouts - startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER; - continue; - } + break; + // check if node is still alive and we should continue delivery process + if (!FailureDetector.instance.isAlive(endpoint)) + { + logger.info("Endpoint {} died during hint delivery; aborting ({} delivered)", endpoint, rowsReplayed); + return; } + List<WriteResponseHandler> responseHandlers = Lists.newArrayList(); + for (final IColumn hint : hintsPage.getSortedColumns()) { + // check if hints delivery has been paused during the process + if (hintedHandOffPaused) + { + logger.debug("Hints delivery process is paused, aborting"); + break delivery; + } + // Skip tombstones: // if we iterate quickly enough, it's possible that we could request a new page in the same millisecond // in which the local 
deletion timestamp was generated on the last column in the old page, in which @@ -353,11 +344,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean if (!hint.isLive()) continue; - if (hintedHandOffPaused) - { - logger.debug("Hints delivery process is paused, aborting"); - break; - } startColumn = hint.name(); ByteBuffer[] components = comparator.split(hint.name()); @@ -374,29 +360,42 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp()); continue; } + catch (IOException e) + { + throw new AssertionError(e); + } MessageOut<RowMutation> message = rm.createMessage(); rateLimiter.acquire(message.serializedSize(MessagingService.current_version)); - WrappedRunnable callback = new WrappedRunnable() + Runnable callback = new Runnable() { - public void runMayThrow() throws IOException + public void run() { rowsReplayed.incrementAndGet(); deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp()); } }; - IAsyncCallback responseHandler = new WriteResponseHandler(endpoint, WriteType.UNLOGGED_BATCH, callback); + WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.UNLOGGED_BATCH, callback); MessagingService.instance().sendRR(message, endpoint, responseHandler); + responseHandlers.add(responseHandler); } - // check if node is still alive and we should continue delivery process - if (!FailureDetector.instance.isAlive(endpoint)) + for (WriteResponseHandler handler : responseHandlers) { - logger.debug("Endpoint {} died during hint delivery, aborting", endpoint); - return; + try + { + handler.get(); + } + catch (WriteTimeoutException e) + { + logger.info("Timed out replaying hints to {}; aborting ({} delivered)", endpoint, rowsReplayed); + return; + } } } + logger.info("Finished hinted handoff of {} rows to endpoint {}", rowsReplayed, endpoint); + try { compact().get(); @@ -405,11 +404,20 @@ public class HintedHandOffManager implements 
HintedHandOffManagerMBean { throw new RuntimeException(e); } + } - logger.info(String.format("Finished hinted handoff of %s rows to endpoint %s", rowsReplayed, endpoint)); - if (hintedHandOffPaused) + private int calculatePageSize() + { + // read less columns (mutations) per page if they are very large + if (hintStore.getMeanColumns() > 0) { - logger.info("Hints delivery process is paused, not delivering further hints"); + int averageColumnSize = (int) (hintStore.getMeanRowSize() / hintStore.getMeanColumns()); + // page size of 1 does not allow actual paging b/c of >= behavior on startColumn + return Math.max(2, Math.min(PAGE_SIZE, DatabaseDescriptor.getInMemoryCompactionLimit() / averageColumnSize)); + } + else + { + return PAGE_SIZE; } } @@ -422,7 +430,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean if (logger.isDebugEnabled()) logger.debug("Started scheduleAllDeliveries"); - ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.HINTS_CF); IPartitioner p = StorageService.getPartitioner(); RowPosition minPos = p.getMinimumToken().minKeyBound(); Range<RowPosition> range = new Range<RowPosition>(minPos, minPos, p); @@ -449,17 +456,25 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean public void scheduleHintDelivery(final InetAddress to) { // We should not deliver hints to the same host in 2 different threads - if (queuedDeliveries.contains(to) || !queuedDeliveries.add(to)) + if (!queuedDeliveries.add(to)) return; + logger.debug("Scheduling delivery of Hints to {}", to); - Runnable r = new WrappedRunnable() + + executor.execute(new Runnable() { - public void runMayThrow() throws Exception + public void run() { - deliverHintsToEndpoint(to); + try + { + deliverHintsToEndpoint(to); + } + finally + { + queuedDeliveries.remove(to); + } } - }; - executor.execute(r); + }); } public void scheduleHintDelivery(String to) throws UnknownHostException @@ -467,7 +482,8 @@ public class 
HintedHandOffManager implements HintedHandOffManagerMBean scheduleHintDelivery(InetAddress.getByName(to)); } - public void pauseHintsDelivery(boolean b) { + public void pauseHintsDelivery(boolean b) + { hintedHandOffPaused = b; }
