Repository: cassandra Updated Branches: refs/heads/trunk 1657b4fbf -> c6525da86
Add ReadFailureException, better TombstoneOE logging Patch by Christian Spriegel; reviewed by Tyler Hobbs for CASSANDRA-7886 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c6525da8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c6525da8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c6525da8 Branch: refs/heads/trunk Commit: c6525da86eb1ac668206553336056f90e7bfcdaa Parents: 1657b4f Author: Christian Spriegel <[email protected]> Authored: Fri Jan 9 13:30:22 2015 -0600 Committer: Tyler Hobbs <[email protected]> Committed: Fri Jan 9 13:30:22 2015 -0600 ---------------------------------------------------------------------- CHANGES.txt | 3 + doc/native_protocol_v4.spec | 17 +++- .../apache/cassandra/db/ReadVerbHandler.java | 17 +--- .../apache/cassandra/db/RowIteratorFactory.java | 21 +++-- .../cassandra/db/filter/ExtendedFilter.java | 10 +- .../cassandra/db/filter/SliceQueryFilter.java | 46 +++++---- .../filter/TombstoneOverwhelmingException.java | 42 +++++++++ .../cassandra/exceptions/ExceptionCode.java | 1 + .../exceptions/ReadFailureException.java | 31 +++++++ .../exceptions/RequestFailureException.java | 37 ++++++++ .../cassandra/metrics/ClientRequestMetrics.java | 4 + .../cassandra/net/MessageDeliveryTask.java | 6 +- .../cassandra/service/AbstractReadExecutor.java | 9 +- .../service/RangeSliceVerbHandler.java | 24 ++--- .../apache/cassandra/service/ReadCallback.java | 37 +++++++- .../apache/cassandra/service/StorageProxy.java | 98 ++++++++++++++------ .../cassandra/thrift/CassandraServer.java | 24 ++--- .../cassandra/thrift/ThriftConversion.java | 10 +- .../org/apache/cassandra/transport/Server.java | 1 + .../transport/messages/ErrorMessage.java | 75 +++++++++++---- 20 files changed, 374 insertions(+), 139 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0c91632..fc9ec7f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,7 @@ 3.0 + * Add ReadFailureException to native protocol, respond + immediately when replicas encounter errors while handling + a read request (CASSANDRA-7886) * Switch CommitLogSegment from RandomAccessFile to nio (CASSANDRA-8308) * Allow mixing token and partition key restrictions (CASSANDRA-7016) * Support index key/value entries on map collections (CASSANDRA-8473) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/doc/native_protocol_v4.spec ---------------------------------------------------------------------- diff --git a/doc/native_protocol_v4.spec b/doc/native_protocol_v4.spec index 3764e91..0806944 100644 --- a/doc/native_protocol_v4.spec +++ b/doc/native_protocol_v4.spec @@ -880,7 +880,21 @@ Table of Contents <data_present> is a single byte. If its value is 0, it means the replica that was asked for data has not responded. Otherwise, the value is != 0. - + 0x1300 Read_failure: A non-timeout exception during a read request. The rest + of the ERROR message body will be + <cl><received><blockfor><numfailures><data_present> + where: + <cl> is the [consistency] level of the query having triggered + the exception. + <received> is an [int] representing the number of nodes having + answered the request. + <blockfor> is the number of replicas whose response is + required to achieve <cl>. + <numfailures> is an [int] representing the number of nodes that + experience a failure while executing the request. + <data_present> is a single byte. If its value is 0, it means + the replica that was asked for data had not + responded. Otherwise, the value is != 0. 0x2000 Syntax_error: The submitted query has a syntax error. 0x2100 Unauthorized: The logged user doesn't have the right to perform the query. @@ -905,4 +919,5 @@ Table of Contents * The format of "SCHEMA_CHANGE" events (Section 4.2.6) (and implicitly "Schema_change" results (Section 4.2.5.5)) has been modified, and now includes changes related to user defined functions and user defined aggregates. + * Read_failure error code was added. http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/db/ReadVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java index 35082e6..8c167ed 100644 --- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java +++ b/src/java/org/apache/cassandra/db/ReadVerbHandler.java @@ -17,10 +17,6 @@ */ package org.apache.cassandra.db; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; @@ -30,8 +26,6 @@ import org.apache.cassandra.tracing.Tracing; public class ReadVerbHandler implements IVerbHandler<ReadCommand> { - private static final Logger logger = LoggerFactory.getLogger( ReadVerbHandler.class ); - public void doVerb(MessageIn<ReadCommand> message, int id) { if (StorageService.instance.isBootstrapMode()) @@ -41,16 +35,7 @@ public class ReadVerbHandler implements IVerbHandler<ReadCommand> ReadCommand command = message.payload; Keyspace keyspace = Keyspace.open(command.ksName); - Row row; - try - { - row = command.getRow(keyspace); - } - catch (TombstoneOverwhelmingException e) - { - // error already logged. Drop the request - return; - } + Row row = command.getRow(keyspace); MessageOut<ReadResponse> reply = new MessageOut<ReadResponse>(MessagingService.Verb.REQUEST_RESPONSE, getResponse(command, row), http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/db/RowIteratorFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java index 6ac74ae..ef514ea 100644 --- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java +++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java @@ -26,6 +26,7 @@ import org.apache.cassandra.db.columniterator.LazyColumnIterator; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.IDiskAtomFilter; +import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.MergeIterator; @@ -93,15 +94,23 @@ public class RowIteratorFactory ColumnFamily cached = cfs.getRawCachedRow(key); IDiskAtomFilter filter = range.columnFilter(key.getKey()); - if (cached == null || !cfs.isFilterFullyCoveredBy(filter, cached, now)) + try { - // not cached: collate - QueryFilter.collateOnDiskAtom(returnCF, colIters, filter, gcBefore, now); + if (cached == null || !cfs.isFilterFullyCoveredBy(filter, cached, now)) + { + // not cached: collate + QueryFilter.collateOnDiskAtom(returnCF, colIters, filter, gcBefore, now); + } + else + { + QueryFilter keyFilter = new QueryFilter(key, cfs.name, filter, now); + returnCF = cfs.filterColumnFamily(cached, keyFilter); + } } - else + catch(TombstoneOverwhelmingException e) { - QueryFilter keyFilter = new QueryFilter(key, cfs.name, filter, now); - returnCF = cfs.filterColumnFamily(cached, keyFilter); + e.setKey(key); + throw e; } Row rv = new Row(key, returnCF); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java index da9a1d7..fc2ff93 100644 --- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java @@ -304,7 +304,15 @@ public abstract class ExtendedFilter ColumnFamily pruned = data.cloneMeShallow(); IDiskAtomFilter filter = dataRange.columnFilter(rowKey.getKey()); Iterator<Cell> iter = filter.getColumnIterator(data); - filter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore(timestamp), timestamp); + try + { + filter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore(timestamp), timestamp); + } + catch (TombstoneOverwhelmingException e) + { + e.setKey(rowKey); + throw e; + } return pruned; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java index 6e8fde6..453191e 100644 --- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java +++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java @@ -38,6 +38,7 @@ import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tracing.Tracing; public class SliceQueryFilter implements IDiskAtomFilter @@ -213,34 +214,41 @@ public class SliceQueryFilter implements IDiskAtomFilter if (respectTombstoneThresholds() && columnCounter.ignored() > DatabaseDescriptor.getTombstoneFailureThreshold()) { - Tracing.trace("Scanned over {} tombstones; query aborted (see tombstone_failure_threshold)", DatabaseDescriptor.getTombstoneFailureThreshold()); - logger.error("Scanned over {} tombstones in {}.{}; query aborted (see tombstone_failure_threshold)", - DatabaseDescriptor.getTombstoneFailureThreshold(), container.metadata().ksName, container.metadata().cfName); - throw new TombstoneOverwhelmingException(); + Tracing.trace("Scanned over {} tombstones; query aborted (see tombstone_failure_threshold); slices={}", + DatabaseDescriptor.getTombstoneFailureThreshold(), getSlicesInfo(container)); + throw new TombstoneOverwhelmingException(columnCounter.ignored(), count, container.metadata().ksName, container.metadata().cfName, + container.getComparator().getString(cell.name()), getSlicesInfo(container), container.deletionInfo().toString()); } container.maybeAppendColumn(cell, tester, gcBefore); } - Tracing.trace("Read {} live and {} tombstoned cells", columnCounter.live(), columnCounter.ignored()); - if (respectTombstoneThresholds() && columnCounter.ignored() > DatabaseDescriptor.getTombstoneWarnThreshold()) + boolean warnTombstones = respectTombstoneThresholds() && columnCounter.ignored() > DatabaseDescriptor.getTombstoneWarnThreshold(); + if (warnTombstones) { - StringBuilder sb = new StringBuilder(); - CellNameType type = container.metadata().comparator; - for (ColumnSlice sl : slices) - { - assert sl != null; + logger.warn("Read {} live and {} tombstoned cells in {}.{} (see tombstone_warn_threshold). {} columns were requested, slices={}, delInfo={}", + columnCounter.live(), columnCounter.ignored(), container.metadata().ksName, container.metadata().cfName, count, + getSlicesInfo(container), container.deletionInfo()); + } + Tracing.trace("Read {} live and {} tombstoned cells{}", + new Object[]{ columnCounter.live(), columnCounter.ignored(), (warnTombstones ? " (see tombstone_warn_threshold)" : "") }); + } - sb.append('['); - sb.append(type.getString(sl.start)); - sb.append('-'); - sb.append(type.getString(sl.finish)); - sb.append(']'); - } + private String getSlicesInfo(ColumnFamily container) + { + StringBuilder sb = new StringBuilder(); + CellNameType type = container.metadata().comparator; + for (ColumnSlice sl : slices) + { + assert sl != null; - logger.warn("Read {} live and {} tombstoned cells in {}.{} (see tombstone_warn_threshold). {} columns was requested, slices={}, delInfo={}", - columnCounter.live(), columnCounter.ignored(), container.metadata().ksName, container.metadata().cfName, count, sb, container.deletionInfo()); + sb.append('['); + sb.append(type.getString(sl.start)); + sb.append('-'); + sb.append(type.getString(sl.finish)); + sb.append(']'); } + return sb.toString(); } protected boolean respectTombstoneThresholds() http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java b/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java index 6a6b0f6..04d440d 100644 --- a/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java +++ b/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java @@ -18,6 +18,48 @@ */ package org.apache.cassandra.db.filter; +import org.apache.cassandra.db.DecoratedKey; + + public class TombstoneOverwhelmingException extends RuntimeException { + private final int numTombstones; + private final int numRequested; + private final String ksName; + private final String cfName; + private final String lastCellName; + private final String slicesInfo; + private final String deletionInfo; + private String partitionKey = null; + + public TombstoneOverwhelmingException(int numTombstones, int numRequested, String ksName, String cfName, + String lastCellName, String slicesInfo, String deletionInfo) + { + this.numTombstones = numTombstones; + this.numRequested = numRequested; + this.ksName = ksName; + this.cfName = cfName; + this.lastCellName = lastCellName; + this.slicesInfo = slicesInfo; + this.deletionInfo = deletionInfo; + } + + public void setKey(DecoratedKey key) + { + if(key != null) + this.partitionKey = key.toString(); + } + + public String getLocalizedMessage() + { + return getMessage(); + } + + public String getMessage() + { + return String.format( + "Scanned over %d tombstones in %s.%s; %d columns were requested; query aborted " + + "(see tombstone_failure_threshold); partitionKey=%s; lastCell=%s; delInfo=%s; slices=%s", + numTombstones, ksName, cfName, numRequested, partitionKey, lastCellName, deletionInfo, slicesInfo); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/exceptions/ExceptionCode.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java index ce082a7..7fcb2d2 100644 --- a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java +++ b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java @@ -39,6 +39,7 @@ public enum ExceptionCode TRUNCATE_ERROR (0x1003), WRITE_TIMEOUT (0x1100), READ_TIMEOUT (0x1200), + READ_FAILURE (0x1300), // 2xx: problem validating the request SYNTAX_ERROR (0x2000), http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/exceptions/ReadFailureException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java b/src/java/org/apache/cassandra/exceptions/ReadFailureException.java new file mode 100644 index 0000000..91cf580 --- /dev/null +++ b/src/java/org/apache/cassandra/exceptions/ReadFailureException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.exceptions; + +import org.apache.cassandra.db.ConsistencyLevel; + +public class ReadFailureException extends RequestFailureException +{ + public final boolean dataPresent; + + public ReadFailureException(ConsistencyLevel consistency, int received, int failures, int blockFor, boolean dataPresent) + { + super(ExceptionCode.READ_FAILURE, consistency, received, failures, blockFor); + this.dataPresent = dataPresent; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/exceptions/RequestFailureException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureException.java b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java new file mode 100644 index 0000000..1ff44d9 --- /dev/null +++ b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.exceptions; + +import org.apache.cassandra.db.ConsistencyLevel; + +public class RequestFailureException extends RequestExecutionException +{ + public final ConsistencyLevel consistency; + public final int received; + public final int failures; + public final int blockFor; + + protected RequestFailureException(ExceptionCode code, ConsistencyLevel consistency, int received, int failures, int blockFor) + { + super(code, String.format("Operation failed - received %d responses and %d failures.", received, failures)); + this.consistency = consistency; + this.received = received; + this.failures = failures; + this.blockFor = blockFor; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java index 1ac3482..68a2d21 100644 --- a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java @@ -32,9 +32,11 @@ public class ClientRequestMetrics extends LatencyMetrics @Deprecated public static final Counter writeTimeouts = Metrics.newCounter(DefaultNameFactory.createMetricName("ClientRequestMetrics", "WriteTimeouts", null)); @Deprecated public static final Counter readUnavailables = Metrics.newCounter(DefaultNameFactory.createMetricName("ClientRequestMetrics", "ReadUnavailables", null)); @Deprecated public static final Counter writeUnavailables = Metrics.newCounter(DefaultNameFactory.createMetricName("ClientRequestMetrics", "WriteUnavailables", null)); + @Deprecated public static final Counter readFailures = Metrics.newCounter(DefaultNameFactory.createMetricName("ClientRequestMetrics", "ReadFailures", null)); public final Meter timeouts; public final Meter unavailables; + public final Meter failures; public ClientRequestMetrics(String scope) { @@ -42,6 +44,7 @@ public class ClientRequestMetrics extends LatencyMetrics timeouts = Metrics.newMeter(factory.createMetricName("Timeouts"), "timeouts", TimeUnit.SECONDS); unavailables = Metrics.newMeter(factory.createMetricName("Unavailables"), "unavailables", TimeUnit.SECONDS); + failures = Metrics.newMeter(factory.createMetricName("Failures"), "failures", TimeUnit.SECONDS); } public void release() @@ -49,5 +52,6 @@ public class ClientRequestMetrics extends LatencyMetrics super.release(); Metrics.defaultRegistry().removeMetric(factory.createMetricName("Timeouts")); Metrics.defaultRegistry().removeMetric(factory.createMetricName("Unavailables")); + Metrics.defaultRegistry().removeMetric(factory.createMetricName("Failures")); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/net/MessageDeliveryTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java index 982f17e..da12d7a 100644 --- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java +++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java @@ -19,6 +19,7 @@ package org.apache.cassandra.net; import java.util.EnumSet; +import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,7 +71,10 @@ public class MessageDeliveryTask implements Runnable MessagingService.instance().sendReply(response, id, message.from); } - throw t; + if (t instanceof TombstoneOverwhelmingException) + logger.error(t.getMessage()); + else + throw t; } if (GOSSIP_VERBS.contains(message.verb)) Gossiper.instance.setLastProcessedMessageAt(constructionTime); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/service/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java index 061a01b..d76a2cc 100644 --- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java @@ -37,6 +37,7 @@ import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.ReadResponse; import org.apache.cassandra.db.Row; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.exceptions.ReadFailureException; import org.apache.cassandra.exceptions.ReadTimeoutException; import org.apache.cassandra.exceptions.UnavailableException; import org.apache.cassandra.metrics.ReadRepairMetrics; @@ -87,7 +88,7 @@ public abstract class AbstractReadExecutor else { logger.trace("reading data from {}", endpoint); - MessagingService.instance().sendRR(command.createMessage(), endpoint, handler); + MessagingService.instance().sendRRWithFailure(command.createMessage(), endpoint, handler); } } if (readLocal) @@ -112,7 +113,7 @@ public abstract class AbstractReadExecutor else { logger.trace("reading digest from {}", endpoint); - MessagingService.instance().sendRR(message, endpoint, handler); + MessagingService.instance().sendRRWithFailure(message, endpoint, handler); } } } @@ -139,7 +140,7 @@ public abstract class AbstractReadExecutor * wait for an answer. Blocks until success or timeout, so it is caller's * responsibility to call maybeTryAdditionalReplicas first. */ - public Row get() throws ReadTimeoutException, DigestMismatchException + public Row get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException { return handler.get(); } @@ -280,7 +281,7 @@ public abstract class AbstractReadExecutor InetAddress extraReplica = Iterables.getLast(targetReplicas); logger.trace("speculating read retry on {}", extraReplica); - MessagingService.instance().sendRR(retryCommand.createMessage(), extraReplica, handler); + MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), extraReplica, handler); speculated = true; cfs.metric.speculativeRetries.inc(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java index f1fd1f9..0f3726c 100644 --- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java +++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java @@ -19,7 +19,6 @@ package org.apache.cassandra.service; import org.apache.cassandra.db.AbstractRangeCommand; import org.apache.cassandra.db.RangeSliceReply; -import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; @@ -29,24 +28,13 @@ public class RangeSliceVerbHandler implements IVerbHandler<AbstractRangeCommand> { public void doVerb(MessageIn<AbstractRangeCommand> message, int id) { - try + if (StorageService.instance.isBootstrapMode()) { - if (StorageService.instance.isBootstrapMode()) - { - /* Don't service reads! */ - throw new RuntimeException("Cannot service reads while bootstrapping!"); - } - RangeSliceReply reply = new RangeSliceReply(message.payload.executeLocally()); - Tracing.trace("Enqueuing response to {}", message.from); - MessagingService.instance().sendReply(reply.createMessage(), id, message.from); - } - catch (TombstoneOverwhelmingException e) - { - // error already logged. Drop the request - } - catch (Exception ex) - { - throw new RuntimeException(ex); + /* Don't service reads! */ + throw new RuntimeException("Cannot service reads while bootstrapping!"); } + RangeSliceReply reply = new RangeSliceReply(message.payload.executeLocally()); + Tracing.trace("Enqueuing response to {}", message.from); + MessagingService.instance().sendReply(reply.createMessage(), id, message.from); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index 51e1818..9371568 100644 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -32,10 +32,13 @@ import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.exceptions.ReadFailureException; import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.UnavailableException; import org.apache.cassandra.metrics.ReadRepairMetrics; import org.apache.cassandra.net.IAsyncCallback; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; @@ -43,7 +46,7 @@ import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.SimpleCondition; -public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessage> +public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFailure<TMessage> { protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class ); @@ -57,6 +60,10 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received"); private volatile int received = 0; + private static final AtomicIntegerFieldUpdater<ReadCallback> failuresUpdater + = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "failures"); + private volatile int failures = 0; + private final Keyspace keyspace; // TODO push this into ConsistencyLevel? /** @@ -95,7 +102,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag } } - public TResolved get() throws ReadTimeoutException, DigestMismatchException + public TResolved get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException { if (!await(command.getTimeout(), TimeUnit.MILLISECONDS)) { @@ -107,13 +114,22 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag throw ex; } + if (blockfor + failures > endpoints.size()) + { + ReadFailureException ex = new ReadFailureException(consistencyLevel, received, failures, blockfor, resolver.isDataPresent()); + + if (logger.isDebugEnabled()) + logger.debug("Read failure: {}", ex.toString()); + throw ex; + } + return blockfor == 1 ? resolver.getData() : resolver.resolve(); } public void response(MessageIn<TMessage> message) { resolver.preprocess(message); - int n = waitingFor(message) + int n = waitingFor(message.from) ? recievedUpdater.incrementAndGet(this) : received; if (n >= blockfor && resolver.isDataPresent()) @@ -129,10 +145,10 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag /** * @return true if the message counts towards the blockfor threshold */ - private boolean waitingFor(MessageIn message) + private boolean waitingFor(InetAddress from) { return consistencyLevel.isDatacenterLocal() - ? DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(message.from)) + ? DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from)) : true; } @@ -194,4 +210,15 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag } } } + + @Override + public void onFailure(InetAddress from) + { + int n = waitingFor(from) + ? failuresUpdater.incrementAndGet(this) + : failures; + + if (blockfor + n > endpoints.size()) + condition.signalAll(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 378b3f0..f00db76 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -42,7 +42,7 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexSearcher; import org.apache.cassandra.db.marshal.UUIDType; @@ -203,7 +203,7 @@ public class StorageProxy implements StorageProxyMBean ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForCommit, ClientState state) - throws UnavailableException, IsBootstrappingException, ReadTimeoutException, WriteTimeoutException, InvalidRequestException + throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, WriteTimeoutException, InvalidRequestException { final long start = System.nanoTime(); int contentions = 0; @@ -1164,7 +1164,7 @@ public class StorageProxy implements StorageProxyMBean } public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistencyLevel) - throws UnavailableException, IsBootstrappingException, ReadTimeoutException, InvalidRequestException + throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException { // When using serial CL, the ClientState should be provided assert !consistencyLevel.isSerialConsistency(); @@ -1176,7 +1176,7 @@ public class StorageProxy implements StorageProxyMBean * a specific set of column names from a given column family. */ public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState state) - throws UnavailableException, IsBootstrappingException, ReadTimeoutException, InvalidRequestException + throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException { if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(commands)) { @@ -1191,7 +1191,7 @@ public class StorageProxy implements StorageProxyMBean } private static List<Row> readWithPaxos(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState state) - throws InvalidRequestException, UnavailableException, ReadTimeoutException + throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException { assert state != null; @@ -1241,6 +1241,13 @@ public class StorageProxy implements StorageProxyMBean casReadMetrics.timeouts.mark(); throw e; } + catch (ReadFailureException e) + { + readMetrics.failures.mark(); + ClientRequestMetrics.readFailures.inc(); + casReadMetrics.failures.mark(); + throw e; + } finally { long latency = System.nanoTime() - start; @@ -1255,7 +1262,7 @@ public class StorageProxy implements StorageProxyMBean } private static List<Row> readRegular(List<ReadCommand> commands, ConsistencyLevel consistencyLevel) - throws UnavailableException, ReadTimeoutException + throws UnavailableException, ReadFailureException, ReadTimeoutException { long start = System.nanoTime(); List<Row> rows = null; @@ -1276,6 +1283,12 @@ public class StorageProxy implements StorageProxyMBean ClientRequestMetrics.readTimeouts.inc(); throw e; } + catch (ReadFailureException e) + { + readMetrics.failures.mark(); + ClientRequestMetrics.readFailures.inc(); + throw e; + } finally { long latency = System.nanoTime() - start; @@ -1300,7 +1313,7 @@ public class StorageProxy implements StorageProxyMBean * 5. else carry out read repair by getting data from all the nodes. */ private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistencyLevel) - throws UnavailableException, ReadTimeoutException + throws UnavailableException, ReadFailureException, ReadTimeoutException { List<Row> rows = new ArrayList<>(initialCommands.size()); // (avoid allocating a new list in the common case of nothing-to-retry) @@ -1345,7 +1358,7 @@ public class StorageProxy implements StorageProxyMBean if (logger.isDebugEnabled()) logger.debug("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - exec.handler.start)); } - catch (ReadTimeoutException ex) + catch (ReadTimeoutException|ReadFailureException ex) { int blockFor = consistencyLevel.blockFor(Keyspace.open(exec.command.getKeyspace())); int responseCount = exec.handler.getReceivedCount(); @@ -1353,14 +1366,15 @@ public class StorageProxy implements StorageProxyMBean ? exec.resolver.isDataPresent() ? " (including data)" : " (only digests)" : ""; + boolean isTimeout = ex instanceof ReadTimeoutException; if (Tracing.isTracing()) { - Tracing.trace("Timed out; received {} of {} responses{}", - new Object[]{ responseCount, blockFor, gotData }); + Tracing.trace("{}; received {} of {} responses{}", + new Object[]{(isTimeout ? "Timed out" : "Failed"), responseCount, blockFor, gotData }); } else if (logger.isDebugEnabled()) { - logger.debug("Read timeout; received {} of {} responses{}", responseCount, blockFor, gotData); + logger.debug("Read {}; received {} of {} responses{}", (isTimeout ? "timeout" : "failure"), responseCount, blockFor, gotData); } throw ex; } @@ -1391,7 +1405,7 @@ public class StorageProxy implements StorageProxyMBean for (InetAddress endpoint : exec.getContactedReplicas()) { Tracing.trace("Enqueuing full data read to {}", endpoint); - MessagingService.instance().sendRR(message, endpoint, repairHandler); + MessagingService.instance().sendRRWithFailure(message, endpoint, repairHandler); } } } @@ -1482,11 +1496,22 @@ public class StorageProxy implements StorageProxyMBean protected void runMayThrow() { - Keyspace keyspace = Keyspace.open(command.ksName); - Row r = command.getRow(keyspace); - ReadResponse result = ReadVerbHandler.getResponse(command, r); - MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); - handler.response(result); + try + { + Keyspace keyspace = Keyspace.open(command.ksName); + Row r = command.getRow(keyspace); + ReadResponse result = ReadVerbHandler.getResponse(command, r); + MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); + handler.response(result); + } + catch (Throwable t) + { + handler.onFailure(FBUtilities.getBroadcastAddress()); + if (t instanceof TombstoneOverwhelmingException) + logger.error(t.getMessage()); + else + throw t; + } } } @@ -1505,9 +1530,20 @@ public class StorageProxy implements StorageProxyMBean protected void runMayThrow() { - RangeSliceReply result = new RangeSliceReply(command.executeLocally()); - MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); - handler.response(result); + try + { + RangeSliceReply result = new RangeSliceReply(command.executeLocally()); + MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); + handler.response(result); + } + catch (Throwable t) + { + handler.onFailure(FBUtilities.getBroadcastAddress()); + if (t instanceof TombstoneOverwhelmingException) + logger.error(t.getMessage()); + else + throw t; + } } } @@ -1591,7 +1627,7 @@ public class StorageProxy implements StorageProxyMBean } public static List<Row> getRangeSlice(AbstractRangeCommand command, ConsistencyLevel consistency_level) - throws UnavailableException, ReadTimeoutException + throws UnavailableException, ReadFailureException, ReadTimeoutException { Tracing.trace("Computing ranges to query"); long startTime = System.nanoTime(); @@ -1705,7 +1741,7 @@ public class StorageProxy implements StorageProxyMBean for (InetAddress endpoint : filteredEndpoints) { Tracing.trace("Enqueuing request to {}", endpoint); - MessagingService.instance().sendRR(message, endpoint, handler); + MessagingService.instance().sendRRWithFailure(message, endpoint, handler); } } scanHandlers.add(Pair.create(nodeCmd, handler)); @@ -1729,24 +1765,25 @@ public class StorageProxy implements StorageProxyMBean } repairResponses.addAll(resolver.repairResults); } - catch (ReadTimeoutException ex) + catch (ReadTimeoutException|ReadFailureException ex) { - // we timed out waiting for responses + // we timed out or failed waiting for responses int blockFor = consistency_level.blockFor(keyspace); int responseCount = resolver.responses.size(); String gotData = responseCount > 0 ? resolver.isDataPresent() ? " (including data)" : " (only digests)" : ""; + boolean isTimeout = ex instanceof ReadTimeoutException; if (Tracing.isTracing()) { - Tracing.trace("Timed out; received {} of {} responses{} for range {} of {}", - new Object[]{ responseCount, blockFor, gotData, i, ranges.size() }); + Tracing.trace("{}; received {} of {} responses{} for range {} of {}", + new Object[]{(isTimeout ? "Timed out" : "Failed"), responseCount, blockFor, gotData, i, ranges.size() }); } else if (logger.isDebugEnabled()) { - logger.debug("Range slice timeout; received {} of {} responses{} for range {} of {}", - responseCount, blockFor, gotData, i, ranges.size()); + logger.debug("Range slice {}; received {} of {} responses{} for range {} of {}", + (isTimeout ? "timeout" : "failure"), responseCount, blockFor, gotData, i, ranges.size()); } throw ex; } @@ -2135,7 +2172,7 @@ public class StorageProxy implements StorageProxyMBean { return !Gossiper.instance.getUnreachableTokenOwners().isEmpty(); } - + public interface WritePerformer { public void apply(IMutation mutation, @@ -2169,7 +2206,8 @@ public class StorageProxy implements StorageProxyMBean try { runMayThrow(); - } catch (Exception e) + } + catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index dd461f3..7d89049 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -1202,13 +1202,9 @@ public class CassandraServer implements Cassandra.Iface { throw ThriftConversion.toThrift(e); } - catch (ReadTimeoutException e) - { - throw ThriftConversion.toThrift(e); - } - catch (org.apache.cassandra.exceptions.UnavailableException e) + catch (RequestExecutionException e) { - throw ThriftConversion.toThrift(e); + throw ThriftConversion.rethrow(e); } finally { @@ -1288,13 +1284,9 @@ public class CassandraServer implements Cassandra.Iface { throw ThriftConversion.toThrift(e); } - catch (ReadTimeoutException e) - { - throw ThriftConversion.toThrift(e); - } - catch (org.apache.cassandra.exceptions.UnavailableException e) + catch (RequestExecutionException e) { - throw ThriftConversion.toThrift(e); + throw ThriftConversion.rethrow(e); } finally { @@ -1364,13 +1356,9 @@ public class CassandraServer implements Cassandra.Iface { throw ThriftConversion.toThrift(e); } - catch (ReadTimeoutException e) - { - throw ThriftConversion.toThrift(e); - } - catch (org.apache.cassandra.exceptions.UnavailableException e) + catch (RequestExecutionException e) { - throw ThriftConversion.toThrift(e); + throw ThriftConversion.rethrow(e); } finally { http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/thrift/ThriftConversion.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java index d408767..066ddb8 100644 --- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java +++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java @@ -90,7 +90,9 @@ public class ThriftConversion // for methods that have a return value. public static RuntimeException rethrow(RequestExecutionException e) throws UnavailableException, TimedOutException { - if (e instanceof RequestTimeoutException) + if (e instanceof RequestFailureException) + throw toThrift((RequestFailureException)e); + else if (e instanceof RequestTimeoutException) throw toThrift((RequestTimeoutException)e); else throw new UnavailableException(); @@ -128,6 +130,12 @@ public class ThriftConversion return toe; } + // Thrift does not support RequestFailureExceptions, so we translate them into timeouts + public static TimedOutException toThrift(RequestFailureException e) + { + return new TimedOutException(); + } + public static List<org.apache.cassandra.db.IndexExpression> indexExpressionsFromThrift(List<IndexExpression> exprs) { if (exprs == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index d1fc744..99601a6 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -68,6 +68,7 @@ public class Server implements CassandraDaemon.Server public static final int VERSION_2 = 2; public static final int VERSION_3 = 3; + public static final int VERSION_4 = 4; public static final int CURRENT_VERSION = VERSION_3; private final ConnectionTracker connectionTracker = new ConnectionTracker(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java index 7e4a3a9..3097c5b 100644 --- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java @@ -26,10 +26,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.WriteType; import org.apache.cassandra.exceptions.*; -import org.apache.cassandra.transport.CBUtil; -import org.apache.cassandra.transport.Message; -import org.apache.cassandra.transport.ProtocolException; -import org.apache.cassandra.transport.ServerError; +import org.apache.cassandra.transport.*; import org.apache.cassandra.utils.MD5Digest; /** @@ -75,6 +72,16 @@ public class ErrorMessage extends Message.Response case TRUNCATE_ERROR: te = new TruncateException(msg); break; + case READ_FAILURE: + { + ConsistencyLevel cl = CBUtil.readConsistencyLevel(body); + int received = body.readInt(); + int blockFor = body.readInt(); + int failure = body.readInt(); + byte dataPresent = body.readByte(); + te = new ReadFailureException(cl, received, failure, blockFor, dataPresent != 0); + } + break; case WRITE_TIMEOUT: case READ_TIMEOUT: ConsistencyLevel cl = CBUtil.readConsistencyLevel(body); @@ -123,21 +130,33 @@ public class ErrorMessage extends Message.Response public void encode(ErrorMessage msg, ByteBuf dest, int version) { - dest.writeInt(msg.error.code().value); - CBUtil.writeString(msg.error.getMessage(), dest); + final TransportException err = getBackwardsCompatibleException(msg, version); + dest.writeInt(err.code().value); + CBUtil.writeString(err.getMessage(), dest); - switch (msg.error.code()) + switch (err.code()) { case UNAVAILABLE: - UnavailableException ue = (UnavailableException)msg.error; + UnavailableException ue = (UnavailableException)err; CBUtil.writeConsistencyLevel(ue.consistency, dest); dest.writeInt(ue.required); dest.writeInt(ue.alive); break; + case READ_FAILURE: + { + RequestFailureException rfe = (RequestFailureException)err; + + CBUtil.writeConsistencyLevel(rfe.consistency, dest); + dest.writeInt(rfe.received); + dest.writeInt(rfe.blockFor); + dest.writeInt(rfe.failures); + dest.writeByte((byte)(((ReadFailureException)rfe).dataPresent ? 1 : 0)); + } + break; case WRITE_TIMEOUT: case READ_TIMEOUT: - RequestTimeoutException rte = (RequestTimeoutException)msg.error; - boolean isWrite = msg.error.code() == ExceptionCode.WRITE_TIMEOUT; + RequestTimeoutException rte = (RequestTimeoutException)err; + boolean isWrite = err.code() == ExceptionCode.WRITE_TIMEOUT; CBUtil.writeConsistencyLevel(rte.consistency, dest); dest.writeInt(rte.received); @@ -148,11 +167,11 @@ public class ErrorMessage extends Message.Response dest.writeByte((byte)(((ReadTimeoutException)rte).dataPresent ? 1 : 0)); break; case UNPREPARED: - PreparedQueryNotFoundException pqnfe = (PreparedQueryNotFoundException)msg.error; + PreparedQueryNotFoundException pqnfe = (PreparedQueryNotFoundException)err; CBUtil.writeBytes(pqnfe.id.bytes, dest); break; case ALREADY_EXISTS: - AlreadyExistsException aee = (AlreadyExistsException)msg.error; + AlreadyExistsException aee = (AlreadyExistsException)err; CBUtil.writeString(aee.ksName, dest); CBUtil.writeString(aee.cfName, dest); break; @@ -161,26 +180,33 @@ public class ErrorMessage extends Message.Response public int encodedSize(ErrorMessage msg, int version) { - int size = 4 + CBUtil.sizeOfString(msg.error.getMessage()); - switch (msg.error.code()) + final TransportException err = getBackwardsCompatibleException(msg, version); + int size = 4 + CBUtil.sizeOfString(err.getMessage()); + switch (err.code()) { case UNAVAILABLE: - UnavailableException ue = (UnavailableException)msg.error; + UnavailableException ue = (UnavailableException)err; size += CBUtil.sizeOfConsistencyLevel(ue.consistency) + 8; break; + case READ_FAILURE: + { + ReadFailureException rfe = (ReadFailureException)err; + size += CBUtil.sizeOfConsistencyLevel(rfe.consistency) + 4 + 4 + 4 + 1; + } + break; case WRITE_TIMEOUT: case READ_TIMEOUT: - RequestTimeoutException rte = (RequestTimeoutException)msg.error; - boolean isWrite = msg.error.code() == ExceptionCode.WRITE_TIMEOUT; + RequestTimeoutException rte = (RequestTimeoutException)err; + boolean isWrite = err.code() == ExceptionCode.WRITE_TIMEOUT; size += CBUtil.sizeOfConsistencyLevel(rte.consistency) + 8; size += isWrite ? CBUtil.sizeOfString(((WriteTimeoutException)rte).writeType.toString()) : 1; break; case UNPREPARED: - PreparedQueryNotFoundException pqnfe = (PreparedQueryNotFoundException)msg.error; + PreparedQueryNotFoundException pqnfe = (PreparedQueryNotFoundException)err; size += CBUtil.sizeOfBytes(pqnfe.id.bytes); break; case ALREADY_EXISTS: - AlreadyExistsException aee = (AlreadyExistsException)msg.error; + AlreadyExistsException aee = (AlreadyExistsException)err; size += CBUtil.sizeOfString(aee.ksName); size += CBUtil.sizeOfString(aee.cfName); break; @@ -189,6 +215,17 @@ public class ErrorMessage extends Message.Response } }; + private static TransportException getBackwardsCompatibleException(ErrorMessage msg, int version) + { + if (msg.error.code() == ExceptionCode.READ_FAILURE && version < Server.VERSION_4) + { + ReadFailureException rfe = (ReadFailureException) msg.error; + return new ReadTimeoutException(rfe.consistency, rfe.received, rfe.blockFor, rfe.dataPresent); + } + + return msg.error; + } + // We need to figure error codes out (#3979) public final TransportException error;
