This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 4e596230abece703a6f735ca33ec64c5c10b263b Author: David Capwell <[email protected]> AuthorDate: Thu Jan 12 14:24:32 2023 -0800 CEP-15: (C*) Enhance returning SELECT to allow partition and clustering IN clauses to return multiple partitions/rows patch by David Capwell; reviewed by Caleb Rackliffe for CASSANDRA-18154 --- .../cql3/statements/TransactionStatement.java | 43 +++++++++++++++-- .../cassandra/service/accord/txn/TxnDataName.java | 5 ++ .../distributed/test/accord/AccordCQLTest.java | 56 ++++++++++++++++++++++ 3 files changed, 99 insertions(+), 5 deletions(-) diff --git a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java index 4d3c732da0..e4893154b2 100644 --- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java @@ -182,6 +182,24 @@ public class TransactionStatement implements CQLStatement return new TxnNamedRead(namedSelect.name, Iterables.getOnlyElement(selectQuery.queries)); } + List<TxnNamedRead> createNamedReads(NamedSelect namedSelect, QueryOptions options) + { + SelectStatement select = namedSelect.select; + ReadQuery readQuery = select.getQuery(options, 0); + + // We reject reads from both LET and SELECT that do not specify a single row. 
+ @SuppressWarnings("unchecked") + SinglePartitionReadQuery.Group<SinglePartitionReadCommand> selectQuery = (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>) readQuery; + + if (selectQuery.queries.size() == 1) + return Collections.singletonList(new TxnNamedRead(namedSelect.name, Iterables.getOnlyElement(selectQuery.queries))); + + List<TxnNamedRead> list = new ArrayList<>(selectQuery.queries.size()); + for (int i = 0; i < selectQuery.queries.size(); i++) + list.add(new TxnNamedRead(TxnDataName.returning(i), selectQuery.queries.get(i))); + return list; + } + private List<TxnNamedRead> createNamedReads(QueryOptions options, Consumer<Key> keyConsumer) { List<TxnNamedRead> reads = new ArrayList<>(assignments.size() + 1); @@ -195,9 +213,11 @@ public class TransactionStatement implements CQLStatement if (returningSelect != null) { - TxnNamedRead read = createNamedRead(returningSelect, options); - keyConsumer.accept(read.key()); - reads.add(read); + for (TxnNamedRead read : createNamedReads(returningSelect, options)) + { + keyConsumer.accept(read.key()); + reads.add(read); + } } for (NamedSelect select : autoReads.values()) @@ -314,10 +334,23 @@ public class TransactionStatement implements CQLStatement if (returningSelect != null) { - FilteredPartition partition = data.get(TxnDataName.returning()); + SinglePartitionReadQuery.Group<SinglePartitionReadCommand> selectQuery = (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>) returningSelect.select.getQuery(options, 0); Selection.Selectors selectors = returningSelect.select.getSelection().newSelectors(options); ResultSetBuilder result = new ResultSetBuilder(returningSelect.select.getResultMetadata(), selectors, null); - returningSelect.select.processPartition(partition.rowIterator(), options, result, FBUtilities.nowInSeconds()); + if (selectQuery.queries.size() == 1) + { + FilteredPartition partition = data.get(TxnDataName.returning()); + returningSelect.select.processPartition(partition.rowIterator(), 
options, result, FBUtilities.nowInSeconds()); + } + else + { + int nowInSec = FBUtilities.nowInSeconds(); + for (int i = 0; i < selectQuery.queries.size(); i++) + { + FilteredPartition partition = data.get(TxnDataName.returning(i)); + returningSelect.select.processPartition(partition.rowIterator(), options, result, nowInSec); + } + } return new ResultMessage.Rows(result.build()); } diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java b/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java index 65b659747f..de672cad2e 100644 --- a/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java +++ b/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java @@ -90,6 +90,11 @@ public class TxnDataName implements Comparable<TxnDataName> return RETURNING; } + public static TxnDataName returning(int index) + { + return new TxnDataName(Kind.RETURNING, Integer.toString(index)); + } + public static TxnDataName partitionRead(TableMetadata metadata, DecoratedKey key, int index) { return new TxnDataName(Kind.AUTO_READ, metadata.keyspace, metadata.name, bytesToString(key.getKey()), String.valueOf(index)); diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java index 1315bed14f..aa3a19f95b 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java @@ -41,6 +41,7 @@ import org.apache.cassandra.db.marshal.MapType; import org.apache.cassandra.db.marshal.SetType; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.distributed.api.QueryResults; import org.apache.cassandra.service.accord.AccordTestUtils; import org.apache.cassandra.utils.ByteBufferUtil; import org.assertj.core.api.Assertions; @@ -55,6 +56,7 @@ import 
org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.SimpleQueryResult; import org.apache.cassandra.service.accord.AccordService; +import static org.apache.cassandra.cql3.CQLTester.row; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -77,6 +79,60 @@ public class AccordCQLTest extends AccordTestBase SHARED_CLUSTER.schemaChange("CREATE TYPE " + KEYSPACE + ".person (height int, age int)"); } + @Test + public void testMultiPartitionReturn() throws Exception + { + test(cluster -> { + for (int i = 0; i < 10; i++) + { + for (int j = 0; j < 10; j++) + cluster.coordinator(1).execute("INSERT INTO " + currentTable + "(k, c, v) VALUES (?, ?, ?);", ConsistencyLevel.ALL, i, j, i + j); + } + // multi row + String cql = "BEGIN TRANSACTION\n" + + " SELECT * FROM " + currentTable + " WHERE k=? AND c IN (?, ?);\n" + + "COMMIT TRANSACTION"; + SimpleQueryResult result = cluster.coordinator(1).executeWithResult(cql, ConsistencyLevel.ANY, 0, 0, 1); + assertThat(result).isEqualTo(QueryResults.builder() + .columns("k", "c", "v") + .row(0, 0, 0) + .row(0, 1, 1) + .build()); + // Results should be in Partition/Clustering order, so make sure + // multi partition + cql = "BEGIN TRANSACTION\n" + + " SELECT * FROM " + currentTable + " WHERE k IN (?, ?) AND c = ?;\n" + + "COMMIT TRANSACTION"; + for (boolean asc : Arrays.asList(true, false)) + { + Object[] binds = asc ? 
row(0, 1, 0, 1) : row(1, 0, 1, 0); + result = cluster.coordinator(1).executeWithResult(cql, ConsistencyLevel.ANY, binds); + assertThat(result).isEqualTo(QueryResults.builder() + .columns("k", "c", "v") + .row(0, 0, 0) + .row(0, 1, 1) + .row(1, 0, 1) + .row(1, 1, 2) + .build()); + } + }); + } + @Test public void testMultipleShards() throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
