Repository: cassandra Updated Branches: refs/heads/cassandra-2.0 652ec6a5c -> 44cf4a66d
Fix count(*) queries in a mixed cluster patch by Tyler Hobbs; reviewed by Piotr KoÅaczkowski for CASSANDRA-6707 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44cf4a66 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44cf4a66 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44cf4a66 Branch: refs/heads/cassandra-2.0 Commit: 44cf4a66d157643297b7ab791a57f323432e28c5 Parents: 652ec6a Author: Aleksey Yeschenko <alek...@apache.org> Authored: Mon Feb 17 16:39:29 2014 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Mon Feb 17 16:39:29 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 3 ++- .../cql3/statements/SelectStatement.java | 4 ++- .../apache/cassandra/net/MessagingService.java | 26 +++++++++++++++++++- 3 files changed, 30 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/44cf4a66/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index fd3b1b7..c9fabd2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -12,7 +12,8 @@ * Stop CommitLogSegment.close() from calling sync() (CASSANDRA-6652) * Make commitlog failure handling configurable (CASSANDRA-6364) * Avoid overlaps in LCS (CASSANDRA-6688) - * improve support for paginating over composites (4851) + * Improve support for paginating over composites (CASSANDRA-4851) + * Fix count(*) queries in a mixed cluster (CASSANDRA-6707) Merged from 1.2: * Fix broken streams when replacing with same IP (CASSANDRA-6622) * Fix upgradesstables NPE for non-CF-based indexes (CASSANDRA-6645) http://git-wip-us.apache.org/repos/asf/cassandra/blob/44cf4a66/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index d42fd76..52a7c70 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -37,6 +37,7 @@ import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.dht.*; import org.apache.cassandra.exceptions.*; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageProxy; @@ -165,7 +166,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache int pageSize = options.getPageSize(); // A count query will never be paged for the user, but we always page it internally to avoid OOM. // If we user provided a pageSize we'll use that to page internally (because why not), otherwise we use our default - if (parameters.isCount && pageSize <= 0) + // Note that if there are some nodes in the cluster with a version less than 2.0, we can't use paging (CASSANDRA-6707). + if (parameters.isCount && pageSize <= 0 && MessagingService.instance().allNodesAtLeast20) pageSize = DEFAULT_COUNT_PAGE_SIZE; if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/44cf4a66/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 232cf6a..ad86bbd 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -73,6 +73,8 @@ public final class MessagingService implements MessagingServiceMBean public static final int VERSION_20 = 7; public static final int current_version = VERSION_20; + public boolean allNodesAtLeast20 = true; + /** * we preface every message with this number so the recipient can validate the sender is sane */ @@ -742,14 +744,36 @@ public final class MessagingService implements MessagingServiceMBean public int setVersion(InetAddress endpoint, int version) { logger.debug("Setting version {} for {}", version, endpoint); + if (version < VERSION_20) + allNodesAtLeast20 = false; Integer v = versions.put(endpoint, version); + + // if the version was increased to 2.0 or later, see if all nodes are >= 2.0 now + if (v != null && v < VERSION_20 && version >= VERSION_20) + refreshAllNodesAtLeast20(); + return v == null ? version : v; } public void resetVersion(InetAddress endpoint) { logger.debug("Reseting version for {}", endpoint); - versions.remove(endpoint); + Integer removed = versions.remove(endpoint); + if (removed != null && removed <= VERSION_20) + refreshAllNodesAtLeast20(); + } + + private void refreshAllNodesAtLeast20() + { + for (Integer version: versions.values()) + { + if (version < VERSION_20) + { + allNodesAtLeast20 = false; + return; + } + } + allNodesAtLeast20 = true; } public int getVersion(InetAddress endpoint)