Merge branch 'cassandra-2.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d365faab Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d365faab Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d365faab Branch: refs/heads/trunk Commit: d365faab07f14d436bf827b3cf6dae5d25e4c9c1 Parents: 1a5ebd6 1727ea7 Author: Sylvain Lebresne <[email protected]> Authored: Wed Dec 18 13:51:48 2013 +0100 Committer: Sylvain Lebresne <[email protected]> Committed: Wed Dec 18 13:51:48 2013 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/service/AbstractWriteResponseHandler.java | 11 ++++++++++- src/java/org/apache/cassandra/service/ReadCallback.java | 4 ++++ src/java/org/apache/cassandra/service/StorageProxy.java | 4 ++-- 4 files changed, 17 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d365faab/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d365faab/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageProxy.java index 6f362db,2da9d38..e4741f6 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@@ -1664,33 -1577,17 +1664,33 @@@ public class StorageProxy implements St Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor); else logger.debug("Range slice timeout while read-repairing after receiving all {} data and digest responses", blockFor); - throw new ReadTimeoutException(consistency_level, blockFor, blockFor, true); + throw new ReadTimeoutException(consistency_level, blockFor-1, blockFor, true); } - catch (DigestMismatchException e) + + if (haveSufficientRows) + return trim(command, rows); + + // we didn't get enough rows in our concurrent fetch; recalculate our concurrency factor + // based on the results we've seen so far (as long as we still have ranges left to query) + if (i < ranges.size()) { - throw new AssertionError(e); // no digests in range slices yet + float fetchedRows = command.countCQL3Rows() ? cql3RowCount : rows.size(); + float remainingRows = command.limit() - fetchedRows; + float actualRowsPerRange; + if (fetchedRows == 0.0) + { + // we haven't actually gotten any results, so query all remaining ranges at once + actualRowsPerRange = 0.0f; + concurrencyFactor = ranges.size() - i; + } + else + { + actualRowsPerRange = i / fetchedRows; + concurrencyFactor = Math.max(1, Math.min(ranges.size() - i, Math.round(remainingRows / actualRowsPerRange))); + } + logger.debug("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}", + actualRowsPerRange, (int) remainingRows, concurrencyFactor); } - - // if we're done, great, otherwise, move to the next range - int count = nodeCmd.countCQL3Rows() ? cql3RowCount : rows.size(); - if (count >= nodeCmd.limit()) - break; } } finally
