fix hinting for dropped local writes; patch by jbellis and aleksey for CASSANDRA-4753
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/de2495c2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/de2495c2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/de2495c2 Branch: refs/heads/cassandra-1.2 Commit: de2495c2c9bf8d8387d241df2962ae52f092b145 Parents: bebaf45 Author: Jonathan Ellis <[email protected]> Authored: Wed Dec 19 14:55:53 2012 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Wed Dec 19 14:56:16 2012 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/net/MessagingService.java | 2 +- .../org/apache/cassandra/service/StorageProxy.java | 187 ++++++++++----- 3 files changed, 125 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/de2495c2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 260aae5..d8caa14 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2.1 + * fix hinting for dropped local writes (CASSANDRA-4753) * off-heap cache doesn't need mutable column container (CASSANDRA-5057) * apply disk_failure_policy to bad disks on initial directory creation (CASSANDRA-4847) http://git-wip-us.apache.org/repos/asf/cassandra/blob/de2495c2/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 5329e1a..98495be 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -328,7 +328,7 @@ public final class MessagingService implements MessagingServiceMBean { assert expiredCallbackInfo.sentMessage != null; 
RowMutation rm = (RowMutation) expiredCallbackInfo.sentMessage.payload; - return StorageProxy.scheduleLocalHint(rm, expiredCallbackInfo.target, null, null); + return StorageProxy.submitHint(rm, expiredCallbackInfo.target, null, null); } return null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/de2495c2/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 1884132..fe427af 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -63,7 +63,6 @@ import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.WrappedRunnable; public class StorageProxy implements StorageProxyMBean { @@ -314,16 +313,7 @@ public class StorageProxy implements StorageProxyMBean Table.SYSTEM_KS, null, WriteType.BATCH_LOG); - - try - { - sendMessagesToOneDC(rm.createMessage(), endpoints, true, handler); - } - catch (IOException e) - { - throw new RuntimeException("Error writing to batchlog", e); - } - + updateBatchlog(rm, endpoints, handler); handler.get(); } @@ -332,14 +322,19 @@ public class StorageProxy implements StorageProxyMBean RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid)); rm.delete(new QueryPath(SystemTable.BATCHLOG_CF), FBUtilities.timestampMicros()); AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints, Collections.<InetAddress>emptyList(), ConsistencyLevel.ANY, Table.SYSTEM_KS, null, WriteType.SIMPLE); + updateBatchlog(rm, endpoints, handler); + } - try + private static void updateBatchlog(RowMutation rm, Collection<InetAddress> endpoints, AbstractWriteResponseHandler handler) + { + if 
(endpoints.contains(FBUtilities.getBroadcastAddress())) { - sendMessagesToOneDC(rm.createMessage(), endpoints, true, handler); + assert endpoints.size() == 1; + insertLocal(rm, handler); } - catch (IOException e) + else { - throw new RuntimeException("Error deleting batch " + uuid, e); + sendMessagesToOneDC(rm.createMessage(), endpoints, true, handler); } } @@ -350,15 +345,8 @@ public class StorageProxy implements StorageProxyMBean { for (WriteResponseHandlerWrapper wrapper : wrappers) { - try - { - Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints); - sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, consistencyLevel); - } - catch (IOException e) - { - throw new RuntimeException("Error writing key " + ByteBufferUtil.bytesToHex(wrapper.mutation.key()), e); - } + Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints); + sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, consistencyLevel); } for (WriteResponseHandlerWrapper wrapper : wrappers) @@ -382,11 +370,11 @@ public class StorageProxy implements StorageProxyMBean * successful. 
*/ public static AbstractWriteResponseHandler performWrite(IMutation mutation, - ConsistencyLevel consistency_level, - String localDataCenter, - WritePerformer performer, - Runnable callback, - WriteType writeType) + ConsistencyLevel consistency_level, + String localDataCenter, + WritePerformer performer, + Runnable callback, + WriteType writeType) throws UnavailableException, OverloadedException, IOException { String table = mutation.getTable(); @@ -490,7 +478,7 @@ public class StorageProxy implements StorageProxyMBean AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) - throws IOException, OverloadedException + throws OverloadedException { // Multimap that holds onto all the messages and addresses meant for a specific datacenter Map<String, Multimap<MessageOut, InetAddress>> dcMessages = new HashMap<String, Multimap<MessageOut, InetAddress>>(); @@ -537,45 +525,41 @@ public class StorageProxy implements StorageProxyMBean continue; // Schedule a local hint - scheduleLocalHint(rm, destination, responseHandler, consistency_level); + submitHint(rm, destination, responseHandler, consistency_level); } } sendMessages(localDataCenter, dcMessages, responseHandler); } - public static Future<Void> scheduleLocalHint(final RowMutation mutation, - final InetAddress target, - final AbstractWriteResponseHandler responseHandler, - final ConsistencyLevel consistencyLevel) + public static Future<Void> submitHint(final RowMutation mutation, + final InetAddress target, + final AbstractWriteResponseHandler responseHandler, + final ConsistencyLevel consistencyLevel) { - // Hint of itself doesn't make sense. 
+ // local write that time out should be handled by LocalMutationRunnable assert !target.equals(FBUtilities.getBroadcastAddress()) : target; - totalHintsInProgress.incrementAndGet(); - final AtomicInteger targetHints = hintsInProgress.get(target); - targetHints.incrementAndGet(); - Runnable runnable = new WrappedRunnable() + HintRunnable runnable = new HintRunnable(target) { public void runMayThrow() throws IOException { logger.debug("Adding hint for {}", target); - try - { - writeHintForMutation(mutation, target); - // Notify the handler only for CL == ANY - if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY) - responseHandler.response(null); - } - finally - { - totalHintsInProgress.decrementAndGet(); - targetHints.decrementAndGet(); - } + writeHintForMutation(mutation, target); + // Notify the handler only for CL == ANY + if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY) + responseHandler.response(null); } }; + return submitHint(runnable); + } + + private static Future<Void> submitHint(HintRunnable runnable) + { + totalHintsInProgress.incrementAndGet(); + hintsInProgress.get(runnable.target).incrementAndGet(); return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable); } @@ -598,7 +582,6 @@ public class StorageProxy implements StorageProxyMBean * for each datacenter, send a message to one node to relay the write to other replicas */ private static void sendMessages(String localDataCenter, Map<String, Multimap<MessageOut, InetAddress>> dcMessages, AbstractWriteResponseHandler handler) - throws IOException { for (Map.Entry<String, Multimap<MessageOut, InetAddress>> entry: dcMessages.entrySet()) { @@ -615,7 +598,19 @@ public class StorageProxy implements StorageProxyMBean } } - private static void sendMessagesToOneDC(MessageOut message, Collection<InetAddress> targets, boolean localDC, AbstractWriteResponseHandler handler) throws IOException + private static void sendMessagesToOneDC(MessageOut message, 
Collection<InetAddress> targets, boolean localDC, AbstractWriteResponseHandler handler) + { + try + { + sendMessagesToOneDCInternal(message, targets, localDC, handler); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + private static void sendMessagesToOneDCInternal(MessageOut message, Collection<InetAddress> targets, boolean localDC, AbstractWriteResponseHandler handler) throws IOException { Iterator<InetAddress> iter = targets.iterator(); InetAddress target = iter.next(); @@ -769,7 +764,7 @@ public class StorageProxy implements StorageProxyMBean final String localDataCenter, final ConsistencyLevel consistency_level) { - return new DroppableRunnable(MessagingService.Verb.MUTATION) + return new LocalMutationRunnable() { public void runMayThrow() throws IOException { @@ -935,7 +930,6 @@ public class StorageProxy implements StorageProxyMBean ReadCommand command = commands.get(i); try { - long startTime2 = System.currentTimeMillis(); Row row = handler.get(); if (row != null) { @@ -1181,14 +1175,6 @@ public class StorageProxy implements StorageProxyMBean return trim(command, rows); } - private static IDiskAtomFilter getEmptySlicePredicate() - { - return new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, - ByteBufferUtil.EMPTY_BYTE_BUFFER, - false, - -1); - } - private static List<Row> trim(RangeSliceCommand command, List<Row> rows) { // When countCQL3Rows, we let the caller trim the result. 
@@ -1483,6 +1469,9 @@ public class StorageProxy implements StorageProxyMBean public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException, OverloadedException; } + /** + * A Runnable that aborts if it doesn't start running before it times out + */ private static abstract class DroppableRunnable implements Runnable { private final long constructionTime = System.currentTimeMillis(); @@ -1514,6 +1503,76 @@ public class StorageProxy implements StorageProxyMBean abstract protected void runMayThrow() throws Exception; } + /** + * Like DroppableRunnable, but if it aborts, it will rerun (on the mutation stage) after + * marking itself as a hint in progress so that the hint backpressure mechanism can function. + */ + private static abstract class LocalMutationRunnable implements Runnable + { + private final long constructionTime = System.currentTimeMillis(); + + public final void run() + { + if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION)) + { + MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION); + HintRunnable runnable = new HintRunnable(FBUtilities.getBroadcastAddress()) + { + protected void runMayThrow() throws Exception + { + LocalMutationRunnable.this.runMayThrow(); + } + }; + submitHint(runnable); + return; + } + + try + { + runMayThrow(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + abstract protected void runMayThrow() throws Exception; + } + + /** + * HintRunnable will decrease totalHintsInProgress and targetHints when finished. + * It is the caller's responsibility to increment them initially. 
+ */ + private abstract static class HintRunnable implements Runnable + { + public final InetAddress target; + + protected HintRunnable(InetAddress target) + { + this.target = target; + } + + public void run() + { + try + { + runMayThrow(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + finally + { + totalHintsInProgress.decrementAndGet(); + hintsInProgress.get(target).decrementAndGet(); + } + } + + abstract protected void runMayThrow() throws Exception; + } + public long getTotalHints() { return totalHints.get();
