Repository: cassandra
Updated Branches:
  refs/heads/trunk fa1131679 -> 39df31a06


Add error code map to read/write failure responses

Patch by Geoffrey Yu; reviewed by Tyler Hobbs for CASSANDRA-12311


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/39df31a0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/39df31a0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/39df31a0

Branch: refs/heads/trunk
Commit: 39df31a06a35b221f55f17ed20947a1a2e33ee1a
Parents: fa11316
Author: Geoffrey Yu <[email protected]>
Authored: Fri Aug 19 16:04:39 2016 -0500
Committer: Tyler Hobbs <[email protected]>
Committed: Fri Aug 19 16:04:39 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 doc/native_protocol_v5.spec                     |  27 ++++-
 doc/source/operating/error_codes.txt            |  31 +++++
 .../exceptions/ReadFailureException.java        |   7 +-
 .../exceptions/RequestFailureException.java     |  19 ++-
 .../exceptions/RequestFailureReason.java        |  51 ++++++++
 .../exceptions/WriteFailureException.java       |   7 +-
 .../apache/cassandra/hints/HintsDispatcher.java |   3 +-
 .../net/IAsyncCallbackWithFailure.java          |   4 +-
 .../cassandra/net/MessageDeliveryTask.java      |  16 +++
 .../org/apache/cassandra/net/MessageIn.java     |  26 ++++
 .../apache/cassandra/net/MessagingService.java  |   4 +-
 .../cassandra/net/ResponseVerbHandler.java      |   2 +-
 .../cassandra/repair/AnticompactionTask.java    |   3 +-
 .../apache/cassandra/repair/SnapshotTask.java   |   3 +-
 .../service/AbstractWriteResponseHandler.java   |  10 +-
 .../cassandra/service/ActiveRepairService.java  |   3 +-
 .../service/BatchlogResponseHandler.java        |   6 +-
 .../apache/cassandra/service/ReadCallback.java  |  11 +-
 .../apache/cassandra/service/StorageProxy.java  |  19 +--
 .../org/apache/cassandra/transport/CBUtil.java  |  29 ++++-
 .../transport/messages/ErrorMessage.java        |  43 ++++++-
 .../cassandra/transport/ErrorMessageTest.java   | 121 +++++++++++++++++++
 23 files changed, 408 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0e1e118..c123d17 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 3.10
+ * Extend read/write failure messages with a map of replica addresses
+   to error codes in the v5 native protocol (CASSANDRA-12311)
  * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
  * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054)
  * Fix clustering indexes in presence of static columns in SASI 
(CASSANDRA-12378)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/doc/native_protocol_v5.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v5.spec b/doc/native_protocol_v5.spec
index edf3093..bbde714 100644
--- a/doc/native_protocol_v5.spec
+++ b/doc/native_protocol_v5.spec
@@ -204,6 +204,7 @@ Table of Contents
 
     [int]          A 4 bytes integer
     [long]         A 8 bytes integer
+    [byte]         A 1 byte unsigned integer
     [short]        A 2 bytes unsigned integer
     [string]       A [short] n, followed by n bytes representing an UTF-8
                    string.
@@ -229,6 +230,9 @@ Table of Contents
                    [byte] representing the IP address (in practice n can only 
be
                    either 4 (IPv4) or 16 (IPv6)), following by one [int]
                    representing the port.
+    [inetaddr]     An IP address (without a port) to a node. It consists of one
+                   [byte] n, that represents the address size, followed by n
+                   [byte] representing the IP address.
     [consistency]  A consistency level specification. This is a [short]
                    representing a consistency level with the following
                    correspondance:
@@ -1088,7 +1092,7 @@ Table of Contents
                                responded. Otherwise, the value is != 0.
     0x1300    Read_failure: A non-timeout exception during a read request. The 
rest
               of the ERROR message body will be
-                <cl><received><blockfor><numfailures><data_present>
+                <cl><received><blockfor><reasonmap><data_present>
               where:
                 <cl> is the [consistency] level of the query having triggered
                      the exception.
@@ -1096,8 +1100,12 @@ Table of Contents
                            answered the request.
                 <blockfor> is an [int] representing the number of replicas 
whose
                            acknowledgement is required to achieve <cl>.
-                <numfailures> is an [int] representing the number of nodes that
-                              experience a failure while executing the request.
+                <reasonmap> is a map of endpoint to failure reason codes. This 
maps
+                            the endpoints of the replica nodes that failed when
+                            executing the request to a code representing the 
reason
+                            for the failure. The map is encoded starting with 
an [int] n
+                            followed by n pairs of <endpoint><failurecode> 
where
+                            <endpoint> is an [inetaddr] and <failurecode> is a 
[short].
                 <data_present> is a single byte. If its value is 0, it means
                                the replica that was asked for data had not
                                responded. Otherwise, the value is != 0.
@@ -1110,7 +1118,7 @@ Table of Contents
                 <arg_types> [string list] one string for each argument type 
(as CQL type) of the failed function
     0x1500    Write_failure: A non-timeout exception during a write request. 
The rest
               of the ERROR message body will be
-                <cl><received><blockfor><numfailures><write_type>
+                <cl><received><blockfor><reasonmap><write_type>
               where:
                 <cl> is the [consistency] level of the query having triggered
                      the exception.
@@ -1118,8 +1126,12 @@ Table of Contents
                            answered the request.
                 <blockfor> is an [int] representing the number of replicas 
whose
                            acknowledgement is required to achieve <cl>.
-                <numfailures> is an [int] representing the number of nodes that
-                              experience a failure while executing the request.
+                <reasonmap> is a map of endpoint to failure reason codes. This 
maps
+                            the endpoints of the replica nodes that failed when
+                            executing the request to a code representing the 
reason
+                            for the failure. The map is encoded starting with 
an [int] n
+                            followed by n pairs of <endpoint><failurecode> 
where
+                            <endpoint> is an [inetaddr] and <failurecode> is a 
[short].
                 <writeType> is a [string] that describes the type of the write
                             that failed. The value of that string can be one
                             of:
@@ -1160,3 +1172,6 @@ Table of Contents
 10. Changes from v4
 
   * Beta protocol flag for v5 native protocol is added (Section 2.2)
+  * <numfailures> in Read_failure and Write_failure error message bodies 
(Section 9)
+    has been replaced with <reasonmap>. The <reasonmap> maps node IP addresses 
to
+    a failure reason code which indicates why the request failed on that node.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/doc/source/operating/error_codes.txt
----------------------------------------------------------------------
diff --git a/doc/source/operating/error_codes.txt 
b/doc/source/operating/error_codes.txt
new file mode 100644
index 0000000..279fe40
--- /dev/null
+++ b/doc/source/operating/error_codes.txt
@@ -0,0 +1,31 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+..
+..     http://www.apache.org/licenses/LICENSE-2.0
+..
+.. Unless required by applicable law or agreed to in writing, software
+.. distributed under the License is distributed on an "AS IS" BASIS,
+.. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+.. See the License for the specific language governing permissions and
+.. limitations under the License.
+
+.. highlight:: none
+
+Error Codes
+-----------
+
+In Cassandra 3.10 and higher, when the v5 native protocol (or a higher 
version) is used,
+``ReadFailure`` and ``WriteFailure`` errors will contain a map of replica 
addresses
+to error codes.  Those error codes are explained here:
+
+``0x0000``
+  The error does not have a specific code assigned yet, or the cause is 
unknown.
+
+``0x0001``
+  The read operation scanned too many tombstones (as defined by 
``tombstone_failure_threshold`` in ``cassandra.yaml``),
+  causing a TombstoneOverwhelmingException.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java 
b/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
index 91cf580..82885e3 100644
--- a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
+++ b/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
@@ -17,15 +17,18 @@
  */
 package org.apache.cassandra.exceptions;
 
+import java.net.InetAddress;
+import java.util.Map;
+
 import org.apache.cassandra.db.ConsistencyLevel;
 
 public class ReadFailureException extends RequestFailureException
 {
     public final boolean dataPresent;
 
-    public ReadFailureException(ConsistencyLevel consistency, int received, 
int failures, int blockFor, boolean dataPresent)
+    public ReadFailureException(ConsistencyLevel consistency, int received, 
int blockFor, boolean dataPresent, Map<InetAddress, RequestFailureReason> 
failureReasonByEndpoint)
     {
-        super(ExceptionCode.READ_FAILURE, consistency, received, failures, 
blockFor);
+        super(ExceptionCode.READ_FAILURE, consistency, received, blockFor, 
failureReasonByEndpoint);
         this.dataPresent = dataPresent;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/exceptions/RequestFailureException.java 
b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
index 6b8b40f..1a5289c 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
+++ b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
@@ -17,21 +17,32 @@
  */
 package org.apache.cassandra.exceptions;
 
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.cassandra.db.ConsistencyLevel;
 
 public class RequestFailureException extends RequestExecutionException
 {
     public final ConsistencyLevel consistency;
     public final int received;
-    public final int failures;
     public final int blockFor;
+    public final Map<InetAddress, RequestFailureReason> 
failureReasonByEndpoint;
 
-    protected RequestFailureException(ExceptionCode code, ConsistencyLevel 
consistency, int received, int failures, int blockFor)
+    protected RequestFailureException(ExceptionCode code, ConsistencyLevel 
consistency, int received, int blockFor, Map<InetAddress, RequestFailureReason> 
failureReasonByEndpoint)
     {
-        super(code, String.format("Operation failed - received %d responses 
and %d failures", received, failures));
+        super(code, String.format("Operation failed - received %d responses 
and %d failures", received, failureReasonByEndpoint.size()));
         this.consistency = consistency;
         this.received = received;
-        this.failures = failures;
         this.blockFor = blockFor;
+
+        // It is possible for the passed in failureReasonByEndpoint map
+        // to have new entries added after this exception is constructed
+        // (e.g. a delayed failure response from a replica). So to be safe
+        // we make a copy of the map at this point to ensure it will not be
+        // modified any further. Otherwise, there could be implications when
+        // we encode this map for transport.
+        this.failureReasonByEndpoint = new HashMap<>(failureReasonByEndpoint);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java 
b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
new file mode 100644
index 0000000..96ab7b5
--- /dev/null
+++ b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.exceptions;
+
+public enum RequestFailureReason
+{
+    /**
+     * The reason for the failure was none of the below reasons or was not 
recorded by the data node.
+     */
+    UNKNOWN                  (0x0000),
+
+    /**
+     * The data node read too many tombstones when attempting to execute a 
read query (see tombstone_failure_threshold).
+     */
+    READ_TOO_MANY_TOMBSTONES (0x0001);
+
+    /** The code to be serialized as an unsigned 16 bit integer */
+    public final int code;
+    public static final RequestFailureReason[] VALUES = values();
+
+    RequestFailureReason(final int code)
+    {
+        this.code = code;
+    }
+
+    public static RequestFailureReason fromCode(final int code)
+    {
+        for (RequestFailureReason reasonCode : VALUES)
+        {
+            if (reasonCode.code == code)
+                return reasonCode;
+        }
+        throw new IllegalArgumentException("Unknown request failure reason 
error code: " + code);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/exceptions/WriteFailureException.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/exceptions/WriteFailureException.java 
b/src/java/org/apache/cassandra/exceptions/WriteFailureException.java
index 24de9b1..1a857fe 100644
--- a/src/java/org/apache/cassandra/exceptions/WriteFailureException.java
+++ b/src/java/org/apache/cassandra/exceptions/WriteFailureException.java
@@ -17,6 +17,9 @@
  */
 package org.apache.cassandra.exceptions;
 
+import java.net.InetAddress;
+import java.util.Map;
+
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.WriteType;
 
@@ -24,9 +27,9 @@ public class WriteFailureException extends 
RequestFailureException
 {
     public final WriteType writeType;
 
-    public WriteFailureException(ConsistencyLevel consistency, int received, 
int failures, int blockFor, WriteType writeType)
+    public WriteFailureException(ConsistencyLevel consistency, int received, 
int blockFor, WriteType writeType, Map<InetAddress, RequestFailureReason> 
failureReasonByEndpoint)
     {
-        super(ExceptionCode.WRITE_FAILURE, consistency, received, failures, 
blockFor);
+        super(ExceptionCode.WRITE_FAILURE, consistency, received, blockFor, 
failureReasonByEndpoint);
         this.writeType = writeType;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/hints/HintsDispatcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java 
b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
index 478a76b..29bab80 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -27,6 +27,7 @@ import java.util.function.Function;
 
 import com.google.common.util.concurrent.RateLimiter;
 
+import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
@@ -192,7 +193,7 @@ final class HintsDispatcher implements AutoCloseable
             return timedOut ? Outcome.TIMEOUT : outcome;
         }
 
-        public void onFailure(InetAddress from)
+        public void onFailure(InetAddress from, RequestFailureReason 
failureReason)
         {
             outcome = Outcome.FAILURE;
             condition.signalAll();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java 
b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
index 546a416..1cd27b6 100644
--- a/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
+++ b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
@@ -19,11 +19,13 @@ package org.apache.cassandra.net;
 
 import java.net.InetAddress;
 
+import org.apache.cassandra.exceptions.RequestFailureReason;
+
 public interface IAsyncCallbackWithFailure<T> extends IAsyncCallback<T>
 {
 
     /**
      * Called when there is an exception on the remote node or timeout happens
      */
-    void onFailure(InetAddress from);
+    void onFailure(InetAddress from, RequestFailureReason failureReason);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java 
b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index d9f8b7c..c97a98f 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -24,8 +24,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.index.IndexNotAvailableException;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 
 public class MessageDeliveryTask implements Runnable
 {
@@ -89,6 +91,20 @@ public class MessageDeliveryTask implements Runnable
         {
             MessageOut response = new 
MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
                                                 
.withParameter(MessagingService.FAILURE_RESPONSE_PARAM, 
MessagingService.ONE_BYTE);
+
+            if (t instanceof TombstoneOverwhelmingException)
+            {
+                try (DataOutputBuffer out = new DataOutputBuffer())
+                {
+                    
out.writeShort(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code);
+                    response = 
response.withParameter(MessagingService.FAILURE_REASON_PARAM, out.getData());
+                }
+                catch (IOException ex)
+                {
+                    throw new RuntimeException(ex);
+                }
+            }
+
             MessagingService.instance().sendReply(response, id, message.from);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java 
b/src/java/org/apache/cassandra/net/MessageIn.java
index a122b61..0562df6 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -27,7 +27,9 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.monitoring.ConstructionTime;
+import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
 
 public class MessageIn<T>
@@ -152,6 +154,30 @@ public class MessageIn<T>
         return parameters.containsKey(MessagingService.FAILURE_RESPONSE_PARAM);
     }
 
+    public boolean containsFailureReason()
+    {
+        return parameters.containsKey(MessagingService.FAILURE_REASON_PARAM);
+    }
+
+    public RequestFailureReason getFailureReason()
+    {
+        if (containsFailureReason())
+        {
+            try (DataInputBuffer in = new 
DataInputBuffer(parameters.get(MessagingService.FAILURE_REASON_PARAM)))
+            {
+                return RequestFailureReason.fromCode(in.readUnsignedShort());
+            }
+            catch (IOException ex)
+            {
+                throw new RuntimeException(ex);
+            }
+        }
+        else
+        {
+            return RequestFailureReason.UNKNOWN;
+        }
+    }
+
     public long getTimeout()
     {
         return verb.getTimeout();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index 459c7e6..fea5a8f 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -54,6 +54,7 @@ import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.BootStrapper;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.gms.EchoMessage;
 import org.apache.cassandra.gms.GossipDigestAck;
 import org.apache.cassandra.gms.GossipDigestAck2;
@@ -93,6 +94,7 @@ public final class MessagingService implements 
MessagingServiceMBean
     public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC";
     public static final byte[] ONE_BYTE = new byte[1];
     public static final String FAILURE_RESPONSE_PARAM = "FAIL";
+    public static final String FAILURE_REASON_PARAM = "FAIL_REASON";
 
     /**
      * we preface every message with this number so the recipient can validate 
the sender is sane
@@ -510,7 +512,7 @@ public final class MessagingService implements 
MessagingServiceMBean
                     StageManager.getStage(Stage.INTERNAL_RESPONSE).submit(new 
Runnable() {
                         @Override
                         public void run() {
-                            
((IAsyncCallbackWithFailure)expiredCallbackInfo.callback).onFailure(expiredCallbackInfo.target);
+                            
((IAsyncCallbackWithFailure)expiredCallbackInfo.callback).onFailure(expiredCallbackInfo.target,
 RequestFailureReason.UNKNOWN);
                         }
                     });
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java 
b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 28ed365..89e1051 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -44,7 +44,7 @@ public class ResponseVerbHandler implements IVerbHandler
         IAsyncCallback cb = callbackInfo.callback;
         if (message.isFailureResponse())
         {
-            ((IAsyncCallbackWithFailure) cb).onFailure(message.from);
+            ((IAsyncCallbackWithFailure) cb).onFailure(message.from, 
message.getFailureReason());
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java 
b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
index 8ecae23..16a0a12 100644
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@ -28,6 +28,7 @@ import com.google.common.util.concurrent.AbstractFuture;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
@@ -99,7 +100,7 @@ public class AnticompactionTask extends 
AbstractFuture<InetAddress> implements R
             return false;
         }
 
-        public void onFailure(InetAddress from)
+        public void onFailure(InetAddress from, RequestFailureReason 
failureReason)
         {
             task.setException(new RuntimeException("Anticompaction failed or 
timed out in " + from));
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/repair/SnapshotTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java 
b/src/java/org/apache/cassandra/repair/SnapshotTask.java
index 94361d8..2b267a7 100644
--- a/src/java/org/apache/cassandra/repair/SnapshotTask.java
+++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.util.concurrent.AbstractFuture;
 
+import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
@@ -73,7 +74,7 @@ public class SnapshotTask extends AbstractFuture<InetAddress> 
implements Runnabl
 
         public boolean isLatencyForSnitch() { return false; }
 
-        public void onFailure(InetAddress from)
+        public void onFailure(InetAddress from, RequestFailureReason 
failureReason)
         {
             //listener.failedSnapshot();
             task.setException(new RuntimeException("Could not create snapshot 
at " + from));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index f412515..c82d58b 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.service;
 
 import java.net.InetAddress;
 import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
@@ -49,6 +51,7 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
     private static final 
AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater
         = 
AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, 
"failures");
     private volatile int failures = 0;
+    private final Map<InetAddress, RequestFailureReason> 
failureReasonByEndpoint;
     private final long queryStartNanoTime;
 
     /**
@@ -69,6 +72,7 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
         this.naturalEndpoints = naturalEndpoints;
         this.callback = callback;
         this.writeType = writeType;
+        this.failureReasonByEndpoint = new ConcurrentHashMap<>();
         this.queryStartNanoTime = queryStartNanoTime;
     }
 
@@ -104,7 +108,7 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
 
         if (totalBlockFor() + failures > totalEndpoints())
         {
-            throw new WriteFailureException(consistencyLevel, ackCount(), 
failures, totalBlockFor(), writeType);
+            throw new WriteFailureException(consistencyLevel, ackCount(), 
totalBlockFor(), writeType, failureReasonByEndpoint);
         }
     }
 
@@ -155,7 +159,7 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
     }
 
     @Override
-    public void onFailure(InetAddress from)
+    public void onFailure(InetAddress from, RequestFailureReason failureReason)
     {
         logger.trace("Got failure from {}", from);
 
@@ -163,6 +167,8 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
               ? failuresUpdater.incrementAndGet(this)
               : failures;
 
+        failureReasonByEndpoint.put(from, failureReason);
+
         if (totalBlockFor() + n > totalEndpoints())
             signal();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 4699ae1..b69c24a 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.FailureDetector;
@@ -279,7 +280,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
                 return false;
             }
 
-            public void onFailure(InetAddress from)
+            public void onFailure(InetAddress from, RequestFailureReason 
failureReason)
             {
                 status.set(false);
                 failedNodes.add(from.getHostAddress());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java 
b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
index 3b31794..aabf7dd 100644
--- a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
@@ -21,9 +21,11 @@ package org.apache.cassandra.service;
 import java.net.InetAddress;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
+import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.exceptions.WriteFailureException;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
 
 public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T>
 {
@@ -58,9 +60,9 @@ public class BatchlogResponseHandler<T> extends 
AbstractWriteResponseHandler<T>
         return wrapped.isLatencyForSnitch();
     }
 
-    public void onFailure(InetAddress from)
+    public void onFailure(InetAddress from, RequestFailureReason failureReason)
     {
-        wrapped.onFailure(from);
+        wrapped.onFailure(from, failureReason);
     }
 
     public void assureSufficientLiveNodes()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java 
b/src/java/org/apache/cassandra/service/ReadCallback.java
index 3f1ff3c..ad80913 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.service;
 import java.net.InetAddress;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
@@ -32,6 +34,7 @@ import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.UnavailableException;
@@ -63,6 +66,7 @@ public class ReadCallback implements 
IAsyncCallbackWithFailure<ReadResponse>
     private static final AtomicIntegerFieldUpdater<ReadCallback> 
failuresUpdater
             = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, 
"failures");
     private volatile int failures = 0;
+    private final Map<InetAddress, RequestFailureReason> 
failureReasonByEndpoint;
 
     private final Keyspace keyspace; // TODO push this into ConsistencyLevel?
 
@@ -89,6 +93,7 @@ public class ReadCallback implements 
IAsyncCallbackWithFailure<ReadResponse>
         this.resolver = resolver;
         this.queryStartNanoTime = queryStartNanoTime;
         this.endpoints = endpoints;
+        this.failureReasonByEndpoint = new ConcurrentHashMap<>();
         // we don't support read repair (or rapid read protection) for range 
scans yet (CASSANDRA-6897)
         assert !(command instanceof PartitionRangeReadCommand) || blockfor >= 
endpoints.size();
 
@@ -129,7 +134,7 @@ public class ReadCallback implements 
IAsyncCallbackWithFailure<ReadResponse>
 
         // Same as for writes, see AbstractWriteResponseHandler
         throw failed
-            ? new ReadFailureException(consistencyLevel, received, failures, 
blockfor, resolver.isDataPresent())
+            ? new ReadFailureException(consistencyLevel, received, blockfor, 
resolver.isDataPresent(), failureReasonByEndpoint)
             : new ReadTimeoutException(consistencyLevel, received, blockfor, 
resolver.isDataPresent());
     }
 
@@ -252,12 +257,14 @@ public class ReadCallback implements 
IAsyncCallbackWithFailure<ReadResponse>
     }
 
     @Override
-    public void onFailure(InetAddress from)
+    public void onFailure(InetAddress from, RequestFailureReason failureReason)
     {
         int n = waitingFor(from)
               ? failuresUpdater.incrementAndGet(this)
               : failures;
 
+        failureReasonByEndpoint.put(from, failureReason);
+
         if (blockfor + n > endpoints.size())
             condition.signalAll();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index a0f39af..f180bdf 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -579,7 +579,7 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     if (!(ex instanceof WriteTimeoutException))
                         logger.error("Failed to apply paxos commit locally : 
{}", ex);
-                    
responseHandler.onFailure(FBUtilities.getBroadcastAddress());
+                    
responseHandler.onFailure(FBUtilities.getBroadcastAddress(), 
RequestFailureReason.UNKNOWN);
                 }
             }
 
@@ -645,7 +645,7 @@ public class StorageProxy implements StorageProxyMBean
                     writeMetricsMap.get(consistency_level).failures.mark();
                     WriteFailureException fe = (WriteFailureException)ex;
                     Tracing.trace("Write failure; received {} of {} required 
replies, failed {} requests",
-                                  fe.received, fe.blockFor, fe.failures);
+                                  fe.received, fe.blockFor, 
fe.failureReasonByEndpoint.size());
                 }
                 else
                 {
@@ -1033,7 +1033,7 @@ public class StorageProxy implements StorageProxyMBean
             }
             catch (OverloadedException | WriteTimeoutException e)
             {
-                wrapper.handler.onFailure(FBUtilities.getBroadcastAddress());
+                wrapper.handler.onFailure(FBUtilities.getBroadcastAddress(), 
RequestFailureReason.UNKNOWN);
             }
         }
     }
@@ -1387,7 +1387,7 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     if (!(ex instanceof WriteTimeoutException))
                         logger.error("Failed to apply mutation locally : {}", 
ex);
-                    handler.onFailure(FBUtilities.getBroadcastAddress());
+                    handler.onFailure(FBUtilities.getBroadcastAddress(), 
RequestFailureReason.UNKNOWN);
                 }
             }
 
@@ -1601,7 +1601,7 @@ public class StorageProxy implements StorageProxyMBean
             }
             catch (WriteFailureException e)
             {
-                throw new ReadFailureException(consistencyLevel, e.received, 
e.failures, e.blockFor, false);
+                throw new ReadFailureException(consistencyLevel, e.received, 
e.blockFor, false, e.failureReasonByEndpoint);
             }
 
             result = fetchRows(group.commands, consistencyForCommitOrFetch, 
queryStartNanoTime);
@@ -1858,18 +1858,23 @@ public class StorageProxy implements StorageProxyMBean
                 else
                 {
                     MessagingService.instance().incrementDroppedMessages(verb, 
System.currentTimeMillis() - constructionTime);
-                    handler.onFailure(FBUtilities.getBroadcastAddress());
+                    handler.onFailure(FBUtilities.getBroadcastAddress(), 
RequestFailureReason.UNKNOWN);
                 }
 
                 
MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
             }
             catch (Throwable t)
             {
-                handler.onFailure(FBUtilities.getBroadcastAddress());
                 if (t instanceof TombstoneOverwhelmingException)
+                {
+                    handler.onFailure(FBUtilities.getBroadcastAddress(), 
RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
                     logger.error(t.getMessage());
+                }
                 else
+                {
+                    handler.onFailure(FBUtilities.getBroadcastAddress(), 
RequestFailureReason.UNKNOWN);
                     throw t;
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java 
b/src/java/org/apache/cassandra/transport/CBUtil.java
index 570fd6b..25409ae 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -496,7 +496,7 @@ public abstract class CBUtil
 
     public static InetSocketAddress readInet(ByteBuf cb)
     {
-        int addrSize = cb.readByte();
+        int addrSize = cb.readByte() & 0xFF;
         byte[] address = new byte[addrSize];
         cb.readBytes(address);
         int port = cb.readInt();
@@ -525,6 +525,33 @@ public abstract class CBUtil
         return 1 + address.length + 4;
     }
 
+    public static InetAddress readInetAddr(ByteBuf cb)
+    {
+        int addressSize = cb.readByte() & 0xFF;
+        byte[] address = new byte[addressSize];
+        cb.readBytes(address);
+        try
+        {
+            return InetAddress.getByAddress(address);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new ProtocolException("Invalid IP address while 
deserializing inet address");
+        }
+    }
+
+    public static void writeInetAddr(InetAddress inetAddr, ByteBuf cb)
+    {
+        byte[] address = inetAddr.getAddress();
+        cb.writeByte(address.length);
+        cb.writeBytes(address);
+    }
+
+    public static int sizeOfInetAddr(InetAddress inetAddr)
+    {
+        return 1 + inetAddr.getAddress().length;
+    }
+
     /*
      * Reads *all* readable bytes from {@code cb} and return them.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java 
b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index 8f45d4d..5ce248f 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -17,7 +17,10 @@
  */
 package org.apache.cassandra.transport.messages;
 
+import java.net.InetAddress;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.handler.codec.CodecException;
@@ -76,22 +79,35 @@ public class ErrorMessage extends Message.Response
                 case TRUNCATE_ERROR:
                     te = new TruncateException(msg);
                     break;
-                case WRITE_FAILURE: 
+                case WRITE_FAILURE:
                 case READ_FAILURE:
                     {
                         ConsistencyLevel cl = 
CBUtil.readConsistencyLevel(body);
                         int received = body.readInt();
                         int blockFor = body.readInt();
+                        // The number of failures is also present in protocol 
v5, but used instead to specify the size of the failure map
                         int failure = body.readInt();
+
+                        Map<InetAddress, RequestFailureReason> 
failureReasonByEndpoint = new ConcurrentHashMap<>();
+                        if (version >= Server.VERSION_5)
+                        {
+                            for (int i = 0; i < failure; i++)
+                            {
+                                InetAddress endpoint = 
CBUtil.readInetAddr(body);
+                                RequestFailureReason failureReason = 
RequestFailureReason.fromCode(body.readUnsignedShort());
+                                failureReasonByEndpoint.put(endpoint, 
failureReason);
+                            }
+                        }
+
                         if (code == ExceptionCode.WRITE_FAILURE)
                         {
                             WriteType writeType = 
Enum.valueOf(WriteType.class, CBUtil.readString(body));
-                            te = new WriteFailureException(cl, received, 
failure, blockFor, writeType);
+                            te = new WriteFailureException(cl, received, 
blockFor, writeType, failureReasonByEndpoint);
                         }
                         else
                         {
                             byte dataPresent = body.readByte();
-                            te = new ReadFailureException(cl, received, 
failure, blockFor, dataPresent != 0);   
+                            te = new ReadFailureException(cl, received, 
blockFor, dataPresent != 0, failureReasonByEndpoint);
                         }
                     }
                     break;
@@ -171,7 +187,17 @@ public class ErrorMessage extends Message.Response
                         CBUtil.writeConsistencyLevel(rfe.consistency, dest);
                         dest.writeInt(rfe.received);
                         dest.writeInt(rfe.blockFor);
-                        dest.writeInt(rfe.failures);
+                        // The number of failures is also present in protocol 
v5, but used instead to specify the size of the failure map
+                        dest.writeInt(rfe.failureReasonByEndpoint.size());
+
+                        if (version >= Server.VERSION_5)
+                        {
+                            for (Map.Entry<InetAddress, RequestFailureReason> 
entry : rfe.failureReasonByEndpoint.entrySet())
+                            {
+                                CBUtil.writeInetAddr(entry.getKey(), dest);
+                                dest.writeShort(entry.getValue().code);
+                            }
+                        }
 
                         if (isWrite)
                             
CBUtil.writeString(((WriteFailureException)rfe).writeType.toString(), dest);
@@ -228,6 +254,15 @@ public class ErrorMessage extends Message.Response
                         boolean isWrite = err.code() == 
ExceptionCode.WRITE_FAILURE;
                         size += CBUtil.sizeOfConsistencyLevel(rfe.consistency) 
+ 4 + 4 + 4;
                         size += isWrite ? 
CBUtil.sizeOfString(((WriteFailureException)rfe).writeType.toString()) : 1;
+
+                        if (version >= Server.VERSION_5)
+                        {
+                            for (Map.Entry<InetAddress, RequestFailureReason> 
entry : rfe.failureReasonByEndpoint.entrySet())
+                            {
+                                size += CBUtil.sizeOfInetAddr(entry.getKey());
+                                size += 2; // RequestFailureReason code
+                            }
+                        }
                     }
                     break;
                 case WRITE_TIMEOUT:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39df31a0/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java 
b/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java
new file mode 100644
index 0000000..a145d18
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.WriteType;
+import org.apache.cassandra.exceptions.ReadFailureException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.transport.messages.ErrorMessage;
+
+import static org.junit.Assert.assertEquals;
+
+public class ErrorMessageTest
+{
+    private static Map<InetAddress, RequestFailureReason> failureReasonMap1;
+    private static Map<InetAddress, RequestFailureReason> failureReasonMap2;
+
+    @BeforeClass
+    public static void setUpFixtures() throws UnknownHostException
+    {
+        failureReasonMap1 = new HashMap<>();
+        failureReasonMap1.put(InetAddress.getByName("127.0.0.1"), 
RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
+        failureReasonMap1.put(InetAddress.getByName("127.0.0.2"), 
RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
+        failureReasonMap1.put(InetAddress.getByName("127.0.0.3"), 
RequestFailureReason.UNKNOWN);
+
+        failureReasonMap2 = new HashMap<>();
+        failureReasonMap2.put(InetAddress.getByName("127.0.0.1"), 
RequestFailureReason.UNKNOWN);
+        failureReasonMap2.put(InetAddress.getByName("127.0.0.2"), 
RequestFailureReason.UNKNOWN);
+    }
+
+    @Test
+    public void testV5ReadFailureSerDeser()
+    {
+        int receivedBlockFor = 3;
+        ConsistencyLevel consistencyLevel = ConsistencyLevel.ALL;
+        boolean dataPresent = false;
+        ReadFailureException rfe = new ReadFailureException(consistencyLevel, 
receivedBlockFor, receivedBlockFor, dataPresent, failureReasonMap1);
+
+        ErrorMessage deserialized = 
serializeAndGetDeserializedErrorMessage(ErrorMessage.fromException(rfe), 5);
+        ReadFailureException deserializedRfe = (ReadFailureException) 
deserialized.error;
+
+        assertEquals(failureReasonMap1, 
deserializedRfe.failureReasonByEndpoint);
+        assertEquals(receivedBlockFor, deserializedRfe.received);
+        assertEquals(receivedBlockFor, deserializedRfe.blockFor);
+        assertEquals(consistencyLevel, deserializedRfe.consistency);
+        assertEquals(dataPresent, deserializedRfe.dataPresent);
+    }
+
+    @Test
+    public void testV5WriteFailureSerDeser()
+    {
+        int receivedBlockFor = 3;
+        ConsistencyLevel consistencyLevel = ConsistencyLevel.ALL;
+        WriteType writeType = WriteType.SIMPLE;
+        WriteFailureException wfe = new 
WriteFailureException(consistencyLevel, receivedBlockFor, receivedBlockFor, 
writeType, failureReasonMap2);
+
+        ErrorMessage deserialized = 
serializeAndGetDeserializedErrorMessage(ErrorMessage.fromException(wfe), 5);
+        WriteFailureException deserializedWfe = (WriteFailureException) 
deserialized.error;
+
+        assertEquals(failureReasonMap2, 
deserializedWfe.failureReasonByEndpoint);
+        assertEquals(receivedBlockFor, deserializedWfe.received);
+        assertEquals(receivedBlockFor, deserializedWfe.blockFor);
+        assertEquals(consistencyLevel, deserializedWfe.consistency);
+        assertEquals(writeType, deserializedWfe.writeType);
+    }
+
+    /**
+     * Make sure that the map passed in to create a Read/WriteFailureException 
is copied
+     * so later modifications to the map passed in don't affect the map in the 
exception.
+     *
+     * This is to prevent potential issues in serialization if the map created 
in
+     * ReadCallback/AbstractWriteResponseHandler is modified due to a delayed 
failure
+     * response after the exception is created.
+     */
+    @Test
+    public void testRequestFailureExceptionMakesCopy() throws 
UnknownHostException
+    {
+        Map<InetAddress, RequestFailureReason> modifiableFailureReasons = new 
HashMap<>(failureReasonMap1);
+        ReadFailureException rfe = new 
ReadFailureException(ConsistencyLevel.ALL, 3, 3, false, 
modifiableFailureReasons);
+        WriteFailureException wfe = new 
WriteFailureException(ConsistencyLevel.ALL, 3, 3, WriteType.SIMPLE, 
modifiableFailureReasons);
+
+        modifiableFailureReasons.put(InetAddress.getByName("127.0.0.4"), 
RequestFailureReason.UNKNOWN);
+
+        assertEquals(failureReasonMap1, rfe.failureReasonByEndpoint);
+        assertEquals(failureReasonMap1, wfe.failureReasonByEndpoint);
+    }
+
+    private ErrorMessage serializeAndGetDeserializedErrorMessage(ErrorMessage 
message, int version)
+    {
+        ByteBuf buffer = 
Unpooled.buffer(ErrorMessage.codec.encodedSize(message, version));
+        ErrorMessage.codec.encode(message, buffer, version);
+        return ErrorMessage.codec.decode(buffer, version);
+    }
+}

Reply via email to