This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit cd4f484fa0696f7faecf6cfb752cfb37ed391035
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Thu Mar 30 16:49:47 2023 +0100

    [CEP-21] Ensure that SchemaTransformation impls correctly set TableMetadata epoch
    
    patch by Sam Tunnicliffe; reviewed by Marcus Eriksson and Alex Petrov
    for CASSANDRA-18457
---
 .../statements/schema/AlterTableStatement.java     |   4 +-
 src/java/org/apache/cassandra/db/ReadCommand.java  |   9 +-
 .../cassandra/tcm/transformations/AlterSchema.java |  49 ++++-
 .../org/apache/cassandra/db/ReadCommandTest.java   |  10 +-
 .../org/apache/cassandra/schema/SchemaTest.java    | 219 +++++++++++++++------
 5 files changed, 226 insertions(+), 65 deletions(-)

diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
index c0f85afa31..7d3ddbeb39 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
@@ -378,7 +378,7 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
         public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table)
         {
             Guardrails.alterTableEnabled.ensureEnabled("ALTER TABLE changing 
columns", state);
-            TableMetadata.Builder builder = table.unbuild().epoch(epoch);
+            TableMetadata.Builder builder = table.unbuild();
             removedColumns.forEach(c -> dropColumn(keyspace, table, c, 
ifColumnExists, builder));
             return 
keyspace.withSwapped(keyspace.tables.withSwapped(builder.build()));
         }
@@ -583,7 +583,7 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
                             ? ImmutableSet.of(Flag.COMPOUND, Flag.COUNTER)
                             : ImmutableSet.of(Flag.COMPOUND);
 
-            return 
keyspace.withSwapped(keyspace.tables.withSwapped(table.unbuild().epoch(epoch).flags(flags).build()));
+            return 
keyspace.withSwapped(keyspace.tables.withSwapped(table.unbuild().flags(flags).build()));
         }
 
         /**
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java 
b/src/java/org/apache/cassandra/db/ReadCommand.java
index de218d9b67..691b6dc30b 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -70,6 +70,7 @@ import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.TimeUUID;
 
@@ -1008,6 +1009,11 @@ public abstract class ReadCommand extends 
AbstractReadQuery
     @VisibleForTesting
     public static class Serializer implements IVersionedSerializer<ReadCommand>
     {
+        private static final NoSpamLogger noSpamLogger = 
NoSpamLogger.getLogger(logger, 10L, TimeUnit.SECONDS);
+        private static final NoSpamLogger.NoSpamLogStatement 
schemaMismatchStmt =
+            noSpamLogger.getStatement("Schema epoch mismatch during read 
command deserialization. " +
+                                      "TableId: {}, remote epoch: {}, local 
epoch: {}", 10L, TimeUnit.SECONDS);
+
         private final SchemaProvider schema;
 
         public Serializer()
@@ -1104,7 +1110,8 @@ public abstract class ReadCommand extends 
AbstractReadQuery
             Epoch schemaVersion = null;
             if (version >= MessagingService.VERSION_50)
                 schemaVersion = Epoch.serializer.deserialize(in);
-            assert schemaVersion == null || 
metadata.epoch.equals(schemaVersion) : metadata.epoch + " " + schemaVersion; // 
TODO: handle etc
+            if (schemaVersion != null && 
!(metadata.epoch.equals(schemaVersion)))
+                schemaMismatchStmt.info(metadata.id, schemaVersion.getEpoch(), 
metadata.epoch.getEpoch());
             int nowInSec = in.readInt();
             ColumnFilter columnFilter = 
ColumnFilter.serializer.deserialize(in, version, metadata);
             RowFilter rowFilter = RowFilter.serializer.deserialize(in, 
version, metadata);
diff --git a/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java 
b/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java
index 0e0dec2bf0..667d5410a6 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java
@@ -19,18 +19,24 @@
 package org.apache.cassandra.tcm.transformations;
 
 import java.io.IOException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.DistributedSchema;
+import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaProvider;
 import org.apache.cassandra.schema.SchemaTransformation;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.Transformation;
 import org.apache.cassandra.tcm.ownership.DataPlacements;
 import org.apache.cassandra.tcm.sequences.LockedRanges;
@@ -41,7 +47,7 @@ public class AlterSchema implements Transformation
 {
     public static final Serializer serializer = new Serializer();
 
-    protected final SchemaTransformation schemaTransformation;
+    public final SchemaTransformation schemaTransformation;
     protected final SchemaProvider schemaProvider;
 
     public AlterSchema(SchemaTransformation schemaTransformation,
@@ -65,16 +71,46 @@ public class AlterSchema implements Transformation
             return new Rejected("Can't have schema changes during ring 
movements: " + prev.lockedRanges.locked);
 
         Keyspaces newKeyspaces;
-
         try
         {
             newKeyspaces = schemaTransformation.apply(prev, 
prev.schema.getKeyspaces());
+            newKeyspaces.forEach(ksm -> {
+               ksm.tables.forEach(tm -> {
+                   if (tm.epoch.isAfter(prev.nextEpoch()))
+                       throw new 
InvalidRequestException(String.format("Invalid schema transformation. " +
+                                                                       
"Resultant epoch for table metadata of %s.%s (%d) " +
+                                                                       "is 
greater than for cluster metadata (%d)",
+                                                                       
ksm.name, tm.name, tm.epoch.getEpoch(),
+                                                                       
prev.nextEpoch().getEpoch()));
+               });
+            });
         }
         catch (ConfigurationException | InvalidRequestException t)
         {
             return new Rejected(t.getMessage());
         }
 
+        // Ensure that any new or modified TableMetadata has the correct epoch
+        Epoch nextEpoch = prev.nextEpoch();
+        Keyspaces.KeyspacesDiff diff = 
Keyspaces.diff(prev.schema.getKeyspaces(), newKeyspaces);
+
+        for (KeyspaceMetadata newKSM : diff.created)
+        {
+            Tables tables = Tables.of(normaliseEpochs(nextEpoch, 
newKSM.tables.stream()));
+            newKeyspaces = 
newKeyspaces.withAddedOrUpdated(newKSM.withSwapped(tables));
+        }
+
+        for (KeyspaceMetadata.KeyspaceDiff alteredKSM : diff.altered)
+        {
+            Tables tables = Tables.of(alteredKSM.after.tables);
+            for (TableMetadata created : normaliseEpochs(nextEpoch, 
alteredKSM.tables.created.stream()))
+                tables = tables.withSwapped(created);
+
+            for (TableMetadata altered : normaliseEpochs(nextEpoch, 
alteredKSM.tables.altered.stream().map(altered -> altered.after)))
+                tables = tables.withSwapped(altered);
+            newKeyspaces = 
newKeyspaces.withAddedOrUpdated(alteredKSM.after.withSwapped(tables));
+        }
+
         DistributedSchema snapshotAfter = new DistributedSchema(newKeyspaces);
 
         // state.schema is a DistributedSchema, so doesn't include local 
keyspaces. If we don't explicitly include those
@@ -90,6 +126,15 @@ public class AlterSchema implements Transformation
         return success(next, LockedRanges.AffectedRanges.EMPTY);
     }
 
+    private static Iterable<TableMetadata> normaliseEpochs(Epoch nextEpoch, 
Stream<TableMetadata> tables)
+    {
+        return tables.map(tm -> tm.epoch.is(nextEpoch)
+                                ? tm
+                                : tm.unbuild().epoch(nextEpoch).build())
+                     .collect(Collectors.toList());
+    }
+
+
     static class Serializer implements 
AsymmetricMetadataSerializer<Transformation, AlterSchema>
     {
         @Override
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java 
b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index bf272b87cf..fde5650108 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -78,6 +78,7 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableParams;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -931,7 +932,14 @@ public class ReadCommandTest
     {
         TableParams newParams = 
cfs.metadata().params.unbuild().gcGraceSeconds(gcGrace).build();
         KeyspaceMetadata keyspaceMetadata = 
Schema.instance.getKeyspaceMetadata(cfs.metadata().keyspace);
-        
SchemaTestUtil.addOrUpdateKeyspace(keyspaceMetadata.withSwapped(keyspaceMetadata.tables.withSwapped(cfs.metadata().withSwapped(newParams))),
 true);
+        // Because addOrUpdateKeyspace uses an anonymous SchemaTransformation, 
we need to supply an epoch
+        // to be used when updating the TableMetadata params. This would 
usually be the responsibility of
+        // the SchemaTransformation impl (i.e. a subclass of 
AlterSchemaStatement).
+        Epoch nextEpoch = cfs.metadata().epoch.nextEpoch();
+        SchemaTestUtil.addOrUpdateKeyspace(
+            keyspaceMetadata.withSwapped(
+                keyspaceMetadata.tables.withSwapped(
+                    cfs.metadata().withSwapped(newParams))), true);
     }
 
     private long getAndResetOverreadCount(ColumnFamilyStore cfs)
diff --git a/test/unit/org/apache/cassandra/schema/SchemaTest.java 
b/test/unit/org/apache/cassandra/schema/SchemaTest.java
index 5b22211653..c35b42debc 100644
--- a/test/unit/org/apache/cassandra/schema/SchemaTest.java
+++ b/test/unit/org/apache/cassandra/schema/SchemaTest.java
@@ -18,26 +18,29 @@
  */
 package org.apache.cassandra.schema;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
+import java.util.function.Predicate;
 
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.schema.AlterSchemaStatement;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Epoch;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class SchemaTest
 {
+    private static final String KS_PREFIX = "schema_test_ks_";
+    private static final String KS_ONE = KS_PREFIX + "1";
+    private static final String KS_TWO = KS_PREFIX + "2";
+
     @BeforeClass
     public static void setup()
     {
@@ -45,69 +48,167 @@ public class SchemaTest
         ServerTestUtils.prepareServer();
     }
 
+    @Before
+    public void clearSchema()
+    {
+        SchemaTestUtil.dropKeyspaceIfExist(KS_ONE, true);
+        SchemaTestUtil.dropKeyspaceIfExist(KS_TWO, true);
+    }
+
     @Test
-    public void testTransKsMigration() throws IOException
+    public void tablesInNewKeyspaceHaveCorrectEpoch()
     {
-        assertEquals(0, Schema.instance.distributedKeyspaces().size());
+        Tables tables = Tables.of(TableMetadata.minimal(KS_ONE, "modified1"),
+                                  TableMetadata.minimal(KS_ONE, "modified2"));
+        KeyspaceMetadata ksm = KeyspaceMetadata.create(KS_ONE, 
KeyspaceParams.simple(1), tables);
+        applyAndAssertTableMetadata((metadata, schema) -> 
schema.withAddedOrUpdated(ksm), true);
+    }
 
-        Gossiper.instance.start((int) (System.currentTimeMillis() / 1000));
-        try
-        {
-            // add a few.
-            saveKeyspaces();
-//            Schema.instance.reloadSchema();
+    @Test
+    public void newTablesInExistingKeyspaceHaveCorrectEpoch()
+    {
+        // Create an empty keyspace
+        KeyspaceMetadata ksm = KeyspaceMetadata.create(KS_ONE, 
KeyspaceParams.simple(1));
+        Schema.instance.submit((metadata, schema) -> 
schema.withAddedOrUpdated(ksm));
+
+        // Add two tables and verify that the resultant table metadata has the 
correct epoch
+        Tables tables = Tables.of(TableMetadata.minimal(KS_ONE, "modified1"), 
TableMetadata.minimal(KS_ONE, "modified2"));
+        KeyspaceMetadata updated = ksm.withSwapped(tables);
+        applyAndAssertTableMetadata((metadata, schema) -> 
schema.withAddedOrUpdated(updated), true);
+    }
 
-            assertNotNull(Schema.instance.getKeyspaceMetadata("ks0"));
-            assertNotNull(Schema.instance.getKeyspaceMetadata("ks1"));
+    @Test
+    public void newTablesInNonEmptyKeyspaceHaveCorrectEpoch()
+    {
+        Tables tables = Tables.of(TableMetadata.minimal(KS_ONE, "unmodified"));
+        KeyspaceMetadata ksm = KeyspaceMetadata.create(KS_ONE, 
KeyspaceParams.simple(1), tables);
+        Schema.instance.submit((metadata, schema) -> 
schema.withAddedOrUpdated(ksm));
+
+        // Add a second table and assert that its table metadata has the 
latest epoch, but that the
+        // metadata of the other table stays unmodified
+        KeyspaceMetadata updated = 
ksm.withSwapped(tables.with(TableMetadata.minimal(KS_ONE, "modified1")));
+        applyAndAssertTableMetadata((metadata, schema) -> 
schema.withAddedOrUpdated(updated));
+    }
 
-            Schema.instance.submit((metadata, schema) -> 
schema.without(Arrays.asList("ks0", "ks1")));
+    @Test
+    public void createTableCQLSetsCorrectEpoch()
+    {
+        Tables tables = Tables.of(TableMetadata.minimal(KS_ONE, "unmodified"));
+        KeyspaceMetadata ksm = KeyspaceMetadata.create(KS_ONE, 
KeyspaceParams.simple(1), tables);
+        Schema.instance.submit((metadata, schema) -> 
schema.withAddedOrUpdated(ksm));
 
-            assertNull(Schema.instance.getKeyspaceMetadata("ks0"));
-            assertNull(Schema.instance.getKeyspaceMetadata("ks1"));
+        applyAndAssertTableMetadata(cql(KS_ONE, "CREATE TABLE %s.modified (k 
int PRIMARY KEY)"));
+    }
 
-            saveKeyspaces();
-//            Schema.instance.reloadSchema(); // TODO 
.reloadSchemaAndAnnounceVersion();
+    @Test
+    public void createTablesInMultipleKeyspaces()
+    {
+        KeyspaceMetadata ksm1 = KeyspaceMetadata.create(KS_ONE, 
KeyspaceParams.simple(1));
+        KeyspaceMetadata ksm2 = KeyspaceMetadata.create(KS_TWO, 
KeyspaceParams.simple(1));
+        Schema.instance.submit((metadata, schema) -> 
schema.withAddedOrUpdated(ksm1).withAddedOrUpdated(ksm2));
+
+        // Add two tables in each ks and verify that the resultant table 
metadata has the correct epoch
+        Tables tables1 = Tables.of(TableMetadata.minimal(KS_ONE, "modified1"), 
TableMetadata.minimal(KS_ONE, "modified2"));
+        KeyspaceMetadata updated1 = ksm1.withSwapped(tables1);
+        Tables tables2 = Tables.of(TableMetadata.minimal(KS_TWO, "modified1"), 
TableMetadata.minimal(KS_TWO, "modified2"));
+        KeyspaceMetadata updated2 = ksm2.withSwapped(tables2);
+        applyAndAssertTableMetadata((metadata, schema) -> 
schema.withAddedOrUpdated(updated1)
+                                                                
.withAddedOrUpdated(updated2),
+                                    true);
+    }
+
+
+    @Test
+    public void createTablesInMultipleNonEmptyKeyspaces()
+    {
+        KeyspaceMetadata ksm1 = KeyspaceMetadata.create(KS_ONE, 
KeyspaceParams.simple(1));
+        KeyspaceMetadata ksm2 = KeyspaceMetadata.create(KS_TWO, 
KeyspaceParams.simple(1));
+        Schema.instance.submit((metadata, schema) -> 
schema.withAddedOrUpdated(ksm1).withAddedOrUpdated(ksm2));
+
+        // Add two tables in each ks to act as the pre-existing tables whose metadata must stay unmodified
+        Tables tables1 = Tables.of(TableMetadata.minimal(KS_ONE, 
"unmodified1"), TableMetadata.minimal(KS_ONE, "unmodified2"));
+        KeyspaceMetadata updated1 = ksm1.withSwapped(tables1);
+        Tables tables2 = Tables.of(TableMetadata.minimal(KS_TWO, 
"unmodified1"), TableMetadata.minimal(KS_TWO, "unmodified2"));
+        KeyspaceMetadata updated2 = ksm2.withSwapped(tables2);
+        Schema.instance.submit((metadata, schema) -> 
schema.withAddedOrUpdated(updated1).withAddedOrUpdated(updated2));
+
+        // Add a third table in one ks and assert that its table metadata has 
the latest epoch, but that the
+        // metadata of all the other tables stays unmodified
+        applyAndAssertTableMetadata(cql(KS_ONE, "CREATE TABLE %s.modified (k 
int PRIMARY KEY)"));
+    }
 
-            assertNotNull(Schema.instance.getKeyspaceMetadata("ks0"));
-            assertNotNull(Schema.instance.getKeyspaceMetadata("ks1"));
-        }
-        finally
-        {
-            Gossiper.instance.stop();
-        }
+    @Test
+    public void alterTableAndVerifyEpoch()
+    {
+        SchemaTestUtil.addOrUpdateKeyspace(KeyspaceMetadata.create(KS_ONE, 
KeyspaceParams.simple(1)), true);
+        Schema.instance.submit(cql(KS_ONE, "CREATE TABLE %s.unmodified (k int 
PRIMARY KEY)"));
+        Schema.instance.submit(cql(KS_ONE, "CREATE TABLE %s.modified ( " +
+                                           "k int, " +
+                                           "c1 int, " +
+                                           "v1 text, " +
+                                           "v2 text, " +
+                                           "v3 text, " +
+                                           "v4 text," +
+                                           "PRIMARY KEY(k,c1))"));
+
+        applyAndAssertTableMetadata(cql(KS_ONE, "ALTER TABLE %s.modified DROP 
v4"));
+        applyAndAssertTableMetadata(cql(KS_ONE, "ALTER TABLE %s.modified ADD 
v5 text"));
+        applyAndAssertTableMetadata(cql(KS_ONE, "ALTER TABLE %s.modified 
RENAME c1 TO c2"));
+        applyAndAssertTableMetadata(cql(KS_ONE, "ALTER TABLE %s.modified WITH 
comment = 'altered'"));
     }
 
     @Test
-    public void testKeyspaceCreationWhenNotInitialized() {
-        Keyspace.unsetInitialized();
-        try
-        {
-            SchemaTestUtil.addOrUpdateKeyspace(KeyspaceMetadata.create("test", 
KeyspaceParams.simple(1)), true);
-            assertNotNull(Schema.instance.getKeyspaceMetadata("test"));
-            assertNull(Schema.instance.getKeyspaceInstance("test"));
-
-            SchemaTestUtil.dropKeyspaceIfExist("test", true);
-            assertNull(Schema.instance.getKeyspaceMetadata("test"));
-            assertNull(Schema.instance.getKeyspaceInstance("test"));
-        }
-        finally
-        {
-            Keyspace.setInitialized();
-        }
-
-        SchemaTestUtil.addOrUpdateKeyspace(KeyspaceMetadata.create("test", 
KeyspaceParams.simple(1)), true);
-        assertNotNull(Schema.instance.getKeyspaceMetadata("test"));
-        assertNotNull(Schema.instance.getKeyspaceInstance("test"));
-
-        SchemaTestUtil.dropKeyspaceIfExist("test", true);
-        assertNull(Schema.instance.getKeyspaceMetadata("test"));
-        assertNull(Schema.instance.getKeyspaceInstance("test"));
+    public void alterTableMultipleKeyspacesAndVerifyEpoch()
+    {
+        SchemaTestUtil.addOrUpdateKeyspace(KeyspaceMetadata.create(KS_ONE, 
KeyspaceParams.simple(1)), true);
+        SchemaTestUtil.addOrUpdateKeyspace(KeyspaceMetadata.create(KS_TWO, 
KeyspaceParams.simple(1)), true);
+        Schema.instance.submit(cql(KS_ONE, "CREATE TABLE %s.unmodified (k int 
PRIMARY KEY)"));
+        Schema.instance.submit(cql(KS_TWO, "CREATE TABLE %s.unmodified (k int 
PRIMARY KEY)"));
+        Schema.instance.submit(cql(KS_ONE, "CREATE TABLE %s.modified ( " +
+                                           "k int, " +
+                                           "c1 int, " +
+                                           "v1 text, " +
+                                           "v2 text, " +
+                                           "v3 text, " +
+                                           "v4 text," +
+                                           "PRIMARY KEY(k, c1))"));
+
+        applyAndAssertTableMetadata(cql(KS_ONE, "ALTER TABLE %s.modified DROP 
v4"));
+        applyAndAssertTableMetadata(cql(KS_ONE, "ALTER TABLE %s.modified ADD 
v5 text"));
+        applyAndAssertTableMetadata(cql(KS_ONE, "ALTER TABLE %s.modified 
RENAME c1 TO c2"));
+        applyAndAssertTableMetadata(cql(KS_ONE, "ALTER TABLE %s.modified WITH 
comment = 'altered'"));
+    }
+
+    private void applyAndAssertTableMetadata(SchemaTransformation 
transformation)
+    {
+        applyAndAssertTableMetadata(transformation, false);
+    }
+
+    private void applyAndAssertTableMetadata(SchemaTransformation 
transformation, boolean onlyModified)
+    {
+        Epoch before = ClusterMetadata.current().epoch;
+        Schema.instance.submit(transformation);
+        Epoch after = ClusterMetadata.current().epoch;
+        assertTrue(after.isDirectlyAfter(before));
+        DistributedSchema schema = ClusterMetadata.current().schema;
+        Predicate<TableMetadata> modified = (tm) -> 
tm.name.startsWith("modified") && tm.epoch.is(after);
+        Predicate<TableMetadata> predicate = onlyModified
+                                             ? modified
+                                             : modified.or((tm) -> 
tm.name.startsWith("unmodified") && tm.epoch.isBefore(after));
+
+        schema.getKeyspaces().forEach(keyspace -> {
+            if (keyspace.name.startsWith(KS_PREFIX))
+            {
+                boolean containsUnmodified = 
keyspace.tables.stream().anyMatch(tm -> tm.name.startsWith("unmodified"));
+                assertEquals("Expected an unmodified table metadata but none 
found in " + keyspace.name, !onlyModified, containsUnmodified);
+                assertTrue(keyspace.tables.stream().allMatch(predicate));
+            }
+        });
     }
 
-    private void saveKeyspaces()
+    private static AlterSchemaStatement cql(String keyspace, String cql)
     {
-        Collection<Mutation> mutations = 
Arrays.asList(SchemaKeyspace.makeCreateKeyspaceMutation(KeyspaceMetadata.create("ks0",
 KeyspaceParams.simple(3)), FBUtilities.timestampMicros()).build(),
-                                                       
SchemaKeyspace.makeCreateKeyspaceMutation(KeyspaceMetadata.create("ks1", 
KeyspaceParams.simple(3)), FBUtilities.timestampMicros()).build());
-        SchemaKeyspace.applyChanges(mutations);
+        return (AlterSchemaStatement) 
QueryProcessor.parseStatement(String.format(cql, keyspace))
+                                                    
.prepare(ClientState.forInternalCalls());
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to