Make hint delivery async so MV hint is deferred on failure to acquire lock. Patch by Paulo Motta; reviewed by Benjamin Roth for CASSANDRA-12905.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/48abc036 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/48abc036 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/48abc036 Branch: refs/heads/trunk Commit: 48abc0369799acca0521150e2c88d4032e01c3b5 Parents: 3faa0d9 Author: Paulo Motta <[email protected]> Authored: Thu Dec 15 12:49:38 2016 -0200 Committer: Paulo Motta <[email protected]> Committed: Thu Dec 15 16:46:05 2016 -0200 ---------------------------------------------------------------------- src/java/org/apache/cassandra/hints/Hint.java | 40 ++++++++++++++------ .../apache/cassandra/hints/HintVerbHandler.java | 4 +- 2 files changed, 30 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/48abc036/src/java/org/apache/cassandra/hints/Hint.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/Hint.java b/src/java/org/apache/cassandra/hints/Hint.java index cbb5e74..17fbf5d 100644 --- a/src/java/org/apache/cassandra/hints/Hint.java +++ b/src/java/org/apache/cassandra/hints/Hint.java @@ -19,8 +19,12 @@ package org.apache.cassandra.hints; import java.io.IOException; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import com.google.common.base.Throwables; + import org.apache.cassandra.db.*; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; @@ -68,7 +72,7 @@ public final class Hint return new Hint(mutation, creationTime, mutation.smallestGCGS()); } - /** + /* * @param mutation the hinted mutation * @param creationTime time of this hint's creation (in milliseconds since epoch) * @param gcgs the smallest gcgs of all tables involved at the time 
of hint creation (in seconds) @@ -81,19 +85,33 @@ public final class Hint /** * Applies the contained mutation unless it's expired, filtering out any updates for truncated tables */ - void apply() + CompletableFuture<?> applyFuture() { - if (!isLive()) - return; + if (isLive()) + { + // filter out partition update for table that have been truncated since hint's creation + Mutation filtered = mutation; + for (UUID id : mutation.getColumnFamilyIds()) + if (creationTime <= SystemKeyspace.getTruncatedAt(id)) + filtered = filtered.without(id); - // filter out partition update for table that have been truncated since hint's creation - Mutation filtered = mutation; - for (UUID id : mutation.getColumnFamilyIds()) - if (creationTime <= SystemKeyspace.getTruncatedAt(id)) - filtered = filtered.without(id); + if (!filtered.isEmpty()) + return filtered.applyFuture(); + } + + return CompletableFuture.completedFuture(null); + } - if (!filtered.isEmpty()) - filtered.apply(); + void apply() + { + try + { + applyFuture().get(); + } + catch (Exception e) + { + throw Throwables.propagate(e.getCause()); + } } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/48abc036/src/java/org/apache/cassandra/hints/HintVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintVerbHandler.java b/src/java/org/apache/cassandra/hints/HintVerbHandler.java index d8838a9..abcd1f9 100644 --- a/src/java/org/apache/cassandra/hints/HintVerbHandler.java +++ b/src/java/org/apache/cassandra/hints/HintVerbHandler.java @@ -88,10 +88,8 @@ public final class HintVerbHandler implements IVerbHandler<HintMessage> else { // the common path - the node is both the destination and a valid replica for the hint. 
- hint.apply(); + hint.applyFuture().thenAccept(o -> reply(id, message.from)).exceptionally(e -> {logger.debug("Failed to apply hint", e); return null;}); } - - reply(id, message.from); } private static void reply(int id, InetAddress to)
