Repository: cassandra Updated Branches: refs/heads/trunk fa1131679 -> 39df31a06
Add error code map to read/write failure responses Patch by Geoffrey Yu; reviewed by Tyler Hobbs for CASSANDRA-12311 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/39df31a0 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/39df31a0 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/39df31a0 Branch: refs/heads/trunk Commit: 39df31a06a35b221f55f17ed20947a1a2e33ee1a Parents: fa11316 Author: Geoffrey Yu <[email protected]> Authored: Fri Aug 19 16:04:39 2016 -0500 Committer: Tyler Hobbs <[email protected]> Committed: Fri Aug 19 16:04:39 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + doc/native_protocol_v5.spec | 27 ++++- doc/source/operating/error_codes.txt | 31 +++++ .../exceptions/ReadFailureException.java | 7 +- .../exceptions/RequestFailureException.java | 19 ++- .../exceptions/RequestFailureReason.java | 51 ++++++++ .../exceptions/WriteFailureException.java | 7 +- .../apache/cassandra/hints/HintsDispatcher.java | 3 +- .../net/IAsyncCallbackWithFailure.java | 4 +- .../cassandra/net/MessageDeliveryTask.java | 16 +++ .../org/apache/cassandra/net/MessageIn.java | 26 ++++ .../apache/cassandra/net/MessagingService.java | 4 +- .../cassandra/net/ResponseVerbHandler.java | 2 +- .../cassandra/repair/AnticompactionTask.java | 3 +- .../apache/cassandra/repair/SnapshotTask.java | 3 +- .../service/AbstractWriteResponseHandler.java | 10 +- .../cassandra/service/ActiveRepairService.java | 3 +- .../service/BatchlogResponseHandler.java | 6 +- .../apache/cassandra/service/ReadCallback.java | 11 +- .../apache/cassandra/service/StorageProxy.java | 19 +-- .../org/apache/cassandra/transport/CBUtil.java | 29 ++++- .../transport/messages/ErrorMessage.java | 43 ++++++- .../cassandra/transport/ErrorMessageTest.java | 121 +++++++++++++++++++ 23 files changed, 408 insertions(+), 39 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0e1e118..c123d17 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 3.10 + * Extend read/write failure messages with a map of replica addresses + to error codes in the v5 native protocol (CASSANDRA-12311) * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374) * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054) * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378) http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/doc/native_protocol_v5.spec ---------------------------------------------------------------------- diff --git a/doc/native_protocol_v5.spec b/doc/native_protocol_v5.spec index edf3093..bbde714 100644 --- a/doc/native_protocol_v5.spec +++ b/doc/native_protocol_v5.spec @@ -204,6 +204,7 @@ Table of Contents [int] A 4 bytes integer [long] A 8 bytes integer + [byte] A 1 byte unsigned integer [short] A 2 bytes unsigned integer [string] A [short] n, followed by n bytes representing an UTF-8 string. @@ -229,6 +230,9 @@ Table of Contents [byte] representing the IP address (in practice n can only be either 4 (IPv4) or 16 (IPv6)), following by one [int] representing the port. + [inetaddr] An IP address (without a port) to a node. It consists of one + [byte] n, that represents the address size, followed by n + [byte] representing the IP address. [consistency] A consistency level specification. This is a [short] representing a consistency level with the following correspondance: @@ -1088,7 +1092,7 @@ Table of Contents responded. Otherwise, the value is != 0. 0x1300 Read_failure: A non-timeout exception during a read request. 
The rest of the ERROR message body will be - <cl><received><blockfor><numfailures><data_present> + <cl><received><blockfor><reasonmap><data_present> where: <cl> is the [consistency] level of the query having triggered the exception. @@ -1096,8 +1100,12 @@ Table of Contents answered the request. <blockfor> is an [int] representing the number of replicas whose acknowledgement is required to achieve <cl>. - <numfailures> is an [int] representing the number of nodes that - experience a failure while executing the request. + <reasonmap> is a map from endpoints to failure reason codes. This maps + the endpoints of the replica nodes that failed when + executing the request to a code representing the reason + for the failure. The map is encoded starting with an [int] n + followed by n pairs of <endpoint><failurecode> where + <endpoint> is an [inetaddr] and <failurecode> is a [short]. <data_present> is a single byte. If its value is 0, it means the replica that was asked for data had not responded. Otherwise, the value is != 0. @@ -1110,7 +1118,7 @@ Table of Contents <arg_types> [string list] one string for each argument type (as CQL type) of the failed function 0x1500 Write_failure: A non-timeout exception during a write request. The rest of the ERROR message body will be - <cl><received><blockfor><numfailures><write_type> + <cl><received><blockfor><reasonmap><write_type> where: <cl> is the [consistency] level of the query having triggered the exception. @@ -1118,8 +1126,12 @@ Table of Contents answered the request. <blockfor> is an [int] representing the number of replicas whose acknowledgement is required to achieve <cl>. - <numfailures> is an [int] representing the number of nodes that - experience a failure while executing the request. + <reasonmap> is a map from endpoints to failure reason codes. This maps + the endpoints of the replica nodes that failed when + executing the request to a code representing the reason + for the failure.
The map is encoded starting with an [int] n + followed by n pairs of <endpoint><failurecode> where + <endpoint> is an [inetaddr] and <failurecode> is a [short]. <writeType> is a [string] that describes the type of the write that failed. The value of that string can be one of: @@ -1160,3 +1172,6 @@ Table of Contents 10. Changes from v4 * Beta protocol flag for v5 native protocol is added (Section 2.2) + * <numfailures> in Read_failure and Write_failure error message bodies (Section 9) + has been replaced with <reasonmap>. The <reasonmap> maps node IP addresses to + a failure reason code which indicates why the request failed on that node. http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/doc/source/operating/error_codes.txt ---------------------------------------------------------------------- diff --git a/doc/source/operating/error_codes.txt b/doc/source/operating/error_codes.txt new file mode 100644 index 0000000..279fe40 --- /dev/null +++ b/doc/source/operating/error_codes.txt @@ -0,0 +1,31 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +.. or more contributor license agreements. See the NOTICE file +.. distributed with this work for additional information +.. regarding copyright ownership. The ASF licenses this file +.. to you under the Apache License, Version 2.0 (the +.. "License"); you may not use this file except in compliance +.. with the License. You may obtain a copy of the License at +.. +.. http://www.apache.org/licenses/LICENSE-2.0 +.. +.. Unless required by applicable law or agreed to in writing, software +.. distributed under the License is distributed on an "AS IS" BASIS, +.. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +.. See the License for the specific language governing permissions and +.. limitations under the License. + +.. 
highlight:: none + +Error Codes +----------- + +In Cassandra 3.10 and higher, when the v5 native protocol (or a higher version) is used, +``ReadFailure`` and ``WriteFailure`` errors will contain a map of replica addresses +to error codes. Those error codes are explained here: + +``0x0000`` + The error does not have a specific code assigned yet, or the cause is unknown. + +``0x0001`` + The read operation scanned too many tombstones (as defined by ``tombstone_failure_threshold`` in ``cassandra.yaml``), + causing a TombstoneOverwhelmingException. http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/exceptions/ReadFailureException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java b/src/java/org/apache/cassandra/exceptions/ReadFailureException.java index 91cf580..82885e3 100644 --- a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java +++ b/src/java/org/apache/cassandra/exceptions/ReadFailureException.java @@ -17,15 +17,18 @@ */ package org.apache.cassandra.exceptions; +import java.net.InetAddress; +import java.util.Map; + import org.apache.cassandra.db.ConsistencyLevel; public class ReadFailureException extends RequestFailureException { public final boolean dataPresent; - public ReadFailureException(ConsistencyLevel consistency, int received, int failures, int blockFor, boolean dataPresent) + public ReadFailureException(ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddress, RequestFailureReason> failureReasonByEndpoint) { - super(ExceptionCode.READ_FAILURE, consistency, received, failures, blockFor); + super(ExceptionCode.READ_FAILURE, consistency, received, blockFor, failureReasonByEndpoint); this.dataPresent = dataPresent; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/exceptions/RequestFailureException.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureException.java b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java index 6b8b40f..1a5289c 100644 --- a/src/java/org/apache/cassandra/exceptions/RequestFailureException.java +++ b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java @@ -17,21 +17,32 @@ */ package org.apache.cassandra.exceptions; +import java.net.InetAddress; +import java.util.HashMap; +import java.util.Map; + import org.apache.cassandra.db.ConsistencyLevel; public class RequestFailureException extends RequestExecutionException { public final ConsistencyLevel consistency; public final int received; - public final int failures; public final int blockFor; + public final Map<InetAddress, RequestFailureReason> failureReasonByEndpoint; - protected RequestFailureException(ExceptionCode code, ConsistencyLevel consistency, int received, int failures, int blockFor) + protected RequestFailureException(ExceptionCode code, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddress, RequestFailureReason> failureReasonByEndpoint) { - super(code, String.format("Operation failed - received %d responses and %d failures", received, failures)); + super(code, String.format("Operation failed - received %d responses and %d failures", received, failureReasonByEndpoint.size())); this.consistency = consistency; this.received = received; - this.failures = failures; this.blockFor = blockFor; + + // It is possible for the passed in failureReasonByEndpoint map + // to have new entries added after this exception is constructed + // (e.g. a delayed failure response from a replica). So to be safe + // we make a copy of the map at this point to ensure it will not be + // modified any further. Otherwise, there could be implications when + // we encode this map for transport. 
+ this.failureReasonByEndpoint = new HashMap<>(failureReasonByEndpoint); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java new file mode 100644 index 0000000..96ab7b5 --- /dev/null +++ b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.exceptions; + +public enum RequestFailureReason +{ + /** + * The reason for the failure was none of the below reasons or was not recorded by the data node. + */ + UNKNOWN (0x0000), + + /** + * The data node read too many tombstones when attempting to execute a read query (see tombstone_failure_threshold). 
+ */ + READ_TOO_MANY_TOMBSTONES (0x0001); + + /** The code to be serialized as an unsigned 16 bit integer */ + public final int code; + public static final RequestFailureReason[] VALUES = values(); + + RequestFailureReason(final int code) + { + this.code = code; + } + + public static RequestFailureReason fromCode(final int code) + { + for (RequestFailureReason reasonCode : VALUES) + { + if (reasonCode.code == code) + return reasonCode; + } + throw new IllegalArgumentException("Unknown request failure reason error code: " + code); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/exceptions/WriteFailureException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/WriteFailureException.java b/src/java/org/apache/cassandra/exceptions/WriteFailureException.java index 24de9b1..1a857fe 100644 --- a/src/java/org/apache/cassandra/exceptions/WriteFailureException.java +++ b/src/java/org/apache/cassandra/exceptions/WriteFailureException.java @@ -17,6 +17,9 @@ */ package org.apache.cassandra.exceptions; +import java.net.InetAddress; +import java.util.Map; + import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.WriteType; @@ -24,9 +27,9 @@ public class WriteFailureException extends RequestFailureException { public final WriteType writeType; - public WriteFailureException(ConsistencyLevel consistency, int received, int failures, int blockFor, WriteType writeType) + public WriteFailureException(ConsistencyLevel consistency, int received, int blockFor, WriteType writeType, Map<InetAddress, RequestFailureReason> failureReasonByEndpoint) { - super(ExceptionCode.WRITE_FAILURE, consistency, received, failures, blockFor); + super(ExceptionCode.WRITE_FAILURE, consistency, received, blockFor, failureReasonByEndpoint); this.writeType = writeType; } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/hints/HintsDispatcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java index 478a76b..29bab80 100644 --- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java +++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java @@ -27,6 +27,7 @@ import java.util.function.Function; import com.google.common.util.concurrent.RateLimiter; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; @@ -192,7 +193,7 @@ final class HintsDispatcher implements AutoCloseable return timedOut ? Outcome.TIMEOUT : outcome; } - public void onFailure(InetAddress from) + public void onFailure(InetAddress from, RequestFailureReason failureReason) { outcome = Outcome.FAILURE; condition.signalAll(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java index 546a416..1cd27b6 100644 --- a/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java +++ b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java @@ -19,11 +19,13 @@ package org.apache.cassandra.net; import java.net.InetAddress; +import org.apache.cassandra.exceptions.RequestFailureReason; + public interface IAsyncCallbackWithFailure<T> extends IAsyncCallback<T> { /** * Called when there is an exception on the remote node or timeout happens */ - void onFailure(InetAddress from); + void onFailure(InetAddress from, RequestFailureReason 
failureReason); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/net/MessageDeliveryTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java index d9f8b7c..c97a98f 100644 --- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java +++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java @@ -24,8 +24,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.index.IndexNotAvailableException; +import org.apache.cassandra.io.util.DataOutputBuffer; public class MessageDeliveryTask implements Runnable { @@ -89,6 +91,20 @@ public class MessageDeliveryTask implements Runnable { MessageOut response = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE) .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE); + + if (t instanceof TombstoneOverwhelmingException) + { + try (DataOutputBuffer out = new DataOutputBuffer()) + { + out.writeShort(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code); + response = response.withParameter(MessagingService.FAILURE_REASON_PARAM, out.getData()); + } + catch (IOException ex) + { + throw new RuntimeException(ex); + } + } + MessagingService.instance().sendReply(response, id, message.from); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/net/MessageIn.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java index a122b61..0562df6 100644 --- a/src/java/org/apache/cassandra/net/MessageIn.java +++ 
b/src/java/org/apache/cassandra/net/MessageIn.java @@ -27,7 +27,9 @@ import com.google.common.collect.ImmutableMap; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.monitoring.ConstructionTime; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; public class MessageIn<T> @@ -152,6 +154,30 @@ public class MessageIn<T> return parameters.containsKey(MessagingService.FAILURE_RESPONSE_PARAM); } + public boolean containsFailureReason() + { + return parameters.containsKey(MessagingService.FAILURE_REASON_PARAM); + } + + public RequestFailureReason getFailureReason() + { + if (containsFailureReason()) + { + try (DataInputBuffer in = new DataInputBuffer(parameters.get(MessagingService.FAILURE_REASON_PARAM))) + { + return RequestFailureReason.fromCode(in.readUnsignedShort()); + } + catch (IOException ex) + { + throw new RuntimeException(ex); + } + } + else + { + return RequestFailureReason.UNKNOWN; + } + } + public long getTimeout() { return verb.getTimeout(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 459c7e6..fea5a8f 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -54,6 +54,7 @@ import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.BootStrapper; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.exceptions.RequestFailureReason; import 
org.apache.cassandra.gms.EchoMessage; import org.apache.cassandra.gms.GossipDigestAck; import org.apache.cassandra.gms.GossipDigestAck2; @@ -93,6 +94,7 @@ public final class MessagingService implements MessagingServiceMBean public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC"; public static final byte[] ONE_BYTE = new byte[1]; public static final String FAILURE_RESPONSE_PARAM = "FAIL"; + public static final String FAILURE_REASON_PARAM = "FAIL_REASON"; /** * we preface every message with this number so the recipient can validate the sender is sane @@ -510,7 +512,7 @@ public final class MessagingService implements MessagingServiceMBean StageManager.getStage(Stage.INTERNAL_RESPONSE).submit(new Runnable() { @Override public void run() { - ((IAsyncCallbackWithFailure)expiredCallbackInfo.callback).onFailure(expiredCallbackInfo.target); + ((IAsyncCallbackWithFailure)expiredCallbackInfo.callback).onFailure(expiredCallbackInfo.target, RequestFailureReason.UNKNOWN); } }); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/net/ResponseVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java index 28ed365..89e1051 100644 --- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java +++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java @@ -44,7 +44,7 @@ public class ResponseVerbHandler implements IVerbHandler IAsyncCallback cb = callbackInfo.callback; if (message.isFailureResponse()) { - ((IAsyncCallbackWithFailure) cb).onFailure(message.from); + ((IAsyncCallbackWithFailure) cb).onFailure(message.from, message.getFailureReason()); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/repair/AnticompactionTask.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java index 8ecae23..16a0a12 100644 --- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java +++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.AbstractFuture; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; @@ -99,7 +100,7 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R return false; } - public void onFailure(InetAddress from) + public void onFailure(InetAddress from, RequestFailureReason failureReason) { task.setException(new RuntimeException("Anticompaction failed or timed out in " + from)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/repair/SnapshotTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java b/src/java/org/apache/cassandra/repair/SnapshotTask.java index 94361d8..2b267a7 100644 --- a/src/java/org/apache/cassandra/repair/SnapshotTask.java +++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import com.google.common.util.concurrent.AbstractFuture; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; @@ -73,7 +74,7 @@ public class SnapshotTask extends AbstractFuture<InetAddress> implements Runnabl public boolean isLatencyForSnitch() { return false; } - public void 
onFailure(InetAddress from) + public void onFailure(InetAddress from, RequestFailureReason failureReason) { //listener.failedSnapshot(); task.setException(new RuntimeException("Could not create snapshot at " + from)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java index f412515..c82d58b 100644 --- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java @@ -19,6 +19,8 @@ package org.apache.cassandra.service; import java.net.InetAddress; import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -49,6 +51,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW private static final AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures"); private volatile int failures = 0; + private final Map<InetAddress, RequestFailureReason> failureReasonByEndpoint; private final long queryStartNanoTime; /** @@ -69,6 +72,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW this.naturalEndpoints = naturalEndpoints; this.callback = callback; this.writeType = writeType; + this.failureReasonByEndpoint = new ConcurrentHashMap<>(); this.queryStartNanoTime = queryStartNanoTime; } @@ -104,7 +108,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW if (totalBlockFor() + failures > totalEndpoints()) { - throw new 
WriteFailureException(consistencyLevel, ackCount(), failures, totalBlockFor(), writeType); + throw new WriteFailureException(consistencyLevel, ackCount(), totalBlockFor(), writeType, failureReasonByEndpoint); } } @@ -155,7 +159,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW } @Override - public void onFailure(InetAddress from) + public void onFailure(InetAddress from, RequestFailureReason failureReason) { logger.trace("Got failure from {}", from); @@ -163,6 +167,8 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW ? failuresUpdater.incrementAndGet(this) : failures; + failureReasonByEndpoint.put(from, failureReason); + if (totalBlockFor() + n > totalEndpoints()) signal(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 4699ae1..b69c24a 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -43,6 +43,7 @@ import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.FailureDetector; @@ -279,7 +280,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai return false; } - public void onFailure(InetAddress from) + public void onFailure(InetAddress from, RequestFailureReason failureReason) { status.set(false); failedNodes.add(from.getHostAddress()); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java index 3b31794..aabf7dd 100644 --- a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java +++ b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java @@ -21,9 +21,11 @@ package org.apache.cassandra.service; import java.net.InetAddress; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.exceptions.WriteFailureException; import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessagingService; public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T> { @@ -58,9 +60,9 @@ public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T> return wrapped.isLatencyForSnitch(); } - public void onFailure(InetAddress from) + public void onFailure(InetAddress from, RequestFailureReason failureReason) { - wrapped.onFailure(from); + wrapped.onFailure(from, failureReason); } public void assureSufficientLiveNodes() http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index 3f1ff3c..ad80913 100644 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -20,6 +20,8 @@ package org.apache.cassandra.service; import java.net.InetAddress; import java.util.Collections; import 
java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -32,6 +34,7 @@ import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.exceptions.ReadFailureException; import org.apache.cassandra.exceptions.ReadTimeoutException; import org.apache.cassandra.exceptions.UnavailableException; @@ -63,6 +66,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> private static final AtomicIntegerFieldUpdater<ReadCallback> failuresUpdater = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "failures"); private volatile int failures = 0; + private final Map<InetAddress, RequestFailureReason> failureReasonByEndpoint; private final Keyspace keyspace; // TODO push this into ConsistencyLevel? @@ -89,6 +93,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> this.resolver = resolver; this.queryStartNanoTime = queryStartNanoTime; this.endpoints = endpoints; + this.failureReasonByEndpoint = new ConcurrentHashMap<>(); // we don't support read repair (or rapid read protection) for range scans yet (CASSANDRA-6897) assert !(command instanceof PartitionRangeReadCommand) || blockfor >= endpoints.size(); @@ -129,7 +134,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> // Same as for writes, see AbstractWriteResponseHandler throw failed - ? new ReadFailureException(consistencyLevel, received, failures, blockfor, resolver.isDataPresent()) + ? 
new ReadFailureException(consistencyLevel, received, blockfor, resolver.isDataPresent(), failureReasonByEndpoint) : new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent()); } @@ -252,12 +257,14 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> } @Override - public void onFailure(InetAddress from) + public void onFailure(InetAddress from, RequestFailureReason failureReason) { int n = waitingFor(from) ? failuresUpdater.incrementAndGet(this) : failures; + failureReasonByEndpoint.put(from, failureReason); + if (blockfor + n > endpoints.size()) condition.signalAll(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index a0f39af..f180bdf 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -579,7 +579,7 @@ public class StorageProxy implements StorageProxyMBean { if (!(ex instanceof WriteTimeoutException)) logger.error("Failed to apply paxos commit locally : {}", ex); - responseHandler.onFailure(FBUtilities.getBroadcastAddress()); + responseHandler.onFailure(FBUtilities.getBroadcastAddress(), RequestFailureReason.UNKNOWN); } } @@ -645,7 +645,7 @@ public class StorageProxy implements StorageProxyMBean writeMetricsMap.get(consistency_level).failures.mark(); WriteFailureException fe = (WriteFailureException)ex; Tracing.trace("Write failure; received {} of {} required replies, failed {} requests", - fe.received, fe.blockFor, fe.failures); + fe.received, fe.blockFor, fe.failureReasonByEndpoint.size()); } else { @@ -1033,7 +1033,7 @@ public class StorageProxy implements StorageProxyMBean } catch (OverloadedException | WriteTimeoutException e) { - 
wrapper.handler.onFailure(FBUtilities.getBroadcastAddress()); + wrapper.handler.onFailure(FBUtilities.getBroadcastAddress(), RequestFailureReason.UNKNOWN); } } } @@ -1387,7 +1387,7 @@ public class StorageProxy implements StorageProxyMBean { if (!(ex instanceof WriteTimeoutException)) logger.error("Failed to apply mutation locally : {}", ex); - handler.onFailure(FBUtilities.getBroadcastAddress()); + handler.onFailure(FBUtilities.getBroadcastAddress(), RequestFailureReason.UNKNOWN); } } @@ -1601,7 +1601,7 @@ public class StorageProxy implements StorageProxyMBean } catch (WriteFailureException e) { - throw new ReadFailureException(consistencyLevel, e.received, e.failures, e.blockFor, false); + throw new ReadFailureException(consistencyLevel, e.received, e.blockFor, false, e.failureReasonByEndpoint); } result = fetchRows(group.commands, consistencyForCommitOrFetch, queryStartNanoTime); @@ -1858,18 +1858,23 @@ public class StorageProxy implements StorageProxyMBean else { MessagingService.instance().incrementDroppedMessages(verb, System.currentTimeMillis() - constructionTime); - handler.onFailure(FBUtilities.getBroadcastAddress()); + handler.onFailure(FBUtilities.getBroadcastAddress(), RequestFailureReason.UNKNOWN); } MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); } catch (Throwable t) { - handler.onFailure(FBUtilities.getBroadcastAddress()); if (t instanceof TombstoneOverwhelmingException) + { + handler.onFailure(FBUtilities.getBroadcastAddress(), RequestFailureReason.READ_TOO_MANY_TOMBSTONES); logger.error(t.getMessage()); + } else + { + handler.onFailure(FBUtilities.getBroadcastAddress(), RequestFailureReason.UNKNOWN); throw t; + } } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/transport/CBUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java 
b/src/java/org/apache/cassandra/transport/CBUtil.java index 570fd6b..25409ae 100644 --- a/src/java/org/apache/cassandra/transport/CBUtil.java +++ b/src/java/org/apache/cassandra/transport/CBUtil.java @@ -496,7 +496,7 @@ public abstract class CBUtil public static InetSocketAddress readInet(ByteBuf cb) { - int addrSize = cb.readByte(); + int addrSize = cb.readByte() & 0xFF; byte[] address = new byte[addrSize]; cb.readBytes(address); int port = cb.readInt(); @@ -525,6 +525,33 @@ public abstract class CBUtil return 1 + address.length + 4; } + public static InetAddress readInetAddr(ByteBuf cb) + { + int addressSize = cb.readByte() & 0xFF; + byte[] address = new byte[addressSize]; + cb.readBytes(address); + try + { + return InetAddress.getByAddress(address); + } + catch (UnknownHostException e) + { + throw new ProtocolException("Invalid IP address while deserializing inet address"); + } + } + + public static void writeInetAddr(InetAddress inetAddr, ByteBuf cb) + { + byte[] address = inetAddr.getAddress(); + cb.writeByte(address.length); + cb.writeBytes(address); + } + + public static int sizeOfInetAddr(InetAddress inetAddr) + { + return 1 + inetAddr.getAddress().length; + } + /* * Reads *all* readable bytes from {@code cb} and return them. 
*/ http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java index 8f45d4d..5ce248f 100644 --- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java @@ -17,7 +17,10 @@ */ package org.apache.cassandra.transport.messages; +import java.net.InetAddress; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.CodecException; @@ -76,22 +79,35 @@ public class ErrorMessage extends Message.Response case TRUNCATE_ERROR: te = new TruncateException(msg); break; - case WRITE_FAILURE: + case WRITE_FAILURE: case READ_FAILURE: { ConsistencyLevel cl = CBUtil.readConsistencyLevel(body); int received = body.readInt(); int blockFor = body.readInt(); + // The number of failures is also present in protocol v5, but used instead to specify the size of the failure map int failure = body.readInt(); + + Map<InetAddress, RequestFailureReason> failureReasonByEndpoint = new ConcurrentHashMap<>(); + if (version >= Server.VERSION_5) + { + for (int i = 0; i < failure; i++) + { + InetAddress endpoint = CBUtil.readInetAddr(body); + RequestFailureReason failureReason = RequestFailureReason.fromCode(body.readUnsignedShort()); + failureReasonByEndpoint.put(endpoint, failureReason); + } + } + if (code == ExceptionCode.WRITE_FAILURE) { WriteType writeType = Enum.valueOf(WriteType.class, CBUtil.readString(body)); - te = new WriteFailureException(cl, received, failure, blockFor, writeType); + te = new WriteFailureException(cl, received, blockFor, writeType, failureReasonByEndpoint); } else { byte dataPresent = body.readByte(); - te = 
new ReadFailureException(cl, received, failure, blockFor, dataPresent != 0); + te = new ReadFailureException(cl, received, blockFor, dataPresent != 0, failureReasonByEndpoint); } } break; @@ -171,7 +187,17 @@ public class ErrorMessage extends Message.Response CBUtil.writeConsistencyLevel(rfe.consistency, dest); dest.writeInt(rfe.received); dest.writeInt(rfe.blockFor); - dest.writeInt(rfe.failures); + // The number of failures is also present in protocol v5, but used instead to specify the size of the failure map + dest.writeInt(rfe.failureReasonByEndpoint.size()); + + if (version >= Server.VERSION_5) + { + for (Map.Entry<InetAddress, RequestFailureReason> entry : rfe.failureReasonByEndpoint.entrySet()) + { + CBUtil.writeInetAddr(entry.getKey(), dest); + dest.writeShort(entry.getValue().code); + } + } if (isWrite) CBUtil.writeString(((WriteFailureException)rfe).writeType.toString(), dest); @@ -228,6 +254,15 @@ public class ErrorMessage extends Message.Response boolean isWrite = err.code() == ExceptionCode.WRITE_FAILURE; size += CBUtil.sizeOfConsistencyLevel(rfe.consistency) + 4 + 4 + 4; size += isWrite ? 
CBUtil.sizeOfString(((WriteFailureException)rfe).writeType.toString()) : 1; + + if (version >= Server.VERSION_5) + { + for (Map.Entry<InetAddress, RequestFailureReason> entry : rfe.failureReasonByEndpoint.entrySet()) + { + size += CBUtil.sizeOfInetAddr(entry.getKey()); + size += 2; // RequestFailureReason code + } + } } break; case WRITE_TIMEOUT: http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java b/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java new file mode 100644 index 0000000..a145d18 --- /dev/null +++ b/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.transport; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; + +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.WriteType; +import org.apache.cassandra.exceptions.ReadFailureException; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.exceptions.WriteFailureException; +import org.apache.cassandra.transport.messages.ErrorMessage; + +import static org.junit.Assert.assertEquals; + +public class ErrorMessageTest +{ + private static Map<InetAddress, RequestFailureReason> failureReasonMap1; + private static Map<InetAddress, RequestFailureReason> failureReasonMap2; + + @BeforeClass + public static void setUpFixtures() throws UnknownHostException + { + failureReasonMap1 = new HashMap<>(); + failureReasonMap1.put(InetAddress.getByName("127.0.0.1"), RequestFailureReason.READ_TOO_MANY_TOMBSTONES); + failureReasonMap1.put(InetAddress.getByName("127.0.0.2"), RequestFailureReason.READ_TOO_MANY_TOMBSTONES); + failureReasonMap1.put(InetAddress.getByName("127.0.0.3"), RequestFailureReason.UNKNOWN); + + failureReasonMap2 = new HashMap<>(); + failureReasonMap2.put(InetAddress.getByName("127.0.0.1"), RequestFailureReason.UNKNOWN); + failureReasonMap2.put(InetAddress.getByName("127.0.0.2"), RequestFailureReason.UNKNOWN); + } + + @Test + public void testV5ReadFailureSerDeser() + { + int receivedBlockFor = 3; + ConsistencyLevel consistencyLevel = ConsistencyLevel.ALL; + boolean dataPresent = false; + ReadFailureException rfe = new ReadFailureException(consistencyLevel, receivedBlockFor, receivedBlockFor, dataPresent, failureReasonMap1); + + ErrorMessage deserialized = serializeAndGetDeserializedErrorMessage(ErrorMessage.fromException(rfe), 5); + ReadFailureException deserializedRfe = 
(ReadFailureException) deserialized.error; + + assertEquals(failureReasonMap1, deserializedRfe.failureReasonByEndpoint); + assertEquals(receivedBlockFor, deserializedRfe.received); + assertEquals(receivedBlockFor, deserializedRfe.blockFor); + assertEquals(consistencyLevel, deserializedRfe.consistency); + assertEquals(dataPresent, deserializedRfe.dataPresent); + } + + @Test + public void testV5WriteFailureSerDeser() + { + int receivedBlockFor = 3; + ConsistencyLevel consistencyLevel = ConsistencyLevel.ALL; + WriteType writeType = WriteType.SIMPLE; + WriteFailureException wfe = new WriteFailureException(consistencyLevel, receivedBlockFor, receivedBlockFor, writeType, failureReasonMap2); + + ErrorMessage deserialized = serializeAndGetDeserializedErrorMessage(ErrorMessage.fromException(wfe), 5); + WriteFailureException deserializedWfe = (WriteFailureException) deserialized.error; + + assertEquals(failureReasonMap2, deserializedWfe.failureReasonByEndpoint); + assertEquals(receivedBlockFor, deserializedWfe.received); + assertEquals(receivedBlockFor, deserializedWfe.blockFor); + assertEquals(consistencyLevel, deserializedWfe.consistency); + assertEquals(writeType, deserializedWfe.writeType); + } + + /** + * Make sure that the map passed in to create a Read/WriteFailureException is copied + * so later modifications to the map passed in don't affect the map in the exception. + * + * This is to prevent potential issues in serialization if the map created in + * ReadCallback/AbstractWriteResponseHandler is modified due to a delayed failure + * response after the exception is created. 
+ */ + @Test + public void testRequestFailureExceptionMakesCopy() throws UnknownHostException + { + Map<InetAddress, RequestFailureReason> modifiableFailureReasons = new HashMap<>(failureReasonMap1); + ReadFailureException rfe = new ReadFailureException(ConsistencyLevel.ALL, 3, 3, false, modifiableFailureReasons); + WriteFailureException wfe = new WriteFailureException(ConsistencyLevel.ALL, 3, 3, WriteType.SIMPLE, modifiableFailureReasons); + + modifiableFailureReasons.put(InetAddress.getByName("127.0.0.4"), RequestFailureReason.UNKNOWN); + + assertEquals(failureReasonMap1, rfe.failureReasonByEndpoint); + assertEquals(failureReasonMap1, wfe.failureReasonByEndpoint); + } + + private ErrorMessage serializeAndGetDeserializedErrorMessage(ErrorMessage message, int version) + { + ByteBuf buffer = Unpooled.buffer(ErrorMessage.codec.encodedSize(message, version)); + ErrorMessage.codec.encode(message, buffer, version); + return ErrorMessage.codec.decode(buffer, version); + } +}
