http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/Indexes.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/Indexes.java b/src/java/org/apache/cassandra/schema/Indexes.java index eb49d39..81d400e 100644 --- a/src/java/org/apache/cassandra/schema/Indexes.java +++ b/src/java/org/apache/cassandra/schema/Indexes.java @@ -15,14 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.cassandra.schema; import java.util.*; +import java.util.stream.Stream; import com.google.common.collect.ImmutableMap; -import org.apache.cassandra.config.Schema; +import org.apache.cassandra.exceptions.ConfigurationException; + +import static java.lang.String.format; import static com.google.common.collect.Iterables.filter; @@ -35,7 +37,7 @@ import static com.google.common.collect.Iterables.filter; * support is added for multiple target columns per-index and for indexes with * TargetType.ROW */ -public class Indexes implements Iterable<IndexMetadata> +public final class Indexes implements Iterable<IndexMetadata> { private final ImmutableMap<String, IndexMetadata> indexesByName; private final ImmutableMap<UUID, IndexMetadata> indexesById; @@ -56,11 +58,26 @@ public class Indexes implements Iterable<IndexMetadata> return builder().build(); } + public static Indexes of(IndexMetadata... 
indexes) + { + return builder().add(indexes).build(); + } + + public static Indexes of(Iterable<IndexMetadata> indexes) + { + return builder().add(indexes).build(); + } + public Iterator<IndexMetadata> iterator() { return indexesByName.values().iterator(); } + public Stream<IndexMetadata> stream() + { + return indexesById.values().stream(); + } + public int size() { return indexesByName.size(); @@ -121,7 +138,7 @@ public class Indexes implements Iterable<IndexMetadata> public Indexes with(IndexMetadata index) { if (get(index.name).isPresent()) - throw new IllegalStateException(String.format("Index %s already exists", index.name)); + throw new IllegalStateException(format("Index %s already exists", index.name)); return builder().add(this).add(index).build(); } @@ -131,7 +148,7 @@ public class Indexes implements Iterable<IndexMetadata> */ public Indexes without(String name) { - IndexMetadata index = get(name).orElseThrow(() -> new IllegalStateException(String.format("Index %s doesn't exist", name))); + IndexMetadata index = get(name).orElseThrow(() -> new IllegalStateException(format("Index %s doesn't exist", name))); return builder().add(filter(this, v -> v != index)).build(); } @@ -149,6 +166,25 @@ public class Indexes implements Iterable<IndexMetadata> return this == o || (o instanceof Indexes && indexesByName.equals(((Indexes) o).indexesByName)); } + public void validate(TableMetadata table) + { + /* + * Index name check is duplicated in Keyspaces, for the time being. + * The reason for this is that schema altering statements are not calling + * Keyspaces.validate() as of yet. 
TODO: remove this once they do (on CASSANDRA-9425 completion) + */ + Set<String> indexNames = new HashSet<>(); + for (IndexMetadata index : indexesByName.values()) + { + if (indexNames.contains(index.name)) + throw new ConfigurationException(format("Duplicate index name %s for table %s", index.name, table)); + + indexNames.add(index.name); + } + + indexesByName.values().forEach(i -> i.validate(table)); + } + @Override public int hashCode() { @@ -164,7 +200,7 @@ public class Indexes implements Iterable<IndexMetadata> public static String getAvailableIndexName(String ksName, String cfName, String indexNameRoot) { - KeyspaceMetadata ksm = Schema.instance.getKSMetaData(ksName); + KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(ksName); Set<String> existingNames = ksm == null ? new HashSet<>() : ksm.existingIndexNames(null); String baseName = IndexMetadata.getDefaultIndexName(cfName, indexNameRoot); String acceptedName = baseName; @@ -196,6 +232,13 @@ public class Indexes implements Iterable<IndexMetadata> return this; } + public Builder add(IndexMetadata... indexes) + { + for (IndexMetadata index : indexes) + add(index); + return this; + } + public Builder add(Iterable<IndexMetadata> indexes) { indexes.forEach(this::add);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java index 4fefd44..80a3869 100644 --- a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java +++ b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java @@ -27,11 +27,10 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.collect.Iterables; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.SchemaConstants; -import org.apache.cassandra.config.ViewDefinition; import org.apache.cassandra.exceptions.ConfigurationException; +import static java.lang.String.format; + /** * An immutable representation of keyspace metadata (name, params, tables, types, and functions). */ @@ -94,15 +93,15 @@ public final class KeyspaceMetadata return new KeyspaceMetadata(name, params, tables, views, types, functions); } - public Iterable<CFMetaData> tablesAndViews() + public Iterable<TableMetadata> tablesAndViews() { return Iterables.concat(tables, views.metadatas()); } @Nullable - public CFMetaData getTableOrViewNullable(String tableOrViewName) + public TableMetadata getTableOrViewNullable(String tableOrViewName) { - ViewDefinition view = views.getNullable(tableOrViewName); + ViewMetadata view = views.getNullable(tableOrViewName); return view == null ? 
tables.getNullable(tableOrViewName) : view.metadata; @@ -111,18 +110,18 @@ public final class KeyspaceMetadata public Set<String> existingIndexNames(String cfToExclude) { Set<String> indexNames = new HashSet<>(); - for (CFMetaData table : tables) - if (cfToExclude == null || !table.cfName.equals(cfToExclude)) - for (IndexMetadata index : table.getIndexes()) + for (TableMetadata table : tables) + if (cfToExclude == null || !table.name.equals(cfToExclude)) + for (IndexMetadata index : table.indexes) indexNames.add(index.name); return indexNames; } - public Optional<CFMetaData> findIndexedTable(String indexName) + public Optional<TableMetadata> findIndexedTable(String indexName) { - for (CFMetaData cfm : tablesAndViews()) - if (cfm.getIndexes().has(indexName)) - return Optional.of(cfm); + for (TableMetadata table : tablesAndViews()) + if (table.indexes.has(indexName)) + return Optional.of(table); return Optional.empty(); } @@ -167,12 +166,28 @@ public final class KeyspaceMetadata public void validate() { - if (!CFMetaData.isNameValid(name)) - throw new ConfigurationException(String.format("Keyspace name must not be empty, more than %s characters long, " - + "or contain non-alphanumeric-underscore characters (got \"%s\")", - SchemaConstants.NAME_LENGTH, - name)); + if (!SchemaConstants.isValidName(name)) + { + throw new ConfigurationException(format("Keyspace name must not be empty, more than %s characters long, " + + "or contain non-alphanumeric-underscore characters (got \"%s\")", + SchemaConstants.NAME_LENGTH, + name)); + } + params.validate(name); - tablesAndViews().forEach(CFMetaData::validate); + + tablesAndViews().forEach(TableMetadata::validate); + + Set<String> indexNames = new HashSet<>(); + for (TableMetadata table : tables) + { + for (IndexMetadata index : table.indexes) + { + if (indexNames.contains(index.name)) + throw new ConfigurationException(format("Duplicate index name %s in keyspace %s", index.name, name)); + + indexNames.add(index.name); + } + } } 
} http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/Keyspaces.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/Keyspaces.java b/src/java/org/apache/cassandra/schema/Keyspaces.java index 8c0a63e..1692f88 100644 --- a/src/java/org/apache/cassandra/schema/Keyspaces.java +++ b/src/java/org/apache/cassandra/schema/Keyspaces.java @@ -18,9 +18,12 @@ package org.apache.cassandra.schema; import java.util.Iterator; +import java.util.Set; import java.util.function.Predicate; import java.util.stream.Stream; +import javax.annotation.Nullable; + import com.google.common.collect.ImmutableMap; import com.google.common.collect.MapDifference; import com.google.common.collect.Maps; @@ -28,10 +31,12 @@ import com.google.common.collect.Maps; public final class Keyspaces implements Iterable<KeyspaceMetadata> { private final ImmutableMap<String, KeyspaceMetadata> keyspaces; + private final ImmutableMap<TableId, TableMetadata> tables; private Keyspaces(Builder builder) { keyspaces = builder.keyspaces.build(); + tables = builder.tables.build(); } public static Builder builder() @@ -59,6 +64,23 @@ public final class Keyspaces implements Iterable<KeyspaceMetadata> return keyspaces.values().stream(); } + public Set<String> names() + { + return keyspaces.keySet(); + } + + @Nullable + public KeyspaceMetadata getNullable(String name) + { + return keyspaces.get(name); + } + + @Nullable + public TableMetadata getTableOrViewNullable(TableId id) + { + return tables.get(id); + } + public Keyspaces filter(Predicate<KeyspaceMetadata> predicate) { Builder builder = builder(); @@ -66,6 +88,25 @@ public final class Keyspaces implements Iterable<KeyspaceMetadata> return builder.build(); } + /** + * Creates a Keyspaces instance with the keyspace with the provided name removed + */ + public Keyspaces without(String name) + { + KeyspaceMetadata keyspace = getNullable(name); + if 
(keyspace == null) + throw new IllegalStateException(String.format("Keyspace %s doesn't exist", name)); + + return builder().add(filter(k -> k != keyspace)).build(); + } + /** + * Creates a Keyspaces instance in which any keyspace with the same name as the provided one is replaced by it; + * the provided keyspace is simply added if no keyspace of that name is present + */ + public Keyspaces withAddedOrUpdated(KeyspaceMetadata keyspace) + { + return builder().add(filter(k -> !k.name.equals(keyspace.name))) + .add(keyspace) + .build(); + } + MapDifference<String, KeyspaceMetadata> diff(Keyspaces other) { return Maps.difference(keyspaces, other.keyspaces); @@ -92,6 +133,7 @@ public final class Keyspaces implements Iterable<KeyspaceMetadata> public static final class Builder { private final ImmutableMap.Builder<String, KeyspaceMetadata> keyspaces = new ImmutableMap.Builder<>(); + private final ImmutableMap.Builder<TableId, TableMetadata> tables = new ImmutableMap.Builder<>(); private Builder() { @@ -105,6 +147,10 @@ public final class Keyspaces implements Iterable<KeyspaceMetadata> public Builder add(KeyspaceMetadata keyspace) { keyspaces.put(keyspace.name, keyspace); + + keyspace.tables.forEach(t -> tables.put(t.id, t)); + keyspace.views.forEach(v -> tables.put(v.metadata.id, v.metadata)); + return this; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java b/src/java/org/apache/cassandra/schema/MigrationManager.java new file mode 100644 index 0000000..d34a096 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/MigrationManager.java @@ -0,0 +1,493 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.schema; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.*; +import java.util.concurrent.*; +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.cql3.functions.UDAggregate; +import org.apache.cassandra.cql3.functions.UDFunction; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.marshal.UserType; +import org.apache.cassandra.exceptions.AlreadyExistsException; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.gms.*; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.WrappedRunnable; + +public class MigrationManager +{ + private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class); + + public static final MigrationManager instance = new MigrationManager(); + + private static final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); + + private static final int 
MIGRATION_DELAY_IN_MS = 60000; + + private static final int MIGRATION_TASK_WAIT_IN_SECONDS = Integer.parseInt(System.getProperty("cassandra.migration_task_wait_in_seconds", "1")); + + private MigrationManager() {} + /** + * Considers pulling the schema from the given endpoint. Nothing happens when the endpoint is this node's + * own broadcast address or when its state carries no SCHEMA application state. + * + * @param endpoint the node whose schema version is being examined + * @param state the endpoint state from which the SCHEMA value is read + */ + public static void scheduleSchemaPull(InetAddress endpoint, EndpointState state) + { + VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA); + + if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null) + maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint); + } + + /** + * If versions differ, this node submits a migration task for the endpoint and expects to receive + * a list of migrations to apply locally. + * + * Nothing is submitted when the local version equals theirVersion or when shouldPullSchemaFrom() rejects + * the endpoint. The task is submitted immediately when the local version is the empty version or the JVM + * has been up for less than MIGRATION_DELAY_IN_MS; otherwise it is scheduled after MIGRATION_DELAY_IN_MS. + */ + private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint) + { + if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint)) + { + logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false"); + return; + } + + if (SchemaConstants.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS) + { + // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately + logger.debug("Submitting migration task for {}", endpoint); + submitMigrationTask(endpoint); + } + else + { + // Include a delay to make sure we have a chance to apply any changes being + // pushed out simultaneously. 
See CASSANDRA-5025 + Runnable runnable = () -> + { + // grab the latest version of the schema since it may have changed again since the initial scheduling + EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + if (epState == null) + { + logger.debug("epState vanished for {}, not submitting migration task", endpoint); + return; + } + VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA); + if (value == null) + { + // same guard as scheduleSchemaPull(): the endpoint may not be advertising a schema version + logger.debug("schema version vanished for {}, not submitting migration task", endpoint); + return; + } + UUID currentVersion = UUID.fromString(value.value); + // null-safe comparison: the check at the top of this method treats Schema.instance.getVersion() as nullable + if (currentVersion.equals(Schema.instance.getVersion())) + { + logger.debug("not submitting migration task for {} because our versions match", endpoint); + return; + } + logger.debug("submitting migration task for {}", endpoint); + submitMigrationTask(endpoint); + }; + ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS); + } + } + /** + * Submits a MigrationTask for the endpoint on the MIGRATION stage. + * + * @return the future of the submitted task + */ + private static Future<?> submitMigrationTask(InetAddress endpoint) + { + /* + * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are + * running in the gossip stage. 
+ */ + return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint)); + } + + static boolean shouldPullSchemaFrom(InetAddress endpoint) + { + /* + * Don't request schema from nodes with a different or unknown major version (may have incompatible schema) + * Don't request schema from fat clients (gossip-only members) + */ + return MessagingService.instance().knowsVersion(endpoint) + && MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version + && !Gossiper.instance.isGossipOnlyMember(endpoint); + } + /** + * @return true when no migration task latch is queued in MigrationTask's in-flight queue + */ + public static boolean isReadyForBootstrap() + { + return MigrationTask.getInflightTasks().isEmpty(); + } + /** + * Drains MigrationTask's in-flight queue, waiting up to MIGRATION_TASK_WAIT_IN_SECONDS on each latch. + * A timeout or an interruption is only logged; the loop then moves on to the next queued latch. + */ + public static void waitUntilReadyForBootstrap() + { + CountDownLatch completionLatch; + while ((completionLatch = MigrationTask.getInflightTasks().poll()) != null) + { + try + { + if (!completionLatch.await(MIGRATION_TASK_WAIT_IN_SECONDS, TimeUnit.SECONDS)) + logger.error("Migration task failed to complete"); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + logger.error("Migration task was interrupted"); + } + } + } + + public static void announceNewKeyspace(KeyspaceMetadata ksm) throws ConfigurationException + { + announceNewKeyspace(ksm, false); + } + + public static void announceNewKeyspace(KeyspaceMetadata ksm, boolean announceLocally) throws ConfigurationException + { + announceNewKeyspace(ksm, FBUtilities.timestampMicros(), announceLocally); + } + /** + * Validates the keyspace definition and announces its creation. + * + * @param timestamp the timestamp passed to the create-keyspace mutation + * @param announceLocally passed through to announce() + * @throws AlreadyExistsException if Schema.instance already holds a keyspace with this name + */ + public static void announceNewKeyspace(KeyspaceMetadata ksm, long timestamp, boolean announceLocally) throws ConfigurationException + { + ksm.validate(); + + if (Schema.instance.getKeyspaceMetadata(ksm.name) != null) + throw new AlreadyExistsException(ksm.name); + + logger.info("Create new Keyspace: {}", ksm); + announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm, timestamp), announceLocally); + } + + public static void announceNewTable(TableMetadata cfm) throws ConfigurationException + { + announceNewTable(cfm, false); + } + + public static void 
announceNewTable(TableMetadata cfm, boolean announceLocally) + { + announceNewTable(cfm, announceLocally, true); + } + + /** + * Announces the table even if the definition is already known locally. + * This should generally be avoided but is used internally when we want to force the most up to date version of + * a system table schema (Note that we don't know if the schema we force _is_ the most recent version or not, we + * just rely on idempotency to basically ignore that announce if it's not. That's why we can't use announceUpdateColumnFamily, + * it would for instance delete new columns if this is not called with the most up-to-date version) + * + * Note that this is only safe for system tables where we know the id is fixed and will be the same whatever version + * of the definition is used. + */ + public static void forceAnnounceNewTable(TableMetadata cfm) + { + announceNewTable(cfm, false, false); + } + /** + * Validates the table definition and announces its creation. + * + * @param throwOnDuplicate when true, an AlreadyExistsException is thrown if the keyspace already holds a table + * or view with the same name; when false the table is announced regardless + * @throws ConfigurationException if the table's keyspace does not exist + */ + private static void announceNewTable(TableMetadata cfm, boolean announceLocally, boolean throwOnDuplicate) + { + cfm.validate(); + + KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(cfm.keyspace); + if (ksm == null) + throw new ConfigurationException(String.format("Cannot add table '%s' to non existing keyspace '%s'.", cfm.name, cfm.keyspace)); + // If we have a table or a view which has the same name, we can't add a new one + else if (throwOnDuplicate && ksm.getTableOrViewNullable(cfm.name) != null) + throw new AlreadyExistsException(cfm.keyspace, cfm.name); + + logger.info("Create new table: {}", cfm); + announce(SchemaKeyspace.makeCreateTableMutation(ksm, cfm, FBUtilities.timestampMicros()), announceLocally); + } + + public static void announceNewView(ViewMetadata view, boolean announceLocally) throws ConfigurationException + { + view.metadata.validate(); + + KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(view.keyspace); + if (ksm == null) + throw new ConfigurationException(String.format("Cannot add table '%s' to non existing keyspace 
'%s'.", view.name, view.keyspace)); + else if (ksm.getTableOrViewNullable(view.name) != null) + throw new AlreadyExistsException(view.keyspace, view.name); + + logger.info("Create new view: {}", view); + announce(SchemaKeyspace.makeCreateViewMutation(ksm, view, FBUtilities.timestampMicros()), announceLocally); + } + /** + * Announces the creation of a user type in its keyspace. + * + * NOTE(review): the keyspace metadata is looked up but not null-checked here (unlike announceNewTable); + * presumably callers have already verified that the keyspace exists - confirm. + */ + public static void announceNewType(UserType newType, boolean announceLocally) + { + KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(newType.keyspace); + announce(SchemaKeyspace.makeCreateTypeMutation(ksm, newType, FBUtilities.timestampMicros()), announceLocally); + } + + public static void announceNewFunction(UDFunction udf, boolean announceLocally) + { + logger.info("Create scalar function '{}'", udf.name()); + KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(udf.name().keyspace); + announce(SchemaKeyspace.makeCreateFunctionMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); + } + + public static void announceNewAggregate(UDAggregate udf, boolean announceLocally) + { + logger.info("Create aggregate function '{}'", udf.name()); + KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(udf.name().keyspace); + announce(SchemaKeyspace.makeCreateAggregateMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); + } + + public static void announceKeyspaceUpdate(KeyspaceMetadata ksm) throws ConfigurationException + { + announceKeyspaceUpdate(ksm, false); + } + /** + * Validates the keyspace definition and announces an update of an existing keyspace. + * Only the keyspace name and params are handed to the mutation builder. + * + * @throws ConfigurationException if no keyspace with this name currently exists + */ + public static void announceKeyspaceUpdate(KeyspaceMetadata ksm, boolean announceLocally) throws ConfigurationException + { + ksm.validate(); + + KeyspaceMetadata oldKsm = Schema.instance.getKeyspaceMetadata(ksm.name); + if (oldKsm == null) + throw new ConfigurationException(String.format("Cannot update non existing keyspace '%s'.", ksm.name)); + + logger.info("Update Keyspace '{}' From {} To {}", ksm.name, oldKsm, ksm); + announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm.name, ksm.params, FBUtilities.timestampMicros()), announceLocally); + } + 
+ public static void announceTableUpdate(TableMetadata tm) throws ConfigurationException + { + announceTableUpdate(tm, false); + } + /** + * Validates the updated definition, checks it against the current one via validateCompatibility() + * and announces the change. + * + * @throws ConfigurationException if the table does not currently exist + */ + public static void announceTableUpdate(TableMetadata updated, boolean announceLocally) throws ConfigurationException + { + updated.validate(); + + TableMetadata current = Schema.instance.getTableMetadata(updated.keyspace, updated.name); + if (current == null) + throw new ConfigurationException(String.format("Cannot update non existing table '%s' in keyspace '%s'.", updated.name, updated.keyspace)); + KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(current.keyspace); + + current.validateCompatibility(updated); + + logger.info("Update table '{}/{}' From {} To {}", current.keyspace, current.name, current, updated); + announce(SchemaKeyspace.makeUpdateTableMutation(ksm, current, updated, FBUtilities.timestampMicros()), announceLocally); + } + /** + * Validates the updated view's table metadata, checks it against the current view via validateCompatibility() + * and announces the change. + * + * @throws ConfigurationException if the view does not currently exist + */ + public static void announceViewUpdate(ViewMetadata view, boolean announceLocally) throws ConfigurationException + { + view.metadata.validate(); + + ViewMetadata oldView = Schema.instance.getView(view.keyspace, view.name); + if (oldView == null) + throw new ConfigurationException(String.format("Cannot update non existing materialized view '%s' in keyspace '%s'.", view.name, view.keyspace)); + KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(view.keyspace); + + oldView.metadata.validateCompatibility(view.metadata); + + logger.info("Update view '{}/{}' From {} To {}", view.keyspace, view.name, oldView, view); + announce(SchemaKeyspace.makeUpdateViewMutation(ksm, oldView, view, FBUtilities.timestampMicros()), announceLocally); + } + /** + * Logs the update and delegates to announceNewType(), so a type update is announced with the same + * mutation as a type creation. + */ + public static void announceTypeUpdate(UserType updatedType, boolean announceLocally) + { + logger.info("Update type '{}.{}' to {}", updatedType.keyspace, updatedType.getNameAsString(), updatedType); + announceNewType(updatedType, announceLocally); + } + + public static void announceKeyspaceDrop(String ksName) throws ConfigurationException + { + 
announceKeyspaceDrop(ksName, false); + } + /** + * Announces the drop of an existing keyspace. + * + * @throws ConfigurationException if no keyspace with this name currently exists + */ + public static void announceKeyspaceDrop(String ksName, boolean announceLocally) throws ConfigurationException + { + KeyspaceMetadata oldKsm = Schema.instance.getKeyspaceMetadata(ksName); + if (oldKsm == null) + throw new ConfigurationException(String.format("Cannot drop non existing keyspace '%s'.", ksName)); + + logger.info("Drop Keyspace '{}'", oldKsm.name); + announce(SchemaKeyspace.makeDropKeyspaceMutation(oldKsm, FBUtilities.timestampMicros()), announceLocally); + } + + public static void announceTableDrop(String ksName, String cfName) throws ConfigurationException + { + announceTableDrop(ksName, cfName, false); + } + /** + * Announces the drop of an existing table. + * + * @throws ConfigurationException if the table does not currently exist + */ + public static void announceTableDrop(String ksName, String cfName, boolean announceLocally) throws ConfigurationException + { + TableMetadata tm = Schema.instance.getTableMetadata(ksName, cfName); + if (tm == null) + throw new ConfigurationException(String.format("Cannot drop non existing table '%s' in keyspace '%s'.", cfName, ksName)); + KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(ksName); + + logger.info("Drop table '{}/{}'", tm.keyspace, tm.name); + announce(SchemaKeyspace.makeDropTableMutation(ksm, tm, FBUtilities.timestampMicros()), announceLocally); + } + /** + * Announces the drop of an existing materialized view. + * + * @throws ConfigurationException if the view does not currently exist + */ + public static void announceViewDrop(String ksName, String viewName, boolean announceLocally) throws ConfigurationException + { + ViewMetadata view = Schema.instance.getView(ksName, viewName); + if (view == null) + throw new ConfigurationException(String.format("Cannot drop non existing materialized view '%s' in keyspace '%s'.", viewName, ksName)); + KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(ksName); + + logger.info("Drop view '{}/{}'", view.keyspace, view.name); + announce(SchemaKeyspace.makeDropViewMutation(ksm, view, FBUtilities.timestampMicros()), announceLocally); + } + + public static void announceTypeDrop(UserType droppedType, boolean announceLocally) + { + KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(droppedType.keyspace); + announce(SchemaKeyspace.dropTypeFromSchemaMutation(ksm, droppedType, FBUtilities.timestampMicros()), announceLocally); + } + + public static void announceFunctionDrop(UDFunction udf, boolean announceLocally) + { + logger.info("Drop scalar function overload '{}' args '{}'", udf.name(), udf.argTypes()); + KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(udf.name().keyspace); + announce(SchemaKeyspace.makeDropFunctionMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); + } + + public static void announceAggregateDrop(UDAggregate udf, boolean announceLocally) + { + logger.info("Drop aggregate function overload '{}' args '{}'", udf.name(), udf.argTypes()); + KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(udf.name().keyspace); + announce(SchemaKeyspace.makeDropAggregateMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); + } + + /** + * Builds the schema mutation and applies it: either merged into the local schema only, or actively + * announced as a new version to live hosts, waiting on the future of the local application. + * + * @param schema The schema mutation to be applied + * @param announceLocally if true the mutation is only merged via Schema.instance.merge() and not pushed to other hosts + */ + private static void announce(Mutation.SimpleBuilder schema, boolean announceLocally) + { + List<Mutation> mutations = Collections.singletonList(schema.build()); + + if (announceLocally) + Schema.instance.merge(mutations); + else + FBUtilities.waitOnFuture(announce(mutations)); + } + /** + * Sends the schema mutations to the endpoint as a one-way DEFINITIONS_UPDATE message. + */ + private static void pushSchemaMutation(InetAddress endpoint, Collection<Mutation> schema) + { + MessageOut<Collection<Mutation>> msg = new MessageOut<>(MessagingService.Verb.DEFINITIONS_UPDATE, + schema, + MigrationsSerializer.instance); + MessagingService.instance().sendOneWay(msg, endpoint); + } + + // Returns a future on the local application of the schema + private static Future<?> announce(final Collection<Mutation> schema) + { + Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable() + { + protected void runMayThrow() throws ConfigurationException + { + Schema.instance.mergeAndAnnounceVersion(schema); 
+ } + }); + + for (InetAddress endpoint : Gossiper.instance.getLiveMembers()) + { + // only push schema to nodes with known and equal versions + if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && + MessagingService.instance().knowsVersion(endpoint) && + MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version) + pushSchemaMutation(endpoint, schema); + } + + return f; + } + + /** + * Announce my version passively over gossip. + * Used to notify nodes as they arrive in the cluster. + * + * @param version The schema version to announce + */ + static void passiveAnnounce(UUID version) + { + Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.schema(version)); + logger.debug("Gossiping my schema version {}", version); + } + + /** + * Clear all locally stored schema information and reset schema to initial state. + * Called by user (via JMX) who wants to get rid of schema disagreement. + */ + public static void resetLocalSchema() + { + logger.info("Starting local schema reset..."); + + logger.debug("Truncating schema tables..."); + + SchemaKeyspace.truncate(); + + logger.debug("Clearing local schema keyspace definitions..."); + + Schema.instance.clear(); + + Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers(); + liveEndpoints.remove(FBUtilities.getBroadcastAddress()); + + // force migration if there are nodes around + for (InetAddress node : liveEndpoints) + { + if (shouldPullSchemaFrom(node)) + { + logger.debug("Requesting schema from {}", node); + FBUtilities.waitOnFuture(submitMigrationTask(node)); + break; + } + } + + logger.info("Local schema reset is complete."); + } + /** + * Serializes a collection of schema mutations as an int count followed by each mutation in iteration order. + */ + public static class MigrationsSerializer implements IVersionedSerializer<Collection<Mutation>> + { + // shared singleton; final so it cannot be reassigned, matching MigrationManager.instance + public static final MigrationsSerializer instance = new MigrationsSerializer(); + + public void serialize(Collection<Mutation> schema, DataOutputPlus out, int version) throws IOException + { + 
out.writeInt(schema.size()); + for (Mutation mutation : schema) + Mutation.serializer.serialize(mutation, out, version); + } + + public Collection<Mutation> deserialize(DataInputPlus in, int version) throws IOException + { + int count = in.readInt(); + Collection<Mutation> schema = new ArrayList<>(count); + + for (int i = 0; i < count; i++) + schema.add(Mutation.serializer.deserialize(in, version)); + + return schema; + } + + public long serializedSize(Collection<Mutation> schema, int version) + { + // accumulate in a long: serializedSize is declared long, and += into an int would narrow silently + long size = TypeSizes.sizeof(schema.size()); + for (Mutation mutation : schema) + size += Mutation.serializer.serializedSize(mutation, version); + return size; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/MigrationTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/MigrationTask.java b/src/java/org/apache/cassandra/schema/MigrationTask.java new file mode 100644 index 0000000..a785e17 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/MigrationTask.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.schema; + +import java.net.InetAddress; +import java.util.Collection; +import java.util.EnumSet; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.SystemKeyspace.BootstrapState; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.net.IAsyncCallback; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.WrappedRunnable; + +final class MigrationTask extends WrappedRunnable +{ + private static final Logger logger = LoggerFactory.getLogger(MigrationTask.class); + + private static final ConcurrentLinkedQueue<CountDownLatch> inflightTasks = new ConcurrentLinkedQueue<>(); + + private static final Set<BootstrapState> monitoringBootstrapStates = EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS); + + private final InetAddress endpoint; + + MigrationTask(InetAddress endpoint) + { + this.endpoint = endpoint; + } + + static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks() + { + return inflightTasks; + } + + public void runMayThrow() throws Exception + { + // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(), + // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with + // a higher major. 
+ if (!MigrationManager.shouldPullSchemaFrom(endpoint)) + { + logger.info("Skipped sending a migration request: node {} has a higher major version now.", endpoint); + return; + } + + if (!FailureDetector.instance.isAlive(endpoint)) + { + logger.debug("Can't send schema pull request: node {} is down.", endpoint); + return; + } + + MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance); + + final CountDownLatch completionLatch = new CountDownLatch(1); + + IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>() + { + @Override + public void response(MessageIn<Collection<Mutation>> message) + { + try + { + Schema.instance.mergeAndAnnounceVersion(message.payload); + } + catch (ConfigurationException e) + { + logger.error("Configuration exception merging remote schema", e); + } + finally + { + completionLatch.countDown(); + } + } + + public boolean isLatencyForSnitch() + { + return false; + } + }; + + // Only save the latches if we need bootstrap or are bootstrapping + if (monitoringBootstrapStates.contains(SystemKeyspace.getBootstrapState())) + inflightTasks.offer(completionLatch); + + MessagingService.instance().sendRR(message, endpoint, cb); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/Schema.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java new file mode 100644 index 0000000..5786804 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/Schema.java @@ -0,0 +1,804 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.schema;

import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Sets;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.functions.*;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.KeyspaceNotDefinedException;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.UserType;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.UnknownTableException;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.locator.LocalStrategy;
import org.apache.cassandra.utils.Pair;
import org.cliffc.high_scale_lib.NonBlockingHashMap;

import static java.lang.String.format;

import static com.google.common.collect.Iterables.size;

/**
 * Process-wide registry of keyspace, table, view, index, type and function metadata, together with
 * the {@link Keyspace} instances opened for them and the current schema version.
 */
public final class Schema
{
    public static final Schema instance = new Schema();

    private volatile Keyspaces keyspaces = Keyspaces.none();

    // UUID -> mutable metadata ref map. We have to update these in place every time a table changes.
    private final Map<TableId, TableMetadataRef> metadataRefs = new NonBlockingHashMap<>();

    // (keyspace name, index name) -> mutable metadata ref map. We have to update these in place every time an index changes.
    private final Map<Pair<String, String>, TableMetadataRef> indexMetadataRefs = new NonBlockingHashMap<>();

    // Keyspace objects, one per keyspace. Only one instance should ever exist for any given keyspace.
    private final Map<String, Keyspace> keyspaceInstances = new NonBlockingHashMap<>();

    private volatile UUID version;

    private final List<SchemaChangeListener> changeListeners = new CopyOnWriteArrayList<>();

    /**
     * Initialize empty schema object and load the hardcoded system tables
     * (only when the daemon or tool configuration has been initialized).
     */
    private Schema()
    {
        if (DatabaseDescriptor.isDaemonInitialized() || DatabaseDescriptor.isToolInitialized())
        {
            load(SchemaKeyspace.metadata());
            load(SystemKeyspace.metadata());
        }
    }

    /**
     * Validates that the provided keyspace is not one of the system keyspaces.
     *
     * @param keyspace the keyspace name to validate.
     *
     * @throws InvalidRequestException if {@code keyspace} is the name of a
     * system keyspace.
     */
    public static void validateKeyspaceNotSystem(String keyspace)
    {
        if (SchemaConstants.isSystemKeyspace(keyspace))
            throw new InvalidRequestException(format("%s keyspace is not user-modifiable", keyspace));
    }

    /**
     * Load keyspace definitions from disk, but do not initialize the keyspace instances.
     * The schema version is updated as a result.
     */
    public void loadFromDisk()
    {
        loadFromDisk(true);
    }

    /**
     * Load schema definitions from disk.
     *
     * @param updateVersion true if schema version needs to be updated
     */
    public void loadFromDisk(boolean updateVersion)
    {
        load(SchemaKeyspace.fetchNonSystemKeyspaces());
        if (updateVersion)
            updateVersion();
    }

    /**
     * Load up non-system keyspaces
     *
     * @param keyspaceDefs The non-system keyspace definitions
     */
    private void load(Iterable<KeyspaceMetadata> keyspaceDefs)
    {
        keyspaceDefs.forEach(this::load);
    }

    /**
     * Update (or insert) new keyspace definition
     *
     * @param ksm The metadata about keyspace
     */
    public synchronized void load(KeyspaceMetadata ksm)
    {
        KeyspaceMetadata previous = keyspaces.getNullable(ksm.name);

        if (previous == null)
            loadNew(ksm);
        else
            reload(previous, ksm);

        keyspaces = keyspaces.withAddedOrUpdated(ksm);
    }

    /**
     * Registers metadata refs for every table, view and index table of a keyspace that was not
     * previously known.
     */
    private void loadNew(KeyspaceMetadata ksm)
    {
        ksm.tablesAndViews()
           .forEach(metadata -> metadataRefs.put(metadata.id, new TableMetadataRef(metadata)));

        ksm.tables
           .indexTables()
           .forEach((name, metadata) -> indexMetadataRefs.put(Pair.create(ksm.name, name), new TableMetadataRef(metadata)));
    }

    /**
     * Brings the metadata refs in line with an updated definition of an already known keyspace:
     * refs of removed tables/views/indexes are dropped, refs of new ones are added, and refs of
     * changed ones are updated in place.
     *
     * @param previous the currently loaded definition
     * @param updated  the definition replacing it
     */
    private void reload(KeyspaceMetadata previous, KeyspaceMetadata updated)
    {
        Keyspace keyspace = getKeyspaceInstance(updated.name);
        if (keyspace != null)
            keyspace.setMetadata(updated);

        MapDifference<TableId, TableMetadata> tablesDiff = previous.tables.diff(updated.tables);
        MapDifference<TableId, ViewMetadata> viewsDiff = previous.views.diff(updated.views);
        MapDifference<String, TableMetadata> indexesDiff = previous.tables.indexesDiff(updated.tables);

        // clean up after removed entries

        tablesDiff.entriesOnlyOnLeft()
                  .values()
                  .forEach(table -> metadataRefs.remove(table.id));

        viewsDiff.entriesOnlyOnLeft()
                 .values()
                 .forEach(view -> metadataRefs.remove(view.metadata.id));

        indexesDiff.entriesOnlyOnLeft()
                   .values()
                   .forEach(indexTable -> indexMetadataRefs.remove(Pair.create(indexTable.keyspace, indexTable.indexName().get())));

        // load up new entries

        tablesDiff.entriesOnlyOnRight()
                  .values()
                  .forEach(table -> metadataRefs.put(table.id, new TableMetadataRef(table)));

        viewsDiff.entriesOnlyOnRight()
                 .values()
                 .forEach(view -> metadataRefs.put(view.metadata.id, new TableMetadataRef(view.metadata)));

        indexesDiff.entriesOnlyOnRight()
                   .values()
                   .forEach(indexTable -> indexMetadataRefs.put(Pair.create(indexTable.keyspace, indexTable.indexName().get()), new TableMetadataRef(indexTable)));

        // refresh refs to updated ones

        tablesDiff.entriesDiffering()
                  .values()
                  .forEach(diff -> metadataRefs.get(diff.rightValue().id).set(diff.rightValue()));

        viewsDiff.entriesDiffering()
                 .values()
                 .forEach(diff -> metadataRefs.get(diff.rightValue().metadata.id).set(diff.rightValue().metadata));

        indexesDiff.entriesDiffering()
                   .values()
                   .stream()
                   .map(MapDifference.ValueDifference::rightValue)
                   .forEach(indexTable -> indexMetadataRefs.get(Pair.create(indexTable.keyspace, indexTable.indexName().get())).set(indexTable));
    }

    /**
     * Adds a listener to be notified of schema changes.
     *
     * @param listener the listener to add
     */
    public void registerListener(SchemaChangeListener listener)
    {
        changeListeners.add(listener);
    }

    /**
     * Removes a previously registered listener.
     *
     * @param listener the listener to remove
     */
    @SuppressWarnings("unused")
    public void unregisterListener(SchemaChangeListener listener)
    {
        changeListeners.remove(listener);
    }

    /**
     * Get keyspace instance by name
     *
     * @param keyspaceName The name of the keyspace
     *
     * @return Keyspace object or null if keyspace was not found
     */
    public Keyspace getKeyspaceInstance(String keyspaceName)
    {
        return keyspaceInstances.get(keyspaceName);
    }

    /**
     * Look up the {@link ColumnFamilyStore} of a table.
     *
     * @param id table or view identifier
     *
     * @return the store, or null if the table metadata, the keyspace instance, or the store itself
     * is not present
     */
    public ColumnFamilyStore getColumnFamilyStoreInstance(TableId id)
    {
        TableMetadata metadata = getTableMetadata(id);
        if (metadata == null)
            return null;

        Keyspace instance = getKeyspaceInstance(metadata.keyspace);
        if (instance == null)
            return null;

        return instance.hasColumnFamilyStore(metadata.id)
             ? instance.getColumnFamilyStore(metadata.id)
             : null;
    }

    /**
     * Store given Keyspace instance to the schema
     *
     * @param keyspace The Keyspace instance to store
     *
     * @throws IllegalArgumentException if Keyspace is already stored
     */
    public void storeKeyspaceInstance(Keyspace keyspace)
    {
        // putIfAbsent makes the "already stored?" check and the insertion a single step, so two
        // concurrent callers cannot both pass the check and overwrite one another.
        if (keyspaceInstances.putIfAbsent(keyspace.getName(), keyspace) != null)
            throw new IllegalArgumentException(format("Keyspace %s was already initialized.", keyspace.getName()));
    }

    /**
     * Remove keyspace from schema
     *
     * @param keyspaceName The name of the keyspace to remove
     *
     * @return removed keyspace instance or null if it wasn't found
     */
    public Keyspace removeKeyspaceInstance(String keyspaceName)
    {
        return keyspaceInstances.remove(keyspaceName);
    }

    /**
     * Remove keyspace definition from system
     *
     * @param ksm The keyspace definition to remove
     */
    synchronized void unload(KeyspaceMetadata ksm)
    {
        keyspaces = keyspaces.without(ksm.name);

        ksm.tablesAndViews()
           .forEach(t -> metadataRefs.remove(t.id));

        ksm.tables
           .indexTables()
           .keySet()
           .forEach(name -> indexMetadataRefs.remove(Pair.create(ksm.name, name)));
    }

    /**
     * @return the total number of tables and views across all loaded keyspaces
     */
    public int getNumberOfTables()
    {
        return keyspaces.stream().mapToInt(k -> size(k.tablesAndViews())).sum();
    }

    /**
     * Get view metadata by keyspace and view name
     *
     * @param keyspaceName The name of the keyspace
     * @param viewName     The name of the view
     *
     * @return the view metadata or null if the keyspace or the view wasn't found
     */
    public ViewMetadata getView(String keyspaceName, String viewName)
    {
        assert keyspaceName != null;
        KeyspaceMetadata ksm = keyspaces.getNullable(keyspaceName);
        return (ksm == null) ? null : ksm.views.getNullable(viewName);
    }

    /**
     * Get metadata about keyspace by its name
     *
     * @param keyspaceName The name of the keyspace
     *
     * @return The keyspace metadata or null if it wasn't found
     */
    public KeyspaceMetadata getKeyspaceMetadata(String keyspaceName)
    {
        assert keyspaceName != null;
        return keyspaces.getNullable(keyspaceName);
    }

    private Set<String> getNonSystemKeyspacesSet()
    {
        return Sets.difference(keyspaces.names(), SchemaConstants.SYSTEM_KEYSPACE_NAMES);
    }

    /**
     * @return collection of the non-system keyspaces (note that this counts as system only the
     * non replicated keyspaces, so keyspaces like system_traces which are replicated are actually
     * returned. See {@link #getUserKeyspaces()} below if you don't want those)
     */
    public ImmutableList<String> getNonSystemKeyspaces()
    {
        return ImmutableList.copyOf(getNonSystemKeyspacesSet());
    }

    /**
     * @return a collection of keyspaces that do not use LocalStrategy for replication
     */
    public List<String> getNonLocalStrategyKeyspaces()
    {
        return keyspaces.stream()
                        .filter(keyspace -> keyspace.params.replication.klass != LocalStrategy.class)
                        .map(keyspace -> keyspace.name)
                        .collect(Collectors.toList());
    }

    /**
     * @return collection of the user defined keyspaces
     */
    public List<String> getUserKeyspaces()
    {
        return ImmutableList.copyOf(Sets.difference(getNonSystemKeyspacesSet(), SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES));
    }

    /**
     * Get metadata about keyspace inner ColumnFamilies
     *
     * @param keyspaceName The name of the keyspace
     *
     * @return metadata about ColumnFamilies that belong to the given keyspace
     */
    public Iterable<TableMetadata> getTablesAndViews(String keyspaceName)
    {
        assert keyspaceName != null;
        KeyspaceMetadata ksm = keyspaces.getNullable(keyspaceName);
        assert ksm != null;
        return ksm.tablesAndViews();
    }

    /**
     * @return collection of all the keyspace names registered in the system (system and non-system)
     */
    public Set<String> getKeyspaces()
    {
        return keyspaces.names();
    }

    /* TableMetadata/Ref query/control methods */

    /**
     * Given a keyspace name and table/view name, get the table metadata
     * reference. If the keyspace name or table/view name is not present
     * this method returns null.
     *
     * @return TableMetadataRef object or null if it wasn't found
     */
    public TableMetadataRef getTableMetadataRef(String keyspace, String table)
    {
        TableMetadata tm = getTableMetadata(keyspace, table);
        return tm == null
             ? null
             : metadataRefs.get(tm.id);
    }

    /**
     * Get the metadata reference of an index table.
     *
     * @param keyspace The keyspace name
     * @param index    The index name
     *
     * @return TableMetadataRef object or null if no ref is registered for that pair
     */
    public TableMetadataRef getIndexTableMetadataRef(String keyspace, String index)
    {
        return indexMetadataRefs.get(Pair.create(keyspace, index));
    }

    /**
     * Get Table metadata reference by its identifier
     *
     * @param id table or view identifier
     *
     * @return TableMetadataRef object or null if no ref is registered under {@code id}
     */
    public TableMetadataRef getTableMetadataRef(TableId id)
    {
        return metadataRefs.get(id);
    }

    /**
     * Get Table metadata reference using the keyspace and table names carried by an sstable descriptor.
     *
     * @return TableMetadataRef object or null if it wasn't found
     */
    public TableMetadataRef getTableMetadataRef(Descriptor descriptor)
    {
        return getTableMetadataRef(descriptor.ksname, descriptor.cfname);
    }

    /**
     * Given a keyspace name and table name, get the table
     * meta data. If the keyspace name or table name is not valid
     * this function returns null.
     *
     * @param keyspace The keyspace name
     * @param table The table name
     *
     * @return TableMetadata object or null if it wasn't found
     */
    public TableMetadata getTableMetadata(String keyspace, String table)
    {
        assert keyspace != null;
        assert table != null;

        KeyspaceMetadata ksm = keyspaces.getNullable(keyspace);
        return ksm == null
             ? null
             : ksm.getTableOrViewNullable(table);
    }

    /**
     * Get Table metadata by its identifier
     *
     * @param id table or view identifier
     *
     * @return TableMetadata object or null if it wasn't found
     */
    public TableMetadata getTableMetadata(TableId id)
    {
        return keyspaces.getTableOrViewNullable(id);
    }

    /**
     * Look up a table or view, failing with a request-level exception when it cannot be found.
     *
     * @param keyspaceName The keyspace name
     * @param tableName    The table or view name
     *
     * @return the metadata of the table or view
     *
     * @throws InvalidRequestException if {@code tableName} is empty or names no table/view of the keyspace
     * @throws KeyspaceNotDefinedException if the keyspace does not exist
     */
    public TableMetadata validateTable(String keyspaceName, String tableName)
    {
        if (tableName.isEmpty())
            throw new InvalidRequestException("non-empty table is required");

        KeyspaceMetadata keyspace = keyspaces.getNullable(keyspaceName);
        if (keyspace == null)
            throw new KeyspaceNotDefinedException(format("keyspace %s does not exist", keyspaceName));

        TableMetadata metadata = keyspace.getTableOrViewNullable(tableName);
        if (metadata == null)
            throw new InvalidRequestException(format("table %s does not exist", tableName));

        return metadata;
    }

    /**
     * Get Table metadata using the keyspace and table names carried by an sstable descriptor.
     *
     * @return TableMetadata object or null if it wasn't found
     */
    public TableMetadata getTableMetadata(Descriptor descriptor)
    {
        return getTableMetadata(descriptor.ksname, descriptor.cfname);
    }

    /**
     * @throws UnknownTableException if the table couldn't be found in the metadata
     */
    public TableMetadata getExistingTableMetadata(TableId id) throws UnknownTableException
    {
        TableMetadata metadata = getTableMetadata(id);
        if (metadata != null)
            return metadata;

        // note the trailing space before the line break: without it the message reads "schemanot"
        String message =
            format("Couldn't find table with id %s. If a table was just created, this is likely due to the schema " +
                   "not being fully propagated. Please wait for schema agreement on table creation.",
                   id);
        throw new UnknownTableException(message, id);
    }

    /* Function helpers */

    /**
     * Get all function overloads with the specified name
     *
     * @param name fully qualified function name
     * @return an empty list if the keyspace or the function name are not found;
     *         a non-empty collection of {@link Function} otherwise
     * @throws IllegalArgumentException if {@code name} has no keyspace
     */
    public Collection<Function> getFunctions(FunctionName name)
    {
        if (!name.hasKeyspace())
            throw new IllegalArgumentException(format("Function name must be fully qualified: got %s", name));

        KeyspaceMetadata ksm = getKeyspaceMetadata(name.keyspace);
        return ksm == null
             ? Collections.emptyList()
             : ksm.functions.get(name);
    }

    /**
     * Find the function with the specified name
     *
     * @param name fully qualified function name
     * @param argTypes function argument types
     * @return an empty {@link Optional} if the keyspace or the function name are not found;
     *         a non-empty optional of {@link Function} otherwise
     * @throws IllegalArgumentException if {@code name} has no keyspace
     */
    public Optional<Function> findFunction(FunctionName name, List<AbstractType<?>> argTypes)
    {
        if (!name.hasKeyspace())
            throw new IllegalArgumentException(format("Function name must be fully qualified: got %s", name));

        KeyspaceMetadata ksm = getKeyspaceMetadata(name.keyspace);
        return ksm == null
             ? Optional.empty()
             : ksm.functions.find(name, argTypes);
    }

    /* Version control */

    /**
     * @return current schema version
     */
    public UUID getVersion()
    {
        return version;
    }

    /**
     * Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
     * will be converted into UUID which would act as content-based version of the schema.
     */
    public void updateVersion()
    {
        version = SchemaKeyspace.calculateSchemaDigest();
        SystemKeyspace.updateSchemaVersion(version);
    }

    /*
     * Like updateVersion, but also announces via gossip
     */
    public void updateVersionAndAnnounce()
    {
        updateVersion();
        MigrationManager.passiveAnnounce(version);
    }

    /**
     * Clear all KS/CF metadata and reset version.
     */
    public synchronized void clear()
    {
        getNonSystemKeyspaces().forEach(k -> unload(getKeyspaceMetadata(k)));
        updateVersionAndAnnounce();
    }

    /**
     * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects
     * (which also involves fs operations on add/drop ks/cf)
     *
     * @param mutations the schema changes to apply
     *
     * @throws ConfigurationException If one of metadata attributes has invalid value
     */
    synchronized void mergeAndAnnounceVersion(Collection<Mutation> mutations)
    {
        merge(mutations);
        updateVersionAndAnnounce();
    }

    /**
     * Apply the given schema mutations and then create, drop or alter every keyspace whose
     * definition differs between the state before and the state after the mutations.
     *
     * @param mutations the schema changes to apply
     */
    synchronized void merge(Collection<Mutation> mutations)
    {
        // only compare the keyspaces affected by this set of schema mutations
        Set<String> affectedKeyspaces = SchemaKeyspace.affectedKeyspaces(mutations);

        // fetch the current state of schema for the affected keyspaces only
        Keyspaces before = keyspaces.filter(k -> affectedKeyspaces.contains(k.name));

        // apply the schema mutations
        SchemaKeyspace.applyChanges(mutations);

        // fetch the new versions of the altered keyspaces
        Keyspaces after = SchemaKeyspace.fetchKeyspaces(affectedKeyspaces);

        MapDifference<String, KeyspaceMetadata> keyspacesDiff = before.diff(after);

        // dropped keyspaces
        keyspacesDiff.entriesOnlyOnLeft().values().forEach(this::dropKeyspace);

        // new keyspaces
        keyspacesDiff.entriesOnlyOnRight().values().forEach(this::createKeyspace);

        // updated keyspaces
        keyspacesDiff.entriesDiffering().values().forEach(diff -> alterKeyspace(diff.leftValue(), diff.rightValue()));
    }

    /**
     * Apply the difference between two definitions of the same keyspace: drop removed tables and
     * views, load the new definition, create and reload the added and changed ones, then notify
     * the listeners (drops first, then creations, then alterations).
     */
    private void alterKeyspace(KeyspaceMetadata before, KeyspaceMetadata after)
    {
        // calculate the deltas
        MapDifference<TableId, TableMetadata> tablesDiff = before.tables.diff(after.tables);
        MapDifference<TableId, ViewMetadata> viewsDiff = before.views.diff(after.views);
        MapDifference<ByteBuffer, UserType> typesDiff = before.types.diff(after.types);
        MapDifference<Pair<FunctionName, List<String>>, UDFunction> udfsDiff = before.functions.udfsDiff(after.functions);
        MapDifference<Pair<FunctionName, List<String>>, UDAggregate> udasDiff = before.functions.udasDiff(after.functions);

        // drop tables and views
        viewsDiff.entriesOnlyOnLeft().values().forEach(this::dropView);
        tablesDiff.entriesOnlyOnLeft().values().forEach(this::dropTable);

        load(after);

        // add tables and views
        tablesDiff.entriesOnlyOnRight().values().forEach(this::createTable);
        viewsDiff.entriesOnlyOnRight().values().forEach(this::createView);

        // update tables and views
        tablesDiff.entriesDiffering().values().forEach(diff -> alterTable(diff.rightValue()));
        viewsDiff.entriesDiffering().values().forEach(diff -> alterView(diff.rightValue()));

        // deal with all removed, added, and altered views
        Keyspace.open(before.name).viewManager.reload();

        // notify on everything dropped
        udasDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropAggregate);
        udfsDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropFunction);
        viewsDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropView);
        tablesDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropTable);
        typesDiff.entriesOnlyOnLeft().values().forEach(this::notifyDropType);

        // notify on everything created
        typesDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateType);
        tablesDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateTable);
        viewsDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateView);
        udfsDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateFunction);
        udasDiff.entriesOnlyOnRight().values().forEach(this::notifyCreateAggregate);

        // notify on everything altered
        if (!before.params.equals(after.params))
            notifyAlterKeyspace(after);
        typesDiff.entriesDiffering().values().forEach(diff -> notifyAlterType(diff.rightValue()));
        tablesDiff.entriesDiffering().values().forEach(diff -> notifyAlterTable(diff.leftValue(), diff.rightValue()));
        viewsDiff.entriesDiffering().values().forEach(diff -> notifyAlterView(diff.leftValue(), diff.rightValue()));
        udfsDiff.entriesDiffering().values().forEach(diff -> notifyAlterFunction(diff.rightValue()));
        udasDiff.entriesDiffering().values().forEach(diff -> notifyAlterAggregate(diff.rightValue()));
    }

    /**
     * Load and open a new keyspace, then notify the listeners about the keyspace and everything in it.
     */
    private void createKeyspace(KeyspaceMetadata keyspace)
    {
        load(keyspace);
        Keyspace.open(keyspace.name);

        notifyCreateKeyspace(keyspace);
        keyspace.types.forEach(this::notifyCreateType);
        keyspace.tables.forEach(this::notifyCreateTable);
        keyspace.views.forEach(this::notifyCreateView);
        keyspace.functions.udfs().forEach(this::notifyCreateFunction);
        keyspace.functions.udas().forEach(this::notifyCreateAggregate);
    }

    /**
     * Drop every view and table of a keyspace, unload its definition, then notify the listeners
     * about everything that was dropped (the keyspace itself last).
     */
    private void dropKeyspace(KeyspaceMetadata keyspace)
    {
        keyspace.views.forEach(this::dropView);
        keyspace.tables.forEach(this::dropTable);

        // remove the keyspace from the static instances.
        Keyspace.clear(keyspace.name);
        unload(keyspace);
        Keyspace.writeOrder.awaitNewBarrier();

        keyspace.functions.udas().forEach(this::notifyDropAggregate);
        keyspace.functions.udfs().forEach(this::notifyDropFunction);
        keyspace.views.forEach(this::notifyDropView);
        keyspace.tables.forEach(this::notifyDropTable);
        keyspace.types.forEach(this::notifyDropType);
        notifyDropKeyspace(keyspace);
    }

    private void dropView(ViewMetadata metadata)
    {
        dropTable(metadata.metadata);
    }

    /**
     * Drop a table's store: mark its indexes removed, interrupt its compactions, optionally
     * snapshot it (when auto-snapshot is enabled), recycle its commit log segments, and finally
     * drop the column family from its keyspace.
     */
    private void dropTable(TableMetadata metadata)
    {
        ColumnFamilyStore cfs = Keyspace.open(metadata.keyspace).getColumnFamilyStore(metadata.name);
        assert cfs != null;
        // make sure all the indexes are dropped, or else.
        cfs.indexManager.markAllIndexesRemoved();
        CompactionManager.instance.interruptCompactionFor(Collections.singleton(metadata), true);
        if (DatabaseDescriptor.isAutoSnapshot())
            cfs.snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(cfs.name, ColumnFamilyStore.SNAPSHOT_DROP_PREFIX));
        CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id));
        Keyspace.open(metadata.keyspace).dropCf(metadata.id);
    }

    private void createTable(TableMetadata table)
    {
        Keyspace.open(table.keyspace).initCf(metadataRefs.get(table.id), true);
    }

    private void createView(ViewMetadata view)
    {
        Keyspace.open(view.keyspace).initCf(metadataRefs.get(view.metadata.id), true);
    }

    private void alterTable(TableMetadata updated)
    {
        Keyspace.open(updated.keyspace).getColumnFamilyStore(updated.name).reload();
    }

    private void alterView(ViewMetadata updated)
    {
        Keyspace.open(updated.keyspace).getColumnFamilyStore(updated.name).reload();
    }

    /* Listener notification helpers: each one forwards a single event to every registered listener */

    private void notifyCreateKeyspace(KeyspaceMetadata ksm)
    {
        changeListeners.forEach(l -> l.onCreateKeyspace(ksm.name));
    }

    private void notifyCreateTable(TableMetadata metadata)
    {
        changeListeners.forEach(l -> l.onCreateTable(metadata.keyspace, metadata.name));
    }

    private void notifyCreateView(ViewMetadata view)
    {
        changeListeners.forEach(l -> l.onCreateView(view.keyspace, view.name));
    }

    private void notifyCreateType(UserType ut)
    {
        changeListeners.forEach(l -> l.onCreateType(ut.keyspace, ut.getNameAsString()));
    }

    private void notifyCreateFunction(UDFunction udf)
    {
        changeListeners.forEach(l -> l.onCreateFunction(udf.name().keyspace, udf.name().name, udf.argTypes()));
    }

    private void notifyCreateAggregate(UDAggregate uda)
    {
        changeListeners.forEach(l -> l.onCreateAggregate(uda.name().keyspace, uda.name().name, uda.argTypes()));
    }

    private void notifyAlterKeyspace(KeyspaceMetadata ksm)
    {
        changeListeners.forEach(l -> l.onAlterKeyspace(ksm.name));
    }

    private void notifyAlterTable(TableMetadata current, TableMetadata updated)
    {
        boolean changeAffectedPreparedStatements = current.changeAffectsPreparedStatements(updated);
        changeListeners.forEach(l -> l.onAlterTable(updated.keyspace, updated.name, changeAffectedPreparedStatements));
    }

    private void notifyAlterView(ViewMetadata current, ViewMetadata updated)
    {
        boolean changeAffectedPreparedStatements = current.metadata.changeAffectsPreparedStatements(updated.metadata);
        changeListeners.forEach(l -> l.onAlterView(updated.keyspace, updated.name, changeAffectedPreparedStatements));
    }

    private void notifyAlterType(UserType ut)
    {
        changeListeners.forEach(l -> l.onAlterType(ut.keyspace, ut.getNameAsString()));
    }

    private void notifyAlterFunction(UDFunction udf)
    {
        changeListeners.forEach(l -> l.onAlterFunction(udf.name().keyspace, udf.name().name, udf.argTypes()));
    }

    private void notifyAlterAggregate(UDAggregate uda)
    {
        changeListeners.forEach(l -> l.onAlterAggregate(uda.name().keyspace, uda.name().name, uda.argTypes()));
    }

    private void notifyDropKeyspace(KeyspaceMetadata ksm)
    {
        changeListeners.forEach(l -> l.onDropKeyspace(ksm.name));
    }

    private void notifyDropTable(TableMetadata metadata)
    {
        changeListeners.forEach(l -> l.onDropTable(metadata.keyspace, metadata.name));
    }

    private void notifyDropView(ViewMetadata view)
    {
        changeListeners.forEach(l -> l.onDropView(view.keyspace, view.name));
    }

    private void notifyDropType(UserType ut)
    {
        changeListeners.forEach(l -> l.onDropType(ut.keyspace, ut.getNameAsString()));
    }

    private void notifyDropFunction(UDFunction udf)
    {
        changeListeners.forEach(l -> l.onDropFunction(udf.name().keyspace, udf.name().name, udf.argTypes()));
    }

    private void notifyDropAggregate(UDAggregate uda)
    {
        changeListeners.forEach(l -> l.onDropAggregate(uda.name().keyspace, uda.name().name, uda.argTypes()));
    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/SchemaChangeListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaChangeListener.java b/src/java/org/apache/cassandra/schema/SchemaChangeListener.java new file mode 100644 index 0000000..4390309 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/SchemaChangeListener.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.schema; + +import java.util.List; + +import org.apache.cassandra.db.marshal.AbstractType; + +public abstract class SchemaChangeListener +{ + public void onCreateKeyspace(String keyspace) + { + } + + public void onCreateTable(String keyspace, String table) + { + } + + public void onCreateView(String keyspace, String view) + { + onCreateTable(keyspace, view); + } + + public void onCreateType(String keyspace, String type) + { + } + + public void onCreateFunction(String keyspace, String function, List<AbstractType<?>> argumentTypes) + { + } + + public void onCreateAggregate(String keyspace, String aggregate, List<AbstractType<?>> argumentTypes) + { + } + + public void onAlterKeyspace(String keyspace) + { + } + + // the boolean flag indicates whether the change that triggered this event may have a substantive + // impact on statements using the column family. + public void onAlterTable(String keyspace, String table, boolean affectsStatements) + { + } + + public void onAlterView(String keyspace, String view, boolean affectsStataments) + { + onAlterTable(keyspace, view, affectsStataments); + } + + public void onAlterType(String keyspace, String type) + { + } + + public void onAlterFunction(String keyspace, String function, List<AbstractType<?>> argumentTypes) + { + } + + public void onAlterAggregate(String keyspace, String aggregate, List<AbstractType<?>> argumentTypes) + { + } + + public void onDropKeyspace(String keyspace) + { + } + + public void onDropTable(String keyspace, String table) + { + } + + public void onDropView(String keyspace, String view) + { + onDropTable(keyspace, view); + } + + public void onDropType(String keyspace, String type) + { + } + + public void onDropFunction(String keyspace, String function, List<AbstractType<?>> argumentTypes) + { + } + + public void onDropAggregate(String keyspace, String aggregate, List<AbstractType<?>> argumentTypes) + { + } +} 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af3fe39d/src/java/org/apache/cassandra/schema/SchemaConstants.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaConstants.java b/src/java/org/apache/cassandra/schema/SchemaConstants.java new file mode 100644 index 0000000..5340f69 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/SchemaConstants.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.schema; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Set; +import java.util.UUID; +import java.util.regex.Pattern; + +import com.google.common.collect.ImmutableSet; + +public final class SchemaConstants +{ + public static final Pattern PATTERN_WORD_CHARS = Pattern.compile("\\w+"); + + public static final String SYSTEM_KEYSPACE_NAME = "system"; + public static final String SCHEMA_KEYSPACE_NAME = "system_schema"; + + public static final String TRACE_KEYSPACE_NAME = "system_traces"; + public static final String AUTH_KEYSPACE_NAME = "system_auth"; + public static final String DISTRIBUTED_KEYSPACE_NAME = "system_distributed"; + + /* system keyspace names (the ones with LocalStrategy replication strategy) */ + public static final Set<String> SYSTEM_KEYSPACE_NAMES = ImmutableSet.of(SYSTEM_KEYSPACE_NAME, SCHEMA_KEYSPACE_NAME); + + /* replicate system keyspace names (the ones with a "true" replication strategy) */ + public static final Set<String> REPLICATED_SYSTEM_KEYSPACE_NAMES = ImmutableSet.of(TRACE_KEYSPACE_NAME, + AUTH_KEYSPACE_NAME, + DISTRIBUTED_KEYSPACE_NAME); + /** + * longest permissible KS or CF name. Our main concern is that filename not be more than 255 characters; + * the filename will contain both the KS and CF names. Since non-schema-name components only take up + * ~64 characters, we could allow longer names than this, but on Windows, the entire path should be not greater than + * 255 characters, so a lower limit here helps avoid problems. See CASSANDRA-4110. 
+ */ + public static final int NAME_LENGTH = 48; + + // 59adb24e-f3cd-3e02-97f0-5b395827453f + public static final UUID emptyVersion; + + public static boolean isValidName(String name) + { + return name != null && !name.isEmpty() && name.length() <= NAME_LENGTH && PATTERN_WORD_CHARS.matcher(name).matches(); + } + + static + { + try + { + emptyVersion = UUID.nameUUIDFromBytes(MessageDigest.getInstance("MD5").digest()); + } + catch (NoSuchAlgorithmException e) + { + throw new AssertionError(); + } + } + + /** + * @return whether or not the keyspace is a really system one (w/ LocalStrategy, unmodifiable, hardcoded) + */ + public static boolean isSystemKeyspace(String keyspaceName) + { + return SYSTEM_KEYSPACE_NAMES.contains(keyspaceName.toLowerCase()); + } +}
