This is an automated email from the ASF dual-hosted git repository. edimitrova pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit c7432e98a381c16cb63692e727c35e2909d7200a Merge: 0e990d7 b654000 Author: Ekaterina Dimitrova <[email protected]> AuthorDate: Wed Mar 10 20:54:07 2021 -0500 Merge branch 'cassandra-3.11' into trunk CHANGES.txt | 1 + build.xml | 2 +- .../statements/schema/AlterTableStatement.java | 93 ++++++- .../org/apache/cassandra/db/ColumnFamilyStore.java | 4 + .../db/compaction/CompactionStrategyManager.java | 5 +- .../org/apache/cassandra/db/lifecycle/Tracker.java | 61 +++-- .../org/apache/cassandra/gms/ApplicationState.java | 17 ++ .../cassandra/gms/GossipDigestAckVerbHandler.java | 2 +- src/java/org/apache/cassandra/gms/Gossiper.java | 42 ++- .../org/apache/cassandra/gms/VersionedValue.java | 12 +- .../apache/cassandra/io/sstable/Descriptor.java | 7 + .../cassandra/io/sstable/format/Version.java | 1 - .../io/sstable/format/VersionAndType.java | 93 +++++++ .../InitialSSTableAddedNotification.java} | 27 +- src/java/org/apache/cassandra/schema/Schema.java | 3 +- .../cassandra/schema/SchemaTransformation.java | 4 +- .../cassandra/service/SSTablesGlobalTracker.java | 284 +++++++++++++++++++++ .../SSTablesVersionsInUseChangeNotification.java | 50 ++++ .../apache/cassandra/service/StartupChecks.java | 5 +- .../apache/cassandra/service/StorageService.java | 19 ++ .../upgrade/CompactStorage3to4UpgradeTest.java | 4 + .../validation/operations/CompactTableTest.java | 1 - .../apache/cassandra/db/lifecycle/TrackerTest.java | 31 ++- .../io/sstable/format/VersionAndTypeTest.java | 51 ++++ .../service/SSTablesGlobalTrackerTest.java | 158 ++++++++++++ 25 files changed, 899 insertions(+), 78 deletions(-) diff --cc CHANGES.txt index ffec3cb,1e74bf6..592c9c3 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,50 -1,10 +1,51 @@@ -3.11.11 +4.0-beta5 + * Send FAILED_SESSION_MSG on shutdown and on in-progress repairs during startup (CASSANDRA-16425) + * Reinstate removed ApplicationState padding (CASSANDRA-16484) + * Expose data dirs to ColumnFamilyStoreMBean (CASSANDRA-16335) + * Add possibility to 
copy SSTables in SSTableImporter instead of moving them (CASSANDRA-16407) + * Fix DESCRIBE statement for CUSTOM indices with options (CASSANDRA-16482) + * Fix cassandra-stress JMX connection (CASSANDRA-16473) + * Avoid processing redundant application states on endpoint changes (CASSANDRA-16381) + * Prevent parent repair sessions leak (CASSANDRA-16446) + * Fix timestamp issue in SinglePartitionSliceCommandTest testPartitionD…eletionRowDeletionTie (CASSANDRA-16443) + * Promote protocol V5 out of beta (CASSANDRA-14973) + * Fix incorrect encoding for strings can be UTF8 (CASSANDRA-16429) + * Fix node unable to join when RF > N in multi-DC with added warning (CASSANDRA-16296) + * Add an option to nodetool tablestats to check sstable location correctness (CASSANDRA-16344) + * Unable to ALTER KEYSPACE while decommissioned/assassinated nodes are in gossip (CASSANDRA-16422) + * Metrics backward compatibility restored after CASSANDRA-15066 (CASSANDRA-16083) + * Reduce new reserved keywords introduced since 3.0 (CASSANDRA-16439) + * Improve system tables handling in case of disk failures (CASSANDRA-14793) + * Add access and datacenters to unreserved keywords (CASSANDRA-16398) + * Fix nodetool ring, status output when DNS resolution or port printing are in use (CASSANDRA-16283) + * Upgrade Jacoco to 0.8.6 (for Java 11 support) (CASSANDRA-16365) + * Move excessive repair debug loggings to trace level (CASSANDRA-16406) + * Restore validation of each message's protocol version (CASSANDRA-16374) + * Upgrade netty and chronicle-queue dependencies to get Auditing and native library loading working on arm64 architectures (CASSANDRA-16384,CASSANDRA-16392) + * Release StreamingTombstoneHistogramBuilder spool when switching writers (CASSANDRA-14834) + * Correct memtable on-heap size calculations to match actual use (CASSANDRA-16318) + * Fix client notifications in CQL protocol v5 (CASSANDRA-16353) + * Too defensive check when picking sstables for preview repair (CASSANDRA-16284) + * 
Ensure pre-negotiation native protocol responses have correct stream id (CASSANDRA-16376) + * Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279) + * SSLFactory should initialize SSLContext before setting protocols (CASSANDRA-16362) + * Restore sasi dependencies jflex, snowball-stemmer, and concurrent-trees, in the cassandra-all pom (CASSANDRA-16303) + * Fix DecimalDeserializer#toString OOM (CASSANDRA-14925) +Merged from 3.11: * Upgrade jackson-databind to 2.9.10.8 (CASSANDRA-16462) + * Fix digest computation for queries with fetched but non queried columns (CASSANDRA-15962) + * Reduce amount of allocations during batch statement execution (CASSANDRA-16201) + * Update jflex-1.6.0.jar to match upstream (CASSANDRA-16393) Merged from 3.0: + * Refuse DROP COMPACT STORAGE if some 2.x sstables are in use (CASSANDRA-15897) * Fix ColumnFilter::toString not returning a valid CQL fragment (CASSANDRA-16483) * Fix ColumnFilter behaviour to prevent digest mitmatches during upgrades (CASSANDRA-16415) + * Update debian packaging for python3 (CASSANDRA-16396) * Avoid pushing schema mutations when setting up distributed system keyspaces locally (CASSANDRA-16387) + * Prevent unbounded number of pending flushing tasks (CASSANDRA-16261) + * Improve empty hint file handling during startup (CASSANDRA-16162) + * Fix skipping on pre-3.0 created compact storage sstables due to missing primary key liveness (CASSANDRA-16226) + * Allow empty string in collections with COPY FROM in cqlsh (CASSANDRA-16372) Merged from 2.2: * Fix centos packaging for arm64, >=4.0 rpm's now require python3 (CASSANDRA-16477) * Make TokenMetadata's ring version increments atomic (CASSANDRA-16286) diff --cc build.xml index 36db9e5,5726fed..ecb1ad8 --- a/build.xml +++ b/build.xml @@@ -586,14 -412,21 +586,15 @@@ <dependency groupId="com.fasterxml.jackson.core" artifactId="jackson-annotations" version="2.9.10"/> <dependency groupId="com.googlecode.json-simple" artifactId="json-simple" version="1.1"/> <dependency 
groupId="com.boundary" artifactId="high-scale-lib" version="1.0.6"/> - <dependency groupId="com.github.jbellis" artifactId="jamm" version="0.3.0"/> - - <dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.7"> - <exclusion groupId="org.slf4j" artifactId="slf4j-log4j12"/> - <exclusion groupId="junit" artifactId="junit"/> - </dependency> - <dependency groupId="org.yaml" artifactId="snakeyaml" version="1.11"/> - <dependency groupId="org.apache.thrift" artifactId="libthrift" version="0.9.2"> - <exclusion groupId="commons-logging" artifactId="commons-logging"/> - </dependency> + <dependency groupId="com.github.jbellis" artifactId="jamm" version="${jamm.version}"/> + <dependency groupId="org.yaml" artifactId="snakeyaml" version="1.26"/> <dependency groupId="junit" artifactId="junit" version="4.12" /> <dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" /> + <dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.26" /> + <dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" /> <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.7" /> <dependency groupId="org.reflections" artifactId="reflections" version="0.9.12" /> + <dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.25" /> <dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10"> <exclusion groupId="commons-lang" artifactId="commons-lang"/> </dependency> @@@ -667,22 -484,11 +668,21 @@@ <dependency groupId="de.jflex" artifactId="jflex" version="1.6.0" /> <dependency groupId="com.github.rholder" artifactId="snowball-stemmer" version="1.3.0.581.1" /> <dependency groupId="com.googlecode.concurrent-trees" artifactId="concurrent-trees" version="2.4.0" /> - <dependency groupId="com.github.ben-manes.caffeine" artifactId="caffeine" version="2.2.6" /> - <dependency 
groupId="org.jctools" artifactId="jctools-core" version="1.2.1"/> - <dependency groupId="org.ow2.asm" artifactId="asm" version="5.0.4" /> + <dependency groupId="com.github.ben-manes.caffeine" artifactId="caffeine" version="2.3.5" /> + <dependency groupId="org.jctools" artifactId="jctools-core" version="3.1.0"/> + <dependency groupId="org.ow2.asm" artifactId="asm" version="${asm.version}" /> + <dependency groupId="org.ow2.asm" artifactId="asm-tree" version="${asm.version}" /> + <dependency groupId="org.ow2.asm" artifactId="asm-commons" version="${asm.version}" /> + <dependency groupId="org.gridkit.jvmtool" artifactId="sjk-cli" version="0.14"/> + <dependency groupId="org.gridkit.jvmtool" artifactId="sjk-core" version="0.14"/> + <dependency groupId="org.gridkit.jvmtool" artifactId="sjk-stacktrace" version="0.14"/> + <dependency groupId="org.gridkit.jvmtool" artifactId="mxdump" version="0.14"/> + <dependency groupId="org.gridkit.lab" artifactId="jvm-attach-api" version="1.5"/> + <dependency groupId="org.gridkit.jvmtool" artifactId="sjk-json" version="0.14"/> + <dependency groupId="com.beust" artifactId="jcommander" version="1.30"/> <!-- when updating assertj, make sure to also update the corresponding junit-bom dependency --> <dependency groupId="org.assertj" artifactId="assertj-core" version="3.15.0"/> + <dependency groupId="org.awaitility" artifactId="awaitility" version="4.0.3" /> - </dependencyManagement> <developer id="adelapena" name="Andres de la Peña"/> <developer id="alakshman" name="Avinash Lakshman"/> diff --cc src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java index b6cab44,0000000..0820073 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java @@@ -1,506 -1,0 +1,593 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.statements.schema; + ++import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; ++import java.util.concurrent.TimeUnit; + ++import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableSet; + ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; ++ +import org.apache.cassandra.audit.AuditLogContext; +import org.apache.cassandra.audit.AuditLogEntryType; +import org.apache.cassandra.auth.Permission; + +import org.apache.cassandra.cql3.CQL3Type; +import org.apache.cassandra.cql3.CQLStatement; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.QualifiedName; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.marshal.AbstractType; + ++import org.apache.cassandra.exceptions.InvalidRequestException; ++import org.apache.cassandra.gms.ApplicationState; ++import org.apache.cassandra.gms.Gossiper; ++import org.apache.cassandra.locator.InetAddressAndPort; ++import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.ColumnMetadata; +import 
org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableParams; +import org.apache.cassandra.schema.ViewMetadata; +import org.apache.cassandra.schema.Views; +import org.apache.cassandra.service.ClientState; ++import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.reads.repair.ReadRepairStrategy; +import org.apache.cassandra.transport.Event.SchemaChange; +import org.apache.cassandra.transport.Event.SchemaChange.Change; +import org.apache.cassandra.transport.Event.SchemaChange.Target; ++import org.apache.cassandra.utils.NoSpamLogger; + ++import static java.lang.String.format; +import static java.lang.String.join; + +import static com.google.common.collect.Iterables.isEmpty; +import static com.google.common.collect.Iterables.transform; + +public abstract class AlterTableStatement extends AlterSchemaStatement +{ + protected final String tableName; + + public AlterTableStatement(String keyspaceName, String tableName) + { + super(keyspaceName); + this.tableName = tableName; + } + - public Keyspaces apply(Keyspaces schema) ++ public Keyspaces apply(Keyspaces schema) throws UnknownHostException + { + KeyspaceMetadata keyspace = schema.getNullable(keyspaceName); + + TableMetadata table = null == keyspace + ? 
null + : keyspace.getTableOrViewNullable(tableName); + + if (null == table) + throw ire("Table '%s.%s' doesn't exist", keyspaceName, tableName); + + if (table.isView()) + throw ire("Cannot use ALTER TABLE on a materialized view; use ALTER MATERIALIZED VIEW instead"); + + return schema.withAddedOrUpdated(apply(keyspace, table)); + } + + SchemaChange schemaChangeEvent(KeyspacesDiff diff) + { + return new SchemaChange(Change.UPDATED, Target.TABLE, keyspaceName, tableName); + } + + public void authorize(ClientState client) + { + client.ensureTablePermission(keyspaceName, tableName, Permission.ALTER); + } + + @Override + public AuditLogContext getAuditLogContext() + { + return new AuditLogContext(AuditLogEntryType.ALTER_TABLE, keyspaceName, tableName); + } + + public String toString() + { - return String.format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, tableName); ++ return format("%s (%s, %s)", getClass().getSimpleName(), keyspaceName, tableName); + } + - abstract KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata table); ++ abstract KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata table) throws UnknownHostException; + + /** + * ALTER TABLE <table> ALTER <column> TYPE <newtype>; + * + * No longer supported. + */ + public static class AlterColumn extends AlterTableStatement + { + AlterColumn(String keyspaceName, String tableName) + { + super(keyspaceName, tableName); + } + + public KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata table) + { + throw ire("Altering column types is no longer supported"); + } + } + + /** + * ALTER TABLE <table> ADD <column> <newtype> + * ALTER TABLE <table> ADD (<column> <newtype>, <column1> <newtype1>, ... 
<columnn> <newtypen>) + */ + private static class AddColumns extends AlterTableStatement + { + private static class Column + { + private final ColumnIdentifier name; + private final CQL3Type.Raw type; + private final boolean isStatic; + + Column(ColumnIdentifier name, CQL3Type.Raw type, boolean isStatic) + { + this.name = name; + this.type = type; + this.isStatic = isStatic; + } + } + + private final Collection<Column> newColumns; + + private AddColumns(String keyspaceName, String tableName, Collection<Column> newColumns) + { + super(keyspaceName, tableName); + this.newColumns = newColumns; + } + + public KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata table) + { + TableMetadata.Builder tableBuilder = table.unbuild(); + Views.Builder viewsBuilder = keyspace.views.unbuild(); + newColumns.forEach(c -> addColumn(keyspace, table, c, tableBuilder, viewsBuilder)); + + return keyspace.withSwapped(keyspace.tables.withSwapped(tableBuilder.build())) + .withSwapped(viewsBuilder.build()); + } + + private void addColumn(KeyspaceMetadata keyspace, + TableMetadata table, + Column column, + TableMetadata.Builder tableBuilder, + Views.Builder viewsBuilder) + { + ColumnIdentifier name = column.name; + AbstractType<?> type = column.type.prepare(keyspaceName, keyspace.types).getType(); + boolean isStatic = column.isStatic; + + if (null != tableBuilder.getColumn(name)) + throw ire("Column with name '%s' already exists", name); + + if (isStatic && table.clusteringColumns().isEmpty()) + throw ire("Static columns are only useful (and thus allowed) if the table has at least one clustering column"); + + ColumnMetadata droppedColumn = table.getDroppedColumn(name.bytes); + if (null != droppedColumn) + { + // After #8099, not safe to re-add columns of incompatible types - until *maybe* deser logic with dropped + // columns is pushed deeper down the line. The latter would still be problematic in cases of schema races. 
+ if (!type.isValueCompatibleWith(droppedColumn.type)) + { + throw ire("Cannot re-add previously dropped column '%s' of type %s, incompatible with previous type %s", + name, + type.asCQL3Type(), + droppedColumn.type.asCQL3Type()); + } + + if (droppedColumn.isStatic() != isStatic) + { + throw ire("Cannot re-add previously dropped column '%s' of kind %s, incompatible with previous kind %s", + name, + isStatic ? ColumnMetadata.Kind.STATIC : ColumnMetadata.Kind.REGULAR, + droppedColumn.kind); + } + + // Cannot re-add a dropped counter column. See #7831. + if (table.isCounter()) + throw ire("Cannot re-add previously dropped counter column %s", name); + } + + if (isStatic) + tableBuilder.addStaticColumn(name, type); + else + tableBuilder.addRegularColumn(name, type); + + if (!isStatic) + { + for (ViewMetadata view : keyspace.views.forTable(table.id)) + { + if (view.includeAllColumns) + { + ColumnMetadata viewColumn = ColumnMetadata.regularColumn(view.metadata, name.bytes, type); + viewsBuilder.put(viewsBuilder.get(view.name()).withAddedRegularColumn(viewColumn)); + } + } + } + } + } + + /** + * ALTER TABLE <table> DROP <column> + * ALTER TABLE <table> DROP ( <column>, <column1>, ... 
<columnn>) + */ + // TODO: swap UDT refs with expanded tuples on drop + private static class DropColumns extends AlterTableStatement + { + private final Set<ColumnIdentifier> removedColumns; + private final Long timestamp; + + private DropColumns(String keyspaceName, String tableName, Set<ColumnIdentifier> removedColumns, Long timestamp) + { + super(keyspaceName, tableName); + this.removedColumns = removedColumns; + this.timestamp = timestamp; + } + + public KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata table) + { + TableMetadata.Builder builder = table.unbuild(); + removedColumns.forEach(c -> dropColumn(keyspace, table, c, builder)); + return keyspace.withSwapped(keyspace.tables.withSwapped(builder.build())); + } + + private void dropColumn(KeyspaceMetadata keyspace, TableMetadata table, ColumnIdentifier column, TableMetadata.Builder builder) + { + ColumnMetadata currentColumn = table.getColumn(column); + if (null == currentColumn) + throw ire("Column %s was not found in table '%s'", column, table); + + if (currentColumn.isPrimaryKeyColumn()) + throw ire("Cannot drop PRIMARY KEY column %s", column); + + /* + * Cannot allow dropping top-level columns of user defined types that aren't frozen because we cannot convert + * the type into an equivalent tuple: we only support frozen tuples currently. And as such we cannot persist + * the correct type in system_schema.dropped_columns. 
+ */ + if (currentColumn.type.isUDT() && currentColumn.type.isMultiCell()) + throw ire("Cannot drop non-frozen column %s of user type %s", column, currentColumn.type.asCQL3Type()); + + // TODO: some day try and find a way to not rely on Keyspace/IndexManager/Index to find dependent indexes + Set<IndexMetadata> dependentIndexes = Keyspace.openAndGetStore(table).indexManager.getDependentIndexes(currentColumn); + if (!dependentIndexes.isEmpty()) + { + throw ire("Cannot drop column %s because it has dependent secondary indexes (%s)", + currentColumn, + join(", ", transform(dependentIndexes, i -> i.name))); + } + + if (!isEmpty(keyspace.views.forTable(table.id))) + throw ire("Cannot drop column %s on base table %s with materialized views", currentColumn, table.name); + + builder.removeRegularOrStaticColumn(column); + builder.recordColumnDrop(currentColumn, getTimestamp()); + } + + /** + * @return timestamp from query, otherwise return current time in micros + */ + private long getTimestamp() + { + return timestamp == null ? 
ClientState.getTimestamp() : timestamp; + } + } + + /** + * ALTER TABLE <table> RENAME <column> TO <column>; + */ + private static class RenameColumns extends AlterTableStatement + { + private final Map<ColumnIdentifier, ColumnIdentifier> renamedColumns; + + private RenameColumns(String keyspaceName, String tableName, Map<ColumnIdentifier, ColumnIdentifier> renamedColumns) + { + super(keyspaceName, tableName); + this.renamedColumns = renamedColumns; + } + + public KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata table) + { + TableMetadata.Builder tableBuilder = table.unbuild(); + Views.Builder viewsBuilder = keyspace.views.unbuild(); + renamedColumns.forEach((o, n) -> renameColumn(keyspace, table, o, n, tableBuilder, viewsBuilder)); + + return keyspace.withSwapped(keyspace.tables.withSwapped(tableBuilder.build())) + .withSwapped(viewsBuilder.build()); + } + + private void renameColumn(KeyspaceMetadata keyspace, + TableMetadata table, + ColumnIdentifier oldName, + ColumnIdentifier newName, + TableMetadata.Builder tableBuilder, + Views.Builder viewsBuilder) + { + ColumnMetadata column = table.getExistingColumn(oldName); + if (null == column) + throw ire("Column %s was not found in table %s", oldName, table); + + if (!column.isPrimaryKeyColumn()) + throw ire("Cannot rename non PRIMARY KEY column %s", oldName); + + if (null != table.getColumn(newName)) + { + throw ire("Cannot rename column %s to %s in table '%s'; another column with that name already exists", + oldName, + newName, + table); + } + + // TODO: some day try and find a way to not rely on Keyspace/IndexManager/Index to find dependent indexes + Set<IndexMetadata> dependentIndexes = Keyspace.openAndGetStore(table).indexManager.getDependentIndexes(column); + if (!dependentIndexes.isEmpty()) + { + throw ire("Can't rename column %s because it has dependent secondary indexes (%s)", + oldName, + join(", ", transform(dependentIndexes, i -> i.name))); + } + + for (ViewMetadata view : 
keyspace.views.forTable(table.id)) + { + if (view.includes(oldName)) + { + viewsBuilder.put(viewsBuilder.get(view.name()).withRenamedPrimaryKeyColumn(oldName, newName)); + } + } + + tableBuilder.renamePrimaryKeyColumn(oldName, newName); + } + } + + /** + * ALTER TABLE <table> WITH <property> = <value> + */ + private static class AlterOptions extends AlterTableStatement + { + private final TableAttributes attrs; + + private AlterOptions(String keyspaceName, String tableName, TableAttributes attrs) + { + super(keyspaceName, tableName); + this.attrs = attrs; + } + + public KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata table) + { + attrs.validate(); + + TableParams params = attrs.asAlteredTableParams(table.params); + + if (table.isCounter() && params.defaultTimeToLive > 0) + throw ire("Cannot set default_time_to_live on a table with counters"); + + if (!isEmpty(keyspace.views.forTable(table.id)) && params.gcGraceSeconds == 0) + { + throw ire("Cannot alter gc_grace_seconds of the base table of a " + + "materialized view to 0, since this value is used to TTL " + + "undelivered updates. 
Setting gc_grace_seconds too low might " + + "cause undelivered updates to expire " + + "before being replayed."); + } + + if (keyspace.createReplicationStrategy().hasTransientReplicas() + && params.readRepair != ReadRepairStrategy.NONE) + { + throw ire("read_repair must be set to 'NONE' for transiently replicated keyspaces"); + } + + return keyspace.withSwapped(keyspace.tables.withSwapped(table.withSwapped(params))); + } + } + + + /** + * ALTER TABLE <table> DROP COMPACT STORAGE + */ + private static class DropCompactStorage extends AlterTableStatement + { ++ private static final Logger logger = LoggerFactory.getLogger(AlterTableStatement.class); ++ private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 5L, TimeUnit.MINUTES); + private DropCompactStorage(String keyspaceName, String tableName) + { + super(keyspaceName, tableName); + } + + public KeyspaceMetadata apply(KeyspaceMetadata keyspace, TableMetadata table) + { + if (!table.isCompactTable()) + throw AlterTableStatement.ire("Cannot DROP COMPACT STORAGE on table without COMPACT STORAGE"); + ++ validateCanDropCompactStorage(); ++ + return keyspace.withSwapped(keyspace.tables.withSwapped(table.withSwapped(ImmutableSet.of(TableMetadata.Flag.COMPOUND)))); + } ++ ++ /** ++ * Throws if DROP COMPACT STORAGE cannot be used (yet) because the cluster is not sufficiently upgraded. To be able ++ * to use DROP COMPACT STORAGE, we need to ensure that no pre-3.0 sstables exists in the cluster, as we won't be ++ * able to read them anymore once COMPACT STORAGE is dropped (see CASSANDRA-15897). In practice, this method checks ++ * 3 things: ++ * 1) that all nodes are on 3.0+. We need this because 2.x nodes don't advertise their sstable versions. ++ * 2) for 3.0+, we use the new (CASSANDRA-15897) sstables versions set gossiped by all nodes to ensure all ++ * sstables have been upgraded cluster-wise. 
++ * 3) if the cluster still has some 3.0 nodes that predate CASSANDRA-15897, we will not have the sstable versions ++ * for them. In that case, we also refuse DROP COMPACT (even though it may well be safe at this point) and ask ++ * the user to upgrade all nodes. ++ */ ++ private void validateCanDropCompactStorage() ++ { ++ Set<InetAddressAndPort> before4 = new HashSet<>(); ++ Set<InetAddressAndPort> preC15897nodes = new HashSet<>(); ++ Set<InetAddressAndPort> with2xSStables = new HashSet<>(); ++ Splitter onComma = Splitter.on(',').omitEmptyStrings().trimResults(); ++ for (InetAddressAndPort node : StorageService.instance.getTokenMetadata().getAllEndpoints()) ++ { ++ if (MessagingService.instance().versions.knows(node) && ++ MessagingService.instance().versions.getRaw(node) < MessagingService.VERSION_40) ++ { ++ before4.add(node); ++ continue; ++ } ++ ++ String sstableVersionsString = Gossiper.instance.getApplicationState(node, ApplicationState.SSTABLE_VERSIONS); ++ if (sstableVersionsString == null) ++ { ++ preC15897nodes.add(node); ++ continue; ++ } ++ ++ try ++ { ++ boolean has2xSStables = onComma.splitToList(sstableVersionsString) ++ .stream() ++ .anyMatch(v -> v.compareTo("big-ma")<=0); ++ if (has2xSStables) ++ with2xSStables.add(node); ++ } ++ catch (IllegalArgumentException e) ++ { ++ // Means VersionType::fromString didn't parse a version correctly. Which shouldn't happen, we shouldn't ++ // have garbage in Gossip. But crashing the request is not ideal, so we log the error but ignore the ++ // node otherwise. ++ noSpamLogger.error("Unexpected error parsing sstable versions from gossip for {} (gossiped value " + ++ "is '{}'). This is a bug and should be reported. Cannot ensure that {} has no " + ++ "non-upgraded 2.x sstables anymore. 
If after this DROP COMPACT STORAGE some old " + ++ "sstables cannot be read anymore, please use `upgradesstables` with the " + ++ "`--force-compact-storage-on` option.", node, sstableVersionsString, node); ++ } ++ } ++ ++ if (!before4.isEmpty()) ++ throw new InvalidRequestException(format("Cannot DROP COMPACT STORAGE as some nodes in the cluster (%s) " + ++ "are not on 4.0+ yet. Please upgrade those nodes and run " + ++ "`upgradesstables` before retrying.", before4)); ++ if (!preC15897nodes.isEmpty()) ++ throw new InvalidRequestException(format("Cannot guarantee that DROP COMPACT STORAGE is safe as some nodes " + ++ "in the cluster (%s) do not have https://issues.apache.org/jira/browse/CASSANDRA-15897. " + ++ "Please upgrade those nodes and retry.", preC15897nodes)); ++ if (!with2xSStables.isEmpty()) ++ throw new InvalidRequestException(format("Cannot DROP COMPACT STORAGE as some nodes in the cluster (%s) " + ++ "has some non-upgraded 2.x sstables. Please run `upgradesstables` " + ++ "on those nodes before retrying", with2xSStables)); ++ } + } + + public static final class Raw extends CQLStatement.Raw + { + private enum Kind + { + ALTER_COLUMN, ADD_COLUMNS, DROP_COLUMNS, RENAME_COLUMNS, ALTER_OPTIONS, DROP_COMPACT_STORAGE + } + + private final QualifiedName name; + + private Kind kind; + + // ADD + private final List<AddColumns.Column> addedColumns = new ArrayList<>(); + + // DROP + private final Set<ColumnIdentifier> droppedColumns = new HashSet<>(); + private Long timestamp = null; // will use execution timestamp if not provided by query + + // RENAME + private final Map<ColumnIdentifier, ColumnIdentifier> renamedColumns = new HashMap<>(); + + // OPTIONS + public final TableAttributes attrs = new TableAttributes(); + + public Raw(QualifiedName name) + { + this.name = name; + } + + public AlterTableStatement prepare(ClientState state) + { + String keyspaceName = name.hasKeyspace() ? 
name.getKeyspace() : state.getKeyspace(); + String tableName = name.getName(); + + switch (kind) + { + case ALTER_COLUMN: return new AlterColumn(keyspaceName, tableName); + case ADD_COLUMNS: return new AddColumns(keyspaceName, tableName, addedColumns); + case DROP_COLUMNS: return new DropColumns(keyspaceName, tableName, droppedColumns, timestamp); + case RENAME_COLUMNS: return new RenameColumns(keyspaceName, tableName, renamedColumns); + case ALTER_OPTIONS: return new AlterOptions(keyspaceName, tableName, attrs); + case DROP_COMPACT_STORAGE: return new DropCompactStorage(keyspaceName, tableName); + } + + throw new AssertionError(); + } + + public void alter(ColumnIdentifier name, CQL3Type.Raw type) + { + kind = Kind.ALTER_COLUMN; + } + + public void add(ColumnIdentifier name, CQL3Type.Raw type, boolean isStatic) + { + kind = Kind.ADD_COLUMNS; + addedColumns.add(new AddColumns.Column(name, type, isStatic)); + } + + public void drop(ColumnIdentifier name) + { + kind = Kind.DROP_COLUMNS; + droppedColumns.add(name); + } + + public void dropCompactStorage() + { + kind = Kind.DROP_COMPACT_STORAGE; + } + + public void timestamp(long timestamp) + { + this.timestamp = timestamp; + } + + public void rename(ColumnIdentifier from, ColumnIdentifier to) + { + kind = Kind.RENAME_COLUMNS; + renamedColumns.put(from, to); + } + + public void attrs() + { + this.kind = Kind.ALTER_OPTIONS; + } + } +} diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index f3c9a66,7169245..deece30 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@@ -116,21 -88,17 +116,21 @@@ public class CompactionStrategyManager /** * Variables guarded by read and write lock above */ - //TODO check possibility of getting rid of these locks by encapsulating these in an immutable atomic object - private final List<AbstractCompactionStrategy> repaired = new ArrayList<>(); - 
private final List<AbstractCompactionStrategy> unrepaired = new ArrayList<>(); + private final PendingRepairHolder transientRepairs; + private final PendingRepairHolder pendingRepairs; + private final CompactionStrategyHolder repaired; + private final CompactionStrategyHolder unrepaired; + + private final ImmutableList<AbstractStrategyHolder> holders; + private volatile CompactionParams params; private DiskBoundaries currentBoundaries; -- private volatile boolean enabled = true; ++ private volatile boolean enabled; private volatile boolean isActive = true; - /** + /* We keep a copy of the schema compaction parameters here to be able to decide if we - should update the compaction strategy in {@link this#maybeReload(CFMetaData)} due to an ALTER. + should update the compaction strategy in maybeReload() due to an ALTER. If a user changes the local compaction strategy and then later ALTERs a compaction parameter, we will use the new compaction parameters. @@@ -744,10 -580,10 +744,11 @@@ readLock.lock(); try { + if (notification instanceof SSTableAddedNotification) { - handleFlushNotification(((SSTableAddedNotification) notification).added); + SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification; + handleFlushNotification(flushedNotification.added); } else if (notification instanceof SSTableListChangedNotification) { diff --cc src/java/org/apache/cassandra/db/lifecycle/Tracker.java index 9418724,5d5714a..3d72a11 --- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@@ -107,7 -106,7 +107,7 @@@ public class Tracke Pair<View, View> apply(Function<View, View> function) { -- return apply(Predicates.<View>alwaysTrue(), function); ++ return apply(Predicates.alwaysTrue(), function); } Throwable apply(Function<View, View> function, Throwable accumulate) @@@ -236,7 -243,7 +244,7 @@@ public Throwable dropSSTables(Throwable accumulate) { -- return 
dropSSTables(Predicates.<SSTableReader>alwaysTrue(), OperationType.UNKNOWN, accumulate); ++ return dropSSTables(Predicates.alwaysTrue(), OperationType.UNKNOWN, accumulate); } /** @@@ -267,7 -274,7 +275,7 @@@ accumulate = updateSizeTracking(removed, emptySet(), accumulate); accumulate = release(selfRefs(removed), accumulate); // notifySSTablesChanged -> LeveledManifest.promote doesn't like a no-op "promotion" -- accumulate = notifySSTablesChanged(removed, Collections.<SSTableReader>emptySet(), txnLogs.type(), accumulate); ++ accumulate = notifySSTablesChanged(removed, Collections.emptySet(), txnLogs.type(), accumulate); } } catch (Throwable t) @@@ -291,13 -298,13 +299,7 @@@ */ public void removeUnreadableSSTables(final File directory) { -- maybeFail(dropSSTables(new Predicate<SSTableReader>() -- { -- public boolean apply(SSTableReader reader) -- { -- return reader.descriptor.directory.equals(directory); -- } -- }, OperationType.UNKNOWN, null)); ++ maybeFail(dropSSTables(reader -> reader.descriptor.directory.equals(directory), OperationType.UNKNOWN, null)); } @@@ -371,7 -378,7 +373,7 @@@ notifyDiscarded(memtable); // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both? 
- fail = notifyAdded(sstables, memtable, fail); - fail = notifyAdded(sstables, false, fail); ++ fail = notifyAdded(sstables, false, memtable, fail); if (!isDummy() && !cfstore.isValid()) dropSSTables(); @@@ -429,9 -436,14 +431,14 @@@ return accumulate; } - Throwable notifyAdded(Iterable<SSTableReader> added, Memtable memtable, Throwable accumulate) - Throwable notifyAdded(Iterable<SSTableReader> added, boolean isInitialSSTables, Throwable accumulate) ++ Throwable notifyAdded(Iterable<SSTableReader> added, boolean isInitialSSTables, Memtable memtable, Throwable accumulate) { - INotification notification = new SSTableAddedNotification(added, memtable); + INotification notification; + if (!isInitialSSTables) - notification = new SSTableAddedNotification(added); ++ notification = new SSTableAddedNotification(added, memtable); + else + notification = new InitialSSTableAddedNotification(added); + for (INotificationConsumer subscriber : subscribers) { try @@@ -446,9 -458,9 +453,9 @@@ return accumulate; } - public void notifyAdded(Iterable<SSTableReader> added) + void notifyAdded(Iterable<SSTableReader> added, boolean isInitialSSTables) { - maybeFail(notifyAdded(added, null, null)); - maybeFail(notifyAdded(added, isInitialSSTables, null)); ++ maybeFail(notifyAdded(added, isInitialSSTables, null, null)); } public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged) @@@ -529,8 -533,8 +536,6 @@@ @VisibleForTesting public void removeUnsafe(Set<SSTableReader> toRemove) { -- Pair<View, View> result = apply(view -> { -- return updateLiveSet(toRemove, emptySet()).apply(view); -- }); ++ Pair<View, View> result = apply(view -> updateLiveSet(toRemove, emptySet()).apply(view)); } } diff --cc src/java/org/apache/cassandra/gms/ApplicationState.java index d31f50c,70e6d9c..4e20d62 --- a/src/java/org/apache/cassandra/gms/ApplicationState.java +++ b/src/java/org/apache/cassandra/gms/ApplicationState.java @@@ -17,10 -17,17 +17,18 @@@ */ package 
org.apache.cassandra.gms; + /** + * The various "states" exchanged through Gossip. + * + * <p><b>Important Note:</b> Gossip uses the ordinal of this enum in the messages it exchanges, so values in that enum + * should <i>not</i> be re-ordered or removed. The end of this enum should also always include some "padding" so that + * if newer versions add new states, old nodes that don't know about those new states don't "break" deserializing those + * states. + */ public enum ApplicationState { - STATUS, + // never remove a state here, ordering matters. + @Deprecated STATUS, //Deprecated and unused in 4.0, stop publishing in 5.0, reclaim in 6.0 LOAD, SCHEMA, DC, @@@ -35,15 -42,22 +43,24 @@@ HOST_ID, TOKENS, RPC_READY, - // We added SSTABLE_VERSIONS in CASSANDRA-15897 in 3.0, and at the time, 3 more ApplicationState had been added - // to newer versions, so we skipped the first 3 of our original padding to ensure SSTABLE_VERSIONS can preserve - // its ordinal accross versions. - X1, - X2, - X3, + // pad to allow adding new states to existing cluster + INTERNAL_ADDRESS_AND_PORT, //Replacement for INTERNAL_IP with up to two ports + NATIVE_ADDRESS_AND_PORT, //Replacement for RPC_ADDRESS + STATUS_WITH_PORT, //Replacement for STATUS + /** + * The set of sstable versions on this node. This will usually be only the "current" sstable format (the one with + * which new sstables are written), but may contain more on newly upgraded nodes before `upgradesstable` has been + * run. + * + * <p>The value (a set of sstable {@link org.apache.cassandra.io.sstable.format.VersionAndType}) is serialized as + * a comma-separated list. + **/ + SSTABLE_VERSIONS, - // pad to allow adding new states to existing cluster + // DO NOT EDIT OR REMOVE PADDING STATES BELOW - only add new states above. 
See CASSANDRA-16484 + X1, + X2, + X3, + X4, X5, X6, X7, diff --cc src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java index 1e8604b,d6d9dfb..0242d83 --- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java @@@ -81,10 -79,10 +81,10 @@@ public class GossipDigestAckVerbHandle } /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */ - Map<InetAddressAndPort, EndpointState> deltaEpStateMap = new HashMap<InetAddressAndPort, EndpointState>(); - Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>(); ++ Map<InetAddressAndPort, EndpointState> deltaEpStateMap = new HashMap<>(); for (GossipDigest gDigest : gDigestList) { - InetAddress addr = gDigest.getEndpoint(); + InetAddressAndPort addr = gDigest.getEndpoint(); EndpointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion()); if (localEpStatePtr != null) deltaEpStateMap.put(addr, localEpStatePtr); diff --cc src/java/org/apache/cassandra/gms/Gossiper.java index 7720379,69f7fee..a092c77 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@@ -105,9 -98,7 +105,9 @@@ public class Gossiper implements IFailu SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING); SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE); } - private static List<String> ADMINISTRATIVELY_INACTIVE_STATES = Arrays.asList(VersionedValue.HIBERNATE, - VersionedValue.REMOVED_TOKEN, - VersionedValue.STATUS_LEFT); - ++ private static final List<String> ADMINISTRATIVELY_INACTIVE_STATES = Arrays.asList(VersionedValue.HIBERNATE, ++ VersionedValue.REMOVED_TOKEN, ++ VersionedValue.STATUS_LEFT); private volatile ScheduledFuture<?> scheduledGossipTask; private static final ReentrantLock taskLock = new ReentrantLock(); public final static int intervalInMillis = 1000; @@@ 
-124,18 -114,24 +124,18 @@@ // Maximimum difference between generation value and local time we are willing to accept about a peer static final int MAX_GENERATION_DIFFERENCE = 86400 * 365; -- private long fatClientTimeout; ++ private final long fatClientTimeout; private final Random random = new Random(); - private final Comparator<InetAddress> inetcomparator = new Comparator<InetAddress>() - { - public int compare(InetAddress addr1, InetAddress addr2) - { - return addr1.getHostAddress().compareTo(addr2.getHostAddress()); - } - }; /* subscribers for interest in EndpointState change */ -- private final List<IEndpointStateChangeSubscriber> subscribers = new CopyOnWriteArrayList<IEndpointStateChangeSubscriber>(); ++ private final List<IEndpointStateChangeSubscriber> subscribers = new CopyOnWriteArrayList<>(); /* live member set */ - private final Set<InetAddress> liveEndpoints = new ConcurrentSkipListSet<InetAddress>(inetcomparator); + @VisibleForTesting + final Set<InetAddressAndPort> liveEndpoints = new ConcurrentSkipListSet<>(); /* unreachable member set */ - private final Map<InetAddress, Long> unreachableEndpoints = new ConcurrentHashMap<InetAddress, Long>(); + private final Map<InetAddressAndPort, Long> unreachableEndpoints = new ConcurrentHashMap<>(); /* initial seeds for joining the cluster */ @VisibleForTesting @@@ -255,10 -200,10 +255,10 @@@ taskLock.lock(); /* Update the local heartbeat counter. 
*/ - endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().updateHeartBeat(); + endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).getHeartBeatState().updateHeartBeat(); if (logger.isTraceEnabled()) - logger.trace("My heartbeat is now {}", endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().getHeartBeatVersion()); - final List<GossipDigest> gDigests = new ArrayList<GossipDigest>(); + logger.trace("My heartbeat is now {}", endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).getHeartBeatState().getHeartBeatVersion()); - final List<GossipDigest> gDigests = new ArrayList<GossipDigest>(); ++ final List<GossipDigest> gDigests = new ArrayList<>(); Gossiper.instance.makeRandomGossipDigest(gDigests); if (gDigests.size() > 0) @@@ -662,7 -578,7 +662,7 @@@ for (GossipDigest gDigest : gDigests) { sb.append(gDigest); -- sb.append(" "); ++ sb.append(' '); } logger.trace("Gossip Digests are : {}", sb); } @@@ -738,10 -652,10 +738,10 @@@ */ public void assassinateEndpoint(String address) throws UnknownHostException { - InetAddress endpoint = InetAddress.getByName(address); + InetAddressAndPort endpoint = InetAddressAndPort.getByName(address); runInGossipStageBlocking(() -> { EndpointState epState = endpointStateMap.get(endpoint); -- Collection<Token> tokens = null; ++ Collection<Token> tokens; logger.warn("Assassinating {} via gossip", endpoint); if (epState == null) @@@ -1076,7 -953,25 +1076,25 @@@ return UUID.fromString(epStates.get(endpoint).getApplicationState(ApplicationState.HOST_ID).value); } + /** + * The value for the provided application state for the provided endpoint as currently known by this Gossip instance. + * + * @param endpoint the endpoint from which to get the endpoint state. + * @param state the endpoint state to get. 
+ * @return the value of the application state {@code state} for {@code endpoint}, or {@code null} if either + * {@code endpoint} is not known by Gossip or has no value for {@code state}. + */ - public String getApplicationState(InetAddress endpoint, ApplicationState state) ++ public String getApplicationState(InetAddressAndPort endpoint, ApplicationState state) + { + EndpointState epState = endpointStateMap.get(endpoint); + if (epState == null) + return null; + + VersionedValue value = epState.getApplicationState(state); + return value == null ? null : value.value; + } + - EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version) + EndpointState getStateForVersionBiggerThan(InetAddressAndPort forEndpoint, int version) { EndpointState epState = endpointStateMap.get(forEndpoint); EndpointState reqdEndpointState = null; @@@ -1653,7 -1456,7 +1671,7 @@@ public void start(int generationNumber) { -- start(generationNumber, new EnumMap<ApplicationState, VersionedValue>(ApplicationState.class)); ++ start(generationNumber, new EnumMap<>(ApplicationState.class)); } /** @@@ -1717,7 -1519,7 +1735,7 @@@ seedsInShadowRound.clear(); endpointShadowStateMap.clear(); // send a completely empty syn -- List<GossipDigest> gDigests = new ArrayList<GossipDigest>(); ++ List<GossipDigest> gDigests = new ArrayList<>(); GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(), DatabaseDescriptor.getPartitionerName(), gDigests); @@@ -1782,67 -1586,6 +1800,67 @@@ } } + /** + * JMX interface for triggering an update of the seed node list. 
+ */ + public List<String> reloadSeeds() + { + logger.trace("Triggering reload of seed node list"); + + // Get the new set in the same way that buildSeedsList does + Set<InetAddressAndPort> tmp = new HashSet<>(); + try + { + for (InetAddressAndPort seed : DatabaseDescriptor.getSeeds()) + { + if (seed.equals(FBUtilities.getBroadcastAddressAndPort())) + continue; + tmp.add(seed); + } + } + // If using the SimpleSeedProvider invalid yaml added to the config since startup could + // cause this to throw. Additionally, third party seed providers may throw exceptions. + // Handle the error and return a null to indicate that there was a problem. + catch (Throwable e) + { + JVMStabilityInspector.inspectThrowable(e); + logger.warn("Error while getting seed node list: {}", e.getLocalizedMessage()); + return null; + } + + if (tmp.size() == 0) + { + logger.trace("New seed node list is empty. Not updating seed list."); + return getSeeds(); + } + + if (tmp.equals(seeds)) + { + logger.trace("New seed node list matches the existing list."); + return getSeeds(); + } + + // Add the new entries + seeds.addAll(tmp); + // Remove the old entries + seeds.retainAll(tmp); + logger.trace("New seed node list after reload {}", seeds); + return getSeeds(); + } + + /** + * JMX endpoint for getting the list of seeds from the node + */ + public List<String> getSeeds() + { - List<String> seedList = new ArrayList<String>(); ++ List<String> seedList = new ArrayList<>(); + for (InetAddressAndPort seed : seeds) + { + seedList.add(seed.toString()); + } + return seedList; + } + // initialize local HB state if needed, i.e., if gossiper has never been started before. public void maybeInitializeLocalState(int generationNbr) { @@@ -2072,29 -1793,8 +2090,29 @@@ return state != null ? 
state.getReleaseVersion() : null; } + public Map<String, List<String>> getReleaseVersionsWithPort() + { - Map<String, List<String>> results = new HashMap<String, List<String>>(); ++ Map<String, List<String>> results = new HashMap<>(); + Iterable<InetAddressAndPort> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers()); + + for (InetAddressAndPort host : allHosts) + { + CassandraVersion version = getReleaseVersion(host); + String stringVersion = version == null ? "" : version.toString(); + List<String> hosts = results.get(stringVersion); + if (hosts == null) + { + hosts = new ArrayList<>(); + results.put(stringVersion, hosts); + } + hosts.add(host.getHostAddressAndPort()); + } + + return results; + } + @Nullable - public UUID getSchemaVersion(InetAddress ep) + public UUID getSchemaVersion(InetAddressAndPort ep) { EndpointState state = getEndpointStateForEndpoint(ep); return state != null ? state.getSchemaVersion() : null; diff --cc src/java/org/apache/cassandra/gms/VersionedValue.java index 7c54559,ff3c48b..880cb98 --- a/src/java/org/apache/cassandra/gms/VersionedValue.java +++ b/src/java/org/apache/cassandra/gms/VersionedValue.java @@@ -31,9 -33,10 +33,10 @@@ import org.apache.cassandra.db.TypeSize import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.sstable.format.Version; + import org.apache.cassandra.io.sstable.format.VersionAndType; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.FBUtilities; import org.apache.commons.lang3.StringUtils; @@@ -109,7 -111,7 +112,7 @@@ public class VersionedValue implements @Override public String toString() { -- return "Value(" + value + "," + version + ")"; ++ return 
"Value(" + value + ',' + version + ')'; } public byte[] toBytes() diff --cc src/java/org/apache/cassandra/io/sstable/Descriptor.java index dc67451,7950e7b..b781ebf --- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java +++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java @@@ -77,10 -74,16 +77,16 @@@ public class Descripto */ public Descriptor(File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType) { - this(formatType.info.getLatestVersion(), directory, ksname, cfname, generation, formatType, Component.digestFor(formatType.info.getLatestVersion().uncompressedChecksumType())); + this(formatType.info.getLatestVersion(), directory, ksname, cfname, generation, formatType); } + @VisibleForTesting + public Descriptor(String version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType) + { - this(formatType.info.getVersion(version), directory, ksname, cfname, generation, formatType, Component.digestFor(formatType.info.getLatestVersion().uncompressedChecksumType())); ++ this(formatType.info.getVersion(version), directory, ksname, cfname, generation, formatType); + } + - public Descriptor(Version version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType, Component digestComponent) + public Descriptor(Version version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType) { assert version != null && directory != null && ksname != null && cfname != null && formatType.info.getLatestVersion().getClass().equals(version.getClass()); this.version = version; diff --cc src/java/org/apache/cassandra/io/sstable/format/Version.java index 0e9e303,2b9dcbd..501ae85 --- a/src/java/org/apache/cassandra/io/sstable/format/Version.java +++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java @@@ -97,7 -105,7 +97,6 @@@ public abstract class Versio return version; } -- @Override public boolean equals(Object o) { 
diff --cc src/java/org/apache/cassandra/schema/Schema.java index c04c631,0000000..c5c1f36 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/schema/Schema.java +++ b/src/java/org/apache/cassandra/schema/Schema.java @@@ -1,890 -1,0 +1,891 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.schema; + ++import java.net.UnknownHostException; +import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.MapDifference; +import com.google.common.collect.Sets; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.functions.*; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.UserType; +import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.locator.LocalStrategy; +import org.apache.cassandra.schema.KeyspaceMetadata.KeyspaceDiff; +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.Pair; +import org.cliffc.high_scale_lib.NonBlockingHashMap; + +import static java.lang.String.format; + +import static com.google.common.collect.Iterables.size; + +public final class Schema implements SchemaProvider +{ + public static final Schema instance = new Schema(); + + private volatile Keyspaces keyspaces = Keyspaces.none(); + + // UUID -> mutable metadata ref map. We have to update these in place every time a table changes. + private final Map<TableId, TableMetadataRef> metadataRefs = new NonBlockingHashMap<>(); + + // (keyspace name, index name) -> mutable metadata ref map. We have to update these in place every time an index changes. 
+ private final Map<Pair<String, String>, TableMetadataRef> indexMetadataRefs = new NonBlockingHashMap<>(); + + // Keyspace objects, one per keyspace. Only one instance should ever exist for any given keyspace. + private final Map<String, Keyspace> keyspaceInstances = new NonBlockingHashMap<>(); + + private volatile UUID version; + + private final List<SchemaChangeListener> changeListeners = new CopyOnWriteArrayList<>(); + + /** + * Initialize empty schema object and load the hardcoded system tables + */ + private Schema() + { + if (DatabaseDescriptor.isDaemonInitialized() || DatabaseDescriptor.isToolInitialized()) + { + load(SchemaKeyspace.metadata()); + load(SystemKeyspace.metadata()); + } + } + + /** + * load keyspace (keyspace) definitions, but do not initialize the keyspace instances. + * Schema version may be updated as the result. + */ + public void loadFromDisk() + { + loadFromDisk(true); + } + + /** + * Load schema definitions from disk. + * + * @param updateVersion true if schema version needs to be updated + */ + public void loadFromDisk(boolean updateVersion) + { + SchemaDiagnostics.schemataLoading(this); + SchemaKeyspace.fetchNonSystemKeyspaces().forEach(this::load); + if (updateVersion) + updateVersion(); + SchemaDiagnostics.schemataLoaded(this); + } + + /** + * Update (or insert) new keyspace definition + * + * @param ksm The metadata about keyspace + */ + synchronized public void load(KeyspaceMetadata ksm) + { + KeyspaceMetadata previous = keyspaces.getNullable(ksm.name); + + if (previous == null) + loadNew(ksm); + else + reload(previous, ksm); + + keyspaces = keyspaces.withAddedOrUpdated(ksm); + } + + private void loadNew(KeyspaceMetadata ksm) + { + ksm.tablesAndViews() + .forEach(metadata -> metadataRefs.put(metadata.id, new TableMetadataRef(metadata))); + + ksm.tables + .indexTables() + .forEach((name, metadata) -> indexMetadataRefs.put(Pair.create(ksm.name, name), new TableMetadataRef(metadata))); + + SchemaDiagnostics.metadataInitialized(this, 
ksm); + } + + private void reload(KeyspaceMetadata previous, KeyspaceMetadata updated) + { + Keyspace keyspace = getKeyspaceInstance(updated.name); + if (null != keyspace) + keyspace.setMetadata(updated); + + Tables.TablesDiff tablesDiff = Tables.diff(previous.tables, updated.tables); + Views.ViewsDiff viewsDiff = Views.diff(previous.views, updated.views); + + MapDifference<String, TableMetadata> indexesDiff = previous.tables.indexesDiff(updated.tables); + + // clean up after removed entries + tablesDiff.dropped.forEach(table -> metadataRefs.remove(table.id)); + viewsDiff.dropped.forEach(view -> metadataRefs.remove(view.metadata.id)); + + indexesDiff.entriesOnlyOnLeft() + .values() + .forEach(indexTable -> indexMetadataRefs.remove(Pair.create(indexTable.keyspace, indexTable.indexName().get()))); + + // load up new entries + tablesDiff.created.forEach(table -> metadataRefs.put(table.id, new TableMetadataRef(table))); + viewsDiff.created.forEach(view -> metadataRefs.put(view.metadata.id, new TableMetadataRef(view.metadata))); + + indexesDiff.entriesOnlyOnRight() + .values() + .forEach(indexTable -> indexMetadataRefs.put(Pair.create(indexTable.keyspace, indexTable.indexName().get()), new TableMetadataRef(indexTable))); + + // refresh refs to updated ones + tablesDiff.altered.forEach(diff -> metadataRefs.get(diff.after.id).set(diff.after)); + viewsDiff.altered.forEach(diff -> metadataRefs.get(diff.after.metadata.id).set(diff.after.metadata)); + + indexesDiff.entriesDiffering() + .values() + .stream() + .map(MapDifference.ValueDifference::rightValue) + .forEach(indexTable -> indexMetadataRefs.get(Pair.create(indexTable.keyspace, indexTable.indexName().get())).set(indexTable)); + + SchemaDiagnostics.metadataReloaded(this, previous, updated, tablesDiff, viewsDiff, indexesDiff); + } + + public void registerListener(SchemaChangeListener listener) + { + changeListeners.add(listener); + } + + @SuppressWarnings("unused") + public void unregisterListener(SchemaChangeListener 
listener) + { + changeListeners.remove(listener); + } + + /** + * Get keyspace instance by name + * + * @param keyspaceName The name of the keyspace + * + * @return Keyspace object or null if keyspace was not found + */ + @Override + public Keyspace getKeyspaceInstance(String keyspaceName) + { + return keyspaceInstances.get(keyspaceName); + } + + public ColumnFamilyStore getColumnFamilyStoreInstance(TableId id) + { + TableMetadata metadata = getTableMetadata(id); + if (metadata == null) + return null; + + Keyspace instance = getKeyspaceInstance(metadata.keyspace); + if (instance == null) + return null; + + return instance.hasColumnFamilyStore(metadata.id) + ? instance.getColumnFamilyStore(metadata.id) + : null; + } + + /** + * Store given Keyspace instance to the schema + * + * @param keyspace The Keyspace instance to store + * + * @throws IllegalArgumentException if Keyspace is already stored + */ + @Override + public void storeKeyspaceInstance(Keyspace keyspace) + { + if (keyspaceInstances.putIfAbsent(keyspace.getName(), keyspace) != null) + throw new IllegalArgumentException(String.format("Keyspace %s was already initialized.", keyspace.getName())); + } + + /** + * Remove keyspace from schema + * + * @param keyspaceName The name of the keyspace to remove + * + * @return removed keyspace instance or null if it wasn't found + */ + public Keyspace removeKeyspaceInstance(String keyspaceName) + { + return keyspaceInstances.remove(keyspaceName); + } + + public Keyspaces snapshot() + { + return keyspaces; + } + + /** + * Remove keyspace definition from system + * + * @param ksm The keyspace definition to remove + */ + synchronized void unload(KeyspaceMetadata ksm) + { + keyspaces = keyspaces.without(ksm.name); + + ksm.tablesAndViews() + .forEach(t -> metadataRefs.remove(t.id)); + + ksm.tables + .indexTables() + .keySet() + .forEach(name -> indexMetadataRefs.remove(Pair.create(ksm.name, name))); + + SchemaDiagnostics.metadataRemoved(this, ksm); + } + + public int 
getNumberOfTables() + { + return keyspaces.stream().mapToInt(k -> size(k.tablesAndViews())).sum(); + } + + public ViewMetadata getView(String keyspaceName, String viewName) + { + assert keyspaceName != null; + KeyspaceMetadata ksm = keyspaces.getNullable(keyspaceName); + return (ksm == null) ? null : ksm.views.getNullable(viewName); + } + + /** + * Get metadata about keyspace by its name + * + * @param keyspaceName The name of the keyspace + * + * @return The keyspace metadata or null if it wasn't found + */ + @Override + public KeyspaceMetadata getKeyspaceMetadata(String keyspaceName) + { + assert keyspaceName != null; + KeyspaceMetadata keyspace = keyspaces.getNullable(keyspaceName); + return null != keyspace ? keyspace : VirtualKeyspaceRegistry.instance.getKeyspaceMetadataNullable(keyspaceName); + } + + private Set<String> getNonSystemKeyspacesSet() + { + return Sets.difference(keyspaces.names(), SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES); + } + + /** + * @return collection of the non-system keyspaces (note that this count as system only the + * non replicated keyspaces, so keyspace like system_traces which are replicated are actually + * returned. 
See getUserKeyspace() below if you don't want those) + */ + public ImmutableList<String> getNonSystemKeyspaces() + { + return ImmutableList.copyOf(getNonSystemKeyspacesSet()); + } + + /** + * @return a collection of keyspaces that do not use LocalStrategy for replication + */ + public List<String> getNonLocalStrategyKeyspaces() + { + return keyspaces.stream() + .filter(keyspace -> keyspace.params.replication.klass != LocalStrategy.class) + .map(keyspace -> keyspace.name) + .collect(Collectors.toList()); + } + + /** + * @return collection of the user defined keyspaces + */ + public List<String> getUserKeyspaces() + { + return ImmutableList.copyOf(Sets.difference(getNonSystemKeyspacesSet(), SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES)); + } + + /** + * Get metadata about keyspace inner ColumnFamilies + * + * @param keyspaceName The name of the keyspace + * + * @return metadata about ColumnFamilies the belong to the given keyspace + */ + public Iterable<TableMetadata> getTablesAndViews(String keyspaceName) + { + assert keyspaceName != null; + KeyspaceMetadata ksm = keyspaces.getNullable(keyspaceName); + assert ksm != null; + return ksm.tablesAndViews(); + } + + /** + * @return collection of the all keyspace names registered in the system (system and non-system) + */ + public Set<String> getKeyspaces() + { + return keyspaces.names(); + } + + /* TableMetadata/Ref query/control methods */ + + /** + * Given a keyspace name and table/view name, get the table metadata + * reference. If the keyspace name or table/view name is not present + * this method returns null. + * + * @return TableMetadataRef object or null if it wasn't found + */ + @Override + public TableMetadataRef getTableMetadataRef(String keyspace, String table) + { + TableMetadata tm = getTableMetadata(keyspace, table); + return tm == null + ? 
null + : metadataRefs.get(tm.id); + } + + public TableMetadataRef getIndexTableMetadataRef(String keyspace, String index) + { + return indexMetadataRefs.get(Pair.create(keyspace, index)); + } + + Map<Pair<String, String>, TableMetadataRef> getIndexTableMetadataRefs() + { + return indexMetadataRefs; + } + + /** + * Get Table metadata by its identifier + * + * @param id table or view identifier + * + * @return metadata about Table or View + */ + @Override + public TableMetadataRef getTableMetadataRef(TableId id) + { + return metadataRefs.get(id); + } + + @Override + public TableMetadataRef getTableMetadataRef(Descriptor descriptor) + { + return getTableMetadataRef(descriptor.ksname, descriptor.cfname); + } + + Map<TableId, TableMetadataRef> getTableMetadataRefs() + { + return metadataRefs; + } + + /** + * Given a keyspace name and table name, get the table + * meta data. If the keyspace name or table name is not valid + * this function returns null. + * + * @param keyspace The keyspace name + * @param table The table name + * + * @return TableMetadata object or null if it wasn't found + */ + public TableMetadata getTableMetadata(String keyspace, String table) + { + assert keyspace != null; + assert table != null; + + KeyspaceMetadata ksm = getKeyspaceMetadata(keyspace); + return ksm == null + ? null + : ksm.getTableOrViewNullable(table); + } + + @Override + public TableMetadata getTableMetadata(TableId id) + { + TableMetadata table = keyspaces.getTableOrViewNullable(id); + return null != table ? 
table : VirtualKeyspaceRegistry.instance.getTableMetadataNullable(id); + } + + public TableMetadata validateTable(String keyspaceName, String tableName) + { + if (tableName.isEmpty()) + throw new InvalidRequestException("non-empty table is required"); + + KeyspaceMetadata keyspace = getKeyspaceMetadata(keyspaceName); + if (keyspace == null) + throw new KeyspaceNotDefinedException(format("keyspace %s does not exist", keyspaceName)); + + TableMetadata metadata = keyspace.getTableOrViewNullable(tableName); + if (metadata == null) + throw new InvalidRequestException(format("table %s does not exist", tableName)); + + return metadata; + } + + public TableMetadata getTableMetadata(Descriptor descriptor) + { + return getTableMetadata(descriptor.ksname, descriptor.cfname); + } + + /* Function helpers */ + + /** + * Get all function overloads with the specified name + * + * @param name fully qualified function name + * @return an empty list if the keyspace or the function name are not found; + * a non-empty collection of {@link Function} otherwise + */ + public Collection<Function> getFunctions(FunctionName name) + { + if (!name.hasKeyspace()) + throw new IllegalArgumentException(String.format("Function name must be fully qualified: got %s", name)); + + KeyspaceMetadata ksm = getKeyspaceMetadata(name.keyspace); + return ksm == null + ? 
Collections.emptyList() + : ksm.functions.get(name); + } + + /** + * Find the function with the specified name + * + * @param name fully qualified function name + * @param argTypes function argument types + * @return an empty {@link Optional} if the keyspace or the function name are not found; + * a non-empty optional of {@link Function} otherwise + */ + public Optional<Function> findFunction(FunctionName name, List<AbstractType<?>> argTypes) + { + if (!name.hasKeyspace()) + throw new IllegalArgumentException(String.format("Function name must be fully quallified: got %s", name)); + + KeyspaceMetadata ksm = getKeyspaceMetadata(name.keyspace); + return ksm == null + ? Optional.empty() + : ksm.functions.find(name, argTypes); + } + + /* Version control */ + + /** + * @return current schema version + */ + public UUID getVersion() + { + return version; + } + + /** + * Checks whether the given schema version is the same as the current local schema. + */ + public boolean isSameVersion(UUID schemaVersion) + { + return schemaVersion != null && schemaVersion.equals(version); + } + + /** + * Checks whether the current schema is empty. + */ + public boolean isEmpty() + { + return SchemaConstants.emptyVersion.equals(version); + } + + /** + * Read schema from system keyspace and calculate MD5 digest of every row, resulting digest + * will be converted into UUID which would act as content-based version of the schema. + */ + public void updateVersion() + { + version = SchemaKeyspace.calculateSchemaDigest(); + SystemKeyspace.updateSchemaVersion(version); + SchemaDiagnostics.versionUpdated(this); + } + + /* + * Like updateVersion, but also announces via gossip + */ + public void updateVersionAndAnnounce() + { + updateVersion(); + passiveAnnounceVersion(); + } + + /** + * Announce my version passively over gossip. + * Used to notify nodes as they arrive in the cluster. 
+ */ + private void passiveAnnounceVersion() + { + Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.schema(version)); + SchemaDiagnostics.versionAnnounced(this); + } + + /** + * Clear all KS/CF metadata and reset version. + */ + public synchronized void clear() + { + getNonSystemKeyspaces().forEach(k -> unload(getKeyspaceMetadata(k))); + updateVersionAndAnnounce(); + SchemaDiagnostics.schemataCleared(this); + } + + /* + * Reload schema from local disk. Useful if a user made changes to schema tables by hand, or has suspicion that + * in-memory representation got out of sync somehow with what's on disk. + */ + public synchronized void reloadSchemaAndAnnounceVersion() + { + Keyspaces before = keyspaces.filter(k -> !SchemaConstants.isLocalSystemKeyspace(k.name)); + Keyspaces after = SchemaKeyspace.fetchNonSystemKeyspaces(); + merge(Keyspaces.diff(before, after)); + updateVersionAndAnnounce(); + } + + /** + * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects + * (which also involves fs operations on add/drop ks/cf) + * + * @param mutations the schema changes to apply + * + * @throws ConfigurationException If one of metadata attributes has invalid value + */ + public synchronized void mergeAndAnnounceVersion(Collection<Mutation> mutations) + { + merge(mutations); + updateVersionAndAnnounce(); + } + - public synchronized TransformationResult transform(SchemaTransformation transformation, boolean locally, long now) ++ public synchronized TransformationResult transform(SchemaTransformation transformation, boolean locally, long now) throws UnknownHostException + { + KeyspacesDiff diff; + try + { + Keyspaces before = keyspaces; + Keyspaces after = transformation.apply(before); + diff = Keyspaces.diff(before, after); + } + catch (RuntimeException e) + { + return new TransformationResult(e); + } + + if (diff.isEmpty()) + return new TransformationResult(diff, Collections.emptyList()); + + 
Collection<Mutation> mutations = SchemaKeyspace.convertSchemaDiffToMutations(diff, now); + SchemaKeyspace.applyChanges(mutations); + + merge(diff); + updateVersion(); + if (!locally) + passiveAnnounceVersion(); + + return new TransformationResult(diff, mutations); + } + + public static final class TransformationResult + { + public final boolean success; + public final RuntimeException exception; + public final KeyspacesDiff diff; + public final Collection<Mutation> mutations; + + private TransformationResult(boolean success, RuntimeException exception, KeyspacesDiff diff, Collection<Mutation> mutations) + { + this.success = success; + this.exception = exception; + this.diff = diff; + this.mutations = mutations; + } + + TransformationResult(RuntimeException exception) + { + this(false, exception, null, null); + } + + TransformationResult(KeyspacesDiff diff, Collection<Mutation> mutations) + { + this(true, null, diff, mutations); + } + } + + synchronized void merge(Collection<Mutation> mutations) + { + // only compare the keyspaces affected by this set of schema mutations + Set<String> affectedKeyspaces = SchemaKeyspace.affectedKeyspaces(mutations); + + // fetch the current state of schema for the affected keyspaces only + Keyspaces before = keyspaces.filter(k -> affectedKeyspaces.contains(k.name)); + + // apply the schema mutations + SchemaKeyspace.applyChanges(mutations); + + // apply the schema mutations and fetch the new versions of the altered keyspaces + Keyspaces after = SchemaKeyspace.fetchKeyspaces(affectedKeyspaces); + + merge(Keyspaces.diff(before, after)); + } + + private void merge(KeyspacesDiff diff) + { + diff.dropped.forEach(this::dropKeyspace); + diff.created.forEach(this::createKeyspace); + diff.altered.forEach(this::alterKeyspace); + } + + private void alterKeyspace(KeyspaceDiff delta) + { + SchemaDiagnostics.keyspaceAltering(this, delta); + + // drop tables and views + delta.views.dropped.forEach(this::dropView); + 
delta.tables.dropped.forEach(this::dropTable); + + load(delta.after); + + // add tables and views + delta.tables.created.forEach(this::createTable); + delta.views.created.forEach(this::createView); + + // update tables and views + delta.tables.altered.forEach(diff -> alterTable(diff.after)); + delta.views.altered.forEach(diff -> alterView(diff.after)); + + // deal with all added, and altered views + Keyspace.open(delta.after.name).viewManager.reload(true); + + // notify on everything dropped + delta.udas.dropped.forEach(uda -> notifyDropAggregate((UDAggregate) uda)); + delta.udfs.dropped.forEach(udf -> notifyDropFunction((UDFunction) udf)); + delta.views.dropped.forEach(this::notifyDropView); + delta.tables.dropped.forEach(this::notifyDropTable); + delta.types.dropped.forEach(this::notifyDropType); + + // notify on everything created + delta.types.created.forEach(this::notifyCreateType); + delta.tables.created.forEach(this::notifyCreateTable); + delta.views.created.forEach(this::notifyCreateView); + delta.udfs.created.forEach(udf -> notifyCreateFunction((UDFunction) udf)); + delta.udas.created.forEach(uda -> notifyCreateAggregate((UDAggregate) uda)); + + // notify on everything altered + if (!delta.before.params.equals(delta.after.params)) + notifyAlterKeyspace(delta.before, delta.after); + delta.types.altered.forEach(diff -> notifyAlterType(diff.before, diff.after)); + delta.tables.altered.forEach(diff -> notifyAlterTable(diff.before, diff.after)); + delta.views.altered.forEach(diff -> notifyAlterView(diff.before, diff.after)); + delta.udfs.altered.forEach(diff -> notifyAlterFunction(diff.before, diff.after)); + delta.udas.altered.forEach(diff -> notifyAlterAggregate(diff.before, diff.after)); + SchemaDiagnostics.keyspaceAltered(this, delta); + } + + private void createKeyspace(KeyspaceMetadata keyspace) + { + SchemaDiagnostics.keyspaceCreating(this, keyspace); + load(keyspace); + Keyspace.open(keyspace.name); + + notifyCreateKeyspace(keyspace); + 
keyspace.types.forEach(this::notifyCreateType); + keyspace.tables.forEach(this::notifyCreateTable); + keyspace.views.forEach(this::notifyCreateView); + keyspace.functions.udfs().forEach(this::notifyCreateFunction); + keyspace.functions.udas().forEach(this::notifyCreateAggregate); + SchemaDiagnostics.keyspaceCreated(this, keyspace); + } + + private void dropKeyspace(KeyspaceMetadata keyspace) + { + SchemaDiagnostics.keyspaceDroping(this, keyspace); + keyspace.views.forEach(this::dropView); + keyspace.tables.forEach(this::dropTable); + + // remove the keyspace from the static instances. + Keyspace.clear(keyspace.name); + unload(keyspace); + Keyspace.writeOrder.awaitNewBarrier(); + + keyspace.functions.udas().forEach(this::notifyDropAggregate); + keyspace.functions.udfs().forEach(this::notifyDropFunction); + keyspace.views.forEach(this::notifyDropView); + keyspace.tables.forEach(this::notifyDropTable); + keyspace.types.forEach(this::notifyDropType); + notifyDropKeyspace(keyspace); + SchemaDiagnostics.keyspaceDroped(this, keyspace); + } + + private void dropView(ViewMetadata metadata) + { + Keyspace.open(metadata.keyspace()).viewManager.dropView(metadata.name()); + dropTable(metadata.metadata); + } + + private void dropTable(TableMetadata metadata) + { + SchemaDiagnostics.tableDropping(this, metadata); + ColumnFamilyStore cfs = Keyspace.open(metadata.keyspace).getColumnFamilyStore(metadata.name); + assert cfs != null; + // make sure all the indexes are dropped, or else. 
+ cfs.indexManager.markAllIndexesRemoved(); + CompactionManager.instance.interruptCompactionFor(Collections.singleton(metadata), (sstable) -> true, true); + if (DatabaseDescriptor.isAutoSnapshot()) + cfs.snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(cfs.name, ColumnFamilyStore.SNAPSHOT_DROP_PREFIX)); + CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id)); + Keyspace.open(metadata.keyspace).dropCf(metadata.id); + SchemaDiagnostics.tableDropped(this, metadata); + } + + private void createTable(TableMetadata table) + { + SchemaDiagnostics.tableCreating(this, table); + Keyspace.open(table.keyspace).initCf(metadataRefs.get(table.id), true); + SchemaDiagnostics.tableCreated(this, table); + } + + private void createView(ViewMetadata view) + { + Keyspace.open(view.keyspace()).initCf(metadataRefs.get(view.metadata.id), true); + } + + private void alterTable(TableMetadata updated) + { + SchemaDiagnostics.tableAltering(this, updated); + Keyspace.open(updated.keyspace).getColumnFamilyStore(updated.name).reload(); + SchemaDiagnostics.tableAltered(this, updated); + } + + private void alterView(ViewMetadata updated) + { + Keyspace.open(updated.keyspace()).getColumnFamilyStore(updated.name()).reload(); + } + + private void notifyCreateKeyspace(KeyspaceMetadata ksm) + { + changeListeners.forEach(l -> l.onCreateKeyspace(ksm.name)); + } + + private void notifyCreateTable(TableMetadata metadata) + { + changeListeners.forEach(l -> l.onCreateTable(metadata.keyspace, metadata.name)); + } + + private void notifyCreateView(ViewMetadata view) + { + changeListeners.forEach(l -> l.onCreateView(view.keyspace(), view.name())); + } + + private void notifyCreateType(UserType ut) + { + changeListeners.forEach(l -> l.onCreateType(ut.keyspace, ut.getNameAsString())); + } + + private void notifyCreateFunction(UDFunction udf) + { + changeListeners.forEach(l -> l.onCreateFunction(udf.name().keyspace, udf.name().name, udf.argTypes())); + } + + private void 
notifyCreateAggregate(UDAggregate udf) + { + changeListeners.forEach(l -> l.onCreateAggregate(udf.name().keyspace, udf.name().name, udf.argTypes())); + } + + private void notifyAlterKeyspace(KeyspaceMetadata before, KeyspaceMetadata after) + { + changeListeners.forEach(l -> l.onAlterKeyspace(after.name)); + } + + private void notifyAlterTable(TableMetadata before, TableMetadata after) + { + boolean changeAffectedPreparedStatements = before.changeAffectsPreparedStatements(after); + changeListeners.forEach(l -> l.onAlterTable(after.keyspace, after.name, changeAffectedPreparedStatements)); + } + + private void notifyAlterView(ViewMetadata before, ViewMetadata after) + { + boolean changeAffectedPreparedStatements = before.metadata.changeAffectsPreparedStatements(after.metadata); + changeListeners.forEach(l ->l.onAlterView(after.keyspace(), after.name(), changeAffectedPreparedStatements)); + } + + private void notifyAlterType(UserType before, UserType after) + { + changeListeners.forEach(l -> l.onAlterType(after.keyspace, after.getNameAsString())); + } + + private void notifyAlterFunction(UDFunction before, UDFunction after) + { + changeListeners.forEach(l -> l.onAlterFunction(after.name().keyspace, after.name().name, after.argTypes())); + } + + private void notifyAlterAggregate(UDAggregate before, UDAggregate after) + { + changeListeners.forEach(l -> l.onAlterAggregate(after.name().keyspace, after.name().name, after.argTypes())); + } + + private void notifyDropKeyspace(KeyspaceMetadata ksm) + { + changeListeners.forEach(l -> l.onDropKeyspace(ksm.name)); + } + + private void notifyDropTable(TableMetadata metadata) + { + changeListeners.forEach(l -> l.onDropTable(metadata.keyspace, metadata.name)); + } + + private void notifyDropView(ViewMetadata view) + { + changeListeners.forEach(l -> l.onDropView(view.keyspace(), view.name())); + } + + private void notifyDropType(UserType ut) + { + changeListeners.forEach(l -> l.onDropType(ut.keyspace, ut.getNameAsString())); + } + + 
private void notifyDropFunction(UDFunction udf) + { + changeListeners.forEach(l -> l.onDropFunction(udf.name().keyspace, udf.name().name, udf.argTypes())); + } + + private void notifyDropAggregate(UDAggregate udf) + { + changeListeners.forEach(l -> l.onDropAggregate(udf.name().keyspace, udf.name().name, udf.argTypes())); + } + + + /** + * Converts the given schema version to a string. Returns {@code unknown}, if {@code version} is {@code null} + * or {@code "(empty)"}, if {@code version} refers to an {@link SchemaConstants#emptyVersion empty) schema. + */ + public static String schemaVersionToString(UUID version) + { + return version == null + ? "unknown" + : SchemaConstants.emptyVersion.equals(version) + ? "(empty)" + : version.toString(); + } +} diff --cc src/java/org/apache/cassandra/schema/SchemaTransformation.java index c19ac7c,0000000..e2290a3 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/schema/SchemaTransformation.java +++ b/src/java/org/apache/cassandra/schema/SchemaTransformation.java @@@ -1,31 -1,0 +1,33 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.schema; + ++import java.net.UnknownHostException; ++ +public interface SchemaTransformation +{ + /** + * Apply a statement transformation to a schema snapshot. + * + * Implementing methods should be side-effect free. + * + * @param schema Keyspaces to base the transformation on + * @return Keyspaces transformed by the statement + */ - Keyspaces apply(Keyspaces schema); ++ Keyspaces apply(Keyspaces schema) throws UnknownHostException; +} diff --cc src/java/org/apache/cassandra/service/StartupChecks.java index 85b5836,cb10ab4..dadb0c5 --- a/src/java/org/apache/cassandra/service/StartupChecks.java +++ b/src/java/org/apache/cassandra/service/StartupChecks.java @@@ -35,16 -35,11 +35,14 @@@ import com.google.common.collect.Iterab import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.CFMetaData; +import net.jpountz.lz4.LZ4Factory; - import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UntypedResultSet; - import org.apache.cassandra.schema.SchemaKeyspace; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.config.SchemaConstants; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.SystemKeyspace; @@@ -55,12 -52,8 +53,14 @@@ import org.apache.cassandra.io.util.Fil import org.apache.cassandra.utils.NativeLibrary; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JavaUtils; ++import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.SigarLibrary; ++import static java.lang.String.format; +import static 
org.apache.cassandra.config.CassandraRelevantProperties.COM_SUN_MANAGEMENT_JMXREMOTE_PORT; +import static org.apache.cassandra.config.CassandraRelevantProperties.JAVA_VERSION; +import static org.apache.cassandra.config.CassandraRelevantProperties.JAVA_VM_NAME; + /** * Verifies that the system and environment is in a fit state to be started. * Used in CassandraDaemon#setup() to check various settings and invariants. @@@ -83,7 -76,7 +83,6 @@@ public class StartupChecks { private static final Logger logger = LoggerFactory.getLogger(StartupChecks.class); -- // List of checks to run before starting up. If any test reports failure, startup will be halted. private final List<StartupCheck> preFlightChecks = new ArrayList<>(); diff --cc src/java/org/apache/cassandra/service/StorageService.java index 8c5437c,65596aa..e0ac1b6 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -81,8 -71,11 +81,10 @@@ import org.apache.cassandra.dht.Range import org.apache.cassandra.dht.Token.TokenFactory; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.gms.*; -import org.apache.cassandra.hints.HintVerbHandler; import org.apache.cassandra.hints.HintsService; import org.apache.cassandra.io.sstable.SSTableLoader; + import org.apache.cassandra.io.sstable.format.SSTableFormat; + import org.apache.cassandra.io.sstable.format.VersionAndType; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.locator.*; import org.apache.cassandra.metrics.StorageMetrics; @@@ -323,6 -293,44 +327,8 @@@ public class StorageService extends Not jmxObjectName = "org.apache.cassandra.db:type=StorageService"; MBeanWrapper.instance.registerMBean(this, jmxObjectName); MBeanWrapper.instance.registerMBean(StreamManager.instance, StreamManager.OBJECT_NAME); + - legacyProgressSupport = new LegacyJMXProgressSupport(this, jmxObjectName); - - /* register the verb handlers */ - 
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MUTATION, new MutationVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR, new ReadRepairVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadCommandVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new RangeSliceVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAGED_RANGE, new RangeSliceVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.COUNTER_MUTATION, new CounterMutationVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.TRUNCATE, new TruncateVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_PREPARE, new PrepareVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_PROPOSE, new ProposeVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_COMMIT, new CommitVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.HINT, new HintVerbHandler()); - - // see BootStrapper for a summary of how the bootstrap verbs interact - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.REPLICATION_FINISHED, new ReplicationFinishedVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.REQUEST_RESPONSE, new ResponseVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.INTERNAL_RESPONSE, new ResponseVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.REPAIR_MESSAGE, new RepairMessageVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_SHUTDOWN, new GossipShutdownVerbHandler()); - - 
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_SYN, new GossipDigestSynVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2VerbHandler()); - - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.DEFINITIONS_UPDATE, new DefinitionsUpdateVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SCHEMA_CHECK, new SchemaCheckVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MIGRATION_REQUEST, new MigrationRequestVerbHandler()); - - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SNAPSHOT, new SnapshotVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.ECHO, new EchoVerbHandler()); - - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCH_STORE, new BatchStoreVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCH_REMOVE, new BatchRemoveVerbHandler()); - + sstablesTracker = new SSTablesGlobalTracker(SSTableFormat.Type.current()); } public void registerDaemon(CassandraDaemon daemon) @@@ -940,13 -899,18 +946,14 @@@ // for bootstrap to get the load info it needs. // (we won't be part of the storage ring though until we add a counterId to our state, below.) // Seed the host ID-to-endpoint map with our own ID. 
- getTokenMetadata().updateHostId(localHostId, FBUtilities.getBroadcastAddress()); + getTokenMetadata().updateHostId(localHostId, FBUtilities.getBroadcastAddressAndPort()); appStates.put(ApplicationState.NET_VERSION, valueFactory.networkVersion()); appStates.put(ApplicationState.HOST_ID, valueFactory.hostId(localHostId)); - appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(FBUtilities.getBroadcastRpcAddress())); + appStates.put(ApplicationState.NATIVE_ADDRESS_AND_PORT, valueFactory.nativeaddressAndPort(FBUtilities.getBroadcastNativeAddressAndPort())); + appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(FBUtilities.getJustBroadcastNativeAddress())); appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion()); + appStates.put(ApplicationState.SSTABLE_VERSIONS, valueFactory.sstableVersions(sstablesTracker.versionsInUse())); - // load the persisted ring state. This used to be done earlier in the init process, - // but now we always perform a shadow round when preparing to join and we have to - // clear endpoint states after doing that. - loadRingState(); - logger.info("Starting up server gossip"); Gossiper.instance.register(this); Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(), appStates); // needed for node-ring gathering. diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage3to4UpgradeTest.java index e94c2c4,0000000..8d051d3 mode 100644,000000..100644 --- a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage3to4UpgradeTest.java +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage3to4UpgradeTest.java @@@ -1,57 -1,0 +1,61 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.upgrade; + +import org.junit.Test; + +import org.apache.cassandra.distributed.shared.Versions; + ++import static org.apache.cassandra.distributed.api.Feature.GOSSIP; ++import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL; ++import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; + +public class CompactStorage3to4UpgradeTest extends UpgradeTestBase +{ + public static final String TABLE_NAME = "cs_tbl"; + + @Test + public void testNullClusteringValues() throws Throwable + { + new TestCase().nodes(1) + .upgrade(Versions.Major.v30, Versions.Major.v4) ++ .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL)) + .setup(cluster -> { + String create = "CREATE TABLE %s.%s(k int, c1 int, c2 int, v int, PRIMARY KEY (k, c1, c2)) " + + "WITH compaction = { 'class':'LeveledCompactionStrategy', 'enabled':'false'} AND COMPACT STORAGE"; + cluster.schemaChange(String.format(create, KEYSPACE, TABLE_NAME)); + + String insert = "INSERT INTO %s.%s(k, c1, v) values (?, ?, ?)"; + cluster.get(1).executeInternal(String.format(insert, KEYSPACE, TABLE_NAME), 1, 1, 1); + cluster.get(1).flush(KEYSPACE); + + cluster.get(1).executeInternal(String.format(insert, KEYSPACE, TABLE_NAME), 2, 2, 2); + cluster.get(1).flush(KEYSPACE); + + 
cluster.schemaChange(String.format("ALTER TABLE %s.%s DROP COMPACT STORAGE", KEYSPACE, TABLE_NAME)); + }) + .runAfterNodeUpgrade((cluster, node) -> { + cluster.get(1).forceCompact(KEYSPACE, TABLE_NAME); + Object[][] actual = cluster.get(1).executeInternal(String.format("SELECT * FROM %s.%s", KEYSPACE, TABLE_NAME)); + assertRows(actual, new Object[] {1, 1, null, 1}, new Object[] {2, 2, null, 2}); + }) + .run(); + } +} diff --cc test/unit/org/apache/cassandra/cql3/validation/operations/CompactTableTest.java index 46a7b1d,0000000..bac94a1 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/CompactTableTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/CompactTableTest.java @@@ -1,116 -1,0 +1,115 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.cql3.validation.operations; + +import java.util.concurrent.ConcurrentMap; - import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Test; + +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.cql3.QueryHandler; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.SchemaChangeListener; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class CompactTableTest extends CQLTester +{ + @Test + public void dropCompactStorageTest() throws Throwable + { + createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk)) WITH COMPACT STORAGE;"); + execute("INSERT INTO %s (pk, ck) VALUES (1, 1)"); + alterTable("ALTER TABLE %s DROP COMPACT STORAGE"); + assertRows(execute( "SELECT * FROM %s"), + row(1, null, 1, null)); + + createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck)) WITH COMPACT STORAGE;"); + execute("INSERT INTO %s (pk, ck) VALUES (1, 1)"); + alterTable("ALTER TABLE %s DROP COMPACT STORAGE"); + assertRows(execute( "SELECT * FROM %s"), + row(1, 1, null)); + + createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH COMPACT STORAGE;"); + execute("INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)"); + alterTable("ALTER TABLE %s DROP COMPACT STORAGE"); + assertRows(execute( "SELECT * FROM %s"), + row(1, 1, 1)); + } + + @Test + public void dropCompactStorageShouldInvalidatePreparedStatements() throws Throwable + { + createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH COMPACT STORAGE;"); + execute("INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)"); + String templateSelect = "SELECT * FROM %s WHERE pk = 1"; + assertRows(execute(templateSelect), row(1, 1, 1)); + + // Verify that the prepared statement has been added to the cache after our first query. 
+ String formattedQuery = formatQuery(templateSelect); + ConcurrentMap<String, QueryHandler.Prepared> original = QueryProcessor.getInternalStatements(); + assertTrue(original.containsKey(formattedQuery)); + + // Verify that schema change listeners are told statements are affected with DROP COMPACT STORAGE. + SchemaChangeListener listener = new SchemaChangeListener() + { + public void onAlterTable(String keyspace, String table, boolean affectsStatements) + { + assertTrue(affectsStatements); + } + }; + + Schema.instance.registerListener(listener); + + try + { + alterTable("ALTER TABLE %s DROP COMPACT STORAGE"); + ConcurrentMap<String, QueryHandler.Prepared> postDrop = QueryProcessor.getInternalStatements(); + + // Verify that the prepared statement has been removed the cache after DROP COMPACT STORAGE. + assertFalse(postDrop.containsKey(formattedQuery)); + + // Verify that the prepared statement has been added back to the cache after our second query. + assertRows(execute(templateSelect), row(1, 1, 1)); + ConcurrentMap<String, QueryHandler.Prepared> postQuery = QueryProcessor.getInternalStatements(); + assertTrue(postQuery.containsKey(formattedQuery)); + } + finally + { + // Clean up the listener so this doesn't fail other tests. + Schema.instance.unregisterListener(listener); + } + } + + @Test + public void compactStorageSemanticsTest() throws Throwable + { + createTable("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck)) WITH COMPACT STORAGE"); + execute("INSERT INTO %s (pk, ck) VALUES (?, ?)", 1, 1); + execute("DELETE FROM %s WHERE pk = ? 
AND ck = ?", 1, 1); + assertEmpty(execute("SELECT * FROM %s WHERE pk = ?", 1)); + + createTable("CREATE TABLE %s (pk int, ck1 int, ck2 int, v int, PRIMARY KEY (pk, ck1, ck2)) WITH COMPACT STORAGE"); + execute("INSERT INTO %s (pk, ck1, v) VALUES (?, ?, ?)", 2, 2, 2); + assertRows(execute("SELECT * FROM %s WHERE pk = ?",2), + row(2, 2, null, 2)); + } +} diff --cc test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java index 910445f,7705d8b..4390b20 --- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java +++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java @@@ -148,17 -145,22 +148,22 @@@ public class TrackerTes @Test public void testAddInitialSSTables() { - ColumnFamilyStore cfs = MockSchema.newCFS(); + ColumnFamilyStore cfs = MockSchema.newCFS(metadata -> metadata.caching(CachingParams.CACHE_KEYS)); Tracker tracker = cfs.getTracker(); + MockListener listener = new MockListener(false); + tracker.subscribe(listener); List<SSTableReader> readers = ImmutableList.of(MockSchema.sstable(0, 17, cfs), MockSchema.sstable(1, 121, cfs), MockSchema.sstable(2, 9, cfs)); tracker.addInitialSSTables(copyOf(readers)); Assert.assertEquals(3, tracker.view.get().sstables.size()); + Assert.assertEquals(1, listener.senders.size()); + Assert.assertEquals(1, listener.received.size()); + Assert.assertTrue(listener.received.get(0) instanceof InitialSSTableAddedNotification); for (SSTableReader reader : readers) - Assert.assertTrue(reader.isKeyCacheSetup()); + Assert.assertTrue(reader.isKeyCacheEnabled()); Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount()); } @@@ -369,9 -367,8 +376,9 @@@ MockListener failListener = new MockListener(true); tracker.subscribe(failListener); tracker.subscribe(listener); - Assert.assertNotNull(tracker.notifyAdded(singleton(r1), null, null)); - Assert.assertNotNull(tracker.notifyAdded(singleton(r1), false, null)); ++ Assert.assertNotNull(tracker.notifyAdded(singleton(r1), false, null, null)); 
Assert.assertEquals(singleton(r1), ((SSTableAddedNotification) listener.received.get(0)).added); + Assert.assertFalse(((SSTableAddedNotification) listener.received.get(0)).memtable().isPresent()); listener.received.clear(); Assert.assertNotNull(tracker.notifySSTablesChanged(singleton(r1), singleton(r2), OperationType.COMPACTION, null)); Assert.assertEquals(singleton(r1), ((SSTableListChangedNotification) listener.received.get(0)).removed); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
