This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit db3c85287f8bfc4721b2d6199f9c44de2cbc2aaf
Merge: a9d1af9 11b06e9
Author: Sam Tunnicliffe <s...@beobal.com>
AuthorDate: Thu Jan 17 10:30:03 2019 +0000

    Merge branch 'cassandra-2.2' into cassandra-3.0

 CHANGES.txt                                        |   1 +
 .../unit/org/apache/cassandra/cql3/PagingTest.java | 160 +++++++++++++++++++++
 2 files changed, 161 insertions(+)

diff --cc test/unit/org/apache/cassandra/cql3/PagingTest.java
index 0000000,9c7041b..ea1eb43
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/cql3/PagingTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PagingTest.java
@@@ -1,0 -1,160 +1,160 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.cql3;
+ 
+ import java.net.InetAddress;
+ import java.util.Iterator;
+ import java.util.List;
+ 
+ import org.junit.AfterClass;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
+ import com.datastax.driver.core.Cluster;
+ import com.datastax.driver.core.ResultSet;
+ import com.datastax.driver.core.Row;
+ import com.datastax.driver.core.Session;
+ import com.datastax.driver.core.SimpleStatement;
+ import com.datastax.driver.core.Statement;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ 
+ import org.apache.cassandra.dht.Murmur3Partitioner;
+ import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+ import org.apache.cassandra.locator.AbstractEndpointSnitch;
+ import org.apache.cassandra.locator.IEndpointSnitch;
+ import org.apache.cassandra.service.EmbeddedCassandraService;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
+ import static junit.framework.Assert.assertFalse;
+ import static org.junit.Assert.assertEquals;
+ 
+ 
+ public class PagingTest
+ {
+     private static Cluster cluster;
+     private static Session session;
+ 
+     private static final String KEYSPACE = "paging_test";
+     private static final String createKsStatement = "CREATE KEYSPACE " + 
KEYSPACE +
+                                                     " WITH REPLICATION = { 
'class' : 'SimpleStrategy', 'replication_factor' : 2 };";
+ 
+     private static final String dropKsStatement = "DROP KEYSPACE IF EXISTS " 
+ KEYSPACE;
+ 
+     @BeforeClass
+     public static void setup() throws Exception
+     {
 -        DatabaseDescriptor.setPartitioner(new Murmur3Partitioner());
++        DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+         EmbeddedCassandraService cassandra = new EmbeddedCassandraService();
+         cassandra.start();
+ 
+         // Currently the native server start method return before the server 
is fully binded to the socket, so we need
+         // to wait slightly before trying to connect to it. We should fix 
this but in the meantime using a sleep.
+         Thread.sleep(500);
+ 
+         cluster = Cluster.builder().addContactPoint("127.0.0.1")
+                                    
.withPort(DatabaseDescriptor.getNativeTransportPort())
+                                    .build();
+         session = cluster.connect();
+ 
+         session.execute(dropKsStatement);
+         session.execute(createKsStatement);
+     }
+ 
+     @AfterClass
+     public static void tearDown()
+     {
+         cluster.close();
+     }
+ 
+     /**
+      * Makes sure that we don't drop any live rows when paging with DISTINCT 
queries
+      *
+      * * We need to have more rows than fetch_size
+      * * The node must have a token within the first page (so that the range 
gets split up in StorageProxy#getRestrictedRanges)
+      *   - This means that the second read in the second range will read back 
too many rows
+      * * The extra rows are dropped (so that we only return fetch_size rows 
to client)
+      * * This means that the last row recorded in 
AbstractQueryPager#recordLast is a non-live one
+      * * For the next page, the first row returned will be the same non-live 
row as above
+      * * The bug in CASSANDRA-14956 caused us to drop that non-live row + the 
first live row in the next page
+      */
+     @Test
+     public void testPaging() throws InterruptedException
+     {
+         String table = KEYSPACE + ".paging";
+         String createTableStatement = "CREATE TABLE IF NOT EXISTS " + table + 
" (id int, id2 int, id3 int, val text, PRIMARY KEY ((id, id2), id3));";
+         String dropTableStatement = "DROP TABLE IF EXISTS " + table + ';';
+ 
+         // custom snitch to avoid merging ranges back together after 
StorageProxy#getRestrictedRanges splits them up
+         IEndpointSnitch snitch = new AbstractEndpointSnitch()
+         {
+             private IEndpointSnitch oldSnitch = 
DatabaseDescriptor.getEndpointSnitch();
+             public int compareEndpoints(InetAddress target, InetAddress a1, 
InetAddress a2)
+             {
+                 return oldSnitch.compareEndpoints(target, a1, a2);
+             }
+ 
+             public String getRack(InetAddress endpoint)
+             {
+                 return oldSnitch.getRack(endpoint);
+             }
+ 
+             public String getDatacenter(InetAddress endpoint)
+             {
+                 return oldSnitch.getDatacenter(endpoint);
+             }
+ 
+             @Override
+             public boolean isWorthMergingForRangeQuery(List<InetAddress> 
merged, List<InetAddress> l1, List<InetAddress> l2)
+             {
+                 return false;
+             }
+         };
+         DatabaseDescriptor.setEndpointSnitch(snitch);
+         StorageService.instance.getTokenMetadata().clearUnsafe();
+         StorageService.instance.getTokenMetadata().updateNormalToken(new 
LongToken(5097162189738624638L), FBUtilities.getBroadcastAddress());
+         session.execute(createTableStatement);
+ 
+         for (int i = 0; i < 110; i++)
+         {
+             // removing row with idx 10 causes the last row in the first page 
read to be empty
+             String ttlClause = i == 10 ? "USING TTL 1" : "";
+             session.execute(String.format("INSERT INTO %s (id, id2, id3, val) 
VALUES (%d, %d, %d, '%d') %s", table, i, i, i, i, ttlClause));
+         }
+         Thread.sleep(1500);
+ 
+         Statement stmt = new SimpleStatement(String.format("SELECT DISTINCT 
token(id, id2), id, id2 FROM %s", table));
+         stmt.setFetchSize(100);
+         ResultSet res = session.execute(stmt);
+         stmt.setFetchSize(200);
+         ResultSet res2 = session.execute(stmt);
+ 
+         Iterator<Row> iter1 = res.iterator();
+         Iterator<Row> iter2 = res2.iterator();
+ 
+         while (iter1.hasNext() && iter2.hasNext())
+         {
+             Row row1 = iter1.next();
+             Row row2 = iter2.next();
+             assertEquals(row1.getInt("id"), row2.getInt("id"));
+         }
+         assertFalse(iter1.hasNext());
+         assertFalse(iter2.hasNext());
+         session.execute(dropTableStatement);
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to