Repository: cassandra Updated Branches: refs/heads/trunk e5d119aab -> c059a5689
Add WriteFailureException to native protocol Patch by Stefania Alborghetti; reviewed by Tyler Hobbs for CASSANDRA-8592 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c059a568 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c059a568 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c059a568 Branch: refs/heads/trunk Commit: c059a56890dd7b9aca0addca75a05bcce6b65a77 Parents: e5d119a Author: Stefania Alborghetti <[email protected]> Authored: Thu Mar 12 11:40:22 2015 -0500 Committer: Tyler Hobbs <[email protected]> Committed: Thu Mar 12 11:40:22 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + doc/native_protocol_v4.spec | 32 +++- .../apache/cassandra/db/BatchlogManager.java | 36 +++-- .../db/CounterMutationVerbHandler.java | 40 ++--- .../cassandra/db/HintedHandOffManager.java | 12 +- src/java/org/apache/cassandra/db/Keyspace.java | 6 + .../cassandra/db/MutationVerbHandler.java | 15 +- .../cassandra/exceptions/ExceptionCode.java | 1 + .../exceptions/RequestFailureException.java | 2 +- .../exceptions/WriteFailureException.java | 32 ++++ .../locator/AbstractReplicationStrategy.java | 12 +- .../org/apache/cassandra/net/CallbackInfo.java | 6 +- .../org/apache/cassandra/net/IVerbHandler.java | 4 +- .../cassandra/net/MessageDeliveryTask.java | 37 +++-- .../apache/cassandra/net/MessagingService.java | 4 +- .../apache/cassandra/net/WriteCallbackInfo.java | 2 +- .../service/AbstractWriteResponseHandler.java | 57 ++++++- .../DatacenterSyncWriteResponseHandler.java | 4 +- .../service/DatacenterWriteResponseHandler.java | 12 +- .../apache/cassandra/service/StorageProxy.java | 157 ++++++++++++------- .../cassandra/service/WriteResponseHandler.java | 4 +- .../transport/messages/ErrorMessage.java | 31 +++- .../apache/cassandra/db/SerializationsTest.java | 4 +- 23 files changed, 354 insertions(+), 158 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c67acd1..49f6358 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 3.0 + * Add WriteFailureException to native protocol, notify coordinator of + write failures (CASSANDRA-8592) * Convert SequentialWriter to nio (CASSANDRA-8709) * Add role based access control (CASSANDRA-7653, 8650, 7216, 8760, 8849, 8761, 8850) * Record client ip address in tracing sessions (CASSANDRA-8162) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/doc/native_protocol_v4.spec ---------------------------------------------------------------------- diff --git a/doc/native_protocol_v4.spec b/doc/native_protocol_v4.spec index 03a5a50..69adc17 100644 --- a/doc/native_protocol_v4.spec +++ b/doc/native_protocol_v4.spec @@ -1010,9 +1010,9 @@ Table of Contents - "BATCH": the write was a (logged) batch write. If this type is received, it means the batch log has been successfully written (otherwise a - "BATCH_LOG" type would have been send instead). + "BATCH_LOG" type would have been sent instead). - "UNLOGGED_BATCH": the write was an unlogged - batch. Not batch log write has been attempted. + batch. No batch log write has been attempted. - "COUNTER": the write was a counter write (batched or not). - "BATCH_LOG": the timeout occured during the @@ -1058,6 +1058,34 @@ Table of Contents <keyspace> is the keyspace [string] of the failed function <function> is the name [string] of the failed function <arg_types> [string list] one string for each argument type (as CQL type) of the failed function + 0x1500 Write_failure: A non-timeout exception during a write request. 
The rest + of the ERROR message body will be + <cl><received><blockfor><numfailures><write_type> + where: + <cl> is the [consistency] level of the query having triggered + the exception. + <received> is an [int] representing the number of nodes having + answered the request. + <blockfor> is an [int] representing the number of replicas whose response is + required to achieve <cl>. + <numfailures> is an [int] representing the number of nodes that + experienced a failure while executing the request. + <write_type> is a [string] that describes the type of the write + that failed. The value of that string can be one + of: + - "SIMPLE": the write was a non-batched + non-counter write. + - "BATCH": the write was a (logged) batch write. + If this type is received, it means the batch log + has been successfully written (otherwise a + "BATCH_LOG" type would have been sent instead). + - "UNLOGGED_BATCH": the write was an unlogged + batch. No batch log write has been attempted. + - "COUNTER": the write was a counter write + (batched or not). + - "BATCH_LOG": the failure occurred during the + write to the batch log when a (logged) batch + write was requested. 0x2000 Syntax_error: The submitted query has a syntax error.
0x2100 Unauthorized: The logged user doesn't have the right to perform http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java index e71a62c..8eaea52 100644 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@ -31,7 +31,7 @@ import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.*; import com.google.common.util.concurrent.RateLimiter; -import org.apache.cassandra.io.sstable.format.SSTableReader; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,9 +41,11 @@ import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.WriteFailureException; import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; @@ -54,7 +56,6 @@ import org.apache.cassandra.service.WriteResponseHandler; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.WrappedRunnable; - import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; public class BatchlogManager implements BatchlogManagerMBean @@ -264,7 +265,7 @@ public class BatchlogManager implements BatchlogManagerMBean private final ByteBuffer data; private final int version; - 
private List<ReplayWriteResponseHandler> replayHandlers; + private List<ReplayWriteResponseHandler<Mutation>> replayHandlers; public Batch(UUID id, long writtenAt, ByteBuffer data, int version) { @@ -298,14 +299,15 @@ public class BatchlogManager implements BatchlogManagerMBean { for (int i = 0; i < replayHandlers.size(); i++) { - ReplayWriteResponseHandler handler = replayHandlers.get(i); + ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i); try { handler.get(); } - catch (WriteTimeoutException e) + catch (WriteTimeoutException|WriteFailureException e) { - logger.debug("Timed out replaying a batched mutation to a node, will write a hint"); + logger.debug("Failed replaying a batched mutation to a node, will write a hint"); + logger.debug("Failure was : {}", e.getMessage()); // writing hints for the rest to hints, starting from i writeHintsForUndeliveredEndpoints(i); return; @@ -348,7 +350,7 @@ public class BatchlogManager implements BatchlogManagerMBean { Mutation undeliveredMutation = replayingMutations.get(i); int ttl = calculateHintTTL(replayingMutations); - ReplayWriteResponseHandler handler = replayHandlers.get(i); + ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i); if (ttl > 0 && handler != null) for (InetAddress endpoint : handler.undelivered) @@ -361,12 +363,12 @@ public class BatchlogManager implements BatchlogManagerMBean } } - private List<ReplayWriteResponseHandler> sendReplays(List<Mutation> mutations, long writtenAt, int ttl) + private List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations, long writtenAt, int ttl) { - List<ReplayWriteResponseHandler> handlers = new ArrayList<>(mutations.size()); + List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size()); for (Mutation mutation : mutations) { - ReplayWriteResponseHandler handler = sendSingleReplayMutation(mutation, writtenAt, ttl); + ReplayWriteResponseHandler<Mutation> handler = 
sendSingleReplayMutation(mutation, writtenAt, ttl); if (handler != null) handlers.add(handler); } @@ -379,7 +381,7 @@ public class BatchlogManager implements BatchlogManagerMBean * * @return direct delivery handler to wait on or null, if no live nodes found */ - private ReplayWriteResponseHandler sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl) + private ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl) { Set<InetAddress> liveEndpoints = new HashSet<>(); String ks = mutation.getKeyspaceName(); @@ -399,7 +401,7 @@ public class BatchlogManager implements BatchlogManagerMBean if (liveEndpoints.isEmpty()) return null; - ReplayWriteResponseHandler handler = new ReplayWriteResponseHandler(liveEndpoints); + ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints); MessageOut<Mutation> message = mutation.createMessage(); for (InetAddress endpoint : liveEndpoints) MessagingService.instance().sendRR(message, endpoint, handler, false); @@ -418,7 +420,11 @@ public class BatchlogManager implements BatchlogManagerMBean return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAt); } - private static class ReplayWriteResponseHandler extends WriteResponseHandler + /** + * A wrapper of WriteResponseHandler that stores the addresses of the endpoints from + * which we did not receive a successful reply. + */ + private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T> { private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<InetAddress, Boolean>()); @@ -435,9 +441,9 @@ public class BatchlogManager implements BatchlogManagerMBean } @Override - public void response(MessageIn m) + public void response(MessageIn<T> m) { - boolean removed = undelivered.remove(m.from); + boolean removed = undelivered.remove(m == null ? 
FBUtilities.getBroadcastAddress() : m.from); assert removed; super.response(m); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java index d65fbd7..d9ee38a 100644 --- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java @@ -34,31 +34,23 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation> public void doVerb(final MessageIn<CounterMutation> message, final int id) { - try - { - final CounterMutation cm = message.payload; - logger.debug("Applying forwarded {}", cm); + final CounterMutation cm = message.payload; + logger.debug("Applying forwarded {}", cm); - String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); - // We should not wait for the result of the write in this thread, - // otherwise we could have a distributed deadlock between replicas - // running this VerbHandler (see #4578). - // Instead, we use a callback to send the response. Note that the callback - // will not be called if the request timeout, but this is ok - // because the coordinator of the counter mutation will timeout on - // it's own in that case. 
- StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter, new Runnable() - { - public void run() - { - MessagingService.instance().sendReply(new WriteResponse().createMessage(), id, message.from); - } - }); - } - catch (RequestExecutionException e) + String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); + // We should not wait for the result of the write in this thread, + // otherwise we could have a distributed deadlock between replicas + // running this VerbHandler (see #4578). + // Instead, we use a callback to send the response. Note that the callback + // will not be called if the request timeout, but this is ok + // because the coordinator of the counter mutation will timeout on + // it's own in that case. + StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter, new Runnable() { - // The coordinator will timeout on it's own so ignore - logger.debug("counter error", e); - } + public void run() + { + MessagingService.instance().sendReply(new WriteResponse().createMessage(), id, message.from); + } + }); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index 324943a..589958e 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -49,6 +49,7 @@ import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.WriteFailureException; import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.gms.ApplicationState; import 
org.apache.cassandra.gms.FailureDetector; @@ -389,7 +390,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean break; } - List<WriteResponseHandler> responseHandlers = Lists.newArrayList(); + List<WriteResponseHandler<Mutation>> responseHandlers = Lists.newArrayList(); for (final Cell hint : hintsPage) { // check if hints delivery has been paused during the process @@ -452,20 +453,21 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean deleteHint(hostIdBytes, hint.name(), hint.timestamp()); } }; - WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.SIMPLE, callback); + WriteResponseHandler<Mutation> responseHandler = new WriteResponseHandler<>(endpoint, WriteType.SIMPLE, callback); MessagingService.instance().sendRR(message, endpoint, responseHandler, false); responseHandlers.add(responseHandler); } - for (WriteResponseHandler handler : responseHandlers) + for (WriteResponseHandler<Mutation> handler : responseHandlers) { try { handler.get(); } - catch (WriteTimeoutException e) + catch (WriteTimeoutException|WriteFailureException e) { - logger.info("Timed out replaying hints to {}; aborting ({} delivered)", endpoint, rowsReplayed); + logger.info("Failed replaying hints to {}; aborting ({} delivered), error : {}", + endpoint, rowsReplayed, e.getMessage()); break delivery; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index e3301b1..b0cc25d 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -58,6 +58,9 @@ public class Keyspace private static final Logger logger = LoggerFactory.getLogger(Keyspace.class); + private static final String TEST_FAIL_WRITES_KS = 
System.getProperty("cassandra.test.fail_writes_ks", ""); + private static final boolean TEST_FAIL_WRITES = !TEST_FAIL_WRITES_KS.isEmpty(); + public final KeyspaceMetrics metric; // It is possible to call Keyspace.open without a running daemon, so it makes sense to ensure @@ -357,6 +360,9 @@ public class Keyspace */ public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes) { + if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS)) + throw new RuntimeException("Testing write failures"); + try (OpOrder.Group opGroup = writeOrder.start()) { // write the mutation to the commitlog and memtables http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/db/MutationVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java index 43ffeae..92bfdb5 100644 --- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java @@ -18,24 +18,20 @@ package org.apache.cassandra.db; import java.io.DataInputStream; +import java.io.IOError; import java.io.IOException; import java.net.InetAddress; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.net.*; import org.apache.cassandra.tracing.Tracing; public class MutationVerbHandler implements IVerbHandler<Mutation> { - private static final Logger logger = LoggerFactory.getLogger(MutationVerbHandler.class); + private static final boolean TEST_FAIL_WRITES = System.getProperty("cassandra.test.fail_writes", "false").equalsIgnoreCase("true"); - public void doVerb(MessageIn<Mutation> message, int id) + public void doVerb(MessageIn<Mutation> message, int id) throws IOException { - try - { // Check if there were any forwarding headers in this message byte[] from 
= message.parameters.get(Mutation.FORWARD_FROM); InetAddress replyTo; @@ -55,11 +51,6 @@ public class MutationVerbHandler implements IVerbHandler<Mutation> WriteResponse response = new WriteResponse(); Tracing.trace("Enqueuing response to {}", replyTo); MessagingService.instance().sendReply(response.createMessage(), id, replyTo); - } - catch (IOException e) - { - logger.error("Error in mutation", e); - } } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/exceptions/ExceptionCode.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java index 80cd4df..6ad0577 100644 --- a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java +++ b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java @@ -41,6 +41,7 @@ public enum ExceptionCode READ_TIMEOUT (0x1200), READ_FAILURE (0x1300), FUNCTION_FAILURE(0x1400), + WRITE_FAILURE (0x1500), // 2xx: problem validating the request SYNTAX_ERROR (0x2000), http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/exceptions/RequestFailureException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureException.java b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java index 1ff44d9..6b8b40f 100644 --- a/src/java/org/apache/cassandra/exceptions/RequestFailureException.java +++ b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java @@ -28,7 +28,7 @@ public class RequestFailureException extends RequestExecutionException protected RequestFailureException(ExceptionCode code, ConsistencyLevel consistency, int received, int failures, int blockFor) { - super(code, String.format("Operation failed - received %d responses and %d failures.", received, failures)); + super(code, 
String.format("Operation failed - received %d responses and %d failures", received, failures)); this.consistency = consistency; this.received = received; this.failures = failures; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/exceptions/WriteFailureException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/WriteFailureException.java b/src/java/org/apache/cassandra/exceptions/WriteFailureException.java new file mode 100644 index 0000000..24de9b1 --- /dev/null +++ b/src/java/org/apache/cassandra/exceptions/WriteFailureException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.exceptions; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.WriteType; + +public class WriteFailureException extends RequestFailureException +{ + public final WriteType writeType; + + public WriteFailureException(ConsistencyLevel consistency, int received, int failures, int blockFor, WriteType writeType) + { + super(ExceptionCode.WRITE_FAILURE, consistency, received, failures, blockFor); + this.writeType = writeType; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java index d3b8571..461265c 100644 --- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java +++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java @@ -128,18 +128,22 @@ public abstract class AbstractReplicationStrategy */ public abstract List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata); - public AbstractWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistency_level, Runnable callback, WriteType writeType) + public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(Collection<InetAddress> naturalEndpoints, + Collection<InetAddress> pendingEndpoints, + ConsistencyLevel consistency_level, + Runnable callback, + WriteType writeType) { if (consistency_level.isDatacenterLocal()) { // block for in this context will be localnodes block. 
- return new DatacenterWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType); + return new DatacenterWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType); } else if (consistency_level == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy)) { - return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType); + return new DatacenterSyncWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType); } - return new WriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType); + return new WriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType); } private Keyspace getKeyspace() http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/net/CallbackInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/CallbackInfo.java b/src/java/org/apache/cassandra/net/CallbackInfo.java index b61210c..ea000ae 100644 --- a/src/java/org/apache/cassandra/net/CallbackInfo.java +++ b/src/java/org/apache/cassandra/net/CallbackInfo.java @@ -33,17 +33,13 @@ public class CallbackInfo protected final IVersionedSerializer<?> serializer; private final boolean failureCallback; - public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer) - { - this(target, callback, serializer, false); - } - /** * Create CallbackInfo without sent message * * @param target target to send message * @param callback * @param serializer serializer to deserialize response message + * @param failureCallback True when we have a callback to handle failures */ public CallbackInfo(InetAddress target, 
IAsyncCallback callback, IVersionedSerializer<?> serializer, boolean failureCallback) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/net/IVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IVerbHandler.java b/src/java/org/apache/cassandra/net/IVerbHandler.java index 7f835c0..574f30f 100644 --- a/src/java/org/apache/cassandra/net/IVerbHandler.java +++ b/src/java/org/apache/cassandra/net/IVerbHandler.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.net; +import java.io.IOException; + /** * IVerbHandler provides the method that all verb handlers need to implement. * The concrete implementation of this interface would provide the functionality @@ -34,5 +36,5 @@ public interface IVerbHandler<T> * @param message - incoming message that needs handling. * @param id */ - public void doVerb(MessageIn<T> message, int id); + public void doVerb(MessageIn<T> message, int id) throws IOException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/net/MessageDeliveryTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java index da12d7a..f160464 100644 --- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java +++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java @@ -17,12 +17,13 @@ */ package org.apache.cassandra.net; +import java.io.IOException; import java.util.EnumSet; -import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; import org.apache.cassandra.gms.Gossiper; public class MessageDeliveryTask implements Runnable @@ -62,24 +63,36 @@ public class MessageDeliveryTask implements Runnable 
{ verbHandler.doVerb(message, id); } + catch (IOException ioe) + { + handleFailure(ioe); + throw new RuntimeException(ioe); + } + catch (TombstoneOverwhelmingException toe) + { + handleFailure(toe); + logger.error(toe.getMessage()); + } catch (Throwable t) { - if (message.doCallbackOnFailure()) - { - MessageOut response = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE) - .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE); - MessagingService.instance().sendReply(response, id, message.from); - } - - if (t instanceof TombstoneOverwhelmingException) - logger.error(t.getMessage()); - else - throw t; + handleFailure(t); + throw t; } + if (GOSSIP_VERBS.contains(message.verb)) Gossiper.instance.setLastProcessedMessageAt(constructionTime); } + private void handleFailure(Throwable t) + { + if (message.doCallbackOnFailure()) + { + MessageOut response = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE) + .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE); + MessagingService.instance().sendReply(response, id, message.from); + } + } + EnumSet<MessagingService.Verb> GOSSIP_VERBS = EnumSet.of(MessagingService.Verb.GOSSIP_DIGEST_ACK, MessagingService.Verb.GOSSIP_DIGEST_ACK2, MessagingService.Verb.GOSSIP_DIGEST_SYN); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index c333b04..a5cbfa7 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -639,11 +639,11 @@ public final class MessagingService implements MessagingServiceMBean */ public int sendRR(MessageOut<? 
extends IMutation> message, InetAddress to, - AbstractWriteResponseHandler handler, + AbstractWriteResponseHandler<? extends IMutation> handler, boolean allowHints) { int id = addCallback(handler, message, to, message.getTimeout(), handler.consistencyLevel, allowHints); - sendOneWay(message, id, to); + sendOneWay(message.withParameter(FAILURE_CALLBACK_PARAM, ONE_BYTE), id, to); return id; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/net/WriteCallbackInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java index 987ec15..9322631 100644 --- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java +++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java @@ -37,7 +37,7 @@ public class WriteCallbackInfo extends CallbackInfo ConsistencyLevel consistencyLevel, boolean allowHints) { - super(target, callback, serializer); + super(target, callback, serializer, true); assert message != null; this.sentMessage = message; this.consistencyLevel = consistencyLevel; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java index 72e5b9c..8978034 100644 --- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java @@ -20,8 +20,11 @@ package org.apache.cassandra.service; import java.net.InetAddress; import java.util.Collection; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import com.google.common.collect.Iterables; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ConsistencyLevel; @@ -29,11 +32,14 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.WriteType; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.net.IAsyncCallback; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.utils.concurrent.SimpleCondition; -public abstract class AbstractWriteResponseHandler implements IAsyncCallback +public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackWithFailure<T> { + protected static final Logger logger = LoggerFactory.getLogger( AbstractWriteResponseHandler.class ); + private final SimpleCondition condition = new SimpleCondition(); protected final Keyspace keyspace; protected final long start; @@ -42,6 +48,9 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback protected final Runnable callback; protected final Collection<InetAddress> pendingEndpoints; private final WriteType writeType; + private static final AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater + = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures"); + private volatile int failures = 0; /** * @param callback A callback to be called when the write is successful. @@ -62,7 +71,7 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback this.writeType = writeType; } - public void get() throws WriteTimeoutException + public void get() throws WriteTimeoutException, WriteFailureException { long requestTimeout = writeType == WriteType.COUNTER ? 
DatabaseDescriptor.getCounterWriteRpcTimeout() @@ -82,8 +91,8 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback if (!success) { - int acks = ackCount(); int blockedFor = totalBlockFor(); + int acks = ackCount(); // It's pretty unlikely, but we can race between exiting await above and here, so // that we could now have enough acks. In that case, we "lie" on the acks count to // avoid sending confusing info to the user (see CASSANDRA-6491). @@ -91,8 +100,16 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback acks = blockedFor - 1; throw new WriteTimeoutException(writeType, consistencyLevel, acks, blockedFor); } + + if (totalBlockFor() + failures > totalEndpoints()) + { + throw new WriteFailureException(consistencyLevel, ackCount(), failures, totalBlockFor(), writeType); + } } + /** + * @return the minimum number of endpoints that must reply. + */ protected int totalBlockFor() { // During bootstrap, we have to include the pending endpoints or we may fail the consistency level @@ -100,10 +117,29 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback return consistencyLevel.blockFor(keyspace) + pendingEndpoints.size(); } + /** + * @return the total number of endpoints the request has been sent to. 
+ */ + protected int totalEndpoints() + { + return naturalEndpoints.size() + pendingEndpoints.size(); + } + + /** + * @return true if the message counts towards the totalBlockFor() threshold + */ + protected boolean waitingFor(InetAddress from) + { + return true; + } + + /** + * @return number of responses received + */ protected abstract int ackCount(); /** null message means "response from local write" */ - public abstract void response(MessageIn msg); + public abstract void response(MessageIn<T> msg); public void assureSufficientLiveNodes() throws UnavailableException { @@ -116,4 +152,17 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback if (callback != null) callback.run(); } + + @Override + public void onFailure(InetAddress from) + { + logger.trace("Got failure from {}", from); + + int n = waitingFor(from) + ? failuresUpdater.incrementAndGet(this) + : failures; + + if (totalBlockFor() + n > totalEndpoints()) + signal(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java index 511a122..b095c7f 100644 --- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java @@ -34,7 +34,7 @@ import org.apache.cassandra.db.WriteType; /** * This class blocks for a quorum of responses _in all datacenters_ (CL.EACH_QUORUM). 
*/ -public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHandler +public class DatacenterSyncWriteResponseHandler<T> extends AbstractWriteResponseHandler<T> { private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); @@ -68,7 +68,7 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan } } - public void response(MessageIn message) + public void response(MessageIn<T> message) { String dataCenter = message == null ? DatabaseDescriptor.getLocalDataCenter() http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java index fb8f992..b1b7b10 100644 --- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java @@ -28,7 +28,7 @@ import org.apache.cassandra.db.WriteType; /** * This class blocks for a quorum of responses _in the local datacenter only_ (CL.LOCAL_QUORUM). 
*/ -public class DatacenterWriteResponseHandler extends WriteResponseHandler +public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T> { public DatacenterWriteResponseHandler(Collection<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints, @@ -42,9 +42,9 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler } @Override - public void response(MessageIn message) + public void response(MessageIn<T> message) { - if (message == null || consistencyLevel.isLocal(message.from)) + if (message == null || waitingFor(message.from)) super.response(message); } @@ -55,4 +55,10 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler // or we may fail the consistency level guarantees (see #833, #8058) return consistencyLevel.blockFor(keyspace) + consistencyLevel.countLocalEndpoints(pendingEndpoints); } + + @Override + protected boolean waitingFor(InetAddress from) + { + return consistencyLevel.isLocal(from); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 80b22f4..d667b1e 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -31,7 +31,6 @@ import com.google.common.base.Predicate; import com.google.common.cache.CacheLoader; import com.google.common.collect.*; import com.google.common.util.concurrent.Uninterruptibles; -import org.apache.cassandra.metrics.*; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +57,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.LocalStrategy; import 
org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.metrics.*; import org.apache.cassandra.net.*; import org.apache.cassandra.service.paxos.*; import org.apache.cassandra.sink.SinkManager; @@ -113,7 +113,7 @@ public class StorageProxy implements StorageProxyMBean { public void apply(IMutation mutation, Iterable<InetAddress> targets, - AbstractWriteResponseHandler responseHandler, + AbstractWriteResponseHandler<IMutation> responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws OverloadedException @@ -133,7 +133,7 @@ public class StorageProxy implements StorageProxyMBean { public void apply(IMutation mutation, Iterable<InetAddress> targets, - AbstractWriteResponseHandler responseHandler, + AbstractWriteResponseHandler<IMutation> responseHandler, String localDataCenter, ConsistencyLevel consistencyLevel) { @@ -145,7 +145,7 @@ public class StorageProxy implements StorageProxyMBean { public void apply(IMutation mutation, Iterable<InetAddress> targets, - AbstractWriteResponseHandler responseHandler, + AbstractWriteResponseHandler<IMutation> responseHandler, String localDataCenter, ConsistencyLevel consistencyLevel) { @@ -203,7 +203,7 @@ public class StorageProxy implements StorageProxyMBean ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForCommit, ClientState state) - throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, WriteTimeoutException, InvalidRequestException + throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException { final long start = System.nanoTime(); int contentions = 0; @@ -274,6 +274,11 @@ public class StorageProxy implements StorageProxyMBean casWriteMetrics.timeouts.mark(); throw e; } + catch (WriteFailureException|ReadFailureException e) + { + casWriteMetrics.failures.mark(); + throw e; + } catch(UnavailableException e) { casWriteMetrics.unavailables.mark(); @@ -346,7 
+351,7 @@ public class StorageProxy implements StorageProxyMBean ConsistencyLevel consistencyForCommit, final boolean isWrite, ClientState state) - throws WriteTimeoutException + throws WriteTimeoutException, WriteFailureException { long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout()); @@ -469,7 +474,7 @@ public class StorageProxy implements StorageProxyMBean return false; } - private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel) throws WriteTimeoutException + private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel) throws WriteTimeoutException, WriteFailureException { boolean shouldBlock = consistencyLevel != ConsistencyLevel.ANY; Keyspace keyspace = Keyspace.open(proposal.update.metadata().ksName); @@ -478,7 +483,7 @@ public class StorageProxy implements StorageProxyMBean List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspace.getName(), tk); Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspace.getName()); - AbstractWriteResponseHandler responseHandler = null; + AbstractWriteResponseHandler<Commit> responseHandler = null; if (shouldBlock) { AbstractReplicationStrategy rs = keyspace.getReplicationStrategy(); @@ -491,7 +496,7 @@ public class StorageProxy implements StorageProxyMBean if (FailureDetector.instance.isAlive(destination)) { if (shouldBlock) - MessagingService.instance().sendRR(message, destination, responseHandler); + MessagingService.instance().sendRRWithFailure(message, destination, responseHandler); else MessagingService.instance().sendOneWay(message, destination); } @@ -511,13 +516,13 @@ public class StorageProxy implements StorageProxyMBean * @param consistency_level the consistency level for the operation */ public static void mutate(Collection<? 
extends IMutation> mutations, ConsistencyLevel consistency_level) - throws UnavailableException, OverloadedException, WriteTimeoutException + throws UnavailableException, OverloadedException, WriteTimeoutException, WriteFailureException { Tracing.trace("Determining replicas for mutation"); final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); long startTime = System.nanoTime(); - List<AbstractWriteResponseHandler> responseHandlers = new ArrayList<>(mutations.size()); + List<AbstractWriteResponseHandler<IMutation>> responseHandlers = new ArrayList<>(mutations.size()); try { @@ -535,40 +540,32 @@ public class StorageProxy implements StorageProxyMBean } // wait for writes. throws TimeoutException if necessary - for (AbstractWriteResponseHandler responseHandler : responseHandlers) + for (AbstractWriteResponseHandler<IMutation> responseHandler : responseHandlers) { responseHandler.get(); } } - catch (WriteTimeoutException ex) + catch (WriteTimeoutException|WriteFailureException ex) { if (consistency_level == ConsistencyLevel.ANY) { - // hint all the mutations (except counters, which can't be safely retried). This means - // we'll re-hint any successful ones; doesn't seem worth it to track individual success - // just for this unusual case. 
- for (IMutation mutation : mutations) - { - if (mutation instanceof CounterMutation) - continue; - - Token tk = StorageService.getPartitioner().getToken(mutation.key()); - List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getKeyspaceName(), tk); - Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getKeyspaceName()); - for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints)) - { - // local writes can timeout, but cannot be dropped (see LocalMutationRunnable and - // CASSANDRA-6510), so there is no need to hint or retry - if (!target.equals(FBUtilities.getBroadcastAddress()) && shouldHint(target)) - submitHint((Mutation) mutation, target, null); - } - } - Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write"); + hintMutations(mutations); } else { - writeMetrics.timeouts.mark(); - Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor); + if (ex instanceof WriteFailureException) + { + writeMetrics.failures.mark(); + WriteFailureException fe = (WriteFailureException)ex; + Tracing.trace("Write failure; received {} of {} required replies, failed {} requests", + new Object[] {fe.received, fe.blockFor, fe.failures}); + } + else + { + writeMetrics.timeouts.mark(); + WriteTimeoutException te = (WriteTimeoutException)ex; + Tracing.trace("Write timeout; received {} of {} required replies", te.received, te.blockFor); + } throw ex; } } @@ -590,11 +587,39 @@ public class StorageProxy implements StorageProxyMBean } } + /** hint all the mutations (except counters, which can't be safely retried). This means + * we'll re-hint any successful ones; doesn't seem worth it to track individual success + * just for this unusual case. + + * @param mutations the mutations that require hints + */ + private static void hintMutations(Collection<? 
extends IMutation> mutations) + { + for (IMutation mutation : mutations) + { + if (mutation instanceof CounterMutation) + continue; + + Token tk = StorageService.getPartitioner().getToken(mutation.key()); + List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getKeyspaceName(), tk); + Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getKeyspaceName()); + for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints)) + { + // local writes can timeout, but cannot be dropped (see LocalMutationRunnable and + // CASSANDRA-6510), so there is no need to hint or retry + if (!target.equals(FBUtilities.getBroadcastAddress()) && shouldHint(target)) + submitHint((Mutation) mutation, target, null); + } + } + + Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write"); + } + @SuppressWarnings("unchecked") public static void mutateWithTriggers(Collection<? 
extends IMutation> mutations, ConsistencyLevel consistencyLevel, boolean mutateAtomically) - throws WriteTimeoutException, UnavailableException, OverloadedException, InvalidRequestException + throws WriteTimeoutException, WriteFailureException, UnavailableException, OverloadedException, InvalidRequestException { Collection<Mutation> augmented = TriggerExecutor.instance.execute(mutations); @@ -658,6 +683,12 @@ public class StorageProxy implements StorageProxyMBean Tracing.trace("Write timeout; received {} of {} required replies", e.received, e.blockFor); throw e; } + catch (WriteFailureException e) + { + writeMetrics.failures.mark(); + Tracing.trace("Write failure; received {} of {} required replies", e.received, e.blockFor); + throw e; + } finally { writeMetrics.addNano(System.nanoTime() - startTime); @@ -665,9 +696,9 @@ public class StorageProxy implements StorageProxyMBean } private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddress> endpoints, UUID uuid) - throws WriteTimeoutException + throws WriteTimeoutException, WriteFailureException { - AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints, + AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints, Collections.<InetAddress>emptyList(), ConsistencyLevel.ONE, Keyspace.open(SystemKeyspace.NAME), @@ -702,7 +733,7 @@ public class StorageProxy implements StorageProxyMBean private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid) { - AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints, + AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints, Collections.<InetAddress>emptyList(), ConsistencyLevel.ANY, Keyspace.open(SystemKeyspace.NAME), @@ -747,7 +778,7 @@ public class StorageProxy implements StorageProxyMBean * @param callback an optional callback to be run if and when the write is * successful. 
*/ - public static AbstractWriteResponseHandler performWrite(IMutation mutation, + public static AbstractWriteResponseHandler<IMutation> performWrite(IMutation mutation, ConsistencyLevel consistency_level, String localDataCenter, WritePerformer performer, @@ -762,7 +793,7 @@ public class StorageProxy implements StorageProxyMBean List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); - AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType); + AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType); // exit early if we can't fulfill the CL at this time responseHandler.assureSufficientLiveNodes(); @@ -779,17 +810,17 @@ public class StorageProxy implements StorageProxyMBean Token tk = StorageService.getPartitioner().getToken(mutation.key()); List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); - AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType); + AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType); return new WriteResponseHandlerWrapper(responseHandler, mutation); } // used by atomic_batch_mutate to decouple availability check from the write itself, caches consistency level and endpoints. 
private static class WriteResponseHandlerWrapper { - final AbstractWriteResponseHandler handler; + final AbstractWriteResponseHandler<IMutation> handler; final Mutation mutation; - WriteResponseHandlerWrapper(AbstractWriteResponseHandler handler, Mutation mutation) + WriteResponseHandlerWrapper(AbstractWriteResponseHandler<IMutation> handler, Mutation mutation) { this.handler = handler; this.mutation = mutation; @@ -841,7 +872,7 @@ public class StorageProxy implements StorageProxyMBean */ public static void sendToHintedEndpoints(final Mutation mutation, Iterable<InetAddress> targets, - AbstractWriteResponseHandler responseHandler, + AbstractWriteResponseHandler<IMutation> responseHandler, String localDataCenter) throws OverloadedException { @@ -933,7 +964,7 @@ public class StorageProxy implements StorageProxyMBean public static Future<Void> submitHint(final Mutation mutation, final InetAddress target, - final AbstractWriteResponseHandler responseHandler) + final AbstractWriteResponseHandler<IMutation> responseHandler) { // local write that time out should be handled by LocalMutationRunnable assert !target.equals(FBUtilities.getBroadcastAddress()) : target; @@ -979,7 +1010,9 @@ public class StorageProxy implements StorageProxyMBean StorageMetrics.totalHints.inc(); } - private static void sendMessagesToNonlocalDC(MessageOut<? extends IMutation> message, Collection<InetAddress> targets, AbstractWriteResponseHandler handler) + private static void sendMessagesToNonlocalDC(MessageOut<? 
extends IMutation> message, + Collection<InetAddress> targets, + AbstractWriteResponseHandler<IMutation> handler) { Iterator<InetAddress> iter = targets.iterator(); InetAddress target = iter.next(); @@ -1014,7 +1047,7 @@ public class StorageProxy implements StorageProxyMBean } } - private static void insertLocal(final Mutation mutation, final AbstractWriteResponseHandler responseHandler) + private static void insertLocal(final Mutation mutation, final AbstractWriteResponseHandler<IMutation> responseHandler) { StageManager.getStage(Stage.MUTATION).maybeExecuteImmediately(new LocalMutationRunnable() @@ -1024,8 +1057,16 @@ public class StorageProxy implements StorageProxyMBean IMutation processed = SinkManager.processWriteRequest(mutation); if (processed != null) { - ((Mutation) processed).apply(); - responseHandler.response(null); + try + { + ((Mutation) processed).apply(); + responseHandler.response(null); + } + catch (Exception ex) + { + logger.error("Failed to apply mutation locally : {}", ex.getMessage()); + responseHandler.onFailure(FBUtilities.getBroadcastAddress()); + } } } }); @@ -1045,7 +1086,7 @@ public class StorageProxy implements StorageProxyMBean * quicker response and because the WriteResponseHandlers don't make it easy to send back an error. We also always gather * the write latencies at the coordinator node to make gathering point similar to the case of standard writes. 
*/ - public static AbstractWriteResponseHandler mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, OverloadedException + public static AbstractWriteResponseHandler<IMutation> mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, OverloadedException { InetAddress endpoint = findSuitableEndpoint(cm.getKeyspaceName(), cm.key(), localDataCenter, cm.consistency()); @@ -1065,7 +1106,7 @@ public class StorageProxy implements StorageProxyMBean rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, cm.consistency(), null, WriteType.COUNTER).assureSufficientLiveNodes(); // Forward the actual update to the chosen leader replica - AbstractWriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.COUNTER); + AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(endpoint, WriteType.COUNTER); Tracing.trace("Enqueuing counter update to {}", endpoint); MessagingService.instance().sendRR(cm.makeMutationMessage(), endpoint, responseHandler, false); @@ -1112,7 +1153,7 @@ public class StorageProxy implements StorageProxyMBean // Must be called on a replica of the mutation. This replica becomes the // leader of this mutation. 
- public static AbstractWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback) + public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback) throws UnavailableException, OverloadedException { return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback, WriteType.COUNTER); @@ -1120,7 +1161,7 @@ public class StorageProxy implements StorageProxyMBean // Same as applyCounterMutationOnLeader but must with the difference that it use the MUTATION stage to execute the write (while // applyCounterMutationOnLeader assumes it is on the MUTATION stage already) - public static AbstractWriteResponseHandler applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter) + public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter) throws UnavailableException, OverloadedException { return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null, WriteType.COUNTER); @@ -1128,7 +1169,7 @@ public class StorageProxy implements StorageProxyMBean private static Runnable counterWriteTask(final IMutation mutation, final Iterable<InetAddress> targets, - final AbstractWriteResponseHandler responseHandler, + final AbstractWriteResponseHandler<IMutation> responseHandler, final String localDataCenter) { return new DroppableRunnable(MessagingService.Verb.COUNTER_MUTATION) @@ -1222,6 +1263,10 @@ public class StorageProxy implements StorageProxyMBean { throw new ReadTimeoutException(consistencyLevel, 0, consistencyLevel.blockFor(Keyspace.open(command.ksName)), false); } + catch (WriteFailureException e) + { + throw new ReadFailureException(consistencyLevel, e.received, e.failures, e.blockFor, false); + } rows = fetchRows(commands, consistencyForCommitOrFetch); } @@ -2094,7 +2139,7 @@ public class 
StorageProxy implements StorageProxyMBean { public void apply(IMutation mutation, Iterable<InetAddress> targets, - AbstractWriteResponseHandler responseHandler, + AbstractWriteResponseHandler<IMutation> responseHandler, String localDataCenter, ConsistencyLevel consistencyLevel) throws OverloadedException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/service/WriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java index df23b19..1dc03e0 100644 --- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java @@ -34,7 +34,7 @@ import org.apache.cassandra.db.WriteType; /** * Handles blocking writes for ONE, ANY, TWO, THREE, QUORUM, and ALL consistency levels. */ -public class WriteResponseHandler extends AbstractWriteResponseHandler +public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T> { protected static final Logger logger = LoggerFactory.getLogger(WriteResponseHandler.class); @@ -63,7 +63,7 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler this(endpoint, writeType, null); } - public void response(MessageIn m) + public void response(MessageIn<T> m) { if (responsesUpdater.decrementAndGet(this) == 0) signal(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java index 1e9564c..d6d901b 100644 --- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java +++ 
b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java @@ -76,14 +76,23 @@ public class ErrorMessage extends Message.Response case TRUNCATE_ERROR: te = new TruncateException(msg); break; + case WRITE_FAILURE: case READ_FAILURE: { ConsistencyLevel cl = CBUtil.readConsistencyLevel(body); int received = body.readInt(); int blockFor = body.readInt(); int failure = body.readInt(); - byte dataPresent = body.readByte(); - te = new ReadFailureException(cl, received, failure, blockFor, dataPresent != 0); + if (code == ExceptionCode.WRITE_FAILURE) + { + WriteType writeType = Enum.valueOf(WriteType.class, CBUtil.readString(body)); + te = new WriteFailureException(cl, received, failure, blockFor, writeType); + } + else + { + byte dataPresent = body.readByte(); + te = new ReadFailureException(cl, received, failure, blockFor, dataPresent != 0); + } } break; case WRITE_TIMEOUT: @@ -152,15 +161,21 @@ public class ErrorMessage extends Message.Response dest.writeInt(ue.required); dest.writeInt(ue.alive); break; + case WRITE_FAILURE: case READ_FAILURE: { RequestFailureException rfe = (RequestFailureException)err; + boolean isWrite = err.code() == ExceptionCode.WRITE_FAILURE; CBUtil.writeConsistencyLevel(rfe.consistency, dest); dest.writeInt(rfe.received); dest.writeInt(rfe.blockFor); dest.writeInt(rfe.failures); - dest.writeByte((byte)(((ReadFailureException)rfe).dataPresent ? 1 : 0)); + + if (isWrite) + CBUtil.writeString(((WriteFailureException)rfe).writeType.toString(), dest); + else + dest.writeByte((byte)(((ReadFailureException)rfe).dataPresent ? 
1 : 0)); } break; case WRITE_TIMEOUT: @@ -204,10 +219,13 @@ public class ErrorMessage extends Message.Response UnavailableException ue = (UnavailableException)err; size += CBUtil.sizeOfConsistencyLevel(ue.consistency) + 8; break; + case WRITE_FAILURE: case READ_FAILURE: { - ReadFailureException rfe = (ReadFailureException)err; - size += CBUtil.sizeOfConsistencyLevel(rfe.consistency) + 4 + 4 + 4 + 1; + RequestFailureException rfe = (RequestFailureException)err; + boolean isWrite = err.code() == ExceptionCode.WRITE_FAILURE; + size += CBUtil.sizeOfConsistencyLevel(rfe.consistency) + 4 + 4 + 4; + size += isWrite ? CBUtil.sizeOfString(((WriteFailureException)rfe).writeType.toString()) : 1; } break; case WRITE_TIMEOUT: @@ -246,6 +264,9 @@ public class ErrorMessage extends Message.Response case READ_FAILURE: ReadFailureException rfe = (ReadFailureException) msg.error; return new ReadTimeoutException(rfe.consistency, rfe.received, rfe.blockFor, rfe.dataPresent); + case WRITE_FAILURE: + WriteFailureException wfe = (WriteFailureException) msg.error; + return new WriteTimeoutException(wfe.writeType, wfe.consistency, wfe.received, wfe.blockFor); case FUNCTION_FAILURE: return new InvalidRequestException(msg.toString()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/test/unit/org/apache/cassandra/db/SerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/SerializationsTest.java b/test/unit/org/apache/cassandra/db/SerializationsTest.java index a720608..f8e757a 100644 --- a/test/unit/org/apache/cassandra/db/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/db/SerializationsTest.java @@ -311,8 +311,8 @@ public class SerializationsTest extends AbstractSerializationsTester assert MessageIn.read(in, getVersion(), -1) != null; // set up some fake callbacks so deserialization knows that what it's deserializing is a TruncateResponse - 
MessagingService.instance().setCallbackForTests(1, new CallbackInfo(null, null, TruncateResponse.serializer)); - MessagingService.instance().setCallbackForTests(2, new CallbackInfo(null, null, TruncateResponse.serializer)); + MessagingService.instance().setCallbackForTests(1, new CallbackInfo(null, null, TruncateResponse.serializer, false)); + MessagingService.instance().setCallbackForTests(2, new CallbackInfo(null, null, TruncateResponse.serializer, false)); assert MessageIn.read(in, getVersion(), 1) != null; assert MessageIn.read(in, getVersion(), 2) != null;
