http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/schema/LegacySchemaTables.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaTables.java b/src/java/org/apache/cassandra/schema/LegacySchemaTables.java new file mode 100644 index 0000000..047698c --- /dev/null +++ b/src/java/org/apache/cassandra/schema/LegacySchemaTables.java @@ -0,0 +1,1480 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.schema; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.*; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.MapDifference; +import com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.cache.CachingOptions; +import org.apache.cassandra.config.*; +import org.apache.cassandra.cql3.*; +import org.apache.cassandra.cql3.functions.*; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.columniterator.IdentityQueryFilter; +import org.apache.cassandra.db.composites.CellNameType; +import org.apache.cassandra.db.composites.CellNames; +import org.apache.cassandra.db.composites.Composite; +import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.db.index.SecondaryIndexManager; +import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.exceptions.SyntaxException; +import org.apache.cassandra.io.compress.CompressionParameters; +import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal; +import static org.apache.cassandra.utils.FBUtilities.fromJsonMap; +import static org.apache.cassandra.utils.FBUtilities.json; + +/** system.schema_* tables used to store keyspace/table/type attributes prior to C* 3.0 */ +public class LegacySchemaTables +{ + private static final Logger logger = LoggerFactory.getLogger(LegacySchemaTables.class); + + public static final String KEYSPACES = "schema_keyspaces"; + public static final String COLUMNFAMILIES = "schema_columnfamilies"; + public static final String COLUMNS = "schema_columns"; + public static final String TRIGGERS = "schema_triggers"; + public static final String USERTYPES = "schema_usertypes"; + public static final String FUNCTIONS = "schema_functions"; + public static final String AGGREGATES = "schema_aggregates"; + + public static final List<String> ALL = Arrays.asList(KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USERTYPES, FUNCTIONS, AGGREGATES); + + private static final CFMetaData Keyspaces = + compile(KEYSPACES, + "keyspace definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "durable_writes boolean," + + "strategy_class text," + + "strategy_options text," + + "PRIMARY KEY ((keyspace_name))) " + + "WITH COMPACT STORAGE"); + + private static final CFMetaData Columnfamilies = + compile(COLUMNFAMILIES, + "table definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "columnfamily_name text," + + "bloom_filter_fp_chance double," + + "caching text," + + "cf_id uuid," // post-2.1 UUID cfid + + "comment text," + + "compaction_strategy_class text," + + "compaction_strategy_options text," + + "comparator text," + + "compression_parameters text," + + "default_time_to_live int," + + "default_validator text," + + "dropped_columns map<text, bigint>," + + "gc_grace_seconds int," + + "is_dense boolean," + + "key_validator text," + + "local_read_repair_chance double," + + "max_compaction_threshold int," + + "max_index_interval int," + + "memtable_flush_period_in_ms int," + + "min_compaction_threshold int," + + "min_index_interval int," + + "read_repair_chance double," + + "speculative_retry text," + + "subcomparator text," + + "type text," + + "PRIMARY KEY ((keyspace_name), columnfamily_name))"); + + private static final CFMetaData Columns = + compile(COLUMNS, + "column definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "columnfamily_name text," + + "column_name text," + + "component_index int," + + "index_name text," + + "index_options text," + + "index_type text," + + "type text," + + "validator text," + + "PRIMARY KEY ((keyspace_name), columnfamily_name, column_name))"); + + private static final CFMetaData Triggers = + compile(TRIGGERS, + "trigger definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "columnfamily_name text," + + "trigger_name text," + + "trigger_options map<text, text>," + + "PRIMARY KEY ((keyspace_name), columnfamily_name, trigger_name))"); + + private static final CFMetaData Usertypes = + compile(USERTYPES, + "user defined type definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "type_name text," + + "field_names list<text>," + + "field_types list<text>," + + "PRIMARY KEY ((keyspace_name), type_name))"); + + private static final CFMetaData Functions = + compile(FUNCTIONS, + "user defined function definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "function_name text," + + "signature blob," + + "argument_names list<text>," + + "argument_types list<text>," + + "body text," + + "is_deterministic boolean," + + "language text," + + "return_type text," + + "PRIMARY KEY ((keyspace_name), function_name, signature))"); + + private static final CFMetaData Aggregates = + compile(AGGREGATES, + "user defined aggregate definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "aggregate_name text," + + "signature blob," + + "argument_types list<text>," + + "final_func text," + + "initcond blob," + + "return_type text," + + "state_func text," + + "state_type text," + + "PRIMARY KEY ((keyspace_name), aggregate_name, signature))"); + + public static final List<CFMetaData> All = Arrays.asList(Keyspaces, Columnfamilies, Columns, Triggers, Usertypes, Functions, Aggregates); + + private static CFMetaData compile(String name, String description, String schema) + { + return CFMetaData.compile(String.format(schema, name), SystemKeyspace.NAME) + .comment(description) + .gcGraceSeconds((int) TimeUnit.DAYS.toSeconds(7)); + } + + /** add entries to system.schema_* for the hardcoded system definitions */ + public static void saveSystemKeyspaceSchema() + { + KSMetaData keyspace = Schema.instance.getKSMetaData(SystemKeyspace.NAME); + // delete old, possibly obsolete entries in schema tables + for (String table : ALL) + executeOnceInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ?", table), keyspace.name); + // (+1 to timestamp to make sure we don't get shadowed by the tombstones we just added) + makeCreateKeyspaceMutation(keyspace, FBUtilities.timestampMicros() + 1).apply(); + } + + public static Collection<KSMetaData> readSchemaFromSystemTables() + { + List<Row> serializedSchema = getSchemaPartitionsForTable(KEYSPACES); + + List<KSMetaData> keyspaces = new ArrayList<>(serializedSchema.size()); + + for (Row partition : serializedSchema) + { + if (isEmptySchemaPartition(partition) || isSystemKeyspaceSchemaPartition(partition)) + continue; + + keyspaces.add(createKeyspaceFromSchemaPartitions(partition, + readSchemaPartitionForKeyspace(COLUMNFAMILIES, partition.key), + readSchemaPartitionForKeyspace(USERTYPES, partition.key))); + + // Will be moved away in #6717 + for (UDFunction function : createFunctionsFromFunctionsPartition(readSchemaPartitionForKeyspace(FUNCTIONS, partition.key)).values()) + org.apache.cassandra.cql3.functions.Functions.addFunction(function); + + // Will be moved away in #6717 + for (UDAggregate aggregate : createAggregatesFromAggregatesPartition(readSchemaPartitionForKeyspace(AGGREGATES, partition.key)).values()) + org.apache.cassandra.cql3.functions.Functions.addFunction(aggregate); + } + + return keyspaces; + } + + public static void truncateSchemaTables() + { + for (String table : ALL) + getSchemaCFS(table).truncateBlocking(); + } + + private static void flushSchemaTables() + { + for (String table : ALL) + SystemKeyspace.forceBlockingFlush(table); + } + + /** + * Read schema from system keyspace and calculate MD5 digest of every row, resulting digest + * will be converted into UUID which would act as content-based version of the schema. + */ + public static UUID calculateSchemaDigest() + { + MessageDigest digest; + try + { + digest = MessageDigest.getInstance("MD5"); + } + catch (NoSuchAlgorithmException e) + { + throw new RuntimeException(e); + } + + for (String table : ALL) + { + for (Row partition : getSchemaPartitionsForTable(table)) + { + if (isEmptySchemaPartition(partition) || isSystemKeyspaceSchemaPartition(partition)) + continue; + + // we want to digest only live columns + ColumnFamilyStore.removeDeletedColumnsOnly(partition.cf, Integer.MAX_VALUE, SecondaryIndexManager.nullUpdater); + partition.cf.purgeTombstones(Integer.MAX_VALUE); + partition.cf.updateDigest(digest); + } + } + + return UUID.nameUUIDFromBytes(digest.digest()); + } + + /** + * @param schemaTableName The name of the table responsible for part of the schema + * @return CFS responsible to hold low-level serialized schema + */ + private static ColumnFamilyStore getSchemaCFS(String schemaTableName) + { + return Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(schemaTableName); + } + + /** + * @param schemaTableName The name of the table responsible for part of the schema. + * @return low-level schema representation + */ + private static List<Row> getSchemaPartitionsForTable(String schemaTableName) + { + Token minToken = StorageService.getPartitioner().getMinimumToken(); + return getSchemaCFS(schemaTableName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()), + null, + new IdentityQueryFilter(), + Integer.MAX_VALUE, + System.currentTimeMillis()); + } + + public static Collection<Mutation> convertSchemaToMutations() + { + Map<DecoratedKey, Mutation> mutationMap = new HashMap<>(); + + for (String table : ALL) + convertSchemaToMutations(mutationMap, table); + + return mutationMap.values(); + } + + private static void convertSchemaToMutations(Map<DecoratedKey, Mutation> mutationMap, String schemaTableName) + { + for (Row partition : getSchemaPartitionsForTable(schemaTableName)) + { + if (isSystemKeyspaceSchemaPartition(partition)) + continue; + + Mutation mutation = mutationMap.get(partition.key); + if (mutation == null) + { + mutation = new Mutation(SystemKeyspace.NAME, partition.key.getKey()); + mutationMap.put(partition.key, mutation); + } + + mutation.add(partition.cf); + } + } + + private static Map<DecoratedKey, ColumnFamily> readSchemaForKeyspaces(String schemaTableName, Set<String> keyspaceNames) + { + Map<DecoratedKey, ColumnFamily> schema = new HashMap<>(); + + for (String keyspaceName : keyspaceNames) + { + Row schemaEntity = readSchemaPartitionForKeyspace(schemaTableName, keyspaceName); + if (schemaEntity.cf != null) + schema.put(schemaEntity.key, schemaEntity.cf); + } + + return schema; + } + + private static ByteBuffer getSchemaKSKey(String ksName) + { + return AsciiType.instance.fromString(ksName); + } + + private static Row readSchemaPartitionForKeyspace(String schemaTableName, String keyspaceName) + { + DecoratedKey keyspaceKey = StorageService.getPartitioner().decorateKey(getSchemaKSKey(keyspaceName)); + return readSchemaPartitionForKeyspace(schemaTableName, keyspaceKey); + } + + private static Row readSchemaPartitionForKeyspace(String schemaTableName, DecoratedKey keyspaceKey) + { + QueryFilter filter = QueryFilter.getIdentityFilter(keyspaceKey, schemaTableName, System.currentTimeMillis()); + return new Row(keyspaceKey, getSchemaCFS(schemaTableName).getColumnFamily(filter)); + } + + private static Row readSchemaPartitionForTable(String schemaTableName, String keyspaceName, String tableName) + { + DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(keyspaceName)); + ColumnFamilyStore store = getSchemaCFS(schemaTableName); + Composite prefix = store.getComparator().make(tableName); + ColumnFamily cells = store.getColumnFamily(key, prefix, prefix.end(), false, Integer.MAX_VALUE, System.currentTimeMillis()); + return new Row(key, cells); + } + + private static boolean isEmptySchemaPartition(Row partition) + { + return partition.cf == null || (partition.cf.isMarkedForDelete() && !partition.cf.hasColumns()); + } + + private static boolean isSystemKeyspaceSchemaPartition(Row partition) + { + return getSchemaKSKey(SystemKeyspace.NAME).equals(partition.key.getKey()); + } + + /** + * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects + * (which also involves fs operations on add/drop ks/cf) + * + * @param mutations the schema changes to apply + * + * @throws ConfigurationException If one of metadata attributes has invalid value + * @throws IOException If data was corrupted during transportation or failed to apply fs operations + */ + public static synchronized void mergeSchema(Collection<Mutation> mutations) throws ConfigurationException, IOException + { + mergeSchema(mutations, true); + Schema.instance.updateVersionAndAnnounce(); + } + + public static synchronized void mergeSchema(Collection<Mutation> mutations, boolean doFlush) throws IOException + { + // compare before/after schemas of the affected keyspaces only + Set<String> keyspaces = new HashSet<>(mutations.size()); + for (Mutation mutation : mutations) + keyspaces.add(ByteBufferUtil.string(mutation.key())); + + // current state of the schema + Map<DecoratedKey, ColumnFamily> oldKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces); + Map<DecoratedKey, ColumnFamily> oldColumnFamilies = readSchemaForKeyspaces(COLUMNFAMILIES, keyspaces); + Map<DecoratedKey, ColumnFamily> oldTypes = readSchemaForKeyspaces(USERTYPES, keyspaces); + Map<DecoratedKey, ColumnFamily> oldFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces); + Map<DecoratedKey, ColumnFamily> oldAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces); + + for (Mutation mutation : mutations) + mutation.apply(); + + if (doFlush) + flushSchemaTables(); + + // with new data applied + Map<DecoratedKey, ColumnFamily> newKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces); + Map<DecoratedKey, ColumnFamily> newColumnFamilies = readSchemaForKeyspaces(COLUMNFAMILIES, keyspaces); + Map<DecoratedKey, ColumnFamily> newTypes = readSchemaForKeyspaces(USERTYPES, keyspaces); + Map<DecoratedKey, ColumnFamily> newFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces); + Map<DecoratedKey, ColumnFamily> newAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces); + + Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces); + mergeTables(oldColumnFamilies, newColumnFamilies); + mergeTypes(oldTypes, newTypes); + mergeFunctions(oldFunctions, newFunctions); + mergeAggregates(oldAggregates, newAggregates); + + // it is safe to drop a keyspace only when all nested ColumnFamilies where deleted + for (String keyspaceToDrop : keyspacesToDrop) + Schema.instance.dropKeyspace(keyspaceToDrop); + } + + private static Set<String> mergeKeyspaces(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after) + { + List<Row> created = new ArrayList<>(); + List<String> altered = new ArrayList<>(); + Set<String> dropped = new HashSet<>(); + + /* + * - we don't care about entriesOnlyOnLeft() or entriesInCommon(), because only the changes are of interest to us + * - of all entriesOnlyOnRight(), we only care about ones that have live columns; it's possible to have a ColumnFamily + * there that only has the top-level deletion, if: + * a) a pushed DROP KEYSPACE change for a keyspace hadn't ever made it to this node in the first place + * b) a pulled dropped keyspace that got dropped before it could find a way to this node + * - of entriesDiffering(), we don't care about the scenario where both pre and post-values have zero live columns: + * that means that a keyspace had been recreated and dropped, and the recreated keyspace had never found a way + * to this node + */ + MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after); + + for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet()) + if (entry.getValue().hasColumns()) + created.add(new Row(entry.getKey(), entry.getValue())); + + for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet()) + { + String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey()); + + ColumnFamily pre = entry.getValue().leftValue(); + ColumnFamily post = entry.getValue().rightValue(); + + if (pre.hasColumns() && post.hasColumns()) + altered.add(keyspaceName); + else if (pre.hasColumns()) + dropped.add(keyspaceName); + else if (post.hasColumns()) // a (re)created keyspace + created.add(new Row(entry.getKey(), post)); + } + + for (Row row : created) + Schema.instance.addKeyspace(createKeyspaceFromSchemaPartition(row)); + for (String name : altered) + Schema.instance.updateKeyspace(name); + return dropped; + } + + // see the comments for mergeKeyspaces() + private static void mergeTables(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after) + { + List<CFMetaData> created = new ArrayList<>(); + List<CFMetaData> altered = new ArrayList<>(); + List<CFMetaData> dropped = new ArrayList<>(); + + MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after); + + for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet()) + if (entry.getValue().hasColumns()) + created.addAll(createTablesFromTablesPartition(new Row(entry.getKey(), entry.getValue())).values()); + + for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet()) + { + String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey()); + + ColumnFamily pre = entry.getValue().leftValue(); + ColumnFamily post = entry.getValue().rightValue(); + + if (pre.hasColumns() && post.hasColumns()) + { + MapDifference<String, CFMetaData> delta = + Maps.difference(Schema.instance.getKSMetaData(keyspaceName).cfMetaData(), + createTablesFromTablesPartition(new Row(entry.getKey(), post))); + + dropped.addAll(delta.entriesOnlyOnLeft().values()); + created.addAll(delta.entriesOnlyOnRight().values()); + Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<CFMetaData>, CFMetaData>() + { + public CFMetaData apply(MapDifference.ValueDifference<CFMetaData> pair) + { + return pair.rightValue(); + } + })); + } + else if (pre.hasColumns()) + { + dropped.addAll(Schema.instance.getKSMetaData(keyspaceName).cfMetaData().values()); + } + else if (post.hasColumns()) + { + created.addAll(createTablesFromTablesPartition(new Row(entry.getKey(), post)).values()); + } + } + + for (CFMetaData cfm : created) + Schema.instance.addTable(cfm); + for (CFMetaData cfm : altered) + Schema.instance.updateTable(cfm.ksName, cfm.cfName); + for (CFMetaData cfm : dropped) + Schema.instance.dropTable(cfm.ksName, cfm.cfName); + } + + // see the comments for mergeKeyspaces() + private static void mergeTypes(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after) + { + List<UserType> created = new ArrayList<>(); + List<UserType> altered = new ArrayList<>(); + List<UserType> dropped = new ArrayList<>(); + + MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after); + + // New keyspace with types + for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet()) + if (entry.getValue().hasColumns()) + created.addAll(createTypesFromPartition(new Row(entry.getKey(), entry.getValue())).values()); + + for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet()) + { + String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey()); + + ColumnFamily pre = entry.getValue().leftValue(); + ColumnFamily post = entry.getValue().rightValue(); + + if (pre.hasColumns() && post.hasColumns()) + { + MapDifference<ByteBuffer, UserType> delta = + Maps.difference(Schema.instance.getKSMetaData(keyspaceName).userTypes.getAllTypes(), + createTypesFromPartition(new Row(entry.getKey(), post))); + + dropped.addAll(delta.entriesOnlyOnLeft().values()); + created.addAll(delta.entriesOnlyOnRight().values()); + Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UserType>, UserType>() + { + public UserType apply(MapDifference.ValueDifference<UserType> pair) + { + return pair.rightValue(); + } + })); + } + else if (pre.hasColumns()) + { + dropped.addAll(Schema.instance.getKSMetaData(keyspaceName).userTypes.getAllTypes().values()); + } + else if (post.hasColumns()) + { + created.addAll(createTypesFromPartition(new Row(entry.getKey(), post)).values()); + } + } + + for (UserType type : created) + Schema.instance.addType(type); + for (UserType type : altered) + Schema.instance.updateType(type); + for (UserType type : dropped) + Schema.instance.dropType(type); + } + + // see the comments for mergeKeyspaces() + private static void mergeFunctions(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after) + { + List<UDFunction> created = new ArrayList<>(); + List<UDFunction> altered = new ArrayList<>(); + List<UDFunction> dropped = new ArrayList<>(); + + MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after); + + // New keyspace with functions + for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet()) + if (entry.getValue().hasColumns()) + created.addAll(createFunctionsFromFunctionsPartition(new Row(entry.getKey(), entry.getValue())).values()); + + for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet()) + { + ColumnFamily pre = entry.getValue().leftValue(); + ColumnFamily post = entry.getValue().rightValue(); + + if (pre.hasColumns() && post.hasColumns()) + { + MapDifference<ByteBuffer, UDFunction> delta = + Maps.difference(createFunctionsFromFunctionsPartition(new Row(entry.getKey(), pre)), + createFunctionsFromFunctionsPartition(new Row(entry.getKey(), post))); + + dropped.addAll(delta.entriesOnlyOnLeft().values()); + created.addAll(delta.entriesOnlyOnRight().values()); + Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UDFunction>, UDFunction>() + { + public UDFunction apply(MapDifference.ValueDifference<UDFunction> pair) + { + return pair.rightValue(); + } + })); + } + else if (pre.hasColumns()) + { + dropped.addAll(createFunctionsFromFunctionsPartition(new Row(entry.getKey(), pre)).values()); + } + else if (post.hasColumns()) + { + created.addAll(createFunctionsFromFunctionsPartition(new Row(entry.getKey(), post)).values()); + } + } + + for (UDFunction udf : created) + Schema.instance.addFunction(udf); + for (UDFunction udf : altered) + Schema.instance.updateFunction(udf); + for (UDFunction udf : dropped) + Schema.instance.dropFunction(udf); + } + + // see the comments for mergeKeyspaces() + private static void mergeAggregates(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after) + { + List<UDAggregate> created = new ArrayList<>(); + List<UDAggregate> altered = new ArrayList<>(); + List<UDAggregate> dropped = new ArrayList<>(); + + MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after); + + // New keyspace with functions + for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet()) + if (entry.getValue().hasColumns()) + created.addAll(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), entry.getValue())).values()); + + for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet()) + { + ColumnFamily pre = entry.getValue().leftValue(); + ColumnFamily post = entry.getValue().rightValue(); + + if (pre.hasColumns() && post.hasColumns()) + { + MapDifference<ByteBuffer, UDAggregate> delta = + Maps.difference(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), pre)), + createAggregatesFromAggregatesPartition(new Row(entry.getKey(), post))); + + dropped.addAll(delta.entriesOnlyOnLeft().values()); + created.addAll(delta.entriesOnlyOnRight().values()); + Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UDAggregate>, UDAggregate>() + { + public UDAggregate apply(MapDifference.ValueDifference<UDAggregate> pair) + { + return pair.rightValue(); + } + })); + } + else if (pre.hasColumns()) + { + dropped.addAll(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), pre)).values()); + } + else if (post.hasColumns()) + { + created.addAll(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), post)).values()); + } + } + + for (UDAggregate udf : created) + Schema.instance.addAggregate(udf); + for (UDAggregate udf : altered) + Schema.instance.updateAggregate(udf); + for (UDAggregate udf : dropped) + Schema.instance.dropAggregate(udf); + } + + /* + * Keyspace metadata serialization/deserialization. + */ + + public static Mutation makeCreateKeyspaceMutation(KSMetaData keyspace, long timestamp) + { + return makeCreateKeyspaceMutation(keyspace, timestamp, true); + } + + private static Mutation makeCreateKeyspaceMutation(KSMetaData keyspace, long timestamp, boolean withTablesAndTypesAndFunctions) + { + Mutation mutation = new Mutation(SystemKeyspace.NAME, getSchemaKSKey(keyspace.name)); + ColumnFamily cells = mutation.addOrGet(Keyspaces); + CFRowAdder adder = new CFRowAdder(cells, Keyspaces.comparator.builder().build(), timestamp); + + adder.add("durable_writes", keyspace.durableWrites); + adder.add("strategy_class", keyspace.strategyClass.getName()); + adder.add("strategy_options", json(keyspace.strategyOptions)); + + if (withTablesAndTypesAndFunctions) + { + for (UserType type : keyspace.userTypes.getAllTypes().values()) + addTypeToSchemaMutation(type, timestamp, mutation); + + for (CFMetaData table : keyspace.cfMetaData().values()) + addTableToSchemaMutation(table, timestamp, true, mutation); + } + + return mutation; + } + + public static Mutation makeDropKeyspaceMutation(KSMetaData keyspace, long timestamp) + { + Mutation mutation = new Mutation(SystemKeyspace.NAME, getSchemaKSKey(keyspace.name)); + for (String schemaTable : ALL) + mutation.delete(schemaTable, timestamp); + mutation.delete(SystemKeyspace.BUILT_INDEXES, timestamp); + return mutation; + } + + private static KSMetaData createKeyspaceFromSchemaPartitions(Row serializedKeyspace, Row serializedTables, Row serializedTypes) + { + Collection<CFMetaData> tables = createTablesFromTablesPartition(serializedTables).values(); + UTMetaData types = new UTMetaData(createTypesFromPartition(serializedTypes)); + return createKeyspaceFromSchemaPartition(serializedKeyspace).cloneWith(tables, types); + } + + public static KSMetaData createKeyspaceFromName(String keyspace) + { + Row partition = readSchemaPartitionForKeyspace(KEYSPACES, keyspace); + + if (isEmptySchemaPartition(partition)) + throw new RuntimeException(String.format("%s not found in the schema definitions keyspaceName (%s).", keyspace, KEYSPACES)); + + return createKeyspaceFromSchemaPartition(partition); + } + + /** + * Deserialize only Keyspace attributes without nested tables or types + * + * @param partition Keyspace attributes in serialized form + */ + private static KSMetaData createKeyspaceFromSchemaPartition(Row partition) + { + String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, KEYSPACES); + UntypedResultSet.Row row = QueryProcessor.resultify(query, partition).one(); + try + { + return new KSMetaData(row.getString("keyspace_name"), + AbstractReplicationStrategy.getClass(row.getString("strategy_class")), + fromJsonMap(row.getString("strategy_options")), + row.getBoolean("durable_writes")); + } + catch (ConfigurationException e) + { + throw new RuntimeException(e); + } + } + + /* + * User type metadata serialization/deserialization. + */ + + public static Mutation makeCreateTypeMutation(KSMetaData keyspace, UserType type, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false); + addTypeToSchemaMutation(type, timestamp, mutation); + return mutation; + } + + private static void addTypeToSchemaMutation(UserType type, long timestamp, Mutation mutation) + { + ColumnFamily cells = mutation.addOrGet(Usertypes); + + Composite prefix = Usertypes.comparator.make(type.name); + CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp); + + adder.resetCollection("field_names"); + adder.resetCollection("field_types"); + + for (int i = 0; i < type.size(); i++) + { + adder.addListEntry("field_names", type.fieldName(i)); + adder.addListEntry("field_types", type.fieldType(i).toString()); + } + } + + public static Mutation dropTypeFromSchemaMutation(KSMetaData keyspace, UserType type, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false); + + ColumnFamily cells = mutation.addOrGet(Usertypes); + int ldt = (int) (System.currentTimeMillis() / 1000); + + Composite prefix = Usertypes.comparator.make(type.name); + cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt)); + + return mutation; + } + + private static Map<ByteBuffer, UserType> createTypesFromPartition(Row partition) + { + String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, USERTYPES); + Map<ByteBuffer, UserType> types = new HashMap<>(); + for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition)) + { + UserType type = createTypeFromRow(row); + types.put(type.name, type); + } + return types; + } + + private static UserType createTypeFromRow(UntypedResultSet.Row row) + { + String keyspace = row.getString("keyspace_name"); + ByteBuffer name = ByteBufferUtil.bytes(row.getString("type_name")); + List<String> rawColumns = row.getList("field_names", UTF8Type.instance); + List<String> rawTypes = row.getList("field_types", UTF8Type.instance); + + List<ByteBuffer> columns = new ArrayList<>(rawColumns.size()); + for (String rawColumn : rawColumns) + columns.add(ByteBufferUtil.bytes(rawColumn)); + + List<AbstractType<?>> types = new ArrayList<>(rawTypes.size()); + for (String rawType : rawTypes) + types.add(parseType(rawType)); + + return new UserType(keyspace, name, columns, types); + } + + /* + * Table metadata serialization/deserialization. + */ + + public static Mutation makeCreateTableMutation(KSMetaData keyspace, CFMetaData table, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false); + addTableToSchemaMutation(table, timestamp, true, mutation); + return mutation; + } + + private static void addTableToSchemaMutation(CFMetaData table, long timestamp, boolean withColumnsAndTriggers, Mutation mutation) + { + // For property that can be null (and can be changed), we insert tombstones, to make sure + // we don't keep a property the user has removed + ColumnFamily cells = mutation.addOrGet(Columnfamilies); + Composite prefix = Columnfamilies.comparator.make(table.cfName); + CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp); + + adder.add("cf_id", table.cfId); + adder.add("type", table.cfType.toString()); + + if (table.isSuper()) + { + // We need to continue saving the comparator and subcomparator separatly, otherwise + // we won't know at deserialization if the subcomparator should be taken into account + // TODO: we should implement an on-start migration if we want to get rid of that. + adder.add("comparator", table.comparator.subtype(0).toString()); + adder.add("subcomparator", table.comparator.subtype(1).toString()); + } + else + { + adder.add("comparator", table.comparator.toString()); + } + + adder.add("bloom_filter_fp_chance", table.getBloomFilterFpChance()); + adder.add("caching", table.getCaching().toString()); + adder.add("comment", table.getComment()); + adder.add("compaction_strategy_class", table.compactionStrategyClass.getName()); + adder.add("compaction_strategy_options", json(table.compactionStrategyOptions)); + adder.add("compression_parameters", json(table.compressionParameters.asThriftOptions())); + adder.add("default_time_to_live", table.getDefaultTimeToLive()); + adder.add("default_validator", table.getDefaultValidator().toString()); + adder.add("gc_grace_seconds", table.getGcGraceSeconds()); + adder.add("key_validator", table.getKeyValidator().toString()); + adder.add("local_read_repair_chance", table.getDcLocalReadRepairChance()); + adder.add("max_compaction_threshold", table.getMaxCompactionThreshold()); + adder.add("max_index_interval", table.getMaxIndexInterval()); + adder.add("memtable_flush_period_in_ms", table.getMemtableFlushPeriod()); + adder.add("min_compaction_threshold", table.getMinCompactionThreshold()); + adder.add("min_index_interval", table.getMinIndexInterval()); + adder.add("read_repair_chance", table.getReadRepairChance()); + adder.add("speculative_retry", table.getSpeculativeRetry().toString()); + + for (Map.Entry<ColumnIdentifier, Long> entry : table.getDroppedColumns().entrySet()) + adder.addMapEntry("dropped_columns", entry.getKey().toString(), entry.getValue()); + + adder.add("is_dense", table.getIsDense()); + + if (withColumnsAndTriggers) + { + for (ColumnDefinition column : table.allColumns()) + addColumnToSchemaMutation(table, column, timestamp, mutation); + + for (TriggerDefinition trigger : table.getTriggers().values()) + addTriggerToSchemaMutation(table, trigger, timestamp, mutation); + } + } + + public static Mutation makeUpdateTableMutation(KSMetaData keyspace, + CFMetaData oldTable, + CFMetaData newTable, + long timestamp, + boolean fromThrift) + { + Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false); + + addTableToSchemaMutation(newTable, timestamp, false, mutation); + + MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(oldTable.getColumnMetadata(), + newTable.getColumnMetadata()); + + // columns that are no longer needed + for (ColumnDefinition column : columnDiff.entriesOnlyOnLeft().values()) + { + // Thrift only knows about the REGULAR ColumnDefinition type, so don't consider other type + // are being deleted just because they are not here. + if (fromThrift && column.kind != ColumnDefinition.Kind.REGULAR) + continue; + + dropColumnFromSchemaMutation(oldTable, column, timestamp, mutation); + } + + // newly added columns + for (ColumnDefinition column : columnDiff.entriesOnlyOnRight().values()) + addColumnToSchemaMutation(newTable, column, timestamp, mutation); + + // old columns with updated attributes + for (ByteBuffer name : columnDiff.entriesDiffering().keySet()) + addColumnToSchemaMutation(newTable, newTable.getColumnDefinition(name), timestamp, mutation); + + MapDifference<String, TriggerDefinition> triggerDiff = Maps.difference(oldTable.getTriggers(), newTable.getTriggers()); + + // dropped triggers + for (TriggerDefinition trigger : triggerDiff.entriesOnlyOnLeft().values()) + dropTriggerFromSchemaMutation(oldTable, trigger, timestamp, mutation); + + // newly created triggers + for (TriggerDefinition trigger : triggerDiff.entriesOnlyOnRight().values()) + addTriggerToSchemaMutation(newTable, trigger, timestamp, mutation); + + return mutation; + } + + public static Mutation makeDropTableMutation(KSMetaData keyspace, CFMetaData table, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false); + + ColumnFamily cells = mutation.addOrGet(Columnfamilies); + int ldt = (int) (System.currentTimeMillis() / 1000); + + Composite prefix = Columnfamilies.comparator.make(table.cfName); + cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt)); + + for (ColumnDefinition column : table.allColumns()) + dropColumnFromSchemaMutation(table, column, timestamp, mutation); + + for (TriggerDefinition trigger : table.getTriggers().values()) + dropTriggerFromSchemaMutation(table, trigger, timestamp, mutation); + + // TODO: get rid of in #6717 + ColumnFamily indexCells = mutation.addOrGet(SystemKeyspace.BuiltIndexes); + for (String indexName : Keyspace.open(keyspace.name).getColumnFamilyStore(table.cfName).getBuiltIndexes()) + indexCells.addTombstone(indexCells.getComparator().makeCellName(indexName), ldt, timestamp); + + return mutation; + } + + public static CFMetaData createTableFromName(String keyspace, String table) + { + Row partition = readSchemaPartitionForTable(COLUMNFAMILIES, keyspace, table); + + if (isEmptySchemaPartition(partition)) + throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspace, table)); + + return createTableFromTablePartition(partition); + } + + /** + * Deserialize tables from low-level schema representation, all of them belong to the same keyspace + * + * @return map containing name of the table and its metadata for faster lookup + */ + private static Map<String, CFMetaData> createTablesFromTablesPartition(Row partition) + { + if (partition.cf == null) + return Collections.emptyMap(); + + String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES); + Map<String, CFMetaData> tables = new HashMap<>(); + for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition)) + { + CFMetaData cfm = createTableFromTableRow(row); + tables.put(cfm.cfName, cfm); + } + return tables; + } + + public static CFMetaData createTableFromTablePartitionAndColumnsPartition(Row serializedTable, Row serializedColumns) + { + String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES); + return createTableFromTableRowAndColumnsPartition(QueryProcessor.resultify(query, serializedTable).one(), serializedColumns); + } + + private static CFMetaData createTableFromTableRowAndColumnsPartition(UntypedResultSet.Row tableRow, Row serializedColumns) + { + String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNS); + return createTableFromTableRowAndColumnRows(tableRow, QueryProcessor.resultify(query, serializedColumns)); + } + + private static CFMetaData createTableFromTablePartition(Row row) + { + String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES); + return createTableFromTableRow(QueryProcessor.resultify(query, row).one()); + } + + /** + * Deserialize table metadata from low-level representation + * + * @return Metadata deserialized from schema + */ + private static CFMetaData createTableFromTableRow(UntypedResultSet.Row result) + { + String ksName = result.getString("keyspace_name"); + String cfName = result.getString("columnfamily_name"); + + Row serializedColumns = readSchemaPartitionForTable(COLUMNS, ksName, cfName); + CFMetaData cfm = createTableFromTableRowAndColumnsPartition(result, serializedColumns); + + Row serializedTriggers = readSchemaPartitionForTable(TRIGGERS, ksName, cfName); + try + { + for (TriggerDefinition trigger : createTriggersFromTriggersPartition(serializedTriggers)) + cfm.addTriggerDefinition(trigger); + } + catch (InvalidRequestException e) + { + throw new RuntimeException(e); + } + + return cfm; + } + + public static CFMetaData createTableFromTableRowAndColumnRows(UntypedResultSet.Row result, + UntypedResultSet serializedColumnDefinitions) + { + try + { + String ksName = result.getString("keyspace_name"); + String cfName = result.getString("columnfamily_name"); + + AbstractType<?> rawComparator = TypeParser.parse(result.getString("comparator")); + AbstractType<?> subComparator = result.has("subcomparator") ? TypeParser.parse(result.getString("subcomparator")) : null; + ColumnFamilyType cfType = ColumnFamilyType.valueOf(result.getString("type")); + + AbstractType<?> fullRawComparator = CFMetaData.makeRawAbstractType(rawComparator, subComparator); + + List<ColumnDefinition> columnDefs = createColumnsFromColumnRows(serializedColumnDefinitions, + ksName, + cfName, + fullRawComparator, + cfType == ColumnFamilyType.Super); + + boolean isDense = result.has("is_dense") + ? result.getBoolean("is_dense") + : CFMetaData.calculateIsDense(fullRawComparator, columnDefs); + + CellNameType comparator = CellNames.fromAbstractType(fullRawComparator, isDense); + + // if we are upgrading, we use id generated from names initially + UUID cfId = result.has("cf_id") + ? result.getUUID("cf_id") + : CFMetaData.generateLegacyCfId(ksName, cfName); + + CFMetaData cfm = new CFMetaData(ksName, cfName, cfType, comparator, cfId); + cfm.isDense(isDense); + + cfm.readRepairChance(result.getDouble("read_repair_chance")); + cfm.dcLocalReadRepairChance(result.getDouble("local_read_repair_chance")); + cfm.gcGraceSeconds(result.getInt("gc_grace_seconds")); + cfm.defaultValidator(TypeParser.parse(result.getString("default_validator"))); + cfm.keyValidator(TypeParser.parse(result.getString("key_validator"))); + cfm.minCompactionThreshold(result.getInt("min_compaction_threshold")); + cfm.maxCompactionThreshold(result.getInt("max_compaction_threshold")); + if (result.has("comment")) + cfm.comment(result.getString("comment")); + if (result.has("memtable_flush_period_in_ms")) + cfm.memtableFlushPeriod(result.getInt("memtable_flush_period_in_ms")); + cfm.caching(CachingOptions.fromString(result.getString("caching"))); + if (result.has("default_time_to_live")) + cfm.defaultTimeToLive(result.getInt("default_time_to_live")); + if (result.has("speculative_retry")) + cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(result.getString("speculative_retry"))); + cfm.compactionStrategyClass(CFMetaData.createCompactionStrategy(result.getString("compaction_strategy_class"))); + cfm.compressionParameters(CompressionParameters.create(fromJsonMap(result.getString("compression_parameters")))); + cfm.compactionStrategyOptions(fromJsonMap(result.getString("compaction_strategy_options"))); + + if (result.has("min_index_interval")) + cfm.minIndexInterval(result.getInt("min_index_interval")); + + if (result.has("max_index_interval")) + cfm.maxIndexInterval(result.getInt("max_index_interval")); + + if (result.has("bloom_filter_fp_chance")) + cfm.bloomFilterFpChance(result.getDouble("bloom_filter_fp_chance")); + else + cfm.bloomFilterFpChance(cfm.getBloomFilterFpChance()); + + if (result.has("dropped_columns")) + cfm.droppedColumns(convertDroppedColumns(result.getMap("dropped_columns", UTF8Type.instance, LongType.instance))); + + for (ColumnDefinition cd : columnDefs) + cfm.addOrReplaceColumnDefinition(cd); + + return cfm.rebuild(); + } + catch (SyntaxException | ConfigurationException e) + { + throw new RuntimeException(e); + } + } + + private static Map<ColumnIdentifier, Long> convertDroppedColumns(Map<String, Long> raw) + { + Map<ColumnIdentifier, Long> converted = Maps.newHashMap(); + for (Map.Entry<String, Long> entry : raw.entrySet()) + converted.put(new ColumnIdentifier(entry.getKey(), true), entry.getValue()); + return converted; + } + + /* + * Column metadata serialization/deserialization. + */ + + private static void addColumnToSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation) + { + ColumnFamily cells = mutation.addOrGet(Columns); + Composite prefix = Columns.comparator.make(table.cfName, column.name.toString()); + CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp); + + adder.add("validator", column.type.toString()); + adder.add("type", serializeKind(column.kind)); + adder.add("component_index", column.isOnAllComponents() ? null : column.position()); + adder.add("index_name", column.getIndexName()); + adder.add("index_type", column.getIndexType() == null ? null : column.getIndexType().toString()); + adder.add("index_options", json(column.getIndexOptions())); + } + + private static String serializeKind(ColumnDefinition.Kind kind) + { + // For backward compatibility we need to special case CLUSTERING_COLUMN + return kind == ColumnDefinition.Kind.CLUSTERING_COLUMN ? "clustering_key" : kind.toString().toLowerCase(); + } + + private static ColumnDefinition.Kind deserializeKind(String kind) + { + if (kind.equalsIgnoreCase("clustering_key")) + return ColumnDefinition.Kind.CLUSTERING_COLUMN; + return Enum.valueOf(ColumnDefinition.Kind.class, kind.toUpperCase()); + } + + private static void dropColumnFromSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation) + { + ColumnFamily cells = mutation.addOrGet(Columns); + int ldt = (int) (System.currentTimeMillis() / 1000); + + // Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference). + Composite prefix = Columns.comparator.make(table.cfName, column.name.toString()); + cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt)); + } + + private static List<ColumnDefinition> createColumnsFromColumnRows(UntypedResultSet rows, + String keyspace, + String table, + AbstractType<?> rawComparator, + boolean isSuper) + { + List<ColumnDefinition> columns = new ArrayList<>(); + for (UntypedResultSet.Row row : rows) + columns.add(createColumnFromColumnRow(row, keyspace, table, rawComparator, isSuper)); + return columns; + } + + private static ColumnDefinition createColumnFromColumnRow(UntypedResultSet.Row row, + String keyspace, + String table, + AbstractType<?> rawComparator, + boolean isSuper) + { + ColumnDefinition.Kind kind = deserializeKind(row.getString("type")); + + Integer componentIndex = null; + if (row.has("component_index")) + componentIndex = row.getInt("component_index"); + else if (kind == ColumnDefinition.Kind.CLUSTERING_COLUMN && isSuper) + componentIndex = 1; // A ColumnDefinition for super columns applies to the column component + + // Note: we save the column name as string, but we should not assume that it is an UTF8 name, we + // we need to use the comparator fromString method + AbstractType<?> comparator = kind == ColumnDefinition.Kind.REGULAR + ? getComponentComparator(rawComparator, componentIndex) + : UTF8Type.instance; + ColumnIdentifier name = new ColumnIdentifier(comparator.fromString(row.getString("column_name")), comparator); + + AbstractType<?> validator = parseType(row.getString("validator")); + + IndexType indexType = null; + if (row.has("index_type")) + indexType = IndexType.valueOf(row.getString("index_type")); + + Map<String, String> indexOptions = null; + if (row.has("index_options")) + indexOptions = fromJsonMap(row.getString("index_options")); + + String indexName = null; + if (row.has("index_name")) + indexName = row.getString("index_name"); + + return new ColumnDefinition(keyspace, table, name, validator, indexType, indexOptions, indexName, componentIndex, kind); + } + + private static AbstractType<?> getComponentComparator(AbstractType<?> rawComparator, Integer componentIndex) + { + return (componentIndex == null || (componentIndex == 0 && !(rawComparator instanceof CompositeType))) + ? rawComparator + : ((CompositeType)rawComparator).types.get(componentIndex); + } + + /* + * Trigger metadata serialization/deserialization. + */ + + private static void addTriggerToSchemaMutation(CFMetaData table, TriggerDefinition trigger, long timestamp, Mutation mutation) + { + ColumnFamily cells = mutation.addOrGet(Triggers); + Composite prefix = Triggers.comparator.make(table.cfName, trigger.name); + CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp); + adder.addMapEntry("trigger_options", "class", trigger.classOption); + } + + private static void dropTriggerFromSchemaMutation(CFMetaData table, TriggerDefinition trigger, long timestamp, Mutation mutation) + { + ColumnFamily cells = mutation.addOrGet(Triggers); + int ldt = (int) (System.currentTimeMillis() / 1000); + + Composite prefix = Triggers.comparator.make(table.cfName, trigger.name); + cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt)); + } + + /** + * Deserialize triggers from storage-level representation. + * + * @param partition storage-level partition containing the trigger definitions + * @return the list of processed TriggerDefinitions + */ + private static List<TriggerDefinition> createTriggersFromTriggersPartition(Row partition) + { + List<TriggerDefinition> triggers = new ArrayList<>(); + String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, TRIGGERS); + for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition)) + { + String name = row.getString("trigger_name"); + String classOption = row.getMap("trigger_options", UTF8Type.instance, UTF8Type.instance).get("class"); + triggers.add(new TriggerDefinition(name, classOption)); + } + return triggers; + } + + /* + * UDF metadata serialization/deserialization. + */ + + public static Mutation makeCreateFunctionMutation(KSMetaData keyspace, UDFunction function, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false); + addFunctionToSchemaMutation(function, timestamp, mutation); + return mutation; + } + + private static void addFunctionToSchemaMutation(UDFunction function, long timestamp, Mutation mutation) + { + ColumnFamily cells = mutation.addOrGet(Functions); + Composite prefix = Functions.comparator.make(function.name().name, UDHelper.calculateSignature(function)); + CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp); + + adder.resetCollection("argument_names"); + adder.resetCollection("argument_types"); + + for (int i = 0; i < function.argNames().size(); i++) + { + adder.addListEntry("argument_names", function.argNames().get(i).bytes); + adder.addListEntry("argument_types", function.argTypes().get(i).toString()); + } + + adder.add("body", function.body()); + adder.add("is_deterministic", function.isDeterministic()); + adder.add("language", function.language()); + adder.add("return_type", function.returnType().toString()); + } + + public static Mutation makeDropFunctionMutation(KSMetaData keyspace, UDFunction function, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false); + + ColumnFamily cells = mutation.addOrGet(Functions); + int ldt = (int) (System.currentTimeMillis() / 1000); + + Composite prefix = Functions.comparator.make(function.name().name, UDHelper.calculateSignature(function)); + cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt)); + + return mutation; + } + + private static Map<ByteBuffer, UDFunction> createFunctionsFromFunctionsPartition(Row partition) + { + Map<ByteBuffer, UDFunction> functions = new HashMap<>(); + String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, FUNCTIONS); + for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition)) + { + UDFunction function = createFunctionFromFunctionRow(row); + functions.put(UDHelper.calculateSignature(function), function); + } + return functions; + } + + private static UDFunction createFunctionFromFunctionRow(UntypedResultSet.Row row) + { + String ksName = row.getString("keyspace_name"); + String functionName = row.getString("function_name"); + FunctionName name = new FunctionName(ksName, functionName); + + List<ColumnIdentifier> argNames = new ArrayList<>(); + if (row.has("argument_names")) + for (String arg : row.getList("argument_names", UTF8Type.instance)) + argNames.add(new ColumnIdentifier(arg, true)); + + List<AbstractType<?>> argTypes = new ArrayList<>(); + if (row.has("argument_types")) + for (String type : row.getList("argument_types", UTF8Type.instance)) + argTypes.add(parseType(type)); + + AbstractType<?> returnType = parseType(row.getString("return_type")); + + boolean isDeterministic = row.getBoolean("is_deterministic"); + String language = row.getString("language"); + String body = row.getString("body"); + + try + { + return UDFunction.create(name, argNames, argTypes, returnType, language, body, isDeterministic); + } + catch (InvalidRequestException e) + { + logger.error(String.format("Cannot load function '%s' from schema: this function won't be available (on this node)", name), e); + return UDFunction.createBrokenFunction(name, argNames, argTypes, returnType, language, body, e); + } + } + + /* + * Aggregate UDF metadata serialization/deserialization. + */ + + public static Mutation makeCreateAggregateMutation(KSMetaData keyspace, UDAggregate aggregate, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false); + addAggregateToSchemaMutation(aggregate, timestamp, mutation); + return mutation; + } + + private static void addAggregateToSchemaMutation(UDAggregate aggregate, long timestamp, Mutation mutation) + { + ColumnFamily cells = mutation.addOrGet(Aggregates); + Composite prefix = Aggregates.comparator.make(aggregate.name().name, UDHelper.calculateSignature(aggregate)); + CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp); + + adder.resetCollection("argument_types"); + adder.add("return_type", aggregate.returnType().toString()); + adder.add("state_func", aggregate.stateFunction().name().name); + if (aggregate.stateType() != null) + adder.add("state_type", aggregate.stateType().toString()); + if (aggregate.finalFunction() != null) + adder.add("final_func", aggregate.finalFunction().name().name); + if (aggregate.initialCondition() != null) + adder.add("initcond", aggregate.initialCondition()); + + for (AbstractType<?> argType : aggregate.argTypes()) + adder.addListEntry("argument_types", argType.toString()); + } + + private static Map<ByteBuffer, UDAggregate> createAggregatesFromAggregatesPartition(Row partition) + { + Map<ByteBuffer, UDAggregate> aggregates = new HashMap<>(); + String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, AGGREGATES); + for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition)) + { + UDAggregate aggregate = createAggregateFromAggregateRow(row); + aggregates.put(UDHelper.calculateSignature(aggregate), aggregate); + } + return aggregates; + } + + private static UDAggregate createAggregateFromAggregateRow(UntypedResultSet.Row row) + { + String ksName = row.getString("keyspace_name"); + String functionName = row.getString("aggregate_name"); + FunctionName name = new FunctionName(ksName, functionName); + + List<String> types = row.getList("argument_types", UTF8Type.instance); + + List<AbstractType<?>> argTypes; + if (types == null) + { + argTypes = Collections.emptyList(); + } + else + { + argTypes = new ArrayList<>(types.size()); + for (String type : types) + argTypes.add(parseType(type)); + } + + AbstractType<?> returnType = parseType(row.getString("return_type")); + + FunctionName stateFunc = new FunctionName(ksName, row.getString("state_func")); + FunctionName finalFunc = row.has("final_func") ? new FunctionName(ksName, row.getString("final_func")) : null; + AbstractType<?> stateType = row.has("state_type") ? parseType(row.getString("state_type")) : null; + ByteBuffer initcond = row.has("initcond") ? row.getBytes("initcond") : null; + + try + { + return UDAggregate.create(name, argTypes, returnType, stateFunc, finalFunc, stateType, initcond); + } + catch (InvalidRequestException reason) + { + return UDAggregate.createBroken(name, argTypes, returnType, initcond, reason); + } + } + + public static Mutation makeDropAggregateMutation(KSMetaData keyspace, UDAggregate aggregate, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false); + + ColumnFamily cells = mutation.addOrGet(Aggregates); + int ldt = (int) (System.currentTimeMillis() / 1000); + + Composite prefix = Aggregates.comparator.make(aggregate.name().name, UDHelper.calculateSignature(aggregate)); + cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt)); + + return mutation; + } + + private static AbstractType<?> parseType(String str) + { + try + { + return TypeParser.parse(str); + } + catch (SyntaxException | ConfigurationException e) + { + // We only use this when reading the schema where we shouldn't get an error + throw new RuntimeException(e); + } + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 99a2d87..edc91ab 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -34,7 +34,6 @@ import javax.management.StandardMBean; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.Uninterruptibles; -import org.hyperic.sigar.SigarException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +45,6 @@ import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.cql3.functions.Functions; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; @@ -253,10 +251,8 @@ public class CassandraDaemon exitOrFail(100, "Fatal exception during initialization", e); } - - // load keyspace && function descriptions. - DatabaseDescriptor.loadSchemas(); - Functions.loadUDFFromSchema(); + // load schema from disk + Schema.instance.loadFromDisk(); // clean up compaction leftovers Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/service/ClientState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java index 9c8d7d8..dcdf838 100644 --- a/src/java/org/apache/cassandra/service/ClientState.java +++ b/src/java/org/apache/cassandra/service/ClientState.java @@ -32,6 +32,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql3.QueryHandler; import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.schema.LegacySchemaTables; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.exceptions.AuthenticationException; import org.apache.cassandra.exceptions.InvalidRequestException; @@ -58,7 +59,7 @@ public class ClientState { // We want these system cfs to be always readable to authenticated users since many tools rely on them // (nodetool, cqlsh, bulkloader, etc.) - for (String cf : Iterables.concat(Arrays.asList(SystemKeyspace.LOCAL_TABLE, SystemKeyspace.PEERS_TABLE), SystemKeyspace.ALL_SCHEMA_TABLES)) + for (String cf : Iterables.concat(Arrays.asList(SystemKeyspace.LOCAL, SystemKeyspace.PEERS), LegacySchemaTables.ALL)) READABLE_SYSTEM_RESOURCES.add(DataResource.columnFamily(SystemKeyspace.NAME, cf)); PROTECTED_AUTH_RESOURCES.addAll(DatabaseDescriptor.getAuthenticator().protectedResources()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index c3fe1fa..fe32559 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -37,7 +37,6 @@ import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.KSMetaData; -import org.apache.cassandra.config.UTMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql3.functions.UDAggregate; import org.apache.cassandra.cql3.functions.UDFunction; @@ -50,6 +49,7 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.LegacySchemaTables; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.WrappedRunnable; @@ -63,7 +63,7 @@ public class MigrationManager public static final int MIGRATION_DELAY_IN_MS = 60000; - private final List<IMigrationListener> listeners = new CopyOnWriteArrayList<IMigrationListener>(); + private final List<IMigrationListener> listeners = new CopyOnWriteArrayList<>(); private MigrationManager() {} @@ -182,52 +182,40 @@ public class MigrationManager listener.onCreateFunction(udf.name().keyspace, udf.name().name); } - public void notifyUpdateFunction(UDFunction udf) - { - for (IMigrationListener listener : listeners) - listener.onUpdateFunction(udf.name().keyspace, udf.name().name); - } - - public void notifyDropFunction(UDFunction udf) - { - for (IMigrationListener listener : listeners) - listener.onDropFunction(udf.name().keyspace, udf.name().name); - } - public void notifyCreateAggregate(UDAggregate udf) { for (IMigrationListener listener : listeners) listener.onCreateAggregate(udf.name().keyspace, udf.name().name); } - public void notifyUpdateAggregate(UDAggregate udf) + public void notifyUpdateKeyspace(KSMetaData ksm) { for (IMigrationListener listener : listeners) - listener.onUpdateAggregate(udf.name().keyspace, udf.name().name); + listener.onUpdateKeyspace(ksm.name); } - public void notifyDropAggregate(UDAggregate udf) + public void notifyUpdateColumnFamily(CFMetaData cfm) { for (IMigrationListener listener : listeners) - listener.onDropAggregate(udf.name().keyspace, udf.name().name); + listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName); } - public void notifyUpdateKeyspace(KSMetaData ksm) + public void notifyUpdateUserType(UserType ut) { for (IMigrationListener listener : listeners) - listener.onUpdateKeyspace(ksm.name); + listener.onUpdateUserType(ut.keyspace, ut.getNameAsString()); } - public void notifyUpdateColumnFamily(CFMetaData cfm) + public void notifyUpdateFunction(UDFunction udf) { for (IMigrationListener listener : listeners) - listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName); + listener.onUpdateFunction(udf.name().keyspace, udf.name().name); } - public void notifyUpdateUserType(UserType ut) + public void notifyUpdateAggregate(UDAggregate udf) { for (IMigrationListener listener : listeners) - listener.onUpdateUserType(ut.keyspace, ut.getNameAsString()); + listener.onUpdateAggregate(udf.name().keyspace, udf.name().name); } public void notifyDropKeyspace(KSMetaData ksm) @@ -248,6 +236,18 @@ public class MigrationManager listener.onDropUserType(ut.keyspace, ut.getNameAsString()); } + public void notifyDropFunction(UDFunction udf) + { + for (IMigrationListener listener : listeners) + listener.onDropFunction(udf.name().keyspace, udf.name().name); + } + + public void notifyDropAggregate(UDAggregate udf) + { + for (IMigrationListener listener : listeners) + listener.onDropAggregate(udf.name().keyspace, udf.name().name); + } + public static void announceNewKeyspace(KSMetaData ksm) throws ConfigurationException { announceNewKeyspace(ksm, false); @@ -266,7 +266,7 @@ public class MigrationManager throw new AlreadyExistsException(ksm.name); logger.info(String.format("Create new Keyspace: %s", ksm)); - announce(ksm.toSchema(timestamp), announceLocally); + announce(LegacySchemaTables.makeCreateKeyspaceMutation(ksm, timestamp), announceLocally); } public static void announceNewColumnFamily(CFMetaData cfm) throws ConfigurationException @@ -285,17 +285,27 @@ public class MigrationManager throw new AlreadyExistsException(cfm.ksName, cfm.cfName); logger.info(String.format("Create new table: %s", cfm)); - announce(addSerializedKeyspace(cfm.toSchema(FBUtilities.timestampMicros()), cfm.ksName), announceLocally); + announce(LegacySchemaTables.makeCreateTableMutation(ksm, cfm, FBUtilities.timestampMicros()), announceLocally); } - public static void announceNewType(UserType newType) + public static void announceNewType(UserType newType, boolean announceLocally) { - announceNewType(newType, false); + KSMetaData ksm = Schema.instance.getKSMetaData(newType.keyspace); + announce(LegacySchemaTables.makeCreateTypeMutation(ksm, newType, FBUtilities.timestampMicros()), announceLocally); } - public static void announceNewType(UserType newType, boolean announceLocally) + public static void announceNewFunction(UDFunction udf, boolean announceLocally) { - announce(addSerializedKeyspace(UTMetaData.toSchema(newType, FBUtilities.timestampMicros()), newType.keyspace), announceLocally); + logger.info(String.format("Create scalar function '%s'", udf.name())); + KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace); + announce(LegacySchemaTables.makeCreateFunctionMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); + } + + public static void announceNewAggregate(UDAggregate udf, boolean announceLocally) + { + logger.info(String.format("Create aggregate function '%s'", udf.name())); + KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace); + announce(LegacySchemaTables.makeCreateAggregateMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); } public static void announceKeyspaceUpdate(KSMetaData ksm) throws ConfigurationException @@ -312,7 +322,7 @@ public class MigrationManager throw new ConfigurationException(String.format("Cannot update non existing keyspace '%s'.", ksm.name)); logger.info(String.format("Update Keyspace '%s' From %s To %s", ksm.name, oldKsm, ksm)); - announce(oldKsm.toSchemaUpdate(ksm, FBUtilities.timestampMicros()), announceLocally); + announce(LegacySchemaTables.makeCreateKeyspaceMutation(ksm, FBUtilities.timestampMicros()), announceLocally); } public static void announceColumnFamilyUpdate(CFMetaData cfm, boolean fromThrift) throws ConfigurationException @@ -327,16 +337,12 @@ public class MigrationManager CFMetaData oldCfm = Schema.instance.getCFMetaData(cfm.ksName, cfm.cfName); if (oldCfm == null) throw new ConfigurationException(String.format("Cannot update non existing table '%s' in keyspace '%s'.", cfm.cfName, cfm.ksName)); + KSMetaData ksm = Schema.instance.getKSMetaData(cfm.ksName); oldCfm.validateCompatility(cfm); logger.info(String.format("Update table '%s/%s' From %s To %s", cfm.ksName, cfm.cfName, oldCfm, cfm)); - announce(addSerializedKeyspace(oldCfm.toSchemaUpdate(cfm, FBUtilities.timestampMicros(), fromThrift), cfm.ksName), announceLocally); - } - - public static void announceTypeUpdate(UserType updatedType) - { - announceTypeUpdate(updatedType, false); + announce(LegacySchemaTables.makeUpdateTableMutation(ksm, oldCfm, cfm, FBUtilities.timestampMicros(), fromThrift), announceLocally); } public static void announceTypeUpdate(UserType updatedType, boolean announceLocally) @@ -356,7 +362,7 @@ public class MigrationManager throw new ConfigurationException(String.format("Cannot drop non existing keyspace '%s'.", ksName)); logger.info(String.format("Drop Keyspace '%s'", oldKsm.name)); - announce(oldKsm.dropFromSchema(FBUtilities.timestampMicros()), announceLocally); + announce(LegacySchemaTables.makeDropKeyspaceMutation(oldKsm, FBUtilities.timestampMicros()), announceLocally); } public static void announceColumnFamilyDrop(String ksName, String cfName) throws ConfigurationException @@ -369,16 +375,10 @@ public class MigrationManager CFMetaData oldCfm = Schema.instance.getCFMetaData(ksName, cfName); if (oldCfm == null) throw new ConfigurationException(String.format("Cannot drop non existing table '%s' in keyspace '%s'.", cfName, ksName)); + KSMetaData ksm = Schema.instance.getKSMetaData(ksName); logger.info(String.format("Drop table '%s/%s'", oldCfm.ksName, oldCfm.cfName)); - announce(addSerializedKeyspace(oldCfm.dropFromSchema(FBUtilities.timestampMicros()), ksName), announceLocally); - } - - // Include the serialized keyspace for when a target node missed the CREATE KEYSPACE migration (see #5631). - private static Mutation addSerializedKeyspace(Mutation migration, String ksName) - { - migration.add(SystemKeyspace.readSchemaRow(SystemKeyspace.SCHEMA_KEYSPACES_TABLE, ksName).cf); - return migration; + announce(LegacySchemaTables.makeDropTableMutation(ksm, oldCfm, FBUtilities.timestampMicros()), announceLocally); } public static void announceTypeDrop(UserType droppedType) @@ -388,35 +388,22 @@ public class MigrationManager public static void announceTypeDrop(UserType droppedType, boolean announceLocally) { - announce(addSerializedKeyspace(UTMetaData.dropFromSchema(droppedType, FBUtilities.timestampMicros()), droppedType.keyspace), announceLocally); + KSMetaData ksm = Schema.instance.getKSMetaData(droppedType.keyspace); + announce(LegacySchemaTables.dropTypeFromSchemaMutation(ksm, droppedType, FBUtilities.timestampMicros()), announceLocally); } public static void announceFunctionDrop(UDFunction udf, boolean announceLocally) { - Mutation mutation = udf.toSchemaDrop(FBUtilities.timestampMicros()); logger.info(String.format("Drop scalar function overload '%s' args '%s'", udf.name(), udf.argTypes())); - announce(mutation, announceLocally); + KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace); + announce(LegacySchemaTables.makeDropFunctionMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); } public static void announceAggregateDrop(UDAggregate udf, boolean announceLocally) { - Mutation mutation = udf.toSchemaDrop(FBUtilities.timestampMicros()); logger.info(String.format("Drop aggregate function overload '%s' args '%s'", udf.name(), udf.argTypes())); - announce(mutation, announceLocally); - } - - public static void announceNewFunction(UDFunction udf, boolean announceLocally) - { - Mutation mutation = udf.toSchemaUpdate(FBUtilities.timestampMicros()); - logger.info(String.format("Create scalar function '%s'", udf.name())); - announce(mutation, announceLocally); - } - - public static void announceNewAggregate(UDAggregate udf, boolean announceLocally) - { - Mutation mutation = udf.toSchemaUpdate(FBUtilities.timestampMicros()); - logger.info(String.format("Create aggregate function '%s'", udf.name())); - announce(mutation, announceLocally); + KSMetaData ksm = Schema.instance.getKSMetaData(udf.name().keyspace); + announce(LegacySchemaTables.makeDropAggregateMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); } /** @@ -429,7 +416,7 @@ public class MigrationManager { try { - DefsTables.mergeSchemaInternal(Collections.singletonList(schema), false); + LegacySchemaTables.mergeSchema(Collections.singletonList(schema), false); } catch (IOException e) { @@ -457,7 +444,7 @@ public class MigrationManager { protected void runMayThrow() throws IOException, ConfigurationException { - DefsTables.mergeSchema(schema); + LegacySchemaTables.mergeSchema(schema); } }); @@ -497,9 +484,7 @@ public class MigrationManager logger.debug("Truncating schema tables..."); - // truncate schema tables - for (String cf : SystemKeyspace.ALL_SCHEMA_TABLES) - SystemKeyspace.schemaCFS(cf).truncateBlocking(); + LegacySchemaTables.truncateSchemaTables(); logger.debug("Clearing local schema keyspace definitions..."); @@ -536,7 +521,7 @@ public class MigrationManager public Collection<Mutation> deserialize(DataInput in, int version) throws IOException { int count = in.readInt(); - Collection<Mutation> schema = new ArrayList<Mutation>(count); + Collection<Mutation> schema = new ArrayList<>(count); for (int i = 0; i < count; i++) schema.add(Mutation.serializer.deserialize(in, version)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/service/MigrationTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java index 9fdbff4..2a67e6d 100644 --- a/src/java/org/apache/cassandra/service/MigrationTask.java +++ b/src/java/org/apache/cassandra/service/MigrationTask.java @@ -26,7 +26,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.db.DefsTables; +import org.apache.cassandra.schema.LegacySchemaTables; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.net.IAsyncCallback; import org.apache.cassandra.net.MessageIn; @@ -63,7 +63,7 @@ class MigrationTask extends WrappedRunnable { try { - DefsTables.mergeSchema(message.payload); + LegacySchemaTables.mergeSchema(message.payload); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 9c57946..d6eb317 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -713,7 +713,7 @@ public class StorageProxy implements StorageProxyMBean null, WriteType.SIMPLE); Mutation mutation = new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(uuid)); - mutation.delete(SystemKeyspace.BATCHLOG_TABLE, FBUtilities.timestampMicros()); + mutation.delete(SystemKeyspace.BATCHLOG, FBUtilities.timestampMicros()); MessageOut<Mutation> message = mutation.createMessage(); for (InetAddress target : endpoints) {
