Fix broken paging state with prepared statement

patch by slebresne; reviewed by thobbs for CASSANDRA-7120


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/35801be6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/35801be6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/35801be6

Branch: refs/heads/trunk
Commit: 35801be6f4c85328fe8e73054eed5f91f3df115f
Parents: e5ab470
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Fri May 23 09:33:25 2014 +0200
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Fri May 23 18:02:36 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/cql3/ResultSet.java    | 38 +++++++++++---------
 .../cql3/statements/SelectStatement.java        |  5 ++-
 .../cassandra/service/pager/PagingState.java    |  6 ++++
 .../transport/messages/ResultMessage.java       | 11 +++++-
 5 files changed, 40 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/35801be6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6c05bf5..06b042e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -31,6 +31,7 @@
  * Fix nodetool netstats (CASSANDRA-7270)
  * Fix potential ClassCastException in HintedHandoffManager (CASSANDRA-7284)
  * Use prepared statements internally (CASSANDRA-6975)
+ * Fix broken paging state with prepared statement (CASSANDRA-7120)
 Merged from 2.0:
  * Always reallocate buffers in HSHA (CASSANDRA-6285)
  * (Hadoop) support authentication in CqlRecordReader (CASSANDRA-7221)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/35801be6/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index eea0475..25635fa 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -92,6 +92,16 @@ public class ResultSet
         }
     }
 
+    public ResultSet withPagingState(PagingState state)
+    {
+        if (state == null)
+            return this;
+
+        // The metadata is shared by all execution of a given statement. So if there is a paging state
+        // we need to copy the metadata
+        return new ResultSet(metadata.withPagingState(state), rows);
+    }
+
     public ResultSet makeCountResult(ColumnIdentifier alias)
     {
         assert metadata.names != null;
@@ -238,7 +248,7 @@ public class ResultSet
     {
         public static final CBCodec<Metadata> codec = new Codec();
 
-        public static final Metadata EMPTY = new Metadata(EnumSet.of(Flag.NO_METADATA), 0);
+        public static final Metadata EMPTY = new Metadata(EnumSet.of(Flag.NO_METADATA), null, 0, null);
 
         public final EnumSet<Flag> flags;
         // Please note that columnCount can actually be smaller than names, even if names is not null. This is
@@ -247,27 +257,21 @@ public class ResultSet
         // (CASSANDRA-4911). So the serialization code will exclude any columns in name whose index is >= columnCount.
         public final List<ColumnSpecification> names;
         public final int columnCount;
-        public PagingState pagingState;
+        public final PagingState pagingState;
 
         public Metadata(List<ColumnSpecification> names)
         {
-            this(EnumSet.noneOf(Flag.class), names);
+            this(EnumSet.noneOf(Flag.class), names, names.size(), null);
             if (!names.isEmpty() && allInSameCF())
                 flags.add(Flag.GLOBAL_TABLES_SPEC);
         }
 
-        private Metadata(EnumSet<Flag> flags, List<ColumnSpecification> names)
+        private Metadata(EnumSet<Flag> flags, List<ColumnSpecification> names, int columnCount, PagingState pagingState)
         {
             this.flags = flags;
             this.names = names;
-            this.columnCount = names.size();
-        }
-
-        private Metadata(EnumSet<Flag> flags, int columnCount)
-        {
-            this.flags = flags;
-            this.names = null;
             this.columnCount = columnCount;
+            this.pagingState = pagingState;
         }
 
         // The maximum number of values that the ResultSet can hold. This can be bigger than columnCount due to CASSANDRA-4911
@@ -301,14 +305,14 @@ public class ResultSet
             return true;
         }
 
-        public Metadata setHasMorePages(PagingState pagingState)
+        public Metadata withPagingState(PagingState pagingState)
         {
             if (pagingState == null)
                 return this;
 
-            flags.add(Flag.HAS_MORE_PAGES);
-            this.pagingState = pagingState;
-            return this;
+            EnumSet<Flag> newFlags = EnumSet.copyOf(flags);
+            newFlags.add(Flag.HAS_MORE_PAGES);
+            return new Metadata(newFlags, names, columnCount, pagingState);
         }
 
         public void setSkipMetadata()
@@ -354,7 +358,7 @@ public class ResultSet
                     state = PagingState.deserialize(CBUtil.readValue(body));
 
                 if (flags.contains(Flag.NO_METADATA))
-                    return new Metadata(flags, columnCount).setHasMorePages(state);
+                    return new Metadata(flags, null, columnCount, state);
 
                 boolean globalTablesSpec = flags.contains(Flag.GLOBAL_TABLES_SPEC);
 
@@ -376,7 +380,7 @@ public class ResultSet
                     AbstractType type = DataType.toType(DataType.codec.decodeOne(body, version));
                     names.add(new ColumnSpecification(ksName, cfName, colName, type));
                 }
-                return new Metadata(flags, names).setHasMorePages(state);
+                return new Metadata(flags, names, names.size(), state);
             }
 
             public void encode(Metadata m, ByteBuf dest, int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/35801be6/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 1f9688a..765cbac 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -223,9 +223,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
 
             List<Row> page = pager.fetchPage(pageSize);
             ResultMessage.Rows msg = processResults(page, options, limit, now);
-            if (!pager.isExhausted())
-                msg.result.metadata.setHasMorePages(pager.state());
-            return msg;
+
+            return pager.isExhausted() ? msg : msg.withPagingState(pager.state());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/35801be6/src/java/org/apache/cassandra/service/pager/PagingState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java
index 9d42d5c..bbae921 100644
--- a/src/java/org/apache/cassandra/service/pager/PagingState.java
+++ b/src/java/org/apache/cassandra/service/pager/PagingState.java
@@ -79,4 +79,10 @@ public class PagingState
              + 2 + cellName.remaining()
              + 4;
     }
+
+    @Override
+    public String toString()
+    {
+        return String.format("PagingState(key=%s, cellname=%s, remaining=%d", ByteBufferUtil.bytesToHex(partitionKey), ByteBufferUtil.bytesToHex(cellName), remaining);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/35801be6/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
index 7ca9251..9fe1d40 100644
--- a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.ResultSet;
 import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.service.pager.PagingState;
 import org.apache.cassandra.transport.*;
 import org.apache.cassandra.thrift.CqlPreparedResult;
 import org.apache.cassandra.thrift.CqlResult;
@@ -219,6 +220,11 @@ public abstract class ResultMessage extends Message.Response
             this.result = result;
         }
 
+        public Rows withPagingState(PagingState state)
+        {
+            return new Rows(result.withPagingState(state));
+        }
+
         public CqlResult toThriftResult()
         {
             return result.toThriftResult();
@@ -229,7 +235,6 @@ public abstract class ResultMessage extends Message.Response
         {
             return "ROWS " + result;
         }
-
     }
 
     public static class Prepared extends ResultMessage
@@ -276,7 +281,11 @@ public abstract class ResultMessage extends Message.Response
         };
 
         public final MD5Digest statementId;
+
+        /** Describes the variables to be bound in the prepared statement */
         public final ResultSet.Metadata metadata;
+
+        /** Describes the results of executing this prepared statement */
         public final ResultSet.Metadata resultMetadata;
 
         // statement id for CQL-over-thrift compatibility. The binary protocol ignore that.

Reply via email to