This is an automated email from the ASF dual-hosted git repository.

konstantinov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 06440e947a8986f53739ac2cea4f0bd7add8e087
Merge: 55db3181fb 7f92e1ac2c
Author: Dmitry Konstantinov <netud...@gmail.com>
AuthorDate: Wed Sep 10 23:37:50 2025 +0100

    Merge branch 'cassandra-5.0' into trunk
    
    * cassandra-5.0:
      Prevent too long table names not fitting file names

 CHANGES.txt                                        |  1 +
 .../statements/schema/AlterSchemaStatement.java    | 15 ++---
 .../statements/schema/CreateIndexStatement.java    | 17 ++++--
 src/java/org/apache/cassandra/db/Directories.java  | 10 ++--
 .../org/apache/cassandra/schema/IndexMetadata.java | 21 +++----
 .../apache/cassandra/schema/KeyspaceMetadata.java  | 30 +++++++---
 .../apache/cassandra/schema/SchemaConstants.java   | 29 ++++++++-
 .../org/apache/cassandra/schema/TableMetadata.java | 70 +++++++++++++++++++---
 .../index/sai/cql/StorageAttachedIndexDDLTest.java |  3 +-
 .../schema/CreateTableValidationTest.java          | 40 +++++++++++++
 .../apache/cassandra/schema/IndexMetadataTest.java | 25 +-------
 .../apache/cassandra/schema/SchemaChangesTest.java | 51 ++++++++++------
 .../apache/cassandra/schema/ValidationTest.java    | 16 ++---
 13 files changed, 223 insertions(+), 105 deletions(-)

diff --cc CHANGES.txt
index febc9d5da0,93cc0ead34..e3fd000fbd
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -304,16 -81,7 +304,17 @@@ Merged from 4.1
   * Optionally skip exception logging on invalid legacy protocol magic 
exception (CASSANDRA-19483)
   * Fix SimpleClient ability to release acquired capacity (CASSANDRA-20202)
   * Fix WaitQueue.Signal.awaitUninterruptibly may block forever if invoking 
thread is interrupted (CASSANDRA-20084)
 + * Run audit_logging_options through sanitization and validation on startup 
(CASSANDRA-20208)
 + * Enforce CQL message size limit on multiframe messages (CASSANDRA-20052)
 + * Fix race condition in DecayingEstimatedHistogramReservoir during rescale 
(CASSANDRA-19365)
  Merged from 4.0:
++ * Prevent too long table names not fitting file names (CASSANDRA-20389)
 + * Update Jackson to 2.19.2 (CASSANDRA-20848)
 + * Update commons-lang3 to 3.18.0 (CASSANDRA-20849)
 + * Add NativeTransportMaxConcurrentConnectionsPerIp to StorageProxyMBean 
(CASSANDRA-20642)
 + * Make secondary index implementations notified about rows in fully expired 
SSTables in compaction (CASSANDRA-20829)
 + * Ensure prepared_statement INSERT timestamp precedes eviction DELETE 
(CASSANDRA-19703)
 + * Gossip doesn't converge due to race condition when updating EndpointStates 
multiple fields (CASSANDRA-20659)
   * Handle sstable metadata stats file getting a new mtime after compaction 
has finished (CASSANDRA-18119)
   * Honor MAX_PARALLEL_TRANSFERS correctly (CASSANDRA-20532)
   * Updating a column with a new TTL but same expiration time is 
non-deterministic and causes repair mismatches. (CASSANDRA-20561)
diff --cc 
src/java/org/apache/cassandra/cql3/statements/schema/AlterSchemaStatement.java
index 3901e65cfb,ca4babc204..f67f5db0e3
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterSchemaStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterSchemaStatement.java
@@@ -46,17 -38,12 +46,19 @@@ import org.apache.cassandra.transport.D
  import org.apache.cassandra.transport.Event.SchemaChange;
  import org.apache.cassandra.transport.messages.ResultMessage;
  
+ import static 
org.apache.cassandra.schema.KeyspaceMetadata.validateKeyspaceName;
+ 
  abstract public class AlterSchemaStatement implements 
CQLStatement.SingleKeyspaceCqlStatement, SchemaTransformation
  {
 +    private static final Logger logger = 
LoggerFactory.getLogger(AlterSchemaStatement.class);
 +
      protected final String keyspaceName; // name of the keyspace affected by 
the statement
      protected ClientState state;
 +    // TODO: not sure if this is going to stay the same, or will be replaced 
by more efficient serialization/sanitation means
 +    // or just `toString` for every statement
 +    private String cql;
 +    public static final long NO_EXECUTION_TIMESTAMP = -1;
 +    private long executionTimestamp = NO_EXECUTION_TIMESTAMP;
  
      protected AlterSchemaStatement(String keyspaceName)
      {
@@@ -168,25 -113,13 +170,26 @@@
          if (null != keyspace && keyspace.isVirtual())
              throw ire("Virtual keyspace '%s' is not user-modifiable", 
keyspaceName);
  
-         validateKeyspaceName();
+         validateKeyspaceName(keyspaceName, AlterSchemaStatement::ire);
+ 
 -        SchemaTransformationResult result = Schema.instance.transform(this, 
locally);
 +        setExecutionTimestamp(state.getTimestamp());
 +        // Perform a 'dry-run' attempt to apply the transformation locally 
before submitting to the CMS. This can save a
 +        // round trip to the CMS for things like syntax errors, but also fail fast 
for things like configuration errors.
 +        // Such failures may be dependent on the specific node's config (for 
things like guardrails/memtable
 +        // config/etc), but executing a schema change which has already been 
committed by the CMS should always succeed
 +        // or else the node cannot make progress on any subsequent metadata 
changes. For this reason, validation errors
 +        // during execution are trapped and the node will fall back to safe 
default config wherever possible. Attempting
 +        // to apply the SchemaTransformation at this point will catch any 
such error which occurs locally before
 +        // submission to the CMS, but it can't guarantee that the statement 
can be applied as-is on every node in the
 +        // cluster, as config can be heterogeneous and falling back to safe 
defaults may occur on some nodes.
 +        ClusterMetadata metadata = ClusterMetadata.current();
 +        apply(metadata);
 +        ClusterMetadata result = commit(metadata);
  
 -        clientWarnings(result.diff).forEach(ClientWarn.instance::warn);
 +        KeyspacesDiff diff = Keyspaces.diff(metadata.schema.getKeyspaces(), 
result.schema.getKeyspaces());
 +        clientWarnings(diff).forEach(ClientWarn.instance::warn);
  
 -        if (result.diff.isEmpty())
 +        if (diff.isEmpty())
              return new ResultMessage.Void();
  
          /*
@@@ -198,29 -131,11 +201,19 @@@
           */
          AuthenticatedUser user = state.getClientState().getUser();
          if (null != user && !user.isAnonymous())
 -            createdResources(result.diff).forEach(r -> 
grantPermissionsOnResource(r, user));
 +            createdResources(diff).forEach(r -> grantPermissionsOnResource(r, 
user));
 +
 +        // if the changes affected accord, wait for accord to apply them
 +        AccordTopology.awaitTopologyReadiness(diff, result.epoch);
  
 -        return new ResultMessage.SchemaChange(schemaChangeEvent(result.diff));
 +        return new ResultMessage.SchemaChange(schemaChangeEvent(diff));
 +    }
 +
 +    protected ClusterMetadata commit(ClusterMetadata metadata)
 +    {
 +        return Schema.instance.submit(this);
      }
  
-     private void validateKeyspaceName()
-     {
-         if (!SchemaConstants.isValidName(keyspaceName))
-         {
-             throw ire("Keyspace name must not be empty, more than %d 
characters long, " +
-                       "or contain non-alphanumeric-underscore characters (got 
'%s')",
-                       SchemaConstants.NAME_LENGTH, keyspaceName);
-         }
-     }
- 
      protected void validateDefaultTimeToLive(TableParams params)
      {
          if (params.defaultTimeToLive == 0
diff --cc src/java/org/apache/cassandra/schema/IndexMetadata.java
index e95b7ad56d,08433fb204..0eee4f7832
--- a/src/java/org/apache/cassandra/schema/IndexMetadata.java
+++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java
@@@ -20,15 -20,8 +20,14 @@@ package org.apache.cassandra.schema
  
  import java.io.IOException;
  import java.lang.reflect.InvocationTargetException;
 -import java.util.*;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.TreeSet;
 +import java.util.UUID;
  import java.util.concurrent.ConcurrentHashMap;
- import java.util.regex.Pattern;
  import java.util.stream.Collectors;
  
  import com.google.common.base.Objects;
@@@ -54,8 -46,8 +53,10 @@@ import org.apache.cassandra.tcm.seriali
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.UUIDSerializer;
  
 +import static org.apache.cassandra.db.TypeSizes.sizeof;
 +import static org.apache.cassandra.utils.LocalizeString.toLowerCaseLocalized;
+ import static 
org.apache.cassandra.schema.SchemaConstants.PATTERN_NON_WORD_CHAR;
+ import static org.apache.cassandra.schema.SchemaConstants.isValidCharsName;
  
  /**
   * An immutable representation of secondary index metadata.
@@@ -64,12 -56,7 +65,8 @@@ public final class IndexMetadat
  {
      private static final Logger logger = 
LoggerFactory.getLogger(IndexMetadata.class);
  
-     private static final Pattern PATTERN_NON_WORD_CHAR = 
Pattern.compile("\\W");
-     private static final Pattern PATTERN_WORD_CHARS = Pattern.compile("\\w+");
- 
- 
      public static final Serializer serializer = new Serializer();
 +    public static final MetadataSerializer metadataSerializer = new 
MetadataSerializer();
  
      /**
       * A mapping of user-friendly index names to their fully qualified index 
class names.
@@@ -117,24 -104,19 +114,19 @@@
      {
          Map<String, String> newOptions = new HashMap<>(options);
          newOptions.put(IndexTarget.TARGET_OPTION_NAME, targets.stream()
--                                                              .map(target -> 
target.asCqlString())
++                                                              
.map(IndexTarget::asCqlString)
                                                                
.collect(Collectors.joining(", ")));
          return new IndexMetadata(name, newOptions, kind);
      }
  
-     public static boolean isNameValid(String name)
-     {
-         return name != null && !name.isEmpty() && 
PATTERN_WORD_CHARS.matcher(name).matches();
-     }
- 
      public static String generateDefaultIndexName(String table, 
ColumnIdentifier column)
      {
--        return PATTERN_NON_WORD_CHAR.matcher(table + "_" + column.toString() 
+ "_idx").replaceAll("");
++        return PATTERN_NON_WORD_CHAR.matcher(table + '_' + column.toString() 
+ "_idx").replaceAll("");
      }
  
      public static String generateDefaultIndexName(String table)
      {
--        return PATTERN_NON_WORD_CHAR.matcher(table + "_" + 
"idx").replaceAll("");
++        return PATTERN_NON_WORD_CHAR.matcher(table + "_idx").replaceAll("");
      }
  
      public void validate(TableMetadata table)
diff --cc src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
index 16a5c1b67a,4bc3452c79..bfd0f54383
--- a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
+++ b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java
@@@ -39,29 -36,41 +39,48 @@@ import org.apache.cassandra.cql3.functi
  import org.apache.cassandra.cql3.functions.UDFunction;
  import org.apache.cassandra.db.marshal.UserType;
  import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.exceptions.RequestValidationException;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputPlus;
  import org.apache.cassandra.locator.AbstractReplicationStrategy;
  import org.apache.cassandra.schema.UserFunctions.FunctionsDiff;
 +import org.apache.cassandra.tcm.ClusterMetadata;
 +import org.apache.cassandra.tcm.serialization.MetadataSerializer;
 +import org.apache.cassandra.tcm.serialization.Version;
  import org.apache.cassandra.schema.Tables.TablesDiff;
  import org.apache.cassandra.schema.Types.TypesDiff;
  import org.apache.cassandra.schema.Views.ViewsDiff;
 -import org.apache.cassandra.service.StorageService;
 -
 -import static java.lang.String.format;
  
  import static com.google.common.collect.Iterables.any;
 +import static java.lang.String.format;
 +import static org.apache.cassandra.db.TypeSizes.sizeof;
+ import static org.apache.cassandra.schema.SchemaConstants.TABLE_NAME_LENGTH;
 +import static org.apache.cassandra.utils.LocalizeString.toLowerCaseLocalized;
  
  /**
   * An immutable representation of keyspace metadata (name, params, tables, 
types, and functions).
   */
  public final class KeyspaceMetadata implements SchemaElement
  {
 +    public static final Serializer serializer = new Serializer();
 +
+     /**
+      * Validates the keyspace name for valid characters and correct length.
+      * Throws an exception if it's invalid.
+      *
+      * @param keyspaceName     The name of the keyspace to validate
+      * @param exceptionBuilder The exception constructor to throw if 
validation fails
+      */
+     public static <T extends RequestValidationException> void 
validateKeyspaceName(String keyspaceName, java.util.function.Function<String, 
T> exceptionBuilder)
+     {
+         if (!SchemaConstants.isValidCharsName(keyspaceName))
+             throw exceptionBuilder.apply(format("Keyspace name must not be 
empty and must contain alphanumeric or underscore characters only (got \"%s\")",
+                                                 keyspaceName));
+         if (keyspaceName.length() > SchemaConstants.NAME_LENGTH)
+             throw exceptionBuilder.apply(format("Keyspace name must not be 
more than %d characters long (got %d characters for \"%s\")",
+                                                 TABLE_NAME_LENGTH, 
keyspaceName.length(), keyspaceName));
+     }
+ 
      public enum Kind
      {
          REGULAR, VIRTUAL
@@@ -210,26 -211,15 +229,26 @@@
  
      public String findAvailableIndexName(String baseName)
      {
 -        if (!hasIndex(baseName))
 +        return findAvailableIndexName(baseName, Collections.emptySet(), this);
 +    }
 +
 +    /**
-      * find an avaiable index name based on the indexes in target keyspace 
and indexes collections
++     * find an available index name based on the indexes in target keyspace 
and indexes collections
 +     * @param baseName the base name of index
 +     * @param indexes find out whether there is any conflict with baseName in 
the indexes
 +     * @param keyspaceMetadata find out whether there is any conflict with 
baseName in keyspaceMetadata
 +     * */
 +    public String findAvailableIndexName(String baseName, 
Collection<IndexMetadata> indexes, KeyspaceMetadata keyspaceMetadata)
 +    {
 +        if (!hasIndex(baseName, indexes, keyspaceMetadata))
              return baseName;
  
 -        int i = 1;
          do
          {
 -            String name = baseName + '_' + i++;
 -            if (!hasIndex(name))
 +            String name = generateIndexName(baseName);
 +            if (!hasIndex(name, indexes, keyspaceMetadata))
                  return name;
 +            baseName = name;
          }
          while (true);
      }
@@@ -379,17 -334,10 +398,10 @@@
          return builder.toString();
      }
  
 -    public void validate()
 +    public void validate(ClusterMetadata metadata)
      {
-         if (!SchemaConstants.isValidName(name))
-         {
-             throw new ConfigurationException(format("Keyspace name must not 
be empty, more than %s characters long, "
-                                                     + "or contain 
non-alphanumeric-underscore characters (got \"%s\")",
-                                                     
SchemaConstants.NAME_LENGTH,
-                                                     name));
-         }
- 
+         validateKeyspaceName(name, ConfigurationException::new);
 -        params.validate(name, null);
 +        params.validate(name, null, metadata);
          tablesAndViews().forEach(TableMetadata::validate);
  
          Set<String> indexNames = new HashSet<>();
diff --cc src/java/org/apache/cassandra/schema/SchemaConstants.java
index efccc997b3,ff990f7cbb..6e3cb7a72e
--- a/src/java/org/apache/cassandra/schema/SchemaConstants.java
+++ b/src/java/org/apache/cassandra/schema/SchemaConstants.java
@@@ -30,10 -30,9 +30,11 @@@ import com.google.common.collect.Sets
  import org.apache.cassandra.auth.AuthKeyspace;
  import org.apache.cassandra.db.Digest;
  import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.service.accord.AccordKeyspace;
  import org.apache.cassandra.tracing.TraceKeyspace;
  
+ import static 
org.apache.cassandra.db.Directories.TABLE_DIRECTORY_NAME_SEPARATOR;
 +import static org.apache.cassandra.utils.LocalizeString.toLowerCaseLocalized;
  
  /**
   * When adding new String keyspace names here, double check if it needs to be 
added to PartitionDenylist.canDenylistKeyspace
diff --cc src/java/org/apache/cassandra/schema/TableMetadata.java
index 011a5051df,98bcf042ef..1336747379
--- a/src/java/org/apache/cassandra/schema/TableMetadata.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadata.java
@@@ -90,9 -72,12 +89,14 @@@ import static com.google.common.collect
  import static java.lang.String.format;
  import static java.util.stream.Collectors.toList;
  import static java.util.stream.Collectors.toSet;
+ import static 
org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
+ import static 
org.apache.cassandra.db.Directories.TABLE_DIRECTORY_NAME_SEPARATOR;
 +import static org.apache.cassandra.db.TypeSizes.sizeof;
 +import static org.apache.cassandra.schema.ColumnMetadata.NO_UNIQUE_ID;
- import static org.apache.cassandra.schema.IndexMetadata.isNameValid;
+ import static 
org.apache.cassandra.schema.KeyspaceMetadata.validateKeyspaceName;
+ import static org.apache.cassandra.schema.SchemaConstants.FILENAME_LENGTH;
+ import static org.apache.cassandra.schema.SchemaConstants.TABLE_NAME_LENGTH;
+ import static org.apache.cassandra.schema.SchemaConstants.isValidCharsName;
  
  @Unmetered
  public class TableMetadata implements SchemaElement
@@@ -632,23 -507,19 +634,34 @@@
              except("Missing partition keys for table %s", toString());
  
          indexes.validate(this);
 +
 +        for (ColumnMetadata columnMetadata : columns())
 +        {
 +            ColumnConstraints constraints = 
columnMetadata.getColumnConstraints();
 +            try
 +            {
 +                constraints.validate(columnMetadata);
 +            }
 +            catch (InvalidConstraintDefinitionException e)
 +            {
 +                throw new InvalidRequestException(e.getMessage(), e);
 +            }
 +        }
 +
 +        require((params.transactionalMode == TransactionalMode.off && 
params.transactionalMigrationFrom == TransactionalMigrationFromMode.none) || 
!isCounter(), "Counters are not supported with Accord for table " + this);
      }
  
+     private void validateTableName()
+     {
+         if (!isValidCharsName(name))
+             except("Table name must not be empty or not contain 
non-alphanumeric-underscore characters (got \"%s\")", name);
+ 
+         if (name.length() > TABLE_NAME_LENGTH)
+             except("Table name must not be more than %d characters long (got 
%d characters for \"%s\")", TABLE_NAME_LENGTH, name.length(), name);
+ 
+         assert getTableDirectoryName().length() <= FILENAME_LENGTH : 
String.format("Generated directory name for a table of %d characters doesn't 
fit the max filename legnth of %s. This unexpectedly wasn't prevented by check 
of the table name length, %d, to fit %d characters (got table name \"%s\" and 
generated directory name \"%s\"", getTableDirectoryName().length(), 
FILENAME_LENGTH, name.length(), TABLE_NAME_LENGTH, name, 
getTableDirectoryName());
+     }
+ 
      /**
       * To support backward compatibility with thrift super columns in the C* 
3.0+ storage engine, we encode said super
       * columns as a CQL {@code map<blob, blob>}. To ensure the name of this 
map did not conflict with any other user
@@@ -1799,7 -1580,7 +1851,7 @@@
              literals[i++] = asCQLLiteral(clusteringColumns().get(j).type, 
clustering.bufferAt(j));
          }
  
--        return i == 1 ? literals[0] : "(" + String.join(", ", literals) + ")";
++        return i == 1 ? literals[0] : '(' + String.join(", ", literals) + ')';
      }
  
      private static String asCQLLiteral(AbstractType<?> type, ByteBuffer value)
diff --cc test/unit/org/apache/cassandra/schema/CreateTableValidationTest.java
index 02c5d7b260,c8f5489b55..a625f84993
--- a/test/unit/org/apache/cassandra/schema/CreateTableValidationTest.java
+++ b/test/unit/org/apache/cassandra/schema/CreateTableValidationTest.java
@@@ -26,7 -24,12 +26,11 @@@ import org.apache.cassandra.utils.Bloom
  
  import org.junit.Test;
  
+ import static org.apache.cassandra.schema.SchemaConstants.FILENAME_LENGTH;
+ import static org.apache.cassandra.schema.SchemaConstants.NAME_LENGTH;
+ import static org.apache.cassandra.schema.SchemaConstants.TABLE_NAME_LENGTH;
+ import static org.assertj.core.api.Assertions.assertThat;
  import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 -import static org.junit.Assert.fail;
  
  public class CreateTableValidationTest extends CQLTester
  {
@@@ -91,10 -104,46 +95,46 @@@
                          "Missing CLUSTERING ORDER for column ck1");
      }
  
+     @Test
+     public void testCreatingTableWithLongName() throws Throwable
+     {
+         int tableIdSuffix = "-1b255f4def2540a60000000000000000".length();
+         String keyspaceName = "k".repeat(NAME_LENGTH);
+         String tableName = "t".repeat(FILENAME_LENGTH - tableIdSuffix);
+         String tooLongTableName = "l".repeat(FILENAME_LENGTH - tableIdSuffix 
+ 1);
+ 
+         execute(String.format("CREATE KEYSPACE %s with replication = " +
+                               "{ 'class' : 'SimpleStrategy', 
'replication_factor' : 1 }",
+                               keyspaceName));
+ 
+         assertInvalidMessage(String.format("%s.%s: Table name must not be 
more than %d characters long (got %d characters for \"%2$s\")",
+                                            keyspaceName, tooLongTableName, 
TABLE_NAME_LENGTH, tooLongTableName.length()),
+                              String.format("CREATE TABLE %s.%s (" +
+                                            "key int PRIMARY KEY," +
+                                            "val int)", keyspaceName, 
tooLongTableName));
+ 
+         createTable(String.format("CREATE TABLE %s.%s (" +
+                                   "key int PRIMARY KEY," +
+                                   "val int)", keyspaceName, tableName));
+         execute(String.format("INSERT INTO %s.%s (key,val) VALUES (1,1)", 
keyspaceName, tableName));
+         assertThat(execute(String.format("SELECT * from %s.%s", keyspaceName, 
tableName))).hasSize(1);
+         flush(keyspaceName, tableName);
+         assertThat(execute(String.format("SELECT * from %s.%s", keyspaceName, 
tableName))).hasSize(1);
+     }
+ 
+     @Test
+     public void testNonAlphanummericTableName() throws Throwable
+     {
+         assertInvalidMessage(String.format("%s.d-3: Table name must not be 
empty or not contain non-alphanumeric-underscore characters (got \"d-3\")", 
KEYSPACE),
+                              String.format("CREATE TABLE %s.\"d-3\" (key int 
PRIMARY KEY, val int)", KEYSPACE));
+         assertInvalidMessage(String.format("%s.    : Table name must not be 
empty or not contain non-alphanumeric-underscore characters (got \"    \")", 
KEYSPACE),
+                              String.format("CREATE TABLE %s.\"    \" (key int 
PRIMARY KEY, val int)", KEYSPACE));
+     }
+ 
 -    private void expectedFailure(String statement, String errorMsg)
 +    private void expectedFailure(final Class<? extends 
RequestValidationException> exceptionType, String statement, String errorMsg)
      {
  
 -        assertThatExceptionOfType(InvalidRequestException.class)
 +        assertThatExceptionOfType(exceptionType)
          .isThrownBy(() -> createTableMayThrow(statement)) 
.withMessageContaining(errorMsg);
      }
  }
diff --cc test/unit/org/apache/cassandra/schema/SchemaChangesTest.java
index 9d966c11e4,0000000000..c2d3796a35
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/schema/SchemaChangesTest.java
+++ b/test/unit/org/apache/cassandra/schema/SchemaChangesTest.java
@@@ -1,578 -1,0 +1,595 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.schema;
 +
 +import java.nio.ByteBuffer;
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.function.Supplier;
 +
 +import com.google.common.collect.ImmutableMap;
++
++import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.junit.BeforeClass;
 +import org.junit.Rule;
 +import org.junit.Test;
 +import org.junit.rules.ExpectedException;
 +
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.Util;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.cql3.QueryProcessor;
 +import org.apache.cassandra.cql3.UntypedResultSet;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Directories;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.marshal.ByteType;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.db.marshal.UTF8Type;
 +import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.format.SSTableFormat.Components;
 +import org.apache.cassandra.io.util.File;
 +import org.apache.cassandra.locator.NetworkTopologyStrategy;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static org.apache.cassandra.Util.throwAssert;
 +import static org.apache.cassandra.cql3.CQLTester.assertRows;
 +import static org.apache.cassandra.cql3.CQLTester.row;
++import static org.apache.cassandra.schema.SchemaConstants.NAME_LENGTH;
++import static org.apache.cassandra.utils.AssertionUtils.assertThatThrownBy;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
++import static org.junit.Assert.assertNotEquals;
 +import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertNull;
 +import static org.junit.Assert.assertTrue;
 +
 +
 +public class SchemaChangesTest
 +{
 +    private static final String KEYSPACE1 = "keyspace1";
 +    private static final String KEYSPACE3 = "keyspace3";
 +    private static final String KEYSPACE6 = "keyspace6";
 +    private static final String EMPTY_KEYSPACE = "test_empty_keyspace";
 +    private static final String TABLE1 = "standard1";
 +    private static final String TABLE2 = "standard2";
 +    private static final String TABLE1i = "indexed1";
 +
 +    @Rule
 +    public ExpectedException thrown = ExpectedException.none();
 +
 +    @BeforeClass
 +    public static void defineSchema() throws ConfigurationException
 +    {
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.startGossiper();
 +        SchemaLoader.createKeyspace(KEYSPACE1,
 +                                    KeyspaceParams.simple(1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE1, 
TABLE1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE1, 
TABLE2));
 +        SchemaLoader.createKeyspace(KEYSPACE3,
 +                                    KeyspaceParams.simple(5),
 +                                    SchemaLoader.standardCFMD(KEYSPACE3, 
TABLE1),
 +                                    
SchemaLoader.compositeIndexCFMD(KEYSPACE3, TABLE1i, true));
 +        SchemaLoader.createKeyspace(KEYSPACE6,
 +                                    KeyspaceParams.simple(1),
 +                                    
SchemaLoader.compositeIndexCFMD(KEYSPACE6, TABLE1i, true));
 +    }
 +
 +    @Test
 +    public void testTableMetadataBuilder() throws ConfigurationException
 +    {
 +        TableMetadata.Builder builder =
 +            TableMetadata.builder(KEYSPACE1, "TestApplyCFM_CF")
 +                         .addPartitionKeyColumn("keys", BytesType.instance)
 +                         .addClusteringColumn("col", BytesType.instance)
 +                         .comment("No comment")
 +                         .gcGraceSeconds(100000)
 +                         
.compaction(CompactionParams.stcs(ImmutableMap.of("min_threshold", "500", 
"max_threshold", "500")));
 +
 +        for (int i = 0; i < 5; i++)
 +        {
 +            ByteBuffer name = ByteBuffer.wrap(new byte[] { (byte)i });
 +            builder.addRegularColumn(ColumnIdentifier.getInterned(name, 
BytesType.instance), ByteType.instance);
 +        }
 +
 +
 +        TableMetadata table = builder.build();
 +        // we'll be adding this one later. make sure it's not already there.
 +        assertNull(table.getColumn(ByteBuffer.wrap(new byte[]{ 5 })));
 +
 +        // add one.
 +        ColumnMetadata addIndexDef = ColumnMetadata.regularColumn(table, 
ByteBuffer.wrap(new byte[] { 5 }), BytesType.instance, 
ColumnMetadata.NO_UNIQUE_ID);
 +        builder.addColumn(addIndexDef);
 +
 +        // remove one.
 +        ColumnMetadata removeIndexDef = ColumnMetadata.regularColumn(table, 
ByteBuffer.wrap(new byte[] { 0 }), BytesType.instance, 
ColumnMetadata.NO_UNIQUE_ID);
 +        builder.removeRegularOrStaticColumn(removeIndexDef.name);
 +
 +        TableMetadata table2 = builder.build();
 +
 +        for (int i = 1; i < table2.columns().size(); i++)
 +            assertNotNull(table2.getColumn(ByteBuffer.wrap(new byte[]{ 1 })));
 +        assertNull(table2.getColumn(ByteBuffer.wrap(new byte[]{ 0 })));
 +        assertNotNull(table2.getColumn(ByteBuffer.wrap(new byte[]{ 5 })));
 +    }
 +
 +    @Test
 +    public void testInvalidNames()
 +    {
-         String[] valid = {"1", "a", "_1", "b_", "__", "1_a"};
++        String[] valid = { "1", "a", "_1", "b_", "__", "1_a" };
 +        for (String s : valid)
-             assertTrue(SchemaConstants.isValidName(s));
- 
-         String[] invalid = {"b@t", "dash-y", "", " ", "dot.s", ".hidden"};
++        {
++            assertTrue(SchemaConstants.isValidCharsName(s));
++            KeyspaceMetadata.validateKeyspaceName(s, 
InvalidRequestException::new);
++        }
++        String[] invalid = { "b@t", "dash-y", "", " ", "dot.s", ".hidden" };
 +        for (String s : invalid)
-             assertFalse(SchemaConstants.isValidName(s));
++        {
++            assertFalse(SchemaConstants.isValidCharsName(s));
++            assertThatThrownBy(() -> KeyspaceMetadata.validateKeyspaceName(s, 
InvalidRequestException::new)).isInstanceOf(InvalidRequestException.class);
++        }
++
++        KeyspaceMetadata.validateKeyspaceName("k".repeat(NAME_LENGTH), 
InvalidRequestException::new);
++        assertThatThrownBy(() -> 
KeyspaceMetadata.validateKeyspaceName("k".repeat(NAME_LENGTH + 1), 
InvalidRequestException::new)).isInstanceOf(InvalidRequestException.class);
 +    }
 +
 +    @Test
 +    public void addNewCfToBogusKeyspace()
 +    {
 +        TableMetadata newCf = addTestTable("MadeUpKeyspace", "NewCF", "new 
cf");
 +        try
 +        {
 +            SchemaTestUtil.announceNewTable(newCf);
 +            throw new AssertionError("You shouldn't be able to do anything to 
a keyspace that doesn't exist.");
 +        }
 +        catch (ConfigurationException expected)
 +        {
 +        }
 +    }
 +
 +    @Test
 +    public void addNewTable() throws ConfigurationException
 +    {
 +        final String ksName = KEYSPACE1;
 +        final String tableName = "anewtable";
 +        KeyspaceMetadata original = 
Schema.instance.getKeyspaceMetadata(ksName);
 +
 +        TableMetadata cfm = addTestTable(original.name, tableName, "A New 
Table");
 +
 +        
assertFalse(Schema.instance.getKeyspaceMetadata(ksName).tables.get(cfm.name).isPresent());
 +        SchemaTestUtil.announceNewTable(cfm);
 +
 +        
assertTrue(Schema.instance.getKeyspaceMetadata(ksName).tables.get(cfm.name).isPresent());
 +        assertEquals(cfm, 
Schema.instance.getKeyspaceMetadata(ksName).tables.get(cfm.name).get());
 +
 +        // now read and write to it.
 +        QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (key, 
col, val) VALUES (?, ?, ?)",
 +                                                     ksName, tableName),
 +                                       "key0", "col0", "val0");
 +
 +        // flush to exercise more than just hitting the memtable
 +        ColumnFamilyStore cfs = 
Keyspace.open(ksName).getColumnFamilyStore(tableName);
 +        assertNotNull(cfs);
 +        Util.flush(cfs);
 +
 +        // and make sure we get out what we put in
 +        UntypedResultSet rows = 
QueryProcessor.executeInternal(String.format("SELECT * FROM %s.%s", ksName, 
tableName));
 +        assertRows(rows, row("key0", "col0", "val0"));
 +    }
 +
 +    @Test
 +    public void dropCf() throws ConfigurationException
 +    {
 +        // sanity
 +        final KeyspaceMetadata ks = 
Schema.instance.getKeyspaceMetadata(KEYSPACE1);
 +        assertNotNull(ks);
 +        final TableMetadata cfm = ks.tables.getNullable(TABLE1);
 +        assertNotNull(cfm);
 +
 +        // write some data, force a flush, then verify that files exist on 
disk.
 +        for (int i = 0; i < 100; i++)
 +            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s 
(key, name, val) VALUES (?, ?, ?)",
 +                                                         KEYSPACE1, TABLE1),
 +                                           "dropCf", "col" + i, "anyvalue");
 +        ColumnFamilyStore store = 
Keyspace.open(cfm.keyspace).getColumnFamilyStore(cfm.name);
 +        assertNotNull(store);
 +        Util.flush(store);
 +        
assertTrue(store.getDirectories().sstableLister(Directories.OnTxnErr.THROW).list().size()
 > 0);
 +
 +        SchemaTestUtil.announceTableDrop(ks.name, cfm.name);
 +
 +        
assertFalse(Schema.instance.getKeyspaceMetadata(ks.name).tables.get(cfm.name).isPresent());
 +
 +        // any write should fail.
 +        boolean success = true;
 +        try
 +        {
 +            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s 
(key, name, val) VALUES (?, ?, ?)",
 +                                                         KEYSPACE1, TABLE1),
 +                                           "dropCf", "col0", "anyvalue");
 +        }
 +        catch (Throwable th)
 +        {
 +            success = false;
 +        }
 +        assertFalse("This mutation should have failed since the CF no longer 
exists.", success);
 +
 +        // verify that the files are gone.
 +        Supplier<Object> lambda = () -> {
 +            for (File file : 
store.getDirectories().sstableLister(Directories.OnTxnErr.THROW).listFiles())
 +            {
 +                if (file.path().endsWith("Data.db") && !new 
File(file.path().replace("Data.db", "Compacted")).exists())
 +                    return false;
 +            }
 +            return true;
 +        };
 +        Util.spinAssertEquals(true, lambda, 30);
 +
 +    }
 +
 +    @Test
 +    public void addNewKS() throws ConfigurationException
 +    {
 +        TableMetadata cfm = addTestTable("newkeyspace1", "newstandard1", "A 
new cf for a new ks");
 +        KeyspaceMetadata newKs = KeyspaceMetadata.create(cfm.keyspace, 
KeyspaceParams.simple(5), Tables.of(cfm));
 +        SchemaTestUtil.announceNewKeyspace(newKs);
 +
 +        assertNotNull(Schema.instance.getKeyspaceMetadata(cfm.keyspace));
 +        assertEquals(Schema.instance.getKeyspaceMetadata(cfm.keyspace), 
newKs);
 +
 +        // test reads and writes.
 +        QueryProcessor.executeInternal("INSERT INTO newkeyspace1.newstandard1 
(key, col, val) VALUES (?, ?, ?)",
 +                                       "key0", "col0", "val0");
 +        ColumnFamilyStore store = 
Keyspace.open(cfm.keyspace).getColumnFamilyStore(cfm.name);
 +        assertNotNull(store);
 +        Util.flush(store);
 +
 +        UntypedResultSet rows = QueryProcessor.executeInternal("SELECT * FROM 
newkeyspace1.newstandard1");
 +        assertRows(rows, row("key0", "col0", "val0"));
 +    }
 +
 +    @Test
 +    public void dropKSUnflushed() throws ConfigurationException
 +    {
 +        // sanity
 +        final KeyspaceMetadata ks = 
Schema.instance.getKeyspaceMetadata(KEYSPACE3);
 +        assertNotNull(ks);
 +        final TableMetadata cfm = ks.tables.getNullable(TABLE1);
 +        assertNotNull(cfm);
 +
 +        // write some data
 +        for (int i = 0; i < 100; i++)
 +            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s 
(key, name, val) VALUES (?, ?, ?)",
 +                                                         KEYSPACE3, TABLE1),
 +                                           "dropKs", "col" + i, "anyvalue");
 +
 +        SchemaTestUtil.announceKeyspaceDrop(ks.name);
 +
 +        assertNull(Schema.instance.getKeyspaceMetadata(ks.name));
 +    }
 +
 +    @Test
 +    public void createEmptyKsAddNewCf() throws ConfigurationException
 +    {
 +        assertNull(Schema.instance.getKeyspaceMetadata(EMPTY_KEYSPACE));
 +        KeyspaceMetadata newKs = KeyspaceMetadata.create(EMPTY_KEYSPACE, 
KeyspaceParams.simple(5));
 +        SchemaTestUtil.announceNewKeyspace(newKs);
 +        assertNotNull(Schema.instance.getKeyspaceMetadata(EMPTY_KEYSPACE));
 +
 +        String tableName = "added_later";
 +        TableMetadata newCf = addTestTable(EMPTY_KEYSPACE, tableName, "A new 
CF to add to an empty KS");
 +
 +        //should not exist until apply
 +        
assertFalse(Schema.instance.getKeyspaceMetadata(newKs.name).tables.get(newCf.name).isPresent());
 +
 +        //add the new CF to the empty space
 +        SchemaTestUtil.announceNewTable(newCf);
 +
 +        
assertTrue(Schema.instance.getKeyspaceMetadata(newKs.name).tables.get(newCf.name).isPresent());
 +        
assertEquals(Schema.instance.getKeyspaceMetadata(newKs.name).tables.get(newCf.name).get(),
 newCf);
 +
 +        // now read and write to it.
 +        QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (key, 
col, val) VALUES (?, ?, ?)",
 +                                                     EMPTY_KEYSPACE, 
tableName),
 +                                       "key0", "col0", "val0");
 +
 +        ColumnFamilyStore cfs = 
Keyspace.open(newKs.name).getColumnFamilyStore(newCf.name);
 +        assertNotNull(cfs);
 +        Util.flush(cfs);
 +
 +        UntypedResultSet rows = 
QueryProcessor.executeInternal(String.format("SELECT * FROM %s.%s", 
EMPTY_KEYSPACE, tableName));
 +        assertRows(rows, row("key0", "col0", "val0"));
 +    }
 +
 +    @Test
 +    public void testUpdateKeyspace() throws ConfigurationException
 +    {
 +        // create a keyspace to serve as existing.
 +        TableMetadata cf = addTestTable("UpdatedKeyspace", "AddedStandard1", 
"A new cf for a new ks");
 +        KeyspaceMetadata oldKs = KeyspaceMetadata.create(cf.keyspace, 
KeyspaceParams.simple(5), Tables.of(cf));
 +
 +        SchemaTestUtil.announceNewKeyspace(oldKs);
 +
 +        assertNotNull(Schema.instance.getKeyspaceMetadata(cf.keyspace));
 +        assertEquals(Schema.instance.getKeyspaceMetadata(cf.keyspace), oldKs);
 +
 +        // names should match.
 +        KeyspaceMetadata newBadKs2 = KeyspaceMetadata.create(cf.keyspace + 
"trash", KeyspaceParams.simple(4));
 +        try
 +        {
 +            SchemaTestUtil.announceKeyspaceUpdate(newBadKs2);
 +            throw new AssertionError("Should not have been able to update a 
KS with an invalid KS name.");
 +        }
 +        catch (ConfigurationException ex)
 +        {
 +            // expected.
 +        }
 +
 +        Map<String, String> replicationMap = new HashMap<>();
 +        replicationMap.put(ReplicationParams.CLASS, 
NetworkTopologyStrategy.class.getName());
 +        replicationMap.put("replication_factor", "1");
 +
 +        KeyspaceMetadata newKs = KeyspaceMetadata.create(cf.keyspace, 
KeyspaceParams.create(true, replicationMap));
 +        SchemaTestUtil.announceKeyspaceUpdate(newKs);
 +
 +        KeyspaceMetadata newFetchedKs = 
Schema.instance.getKeyspaceMetadata(newKs.name);
 +        assertEquals(newFetchedKs.params.replication.klass, 
newKs.params.replication.klass);
-         
assertFalse(newFetchedKs.params.replication.klass.equals(oldKs.params.replication.klass));
++        assertNotEquals(newFetchedKs.params.replication.klass, 
oldKs.params.replication.klass);
 +    }
 +
 +    /*
 +    @Test
 +    public void testUpdateColumnFamilyNoIndexes() throws 
ConfigurationException
 +    {
 +        // create a keyspace with a cf to update.
 +        CFMetaData cf = addTestTable("UpdatedCfKs", "Standard1added", "A new 
cf that will be updated");
 +        KSMetaData ksm = KSMetaData.testMetadata(cf.ksName, 
SimpleStrategy.class, KSMetaData.optsWithRF(1), cf);
 +        MigrationManager.announceNewKeyspace(ksm);
 +
 +        assertNotNull(Schema.instance.getKSMetaData(cf.ksName));
 +        assertEquals(Schema.instance.getKSMetaData(cf.ksName), ksm);
 +        assertNotNull(Schema.instance.getTableMetadataRef(cf.ksName, 
cf.cfName));
 +
 +        // updating certain fields should fail.
 +        CFMetaData newCfm = cf.copy();
 +        newCfm.defaultValidator(BytesType.instance);
 +        newCfm.minCompactionThreshold(5);
 +        newCfm.maxCompactionThreshold(31);
 +
 +        // test valid operations.
 +        newCfm.comment("Modified comment");
 +        MigrationManager.announceTableUpdate(newCfm); // doesn't get set back 
here.
 +
 +        newCfm.readRepairChance(0.23);
 +        MigrationManager.announceTableUpdate(newCfm);
 +
 +        newCfm.gcGraceSeconds(12);
 +        MigrationManager.announceTableUpdate(newCfm);
 +
 +        newCfm.defaultValidator(UTF8Type.instance);
 +        MigrationManager.announceTableUpdate(newCfm);
 +
 +        newCfm.minCompactionThreshold(3);
 +        MigrationManager.announceTableUpdate(newCfm);
 +
 +        newCfm.maxCompactionThreshold(33);
 +        MigrationManager.announceTableUpdate(newCfm);
 +
 +        // can't test changing the reconciler because there is only one impl.
 +
 +        // check the cumulative effect.
 +        assertEquals(Schema.instance.getTableMetadataRef(cf.ksName, 
cf.cfName).getComment(), newCfm.getComment());
 +        assertEquals(Schema.instance.getTableMetadataRef(cf.ksName, 
cf.cfName).getReadRepairChance(), newCfm.getReadRepairChance(), 0.0001);
 +        assertEquals(Schema.instance.getTableMetadataRef(cf.ksName, 
cf.cfName).getGcGraceSeconds(), newCfm.getGcGraceSeconds());
 +        assertEquals(UTF8Type.instance, 
Schema.instance.getTableMetadataRef(cf.ksName, 
cf.cfName).getDefaultValidator());
 +
 +        // Change tableId
 +        newCfm = new CFMetaData(cf.ksName, cf.cfName, cf.cfType, 
cf.comparator);
 +        CFMetaData.copyOpts(newCfm, cf);
 +        try
 +        {
 +            cf.apply(newCfm);
 +            throw new AssertionError("Should have blown up when you used a 
different id.");
 +        }
 +        catch (ConfigurationException expected) {}
 +
 +        // Change cfName
 +        newCfm = new CFMetaData(cf.ksName, cf.cfName + "_renamed", cf.cfType, 
cf.comparator);
 +        CFMetaData.copyOpts(newCfm, cf);
 +        try
 +        {
 +            cf.apply(newCfm);
 +            throw new AssertionError("Should have blown up when you used a 
different name.");
 +        }
 +        catch (ConfigurationException expected) {}
 +
 +        // Change ksName
 +        newCfm = new CFMetaData(cf.ksName + "_renamed", cf.cfName, cf.cfType, 
cf.comparator);
 +        CFMetaData.copyOpts(newCfm, cf);
 +        try
 +        {
 +            cf.apply(newCfm);
 +            throw new AssertionError("Should have blown up when you used a 
different keyspace.");
 +        }
 +        catch (ConfigurationException expected) {}
 +
 +        // Change cf type
 +        newCfm = new CFMetaData(cf.ksName, cf.cfName, ColumnFamilyType.Super, 
cf.comparator);
 +        CFMetaData.copyOpts(newCfm, cf);
 +        try
 +        {
 +            cf.apply(newCfm);
 +            throw new AssertionError("Should have blown up when you used a 
different cf type.");
 +        }
 +        catch (ConfigurationException expected) {}
 +
 +        // Change comparator
 +        newCfm = new CFMetaData(cf.ksName, cf.cfName, cf.cfType, new 
SimpleDenseCellNameType(TimeUUIDType.instance));
 +        CFMetaData.copyOpts(newCfm, cf);
 +        try
 +        {
 +            cf.apply(newCfm);
 +            throw new AssertionError("Should have blown up when you used a 
different comparator.");
 +        }
 +        catch (ConfigurationException expected) {}
 +    }
 +    */
 +
 +    @Test
 +    public void testDropIndex() throws ConfigurationException
 +    {
 +        // persist keyspace definition in the system keyspace
 +        
SchemaKeyspace.makeCreateKeyspaceMutation(Schema.instance.getKeyspaceMetadata(KEYSPACE6),
 FBUtilities.timestampMicros()).build().applyUnsafe();
 +        ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE6).getColumnFamilyStore(TABLE1i);
 +        String indexName = TABLE1i + "_birthdate_key_index";
 +
 +        // insert some data.  save the sstable descriptor so we can make sure 
it's marked for delete after the drop
 +        QueryProcessor.executeInternal(String.format(
 +                                                    "INSERT INTO %s.%s (key, 
c1, birthdate, notbirthdate) VALUES (?, ?, ?, ?)",
 +                                                    KEYSPACE6,
 +                                                    TABLE1i),
 +                                       "key0", "col0", 1L, 1L);
 +
 +        Util.flush(cfs);
 +        ColumnFamilyStore indexCfs = 
cfs.indexManager.getIndexByName(indexName)
 +                                                     .getBackingTable()
 +                                                     
.orElseThrow(throwAssert("Cannot access index cfs"));
 +        Descriptor desc = 
indexCfs.getLiveSSTables().iterator().next().descriptor;
 +
 +        // drop the index
 +        TableMetadata meta = cfs.metadata();
 +        IndexMetadata existing = meta.indexes
 +                                     .get(indexName)
 +                                     .orElseThrow(throwAssert("Index not 
found"));
 +
 +        
SchemaTestUtil.announceTableUpdate(meta.unbuild().indexes(meta.indexes.without(existing.name)).build());
 +
 +        // check
 +        assertTrue(cfs.indexManager.listIndexes().isEmpty());
 +        LifecycleTransaction.waitForDeletions();
 +        assertFalse(desc.fileFor(Components.DATA).exists());
 +    }
 +
 +    @Test
-     public void testValidateNullKeyspace()
++    public void testTableMetadataBuilderWithInvalidKeyspace()
 +    {
-         TableMetadata.Builder builder = TableMetadata.builder(null, 
TABLE1).addPartitionKeyColumn("partitionKey", BytesType.instance);
-         try
++        String[][] keyspaces =
 +        {
-             builder.build();
-         }
-         catch (ConfigurationException e)
++        { null, "Keyspace name must not be empty" },
++        { "a@b", "Keyspace name must not be empty and must contain 
alphanumeric or underscore characters only" },
++        { "k".repeat(NAME_LENGTH), String.format("Keyspace name must not be 
more than %d characters long", NAME_LENGTH) }
++        };
++        for (String[] keyspace : keyspaces)
 +        {
-             assertTrue(e.getMessage().contains(null + "." + TABLE1 + ": 
Keyspace name must not be empty"));
++            TableMetadata.Builder builder = 
TableMetadata.builder(keyspace[0], 
TABLE1).addPartitionKeyColumn("partitionKey", BytesType.instance);
++            thrown.expect(ConfigurationException.class);
++            thrown.expectMessage(String.format("%s.%s: %s", keyspace[0], 
TABLE1, keyspace[1]));
++            builder.build();
 +        }
 +    }
 +
 +    @Test
-     public void testValidateCompatibilityIDMismatch() throws Exception
++    public void testValidateCompatibilityIDMismatch()
 +    {
 +        TableMetadata.Builder builder = TableMetadata.builder(KEYSPACE1, 
TABLE1).addPartitionKeyColumn("partitionKey", BytesType.instance);
 +
 +        TableMetadata table1 = builder.build();
 +        TableMetadata table2 = 
table1.unbuild().id(TableId.generate()).build();
 +        thrown.expect(ConfigurationException.class);
-         thrown.expectMessage(KEYSPACE1 + "." + TABLE1 + ": Table ID 
mismatch");
++        thrown.expectMessage(KEYSPACE1 + '.' + TABLE1 + ": Table ID 
mismatch");
 +        table1.validateCompatibility(table2);
 +    }
 +
 +    @Test
-     public void testValidateCompatibilityNameMismatch() throws Exception
++    public void testValidateCompatibilityNameMismatch()
 +    {
 +        TableMetadata.Builder builder1 = TableMetadata.builder(KEYSPACE1, 
TABLE1).addPartitionKeyColumn("partitionKey", BytesType.instance);
 +        TableMetadata.Builder builder2 = TableMetadata.builder(KEYSPACE1, 
TABLE2).addPartitionKeyColumn("partitionKey", BytesType.instance);
 +        TableMetadata table1 = builder1.build();
 +        TableMetadata table2 = builder2.build();
 +        thrown.expect(ConfigurationException.class);
-         thrown.expectMessage(KEYSPACE1 + "." + TABLE1 + ": Table mismatch");
++        thrown.expectMessage(KEYSPACE1 + '.' + TABLE1 + ": Table mismatch");
 +        table1.validateCompatibility(table2);
 +    }
 +
 +    @Test
 +    public void testEvolveSystemKeyspaceNew()
 +    {
 +        TableMetadata table = addTestTable("ks0", "t", "");
 +        KeyspaceMetadata keyspace = KeyspaceMetadata.create("ks0", 
KeyspaceParams.simple(1), Tables.of(table));
 +
 +        SchemaTransformation transformation = 
SchemaTransformations.updateSystemKeyspace(keyspace, 0);
 +        Keyspaces before = Keyspaces.none();
 +        Keyspaces after = 
transformation.apply(ClusterMetadataTestHelper.minimalForTesting(before));
 +        Keyspaces.KeyspacesDiff diff = Keyspaces.diff(before, after);
 +
 +        assertTrue(diff.altered.isEmpty());
 +        assertTrue(diff.dropped.isEmpty());
 +        assertEquals(keyspace, diff.created.getNullable("ks0"));
 +    }
 +
 +    @Test
 +    public void testEvolveSystemKeyspaceExistsUpToDate()
 +    {
 +        TableMetadata table = addTestTable("ks1", "t", "");
 +        KeyspaceMetadata keyspace = KeyspaceMetadata.create("ks1", 
KeyspaceParams.simple(1), Tables.of(table));
 +
 +        SchemaTransformation transformation = 
SchemaTransformations.updateSystemKeyspace(keyspace, 0);
 +        Keyspaces before = Keyspaces.of(keyspace);
 +        Keyspaces after = 
transformation.apply(ClusterMetadataTestHelper.minimalForTesting(before));
 +        Keyspaces.KeyspacesDiff diff = Keyspaces.diff(before, after);
 +
 +        assertTrue(diff.isEmpty());
 +    }
 +
 +    @Test
 +    public void testEvolveSystemKeyspaceChanged()
 +    {
 +        TableMetadata table0 = addTestTable("ks2", "t", "");
 +        KeyspaceMetadata keyspace0 = KeyspaceMetadata.create("ks2", 
KeyspaceParams.simple(1), Tables.of(table0));
 +
 +        TableMetadata table1 = table0.unbuild().comment("comment").build();
 +        KeyspaceMetadata keyspace1 = KeyspaceMetadata.create("ks2", 
KeyspaceParams.simple(1), Tables.of(table1));
 +
 +        SchemaTransformation transformation = 
SchemaTransformations.updateSystemKeyspace(keyspace1, 1);
 +        Keyspaces before = Keyspaces.of(keyspace0);
 +        Keyspaces after = 
transformation.apply(ClusterMetadataTestHelper.minimalForTesting(before));
 +        Keyspaces.KeyspacesDiff diff = Keyspaces.diff(before, after);
 +
 +        assertTrue(diff.created.isEmpty());
 +        assertTrue(diff.dropped.isEmpty());
 +        assertEquals(1, diff.altered.size());
 +        assertEquals(keyspace1, diff.altered.get(0).after);
 +    }
 +
 +    private TableMetadata addTestTable(String ks, String cf, String comment)
 +    {
 +        return
 +            TableMetadata.builder(ks, cf)
 +                         .addPartitionKeyColumn("key", UTF8Type.instance)
 +                         .addClusteringColumn("col", UTF8Type.instance)
 +                         .addRegularColumn("val", UTF8Type.instance)
 +                         .comment(comment)
 +                         .build();
 +    }
 +}
diff --cc test/unit/org/apache/cassandra/schema/ValidationTest.java
index e9edbf729d,7af8112d82..9e1e834d88
--- a/test/unit/org/apache/cassandra/schema/ValidationTest.java
+++ b/test/unit/org/apache/cassandra/schema/ValidationTest.java
@@@ -58,19 -40,19 +58,19 @@@ public class ValidationTest extends Cas
      @Test
      public void testIsNameValidNegative()
      {
-         assertFalse(SchemaConstants.isValidName(null));
-         assertFalse(SchemaConstants.isValidName(""));
-         assertFalse(SchemaConstants.isValidName(" "));
-         assertFalse(SchemaConstants.isValidName("@"));
-         assertFalse(SchemaConstants.isValidName("!"));
+         assertFalse(SchemaConstants.isValidCharsName(null));
+         assertFalse(SchemaConstants.isValidCharsName(""));
+         assertFalse(SchemaConstants.isValidCharsName(" "));
+         assertFalse(SchemaConstants.isValidCharsName("@"));
+         assertFalse(SchemaConstants.isValidCharsName("!"));
      }
  
 -    private static Set<String> primitiveTypes =
 -        new HashSet<>(Arrays.asList(new String[] { "ascii", "bigint", "blob", 
"boolean", "date",
 -                                                   "duration", "decimal", 
"double", "float",
 -                                                   "inet", "int", "smallint", 
"text", "time",
 -                                                   "timestamp", "timeuuid", 
"tinyint", "uuid",
 -                                                   "varchar", "varint" }));
 +    private static final Set<String> primitiveTypes =
 +        new HashSet<>(Arrays.asList("ascii", "bigint", "blob", "boolean", 
"date",
 +                                    "duration", "decimal", "double", "float",
 +                                    "inet", "int", "smallint", "text", "time",
 +                                    "timestamp", "timeuuid", "tinyint", 
"uuid",
 +                                    "varchar", "varint"));
  
      @Test
      public void typeCompatibilityTest()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to