This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 9bf1680b1f1c016717381aa59367950fa770f57f
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Wed Nov 27 08:47:42 2024 +0100

    Avoid prepared statement invalidation race when committing schema changes
    
    Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-20116
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/schema/DistributedSchema.java |  69 ++++++----
 .../cassandra/tcm/listeners/SchemaListener.java    |   1 +
 .../PreparedStatementInvalidationRaceTest.java     | 139 +++++++++++++++++++++
 4 files changed, 188 insertions(+), 22 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index b3c4f13bec..4d2ef6c3d4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Avoid prepared statement invalidation race when committing schema changes (CASSANDRA-20116)
  * Restore optimization in MultiCBuilder around building one clustering (CASSANDRA-20129)
  * Consolidate all snapshot management to SnapshotManager and introduce SnapshotManagerMBean (CASSANDRA-18111)
  * Fix RequestFailureReason constants codes (CASSANDRA-20126)
diff --git a/src/java/org/apache/cassandra/schema/DistributedSchema.java b/src/java/org/apache/cassandra/schema/DistributedSchema.java
index e10c1e0049..ec0c65eb3a 100644
--- a/src/java/org/apache/cassandra/schema/DistributedSchema.java
+++ b/src/java/org/apache/cassandra/schema/DistributedSchema.java
@@ -187,16 +187,8 @@ public class DistributedSchema implements MetadataValue<DistributedSchema>
         SchemaChangeNotifier schemaChangeNotifier = 
Schema.instance.schemaChangeNotifier();
         schemaChangeNotifier.notifyPreChanges(new 
SchemaTransformation.SchemaTransformationResult(prev, this, ksDiff));
 
-        ksDiff.dropped.forEach(metadata -> {
-            schemaChangeNotifier.notifyKeyspaceDropped(metadata, loadSSTables);
-            dropKeyspace(metadata, true);
-        });
-
-        ksDiff.created.forEach(metadata -> {
-            schemaChangeNotifier.notifyKeyspaceCreated(metadata);
-            keyspaceInstances.put(metadata.name, new Keyspace(Schema.instance, 
metadata, loadSSTables));
-        });
-
+        ksDiff.dropped.forEach(metadata -> dropKeyspace(metadata, true));
+        ksDiff.created.forEach(metadata -> 
keyspaceInstances.put(metadata.name, new Keyspace(Schema.instance, metadata, 
loadSSTables)));
         ksDiff.altered.forEach(delta -> {
             boolean initialized = Keyspace.isInitialized();
 
@@ -218,12 +210,9 @@ public class DistributedSchema implements MetadataValue<DistributedSchema>
                 delta.tables.altered.forEach(diff -> alterTable(keyspace, 
diff.after));
                 delta.views.altered.forEach(diff -> alterView(keyspace, 
diff.after));
 
-                schemaChangeNotifier.notifyKeyspaceAltered(delta, 
loadSSTables);
                 // deal with all added, and altered views
                 
keyspace.viewManager.reload(keyspaces.get(keyspace.getName()).get());
             }
-
-            SchemaDiagnostics.keyspaceAltered(Schema.instance, delta);
         });
 
         // Avoid system table side effects during initialization
@@ -234,6 +223,51 @@ public class DistributedSchema implements MetadataValue<DistributedSchema>
         }
     }
 
+    public void notifyPostCommit(DistributedSchema prevSchema, boolean 
loadSSTables)
+    {
+        if (!prevSchema.isEmpty() && prevSchema.keyspaceInstances.isEmpty())
+            prevSchema = DistributedSchema.empty();
+
+        Keyspaces.KeyspacesDiff ksDiff = 
Keyspaces.diff(prevSchema.getKeyspaces(), getKeyspaces());
+
+        SchemaChangeNotifier schemaChangeNotifier = 
Schema.instance.schemaChangeNotifier();
+
+        ksDiff.dropped.forEach(metadata -> {
+            schemaChangeNotifier.notifyKeyspaceDropped(metadata, loadSSTables);
+            if (Keyspace.isInitialized())
+            {
+                metadata.views.forEach((tableMetadata) -> 
SchemaDiagnostics.tableDropped(Schema.instance, tableMetadata.metadata));
+                metadata.tables.forEach((tableMetadata) -> 
SchemaDiagnostics.tableDropped(Schema.instance, tableMetadata));
+                SchemaDiagnostics.metadataRemoved(Schema.instance, metadata);
+                SchemaDiagnostics.keyspaceDropped(Schema.instance, metadata);
+            }
+        });
+        ksDiff.created.forEach(schemaChangeNotifier::notifyKeyspaceCreated);
+        ksDiff.altered.forEach(delta -> {
+            boolean initialized = Keyspace.isInitialized();
+            Keyspace keyspace = initialized ? 
keyspaceInstances.get(delta.before.name) : null;
+            if (initialized)
+            {
+                assert keyspace != null : String.format("Keyspace %s is not 
initialized. Initialized keyspaces: %s.", delta.before.name, 
keyspaceInstances.keySet());
+                assert delta.before.name.equals(delta.after.name);
+                schemaChangeNotifier.notifyKeyspaceAltered(delta, 
loadSSTables);
+
+                // drop tables and views
+                delta.views.dropped.forEach(v -> 
SchemaDiagnostics.tableDropped(Schema.instance, v.metadata));
+                delta.tables.dropped.forEach(t -> 
SchemaDiagnostics.tableDropped(Schema.instance, t));
+
+                // add tables and views
+                delta.tables.created.forEach(t -> 
SchemaDiagnostics.tableCreated(Schema.instance, t));
+                delta.views.created.forEach(v -> 
SchemaDiagnostics.tableCreated(Schema.instance, v.metadata));
+
+                // update tables and views
+                delta.tables.altered.forEach(diff -> 
SchemaDiagnostics.tableAltered(Schema.instance, diff.after));
+                delta.views.altered.forEach(diff -> 
SchemaDiagnostics.tableAltered(Schema.instance, diff.after.metadata));
+            }
+            SchemaDiagnostics.keyspaceAltered(Schema.instance, delta);
+        });
+    }
+
     public static void maybeRebuildViews(DistributedSchema prev, 
DistributedSchema current)
     {
         Keyspaces.KeyspacesDiff ksDiff = Keyspaces.diff(prev.getKeyspaces(), 
current.getKeyspaces());
@@ -274,7 +308,6 @@ public class DistributedSchema implements MetadataValue<DistributedSchema>
             // remove the keyspace from the static instances
             Keyspace unloadedKeyspace = 
keyspaceInstances.remove(keyspaceMetadata.name);
             unloadedKeyspace.unload(true);
-            SchemaDiagnostics.metadataRemoved(Schema.instance, 
keyspaceMetadata);
             assert unloadedKeyspace == keyspace;
 
             Keyspace.writeOrder.awaitNewBarrier();
@@ -282,10 +315,7 @@ public class DistributedSchema implements MetadataValue<DistributedSchema>
         else
         {
             keyspace.unload(true);
-            SchemaDiagnostics.metadataRemoved(Schema.instance, 
keyspaceMetadata);
         }
-
-        SchemaDiagnostics.keyspaceDropped(Schema.instance, keyspaceMetadata);
     }
     /**
      *
@@ -296,35 +326,30 @@ public class DistributedSchema implements MetadataValue<DistributedSchema>
     {
         SchemaDiagnostics.tableDropping(Schema.instance, metadata);
         keyspace.dropCf(metadata.id, dropData);
-        SchemaDiagnostics.tableDropped(Schema.instance, metadata);
     }
 
     private void createTable(Keyspace keyspace, TableMetadata table, boolean 
loadSSTables)
     {
         SchemaDiagnostics.tableCreating(Schema.instance, table);
         keyspace.initCf(table, loadSSTables);
-        SchemaDiagnostics.tableCreated(Schema.instance, table);
     }
 
     private void createView(Keyspace keyspace, ViewMetadata view)
     {
         SchemaDiagnostics.tableCreating(Schema.instance, view.metadata);
         keyspace.initCf(view.metadata, true);
-        SchemaDiagnostics.tableCreated(Schema.instance, view.metadata);
     }
 
     private void alterTable(Keyspace keyspace, TableMetadata updated)
     {
         SchemaDiagnostics.tableAltering(Schema.instance, updated);
         keyspace.getColumnFamilyStore(updated.name).reload(updated);
-        SchemaDiagnostics.tableAltered(Schema.instance, updated);
     }
 
     private void alterView(Keyspace keyspace, ViewMetadata updated)
     {
         SchemaDiagnostics.tableAltering(Schema.instance, updated.metadata);
         keyspace.getColumnFamilyStore(updated.name()).reload(updated.metadata);
-        SchemaDiagnostics.tableAltered(Schema.instance, updated.metadata);
     }
 
     public Keyspaces getKeyspaces()
diff --git a/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java b/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java
index e3507bf2bb..2dadbb5007 100644
--- a/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java
+++ b/src/java/org/apache/cassandra/tcm/listeners/SchemaListener.java
@@ -55,6 +55,7 @@ public class SchemaListener implements ChangeListener
         if (!fromSnapshot && 
next.schema.lastModified().equals(prev.schema.lastModified()))
             return;
 
+        next.schema.notifyPostCommit(prev.schema, loadSSTables);
         DistributedSchema.maybeRebuildViews(prev.schema, next.schema);
         SchemaDiagnostics.versionUpdated(Schema.instance);
         Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, 
StorageService.instance.valueFactory.schema(next.schema.getVersion()));
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreparedStatementInvalidationRaceTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreparedStatementInvalidationRaceTest.java
new file mode 100644
index 0000000000..0a12855753
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreparedStatementInvalidationRaceTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.cql3.QueryHandler;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.schema.DistributedSchema;
+import org.apache.cassandra.schema.TableMetadata;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.junit.Assert.assertEquals;
+
+public class PreparedStatementInvalidationRaceTest extends TestBaseImpl
+{
+    @Test
+    public void testInvalidationRace() throws Exception
+    {
+        try (ICluster<IInvokableInstance> c = init(builder().withNodes(1)
+                                                            .withConfig(config 
-> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+                                                            
.withInstanceInitializer(BBHelper::install)
+                                                            .start()))
+        {
+            try (com.datastax.driver.core.Cluster cluster = 
com.datastax.driver.core.Cluster.builder()
+                                                                               
             .addContactPoint("127.0.0.1")
+                                                                               
             .build();
+                 Session s = cluster.connect())
+            {
+                s.execute(withKeyspace("CREATE TABLE %s.tbl (pk int primary 
key)"));
+                PreparedStatement prepared = s.prepare(withKeyspace("select pk 
from %s.tbl where pk = ?"));
+                s.execute(prepared.bind(1));
+                c.get(1).runOnInstance(() -> BBHelper.enabled.set(true));
+                Thread t = new Thread(() -> s.execute(withKeyspace("alter 
table %s.tbl add x int")));
+                t.start();
+                c.get(1).runOnInstance(() -> 
await(BBHelper.initializeKeyspaceInstancesDone));
+                // This is where the race existed before - we used to invalidate the statement in
+                // initializeKeyspaceInstances, but the schema change is not yet committed so the
+                // next execute would reprepare with the wrong tablemetadata (and keep it forever).
+                // Now we invalidate after committing - so the last execute below will reprepare the
+                // query, with the correct tablemetadata
+                s.execute(prepared.bind(1));
+                c.get(1).runOnInstance(() -> {
+                    BBHelper.enabled.set(false);
+                    BBHelper.delayCommittingSchemaChange.countDown();
+                });
+                t.join();
+                s.execute(prepared.bind(1));
+                c.get(1).runOnInstance(() -> {
+                    TableMetadata tableMetadata = 
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metadata();
+                    Collection<QueryHandler.Prepared> serverSidePrepared = 
QueryProcessor.instance.getPreparedStatements().values();
+                    assertEquals(1, serverSidePrepared.size());
+                    for (QueryHandler.Prepared ssp : 
QueryProcessor.instance.getPreparedStatements().values())
+                        assertEquals(tableMetadata.epoch, 
((SelectStatement)ssp.statement).table.epoch);
+                });
+            }
+        }
+    }
+
+    public static class BBHelper
+    {
+        static AtomicBoolean enabled = new AtomicBoolean();
+        static CountDownLatch delayCommittingSchemaChange = new 
CountDownLatch(1);
+        static CountDownLatch initializeKeyspaceInstancesDone = new 
CountDownLatch(1);
+
+        public static void install(ClassLoader cl, int i)
+        {
+            new ByteBuddy().rebase(DistributedSchema.class)
+                           
.method(named("initializeKeyspaceInstances").and(takesArguments(2)))
+                           .intercept(MethodDelegation.to(BBHelper.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        public static void initializeKeyspaceInstances(DistributedSchema prev, 
boolean loadSSTables, @SuperCall Callable<Void> zuper)
+        {
+            try
+            {
+                zuper.call();
+                if (enabled.get())
+                {
+                    initializeKeyspaceInstancesDone.countDown();
+                    delayCommittingSchemaChange.await();
+                }
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private static void await(CountDownLatch cdl)
+    {
+        try
+        {
+            cdl.await();
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to