This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 44f6a3271888300546172563069372fba617ab9c Author: Sam Tunnicliffe <[email protected]> AuthorDate: Fri Nov 7 17:28:58 2025 +0000 AlterSchemaStatement with no effect locally isn't submitted to CMS Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-21001 --- CHANGES.txt | 1 + .../statements/schema/AlterSchemaStatement.java | 10 +- .../cql3/AlterSchemaStatementNoOpTest.java | 173 +++++++++++++++++++++ .../cassandra/cql3/PreparedStatementsTest.java | 91 +++++------ 4 files changed, 218 insertions(+), 57 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 5a5022e418..75983caa80 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Don't submit AlterSchemaStatements which produce no effect locally to the CMS (CASSANDRA-21001) * Avoid iterating all prepared statements when getting PreparedStatementsCacheSize metric (CASSANDRA-21038) * Reduce performance impact of TableMetadataRef.get and KeyspaceMetadataRef.get (CASSANDRA-20465) * Improve CMS initialization (CASSANDRA-21036) diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterSchemaStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterSchemaStatement.java index f67f5db0e3..e0e82591e8 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterSchemaStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterSchemaStatement.java @@ -183,12 +183,20 @@ abstract public class AlterSchemaStatement implements CQLStatement.SingleKeyspac // submission to the CMS, but it can't guarantee that the statement can be applied as-is on every node in the // cluster, as config can be heterogenous falling back to safe defaults may occur on some nodes. ClusterMetadata metadata = ClusterMetadata.current(); - apply(metadata); + Keyspaces proposed = apply(metadata); + KeyspacesDiff localDiff = Keyspaces.diff(metadata.schema.getKeyspaces(), proposed); + if (localDiff.isEmpty()) + return new ResultMessage.Void(); + ClusterMetadata result = commit(metadata); KeyspacesDiff diff = Keyspaces.diff(metadata.schema.getKeyspaces(), result.schema.getKeyspaces()); clientWarnings(diff).forEach(ClientWarn.instance::warn); + // Even though the preliminary local application produced a non-empty diff, there may have been concurrent + // schema transformations that had been committed to the log but not yet enacted locally. So there remains a + // possibility that the ultimate result is a no-op. i.e. two identical "CREATE IF NOT EXISTS..." racing from + // different coordinators. if (diff.isEmpty()) return new ResultMessage.Void(); diff --git a/test/unit/org/apache/cassandra/cql3/AlterSchemaStatementNoOpTest.java b/test/unit/org/apache/cassandra/cql3/AlterSchemaStatementNoOpTest.java new file mode 100644 index 0000000000..0c2651aad8 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/AlterSchemaStatementNoOpTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.cql3; + +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.triggers.TriggersTest; + +import static java.lang.String.format; +import static org.junit.Assert.assertTrue; + +public class AlterSchemaStatementNoOpTest extends CQLTester +{ + // Tests that when an AlterSchemaStatement is a no-op according to the local schema representation, then the + // schema transformation is not submitted to the CMS and serialized into the cluster metadata log. e.g. + // * The statement has an IF NOT EXISTS and the target element is present in local schema + // * The statement has IF EXISTS and the target element is not present locally + @Test + public void testKeyspaceNoOps() + { + String ks = name(); + assertNoEpochChange(format("ALTER KEYSPACE IF EXISTS %s WITH replication = {'class': 'SimpleStrategy'}", ks)); + assertMultipleExecutionSingleEpoch(format("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy'}", ks)); + assertMultipleExecutionSingleEpoch(format("DROP KEYSPACE IF EXISTS %s", ks)); + } + + @Test + public void testTableNoOps() + { + boolean ddm = DatabaseDescriptor.getDynamicDataMaskingEnabled(); + try + { + DatabaseDescriptor.setDynamicDataMaskingEnabled(true); + String table = KEYSPACE + '.' + name(); + assertNoEpochChange(format("ALTER TABLE IF EXISTS %s WITH comment = 'test'", table)); + assertNoEpochChange(format("ALTER TABLE IF EXISTS %s DROP COMPACT STORAGE", table)); + assertMultipleExecutionSingleEpoch(format("CREATE TABLE IF NOT EXISTS %s (k int PRIMARY KEY, v int)", table)); + assertNoEpochChange(format("ALTER TABLE %s ADD IF NOT EXISTS v text", table)); + assertNoEpochChange(format("ALTER TABLE %s ALTER IF EXISTS foo MASKED WITH mask_default()", table)); + assertNoEpochChange(format("ALTER TABLE %s ALTER IF EXISTS foo DROP MASKED", table)); + assertNoEpochChange(format("ALTER TABLE %s DROP IF EXISTS foo", table)); + // RENAME <col> doesn't currently obey the IF EXISTS clause as an IRE is thrown if the column isn't found + // assertNoEpochChange(format("ALTER TABLE %s RENAME IF EXISTS foo TO bar", table)); + assertMultipleExecutionSingleEpoch(format("DROP TABLE IF EXISTS %s", table)); + } + finally + { + DatabaseDescriptor.setDynamicDataMaskingEnabled(ddm); + } + } + + @Test + public void testUserTypeNoOps() + { + String type = KEYSPACE + '.' + name(); + assertNoEpochChange(format("ALTER TYPE IF EXISTS %s ADD f1 int", type)); + assertMultipleExecutionSingleEpoch(format("CREATE TYPE IF NOT EXISTS %s (f1 int);", type)); + assertNoEpochChange(format("ALTER TYPE IF EXISTS %s ADD IF NOT EXISTS f1 int", type)); + assertNoEpochChange(format("ALTER TYPE IF EXISTS %s RENAME IF EXISTS foo to bar", type)); + assertMultipleExecutionSingleEpoch(format("DROP TYPE IF EXISTS %s", type)); + } + + @Test + public void testIndexNoOps() + { + String table = createTable("create table %s (k int primary key, v int)"); + String index = name(); + assertMultipleExecutionSingleEpoch(format("CREATE INDEX IF NOT EXISTS %s on %s.%s(v)", index, KEYSPACE, table)); + assertMultipleExecutionSingleEpoch(format("DROP INDEX IF EXISTS %s.%s", KEYSPACE, index)); + } + + @Test + public void testMaterializedViewNoOps() + { + String table = KEYSPACE + '.' + createTable("CREATE TABLE %s (k int, c int, v text, PRIMARY KEY(k,c))"); + String view = KEYSPACE + '.' + name(); + assertNoEpochChange(format("ALTER MATERIALIZED VIEW IF EXISTS %s WITH comment = 'test'", view)); + assertMultipleExecutionSingleEpoch(format("CREATE MATERIALIZED VIEW IF NOT EXISTS %s " + + "AS SELECT * FROM %s " + + "WHERE v IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL " + + "PRIMARY KEY (v,k,c)", + view, table)); + assertMultipleExecutionSingleEpoch(format("DROP MATERIALIZED VIEW IF EXISTS %s", view)); + } + + @Test + public void testTriggerNoOps() + { + String table = createTable("create table %s (k int primary key, v int)"); + String trigger = name(); + assertMultipleExecutionSingleEpoch(format("CREATE TRIGGER IF NOT EXISTS %s ON %s.%s USING '%s'", + trigger, KEYSPACE, table, TriggersTest.TestTrigger.class.getName())); + assertMultipleExecutionSingleEpoch(format("DROP TRIGGER IF EXISTS %s ON %s.%s", trigger, KEYSPACE, table)); + } + + @Test + public void testUserFunctionNoOp() + { + String function = name(); + assertMultipleExecutionSingleEpoch(format("CREATE FUNCTION IF NOT EXISTS %s.%s(a int, b int) " + + "CALLED ON NULL INPUT " + + "RETURNS int " + + "LANGUAGE java " + + "AS 'return Integer.valueOf((a!=null?a.intValue():0) + b.intValue());'", + KEYSPACE, function)); + assertMultipleExecutionSingleEpoch(format("DROP FUNCTION IF EXISTS %s.%s", KEYSPACE, function)); + } + + @Test + public void testUserAggregateNoOp() throws Throwable + { + String function = createFunction(KEYSPACE, + "double, double", + "CREATE OR REPLACE FUNCTION %s(state double, val double) " + + "RETURNS NULL ON NULL INPUT " + + "RETURNS double " + + "LANGUAGE java " + + "AS 'return state;';"); + String aggregate = name(); + assertMultipleExecutionSingleEpoch(format("CREATE AGGREGATE IF NOT EXISTS %s.%s(double) " + + "SFUNC %s " + + "STYPE double " + + "INITCOND 0", + KEYSPACE, aggregate, shortFunctionName(function))); + assertMultipleExecutionSingleEpoch(format("DROP AGGREGATE IF EXISTS %s.%s", KEYSPACE, aggregate)); + } + + private void assertMultipleExecutionSingleEpoch(String cql) + { + // execute the statement once to ensure it is valid and to set + // up the test condition (i.e. the thing exists/doesn't exist) + Epoch first = ClusterMetadata.current().epoch; + QueryProcessor.executeInternal(cql); + Epoch second = ClusterMetadata.current().epoch; + assertTrue(second.isAfter(first)); + // execute again to check this is truly a no-op and the epoch isn't incremented + QueryProcessor.executeInternal(cql); + Epoch third = ClusterMetadata.current().epoch; + assertTrue(third.is(second)); + } + + private void assertNoEpochChange(String cql) + { + Epoch first = ClusterMetadata.current().epoch; + QueryProcessor.executeInternal(cql); + Epoch second = ClusterMetadata.current().epoch; + assertTrue(second.is(first)); + } + + private String name() + { + return "n" + System.nanoTime(); + } +} diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java index a4430ff78f..850f64a134 100644 --- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java +++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java @@ -47,9 +47,7 @@ import org.apache.cassandra.serializers.Int32Serializer; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.service.accord.AccordService; -import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; -import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.transport.SimpleClient; import org.apache.cassandra.transport.messages.ResultMessage; @@ -84,25 +82,6 @@ public class PreparedStatementsTest extends CQLTester ClusterMetadataService.instance().log().waitForHighestConsecutive(); } - private static void runAndAwaitNextEpoch(Runnable runnable) - { - try - { - Epoch current = ClusterMetadata.current().epoch; - runnable.run(); - ClusterMetadataService.instance().awaitAtLeast(Epoch.create(current.getEpoch() + 1)); - } - catch (Throwable e) - { - throw new RuntimeException(e); - } - } - - private static void sessionSchemaUpdate(Session session, String update) - { - runAndAwaitNextEpoch(() -> session.execute(update)); - } - @Test public void testUnqualifiedPreparedSelectOrModificationStatementsEmitWarning() { @@ -198,13 +177,13 @@ public class PreparedStatementsTest extends CQLTester public void testInvalidatePreparedStatementsOnDrop() { Session session = sessionNet(ProtocolVersion.V5); - sessionSchemaUpdate(session, dropKsStatement); - sessionSchemaUpdate(session, createKsStatement); + session.execute(dropKsStatement); + session.execute(createKsStatement); String createTableStatement = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".qp_cleanup (id int PRIMARY KEY, cid int, val text) WITH transactional_mode='" + test_unsafe + "';"; String dropTableStatement = "DROP TABLE IF EXISTS " + KEYSPACE + ".qp_cleanup;"; - sessionSchemaUpdate(session, createTableStatement); + session.execute(createTableStatement); String insert = "INSERT INTO " + KEYSPACE + ".qp_cleanup (id, cid, val) VALUES (?, ?, ?)"; PreparedStatement prepared = session.prepare(insert); @@ -212,18 +191,18 @@ public class PreparedStatementsTest extends CQLTester PreparedStatement preparedTxn = session.prepare(txn(insert)); preparedTxn.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.QUORUM); - sessionSchemaUpdate(session, dropTableStatement); - sessionSchemaUpdate(session, createTableStatement); + session.execute(dropTableStatement); + session.execute(createTableStatement); updateTxnState(); session.execute(prepared.bind(1, 1, "value")); session.execute(preparedBatch.bind(2, 2, "value2")); session.execute(preparedTxn.bind(3, 3, "value3")); - sessionSchemaUpdate(session, dropTableStatement); // since this is an accord table, need to drop the table before the keyspace - sessionSchemaUpdate(session, dropKsStatement); - sessionSchemaUpdate(session, createKsStatement); - sessionSchemaUpdate(session, createTableStatement); + session.execute(dropTableStatement); // since this is an accord table, need to drop the table before the keyspace + session.execute(dropKsStatement); + session.execute(createKsStatement); + session.execute(createTableStatement); updateTxnState(); // The driver will get a response about the prepared statement being invalid, causing it to transparently @@ -232,8 +211,8 @@ public class PreparedStatementsTest extends CQLTester session.execute(prepared.bind(1, 1, "value")); session.execute(preparedBatch.bind(2, 2, "value2")); session.execute(preparedTxn.bind(3, 3, "value3")); - sessionSchemaUpdate(session, dropTableStatement); // since this is an accord table, need to drop the table before the keyspace - sessionSchemaUpdate(session, dropKsStatement); + session.execute(dropTableStatement); // since this is an accord table, need to drop the table before the keyspace + session.execute(dropKsStatement); } @Test @@ -255,9 +234,9 @@ public class PreparedStatementsTest extends CQLTester String alterTableStatement = "ALTER TABLE " + KEYSPACE + ".qp_cleanup ADD d int;"; String dropTableStatement = "DROP TABLE IF EXISTS " + KEYSPACE + ".qp_cleanup;"; - sessionSchemaUpdate(session, dropKsStatement); - sessionSchemaUpdate(session, createKsStatement); - sessionSchemaUpdate(session, createTableStatement); + session.execute(dropKsStatement); + session.execute(createKsStatement); + session.execute(createTableStatement); updateTxnState(); String select = "SELECT * FROM " + KEYSPACE + ".qp_cleanup"; @@ -276,7 +255,7 @@ public class PreparedStatementsTest extends CQLTester assertRowsNet(session.execute(preparedSelectTxn.bind(2)), row(2, 3, 4)); - sessionSchemaUpdate(session, alterTableStatement); + session.execute(alterTableStatement); updateTxnState(); session.execute("INSERT INTO " + KEYSPACE + ".qp_cleanup (a, b, c, d) VALUES (?, ?, ?, ?);", @@ -320,8 +299,8 @@ public class PreparedStatementsTest extends CQLTester } } - sessionSchemaUpdate(session, dropTableStatement); - sessionSchemaUpdate(session, dropKsStatement); + session.execute(dropTableStatement); + session.execute(dropKsStatement); } @Test @@ -343,9 +322,9 @@ public class PreparedStatementsTest extends CQLTester String alterTableStatement = "ALTER TABLE " + KEYSPACE + ".qp_cleanup ADD d int;"; String dropTableStatement = "DROP TABLE IF EXISTS " + KEYSPACE + ".qp_cleanup;"; - sessionSchemaUpdate(session, dropKsStatement); - sessionSchemaUpdate(session, createKsStatement); - sessionSchemaUpdate(session, createTableStatement); + session.execute(dropKsStatement); + session.execute(createKsStatement); + session.execute(createTableStatement); updateTxnState(); String select = "SELECT a, b, c FROM " + KEYSPACE + ".qp_cleanup"; @@ -369,7 +348,7 @@ public class PreparedStatementsTest extends CQLTester Assertions.assertThat(columnNames(rs)).containsExactlyInAnyOrder("a", "b", "c"); } - sessionSchemaUpdate(session, alterTableStatement); + session.execute(alterTableStatement); updateTxnState(); session.execute("INSERT INTO " + KEYSPACE + ".qp_cleanup (a, b, c, d) VALUES (?, ?, ?, ?);", @@ -389,8 +368,8 @@ public class PreparedStatementsTest extends CQLTester Assertions.assertThat(columnNames(rs)).containsExactlyInAnyOrder("a", "b", "c"); } - sessionSchemaUpdate(session, dropTableStatement); - sessionSchemaUpdate(session, dropKsStatement); + session.execute(dropTableStatement); + session.execute(dropKsStatement); } @Test @@ -399,9 +378,9 @@ public class PreparedStatementsTest extends CQLTester Session session = sessionNet(ProtocolVersion.V5); session.execute("USE " + keyspace()); - sessionSchemaUpdate(session, dropKsStatement); - sessionSchemaUpdate(session, createKsStatement); - runAndAwaitNextEpoch(() -> createTable("CREATE TABLE %s (id int PRIMARY KEY, cid int, val text) WITH transactional_mode='" + test_unsafe + "';")); + session.execute(dropKsStatement); + session.execute(createKsStatement); + createTable("CREATE TABLE %s (id int PRIMARY KEY, cid int, val text) WITH transactional_mode='" + test_unsafe + "';"); updateTxnState(); String insertCQL = "INSERT INTO " + currentTable() + " (id, cid, val) VALUES (?, ?, ?)"; @@ -447,14 +426,14 @@ public class PreparedStatementsTest extends CQLTester { Session session = sessionNet(ProtocolVersion.V5); - sessionSchemaUpdate(session, dropKsStatement); - sessionSchemaUpdate(session, createKsStatement); + session.execute(dropKsStatement); + session.execute(createKsStatement); String table = "custom_expr_test"; String index = "custom_index"; - sessionSchemaUpdate(session, String.format("CREATE TABLE IF NOT EXISTS %s.%s (id int PRIMARY KEY, cid int, val text) WITH transactional_mode='" + test_unsafe + "';", + session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (id int PRIMARY KEY, cid int, val text) WITH transactional_mode='" + test_unsafe + "';", KEYSPACE, table)); - sessionSchemaUpdate(session, String.format("CREATE CUSTOM INDEX %s ON %s.%s(val) USING '%s'", + session.execute(String.format("CREATE CUSTOM INDEX %s ON %s.%s(val) USING '%s'", index, KEYSPACE, table, StubIndex.class.getName())); updateTxnState(); @@ -493,7 +472,7 @@ public class PreparedStatementsTest extends CQLTester // Note: this test does not cover all aspects of 10786 (yet) - it was intended to test the // changes for CASSANDRA-13992. - runAndAwaitNextEpoch(() -> createTable("CREATE TABLE %s (pk int, v1 int, v2 int, PRIMARY KEY (pk))")); + createTable("CREATE TABLE %s (pk int, v1 int, v2 int, PRIMARY KEY (pk))"); execute("INSERT INTO %s (pk, v1, v2) VALUES (1,1,1)"); try (SimpleClient simpleClient = newSimpleClient(ProtocolVersion.BETA.orElse(ProtocolVersion.CURRENT))) @@ -679,7 +658,7 @@ public class PreparedStatementsTest extends CQLTester { Session session = sessionNet(version); session.execute("USE " + keyspace()); - runAndAwaitNextEpoch(() -> createTable("CREATE TABLE %s (pk int, v1 int, v2 int, PRIMARY KEY (pk))")); + createTable("CREATE TABLE %s (pk int, v1 int, v2 int, PRIMARY KEY (pk))"); PreparedStatement prepared1 = session.prepare(String.format("UPDATE %s SET v1 = ?, v2 = ? WHERE pk = 1 IF v1 = ?", currentTable())); PreparedStatement prepared2 = session.prepare(String.format("INSERT INTO %s (pk, v1, v2) VALUES (?, 200, 300) IF NOT EXISTS", currentTable())); @@ -743,7 +722,7 @@ public class PreparedStatementsTest extends CQLTester { Session session = sessionNet(version); session.execute("USE " + keyspace()); - runAndAwaitNextEpoch(() -> createTable("CREATE TABLE %s (pk int, v1 int, v2 int, PRIMARY KEY (pk))")); + createTable("CREATE TABLE %s (pk int, v1 int, v2 int, PRIMARY KEY (pk))"); PreparedStatement prepared1 = session.prepare("BEGIN BATCH " + "UPDATE " + currentTable() + " SET v1 = ? WHERE pk = 1 IF v1 = ?;" + @@ -778,7 +757,7 @@ public class PreparedStatementsTest extends CQLTester row(false, 1, 10, 20)); assertEquals(rs.getColumnDefinitions().size(), 4); - runAndAwaitNextEpoch(() -> alterTable("ALTER TABLE %s ADD v3 int;")); + alterTable("ALTER TABLE %s ADD v3 int;"); rs = session.execute(prepared2.bind()); assertRowsNet(rs, @@ -810,7 +789,7 @@ public class PreparedStatementsTest extends CQLTester int maxAttempts = 3; Session session = sessionNet(version); session.execute("USE " + keyspace()); - runAndAwaitNextEpoch(() -> createTable("CREATE TABLE %s (pk int, v1 int, v2 int, PRIMARY KEY (pk)) WITH transactional_mode='full'")); + createTable("CREATE TABLE %s (pk int, v1 int, v2 int, PRIMARY KEY (pk)) WITH transactional_mode='full'"); updateTxnState(); PreparedStatement writeOnly = session.prepare(txn( --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
