http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/schema/SchemaKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index 4ff8a23..e4e50a0 100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -17,14 +17,12 @@ */ package org.apache.cassandra.schema; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.*; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import java.util.stream.Collectors; import com.google.common.collect.ImmutableList; @@ -47,9 +45,14 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.Pair; +import static java.lang.String.format; + +import static java.util.stream.Collectors.toList; +import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal; +import static org.apache.cassandra.schema.CQLTypeParser.parse; /** * system_schema.* tables and methods for manipulating them. @@ -77,7 +80,6 @@ public final class SchemaKeyspace public static final String AGGREGATES = "aggregates"; public static final String INDEXES = "indexes"; - public static final List<String> ALL = ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES); @@ -205,14 +207,13 @@ public final class SchemaKeyspace "CREATE TABLE %s (" + "keyspace_name text," + "function_name text," - + "signature frozen<list<text>>," - + "argument_names frozen<list<text>>," + "argument_types frozen<list<text>>," + + "argument_names frozen<list<text>>," + "body text," + "language text," + "return_type text," + "called_on_null_input boolean," - + "PRIMARY KEY ((keyspace_name), function_name, signature))"); + + "PRIMARY KEY ((keyspace_name), function_name, argument_types))"); private static final CFMetaData Aggregates = compile(AGGREGATES, @@ -220,14 +221,13 @@ public final class SchemaKeyspace "CREATE TABLE %s (" + "keyspace_name text," + "aggregate_name text," - + "signature frozen<list<text>>," + "argument_types frozen<list<text>>," + "final_func text," + "initcond blob," + "return_type text," + "state_func text," + "state_type text," - + "PRIMARY KEY ((keyspace_name), aggregate_name, signature))"); + + "PRIMARY KEY ((keyspace_name), aggregate_name, argument_types))"); public static final List<CFMetaData> ALL_TABLE_METADATA = ImmutableList.of(Keyspaces, Tables, Columns, Triggers, DroppedColumns, Views, Types, Functions, Aggregates, Indexes); @@ -267,35 +267,6 @@ public final class SchemaKeyspace makeCreateKeyspaceMutation(schema, timestamp + 1).apply(); } - public static List<KeyspaceMetadata> readSchemaFromSystemTables() - { - ReadCommand cmd = getReadCommandForTableSchema(KEYSPACES); - try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); PartitionIterator schema = cmd.executeInternal(orderGroup)) - { - List<KeyspaceMetadata> keyspaces = new ArrayList<>(); - - while (schema.hasNext()) - { - try (RowIterator partition = schema.next()) - { - if (isSystemKeyspaceSchemaPartition(partition.partitionKey())) - continue; - - DecoratedKey key = partition.partitionKey(); - - readSchemaPartitionForKeyspaceAndApply(TYPES, key, - types -> readSchemaPartitionForKeyspaceAndApply(TABLES, key, - tables -> readSchemaPartitionForKeyspaceAndApply(VIEWS, key, - views -> readSchemaPartitionForKeyspaceAndApply(FUNCTIONS, key, - functions -> readSchemaPartitionForKeyspaceAndApply(AGGREGATES, key, - aggregates -> keyspaces.add(createKeyspaceFromSchemaPartitions(partition, tables, views, types, functions, aggregates)))))) - ); - } - } - return keyspaces; - } - } - public static void truncate() { ALL.forEach(table -> getSchemaCFS(table).truncateBlocking()); @@ -397,336 +368,18 @@ public final class SchemaKeyspace } } - private static Map<DecoratedKey, FilteredPartition> readSchemaForKeyspaces(String schemaTableName, Set<String> keyspaceNames) - { - Map<DecoratedKey, FilteredPartition> schema = new HashMap<>(); - - for (String keyspaceName : keyspaceNames) - { - // We don't to return the RowIterator directly because we should guarantee that this iterator - // will be closed, and putting it in a Map make that harder/more awkward. - readSchemaPartitionForKeyspaceAndApply(schemaTableName, keyspaceName, - partition -> { - if (!partition.isEmpty()) - schema.put(partition.partitionKey(), FilteredPartition.create(partition)); - return null; - } - ); - } - - return schema; - } - private static ByteBuffer getSchemaKSKey(String ksName) { return AsciiType.instance.fromString(ksName); } - private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, String keyspaceName, Function<RowIterator, T> fct) - { - return readSchemaPartitionForKeyspaceAndApply(schemaTableName, getSchemaKSKey(keyspaceName), fct); - } - - private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, ByteBuffer keyspaceKey, Function<RowIterator, T> fct) - { - ColumnFamilyStore store = getSchemaCFS(schemaTableName); - return readSchemaPartitionForKeyspaceAndApply(store, store.decorateKey(keyspaceKey), fct); - } - - private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, DecoratedKey keyspaceKey, Function<RowIterator, T> fct) - { - return readSchemaPartitionForKeyspaceAndApply(getSchemaCFS(schemaTableName), keyspaceKey, fct); - } - - private static <T> T readSchemaPartitionForKeyspaceAndApply(ColumnFamilyStore store, DecoratedKey keyspaceKey, Function<RowIterator, T> fct) - { - int nowInSec = FBUtilities.nowInSeconds(); - try (OpOrder.Group op = store.readOrdering.start(); - RowIterator partition = UnfilteredRowIterators.filter(SinglePartitionReadCommand.fullPartitionRead(store.metadata, nowInSec, keyspaceKey) - .queryMemtableAndDisk(store, op), nowInSec)) - { - return fct.apply(partition); - } - } - - private static <T> T readSchemaPartitionForTableAndApply(String schemaTableName, String keyspaceName, String tableName, Function<RowIterator, T> fct) - { - ColumnFamilyStore store = getSchemaCFS(schemaTableName); - - ClusteringComparator comparator = store.metadata.comparator; - Slices slices = Slices.with(comparator, Slice.make(comparator, tableName)); - int nowInSec = FBUtilities.nowInSeconds(); - try (OpOrder.Group op = store.readOrdering.start(); - RowIterator partition = UnfilteredRowIterators.filter(SinglePartitionReadCommand.create(store.metadata, nowInSec, getSchemaKSKey(keyspaceName), slices) - .queryMemtableAndDisk(store, op), nowInSec)) - { - return fct.apply(partition); - } - } - private static boolean isSystemKeyspaceSchemaPartition(DecoratedKey partitionKey) { return Schema.isSystemKeyspace(UTF8Type.instance.compose(partitionKey.getKey())); } - /** - * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects - * (which also involves fs operations on add/drop ks/cf) - * - * @param mutations the schema changes to apply - * - * @throws ConfigurationException If one of metadata attributes has invalid value - * @throws IOException If data was corrupted during transportation or failed to apply fs operations - */ - public static synchronized void mergeSchemaAndAnnounceVersion(Collection<Mutation> mutations) throws ConfigurationException, IOException - { - mergeSchema(mutations); - Schema.instance.updateVersionAndAnnounce(); - } - - public static synchronized void mergeSchema(Collection<Mutation> mutations) throws IOException - { - // compare before/after schemas of the affected keyspaces only - Set<String> keyspaces = new HashSet<>(mutations.size()); - for (Mutation mutation : mutations) - keyspaces.add(ByteBufferUtil.string(mutation.key().getKey())); - - // current state of the schema - Map<DecoratedKey, FilteredPartition> oldKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces); - Map<DecoratedKey, FilteredPartition> oldColumnFamilies = readSchemaForKeyspaces(TABLES, keyspaces); - Map<DecoratedKey, FilteredPartition> oldViews = readSchemaForKeyspaces(VIEWS, keyspaces); - Map<DecoratedKey, FilteredPartition> oldTypes = readSchemaForKeyspaces(TYPES, keyspaces); - Map<DecoratedKey, FilteredPartition> oldFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces); - Map<DecoratedKey, FilteredPartition> oldAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces); - - mutations.forEach(Mutation::apply); - - if (FLUSH_SCHEMA_TABLES) - flush(); - - // with new data applied - Map<DecoratedKey, FilteredPartition> newKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces); - Map<DecoratedKey, FilteredPartition> newColumnFamilies = readSchemaForKeyspaces(TABLES, keyspaces); - Map<DecoratedKey, FilteredPartition> newViews = readSchemaForKeyspaces(VIEWS, keyspaces); - Map<DecoratedKey, FilteredPartition> newTypes = readSchemaForKeyspaces(TYPES, keyspaces); - Map<DecoratedKey, FilteredPartition> newFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces); - Map<DecoratedKey, FilteredPartition> newAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces); - - Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces); - mergeTables(oldColumnFamilies, newColumnFamilies); - mergeViews(oldViews, newViews); - mergeTypes(oldTypes, newTypes); - mergeFunctions(oldFunctions, newFunctions); - mergeAggregates(oldAggregates, newAggregates); - - // it is safe to drop a keyspace only when all nested ColumnFamilies where deleted - keyspacesToDrop.forEach(Schema.instance::dropKeyspace); - } - - private static Set<String> mergeKeyspaces(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after) - { - for (FilteredPartition newPartition : after.values()) - { - String name = AsciiType.instance.compose(newPartition.partitionKey().getKey()); - KeyspaceParams params = createKeyspaceParamsFromSchemaPartition(newPartition.rowIterator()); - - FilteredPartition oldPartition = before.remove(newPartition.partitionKey()); - if (oldPartition == null || oldPartition.isEmpty()) - Schema.instance.addKeyspace(KeyspaceMetadata.create(name, params)); - else - Schema.instance.updateKeyspace(name, params); - } - - // What's remain in old is those keyspace that are not in updated, i.e. the dropped ones. - return asKeyspaceNamesSet(before.keySet()); - } - - private static Set<String> asKeyspaceNamesSet(Set<DecoratedKey> keys) - { - Set<String> names = new HashSet<>(keys.size()); - for (DecoratedKey key : keys) - names.add(AsciiType.instance.compose(key.getKey())); - return names; - } - - private static void mergeTables(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after) - { - diffSchema(before, after, new Differ() - { - public void onDropped(UntypedResultSet.Row oldRow) - { - Schema.instance.dropTable(oldRow.getString("keyspace_name"), oldRow.getString("table_name")); - } - - public void onAdded(UntypedResultSet.Row newRow) - { - Schema.instance.addTable(createTableFromTableRow(newRow)); - } - - public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow) - { - Schema.instance.updateTable(newRow.getString("keyspace_name"), newRow.getString("table_name")); - } - }); - } - - private static void mergeViews(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after) - { - diffSchema(before, after, new Differ() - { - public void onDropped(UntypedResultSet.Row oldRow) - { - Schema.instance.dropView(oldRow.getString("keyspace_name"), oldRow.getString("view_name")); - } - - public void onAdded(UntypedResultSet.Row newRow) - { - Schema.instance.addView(createViewFromViewRow(newRow)); - } - - public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow) - { - Schema.instance.updateView(newRow.getString("keyspace_name"), newRow.getString("view_name")); - } - }); - } - - private static void mergeTypes(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after) - { - diffSchema(before, after, new Differ() - { - public void onDropped(UntypedResultSet.Row oldRow) - { - Schema.instance.dropType(createTypeFromRow(oldRow)); - } - - public void onAdded(UntypedResultSet.Row newRow) - { - Schema.instance.addType(createTypeFromRow(newRow)); - } - - public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow) - { - Schema.instance.updateType(createTypeFromRow(newRow)); - } - }); - } - - private static void mergeFunctions(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after) - { - diffSchema(before, after, new Differ() - { - public void onDropped(UntypedResultSet.Row oldRow) - { - Schema.instance.dropFunction(createFunctionFromFunctionRow(oldRow)); - } - - public void onAdded(UntypedResultSet.Row newRow) - { - Schema.instance.addFunction(createFunctionFromFunctionRow(newRow)); - } - - public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow) - { - Schema.instance.updateFunction(createFunctionFromFunctionRow(newRow)); - } - }); - } - - private static void mergeAggregates(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after) - { - diffSchema(before, after, new Differ() - { - public void onDropped(UntypedResultSet.Row oldRow) - { - Schema.instance.dropAggregate(createAggregateFromAggregateRow(oldRow)); - } - - public void onAdded(UntypedResultSet.Row newRow) - { - Schema.instance.addAggregate(createAggregateFromAggregateRow(newRow)); - } - - public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow) - { - Schema.instance.updateAggregate(createAggregateFromAggregateRow(newRow)); - } - }); - } - - public interface Differ - { - void onDropped(UntypedResultSet.Row oldRow); - void onAdded(UntypedResultSet.Row newRow); - void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow); - } - - private static void diffSchema(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after, Differ differ) - { - for (FilteredPartition newPartition : after.values()) - { - CFMetaData metadata = newPartition.metadata(); - DecoratedKey key = newPartition.partitionKey(); - - FilteredPartition oldPartition = before.remove(key); - - if (oldPartition == null || oldPartition.isEmpty()) - { - // Means everything is to be added - for (Row row : newPartition) - differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, row)); - continue; - } - - Iterator<Row> oldIter = oldPartition.iterator(); - Iterator<Row> newIter = newPartition.iterator(); - - Row oldRow = oldIter.hasNext() ? oldIter.next() : null; - Row newRow = newIter.hasNext() ? newIter.next() : null; - while (oldRow != null && newRow != null) - { - int cmp = metadata.comparator.compare(oldRow.clustering(), newRow.clustering()); - if (cmp < 0) - { - differ.onDropped(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow)); - oldRow = oldIter.hasNext() ? oldIter.next() : null; - } - else if (cmp > 0) - { - - differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, newRow)); - newRow = newIter.hasNext() ? newIter.next() : null; - } - else - { - if (!oldRow.equals(newRow)) - differ.onUpdated(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow), UntypedResultSet.Row.fromInternalRow(metadata, key, newRow)); - - oldRow = oldIter.hasNext() ? oldIter.next() : null; - newRow = newIter.hasNext() ? newIter.next() : null; - } - } - - while (oldRow != null) - { - differ.onDropped(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow)); - oldRow = oldIter.hasNext() ? oldIter.next() : null; - } - while (newRow != null) - { - differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, newRow)); - newRow = newIter.hasNext() ? newIter.next() : null; - } - } - - // What remains is those keys that were only in before. - for (FilteredPartition partition : before.values()) - for (Row row : partition) - differ.onDropped(UntypedResultSet.Row.fromInternalRow(partition.metadata(), partition.partitionKey(), row)); - } - /* - * Keyspace metadata serialization/deserialization. + * Schema entities to mutations */ public static Mutation makeCreateKeyspaceMutation(String name, KeyspaceParams params, long timestamp) @@ -761,46 +414,6 @@ public final class SchemaKeyspace return mutation; } - private static KeyspaceMetadata createKeyspaceFromSchemaPartitions(RowIterator serializedParams, - RowIterator serializedTables, - RowIterator serializedViews, - RowIterator serializedTypes, - RowIterator serializedFunctions, - RowIterator serializedAggregates) - { - String name = AsciiType.instance.compose(serializedParams.partitionKey().getKey()); - - KeyspaceParams params = createKeyspaceParamsFromSchemaPartition(serializedParams); - Tables tables = createTablesFromTablesPartition(serializedTables); - Views views = createViewsFromViewsPartition(serializedViews); - Types types = createTypesFromPartition(serializedTypes); - - Collection<UDFunction> udfs = createFunctionsFromFunctionsPartition(serializedFunctions); - Functions functions = org.apache.cassandra.schema.Functions.builder().add(udfs).build(); - functions = createAggregatesFromAggregatesPartition(functions, serializedAggregates); - - return KeyspaceMetadata.create(name, params, tables, views, types, functions); - } - - /** - * Deserialize only Keyspace attributes without nested tables or types - * - * @param partition Keyspace attributes in serialized form - */ - - private static KeyspaceParams createKeyspaceParamsFromSchemaPartition(RowIterator partition) - { - String query = String.format("SELECT * FROM %s.%s", NAME, KEYSPACES); - UntypedResultSet.Row row = QueryProcessor.resultify(query, partition).one(); - - return KeyspaceParams.create(row.getBoolean(KeyspaceParams.Option.DURABLE_WRITES.toString()), - row.getFrozenTextMap(KeyspaceParams.Option.REPLICATION.toString())); - } - - /* - * User type metadata serialization/deserialization. - */ - public static Mutation makeCreateTypeMutation(KeyspaceMetadata keyspace, UserType type, long timestamp) { // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). @@ -813,8 +426,8 @@ public final class SchemaKeyspace { RowUpdateBuilder adder = new RowUpdateBuilder(Types, timestamp, mutation) .clustering(type.getNameAsString()) - .frozenList("field_names", type.fieldNames().stream().map(SchemaKeyspace::bbToString).collect(Collectors.toList())) - .frozenList("field_types", type.fieldTypes().stream().map(AbstractType::toString).collect(Collectors.toList())); + .frozenList("field_names", type.fieldNames().stream().map(SchemaKeyspace::bbToString).collect(toList())) + .frozenList("field_types", type.fieldTypes().stream().map(AbstractType::asCQL3Type).map(CQL3Type::toString).collect(toList())); adder.build(); } @@ -838,36 +451,6 @@ public final class SchemaKeyspace return RowUpdateBuilder.deleteRow(Types, timestamp, mutation, type.name); } - private static Types createTypesFromPartition(RowIterator partition) - { - String query = String.format("SELECT * FROM %s.%s", NAME, TYPES); - Types.Builder types = org.apache.cassandra.schema.Types.builder(); - QueryProcessor.resultify(query, partition).forEach(row -> types.add(createTypeFromRow(row))); - return types.build(); - } - - private static UserType createTypeFromRow(UntypedResultSet.Row row) - { - String keyspace = row.getString("keyspace_name"); - ByteBuffer name = ByteBufferUtil.bytes(row.getString("type_name")); - List<String> rawColumns = row.getFrozenList("field_names", UTF8Type.instance); - List<String> rawTypes = row.getFrozenList("field_types", UTF8Type.instance); - - List<ByteBuffer> columns = new ArrayList<>(rawColumns.size()); - for (String rawColumn : rawColumns) - columns.add(ByteBufferUtil.bytes(rawColumn)); - - List<AbstractType<?>> types = new ArrayList<>(rawTypes.size()); - for (String rawType : rawTypes) - types.add(parseType(rawType)); - - return new UserType(keyspace, name, columns, types); - } - - /* - * Table metadata serialization/deserialization. - */ - public static Mutation makeCreateTableMutation(KeyspaceMetadata keyspace, CFMetaData table, long timestamp) { // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). @@ -990,9 +573,7 @@ public final class SchemaKeyspace // updated indexes need to be updated for (MapDifference.ValueDifference<IndexMetadata> diff : indexesDiff.entriesDiffering().values()) - { addUpdatedIndexToSchemaMutation(newTable, diff.rightValue(), timestamp, mutation); - } return mutation; } @@ -1038,274 +619,59 @@ public final class SchemaKeyspace return mutation; } - public static CFMetaData createTableFromName(String keyspace, String table) + private static void addColumnToSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation) { - return readSchemaPartitionForTableAndApply(TABLES, keyspace, table, partition -> - { - if (partition.isEmpty()) - throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspace, table)); + RowUpdateBuilder adder = new RowUpdateBuilder(Columns, timestamp, mutation).clustering(table.cfName, column.name.toString()); + + AbstractType<?> type = column.type; + if (type instanceof ReversedType) + type = ((ReversedType) type).baseType; - return createTableFromTablePartition(partition); - }); + adder.add("column_name_bytes", column.name.bytes) + .add("kind", column.kind.toString().toLowerCase()) + .add("position", column.isOnAllComponents() ? ColumnDefinition.NO_POSITION : column.position()) + .add("clustering_order", column.clusteringOrder().toString().toLowerCase()) + .add("type", type.asCQL3Type().toString()) + .build(); } - /** - * Deserialize tables from low-level schema representation, all of them belong to the same keyspace - */ - private static Tables createTablesFromTablesPartition(RowIterator partition) + private static void dropColumnFromSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation) { - String query = String.format("SELECT * FROM %s.%s", NAME, TABLES); - Tables.Builder tables = org.apache.cassandra.schema.Tables.builder(); - QueryProcessor.resultify(query, partition).forEach(row -> tables.add(createTableFromTableRow(row))); - return tables.build(); + // Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference). + RowUpdateBuilder.deleteRow(Columns, timestamp, mutation, table.cfName, column.name.toString()); } - private static List<ColumnDefinition> createColumnsFromColumnsPartition(RowIterator serializedColumns) + private static void addDroppedColumnToSchemaMutation(CFMetaData table, CFMetaData.DroppedColumn column, long timestamp, Mutation mutation) { - String query = String.format("SELECT * FROM %s.%s", NAME, COLUMNS); - return createColumnsFromColumnRows(QueryProcessor.resultify(query, serializedColumns)); + RowUpdateBuilder adder = new RowUpdateBuilder(DroppedColumns, timestamp, mutation).clustering(table.cfName, column.name); + + adder.add("dropped_time", new Date(TimeUnit.MICROSECONDS.toMillis(column.droppedTime))) + .add("type", expandUserTypes(column.type).asCQL3Type().toString()) + .build(); } - private static CFMetaData createTableFromTablePartition(RowIterator partition) + private static void addTriggerToSchemaMutation(CFMetaData table, TriggerMetadata trigger, long timestamp, Mutation mutation) { - String query = String.format("SELECT * FROM %s.%s", NAME, TABLES); - return createTableFromTableRow(QueryProcessor.resultify(query, partition).one()); + new RowUpdateBuilder(Triggers, timestamp, mutation) + .clustering(table.cfName, trigger.name) + .frozenMap("options", Collections.singletonMap("class", trigger.classOption)) + .build(); } - public static CFMetaData createTableFromTablePartitionAndColumnsPartition(RowIterator tablePartition, - RowIterator columnsPartition) + private static void dropTriggerFromSchemaMutation(CFMetaData table, TriggerMetadata trigger, long timestamp, Mutation mutation) { - List<ColumnDefinition> columns = createColumnsFromColumnsPartition(columnsPartition); - String query = String.format("SELECT * FROM %s.%s", NAME, TABLES); - return createTableFromTableRowAndColumns(QueryProcessor.resultify(query, tablePartition).one(), columns); + RowUpdateBuilder.deleteRow(Triggers, timestamp, mutation, table.cfName, trigger.name); } - /** - * Deserialize table metadata from low-level representation - * - * @return Metadata deserialized from schema - */ - private static CFMetaData createTableFromTableRow(UntypedResultSet.Row row) + public static Mutation makeCreateViewMutation(KeyspaceMetadata keyspace, ViewDefinition view, long timestamp) { - String keyspace = row.getString("keyspace_name"); - String table = row.getString("table_name"); - - List<ColumnDefinition> columns = - readSchemaPartitionForTableAndApply(COLUMNS, keyspace, table, SchemaKeyspace::createColumnsFromColumnsPartition); - - Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = - readSchemaPartitionForTableAndApply(DROPPED_COLUMNS, keyspace, table, SchemaKeyspace::createDroppedColumnsFromDroppedColumnsPartition); - - Triggers triggers = - readSchemaPartitionForTableAndApply(TRIGGERS, keyspace, table, SchemaKeyspace::createTriggersFromTriggersPartition); - - CFMetaData cfm = createTableFromTableRowAndColumns(row, columns).droppedColumns(droppedColumns) - .triggers(triggers); - - // the CFMetaData itself is required to build the collection of indexes as - // the column definitions are needed because we store only the name each - // index's target columns and this is not enough to reconstruct a ColumnIdentifier - org.apache.cassandra.schema.Indexes indexes = - readSchemaPartitionForTableAndApply(INDEXES, keyspace, table, rowIterator -> createIndexesFromIndexesPartition(cfm, rowIterator)); - cfm.indexes(indexes); - - return cfm; + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); + addViewToSchemaMutation(view, timestamp, true, mutation); + return mutation; } - public static CFMetaData createTableFromTableRowAndColumns(UntypedResultSet.Row row, List<ColumnDefinition> columns) - { - String keyspace = row.getString("keyspace_name"); - String table = row.getString("table_name"); - UUID id = row.getUUID("id"); - - Set<CFMetaData.Flag> flags = row.has("flags") - ? CFMetaData.flagsFromStrings(row.getFrozenSet("flags", UTF8Type.instance)) - : Collections.emptySet(); - - boolean isSuper = flags.contains(CFMetaData.Flag.SUPER); - boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER); - boolean isDense = flags.contains(CFMetaData.Flag.DENSE); - boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND); - - return CFMetaData.create(keyspace, - table, - id, - isDense, - isCompound, - isSuper, - isCounter, - false, - columns, - DatabaseDescriptor.getPartitioner()) - .params(createTableParamsFromRow(row)); - } - - private static TableParams createTableParamsFromRow(UntypedResultSet.Row row) - { - TableParams.Builder builder = TableParams.builder(); - - builder.bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance")) - .caching(CachingParams.fromMap(row.getFrozenTextMap("caching"))) - .comment(row.getString("comment")) - .compaction(CompactionParams.fromMap(row.getFrozenTextMap("compaction"))) - .compression(CompressionParams.fromMap(row.getFrozenTextMap("compression"))) - .dcLocalReadRepairChance(row.getDouble("dclocal_read_repair_chance")) - .defaultTimeToLive(row.getInt("default_time_to_live")) - .gcGraceSeconds(row.getInt("gc_grace_seconds")) - .maxIndexInterval(row.getInt("max_index_interval")) - .memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms")) - .minIndexInterval(row.getInt("min_index_interval")) - .readRepairChance(row.getDouble("read_repair_chance")) - .crcCheckChance(row.getDouble("crc_check_chance")) - .speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry"))); - - if (row.has("extensions")) - builder.extensions(row.getFrozenMap("extensions", UTF8Type.instance, BytesType.instance)); - - return builder.build(); - } - - /* - * Column metadata serialization/deserialization. - */ - - private static void addColumnToSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation) - { - RowUpdateBuilder adder = new RowUpdateBuilder(Columns, timestamp, mutation).clustering(table.cfName, column.name.toString()); - - AbstractType<?> type = column.type; - if (type instanceof ReversedType) - type = ((ReversedType) type).baseType; - - adder.add("column_name_bytes", column.name.bytes) - .add("kind", column.kind.toString().toLowerCase()) - .add("position", column.isOnAllComponents() ? ColumnDefinition.NO_POSITION : column.position()) - .add("clustering_order", column.clusteringOrder().toString().toLowerCase()) - .add("type", type.toString()) - .build(); - } - - private static void dropColumnFromSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation) - { - // Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference). - RowUpdateBuilder.deleteRow(Columns, timestamp, mutation, table.cfName, column.name.toString()); - } - - private static List<ColumnDefinition> createColumnsFromColumnRows(UntypedResultSet rows) -{ - List<ColumnDefinition> columns = new ArrayList<>(rows.size()); - rows.forEach(row -> columns.add(createColumnFromColumnRow(row))); - return columns; - } - - private static ColumnDefinition createColumnFromColumnRow(UntypedResultSet.Row row) - { - String keyspace = row.getString("keyspace_name"); - String table = row.getString("table_name"); - - ColumnIdentifier name = ColumnIdentifier.getInterned(row.getBytes("column_name_bytes"), row.getString("column_name")); - - ColumnDefinition.Kind kind = ColumnDefinition.Kind.valueOf(row.getString("kind").toUpperCase()); - - int position = row.getInt("position"); - ClusteringOrder order = ClusteringOrder.valueOf(row.getString("clustering_order").toUpperCase()); - - AbstractType<?> type = parseType(row.getString("type")); - if (order == ClusteringOrder.DESC) - type = ReversedType.getInstance(type); - - return new ColumnDefinition(keyspace, table, name, type, position, kind); - } - - /* - * Dropped column metadata serialization/deserialization. - */ - - private static void addDroppedColumnToSchemaMutation(CFMetaData table, CFMetaData.DroppedColumn column, long timestamp, Mutation mutation) - { - RowUpdateBuilder adder = new RowUpdateBuilder(DroppedColumns, timestamp, mutation).clustering(table.cfName, column.name); - - adder.add("dropped_time", new Date(TimeUnit.MICROSECONDS.toMillis(column.droppedTime))) - .add("type", column.type.toString()) - .build(); - } - - private static Map<ByteBuffer, CFMetaData.DroppedColumn> createDroppedColumnsFromDroppedColumnsPartition(RowIterator serializedColumns) - { - String query = String.format("SELECT * FROM %s.%s", NAME, DROPPED_COLUMNS); - Map<ByteBuffer, CFMetaData.DroppedColumn> columns = new HashMap<>(); - for (CFMetaData.DroppedColumn column : createDroppedColumnsFromDroppedColumnRows(QueryProcessor.resultify(query, serializedColumns))) - columns.put(UTF8Type.instance.decompose(column.name), column); - return columns; - } - - private static List<CFMetaData.DroppedColumn> createDroppedColumnsFromDroppedColumnRows(UntypedResultSet rows) - { - List<CFMetaData.DroppedColumn> columns = new ArrayList<>(rows.size()); - rows.forEach(row -> columns.add(createDroppedColumnFromDroppedColumnRow(row))); - return columns; - } - - private static CFMetaData.DroppedColumn createDroppedColumnFromDroppedColumnRow(UntypedResultSet.Row row) - { - String name = row.getString("column_name"); - AbstractType<?> type = TypeParser.parse(row.getString("type")); - long droppedTime = TimeUnit.MILLISECONDS.toMicros(row.getLong("dropped_time")); - - return new CFMetaData.DroppedColumn(name, type, droppedTime); - } - - /* - * Trigger metadata serialization/deserialization. - */ - - private static void addTriggerToSchemaMutation(CFMetaData table, TriggerMetadata trigger, long timestamp, Mutation mutation) - { - new RowUpdateBuilder(Triggers, timestamp, mutation) - .clustering(table.cfName, trigger.name) - .frozenMap("options", Collections.singletonMap("class", trigger.classOption)) - .build(); - } - - private static void dropTriggerFromSchemaMutation(CFMetaData table, TriggerMetadata trigger, long timestamp, Mutation mutation) - { - RowUpdateBuilder.deleteRow(Triggers, timestamp, mutation, table.cfName, trigger.name); - } - - /** - * Deserialize triggers from storage-level representation. - * - * @param partition storage-level partition containing the trigger definitions - * @return the list of processed TriggerDefinitions - */ - private static Triggers createTriggersFromTriggersPartition(RowIterator partition) - { - Triggers.Builder triggers = org.apache.cassandra.schema.Triggers.builder(); - String query = String.format("SELECT * FROM %s.%s", NAME, TRIGGERS); - QueryProcessor.resultify(query, partition).forEach(row -> triggers.add(createTriggerFromTriggerRow(row))); - return triggers.build(); - } - - private static TriggerMetadata createTriggerFromTriggerRow(UntypedResultSet.Row row) - { - String name = row.getString("trigger_name"); - String classOption = row.getFrozenTextMap("options").get("class"); - return new TriggerMetadata(name, classOption); - } - - /* - * View metadata serialization/deserialization. - */ - - public static Mutation makeCreateViewMutation(KeyspaceMetadata keyspace, ViewDefinition view, long timestamp) - { - // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). - Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); - addViewToSchemaMutation(view, timestamp, true, mutation); - return mutation; - } - - private static void addViewToSchemaMutation(ViewDefinition view, long timestamp, boolean includeColumns, Mutation mutation) + private static void addViewToSchemaMutation(ViewDefinition view, long timestamp, boolean includeColumns, Mutation mutation) { RowUpdateBuilder builder = new RowUpdateBuilder(Views, timestamp, mutation) .clustering(view.viewName); @@ -1363,9 +729,7 @@ public final class SchemaKeyspace // columns that are no longer needed for (ColumnDefinition column : columnDiff.entriesOnlyOnLeft().values()) - { dropColumnFromSchemaMutation(oldView.metadata, column, timestamp, mutation); - } // newly added columns for (ColumnDefinition column : columnDiff.entriesOnlyOnRight().values()) @@ -1390,122 +754,303 @@ public final class SchemaKeyspace return mutation; } - public static ViewDefinition createViewFromName(String keyspace, String view) + private static void addIndexToSchemaMutation(CFMetaData table, + IndexMetadata index, + long timestamp, + Mutation mutation) { - return readSchemaPartitionForTableAndApply(VIEWS, keyspace, view, partition -> - { - if (partition.isEmpty()) - throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspace, view)); + RowUpdateBuilder builder = new RowUpdateBuilder(Indexes, timestamp, mutation).clustering(table.cfName, index.name); - return createViewFromViewPartition(partition); - }); + builder.add("kind", index.kind.toString()); + builder.frozenMap("options", index.options); + builder.build(); + } + + private static void dropIndexFromSchemaMutation(CFMetaData table, + IndexMetadata index, + long timestamp, + Mutation mutation) + { + RowUpdateBuilder.deleteRow(Indexes, timestamp, mutation, table.cfName, index.name); } - private static ViewDefinition createViewFromViewPartition(RowIterator partition) + private static void addUpdatedIndexToSchemaMutation(CFMetaData table, + IndexMetadata index, + long timestamp, + Mutation mutation) { - String query = String.format("SELECT * FROM %s.%s", NAME, VIEWS); - return createViewFromViewRow(QueryProcessor.resultify(query, partition).one()); + addIndexToSchemaMutation(table, index, timestamp, mutation); } - /** - * Deserialize views from storage-level representation. - * - * @param partition storage-level partition containing the view definitions - * @return the list of processed ViewDefinitions + public static Mutation makeCreateFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); + addFunctionToSchemaMutation(function, timestamp, mutation); + return mutation; + } + + static void addFunctionToSchemaMutation(UDFunction function, long timestamp, Mutation mutation) + { + RowUpdateBuilder adder = + new RowUpdateBuilder(Functions, timestamp, mutation).clustering(function.name().name, functionArgumentsList(function)); + + adder.add("body", function.body()) + .add("language", function.language()) + .add("return_type", function.returnType().asCQL3Type().toString()) + .add("called_on_null_input", function.isCalledOnNullInput()) + .frozenList("argument_names", function.argNames().stream().map((c) -> bbToString(c.bytes)).collect(toList())); + + adder.build(); + } + + private static List<String> functionArgumentsList(AbstractFunction fun) + { + return fun.argTypes() + .stream() + .map(AbstractType::asCQL3Type) + .map(CQL3Type::toString) + .collect(toList()); + } + + public static Mutation makeDropFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); + return RowUpdateBuilder.deleteRow(Functions, timestamp, mutation, function.name().name, functionArgumentsList(function)); + } + + public static Mutation makeCreateAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); + addAggregateToSchemaMutation(aggregate, timestamp, mutation); + return mutation; + } + + static void addAggregateToSchemaMutation(UDAggregate aggregate, long timestamp, Mutation mutation) + { + RowUpdateBuilder adder = + new RowUpdateBuilder(Aggregates, timestamp, mutation) .clustering(aggregate.name().name, functionArgumentsList(aggregate)); + + adder.add("return_type", aggregate.returnType().asCQL3Type().toString()) + .add("state_func", aggregate.stateFunction().name().name) + .add("state_type", aggregate.stateType() != null ? aggregate.stateType().asCQL3Type().toString() : null) + .add("final_func", aggregate.finalFunction() != null ? aggregate.finalFunction().name().name : null) + .add("initcond", aggregate.initialCondition()) + .build(); + } + + public static Mutation makeDropAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp) + { + // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). + Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); + return RowUpdateBuilder.deleteRow(Aggregates, timestamp, mutation, aggregate.name().name, functionArgumentsList(aggregate)); + } + + /* + * Fetching schema */ - private static Views createViewsFromViewsPartition(RowIterator partition) + + public static Keyspaces fetchNonSystemKeyspaces() { - Views.Builder views = org.apache.cassandra.schema.Views.builder(); - String query = String.format("SELECT * FROM %s.%s", NAME, VIEWS); - for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition)) + return fetchKeyspacesWithout(Schema.SYSTEM_KEYSPACE_NAMES); + } + + private static Keyspaces fetchKeyspacesWithout(Set<String> excludedKeyspaceNames) + { + String query = format("SELECT keyspace_name FROM %s.%s", NAME, KEYSPACES); + + Keyspaces.Builder keyspaces = org.apache.cassandra.schema.Keyspaces.builder(); + for (UntypedResultSet.Row row : query(query)) { - ViewDefinition view = createViewFromViewRow(row); - views.add(view); + String keyspaceName = row.getString("keyspace_name"); + if (!excludedKeyspaceNames.contains(keyspaceName)) + keyspaces.add(fetchKeyspace(keyspaceName)); } - return views.build(); + return keyspaces.build(); } - private static ViewDefinition createViewFromViewRow(UntypedResultSet.Row row) + private static Keyspaces fetchKeyspacesOnly(Set<String> includedKeyspaceNames) { - String keyspace = row.getString("keyspace_name"); - String view = row.getString("view_name"); - UUID id = row.getUUID("id"); - UUID baseTableId = row.getUUID("base_table_id"); - String baseTableName = row.getString("base_table_name"); - boolean includeAll = row.getBoolean("include_all_columns"); - String whereClause = row.getString("where_clause"); + /* + * We know the keyspace names we are going to query, but we still want to run the SELECT IN + * query, to filter out the keyspaces that had been dropped by the applied mutation set. + */ + String query = format("SELECT keyspace_name FROM %s.%s WHERE keyspace_name IN ?", NAME, KEYSPACES); - List<ColumnDefinition> columns = - readSchemaPartitionForTableAndApply(COLUMNS, keyspace, view, SchemaKeyspace::createColumnsFromColumnsPartition); + Keyspaces.Builder keyspaces = org.apache.cassandra.schema.Keyspaces.builder(); + for (UntypedResultSet.Row row : query(query, new ArrayList<>(includedKeyspaceNames))) + keyspaces.add(fetchKeyspace(row.getString("keyspace_name"))); + return keyspaces.build(); + } - Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = - readSchemaPartitionForTableAndApply(DROPPED_COLUMNS, keyspace, view, SchemaKeyspace::createDroppedColumnsFromDroppedColumnsPartition); + private static KeyspaceMetadata fetchKeyspace(String keyspaceName) + { + KeyspaceParams params = fetchKeyspaceParams(keyspaceName); + Types types = fetchTypes(keyspaceName); + Tables tables = fetchTables(keyspaceName, types); + Views views = fetchViews(keyspaceName, types); + Functions functions = fetchFunctions(keyspaceName, types); + return KeyspaceMetadata.create(keyspaceName, params, tables, views, types, functions); + } - CFMetaData cfm = CFMetaData.create(keyspace, - view, - id, - false, - true, - false, - false, - true, - columns, - DatabaseDescriptor.getPartitioner()) - .params(createTableParamsFromRow(row)) - .droppedColumns(droppedColumns); + private static KeyspaceParams fetchKeyspaceParams(String keyspaceName) + { + String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", NAME, KEYSPACES); + + UntypedResultSet.Row row = query(query, keyspaceName).one(); + boolean durableWrites = row.getBoolean(KeyspaceParams.Option.DURABLE_WRITES.toString()); + Map<String, String> replication = row.getFrozenTextMap(KeyspaceParams.Option.REPLICATION.toString()); + return KeyspaceParams.create(durableWrites, replication); + } - String rawSelect = View.buildSelectStatement(baseTableName, columns, whereClause); - SelectStatement.RawStatement rawStatement = (SelectStatement.RawStatement) QueryProcessor.parseStatement(rawSelect); + private static Types fetchTypes(String keyspaceName) + { + String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", NAME, TYPES); - return new ViewDefinition(keyspace, view, baseTableId, baseTableName, includeAll, rawStatement, whereClause, cfm); + Types.RawBuilder types = org.apache.cassandra.schema.Types.rawBuilder(keyspaceName); + for (UntypedResultSet.Row row : query(query, keyspaceName)) + { + String name = row.getString("type_name"); + List<String> fieldNames = row.getFrozenList("field_names", UTF8Type.instance); + List<String> fieldTypes = row.getFrozenList("field_types", UTF8Type.instance); + types.add(name, fieldNames, fieldTypes); + } + return types.build(); } - /* - * Secondary Index metadata serialization/deserialization. - */ + private static Tables fetchTables(String keyspaceName, Types types) + { + String query = format("SELECT table_name FROM %s.%s WHERE keyspace_name = ?", NAME, TABLES); - private static void addIndexToSchemaMutation(CFMetaData table, - IndexMetadata index, - long timestamp, - Mutation mutation) + Tables.Builder tables = org.apache.cassandra.schema.Tables.builder(); + for (UntypedResultSet.Row row : query(query, keyspaceName)) + tables.add(fetchTable(keyspaceName, row.getString("table_name"), types)); + return tables.build(); + } + + private static CFMetaData fetchTable(String keyspaceName, String tableName, Types types) { - RowUpdateBuilder builder = new RowUpdateBuilder(Indexes, timestamp, mutation).clustering(table.cfName, index.name); + String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, TABLES); + UntypedResultSet rows = query(query, keyspaceName, tableName); + if (rows.isEmpty()) + throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspaceName, tableName)); + UntypedResultSet.Row row = rows.one(); - builder.add("kind", index.kind.toString()); - builder.frozenMap("options", index.options); - builder.build(); + UUID id = row.getUUID("id"); + + Set<CFMetaData.Flag> flags = CFMetaData.flagsFromStrings(row.getFrozenSet("flags", UTF8Type.instance)); + + boolean isSuper = flags.contains(CFMetaData.Flag.SUPER); + boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER); + boolean isDense = flags.contains(CFMetaData.Flag.DENSE); + boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND); + + List<ColumnDefinition> columns = fetchColumns(keyspaceName, tableName, types); + Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = fetchDroppedColumns(keyspaceName, tableName); + Indexes indexes = fetchIndexes(keyspaceName, tableName); + Triggers triggers = fetchTriggers(keyspaceName, tableName); + + return CFMetaData.create(keyspaceName, + tableName, + id, + isDense, + isCompound, + isSuper, + isCounter, + false, + columns, + DatabaseDescriptor.getPartitioner()) + .params(createTableParamsFromRow(row)) + .droppedColumns(droppedColumns) + .indexes(indexes) + .triggers(triggers); + } + + public static TableParams createTableParamsFromRow(UntypedResultSet.Row row) + { + return TableParams.builder() + .bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance")) + .caching(CachingParams.fromMap(row.getFrozenTextMap("caching"))) + .comment(row.getString("comment")) + .compaction(CompactionParams.fromMap(row.getFrozenTextMap("compaction"))) + .compression(CompressionParams.fromMap(row.getFrozenTextMap("compression"))) + .dcLocalReadRepairChance(row.getDouble("dclocal_read_repair_chance")) + .defaultTimeToLive(row.getInt("default_time_to_live")) + .extensions(row.getFrozenMap("extensions", UTF8Type.instance, BytesType.instance)) + .gcGraceSeconds(row.getInt("gc_grace_seconds")) + .maxIndexInterval(row.getInt("max_index_interval")) + .memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms")) + .minIndexInterval(row.getInt("min_index_interval")) + .readRepairChance(row.getDouble("read_repair_chance")) + .crcCheckChance(row.getDouble("crc_check_chance")) + .speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry"))) + .build(); + } + + private static List<ColumnDefinition> fetchColumns(String keyspace, String table, Types types) + { + String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, COLUMNS); + List<ColumnDefinition> columns = new ArrayList<>(); + query(query, keyspace, table).forEach(row -> columns.add(createColumnFromRow(row, types))); + return columns; } - private static void dropIndexFromSchemaMutation(CFMetaData table, - IndexMetadata index, - long timestamp, - Mutation mutation) + public static ColumnDefinition createColumnFromRow(UntypedResultSet.Row row, Types types) { - RowUpdateBuilder.deleteRow(Indexes, timestamp, mutation, table.cfName, index.name); + String keyspace = row.getString("keyspace_name"); + String table = row.getString("table_name"); + + ColumnIdentifier name = ColumnIdentifier.getInterned(row.getBytes("column_name_bytes"), row.getString("column_name")); + + ColumnDefinition.Kind kind = ColumnDefinition.Kind.valueOf(row.getString("kind").toUpperCase()); + + int position = row.getInt("position"); + ClusteringOrder order = ClusteringOrder.valueOf(row.getString("clustering_order").toUpperCase()); + + AbstractType<?> type = parse(keyspace, row.getString("type"), types); + if (order == ClusteringOrder.DESC) + type = ReversedType.getInstance(type); + + return new ColumnDefinition(keyspace, table, name, type, position, kind); } - private static void addUpdatedIndexToSchemaMutation(CFMetaData table, - IndexMetadata index, - long timestamp, - Mutation mutation) + private static Map<ByteBuffer, CFMetaData.DroppedColumn> fetchDroppedColumns(String keyspace, String table) { - addIndexToSchemaMutation(table, index, timestamp, mutation); + String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, DROPPED_COLUMNS); + Map<ByteBuffer, CFMetaData.DroppedColumn> columns = new HashMap<>(); + for (UntypedResultSet.Row row : query(query, keyspace, table)) + { + CFMetaData.DroppedColumn column = createDroppedColumnFromRow(row); + columns.put(UTF8Type.instance.decompose(column.name), column); + } + return columns; } - /** - * Deserialize secondary indexes from storage-level representation. - * - * @param partition storage-level partition containing the index definitions - * @return the list of processed IndexMetadata - */ - private static Indexes createIndexesFromIndexesPartition(CFMetaData cfm, RowIterator partition) + + private static CFMetaData.DroppedColumn createDroppedColumnFromRow(UntypedResultSet.Row row) + { + String keyspace = row.getString("keyspace_name"); + String name = row.getString("column_name"); + /* + * we never store actual UDT names in dropped column types (so that we can safely drop types if nothing refers to + * them anymore), so before storing dropped columns in schema we expand UDTs to tuples. See expandUserTypes method. + * Because of that, we can safely pass Types.none() to parse() + */ + AbstractType<?> type = parse(keyspace, row.getString("type"), org.apache.cassandra.schema.Types.none()); + long droppedTime = TimeUnit.MILLISECONDS.toMicros(row.getLong("dropped_time")); + return new CFMetaData.DroppedColumn(name, type, droppedTime); + } + + private static Indexes fetchIndexes(String keyspace, String table) { + String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, INDEXES); Indexes.Builder indexes = org.apache.cassandra.schema.Indexes.builder(); - String query = String.format("SELECT * FROM %s.%s", NAME, INDEXES); - QueryProcessor.resultify(query, partition).forEach(row -> indexes.add(createIndexMetadataFromIndexesRow(row))); + query(query, keyspace, table).forEach(row -> indexes.add(createIndexMetadataFromRow(row))); return indexes.build(); } - private static IndexMetadata createIndexMetadataFromIndexesRow(UntypedResultSet.Row row) + private static IndexMetadata createIndexMetadataFromRow(UntypedResultSet.Row row) { String name = row.getString("index_name"); IndexMetadata.Kind type = IndexMetadata.Kind.valueOf(row.getString("kind")); @@ -1513,66 +1058,104 @@ public final class SchemaKeyspace return IndexMetadata.fromSchemaMetadata(name, type, options); } - /* - * UDF metadata serialization/deserialization. - */ + private static Triggers fetchTriggers(String keyspace, String table) + { + String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ?", NAME, TRIGGERS); + Triggers.Builder triggers = org.apache.cassandra.schema.Triggers.builder(); + query(query, keyspace, table).forEach(row -> triggers.add(createTriggerFromRow(row))); + return triggers.build(); + } - public static Mutation makeCreateFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp) + private static TriggerMetadata createTriggerFromRow(UntypedResultSet.Row row) { - // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). - Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); - addFunctionToSchemaMutation(function, timestamp, mutation); - return mutation; + String name = row.getString("trigger_name"); + String classOption = row.getFrozenTextMap("options").get("class"); + return new TriggerMetadata(name, classOption); } - static void addFunctionToSchemaMutation(UDFunction function, long timestamp, Mutation mutation) + private static Views fetchViews(String keyspaceName, Types types) { - RowUpdateBuilder adder = new RowUpdateBuilder(Functions, timestamp, mutation) - .clustering(function.name().name, functionSignatureWithTypes(function)); + String query = format("SELECT view_name FROM %s.%s WHERE keyspace_name = ?", NAME, VIEWS); - adder.add("body", function.body()) - .add("language", function.language()) - .add("return_type", function.returnType().toString()) - .add("called_on_null_input", function.isCalledOnNullInput()) - .frozenList("argument_names", function.argNames().stream().map((c) -> bbToString(c.bytes)).collect(Collectors.toList())) - .frozenList("argument_types", function.argTypes().stream().map(AbstractType::toString).collect(Collectors.toList())); + Views.Builder views = org.apache.cassandra.schema.Views.builder(); + for (UntypedResultSet.Row row : query(query, keyspaceName)) + views.add(fetchView(keyspaceName, row.getString("view_name"), types)); + return views.build(); + } - adder.build(); + private static ViewDefinition fetchView(String keyspaceName, String viewName, Types types) + { + String query = String.format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND view_name = ?", NAME, VIEWS); + UntypedResultSet rows = query(query, keyspaceName, viewName); + if (rows.isEmpty()) + throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspaceName, viewName)); + UntypedResultSet.Row row = rows.one(); + + UUID id = row.getUUID("id"); + UUID baseTableId = row.getUUID("base_table_id"); + String baseTableName = row.getString("base_table_name"); + boolean includeAll = row.getBoolean("include_all_columns"); + String whereClause = row.getString("where_clause"); + + List<ColumnDefinition> columns = fetchColumns(keyspaceName, viewName, types); + + Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = fetchDroppedColumns(keyspaceName, viewName); + + CFMetaData cfm = CFMetaData.create(keyspaceName, + viewName, + id, + false, + true, + false, + false, + true, + columns, + DatabaseDescriptor.getPartitioner()) + .params(createTableParamsFromRow(row)) + .droppedColumns(droppedColumns); + + String rawSelect = View.buildSelectStatement(baseTableName, columns, whereClause); + SelectStatement.RawStatement rawStatement = (SelectStatement.RawStatement) QueryProcessor.parseStatement(rawSelect); + + return new ViewDefinition(keyspaceName, viewName, baseTableId, baseTableName, includeAll, rawStatement, whereClause, cfm); } - public static Mutation makeDropFunctionMutation(KeyspaceMetadata keyspace, UDFunction function, long timestamp) + private static Functions fetchFunctions(String keyspaceName, Types types) { - // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). - Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); - return RowUpdateBuilder.deleteRow(Functions, timestamp, mutation, function.name().name, functionSignatureWithTypes(function)); + Functions udfs = fetchUDFs(keyspaceName, types); + Functions udas = fetchUDAs(keyspaceName, udfs, types); + + return org.apache.cassandra.schema.Functions.builder() + .add(udfs) + .add(udas) + .build(); } - private static Collection<UDFunction> createFunctionsFromFunctionsPartition(RowIterator partition) + private static Functions fetchUDFs(String keyspaceName, Types types) { - List<UDFunction> functions = new ArrayList<>(); - String query = String.format("SELECT * FROM %s.%s", NAME, FUNCTIONS); - for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition)) - functions.add(createFunctionFromFunctionRow(row)); - return functions; + String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", NAME, FUNCTIONS); + + Functions.Builder functions = org.apache.cassandra.schema.Functions.builder(); + for (UntypedResultSet.Row row : query(query, keyspaceName)) + functions.add(createUDFFromRow(row, types)); + return functions.build(); } - private static UDFunction createFunctionFromFunctionRow(UntypedResultSet.Row row) + private static UDFunction createUDFFromRow(UntypedResultSet.Row row, Types types) { String ksName = row.getString("keyspace_name"); String functionName = row.getString("function_name"); FunctionName name = new FunctionName(ksName, functionName); List<ColumnIdentifier> argNames = new ArrayList<>(); - if (row.has("argument_names")) - for (String arg : row.getFrozenList("argument_names", UTF8Type.instance)) - argNames.add(new ColumnIdentifier(arg, true)); + for (String arg : row.getFrozenList("argument_names", UTF8Type.instance)) + argNames.add(new ColumnIdentifier(arg, true)); List<AbstractType<?>> argTypes = new ArrayList<>(); - if (row.has("argument_types")) - for (String type : row.getFrozenList("argument_types", UTF8Type.instance)) - argTypes.add(parseType(type)); + for (String type : row.getFrozenList("argument_types", UTF8Type.instance)) + argTypes.add(parse(ksName, type, types)); - AbstractType<?> returnType = parseType(row.getString("return_type")); + AbstractType<?> returnType = parse(ksName, row.getString("return_type"), types); String language = row.getString("language"); String body = row.getString("body"); @@ -1609,70 +1192,33 @@ public final class SchemaKeyspace } } - /* - * Aggregate UDF metadata serialization/deserialization. - */ - - public static Mutation makeCreateAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp) - { - // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). - Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); - addAggregateToSchemaMutation(aggregate, timestamp, mutation); - return mutation; - } - - static void addAggregateToSchemaMutation(UDAggregate aggregate, long timestamp, Mutation mutation) - { - RowUpdateBuilder adder = new RowUpdateBuilder(Aggregates, timestamp, mutation) - .clustering(aggregate.name().name, functionSignatureWithTypes(aggregate)); - - adder.add("return_type", aggregate.returnType().toString()) - .add("state_func", aggregate.stateFunction().name().name) - .add("state_type", aggregate.stateType() != null ? aggregate.stateType().toString() : null) - .add("final_func", aggregate.finalFunction() != null ? aggregate.finalFunction().name().name : null) - .add("initcond", aggregate.initialCondition()) - .frozenList("argument_types", aggregate.argTypes().stream().map(AbstractType::toString).collect(Collectors.toList())) - .build(); - } - - private static Functions createAggregatesFromAggregatesPartition(Functions functions, RowIterator partition) + private static Functions fetchUDAs(String keyspaceName, Functions udfs, Types types) { - String query = String.format("SELECT * FROM %s.%s", NAME, AGGREGATES); - for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition)) - functions = functions.with(createAggregateFromAggregateRow(functions, row)); - return functions; - } + String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?", NAME, AGGREGATES); - private static UDAggregate createAggregateFromAggregateRow(UntypedResultSet.Row row) - { - return createAggregateFromAggregateRow(Schema.instance.getKSMetaData(row.getString("keyspace_name")).functions, row); + Functions.Builder aggregates = org.apache.cassandra.schema.Functions.builder(); + for (UntypedResultSet.Row row : query(query, keyspaceName)) + aggregates.add(createUDAFromRow(row, udfs, types)); + return aggregates.build(); } - private static UDAggregate createAggregateFromAggregateRow(Functions functions, UntypedResultSet.Row row) + private static UDAggregate createUDAFromRow(UntypedResultSet.Row row, Functions functions, Types types) { String ksName = row.getString("keyspace_name"); String functionName = row.getString("aggregate_name"); FunctionName name = new FunctionName(ksName, functionName); - List<String> types = row.getFrozenList("argument_types", UTF8Type.instance); + List<AbstractType<?>> argTypes = + row.getFrozenList("argument_types", UTF8Type.instance) + .stream() + .map(t -> parse(ksName, t, types)) + .collect(toList()); - List<AbstractType<?>> argTypes; - if (types == null) - { - argTypes = Collections.emptyList(); - } - else - { - argTypes = new ArrayList<>(types.size()); - for (String type : types) - argTypes.add(parseType(type)); - } - - AbstractType<?> returnType = parseType(row.getString("return_type")); + AbstractType<?> returnType = parse(ksName, row.getString("return_type"), types); FunctionName stateFunc = new FunctionName(ksName, (row.getString("state_func"))); FunctionName finalFunc = row.has("final_func") ? new FunctionName(ksName, row.getString("final_func")) : null; - AbstractType<?> stateType = row.has("state_type") ? parseType(row.getString("state_type")) : null; + AbstractType<?> stateType = row.has("state_type") ? parse(ksName, row.getString("state_type"), types) : null; ByteBuffer initcond = row.has("initcond") ? row.getBytes("initcond") : null; try @@ -1685,30 +1231,171 @@ public final class SchemaKeyspace } } - public static Mutation makeDropAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp) + private static UntypedResultSet query(String query, Object... variables) { - // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). - Mutation mutation = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); - return RowUpdateBuilder.deleteRow(Aggregates, timestamp, mutation, aggregate.name().name, functionSignatureWithTypes(aggregate)); + return executeInternal(query, variables); + } + + /* + * Merging schema + */ + + /** + * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects + * (which also involves fs operations on add/drop ks/cf) + * + * @param mutations the schema changes to apply + * + * @throws ConfigurationException If one of metadata attributes has invalid value + */ + public static synchronized void mergeSchemaAndAnnounceVersion(Collection<Mutation> mutations) throws ConfigurationException + { + mergeSchema(mutations); + Schema.instance.updateVersionAndAnnounce(); + } + + public static synchronized void mergeSchema(Collection<Mutation> mutations) + { + // only compare the keyspaces affected by this set of schema mutations + Set<String> affectedKeyspaces = + mutations.stream() + .map(m -> UTF8Type.instance.compose(m.key().getKey())) + .collect(Collectors.toSet()); + + // fetch the current state of schema for the affected keyspaces only + Keyspaces before = Schema.instance.getKeyspaces(affectedKeyspaces); + + // apply the schema mutations and flush + mutations.forEach(Mutation::apply); + if (FLUSH_SCHEMA_TABLES) + flush(); + + // fetch the new state of schema from schema tables (not applied to Schema.instance yet) + Keyspaces after = fetchKeyspacesOnly(affectedKeyspaces); + + // deal with the diff + MapDifference<String, KeyspaceMetadata> keyspacesDiff = before.diff(after); + + // dropped keyspaces + for (KeyspaceMetadata keyspace : keyspacesDiff.entriesOnlyOnLeft().values()) + { + keyspace.functions.udas().forEach(Schema.instance::dropAggregate); + keyspace.functions.udfs().forEach(Schema.instance::dropFunction); + keyspace.views.forEach(v -> Schema.instance.dropView(v.ksName, v.viewName)); + keyspace.tables.forEach(t -> Schema.instance.dropTable(t.ksName, t.cfName)); + keyspace.types.forEach(Schema.instance::dropType); + Schema.instance.dropKeyspace(keyspace.name); + } + + // new keyspaces + for (KeyspaceMetadata keyspace : keyspacesDiff.entriesOnlyOnRight().values()) + { + Schema.instance.addKeyspace(KeyspaceMetadata.create(keyspace.name, keyspace.params)); + keyspace.types.forEach(Schema.instance::addType); + keyspace.tables.forEach(Schema.instance::addTable); + keyspace.views.forEach(Schema.instance::addView); + keyspace.functions.udfs().forEach(Schema.instance::addFunction); + keyspace.functions.udas().forEach(Schema.instance::addAggregate); + } + + // updated keyspaces + for (Map.Entry<String, MapDifference.ValueDifference<KeyspaceMetadata>> diff : keyspacesDiff.entriesDiffering().entrySet()) + updateKeyspace(diff.getKey(), diff.getValue().leftValue(), diff.getValue().rightValue()); + } + + private static void updateKeyspace(String keyspaceName, KeyspaceMetadata keyspaceBefore, KeyspaceMetadata keyspaceAfter) + { + // calculate the deltas + MapDifference<String, CFMetaData> tablesDiff = keyspaceBefore.tables.diff(keyspaceAfter.tables); + MapDifference<String, ViewDefinition> viewsDiff = keyspaceBefore.views.diff(keyspaceAfter.views); + MapDifference<ByteBuffer, UserType> typesDiff = keyspaceBefore.types.diff(keyspaceAfter.types); + + Map<Pair<FunctionName, List<String>>, UDFunction> udfsBefore = new HashMap<>(); + keyspaceBefore.functions.udfs().forEach(f -> udfsBefore.put(Pair.create(f.name(), functionArgumentsList(f)), f)); + Map<Pair<FunctionName, List<String>>, UDFunction> udfsAfter = new HashMap<>(); + keyspaceAfter.functions.udfs().forEach(f -> udfsAfter.put(Pair.create(f.name(), functionArgumentsList(f)), f)); + MapDifference<Pair<FunctionName, List<String>>, UDFunction> udfsDiff = Maps.difference(udfsBefore, udfsAfter); + + Map<Pair<FunctionName, List<String>>, UDAggregate> udasBefore = new HashMap<>(); + keyspaceBefore.functions.udas().forEach(f -> udasBefore.put(Pair.create(f.name(), functionArgumentsList(f)), f)); + Map<Pair<FunctionName, List<String>>, UDAggregate> udasAfter = new HashMap<>(); + keyspaceAfter.functions.udas().forEach(f -> udasAfter.put(Pair.create(f.name(), functionArgumentsList(f)), f)); + MapDifference<Pair<FunctionName, List<String>>, UDAggregate> udasDiff = Maps.difference(udasBefore, udasAfter); + + // update keyspace params, if changed + if (!keyspaceBefore.params.equals(keyspaceAfter.params)) + Schema.instance.updateKeyspace(keyspaceName, keyspaceAfter.params); + + // drop everything removed + udasDiff.entriesOnlyOnLeft().values().forEach(Schema.instance::dropAggregate); + udfsDiff.entriesOnlyOnLeft().values().forEach(Schema.instance::dropFunction); + viewsDiff.entriesOnlyOnLeft().values().forEach(v -> Schema.instance.dropView(v.ksName, v.viewName)); + tablesDiff.entriesOnlyOnLeft().values().forEach(t -> Schema.instance.dropTable(t.ksName, t.cfName)); + typesDiff.entriesOnlyOnLeft().values().forEach(Schema.instance::dropType); + + // add everything created + typesDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addType); + tablesDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addTable); + viewsDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addView); + udfsDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addFunction); + udasDiff.entriesOnlyOnRight().values().forEach(Schema.instance::addAggregate); + + // update everything altered + for (MapDifference.ValueDifference<UserType> diff : typesDiff.entriesDiffering().values()) + Schema.instance.updateType(diff.rightValue()); + for (MapDifference.ValueDifference<CFMetaData> diff : tablesDiff.entriesDiffering().values()) + Schema.instance.updateTable(diff.rightValue()); + for (MapDifference.ValueDifference<ViewDefinition> diff : viewsDiff.entriesDiffering().values()) + Schema.instance.updateView(diff.rightValue()); + for (MapDifference.ValueDifference<UDFunction> diff : udfsDiff.entriesDiffering().values()) + Schema.instance.updateFunction(diff.rightValue()); + for (MapDifference.ValueDifference<UDAggregate> diff : udasDiff.entriesDiffering().values()) + Schema.instance.updateAggregate(diff.rightValue()); } - private static AbstractType<?> parseType(String str) + /* + * Type parsing and transformation + */ + + /* + * Recursively replaces any instances of UserType with an equivalent TupleType. + * We do it for dropped_columns, to allow safely dropping unused user types without retaining any references + * in dropped_columns. + */ + private static AbstractType<?> expandUserTypes(AbstractType<?> original) { - return TypeParser.parse(str); + if (original instanceof UserType) + return new TupleType(expandUserTypes(((UserType) original).fieldTypes())); + + if (original instanceof TupleType) + return new TupleType(expandUserTypes(((TupleType) original).allTypes())); + + if (original instanceof ListType<?>) + return ListType.getInstance(expandUserTypes(((ListType<?>) original).getElementsType()), original.isMultiCell()); + + if (original instanceof MapType<?,?>) + { + MapType<?, ?> mt = (MapType<?, ?>) original; + return MapType.getInstance(expandUserTypes(mt.getKeysType()), expandUserTypes(mt.getValuesType()), mt.isMultiCell()); + } + + if (original instanceof SetType<?>) + return SetType.getInstance(expandUserTypes(((SetType<?>) original).getElementsType()), original.isMultiCell()); + + // this is very unlikely to ever happen, but it's better to be safe than sorry + if (original instanceof ReversedType<?>) + return ReversedType.getInstance(expandUserTypes(((ReversedType) original).baseType)); + + if (original instanceof CompositeType) + return CompositeType.getInstance(expandUserTypes(original.getComponents())); + + return original; } - // We allow method overloads, so a function is not uniquely identified by its name only, but - // also by its argument types. To distinguish overloads of given function name in the schema - // we use a "signature" which is just a list of it's CQL argument types (we could replace that by - // using a "signature" UDT that would be comprised of the function name and argument types, - // which we could then use as clustering column. But as we haven't yet used UDT in system tables, - // We'll leave that decision to #6717). - public static ByteBuffer functionSignatureWithTypes(AbstractFunction fun) + private static List<AbstractType<?>> expandUserTypes(List<AbstractType<?>> types) { - ListType<String> list = ListType.getInstance(UTF8Type.instance, false); - List<String> strList = new ArrayList<>(fun.argTypes().size()); - for (AbstractType<?> argType : fun.argTypes()) - strList.add(argType.asCQL3Type().toString()); - return list.decompose(strList); + return types.stream() + .map(SchemaKeyspace::expandUserTypes) + .collect(toList()); } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/340df43f/src/java/org/apache/cassandra/schema/Tables.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/Tables.java b/src/java/org/apache/cassandra/schema/Tables.java index 151697d..4f728d4 100644 --- a/src/java/org/apache/cassandra/schema/Tables.java +++ b/src/java/org/apache/cassandra/schema/Tables.java @@ -23,6 +23,8 @@ import java.util.Optional; import javax.annotation.Nullable; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.MapDifference; +import com.google.common.collect.Maps; import org.apache.cassandra.config.CFMetaData; @@ -115,6 +117,11 @@ public final class Tables implements Iterable<CFMetaData> return builder().add(filter(this, t -> t != table)).build(); } + MapDifference<String, CFMetaData> diff(Tables other) + { + return Maps.difference(tables, other.tables); + } + @Override public boolean equals(Object o) {