This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cep-21-tcm in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit cd4f484fa0696f7faecf6cfb752cfb37ed391035 Author: Sam Tunnicliffe <[email protected]> AuthorDate: Thu Mar 30 16:49:47 2023 +0100 [CEP-21] Ensure that SchemaTransformation impls correctly set TableMetadata epoch patch by Sam Tunnicliffe; reviewed by Marcus Eriksson and Alex Petrov for CASSANDRA-18457 --- .../statements/schema/AlterTableStatement.java | 4 +- src/java/org/apache/cassandra/db/ReadCommand.java | 9 +- .../cassandra/tcm/transformations/AlterSchema.java | 49 ++++- .../org/apache/cassandra/db/ReadCommandTest.java | 10 +- .../org/apache/cassandra/schema/SchemaTest.java | 219 +++++++++++++++------ 5 files changed, 226 insertions(+), 65 deletions(-) diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java index c0f85afa31..7d3ddbeb39 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java @@ -378,7 +378,7 @@ public abstract class AlterTableStatement extends AlterSchemaStatement public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table) { Guardrails.alterTableEnabled.ensureEnabled("ALTER TABLE changing columns", state); - TableMetadata.Builder builder = table.unbuild().epoch(epoch); + TableMetadata.Builder builder = table.unbuild(); removedColumns.forEach(c -> dropColumn(keyspace, table, c, ifColumnExists, builder)); return keyspace.withSwapped(keyspace.tables.withSwapped(builder.build())); } @@ -583,7 +583,7 @@ public abstract class AlterTableStatement extends AlterSchemaStatement ? ImmutableSet.of(Flag.COMPOUND, Flag.COUNTER) : ImmutableSet.of(Flag.COMPOUND); - return keyspace.withSwapped(keyspace.tables.withSwapped(table.unbuild().epoch(epoch).flags(flags).build())); + return keyspace.withSwapped(keyspace.tables.withSwapped(table.unbuild().flags(flags).build())); } /** diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index de218d9b67..691b6dc30b 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -70,6 +70,7 @@ import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.TimeUUID; @@ -1008,6 +1009,11 @@ public abstract class ReadCommand extends AbstractReadQuery @VisibleForTesting public static class Serializer implements IVersionedSerializer<ReadCommand> { + private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 10L, TimeUnit.SECONDS); + private static final NoSpamLogger.NoSpamLogStatement schemaMismatchStmt = + noSpamLogger.getStatement("Schema epoch mismatch during read command deserialization. " + + "TableId: {}, remote epoch: {}, local epoch: {}", 10L, TimeUnit.SECONDS); + private final SchemaProvider schema; public Serializer() @@ -1104,7 +1110,8 @@ public abstract class ReadCommand extends AbstractReadQuery Epoch schemaVersion = null; if (version >= MessagingService.VERSION_50) schemaVersion = Epoch.serializer.deserialize(in); - assert schemaVersion == null || metadata.epoch.equals(schemaVersion) : metadata.epoch + " " + schemaVersion; // TODO: handle etc + if (schemaVersion != null && !(metadata.epoch.equals(schemaVersion))) + schemaMismatchStmt.info(metadata.id, schemaVersion.getEpoch(), metadata.epoch.getEpoch()); int nowInSec = in.readInt(); ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata); RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata); diff --git a/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java b/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java index 0e0dec2bf0..667d5410a6 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java +++ b/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java @@ -19,18 +19,24 @@ package org.apache.cassandra.tcm.transformations; import java.io.IOException; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.schema.DistributedSchema; +import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaProvider; import org.apache.cassandra.schema.SchemaTransformation; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.Tables; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.ownership.DataPlacements; import org.apache.cassandra.tcm.sequences.LockedRanges; @@ -41,7 +47,7 @@ public class AlterSchema implements Transformation { public static final Serializer serializer = new Serializer(); - protected final SchemaTransformation schemaTransformation; + public final SchemaTransformation schemaTransformation; protected final SchemaProvider schemaProvider; public AlterSchema(SchemaTransformation schemaTransformation, @@ -65,16 +71,46 @@ public class AlterSchema implements Transformation return new Rejected("Can't have schema changes during ring movements: " + prev.lockedRanges.locked); Keyspaces newKeyspaces; - try { newKeyspaces = schemaTransformation.apply(prev, prev.schema.getKeyspaces()); + newKeyspaces.forEach(ksm -> { + ksm.tables.forEach(tm -> { + if (tm.epoch.isAfter(prev.nextEpoch())) + throw new InvalidRequestException(String.format("Invalid schema transformation. " + + "Resultant epoch for table metadata of %s.%s (%d) " + + "is greater than for cluster metadata (%d)", + ksm.name, tm.name, tm.epoch.getEpoch(), + prev.nextEpoch().getEpoch())); + }); + }); } catch (ConfigurationException | InvalidRequestException t) { return new Rejected(t.getMessage()); } + // Ensure that any new or modified TableMetadata has the correct epoch + Epoch nextEpoch = prev.nextEpoch(); + Keyspaces.KeyspacesDiff diff = Keyspaces.diff(prev.schema.getKeyspaces(), newKeyspaces); + + for (KeyspaceMetadata newKSM : diff.created) + { + Tables tables = Tables.of(normaliseEpochs(nextEpoch, newKSM.tables.stream())); + newKeyspaces = newKeyspaces.withAddedOrUpdated(newKSM.withSwapped(tables)); + } + + for (KeyspaceMetadata.KeyspaceDiff alteredKSM : diff.altered) + { + Tables tables = Tables.of(alteredKSM.after.tables); + for (TableMetadata created : normaliseEpochs(nextEpoch, alteredKSM.tables.created.stream())) + tables = tables.withSwapped(created); + + for (TableMetadata altered : normaliseEpochs(nextEpoch, alteredKSM.tables.altered.stream().map(altered -> altered.after))) + tables = tables.withSwapped(altered); + newKeyspaces = newKeyspaces.withAddedOrUpdated(alteredKSM.after.withSwapped(tables)); + } + DistributedSchema snapshotAfter = new DistributedSchema(newKeyspaces); // state.schema is a DistributedSchema, so doesn't include local keyspaces. If we don't explicitly include those @@ -90,6 +126,15 @@ public class AlterSchema implements Transformation return success(next, LockedRanges.AffectedRanges.EMPTY); } + private static Iterable<TableMetadata> normaliseEpochs(Epoch nextEpoch, Stream<TableMetadata> tables) + { + return tables.map(tm -> tm.epoch.is(nextEpoch) + ? tm + : tm.unbuild().epoch(nextEpoch).build()) + .collect(Collectors.toList()); + } + + static class Serializer implements AsymmetricMetadataSerializer<Transformation, AlterSchema> { @Override diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java index bf272b87cf..fde5650108 100644 --- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java +++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java @@ -78,6 +78,7 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableParams; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -931,7 +932,14 @@ public class ReadCommandTest { TableParams newParams = cfs.metadata().params.unbuild().gcGraceSeconds(gcGrace).build(); KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(cfs.metadata().keyspace); - SchemaTestUtil.addOrUpdateKeyspace(keyspaceMetadata.withSwapped(keyspaceMetadata.tables.withSwapped(cfs.metadata().withSwapped(newParams))), true); + // Because addOrUpdateKeyspace uses an anonymous SchemaTransformation, we need to supply an epoch + // to be used when updating the TableMetadata params. This would usually be the responsibility of + // the SchemaTransformation impl (i.e. a subclass of AlterSchemaStatement). + Epoch nextEpoch = cfs.metadata().epoch.nextEpoch(); + SchemaTestUtil.addOrUpdateKeyspace( + keyspaceMetadata.withSwapped( + keyspaceMetadata.tables.withSwapped( + cfs.metadata().withSwapped(newParams))), true); } private long getAndResetOverreadCount(ColumnFamilyStore cfs) diff --git a/test/unit/org/apache/cassandra/schema/SchemaTest.java b/test/unit/org/apache/cassandra/schema/SchemaTest.java index 5b22211653..c35b42debc 100644 --- a/test/unit/org/apache/cassandra/schema/SchemaTest.java +++ b/test/unit/org/apache/cassandra/schema/SchemaTest.java @@ -18,26 +18,29 @@ */ package org.apache.cassandra.schema; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; +import java.util.function.Predicate; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.ServerTestUtils; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.statements.schema.AlterSchemaStatement; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.Epoch; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class SchemaTest { + private static final String KS_PREFIX = "schema_test_ks_"; + private static final String KS_ONE = KS_PREFIX + "1"; + private static final String KS_TWO = KS_PREFIX + "2"; + @BeforeClass public static void setup() { @@ -45,69 +48,167 @@ public class SchemaTest ServerTestUtils.prepareServer(); } + @Before + public void clearSchema() + { + SchemaTestUtil.dropKeyspaceIfExist(KS_ONE, true); + SchemaTestUtil.dropKeyspaceIfExist(KS_TWO, true); + } + @Test - public void testTransKsMigration() throws IOException + public void tablesInNewKeyspaceHaveCorrectEpoch() { - assertEquals(0, Schema.instance.distributedKeyspaces().size()); + Tables tables = Tables.of(TableMetadata.minimal(KS_ONE, "modified1"), + TableMetadata.minimal(KS_ONE, "modified2")); + KeyspaceMetadata ksm = KeyspaceMetadata.create(KS_ONE, KeyspaceParams.simple(1), tables); + applyAndAssertTableMetadata((metadata, schema) -> schema.withAddedOrUpdated(ksm), true); + } - Gossiper.instance.start((int) (System.currentTimeMillis() / 1000)); - try - { - // add a few. - saveKeyspaces(); -// Schema.instance.reloadSchema(); + @Test + public void newTablesInExistingKeyspaceHaveCorrectEpoch() + { + // Create an empty keyspace + KeyspaceMetadata ksm = KeyspaceMetadata.create(KS_ONE, KeyspaceParams.simple(1)); + Schema.instance.submit((metadata, schema) -> schema.withAddedOrUpdated(ksm)); + + // Add two tables and verify that the resultant table metadata has the correct epoch + Tables tables = Tables.of(TableMetadata.minimal(KS_ONE, "modified1"), TableMetadata.minimal(KS_ONE, "modified2")); + KeyspaceMetadata updated = ksm.withSwapped(tables); + applyAndAssertTableMetadata((metadata, schema) -> schema.withAddedOrUpdated(updated), true); + } - assertNotNull(Schema.instance.getKeyspaceMetadata("ks0")); - assertNotNull(Schema.instance.getKeyspaceMetadata("ks1")); + @Test + public void newTablesInNonEmptyKeyspaceHaveCorrectEpoch() + { + Tables tables = Tables.of(TableMetadata.minimal(KS_ONE, "unmodified")); + KeyspaceMetadata ksm = KeyspaceMetadata.create(KS_ONE, KeyspaceParams.simple(1), tables); + Schema.instance.submit((metadata, schema) -> schema.withAddedOrUpdated(ksm)); + + // Add a second table and assert that its table metadata has the latest epoch, but that the + // metadata of the other table stays unmodified + KeyspaceMetadata updated = ksm.withSwapped(tables.with(TableMetadata.minimal(KS_ONE, "modified1"))); + applyAndAssertTableMetadata((metadata, schema) -> schema.withAddedOrUpdated(updated)); + } - Schema.instance.submit((metadata, schema) -> schema.without(Arrays.asList("ks0", "ks1"))); + @Test + public void createTableCQLSetsCorrectEpoch() + { + Tables tables = Tables.of(TableMetadata.minimal(KS_ONE, "unmodified")); + KeyspaceMetadata ksm = KeyspaceMetadata.create(KS_ONE, KeyspaceParams.simple(1), tables); + Schema.instance.submit((metadata, schema) -> schema.withAddedOrUpdated(ksm)); - assertNull(Schema.instance.getKeyspaceMetadata("ks0")); - assertNull(Schema.instance.getKeyspaceMetadata("ks1")); + applyAndAssertTableMetadata(cql(KS_ONE, "CREATE TABLE %s.modified (k int PRIMARY KEY)")); + } - saveKeyspaces(); -// Schema.instance.reloadSchema(); // TODO .reloadSchemaAndAnnounceVersion(); + @Test + public void createTablesInMultipleKeyspaces() + { + KeyspaceMetadata ksm1 = KeyspaceMetadata.create(KS_ONE, KeyspaceParams.simple(1)); + KeyspaceMetadata ksm2 = KeyspaceMetadata.create(KS_TWO, KeyspaceParams.simple(1)); + Schema.instance.submit((metadata, schema) -> schema.withAddedOrUpdated(ksm1).withAddedOrUpdated(ksm2)); + + // Add two tables in each ks and verify that the resultant table metadata has the correct epoch + Tables tables1 = Tables.of(TableMetadata.minimal(KS_ONE, "modified1"), TableMetadata.minimal(KS_ONE, "modified2")); + KeyspaceMetadata updated1 = ksm1.withSwapped(tables1); + Tables tables2 = Tables.of(TableMetadata.minimal(KS_TWO, "modified1"), TableMetadata.minimal(KS_TWO, "modified2")); + KeyspaceMetadata updated2 = ksm2.withSwapped(tables2); + applyAndAssertTableMetadata((metadata, schema) -> schema.withAddedOrUpdated(updated1) + .withAddedOrUpdated(updated2), + true); + } + + + @Test + public void createTablesInMultipleNonEmptyKeyspaces() + { + KeyspaceMetadata ksm1 = KeyspaceMetadata.create(KS_ONE, KeyspaceParams.simple(1)); + KeyspaceMetadata ksm2 = KeyspaceMetadata.create(KS_TWO, KeyspaceParams.simple(1)); + Schema.instance.submit((metadata, schema) -> schema.withAddedOrUpdated(ksm1).withAddedOrUpdated(ksm2)); + + // Add two tables in each ks and verify that the resultant table metadata has the correct epoch + Tables tables1 = Tables.of(TableMetadata.minimal(KS_ONE, "unmodified1"), TableMetadata.minimal(KS_ONE, "unmodified2")); + KeyspaceMetadata updated1 = ksm1.withSwapped(tables1); + Tables tables2 = Tables.of(TableMetadata.minimal(KS_TWO, "unmodified1"), TableMetadata.minimal(KS_TWO, "unmodified2")); + KeyspaceMetadata updated2 = ksm2.withSwapped(tables2); + Schema.instance.submit((metadata, schema) -> schema.withAddedOrUpdated(updated1).withAddedOrUpdated(updated2)); + + // Add a third table in one ks and assert that its table metadata has the latest epoch, but that the + // metadata of the all other tables stays unmodified + applyAndAssertTableMetadata(cql(KS_ONE, "CREATE TABLE %s.modified (k int PRIMARY KEY)")); + } - assertNotNull(Schema.instance.getKeyspaceMetadata("ks0")); - assertNotNull(Schema.instance.getKeyspaceMetadata("ks1")); - } - finally - { - Gossiper.instance.stop(); - } + @Test + public void alterTableAndVerifyEpoch() + { + SchemaTestUtil.addOrUpdateKeyspace(KeyspaceMetadata.create(KS_ONE, KeyspaceParams.simple(1)), true); + Schema.instance.submit(cql(KS_ONE, "CREATE TABLE %s.unmodified (k int PRIMARY KEY)")); + Schema.instance.submit(cql(KS_ONE, "CREATE TABLE %s.modified ( " + + "k int, " + + "c1 int, " + + "v1 text, " + + "v2 text, " + + "v3 text, " + + "v4 text," + + "PRIMARY KEY(k,c1))")); + + applyAndAssertTableMetadata(cql(KS_ONE, "ALTER TABLE %s.modified DROP v4")); + applyAndAssertTableMetadata(cql(KS_ONE, "ALTER TABLE %s.modified ADD v5 text")); + applyAndAssertTableMetadata(cql(KS_ONE, "ALTER TABLE %s.modified RENAME c1 TO c2")); + applyAndAssertTableMetadata(cql(KS_ONE, "ALTER TABLE %s.modified WITH comment = 'altered'")); } @Test - public void testKeyspaceCreationWhenNotInitialized() { - Keyspace.unsetInitialized(); - try - { - SchemaTestUtil.addOrUpdateKeyspace(KeyspaceMetadata.create("test", KeyspaceParams.simple(1)), true); - assertNotNull(Schema.instance.getKeyspaceMetadata("test")); - assertNull(Schema.instance.getKeyspaceInstance("test")); - - SchemaTestUtil.dropKeyspaceIfExist("test", true); - assertNull(Schema.instance.getKeyspaceMetadata("test")); - assertNull(Schema.instance.getKeyspaceInstance("test")); - } - finally - { - Keyspace.setInitialized(); - } - - SchemaTestUtil.addOrUpdateKeyspace(KeyspaceMetadata.create("test", KeyspaceParams.simple(1)), true); - assertNotNull(Schema.instance.getKeyspaceMetadata("test")); - assertNotNull(Schema.instance.getKeyspaceInstance("test")); - - SchemaTestUtil.dropKeyspaceIfExist("test", true); - assertNull(Schema.instance.getKeyspaceMetadata("test")); - assertNull(Schema.instance.getKeyspaceInstance("test")); + public void alterTableMultipleKeyspacesAndVerifyEpoch() + { + SchemaTestUtil.addOrUpdateKeyspace(KeyspaceMetadata.create(KS_ONE, KeyspaceParams.simple(1)), true); + SchemaTestUtil.addOrUpdateKeyspace(KeyspaceMetadata.create(KS_TWO, KeyspaceParams.simple(1)), true); + Schema.instance.submit(cql(KS_ONE, "CREATE TABLE %s.unmodified (k int PRIMARY KEY)")); + Schema.instance.submit(cql(KS_TWO, "CREATE TABLE %s.unmodified (k int PRIMARY KEY)")); + Schema.instance.submit(cql(KS_ONE, "CREATE TABLE %s.modified ( " + + "k int, " + + "c1 int, " + + "v1 text, " + + "v2 text, " + + "v3 text, " + + "v4 text," + + "PRIMARY KEY(k, c1))")); + + applyAndAssertTableMetadata(cql(KS_ONE, "ALTER TABLE %s.modified DROP v4")); + applyAndAssertTableMetadata(cql(KS_ONE, "ALTER TABLE %s.modified ADD v5 text")); + applyAndAssertTableMetadata(cql(KS_ONE, "ALTER TABLE %s.modified RENAME c1 TO c2")); + applyAndAssertTableMetadata(cql(KS_ONE, "ALTER TABLE %s.modified WITH comment = 'altered'")); + } + + private void applyAndAssertTableMetadata(SchemaTransformation transformation) + { + applyAndAssertTableMetadata(transformation, false); + } + + private void applyAndAssertTableMetadata(SchemaTransformation transformation, boolean onlyModified) + { + Epoch before = ClusterMetadata.current().epoch; + Schema.instance.submit(transformation); + Epoch after = ClusterMetadata.current().epoch; + assertTrue(after.isDirectlyAfter(before)); + DistributedSchema schema = ClusterMetadata.current().schema; + Predicate<TableMetadata> modified = (tm) -> tm.name.startsWith("modified") && tm.epoch.is(after); + Predicate<TableMetadata> predicate = onlyModified + ? modified + : modified.or((tm) -> tm.name.startsWith("unmodified") && tm.epoch.isBefore(after)); + + schema.getKeyspaces().forEach(keyspace -> { + if (keyspace.name.startsWith(KS_PREFIX)) + { + boolean containsUnmodified = keyspace.tables.stream().anyMatch(tm -> tm.name.startsWith("unmodified")); + assertEquals("Expected an unmodified table metadata but none found in " + keyspace.name, !onlyModified, containsUnmodified); + assertTrue(keyspace.tables.stream().allMatch(predicate)); + } + }); } - private void saveKeyspaces() + private static AlterSchemaStatement cql(String keyspace, String cql) { - Collection<Mutation> mutations = Arrays.asList(SchemaKeyspace.makeCreateKeyspaceMutation(KeyspaceMetadata.create("ks0", KeyspaceParams.simple(3)), FBUtilities.timestampMicros()).build(), - SchemaKeyspace.makeCreateKeyspaceMutation(KeyspaceMetadata.create("ks1", KeyspaceParams.simple(3)), FBUtilities.timestampMicros()).build()); - SchemaKeyspace.applyChanges(mutations); + return (AlterSchemaStatement) QueryProcessor.parseStatement(String.format(cql, keyspace)) + .prepare(ClientState.forInternalCalls()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
