This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 46d10778da CEP-15: (C*) Enhance returning SELECT to allow partition
and clustering IN clauses to return multiple partitions/rows
46d10778da is described below
commit 46d10778da6d1a76454ff7bb404c232bcd7255ae
Author: David Capwell <[email protected]>
AuthorDate: Thu Jan 12 14:24:32 2023 -0800
CEP-15: (C*) Enhance returning SELECT to allow partition and clustering IN
clauses to return multiple partitions/rows
patch by David Capwell; reviewed by Caleb Rackliffe for CASSANDRA-18154
---
.../cql3/statements/TransactionStatement.java | 43 +++++++++++++++--
.../cassandra/service/accord/txn/TxnDataName.java | 5 ++
.../distributed/test/accord/AccordCQLTest.java | 56 ++++++++++++++++++++++
3 files changed, 99 insertions(+), 5 deletions(-)
diff --git
a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
index 4d3c732da0..e4893154b2 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
@@ -182,6 +182,24 @@ public class TransactionStatement implements CQLStatement
return new TxnNamedRead(namedSelect.name,
Iterables.getOnlyElement(selectQuery.queries));
}
+ List<TxnNamedRead> createNamedReads(NamedSelect namedSelect, QueryOptions
options)
+ {
+ SelectStatement select = namedSelect.select;
+ ReadQuery readQuery = select.getQuery(options, 0);
+
+ // We reject reads from both LET and SELECT that do not specify a
single row.
+ @SuppressWarnings("unchecked")
+ SinglePartitionReadQuery.Group<SinglePartitionReadCommand> selectQuery
= (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>) readQuery;
+
+ if (selectQuery.queries.size() == 1)
+ return Collections.singletonList(new
TxnNamedRead(namedSelect.name, Iterables.getOnlyElement(selectQuery.queries)));
+
+ List<TxnNamedRead> list = new ArrayList<>(selectQuery.queries.size());
+ for (int i = 0; i < selectQuery.queries.size(); i++)
+ list.add(new TxnNamedRead(TxnDataName.returning(i),
selectQuery.queries.get(i)));
+ return list;
+ }
+
private List<TxnNamedRead> createNamedReads(QueryOptions options,
Consumer<Key> keyConsumer)
{
List<TxnNamedRead> reads = new ArrayList<>(assignments.size() + 1);
@@ -195,9 +213,11 @@ public class TransactionStatement implements CQLStatement
if (returningSelect != null)
{
- TxnNamedRead read = createNamedRead(returningSelect, options);
- keyConsumer.accept(read.key());
- reads.add(read);
+ for (TxnNamedRead read : createNamedReads(returningSelect,
options))
+ {
+ keyConsumer.accept(read.key());
+ reads.add(read);
+ }
}
for (NamedSelect select : autoReads.values())
@@ -314,10 +334,23 @@ public class TransactionStatement implements CQLStatement
if (returningSelect != null)
{
- FilteredPartition partition =
data.get(TxnDataName.returning());
+ SinglePartitionReadQuery.Group<SinglePartitionReadCommand>
selectQuery = (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>)
returningSelect.select.getQuery(options, 0);
Selection.Selectors selectors =
returningSelect.select.getSelection().newSelectors(options);
ResultSetBuilder result = new
ResultSetBuilder(returningSelect.select.getResultMetadata(), selectors, null);
-
returningSelect.select.processPartition(partition.rowIterator(), options,
result, FBUtilities.nowInSeconds());
+ if (selectQuery.queries.size() == 1)
+ {
+ FilteredPartition partition =
data.get(TxnDataName.returning());
+
returningSelect.select.processPartition(partition.rowIterator(), options,
result, FBUtilities.nowInSeconds());
+ }
+ else
+ {
+ int nowInSec = FBUtilities.nowInSeconds();
+ for (int i = 0; i < selectQuery.queries.size(); i++)
+ {
+ FilteredPartition partition =
data.get(TxnDataName.returning(i));
+
returningSelect.select.processPartition(partition.rowIterator(), options,
result, nowInSec);
+ }
+ }
return new ResultMessage.Rows(result.build());
}
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
index 65b659747f..de672cad2e 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
@@ -90,6 +90,11 @@ public class TxnDataName implements Comparable<TxnDataName>
return RETURNING;
}
+ public static TxnDataName returning(int index)
+ {
+ return new TxnDataName(Kind.RETURNING, Integer.toString(index));
+ }
+
public static TxnDataName partitionRead(TableMetadata metadata,
DecoratedKey key, int index)
{
return new TxnDataName(Kind.AUTO_READ, metadata.keyspace,
metadata.name, bytesToString(key.getKey()), String.valueOf(index));
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
index 1315bed14f..aa3a19f95b 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.db.marshal.SetType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.distributed.api.QueryResults;
import org.apache.cassandra.service.accord.AccordTestUtils;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.assertj.core.api.Assertions;
@@ -55,6 +56,7 @@ import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.service.accord.AccordService;
+import static org.apache.cassandra.cql3.CQLTester.row;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -77,6 +79,60 @@ public class AccordCQLTest extends AccordTestBase
SHARED_CLUSTER.schemaChange("CREATE TYPE " + KEYSPACE + ".person
(height int, age int)");
}
+ @Test
+ public void testMultiPartitionReturn() throws Exception
+ {
+ test(cluster -> {
+ for (int i = 0; i < 10; i++)
+ {
+ for (int j = 0; j < 10; j++)
+ cluster.coordinator(1).execute("INSERT INTO " +
currentTable + "(k, c, v) VALUES (?, ?, ?);", ConsistencyLevel.ALL, i, j, i +
j);
+ }
+ // multi row
+ String cql = "BEGIN TRANSACTION\n" +
+ " SELECT * FROM " + currentTable + " WHERE k=? AND c
IN (?, ?);\n" +
+ "COMMIT TRANSACTION";
+ SimpleQueryResult result =
cluster.coordinator(1).executeWithResult(cql, ConsistencyLevel.ANY, 0, 0, 1);
+ assertThat(result).isEqualTo(QueryResults.builder()
+ .columns("k", "c", "v")
+ .row(0, 0, 0)
+ .row(0, 1, 1)
+ .build());
+ // Results should be in partition/clustering order regardless of bind order, so make sure both bind orders return the same rows
+ // multi partition
+ cql = "BEGIN TRANSACTION\n" +
+ " SELECT * FROM " + currentTable + " WHERE k IN (?, ?) AND
c = ?;\n" +
+ "COMMIT TRANSACTION";
+ for (boolean asc : Arrays.asList(true, false))
+ {
+ Object[] binds = asc ? row(0, 1, 0) : row(1, 0, 0);
+ result = cluster.coordinator(1).executeWithResult(cql,
ConsistencyLevel.ANY, binds);
+ assertThat(result).isEqualTo(QueryResults.builder()
+ .columns("k", "c",
"v")
+ .row(0, 0, 0)
+ .row(1, 0, 1)
+ .build());
+ }
+
+ // multi-partition, multi-clustering
+ cql = "BEGIN TRANSACTION\n" +
+ " SELECT * FROM " + currentTable + " WHERE k IN (?, ?) AND
c IN (?, ?);\n" +
+ "COMMIT TRANSACTION";
+ for (boolean asc : Arrays.asList(true, false))
+ {
+ Object[] binds = asc ? row(0, 1, 0, 1) : row(1, 0, 1, 0);
+ result = cluster.coordinator(1).executeWithResult(cql,
ConsistencyLevel.ANY, binds);
+ assertThat(result).isEqualTo(QueryResults.builder()
+ .columns("k", "c",
"v")
+ .row(0, 0, 0)
+ .row(0, 1, 1)
+ .row(1, 0, 1)
+ .row(1, 1, 2)
+ .build());
+ }
+ });
+ }
+
@Test
public void testMultipleShards() throws Exception
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]