This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new 7f54fe0 Fix failure handling in inter-node communication
7f54fe0 is described below
commit 7f54fe02298b90e6152acc026384c033a96ce621
Author: Aleksandr Sorokoumov <[email protected]>
AuthorDate: Tue Oct 26 18:14:35 2021 +0100
Fix failure handling in inter-node communication
patch by Aleksandr Sorokoumov; reviewed by Andrés de la Peña and Paulo
Motta for CASSANDRA-16334
---
CHANGES.txt | 1 +
.../apache/cassandra/db/MutationVerbHandler.java | 3 ++-
.../apache/cassandra/net/MessageDeliveryTask.java | 22 ++++++++++++++++++----
.../service/AbstractWriteResponseHandler.java | 3 +++
.../org/apache/cassandra/service/StorageProxy.java | 2 +-
5 files changed, 25 insertions(+), 6 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index c79746f..7dd58ff 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.26:
+ * Fix failure handling in inter-node communication (CASSANDRA-16334)
* Log more information when a node runs out of commitlog space
(CASSANDRA-11323)
* Don't take snapshots when truncating system tables (CASSANDRA-16839)
* Make -Dtest.methods consistently optional in all Ant test targets
(CASSANDRA-17014)
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index 5888438..10b77f2 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -87,7 +87,8 @@ public class MutationVerbHandler implements
IVerbHandler<Mutation>
int size = in.readInt();
// tell the recipients who to send their ack to
- MessageOut<Mutation> message = new MessageOut<>(verb, mutation,
Mutation.serializer).withParameter(Mutation.FORWARD_FROM, from.getAddress());
+ MessageOut<Mutation> message = new MessageOut<>(verb, mutation,
Mutation.serializer).withParameter(Mutation.FORWARD_FROM, from.getAddress())
+
.withParameter(MessagingService.FAILURE_CALLBACK_PARAM,
MessagingService.ONE_BYTE);
// Send a message to each of the addresses on our Forward List
for (int i = 0; i < size; i++)
{
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index ce6eebc..26e780f 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -18,11 +18,14 @@
package org.apache.cassandra.net;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.EnumSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.index.IndexNotAvailableException;
@@ -31,12 +34,12 @@ public class MessageDeliveryTask implements Runnable
{
private static final Logger logger =
LoggerFactory.getLogger(MessageDeliveryTask.class);
- private final MessageIn message;
+ private final MessageIn<?> message;
private final int id;
private final long constructionTime;
private final boolean isCrossNodeTimestamp;
- public MessageDeliveryTask(MessageIn message, int id, long timestamp,
boolean isCrossNodeTimestamp)
+ public MessageDeliveryTask(MessageIn<?> message, int id, long timestamp,
boolean isCrossNodeTimestamp)
{
assert message != null;
this.message = message;
@@ -90,9 +93,20 @@ public class MessageDeliveryTask implements Runnable
{
if (message.doCallbackOnFailure())
{
- MessageOut response = new
MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
+ MessageOut<?> response = new
MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE)
.withParameter(MessagingService.FAILURE_RESPONSE_PARAM,
MessagingService.ONE_BYTE);
- MessagingService.instance().sendReply(response, id, message.from);
+
+ InetAddress from;
+ byte[] fromBytes = message.parameters.get(Mutation.FORWARD_FROM);
+ try
+ {
+ from = fromBytes != null ? InetAddress.getByAddress(fromBytes)
: message.from;
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ MessagingService.instance().sendReply(response, id, from);
}
}
diff --git
a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index e3ba66e..bf099e4 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -122,6 +122,9 @@ public abstract class AbstractWriteResponseHandler<T>
implements IAsyncCallbackW
*/
protected int totalEndpoints()
{
+ if (consistencyLevel != null && consistencyLevel.isDatacenterLocal())
+ return
consistencyLevel.countLocalEndpoints(Iterables.concat(naturalEndpoints,
pendingEndpoints));
+
return naturalEndpoints.size() + pendingEndpoints.size();
}
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java
b/src/java/org/apache/cassandra/service/StorageProxy.java
index a6d35f4..89f93f6 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1445,7 +1445,7 @@ public class StorageProxy implements StorageProxyMBean
catch (Exception ex)
{
if (!(ex instanceof WriteTimeoutException))
- logger.error("Failed to apply mutation locally : {}",
ex);
+ logger.error("Failed to apply mutation locally : ",
ex);
handler.onFailure(FBUtilities.getBroadcastAddress());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]