This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 9bf1680b1f1c016717381aa59367950fa770f57f Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Wed Nov 27 08:47:42 2024 +0100 Avoid prepared statement invalidation race when committing schema changes Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-20116 --- CHANGES.txt | 1 + .../apache/cassandra/schema/DistributedSchema.java | 69 ++++++---- .../cassandra/tcm/listeners/SchemaListener.java | 1 + .../PreparedStatementInvalidationRaceTest.java | 139 +++++++++++++++++++++ 4 files changed, 188 insertions(+), 22 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index b3c4f13bec..4d2ef6c3d4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Avoid prepared statement invalidation race when committing schema changes (CASSANDRA-20116) * Restore optimization in MultiCBuilder around building one clustering (CASSANDRA-20129) * Consolidate all snapshot management to SnapshotManager and introduce SnapshotManagerMBean (CASSANDRA-18111) * Fix RequestFailureReason constants codes (CASSANDRA-20126) diff --git a/src/java/org/apache/cassandra/schema/DistributedSchema.java b/src/java/org/apache/cassandra/schema/DistributedSchema.java index e10c1e0049..ec0c65eb3a 100644 --- a/src/java/org/apache/cassandra/schema/DistributedSchema.java +++ b/src/java/org/apache/cassandra/schema/DistributedSchema.java @@ -187,16 +187,8 @@ public class DistributedSchema implements MetadataValue<DistributedSchema> SchemaChangeNotifier schemaChangeNotifier = Schema.instance.schemaChangeNotifier(); schemaChangeNotifier.notifyPreChanges(new SchemaTransformation.SchemaTransformationResult(prev, this, ksDiff)); - ksDiff.dropped.forEach(metadata -> { - schemaChangeNotifier.notifyKeyspaceDropped(metadata, loadSSTables); - dropKeyspace(metadata, true); - }); - - ksDiff.created.forEach(metadata -> { - schemaChangeNotifier.notifyKeyspaceCreated(metadata); - keyspaceInstances.put(metadata.name, new Keyspace(Schema.instance, metadata, loadSSTables)); - }); - + 
ksDiff.dropped.forEach(metadata -> dropKeyspace(metadata, true)); + ksDiff.created.forEach(metadata -> keyspaceInstances.put(metadata.name, new Keyspace(Schema.instance, metadata, loadSSTables))); ksDiff.altered.forEach(delta -> { boolean initialized = Keyspace.isInitialized(); @@ -218,12 +210,9 @@ public class DistributedSchema implements MetadataValue<DistributedSchema> delta.tables.altered.forEach(diff -> alterTable(keyspace, diff.after)); delta.views.altered.forEach(diff -> alterView(keyspace, diff.after)); - schemaChangeNotifier.notifyKeyspaceAltered(delta, loadSSTables); // deal with all added, and altered views keyspace.viewManager.reload(keyspaces.get(keyspace.getName()).get()); } - - SchemaDiagnostics.keyspaceAltered(Schema.instance, delta); }); // Avoid system table side effects during initialization @@ -234,6 +223,51 @@ public class DistributedSchema implements MetadataValue<DistributedSchema> } } + public void notifyPostCommit(DistributedSchema prevSchema, boolean loadSSTables) + { + if (!prevSchema.isEmpty() && prevSchema.keyspaceInstances.isEmpty()) + prevSchema = DistributedSchema.empty(); + + Keyspaces.KeyspacesDiff ksDiff = Keyspaces.diff(prevSchema.getKeyspaces(), getKeyspaces()); + + SchemaChangeNotifier schemaChangeNotifier = Schema.instance.schemaChangeNotifier(); + + ksDiff.dropped.forEach(metadata -> { + schemaChangeNotifier.notifyKeyspaceDropped(metadata, loadSSTables); + if (Keyspace.isInitialized()) + { + metadata.views.forEach((tableMetadata) -> SchemaDiagnostics.tableDropped(Schema.instance, tableMetadata.metadata)); + metadata.tables.forEach((tableMetadata) -> SchemaDiagnostics.tableDropped(Schema.instance, tableMetadata)); + SchemaDiagnostics.metadataRemoved(Schema.instance, metadata); + SchemaDiagnostics.keyspaceDropped(Schema.instance, metadata); + } + }); + ksDiff.created.forEach(schemaChangeNotifier::notifyKeyspaceCreated); + ksDiff.altered.forEach(delta -> { + boolean initialized = Keyspace.isInitialized(); + Keyspace keyspace = 
initialized ? keyspaceInstances.get(delta.before.name) : null; + if (initialized) + { + assert keyspace != null : String.format("Keyspace %s is not initialized. Initialized keyspaces: %s.", delta.before.name, keyspaceInstances.keySet()); + assert delta.before.name.equals(delta.after.name); + schemaChangeNotifier.notifyKeyspaceAltered(delta, loadSSTables); + + // drop tables and views + delta.views.dropped.forEach(v -> SchemaDiagnostics.tableDropped(Schema.instance, v.metadata)); + delta.tables.dropped.forEach(t -> SchemaDiagnostics.tableDropped(Schema.instance, t)); + + // add tables and views + delta.tables.created.forEach(t -> SchemaDiagnostics.tableCreated(Schema.instance, t)); + delta.views.created.forEach(v -> SchemaDiagnostics.tableCreated(Schema.instance, v.metadata)); + + // update tables and views + delta.tables.altered.forEach(diff -> SchemaDiagnostics.tableAltered(Schema.instance, diff.after)); + delta.views.altered.forEach(diff -> SchemaDiagnostics.tableAltered(Schema.instance, diff.after.metadata)); + } + SchemaDiagnostics.keyspaceAltered(Schema.instance, delta); + }); + } + public static void maybeRebuildViews(DistributedSchema prev, DistributedSchema current) { Keyspaces.KeyspacesDiff ksDiff = Keyspaces.diff(prev.getKeyspaces(), current.getKeyspaces()); @@ -274,7 +308,6 @@ public class DistributedSchema implements MetadataValue<DistributedSchema> // remove the keyspace from the static instances Keyspace unloadedKeyspace = keyspaceInstances.remove(keyspaceMetadata.name); unloadedKeyspace.unload(true); - SchemaDiagnostics.metadataRemoved(Schema.instance, keyspaceMetadata); assert unloadedKeyspace == keyspace; Keyspace.writeOrder.awaitNewBarrier(); @@ -282,10 +315,7 @@ public class DistributedSchema implements MetadataValue<DistributedSchema> else { keyspace.unload(true); - SchemaDiagnostics.metadataRemoved(Schema.instance, keyspaceMetadata); } - - SchemaDiagnostics.keyspaceDropped(Schema.instance, keyspaceMetadata); } /** * @@ -296,35 +326,30 @@ public 
class DistributedSchema implements MetadataValue<DistributedSchema> { SchemaDiagnostics.tableDropping(Schema.instance, metadata); keyspace.dropCf(metadata.id, dropData); - SchemaDiagnostics.tableDropped(Schema.instance, metadata); } private void createTable(Keyspace keyspace, TableMetadata table, boolean loadSSTables) { SchemaDiagnostics.tableCreating(Schema.instance, table); keyspace.initCf(table, loadSSTables); - SchemaDiagnostics.tableCreated(Schema.instance, table); } private void createView(Keyspace keyspace, ViewMetadata view) { SchemaDiagnostics.tableCreating(Schema.instance, view.metadata); keyspace.initCf(view.metadata, true); - SchemaDiagnostics.tableCreated(Schema.instance, view.metadata); } private void alterTable(Keyspace keyspace, TableMetadata updated) { SchemaDiagnostics.tableAltering(Schema.instance, updated); keyspace.getColumnFamilyStore(updated.name).reload(updated); - SchemaDiagnostics.tableAltered(Schema.instance, updated); } private void alterView(Keyspace keyspace, ViewMetadata updated) { SchemaDiagnostics.tableAltering(Schema.instance, updated.metadata); keyspace.getColumnFamilyStore(updated.name()).reload(updated.metadata); - SchemaDiagnostics.tableAltered(Schema.instance, updated.metadata); } public Keyspaces getKeyspaces() diff --git a/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java b/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java index e3507bf2bb..2dadbb5007 100644 --- a/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java +++ b/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java @@ -55,6 +55,7 @@ public class SchemaListener implements ChangeListener if (!fromSnapshot && next.schema.lastModified().equals(prev.schema.lastModified())) return; + next.schema.notifyPostCommit(prev.schema, loadSSTables); DistributedSchema.maybeRebuildViews(prev.schema, next.schema); SchemaDiagnostics.versionUpdated(Schema.instance); Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, 
StorageService.instance.valueFactory.schema(next.schema.getVersion())); diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreparedStatementInvalidationRaceTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreparedStatementInvalidationRaceTest.java new file mode 100644 index 0000000000..0a12855753 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/PreparedStatementInvalidationRaceTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test; + +import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Test; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import org.apache.cassandra.cql3.QueryHandler; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.statements.SelectStatement; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.schema.DistributedSchema; +import org.apache.cassandra.schema.TableMetadata; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.junit.Assert.assertEquals; + +public class PreparedStatementInvalidationRaceTest extends TestBaseImpl +{ + @Test + public void testInvalidationRace() throws Exception + { + try (ICluster<IInvokableInstance> c = init(builder().withNodes(1) + .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL)) + .withInstanceInitializer(BBHelper::install) + .start())) + { + try (com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder() + .addContactPoint("127.0.0.1") + .build(); + Session s = cluster.connect()) + { + s.execute(withKeyspace("CREATE TABLE %s.tbl (pk int primary key)")); + PreparedStatement prepared = 
s.prepare(withKeyspace("select pk from %s.tbl where pk = ?")); + s.execute(prepared.bind(1)); + c.get(1).runOnInstance(() -> BBHelper.enabled.set(true)); + Thread t = new Thread(() -> s.execute(withKeyspace("alter table %s.tbl add x int"))); + t.start(); + c.get(1).runOnInstance(() -> await(BBHelper.initializeKeyspaceInstancesDone)); + // This is where the race existed before - we used to invalidate the statement in + // initializeKeyspaceInstances, but the schema change is not yet committed so the + // next execute would reprepare with the wrong tablemetadata (and keep it forever). + // Now we invalidate after committing - so the last execute below will reprepare the + // query, with the correct tablemetadata + s.execute(prepared.bind(1)); + c.get(1).runOnInstance(() -> { + BBHelper.enabled.set(false); + BBHelper.delayCommittingSchemaChange.countDown(); + }); + t.join(); + s.execute(prepared.bind(1)); + c.get(1).runOnInstance(() -> { + TableMetadata tableMetadata = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metadata(); + Collection<QueryHandler.Prepared> serverSidePrepared = QueryProcessor.instance.getPreparedStatements().values(); + assertEquals(1, serverSidePrepared.size()); + for (QueryHandler.Prepared ssp : QueryProcessor.instance.getPreparedStatements().values()) + assertEquals(tableMetadata.epoch, ((SelectStatement)ssp.statement).table.epoch); + }); + } + } + } + + public static class BBHelper + { + static AtomicBoolean enabled = new AtomicBoolean(); + static CountDownLatch delayCommittingSchemaChange = new CountDownLatch(1); + static CountDownLatch initializeKeyspaceInstancesDone = new CountDownLatch(1); + + public static void install(ClassLoader cl, int i) + { + new ByteBuddy().rebase(DistributedSchema.class) + .method(named("initializeKeyspaceInstances").and(takesArguments(2))) + .intercept(MethodDelegation.to(BBHelper.class)) + .make() + .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + + public static void 
initializeKeyspaceInstances(DistributedSchema prev, boolean loadSSTables, @SuperCall Callable<Void> zuper) + { + try + { + zuper.call(); + if (enabled.get()) + { + initializeKeyspaceInstancesDone.countDown(); + delayCommittingSchemaChange.await(); + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + } + + private static void await(CountDownLatch cdl) + { + try + { + cdl.await(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org