Backport CASSANDRA-6747

patch by yukim; reviewed by krummas for CASSANDRA-7560
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7e1adb49 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7e1adb49 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7e1adb49 Branch: refs/heads/cassandra-2.0 Commit: 7e1adb4976470b48a361ec6dcca7cbbcdb86d85f Parents: b44bbb8 Author: Yuki Morishita <[email protected]> Authored: Tue Jul 29 16:02:38 2014 -0500 Committer: Yuki Morishita <[email protected]> Committed: Tue Jul 29 16:02:38 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/net/CallbackInfo.java | 15 ++++++++- .../net/IAsyncCallbackWithFailure.java | 28 ++++++++++++++++ .../cassandra/net/MessageDeliveryTask.java | 16 ++++++++- .../org/apache/cassandra/net/MessageIn.java | 10 ++++++ .../apache/cassandra/net/MessagingService.java | 34 +++++++++++++++----- .../cassandra/net/ResponseVerbHandler.java | 12 +++++-- .../apache/cassandra/repair/SnapshotTask.java | 21 +++++++----- 8 files changed, 117 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 26d94f7..7f7d2bc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -19,6 +19,7 @@ * Add inter_dc_stream_throughput_outbound_megabits_per_sec (CASSANDRA-6596) * Add option to disable STCS in L0 (CASSANDRA-6621) * Fix error when doing reversed queries with static columns (CASSANDRA-7490) + * Backport CASSANDRA-6747 (CASSANDRA-7560) Merged from 1.2: * Set correct stream ID on responses when non-Exception Throwables are thrown while handling native protocol messages (CASSANDRA-7470) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/CallbackInfo.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/CallbackInfo.java b/src/java/org/apache/cassandra/net/CallbackInfo.java index 3e584b4..b61210c 100644 --- a/src/java/org/apache/cassandra/net/CallbackInfo.java +++ b/src/java/org/apache/cassandra/net/CallbackInfo.java @@ -31,6 +31,12 @@ public class CallbackInfo protected final InetAddress target; protected final IAsyncCallback callback; protected final IVersionedSerializer<?> serializer; + private final boolean failureCallback; + + public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer) + { + this(target, callback, serializer, false); + } /** * Create CallbackInfo without sent message @@ -39,11 +45,12 @@ public class CallbackInfo * @param callback * @param serializer serializer to deserialize response message */ - public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer) + public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer, boolean failureCallback) { this.target = target; this.callback = callback; this.serializer = serializer; + this.failureCallback = failureCallback; } public boolean shouldHint() @@ -51,12 +58,18 @@ public class CallbackInfo return false; } + public boolean isFailureCallback() + { + return failureCallback; + } + public String toString() { return "CallbackInfo(" + "target=" + target + ", callback=" + callback + ", serializer=" + serializer + + ", failureCallback=" + failureCallback + ')'; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java new file mode 100644 index 0000000..1f95579 --- /dev/null +++ 
b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.net; + +import java.net.InetAddress; + +public interface IAsyncCallbackWithFailure<T> extends IAsyncCallback<T> +{ + /** + * Called when there is an exception on the remote node or timeout happens + */ + public void onFailure(InetAddress from); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/MessageDeliveryTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java index e49b93c..982f17e 100644 --- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java +++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java @@ -57,7 +57,21 @@ public class MessageDeliveryTask implements Runnable return; } - verbHandler.doVerb(message, id); + try + { + verbHandler.doVerb(message, id); + } + catch (Throwable t) + { + if (message.doCallbackOnFailure()) + { + MessageOut response = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE) + 
.withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE); + MessagingService.instance().sendReply(response, id, message.from); + } + + throw t; + } if (GOSSIP_VERBS.contains(message.verb)) Gossiper.instance.setLastProcessedMessageAt(constructionTime); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/MessageIn.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java index e0efefe..10260c2 100644 --- a/src/java/org/apache/cassandra/net/MessageIn.java +++ b/src/java/org/apache/cassandra/net/MessageIn.java @@ -105,6 +105,16 @@ public class MessageIn<T> return MessagingService.verbStages.get(verb); } + public boolean doCallbackOnFailure() + { + return parameters.containsKey(MessagingService.FAILURE_CALLBACK_PARAM); + } + + public boolean isFailureResponse() + { + return parameters.containsKey(MessagingService.FAILURE_RESPONSE_PARAM); + } + public long getTimeout() { return DatabaseDescriptor.getTimeout(verb); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 4a5df29..0bb1b17 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -75,6 +75,10 @@ public final class MessagingService implements MessagingServiceMBean public boolean allNodesAtLeast20 = true; + public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC"; + public static final byte[] ONE_BYTE = new byte[1]; + public static final String FAILURE_RESPONSE_PARAM = "FAIL"; + /** * we preface every message with this number so the recipient can validate the sender is 
sane */ @@ -166,7 +170,6 @@ public final class MessagingService implements MessagingServiceMBean put(Verb.MIGRATION_REQUEST, Stage.MIGRATION); put(Verb.INDEX_SCAN, Stage.READ); put(Verb.REPLICATION_FINISHED, Stage.MISC); - put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE); put(Verb.COUNTER_MUTATION, Stage.MUTATION); put(Verb.SNAPSHOT, Stage.MISC); put(Verb.ECHO, Stage.GOSSIP); @@ -329,10 +332,19 @@ public final class MessagingService implements MessagingServiceMBean { public Object apply(Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>> pair) { - CallbackInfo expiredCallbackInfo = pair.right.value; + final CallbackInfo expiredCallbackInfo = pair.right.value; maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, pair.right.timeout); ConnectionMetrics.totalTimeouts.mark(); getConnectionPool(expiredCallbackInfo.target).incrementTimeout(); + if (expiredCallbackInfo.isFailureCallback()) + { + StageManager.getStage(Stage.INTERNAL_RESPONSE).submit(new Runnable() { + @Override + public void run() { + ((IAsyncCallbackWithFailure)expiredCallbackInfo.callback).onFailure(expiredCallbackInfo.target); + } + }); + } if (expiredCallbackInfo.shouldHint()) { @@ -537,11 +549,11 @@ public final class MessagingService implements MessagingServiceMBean return verbHandlers.get(type); } - public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout) + public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout, boolean failureCallback) { assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel int messageId = nextId(); - CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout); + CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb), failureCallback), timeout); assert previous == null : String.format("Callback already exists for id 
%d! (%s)", messageId, previous); return messageId; } @@ -576,7 +588,12 @@ public final class MessagingService implements MessagingServiceMBean public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb) { - return sendRR(message, to, cb, message.getTimeout()); + return sendRR(message, to, cb, message.getTimeout(), false); + } + + public int sendRRWithFailure(MessageOut message, InetAddress to, IAsyncCallbackWithFailure cb) + { + return sendRR(message, to, cb, message.getTimeout(), true); } /** @@ -588,12 +605,13 @@ public final class MessagingService implements MessagingServiceMBean * @param cb callback interface which is used to pass the responses or * suggest that a timeout occurred to the invoker of the send(). * @param timeout the timeout used for expiration + * @param failureCallback true if given cb has failure callback * @return an reference to message id used to match with the result */ - public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout) + public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout, boolean failureCallback) { - int id = addCallback(cb, message, to, timeout); - sendOneWay(message, id, to); + int id = addCallback(cb, message, to, timeout, failureCallback); + sendOneWay(failureCallback ? 
message.withParameter(FAILURE_CALLBACK_PARAM, ONE_BYTE) : message, id, to); return id; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/ResponseVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java index 132e574..1d9aa98 100644 --- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java +++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java @@ -42,7 +42,15 @@ public class ResponseVerbHandler implements IVerbHandler Tracing.trace("Processing response from {}", message.from); IAsyncCallback cb = callbackInfo.callback; - MessagingService.instance().maybeAddLatency(cb, message.from, latency); - cb.response(message); + if (message.isFailureResponse()) + { + ((IAsyncCallbackWithFailure) cb).onFailure(message.from); + } + else + { + //TODO: Should we add latency only in success cases? 
+ MessagingService.instance().maybeAddLatency(cb, message.from, latency); + cb.response(message); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/repair/SnapshotTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java b/src/java/org/apache/cassandra/repair/SnapshotTask.java index 1a9d324..09e8104 100644 --- a/src/java/org/apache/cassandra/repair/SnapshotTask.java +++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java @@ -24,7 +24,7 @@ import java.util.concurrent.RunnableFuture; import com.google.common.util.concurrent.AbstractFuture; import org.apache.cassandra.db.SnapshotCommand; -import org.apache.cassandra.net.IAsyncCallback; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; @@ -44,18 +44,18 @@ public class SnapshotTask extends AbstractFuture<InetAddress> implements Runnabl public void run() { - MessagingService.instance().sendRR(new SnapshotCommand(desc.keyspace, - desc.columnFamily, - desc.sessionId.toString(), - false).createMessage(), - endpoint, - new SnapshotCallback(this)); + MessagingService.instance().sendRRWithFailure(new SnapshotCommand(desc.keyspace, + desc.columnFamily, + desc.sessionId.toString(), + false).createMessage(), + endpoint, + new SnapshotCallback(this)); } /** * Callback for snapshot request. Run on INTERNAL_RESPONSE stage. */ - static class SnapshotCallback implements IAsyncCallback + static class SnapshotCallback implements IAsyncCallbackWithFailure { final SnapshotTask task; @@ -75,5 +75,10 @@ public class SnapshotTask extends AbstractFuture<InetAddress> implements Runnabl } public boolean isLatencyForSnitch() { return false; } + + public void onFailure(InetAddress from) + { + task.setException(new RuntimeException("Could not create snapshot at " + from)); + } } }
