This is an automated email from the ASF dual-hosted git repository.
jlewandowski pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 1e04ea4418 Fix null value handling for static columns
1e04ea4418 is described below
commit 1e04ea44186b9bd22290db767a6c6ac7e8b05106
Author: Jacek Lewandowski <[email protected]>
AuthorDate: Fri Feb 10 15:29:05 2023 +0100
Fix null value handling for static columns
patch by <jacek-lewandowski>; reviewed by <maedhroz> and <dcapwell> for
CASSANDRA-18241
---
CHANGES.txt | 1 +
.../cql3/statements/TransactionStatement.java | 6 +-
.../org/apache/cassandra/service/StorageProxy.java | 7 +-
.../cassandra/service/accord/txn/TxnCondition.java | 2 +-
.../cassandra/service/accord/txn/TxnNamedRead.java | 9 +-
.../distributed/test/accord/AccordCQLTest.java | 122 +++++++++++++++++++--
.../distributed/test/accord/AccordTestBase.java | 55 ++++++++--
7 files changed, 176 insertions(+), 26 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 9646a1ac5d..a7199d0f7e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
accord
+ * Fix null value handling for static columns (CASSANDRA-18241)
* Feature Flag for Accord Transactions (CASSANDRA-18195)
* CEP-15: Multi-Partition Transaction CQL Support (Alpha) (CASSANDRA-17719)
* CEP-15 (C*): Messaging and storage engine integration (CASSANDRA-17103)
diff --git a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
index 7d99cfa9ea..0348ab9618 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
@@ -342,7 +342,8 @@ public class TransactionStatement implements CQLStatement
if (selectQuery.queries.size() == 1)
{
FilteredPartition partition =
data.get(TxnDataName.returning());
-
returningSelect.select.processPartition(partition.rowIterator(), options,
result, FBUtilities.nowInSeconds());
+ if (partition != null)
+
returningSelect.select.processPartition(partition.rowIterator(), options,
result, FBUtilities.nowInSeconds());
}
else
{
@@ -350,7 +351,8 @@ public class TransactionStatement implements CQLStatement
for (int i = 0; i < selectQuery.queries.size(); i++)
{
FilteredPartition partition =
data.get(TxnDataName.returning(i));
-
returningSelect.select.processPartition(partition.rowIterator(), options,
result, nowInSec);
+ if (partition != null)
+
returningSelect.select.processPartition(partition.rowIterator(), options,
result, nowInSec);
}
}
return new ResultMessage.Rows(result.build());
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 4f85776625..713204ec2b 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -59,6 +59,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.CounterMutation;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.EmptyIterators;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.MessageParams;
@@ -1904,7 +1905,11 @@ public class StorageProxy implements StorageProxyMBean
TxnRead read = TxnRead.createSerialRead(group.queries.get(0));
Txn txn = new Txn.InMemory(read.keys(), read, TxnQuery.ALL);
TxnData data = AccordService.instance().coordinate(txn,
consistencyLevel);
- return
PartitionIterators.singletonIterator(data.get(TxnRead.SERIAL_READ).rowIterator());
+ FilteredPartition partition = data.get(TxnRead.SERIAL_READ);
+ if (partition != null)
+ return
PartitionIterators.singletonIterator(partition.rowIterator());
+ else
+ return EmptyIterators.partition();
}
private static PartitionIterator
legacyReadWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel
consistencyLevel, long queryStartNanoTime)
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java b/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
index e0bfaa1f7b..7c60beae60 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
@@ -337,7 +337,7 @@ public abstract class TxnCondition
{
checkNotNull(data);
FilteredPartition partition = data.get(SERIAL_READ);
- Row row = partition.getRow(clustering);
+ Row row = partition != null ? partition.getRow(clustering) : null;
for (Bound bound : bounds)
{
if (!bound.appliesTo(row))
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
index ea11312d72..534f4aa262 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.partitions.FilteredPartition;
import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.PartitionIterators;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -130,9 +129,13 @@ public class TxnNamedRead extends
AbstractSerialized<ReadCommand>
UnfilteredPartitionIterator partition =
read.executeLocally(controller);
PartitionIterator iterator =
UnfilteredPartitionIterators.filter(partition, read.nowInSec()))
{
- FilteredPartition filtered =
FilteredPartition.create(PartitionIterators.getOnlyElement(iterator, read));
TxnData result = new TxnData();
- result.put(name, filtered);
+ if (iterator.hasNext())
+ {
+ FilteredPartition filtered =
FilteredPartition.create(iterator.next());
+ if (filtered.hasRows() || read.selectsFullPartition())
+ result.put(name, filtered);
+ }
return result;
}
});
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
index 9d830b1d66..b0d61723e8 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -31,6 +32,8 @@ import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.distributed.Cluster;
import org.assertj.core.api.Assertions;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -242,22 +245,22 @@ public class AccordCQLTest extends AccordTestBase
cluster ->
{
String insertNull = "BEGIN TRANSACTION\n" +
- " LET row0 = (SELECT v FROM " +
currentTable + " WHERE k = 0 LIMIT 1);\n" +
- " SELECT row0.v;\n" +
+ " LET row0 = (SELECT * FROM " +
currentTable + " WHERE k = 0 LIMIT 1);\n" +
+ " SELECT row0.k, row0.v;\n" +
" IF row0.v IS NULL THEN\n" +
" INSERT INTO " + currentTable + " (k,
c, v) VALUES (?, ?, null);\n" +
" END IF\n" +
"COMMIT TRANSACTION";
- assertRowEqualsWithPreemptedRetry(cluster, new Object[] {
null }, insertNull, 0, 0);
+ assertRowEqualsWithPreemptedRetry(cluster, new Object[] {
null, null }, insertNull, 0, 0);
String insert = "BEGIN TRANSACTION\n" +
- " LET row0 = (SELECT v FROM " + currentTable
+ " WHERE k = 0 LIMIT 1);\n" +
- " SELECT row0.v;\n" +
+ " LET row0 = (SELECT * FROM " + currentTable
+ " WHERE k = 0 LIMIT 1);\n" +
+ " SELECT row0.k, row0.v;\n" +
" IF row0.v IS NULL THEN\n" +
" INSERT INTO " + currentTable + " (k, c,
v) VALUES (?, ?, ?);\n" +
" END IF\n" +
"COMMIT TRANSACTION";
- assertRowEqualsWithPreemptedRetry(cluster, new Object[] {
null }, insert, 0, 0, 1);
+ assertRowEqualsWithPreemptedRetry(cluster, new Object[] { 0,
null }, insert, 0, 0, 1);
String check = "BEGIN TRANSACTION\n" +
" SELECT k, c, v FROM " + currentTable + "
WHERE k=0 AND c=0;\n" +
@@ -266,6 +269,111 @@ public class AccordCQLTest extends AccordTestBase
});
}
+ @Test
+ public void testQueryStaticColumn() throws Exception
+ {
+ test("CREATE TABLE " + currentTable + " (k int, c int, s int static, v
int, primary key (k, c))",
+ cluster ->
+ {
+ // select partition key, clustering key and static column,
restrict on partition and clustering
+ testQueryStaticColumn(cluster,
+ "LET row0 = (SELECT k, c, s, v FROM " +
currentTable + " WHERE k = ? AND c = 0);\n" +
+ "SELECT row0.k, row0.c, row0.s,
row0.v;\n",
+
+ "SELECT k, c, s, v FROM " +
currentTable + " WHERE k = ? AND c = 0");
+
+ // select partition key, clustering key and static column,
restrict on partition and limit to 1 row
+ testQueryStaticColumn(cluster,
+ "LET row0 = (SELECT k, c, s, v FROM " +
currentTable + " WHERE k = ? LIMIT 1);\n" +
+ "SELECT row0.k, row0.c, row0.s,
row0.v;\n",
+
+ "SELECT k, c, s, v FROM " +
currentTable + " WHERE k = ? LIMIT 1");
+
+ // select static column and regular column, restrict on
partition and clustering
+ testQueryStaticColumn(cluster,
+ "LET row0 = (SELECT s, v FROM " +
currentTable + " WHERE k = ? AND c = 0);\n" +
+ "SELECT row0.s, row0.v;\n",
+
+ "SELECT s, v FROM " + currentTable + "
WHERE k = ? AND c = 0");
+
+ // select just static column, restrict on partition and limit
to 1 row
+ testQueryStaticColumn(cluster,
+ "LET row0 = (SELECT s FROM " +
currentTable + " WHERE k = ? LIMIT 1);\n" +
+ "SELECT row0.s;\n",
+
+ "SELECT s FROM " + currentTable + "
WHERE k = ? LIMIT 1");
+ });
+ }
+
+ private void testQueryStaticColumn(Cluster cluster, String
accordReadQuery, String simpleReadQuery)
+ {
+ logger().info("Empty table");
+ int key = 10;
+ assertResultsFromAccordMatches(cluster, accordReadQuery,
simpleReadQuery, key++);
+
+ cluster.get(1).coordinator().execute("INSERT INTO " + currentTable + "
(k, s) VALUES (?, null);", ConsistencyLevel.ALL, key);
+ logger().info("null -> static column");
+ assertResultsFromAccordMatches(cluster, accordReadQuery,
simpleReadQuery, key++);
+
+ cluster.get(1).coordinator().execute("INSERT INTO " + currentTable + "
(k, s) VALUES (?, 1);", ConsistencyLevel.ALL, key);
+ logger().info("Inserted 1 -> static column");
+ assertResultsFromAccordMatches(cluster, accordReadQuery,
simpleReadQuery, key++);
+
+ cluster.get(1).coordinator().execute("INSERT INTO " + currentTable + "
(k, c) VALUES (?, 0);", ConsistencyLevel.ALL, key);
+ logger().info("Inserted 0 -> clustering");
+ assertResultsFromAccordMatches(cluster, accordReadQuery,
simpleReadQuery, key);
+ }
+
+ @Test
+ public void testUpdateStaticColumn() throws Exception {
+ test("CREATE TABLE " + currentTable + " (k int, c int, s int static, v
int, primary key (k, c))",
+ cluster ->
+ {
+ checkUpdateStatic(cluster, "SET s=1 WHERE k=?", 101, "[[101,
null, 1, null]]", "[]");
+ checkUpdateStatic(cluster, "SET s=1, v=11 WHERE k=? AND c=0",
101, "[[101, 0, 1, 11]]", "[[101, 0, 1, 11]]");
+
+ // commented out until
org.apache.cassandra.cql3.statements.ModificationStatement.createSelectForTxn
is fixed
+ // checkUpdateStatic(cluster, "SET s+=1 WHERE k=?", 101,
"[]", "[]");
+
+ checkUpdateStatic(cluster, "SET s+=1, v+=11 WHERE k=? AND
c=0", 101, "[]", "[]");
+ });
+ }
+
+ private void checkUpdateStatic(Cluster cluster, String update, int key,
String expPart, String expClust)
+ {
+ Object[][] r1, r2, r3, r4, r;
+ r = cluster.get(1).coordinator().execute("UPDATE " + currentTable + "
" + update + " IF s = NULL;", ConsistencyLevel.QUORUM, key);
+ Assertions.assertThat(Arrays.deepToString(r)).isEqualTo("[[true]]");
+ r1 = cluster.get(1).coordinator().execute("SELECT * FROM " +
currentTable + " WHERE k = ? LIMIT 1;", ConsistencyLevel.SERIAL, key);
+ r2 = cluster.get(1).coordinator().execute("SELECT * FROM " +
currentTable + " WHERE k = ? AND c = 0;", ConsistencyLevel.SERIAL, key);
+ cluster.get(1).coordinator().execute("TRUNCATE " + currentTable,
ConsistencyLevel.ALL);
+
+ executeAsTxn(cluster, "UPDATE " + currentTable + " " + update + ";",
key);
+ r3 = executeAsTxn(cluster, "SELECT * FROM " + currentTable + " WHERE k
= ? LIMIT 1;", key).toObjectArrays();
+ r4 = executeAsTxn(cluster, "SELECT * FROM " + currentTable + " WHERE k
= ? AND c = 0;", key).toObjectArrays();
+ cluster.get(1).coordinator().execute("TRUNCATE " + currentTable,
ConsistencyLevel.ALL);
+
+ Assertions.assertThat(Arrays.deepToString(r1)).isEqualTo(expPart);
+ Assertions.assertThat(Arrays.deepToString(r2)).isEqualTo(expClust);
+ Assertions.assertThat(Arrays.deepToString(r3)).isEqualTo(expPart);
+ Assertions.assertThat(Arrays.deepToString(r4)).isEqualTo(expClust);
+ }
+
+ private void assertResultsFromAccordMatches(Cluster cluster, String
accordRead, String simpleRead, int key)
+ {
+ Object[][] simpleReadResult =
cluster.get(1).executeInternal(simpleRead, key);
+ Object[][] accordReadResult = executeWithRetry(cluster, accordRead,
key).toObjectArrays();
+
+
Assertions.assertThat(withRemovedNullOnlyRows(accordReadResult)).isEqualTo(withRemovedNullOnlyRows(simpleReadResult));
+ }
+
+ private static Object[][] withRemovedNullOnlyRows(Object[][] results)
+ {
+ return Arrays.stream(results)
+ .filter(row ->
!Arrays.stream(row).allMatch(Objects::isNull))
+ .toArray(Object[][]::new);
+ }
+
@Test
public void testScalarEQ() throws Throwable
{
@@ -2305,7 +2413,7 @@ public class AccordCQLTest extends AccordTestBase
" LET existing = (SELECT * FROM demo_ks.org_docs
WHERE org_name='demo' AND doc_id=101);\n" +
" SELECT members_version FROM demo_ks.org_users WHERE
org_name='demo' LIMIT 1;\n" +
" IF demo_user.members_version = 5 AND existing IS
NULL THEN\n" +
- " UPDATE demo_ks.org_docs SET title='slides.key',
permissions=777, contents_version += 1 WHERE org_name='demo' AND doc_id=101;\n"
+
+ " UPDATE demo_ks.org_docs SET title='slides.key',
permissions=777, contents_version = 6 WHERE org_name='demo' AND doc_id=101;\n" +
" UPDATE demo_ks.user_docs SET title='slides.key',
permissions=777 WHERE user='blake' AND doc_id=101;\n" +
" UPDATE demo_ks.user_docs SET title='slides.key',
permissions=777 WHERE user='scott' AND doc_id=101;\n" +
" END IF\n" +
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
index 7590fa4136..1a7d59f8fe 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
@@ -19,11 +19,13 @@
package org.apache.cassandra.distributed.test.accord;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.junit.AfterClass;
import org.junit.Before;
@@ -60,12 +62,14 @@ import static org.junit.Assert.assertArrayEquals;
public abstract class AccordTestBase extends TestBaseImpl
{
+ private static final Logger logger =
LoggerFactory.getLogger(AccordTestBase.class);
+ private static final int MAX_RETRIES = 10;
+
protected static final AtomicInteger COUNTER = new AtomicInteger(0);
protected static Cluster SHARED_CLUSTER;
-
+
protected String currentTable;
- private final Logger logger = LoggerFactory.getLogger(getClass());
@BeforeClass
public static void setupClass() throws IOException
@@ -140,8 +144,16 @@ public abstract class AccordTestBase extends TestBaseImpl
.start());
}
- private static SimpleQueryResult execute(Cluster cluster, String check,
Object... boundValues)
+ protected static SimpleQueryResult executeAsTxn(Cluster cluster, String
check, Object... boundValues)
+ {
+ String normalized = wrapInTxn(check);
+ logger.info("Executing transaction statement:\n{}", normalized);
+ return cluster.coordinator(1).executeWithResult(normalized,
ConsistencyLevel.ANY, boundValues);
+ }
+
+ protected static SimpleQueryResult execute(Cluster cluster, String check,
Object... boundValues)
{
+ logger.info("Executing statement:\n{}", check);
return cluster.coordinator(1).executeWithResult(check,
ConsistencyLevel.ANY, boundValues);
}
@@ -181,29 +193,48 @@ public abstract class AccordTestBase extends TestBaseImpl
{
return execute(cluster, check, boundValues);
}
- catch (Throwable t)
+ catch (RuntimeException ex)
{
- if (AssertionUtils.rootCauseIs(Preempted.class).matches(t))
+ if (count <= MAX_RETRIES &&
AssertionUtils.rootCauseIs(Preempted.class).matches(ex))
{
- logger.warn("[Retry attempt={}] Preempted failure for {}",
count, check);
+ logger.warn("[Retry attempt={}] Preempted failure for\n{}",
count, check);
return executeWithRetry0(count + 1, cluster, check,
boundValues);
}
- throw t;
+ throw ex;
}
}
protected SimpleQueryResult executeWithRetry(Cluster cluster, String
check, Object... boundValues)
{
+ check = wrapInTxn(check);
+
// is this method safe?
- cluster.get(1).runOnInstance(() -> {
- TransactionStatement stmt = AccordTestUtils.parse(check);
- if (!isIdempotent(stmt))
- throw new AssertionError("Unable to retry txn that is not
idempotent: cql=" + check);
- });
+
+ if (!isIdempotent(cluster, check))
+ throw new AssertionError("Unable to retry txn that is not
idempotent: cql=\n" + check);
+
return executeWithRetry0(0, cluster, check, boundValues);
}
+ private boolean isIdempotent(Cluster cluster, String cql)
+ {
+ return cluster.get(1).callOnInstance(() -> {
+ TransactionStatement stmt = AccordTestUtils.parse(cql);
+ return isIdempotent(stmt);
+ });
+ }
+
+ private static String wrapInTxn(String statement)
+ {
+ if (!statement.trim().toUpperCase().startsWith("BEGIN TRANSACTION"))
+ {
+ statement = statement.trim();
+ statement =
Arrays.stream(statement.split("\\n")).collect(Collectors.joining("\n ", "BEGIN
TRANSACTION\n ", "\nCOMMIT TRANSACTION"));
+ }
+ return statement;
+ }
+
public static boolean isIdempotent(TransactionStatement statement)
{
for (ModificationStatement update : statement.getUpdates())
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]