This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 4e596230abece703a6f735ca33ec64c5c10b263b
Author: David Capwell <[email protected]>
AuthorDate: Thu Jan 12 14:24:32 2023 -0800

    CEP-15: (C*) Enhance returning SELECT to allow partition and clustering IN clauses to return multiple partitions/rows
    
    patch by David Capwell; reviewed by Caleb Rackliffe for CASSANDRA-18154
---
 .../cql3/statements/TransactionStatement.java      | 43 +++++++++++++++--
 .../cassandra/service/accord/txn/TxnDataName.java  |  5 ++
 .../distributed/test/accord/AccordCQLTest.java     | 56 ++++++++++++++++++++++
 3 files changed, 99 insertions(+), 5 deletions(-)

diff --git a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
index 4d3c732da0..e4893154b2 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
@@ -182,6 +182,24 @@ public class TransactionStatement implements CQLStatement
         return new TxnNamedRead(namedSelect.name, 
Iterables.getOnlyElement(selectQuery.queries));
     }
 
+    List<TxnNamedRead> createNamedReads(NamedSelect namedSelect, QueryOptions 
options)
+    {
+        SelectStatement select = namedSelect.select;
+        ReadQuery readQuery = select.getQuery(options, 0);
+
+        // The read is cast to a group of single-partition commands; unlike LET, a returning SELECT may contain more than one.
+        @SuppressWarnings("unchecked")
+        SinglePartitionReadQuery.Group<SinglePartitionReadCommand> selectQuery 
= (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>) readQuery;
+
+        if (selectQuery.queries.size() == 1)
+            return Collections.singletonList(new 
TxnNamedRead(namedSelect.name, Iterables.getOnlyElement(selectQuery.queries)));
+
+        List<TxnNamedRead> list = new ArrayList<>(selectQuery.queries.size());
+        for (int i = 0; i < selectQuery.queries.size(); i++)
+            list.add(new TxnNamedRead(TxnDataName.returning(i), 
selectQuery.queries.get(i)));
+        return list;
+    }
+
     private List<TxnNamedRead> createNamedReads(QueryOptions options, 
Consumer<Key> keyConsumer)
     {
         List<TxnNamedRead> reads = new ArrayList<>(assignments.size() + 1);
@@ -195,9 +213,11 @@ public class TransactionStatement implements CQLStatement
 
         if (returningSelect != null)
         {
-            TxnNamedRead read = createNamedRead(returningSelect, options);
-            keyConsumer.accept(read.key());
-            reads.add(read);
+            for (TxnNamedRead read : createNamedReads(returningSelect, 
options))
+            {
+                keyConsumer.accept(read.key());
+                reads.add(read);
+            }
         }
 
         for (NamedSelect select : autoReads.values())
@@ -314,10 +334,23 @@ public class TransactionStatement implements CQLStatement
 
             if (returningSelect != null)
             {
-                FilteredPartition partition = 
data.get(TxnDataName.returning());
+                SinglePartitionReadQuery.Group<SinglePartitionReadCommand> 
selectQuery = (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>) 
returningSelect.select.getQuery(options, 0);
                 Selection.Selectors selectors = 
returningSelect.select.getSelection().newSelectors(options);
                 ResultSetBuilder result = new 
ResultSetBuilder(returningSelect.select.getResultMetadata(), selectors, null);
-                
returningSelect.select.processPartition(partition.rowIterator(), options, 
result, FBUtilities.nowInSeconds());
+                if (selectQuery.queries.size() == 1)
+                {
+                    FilteredPartition partition = 
data.get(TxnDataName.returning());
+                    
returningSelect.select.processPartition(partition.rowIterator(), options, 
result, FBUtilities.nowInSeconds());
+                }
+                else
+                {
+                    int nowInSec = FBUtilities.nowInSeconds();
+                    for (int i = 0; i < selectQuery.queries.size(); i++)
+                    {
+                        FilteredPartition partition = 
data.get(TxnDataName.returning(i));
+                        
returningSelect.select.processPartition(partition.rowIterator(), options, 
result, nowInSec);
+                    }
+                }
                 return new ResultMessage.Rows(result.build());
             }
 
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
index 65b659747f..de672cad2e 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
@@ -90,6 +90,11 @@ public class TxnDataName implements Comparable<TxnDataName>
         return RETURNING;
     }
 
+    public static TxnDataName returning(int index)
+    {
+        return new TxnDataName(Kind.RETURNING, Integer.toString(index));
+    }
+
     public static TxnDataName partitionRead(TableMetadata metadata, 
DecoratedKey key, int index)
     {
         return new TxnDataName(Kind.AUTO_READ, metadata.keyspace, 
metadata.name, bytesToString(key.getKey()), String.valueOf(index));
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
index 1315bed14f..aa3a19f95b 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.distributed.api.QueryResults;
 import org.apache.cassandra.service.accord.AccordTestUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.assertj.core.api.Assertions;
@@ -55,6 +56,7 @@ import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.service.accord.AccordService;
 
+import static org.apache.cassandra.cql3.CQLTester.row;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
@@ -77,6 +79,60 @@ public class AccordCQLTest extends AccordTestBase
         SHARED_CLUSTER.schemaChange("CREATE TYPE " + KEYSPACE + ".person 
(height int, age int)");
     }
 
+    @Test
+    public void testMultiPartitionReturn() throws Exception
+    {
+        test(cluster -> {
+            for (int i = 0; i < 10; i++)
+            {
+                for (int j = 0; j < 10; j++)
+                    cluster.coordinator(1).execute("INSERT INTO " + 
currentTable + "(k, c, v) VALUES (?, ?, ?);", ConsistencyLevel.ALL, i, j, i + 
j);
+            }
+            // multi row
+            String cql = "BEGIN TRANSACTION\n" +
+                         "  SELECT * FROM " + currentTable + " WHERE k=? AND c 
IN (?, ?);\n" +
+                         "COMMIT TRANSACTION";
+            SimpleQueryResult result = 
cluster.coordinator(1).executeWithResult(cql, ConsistencyLevel.ANY, 0, 0, 1);
+            assertThat(result).isEqualTo(QueryResults.builder()
+                                                     .columns("k", "c", "v")
+                                                     .row(0, 0, 0)
+                                                     .row(0, 1, 1)
+                                                     .build());
+            // Results should be in Partition/Clustering order regardless of bind order, so verify both bind orders below
+            // multi partition
+            cql = "BEGIN TRANSACTION\n" +
+                  "  SELECT * FROM " + currentTable + " WHERE k IN (?, ?) AND 
c = ?;\n" +
+                  "COMMIT TRANSACTION";
+            for (boolean asc : Arrays.asList(true, false))
+            {
+                Object[] binds = asc ? row(0, 1, 0) : row(1, 0, 0);
+                result = cluster.coordinator(1).executeWithResult(cql, 
ConsistencyLevel.ANY, binds);
+                assertThat(result).isEqualTo(QueryResults.builder()
+                                                         .columns("k", "c", 
"v")
+                                                         .row(0, 0, 0)
+                                                         .row(1, 0, 1)
+                                                         .build());
+            }
+
+            // multi-partition, multi-clustering
+            cql = "BEGIN TRANSACTION\n" +
+                  "  SELECT * FROM " + currentTable + " WHERE k IN (?, ?) AND 
c IN (?, ?);\n" +
+                  "COMMIT TRANSACTION";
+            for (boolean asc : Arrays.asList(true, false))
+            {
+                Object[] binds = asc ? row(0, 1, 0, 1) : row(1, 0, 1, 0);
+                result = cluster.coordinator(1).executeWithResult(cql, 
ConsistencyLevel.ANY, binds);
+                assertThat(result).isEqualTo(QueryResults.builder()
+                                                         .columns("k", "c", 
"v")
+                                                         .row(0, 0, 0)
+                                                         .row(0, 1, 1)
+                                                         .row(1, 0, 1)
+                                                         .row(1, 1, 2)
+                                                         .build());
+            }
+        });
+    }
+
     @Test
     public void testMultipleShards() throws Exception
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to