Fix broken paging state with prepared statement

patch by slebresne; reviewed by thobbs for CASSANDRA-7120
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/35801be6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/35801be6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/35801be6 Branch: refs/heads/trunk Commit: 35801be6f4c85328fe8e73054eed5f91f3df115f Parents: e5ab470 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Fri May 23 09:33:25 2014 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Fri May 23 18:02:36 2014 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/cql3/ResultSet.java | 38 +++++++++++--------- .../cql3/statements/SelectStatement.java | 5 ++- .../cassandra/service/pager/PagingState.java | 6 ++++ .../transport/messages/ResultMessage.java | 11 +++++- 5 files changed, 40 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/35801be6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6c05bf5..06b042e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -31,6 +31,7 @@ * Fix nodetool netstats (CASSANDRA-7270) * Fix potential ClassCastException in HintedHandoffManager (CASSANDRA-7284) * Use prepared statements internally (CASSANDRA-6975) + * Fix broken paging state with prepared statement (CASSANDRA-7120) Merged from 2.0: * Always reallocate buffers in HSHA (CASSANDRA-6285) * (Hadoop) support authentication in CqlRecordReader (CASSANDRA-7221) http://git-wip-us.apache.org/repos/asf/cassandra/blob/35801be6/src/java/org/apache/cassandra/cql3/ResultSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java index eea0475..25635fa 100644 --- 
a/src/java/org/apache/cassandra/cql3/ResultSet.java +++ b/src/java/org/apache/cassandra/cql3/ResultSet.java @@ -92,6 +92,16 @@ public class ResultSet } } + public ResultSet withPagingState(PagingState state) + { + if (state == null) + return this; + + // The metadata is shared by all execution of a given statement. So if there is a paging state + // we need to copy the metadata + return new ResultSet(metadata.withPagingState(state), rows); + } + public ResultSet makeCountResult(ColumnIdentifier alias) { assert metadata.names != null; @@ -238,7 +248,7 @@ public class ResultSet { public static final CBCodec<Metadata> codec = new Codec(); - public static final Metadata EMPTY = new Metadata(EnumSet.of(Flag.NO_METADATA), 0); + public static final Metadata EMPTY = new Metadata(EnumSet.of(Flag.NO_METADATA), null, 0, null); public final EnumSet<Flag> flags; // Please note that columnCount can actually be smaller than names, even if names is not null. This is @@ -247,27 +257,21 @@ public class ResultSet // (CASSANDRA-4911). So the serialization code will exclude any columns in name whose index is >= columnCount. 
public final List<ColumnSpecification> names; public final int columnCount; - public PagingState pagingState; + public final PagingState pagingState; public Metadata(List<ColumnSpecification> names) { - this(EnumSet.noneOf(Flag.class), names); + this(EnumSet.noneOf(Flag.class), names, names.size(), null); if (!names.isEmpty() && allInSameCF()) flags.add(Flag.GLOBAL_TABLES_SPEC); } - private Metadata(EnumSet<Flag> flags, List<ColumnSpecification> names) + private Metadata(EnumSet<Flag> flags, List<ColumnSpecification> names, int columnCount, PagingState pagingState) { this.flags = flags; this.names = names; - this.columnCount = names.size(); - } - - private Metadata(EnumSet<Flag> flags, int columnCount) - { - this.flags = flags; - this.names = null; this.columnCount = columnCount; + this.pagingState = pagingState; } // The maximum number of values that the ResultSet can hold. This can be bigger than columnCount due to CASSANDRA-4911 @@ -301,14 +305,14 @@ public class ResultSet return true; } - public Metadata setHasMorePages(PagingState pagingState) + public Metadata withPagingState(PagingState pagingState) { if (pagingState == null) return this; - flags.add(Flag.HAS_MORE_PAGES); - this.pagingState = pagingState; - return this; + EnumSet<Flag> newFlags = EnumSet.copyOf(flags); + newFlags.add(Flag.HAS_MORE_PAGES); + return new Metadata(newFlags, names, columnCount, pagingState); } public void setSkipMetadata() @@ -354,7 +358,7 @@ public class ResultSet state = PagingState.deserialize(CBUtil.readValue(body)); if (flags.contains(Flag.NO_METADATA)) - return new Metadata(flags, columnCount).setHasMorePages(state); + return new Metadata(flags, null, columnCount, state); boolean globalTablesSpec = flags.contains(Flag.GLOBAL_TABLES_SPEC); @@ -376,7 +380,7 @@ public class ResultSet AbstractType type = DataType.toType(DataType.codec.decodeOne(body, version)); names.add(new ColumnSpecification(ksName, cfName, colName, type)); } - return new Metadata(flags, 
names).setHasMorePages(state); + return new Metadata(flags, names, names.size(), state); } public void encode(Metadata m, ByteBuf dest, int version) http://git-wip-us.apache.org/repos/asf/cassandra/blob/35801be6/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 1f9688a..765cbac 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -223,9 +223,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache List<Row> page = pager.fetchPage(pageSize); ResultMessage.Rows msg = processResults(page, options, limit, now); - if (!pager.isExhausted()) - msg.result.metadata.setHasMorePages(pager.state()); - return msg; + + return pager.isExhausted() ? 
msg : msg.withPagingState(pager.state()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/35801be6/src/java/org/apache/cassandra/service/pager/PagingState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java index 9d42d5c..bbae921 100644 --- a/src/java/org/apache/cassandra/service/pager/PagingState.java +++ b/src/java/org/apache/cassandra/service/pager/PagingState.java @@ -79,4 +79,10 @@ public class PagingState + 2 + cellName.remaining() + 4; } + + @Override + public String toString() + { + return String.format("PagingState(key=%s, cellname=%s, remaining=%d", ByteBufferUtil.bytesToHex(partitionKey), ByteBufferUtil.bytesToHex(cellName), remaining); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/35801be6/src/java/org/apache/cassandra/transport/messages/ResultMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java index 7ca9251..9fe1d40 100644 --- a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java @@ -26,6 +26,7 @@ import org.apache.cassandra.cql3.CQLStatement; import org.apache.cassandra.cql3.ResultSet; import org.apache.cassandra.cql3.statements.SelectStatement; import org.apache.cassandra.cql3.statements.ParsedStatement; +import org.apache.cassandra.service.pager.PagingState; import org.apache.cassandra.transport.*; import org.apache.cassandra.thrift.CqlPreparedResult; import org.apache.cassandra.thrift.CqlResult; @@ -219,6 +220,11 @@ public abstract class ResultMessage extends Message.Response this.result = result; } + public Rows withPagingState(PagingState state) + { + return new 
Rows(result.withPagingState(state)); + } + public CqlResult toThriftResult() { return result.toThriftResult(); @@ -229,7 +235,6 @@ public abstract class ResultMessage extends Message.Response { return "ROWS " + result; } - } public static class Prepared extends ResultMessage @@ -276,7 +281,11 @@ public abstract class ResultMessage extends Message.Response }; public final MD5Digest statementId; + + /** Describes the variables to be bound in the prepared statement */ public final ResultSet.Metadata metadata; + + /** Describes the results of executing this prepared statement */ public final ResultSet.Metadata resultMetadata; // statement id for CQL-over-thrift compatibility. The binary protocol ignore that.