http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java new file mode 100644 index 0000000..13e173f --- /dev/null +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.statements.schema; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; + +import org.apache.cassandra.audit.AuditLogContext; +import org.apache.cassandra.audit.AuditLogEntryType; +import org.apache.cassandra.auth.FunctionResource; +import org.apache.cassandra.auth.IResource; +import org.apache.cassandra.auth.*; +import org.apache.cassandra.cql3.CQL3Type; +import org.apache.cassandra.cql3.CQLStatement; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.functions.Function; +import org.apache.cassandra.cql3.functions.FunctionName; +import org.apache.cassandra.cql3.functions.UDFunction; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.schema.Functions.FunctionsDiff; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.transport.Event.SchemaChange; +import org.apache.cassandra.transport.Event.SchemaChange.Change; +import org.apache.cassandra.transport.Event.SchemaChange.Target; + +import static java.util.stream.Collectors.toList; + +public final class CreateFunctionStatement extends AlterSchemaStatement +{ + private final String functionName; + private final List<ColumnIdentifier> argumentNames; + private final List<CQL3Type.Raw> rawArgumentTypes; + private final CQL3Type.Raw rawReturnType; + private final boolean calledOnNullInput; + private final String language; + private final String body; + private final boolean orReplace; + private final boolean ifNotExists; + + public CreateFunctionStatement(String keyspaceName, + String functionName, + List<ColumnIdentifier> argumentNames, + List<CQL3Type.Raw> rawArgumentTypes, + CQL3Type.Raw rawReturnType, + boolean calledOnNullInput, + String language, + String body, + boolean orReplace, + boolean ifNotExists) + { + super(keyspaceName); + this.functionName = functionName; + this.argumentNames = argumentNames; + this.rawArgumentTypes = rawArgumentTypes; + this.rawReturnType = rawReturnType; + this.calledOnNullInput = calledOnNullInput; + this.language = language; + this.body = body; + this.orReplace = orReplace; + this.ifNotExists = ifNotExists; + } + + // TODO: replace affected aggregates !! + public Keyspaces apply(Keyspaces schema) + { + if (ifNotExists && orReplace) + throw ire("Cannot use both 'OR REPLACE' and 'IF NOT EXISTS' directives"); + + UDFunction.assertUdfsEnabled(language); + + if (new HashSet<>(argumentNames).size() != argumentNames.size()) + throw ire("Duplicate argument names for given function %s with argument names %s", functionName, argumentNames); + + rawArgumentTypes.stream() + .filter(CQL3Type.Raw::isFrozen) + .findFirst() + .ifPresent(t -> { throw ire("Argument '%s' cannot be frozen; remove frozen<> modifier from '%s'", t, t); }); + + if (rawReturnType.isFrozen()) + throw ire("Return type '%s' cannot be frozen; remove frozen<> modifier from '%s'", rawReturnType, rawReturnType); + + KeyspaceMetadata keyspace = schema.getNullable(keyspaceName); + if (null == keyspace) + throw ire("Keyspace '%s' doesn't exist", keyspaceName); + + List<AbstractType<?>> argumentTypes = + rawArgumentTypes.stream() + .map(t -> t.prepare(keyspaceName, keyspace.types).getType()) + .collect(toList()); + AbstractType<?> returnType = rawReturnType.prepare(keyspaceName, keyspace.types).getType(); + + UDFunction function = + UDFunction.create(new FunctionName(keyspaceName, functionName), + argumentNames, + argumentTypes, + returnType, + calledOnNullInput, + language, + body); + + Function existingFunction = keyspace.functions.find(function.name(), argumentTypes).orElse(null); + if (null != existingFunction) + { + if (existingFunction.isAggregate()) + throw ire("Function '%s' cannot replace an aggregate", functionName); + + if (ifNotExists) + return schema; + + if (!orReplace) + throw ire("Function '%s' already exists", functionName); + + if (calledOnNullInput != ((UDFunction) existingFunction).isCalledOnNullInput()) + { + throw ire("Function '%s' must have %s directive", + functionName, + calledOnNullInput ? "CALLED ON NULL INPUT" : "RETURNS NULL ON NULL INPUT"); + } + + if (!returnType.isCompatibleWith(existingFunction.returnType())) + { + throw ire("Cannot replace function '%s', the new return type %s is not compatible with the return type %s of existing function", + functionName, + returnType.asCQL3Type(), + existingFunction.returnType().asCQL3Type()); + } + + // TODO: update dependent aggregates + } + + return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.functions.withAddedOrUpdated(function))); + } + + SchemaChange schemaChangeEvent(KeyspacesDiff diff) + { + assert diff.altered.size() == 1; + FunctionsDiff<UDFunction> udfsDiff = diff.altered.get(0).udfs; + + assert udfsDiff.created.size() + udfsDiff.altered.size() == 1; + boolean created = !udfsDiff.created.isEmpty(); + + return new SchemaChange(created ? Change.CREATED : Change.UPDATED, + Target.FUNCTION, + keyspaceName, + functionName, + rawArgumentTypes.stream().map(CQL3Type.Raw::toString).collect(toList())); + } + + public void authorize(ClientState client) + { + FunctionName name = new FunctionName(keyspaceName, functionName); + + if (Schema.instance.findFunction(name, Lists.transform(rawArgumentTypes, t -> t.prepare(keyspaceName).getType())).isPresent() && orReplace) + client.ensurePermission(Permission.ALTER, FunctionResource.functionFromCql(keyspaceName, functionName, rawArgumentTypes)); + else + client.ensurePermission(Permission.CREATE, FunctionResource.keyspace(keyspaceName)); + } + + @Override + Set<IResource> createdResources(KeyspacesDiff diff) + { + assert diff.altered.size() == 1; + FunctionsDiff<UDFunction> udfsDiff = diff.altered.get(0).udfs; + + assert udfsDiff.created.size() + udfsDiff.altered.size() == 1; + + return udfsDiff.created.isEmpty() + ? ImmutableSet.of() + : ImmutableSet.of(FunctionResource.functionFromCql(keyspaceName, functionName, rawArgumentTypes)); + } + + @Override + public AuditLogContext getAuditLogContext() + { + return new AuditLogContext(AuditLogEntryType.CREATE_FUNCTION, keyspaceName, functionName); + } + + public static final class Raw extends CQLStatement.Raw + { + private final FunctionName name; + private final List<ColumnIdentifier> argumentNames; + private final List<CQL3Type.Raw> rawArgumentTypes; + private final CQL3Type.Raw rawReturnType; + private final boolean calledOnNullInput; + private final String language; + private final String body; + private final boolean orReplace; + private final boolean ifNotExists; + + public Raw(FunctionName name, + List<ColumnIdentifier> argumentNames, + List<CQL3Type.Raw> rawArgumentTypes, + CQL3Type.Raw rawReturnType, + boolean calledOnNullInput, + String language, + String body, + boolean orReplace, + boolean ifNotExists) + { + this.name = name; + this.argumentNames = argumentNames; + this.rawArgumentTypes = rawArgumentTypes; + this.rawReturnType = rawReturnType; + this.calledOnNullInput = calledOnNullInput; + this.language = language; + this.body = body; + this.orReplace = orReplace; + this.ifNotExists = ifNotExists; + } + + public CreateFunctionStatement prepare(ClientState state) + { + String keyspaceName = name.hasKeyspace() ? name.keyspace : state.getKeyspace(); + + return new CreateFunctionStatement(keyspaceName, + name.name, + argumentNames, + rawArgumentTypes, + rawReturnType, + calledOnNullInput, + language, + body, + orReplace, + ifNotExists); + } + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java new file mode 100644 index 0000000..0065a4c --- /dev/null +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.statements.schema; + +import java.util.*; + +import com.google.common.collect.Lists; + +import org.apache.cassandra.audit.AuditLogContext; +import org.apache.cassandra.audit.AuditLogEntryType; +import org.apache.cassandra.auth.Permission; +import org.apache.cassandra.cql3.CQLStatement; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.QualifiedName; +import org.apache.cassandra.cql3.statements.schema.IndexTarget.Type; +import org.apache.cassandra.db.marshal.MapType; +import org.apache.cassandra.schema.*; +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.transport.Event.SchemaChange; +import org.apache.cassandra.transport.Event.SchemaChange.Change; +import org.apache.cassandra.transport.Event.SchemaChange.Target; + +import static com.google.common.collect.Iterables.transform; +import static com.google.common.collect.Iterables.tryFind; + +public final class CreateIndexStatement extends AlterSchemaStatement +{ + private final String indexName; + private final String tableName; + private final List<IndexTarget.Raw> rawIndexTargets; + private final IndexAttributes attrs; + private final boolean ifNotExists; + + public CreateIndexStatement(String keyspaceName, + String tableName, + String indexName, + List<IndexTarget.Raw> rawIndexTargets, + IndexAttributes attrs, + boolean ifNotExists) + { + super(keyspaceName); + this.tableName = tableName; + this.indexName = indexName; + this.rawIndexTargets = rawIndexTargets; + this.attrs = attrs; + this.ifNotExists = ifNotExists; + } + + public Keyspaces apply(Keyspaces schema) + { + attrs.validate(); + + KeyspaceMetadata keyspace = schema.getNullable(keyspaceName); + if (null == keyspace) + throw ire("Keyspace '%s' doesn't exist", keyspaceName); + + TableMetadata table = keyspace.getTableOrViewNullable(tableName); + if (null == table) + throw ire("Table '%s' doesn't exist", tableName); + + if (null != indexName && keyspace.hasIndex(indexName)) + { + if (ifNotExists) + return schema; + + throw ire("Index '%s' already exists", indexName); + } + + if (table.isCounter()) + throw ire("Secondary indexes on counter tables aren't supported"); + + if (table.isView()) + throw ire("Secondary indexes on materialized views aren't supported"); + + List<IndexTarget> indexTargets = Lists.newArrayList(transform(rawIndexTargets, t -> t.prepare(table))); + + if (indexTargets.isEmpty() && !attrs.isCustom) + throw ire("Only CUSTOM indexes can be created without specifying a target column"); + + if (indexTargets.size() > 1) + { + if (!attrs.isCustom) + throw ire("Only CUSTOM indexes support multiple columns"); + + Set<ColumnIdentifier> columns = new HashSet<>(); + for (IndexTarget target : indexTargets) + if (!columns.add(target.column)) + throw ire("Duplicate column '%s' in index target list", target.column); + } + + indexTargets.forEach(t -> validateIndexTarget(table, t)); + + String name = null == indexName ? generateIndexName(keyspace, indexTargets) : indexName; + + IndexMetadata.Kind kind = attrs.isCustom ? IndexMetadata.Kind.CUSTOM : IndexMetadata.Kind.COMPOSITES; + + Map<String, String> options = attrs.isCustom ? attrs.getOptions() : Collections.emptyMap(); + + IndexMetadata index = IndexMetadata.fromIndexTargets(indexTargets, name, kind, options); + + // check to disallow creation of an index which duplicates an existing one in all but name + IndexMetadata equalIndex = tryFind(table.indexes, i -> i.equalsWithoutName(index)).orNull(); + if (null != equalIndex) + { + if (ifNotExists) + return schema; + + throw ire("Index %s is a duplicate of existing index %s", index.name, equalIndex.name); + } + + TableMetadata newTable = table.withSwapped(table.indexes.with(index)); + newTable.validate(); + + return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.tables.withSwapped(newTable))); + } + + private void validateIndexTarget(TableMetadata table, IndexTarget target) + { + ColumnMetadata column = table.getColumn(target.column); + + if (null == column) + throw ire("Column '%s' doesn't exist", target.column); + + if (column.type.referencesDuration()) + { + if (column.type.isCollection()) + throw ire("Secondary indexes are not supported on collections containing durations"); + + if (column.type.isTuple()) + throw ire("Secondary indexes are not supported on tuples containing durations"); + + if (column.type.isUDT()) + throw ire("Secondary indexes are not supported on UDTs containing durations"); + + throw ire("Secondary indexes are not supported on duration columns"); + } + + if (column.isPartitionKey() && table.partitionKeyColumns().size() == 1) + throw ire("Cannot create secondary index on the only partition key column %s", column); + + if (column.type.isFrozenCollection() && target.type != Type.FULL) + throw ire("Cannot create %s() index on frozen column %s. Frozen collections only support full() indexes", target.type, column); + + if (!column.type.isFrozenCollection() && target.type == Type.FULL) + throw ire("full() indexes can only be created on frozen collections"); + + if (!column.type.isCollection() && target.type != Type.SIMPLE) + throw ire("Cannot create %s() index on %s. Non-collection columns only support simple indexes", target.type, column); + + if (!(column.type instanceof MapType && column.type.isMultiCell()) && (target.type == Type.KEYS || target.type == Type.KEYS_AND_VALUES)) + throw ire("Cannot create index on %s of column %s with non-map type", target.type, column); + + if (column.type.isUDT() && column.type.isMultiCell()) + throw ire("Cannot create index on non-frozen UDT column %s", column); + } + + private String generateIndexName(KeyspaceMetadata keyspace, List<IndexTarget> targets) + { + String baseName = targets.size() == 1 + ? IndexMetadata.generateDefaultIndexName(tableName, targets.get(0).column) + : IndexMetadata.generateDefaultIndexName(tableName); + return keyspace.findAvailableIndexName(baseName); + } + + SchemaChange schemaChangeEvent(KeyspacesDiff diff) + { + return new SchemaChange(Change.UPDATED, Target.TABLE, keyspaceName, tableName); + } + + public void authorize(ClientState client) + { + client.ensureTablePermission(keyspaceName, tableName, Permission.ALTER); + } + + @Override + public AuditLogContext getAuditLogContext() + { + return new AuditLogContext(AuditLogEntryType.CREATE_INDEX, keyspaceName, indexName); + } + + public static final class Raw extends CQLStatement.Raw + { + private final QualifiedName tableName; + private final QualifiedName indexName; + private final List<IndexTarget.Raw> rawIndexTargets; + private final IndexAttributes attrs; + private final boolean ifNotExists; + + public Raw(QualifiedName tableName, + QualifiedName indexName, + List<IndexTarget.Raw> rawIndexTargets, + IndexAttributes attrs, + boolean ifNotExists) + { + this.tableName = tableName; + this.indexName = indexName; + this.rawIndexTargets = rawIndexTargets; + this.attrs = attrs; + this.ifNotExists = ifNotExists; + } + + public CreateIndexStatement prepare(ClientState state) + { + String keyspaceName = tableName.hasKeyspace() + ? tableName.getKeyspace() + : indexName.hasKeyspace() ? indexName.getKeyspace() : state.getKeyspace(); + + if (tableName.hasKeyspace() && !keyspaceName.equals(tableName.getKeyspace())) + throw ire("Keyspace name '%s' doesn't match table name '%s'", keyspaceName, tableName); + + if (indexName.hasKeyspace() && !keyspaceName.equals(indexName.getKeyspace())) + throw ire("Keyspace name '%s' doesn't match index name '%s'", keyspaceName, tableName); + + return new CreateIndexStatement(keyspaceName, tableName.getName(), indexName.getName(), rawIndexTargets, attrs, ifNotExists); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java new file mode 100644 index 0000000..ecd19ed --- /dev/null +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.statements.schema; + +import java.util.Set; + +import com.google.common.collect.ImmutableSet; + +import org.apache.cassandra.audit.AuditLogContext; +import org.apache.cassandra.audit.AuditLogEntryType; +import org.apache.cassandra.auth.*; +import org.apache.cassandra.cql3.CQLStatement; +import org.apache.cassandra.exceptions.AlreadyExistsException; +import org.apache.cassandra.locator.LocalStrategy; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.KeyspaceParams.Option; +import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.transport.Event.SchemaChange; +import org.apache.cassandra.transport.Event.SchemaChange.Change; + +public final class CreateKeyspaceStatement extends AlterSchemaStatement +{ + private final KeyspaceAttributes attrs; + private final boolean ifNotExists; + + public CreateKeyspaceStatement(String keyspaceName, KeyspaceAttributes attrs, boolean ifNotExists) + { + super(keyspaceName); + this.attrs = attrs; + this.ifNotExists = ifNotExists; + } + + public Keyspaces apply(Keyspaces schema) + { + attrs.validate(); + + if (!attrs.hasOption(Option.REPLICATION)) + throw ire("Missing mandatory option '%s'", Option.REPLICATION); + + if (schema.containsKeyspace(keyspaceName)) + { + if (ifNotExists) + return schema; + + throw new AlreadyExistsException(keyspaceName); + } + + KeyspaceMetadata keyspace = KeyspaceMetadata.create(keyspaceName, attrs.asNewKeyspaceParams()); + + if (keyspace.params.replication.klass.equals(LocalStrategy.class)) + throw ire("Unable to use given strategy class: LocalStrategy is reserved for internal use."); + + keyspace.params.validate(keyspaceName); + + return schema.withAddedOrUpdated(keyspace); + } + + SchemaChange schemaChangeEvent(KeyspacesDiff diff) + { + return new SchemaChange(Change.CREATED, keyspaceName); + } + + public void authorize(ClientState client) + { + client.ensureAllKeyspacesPermission(Permission.CREATE); + } + + @Override + Set<IResource> createdResources(KeyspacesDiff diff) + { + return ImmutableSet.of(DataResource.keyspace(keyspaceName), FunctionResource.keyspace(keyspaceName)); + } + + @Override + public AuditLogContext getAuditLogContext() + { + return new AuditLogContext(AuditLogEntryType.CREATE_KEYSPACE, keyspaceName); + } + + public static final class Raw extends CQLStatement.Raw + { + public final String keyspaceName; + private final KeyspaceAttributes attrs; + private final boolean ifNotExists; + + public Raw(String keyspaceName, KeyspaceAttributes attrs, boolean ifNotExists) + { + this.keyspaceName = keyspaceName; + this.attrs = attrs; + this.ifNotExists = ifNotExists; + } + + public CreateKeyspaceStatement prepare(ClientState state) + { + return new CreateKeyspaceStatement(keyspaceName, attrs, ifNotExists); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java new file mode 100644 index 0000000..ff26f0d --- /dev/null +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.statements.schema; + +import java.util.*; + +import com.google.common.collect.ImmutableSet; + +import org.apache.cassandra.audit.AuditLogContext; +import org.apache.cassandra.audit.AuditLogEntryType; +import org.apache.cassandra.auth.DataResource; +import org.apache.cassandra.auth.IResource; +import org.apache.cassandra.auth.Permission; +import org.apache.cassandra.cql3.*; +import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.exceptions.AlreadyExistsException; +import org.apache.cassandra.schema.*; +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.transport.Event.SchemaChange; +import org.apache.cassandra.transport.Event.SchemaChange.Change; +import org.apache.cassandra.transport.Event.SchemaChange.Target; + +import static java.util.Comparator.comparing; + +import static com.google.common.collect.Iterables.concat; + +public final class CreateTableStatement extends AlterSchemaStatement +{ + private final String tableName; + + private final Map<ColumnIdentifier, CQL3Type.Raw> rawColumns; + private final Set<ColumnIdentifier> staticColumns; + private final List<ColumnIdentifier> partitionKeyColumns; + private final List<ColumnIdentifier> clusteringColumns; + + private final LinkedHashMap<ColumnIdentifier, Boolean> clusteringOrder; + private final TableAttributes attrs; + + private final boolean ifNotExists; + + public CreateTableStatement(String keyspaceName, + String tableName, + + Map<ColumnIdentifier, CQL3Type.Raw> rawColumns, + Set<ColumnIdentifier> staticColumns, + List<ColumnIdentifier> partitionKeyColumns, + List<ColumnIdentifier> clusteringColumns, + + LinkedHashMap<ColumnIdentifier, Boolean> clusteringOrder, + TableAttributes attrs, + + boolean ifNotExists) + { + super(keyspaceName); + this.tableName = tableName; + + this.rawColumns = rawColumns; + this.staticColumns = staticColumns; + this.partitionKeyColumns = partitionKeyColumns; + this.clusteringColumns = clusteringColumns; + + this.clusteringOrder = clusteringOrder; + this.attrs = attrs; + + this.ifNotExists = ifNotExists; + } + + public Keyspaces apply(Keyspaces schema) + { + KeyspaceMetadata keyspace = schema.getNullable(keyspaceName); + if (null == keyspace) + throw ire("Keyspace '%s' doesn't exist", keyspaceName); + + if (keyspace.hasTable(tableName)) + { + if (ifNotExists) + return schema; + + throw new AlreadyExistsException(keyspaceName, tableName); + } + + TableMetadata table = builder(keyspace.types).build(); + table.validate(); + + return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.tables.with(table))); + } + + SchemaChange schemaChangeEvent(KeyspacesDiff diff) + { + return new SchemaChange(Change.CREATED, Target.TABLE, keyspaceName, tableName); + } + + public void authorize(ClientState client) + { + client.ensureKeyspacePermission(keyspaceName, Permission.CREATE); + } + + @Override + Set<IResource> createdResources(KeyspacesDiff diff) + { + return ImmutableSet.of(DataResource.table(keyspaceName, tableName)); + } + + @Override + public AuditLogContext getAuditLogContext() + { + return new AuditLogContext(AuditLogEntryType.CREATE_TABLE, keyspaceName, tableName); + } + + public TableMetadata.Builder builder(Types types) + { + attrs.validate(); + TableParams params = attrs.asNewTableParams(); + + // use a TreeMap to preserve ordering across JDK versions (see CASSANDRA-9492) - important for stable unit tests + Map<ColumnIdentifier, CQL3Type> columns = new TreeMap<>(comparing(o -> o.bytes)); + rawColumns.forEach((column, type) -> columns.put(column, type.prepare(keyspaceName, types))); + + // check for nested non-frozen UDTs or collections in a non-frozen UDT + columns.forEach((column, type) -> + { + if (type.isUDT() && type.getType().isMultiCell()) + { + ((UserType) type.getType()).fieldTypes().forEach(field -> + { + if (field.isMultiCell()) + throw ire("Non-frozen UDTs with nested non-frozen collections are not supported"); + }); + } + }); + + /* + * Deal with PRIMARY KEY columns + */ + + HashSet<ColumnIdentifier> primaryKeyColumns = new HashSet<>(); + concat(partitionKeyColumns, clusteringColumns).forEach(column -> + { + CQL3Type type = columns.get(column); + if (null == type) + throw ire("Unknown column '%s' referenced in PRIMARY KEY for table '%s'", column, tableName); + + if (!primaryKeyColumns.add(column)) + throw ire("Duplicate column '%s' in PRIMARY KEY clause for table '%s'", column, tableName); + + if (type.getType().isMultiCell()) + { + if (type.isCollection()) + throw ire("Invalid non-frozen collection type %s for PRIMARY KEY column '%s'", type, column); + else + throw ire("Invalid non-frozen user-defined type %s for PRIMARY KEY column '%s'", type, column); + } + + if (type.getType().isCounter()) + throw ire("counter type is not supported for PRIMARY KEY column '%s'", column); + + if (type.getType().referencesDuration()) + throw ire("duration type is not supported for PRIMARY KEY column '%s'", column); + + if (staticColumns.contains(column)) + throw ire("Static column '%s' cannot be part of the PRIMARY KEY", column); + }); + + List<AbstractType<?>> partitionKeyTypes = new ArrayList<>(); + List<AbstractType<?>> clusteringTypes = new ArrayList<>(); + + partitionKeyColumns.forEach(column -> + { + CQL3Type type = columns.remove(column); + partitionKeyTypes.add(type.getType()); + }); + + clusteringColumns.forEach(column -> + { + CQL3Type type = columns.remove(column); + boolean reverse = !clusteringOrder.getOrDefault(column, true); + clusteringTypes.add(reverse ? ReversedType.getInstance(type.getType()) : type.getType()); + }); + + // If we give a clustering order, we must explicitly do so for all aliases and in the order of the PK + // This wasn't previously enforced because of a bug in the implementation + if (!clusteringOrder.isEmpty() && !clusteringColumns.equals(new ArrayList<>(clusteringOrder.keySet()))) + throw ire("Clustering key columns must exactly match columns in CLUSTERING ORDER BY directive"); + + // Static columns only make sense if we have at least one clustering column. Otherwise everything is static anyway + if (clusteringColumns.isEmpty() && !staticColumns.isEmpty()) + throw ire("Static columns are only useful (and thus allowed) if the table has at least one clustering column"); + + /* + * Counter table validation + */ + + boolean hasCounters = rawColumns.values().stream().anyMatch(CQL3Type.Raw::isCounter); + if (hasCounters) + { + // We've handled anything that is not a PRIMARY KEY so columns only contains NON-PK columns. So + // if it's a counter table, make sure we don't have non-counter types + if (columns.values().stream().anyMatch(t -> !t.getType().isCounter())) + throw ire("Cannot mix counter and non counter columns in the same table"); + + if (params.defaultTimeToLive > 0) + throw ire("Cannot set %s on a table with counters", TableParams.Option.DEFAULT_TIME_TO_LIVE); + } + + /* + * Create the builder + */ + + TableMetadata.Builder builder = TableMetadata.builder(keyspaceName, tableName); + + if (attrs.hasProperty(TableAttributes.ID)) + builder.id(attrs.getId()); + + builder.isCounter(hasCounters) + .params(params); + + for (int i = 0; i < partitionKeyColumns.size(); i++) + builder.addPartitionKeyColumn(partitionKeyColumns.get(i), partitionKeyTypes.get(i)); + + for (int i = 0; i < clusteringColumns.size(); i++) + builder.addClusteringColumn(clusteringColumns.get(i), clusteringTypes.get(i)); + + columns.forEach((column, type) -> + { + if (staticColumns.contains(column)) + builder.addStaticColumn(column, type.getType()); + else + builder.addRegularColumn(column, type.getType()); + }); + + return builder; + } + + public static TableMetadata.Builder parse(String cql, String keyspace) + { + return CQLFragmentParser.parseAny(CqlParser::createTableStatement, cql, "CREATE TABLE") + .keyspace(keyspace) + .prepare(null) // works around a messy ClientState/QueryProcessor class init deadlock + .builder(Types.none()); + } + + public final static class Raw extends CQLStatement.Raw + { + private final QualifiedName name; + private final boolean ifNotExists; + + private final Map<ColumnIdentifier, CQL3Type.Raw> rawColumns = new HashMap<>(); + private final Set<ColumnIdentifier> staticColumns = new HashSet<>(); + private final List<ColumnIdentifier> clusteringColumns = new ArrayList<>(); + + private List<ColumnIdentifier> partitionKeyColumns; + + private final LinkedHashMap<ColumnIdentifier, Boolean> clusteringOrder = new LinkedHashMap<>(); + public final TableAttributes attrs = new TableAttributes(); + + public Raw(QualifiedName name, boolean ifNotExists) + { + this.name = name; + this.ifNotExists = ifNotExists; + } + + public CreateTableStatement prepare(ClientState state) + { + String keyspaceName = name.hasKeyspace() ? name.getKeyspace() : state.getKeyspace(); + + if (null == partitionKeyColumns) + throw ire("No PRIMARY KEY specifed for table '%s' (exactly one required)", name); + + return new CreateTableStatement(keyspaceName, + name.getName(), + + rawColumns, + staticColumns, + partitionKeyColumns, + clusteringColumns, + + clusteringOrder, + attrs, + + ifNotExists); + } + + public String keyspace() + { + return name.getKeyspace(); + } + + public Raw keyspace(String keyspace) + { + name.setKeyspace(keyspace, true); + return this; + } + + public String table() + { + return name.getName(); + } + + public void addColumn(ColumnIdentifier column, CQL3Type.Raw type, boolean isStatic) + { + if (null != rawColumns.put(column, type)) + throw ire("Duplicate column '%s' declaration for table '%s'", column, name); + + if (isStatic) + staticColumns.add(column); + } + + public void setPartitionKeyColumn(ColumnIdentifier column) + { + setPartitionKeyColumns(Collections.singletonList(column)); + } + + public void setPartitionKeyColumns(List<ColumnIdentifier> columns) + { + if (null != partitionKeyColumns) + throw ire("Multiple PRIMARY KEY specified for table '%s' (exactly one required)", name); + + partitionKeyColumns = columns; + } + + public void markClusteringColumn(ColumnIdentifier column) + { + clusteringColumns.add(column); + } + + public void extendClusteringOrder(ColumnIdentifier column, boolean ascending) + { + if (null != clusteringOrder.put(column, ascending)) + throw ire("Duplicate column '%s' in CLUSTERING ORDER BY clause for table '%s'", column, name); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/CreateTriggerStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTriggerStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTriggerStatement.java new file mode 100644 index 0000000..cb6d14e --- /dev/null +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTriggerStatement.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.statements.schema; + +import org.apache.cassandra.audit.AuditLogContext; +import org.apache.cassandra.audit.AuditLogEntryType; +import org.apache.cassandra.cql3.CQLStatement; +import org.apache.cassandra.cql3.QualifiedName; +import org.apache.cassandra.schema.*; +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.triggers.TriggerExecutor; +import org.apache.cassandra.transport.Event.SchemaChange; +import org.apache.cassandra.transport.Event.SchemaChange.Change; +import org.apache.cassandra.transport.Event.SchemaChange.Target; + +public final class CreateTriggerStatement extends AlterSchemaStatement +{ + private final String tableName; + private final String triggerName; + private final String triggerClass; + private final boolean ifNotExists; + + public CreateTriggerStatement(String keyspaceName, String tableName, String triggerName, String triggerClass, boolean ifNotExists) + { + super(keyspaceName); + this.tableName = tableName; + this.triggerName = triggerName; + this.triggerClass = triggerClass; + this.ifNotExists = ifNotExists; + } + + public Keyspaces apply(Keyspaces schema) + { + KeyspaceMetadata keyspace = schema.getNullable(keyspaceName); + if (null == keyspace) + throw ire("Keyspace '%s' doesn't exist", keyspaceName); + + TableMetadata table = keyspace.getTableOrViewNullable(tableName); + if (null == table) + throw ire("Table '%s' doesn't exist", tableName); + + if (table.isView()) + throw ire("Cannot CREATE TRIGGER for a materialized view"); + + TriggerMetadata existingTrigger = table.triggers.get(triggerName).orElse(null); + if (null != existingTrigger) + { + if (ifNotExists) + return schema; + + throw ire("Trigger '%s' already exists", triggerName); + } + + try + { + TriggerExecutor.instance.loadTriggerInstance(triggerClass); + } + catch (Exception e) + { + throw ire("Trigger class '%s' couldn't be loaded", triggerClass); + } + + TableMetadata newTable = table.withSwapped(table.triggers.with(TriggerMetadata.create(triggerName, triggerClass))); + return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.tables.withSwapped(newTable))); + } + + SchemaChange schemaChangeEvent(KeyspacesDiff diff) + { + return new SchemaChange(Change.UPDATED, Target.TABLE, keyspaceName, tableName); + } + + public void authorize(ClientState client) + { + client.ensureIsSuperuser("Only superusers are allowed to perform CREATE TRIGGER queries"); + } + + @Override + public AuditLogContext getAuditLogContext() + { + return new AuditLogContext(AuditLogEntryType.CREATE_TRIGGER, keyspaceName, triggerName); + } + + public static final class Raw extends CQLStatement.Raw + { + private final QualifiedName tableName; + private final String triggerName; + private final String triggerClass; + private final boolean ifNotExists; + + public Raw(QualifiedName tableName, String triggerName, String triggerClass, boolean ifNotExists) + { + this.tableName = tableName; + this.triggerName = triggerName; + this.triggerClass = triggerClass; + this.ifNotExists = ifNotExists; + } + + public CreateTriggerStatement prepare(ClientState state) + { + String keyspaceName = tableName.hasKeyspace() ? tableName.getKeyspace() : state.getKeyspace(); + return new CreateTriggerStatement(keyspaceName, tableName.getName(), triggerName, triggerClass, ifNotExists); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/CreateTypeStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTypeStatement.java new file mode 100644 index 0000000..c328eb7 --- /dev/null +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTypeStatement.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.statements.schema; + +import java.util.*; + +import org.apache.cassandra.audit.AuditLogContext; +import org.apache.cassandra.audit.AuditLogEntryType; +import org.apache.cassandra.auth.Permission; +import org.apache.cassandra.cql3.CQL3Type; +import org.apache.cassandra.cql3.CQLStatement; +import org.apache.cassandra.cql3.FieldIdentifier; +import org.apache.cassandra.cql3.UTName; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.UserType; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; +import org.apache.cassandra.schema.Types; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.transport.Event.SchemaChange; +import org.apache.cassandra.transport.Event.SchemaChange.Change; +import org.apache.cassandra.transport.Event.SchemaChange.Target; + +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; + +import static java.util.stream.Collectors.toList; + +public final class CreateTypeStatement extends AlterSchemaStatement +{ + private final String typeName; + private final List<FieldIdentifier> fieldNames; + private final List<CQL3Type.Raw> rawFieldTypes; + private final boolean ifNotExists; + + public CreateTypeStatement(String keyspaceName, + String typeName, + List<FieldIdentifier> fieldNames, + List<CQL3Type.Raw> rawFieldTypes, + boolean ifNotExists) + { + super(keyspaceName); + this.typeName = typeName; + this.fieldNames = fieldNames; + this.rawFieldTypes = rawFieldTypes; + this.ifNotExists = ifNotExists; + } + + public Keyspaces apply(Keyspaces schema) + { + KeyspaceMetadata keyspace = schema.getNullable(keyspaceName); + if (null == keyspace) + throw ire("Keyspace '%s' doesn't exist", keyspaceName); + + UserType existingType = keyspace.types.getNullable(bytes(typeName)); + if (null != existingType) + { + if (ifNotExists) + return schema; + + throw ire("A user type with name '%s' already exists", typeName); + } + + Set<FieldIdentifier> usedNames = new HashSet<>(); + for (FieldIdentifier name : fieldNames) + if (!usedNames.add(name)) + throw ire("Duplicate field name '%s' in type '%s'", name, typeName); + + for (CQL3Type.Raw type : rawFieldTypes) + { + if (type.isCounter()) + throw ire("A user type cannot contain counters"); + + if (type.isUDT() && !type.isFrozen()) + throw ire("A user type cannot contain non-frozen UDTs"); + } + + List<AbstractType<?>> fieldTypes = + rawFieldTypes.stream() + .map(t -> t.prepare(keyspaceName, keyspace.types).getType()) + .collect(toList()); + + UserType udt = new UserType(keyspaceName, bytes(typeName), fieldNames, fieldTypes, true); + return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.types.with(udt))); + } + + SchemaChange schemaChangeEvent(KeyspacesDiff diff) + { + return new SchemaChange(Change.CREATED, Target.TYPE, keyspaceName, typeName); + } + + public void authorize(ClientState client) + { + client.ensureKeyspacePermission(keyspaceName, Permission.CREATE); + } + + @Override + public AuditLogContext getAuditLogContext() + { + return new AuditLogContext(AuditLogEntryType.CREATE_TYPE, keyspaceName, typeName); + } + + public static final class Raw extends CQLStatement.Raw + { + private final UTName name; + private final boolean ifNotExists; + + private final List<FieldIdentifier> fieldNames = new ArrayList<>(); + private final List<CQL3Type.Raw> rawFieldTypes = new ArrayList<>(); + + public Raw(UTName name, boolean ifNotExists) + { + this.name = name; + this.ifNotExists = ifNotExists; + } + + public CreateTypeStatement prepare(ClientState state) + { + String keyspaceName = name.hasKeyspace() ? name.getKeyspace() : state.getKeyspace(); + return new CreateTypeStatement(keyspaceName, name.getStringTypeName(), fieldNames, rawFieldTypes, ifNotExists); + } + + public void addField(FieldIdentifier name, CQL3Type.Raw type) + { + fieldNames.add(name); + rawFieldTypes.add(type); + } + + public void addToRawBuilder(Types.RawBuilder builder) + { + builder.add(name.getStringTypeName(), + fieldNames.stream().map(FieldIdentifier::toString).collect(toList()), + rawFieldTypes.stream().map(CQL3Type.Raw::toString).collect(toList())); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java new file mode 100644 index 0000000..f97b0fe --- /dev/null +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.statements.schema; + +import java.util.*; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; + +import org.apache.cassandra.audit.AuditLogContext; +import org.apache.cassandra.audit.AuditLogEntryType; +import org.apache.cassandra.auth.Permission; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.*; +import org.apache.cassandra.cql3.restrictions.StatementRestrictions; +import org.apache.cassandra.cql3.selection.RawSelector; +import org.apache.cassandra.cql3.selection.Selectable; +import org.apache.cassandra.cql3.statements.StatementType; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.ReversedType; +import org.apache.cassandra.exceptions.AlreadyExistsException; +import org.apache.cassandra.schema.*; +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.transport.Event.SchemaChange; +import org.apache.cassandra.transport.Event.SchemaChange.Change; +import org.apache.cassandra.transport.Event.SchemaChange.Target; + +import static java.lang.String.join; + +import static com.google.common.collect.Iterables.concat; +import static com.google.common.collect.Iterables.filter; +import static com.google.common.collect.Iterables.transform; + +public final class CreateViewStatement extends AlterSchemaStatement +{ + private final String tableName; + private final String viewName; + + private final List<RawSelector> rawColumns; + private final List<ColumnIdentifier> partitionKeyColumns; + private final List<ColumnIdentifier> clusteringColumns; + + private final WhereClause whereClause; + + private final LinkedHashMap<ColumnIdentifier, Boolean> clusteringOrder; + private final TableAttributes attrs; + + private final boolean ifNotExists; + + public CreateViewStatement(String keyspaceName, + String tableName, + String viewName, + + List<RawSelector> rawColumns, + List<ColumnIdentifier> partitionKeyColumns, + List<ColumnIdentifier> clusteringColumns, + + WhereClause whereClause, + + LinkedHashMap<ColumnIdentifier, Boolean> clusteringOrder, + TableAttributes attrs, + + boolean ifNotExists) + { + super(keyspaceName); + this.tableName = tableName; + this.viewName = viewName; + + this.rawColumns = rawColumns; + this.partitionKeyColumns = partitionKeyColumns; + this.clusteringColumns = clusteringColumns; + + this.whereClause = whereClause; + + this.clusteringOrder = clusteringOrder; + this.attrs = attrs; + + this.ifNotExists = ifNotExists; + } + + public Keyspaces apply(Keyspaces schema) + { + if (!DatabaseDescriptor.enableMaterializedViews()) + throw ire("Materialized views are disabled. Enable in cassandra.yaml to use."); + + /* + * Basic dependency validations + */ + + KeyspaceMetadata keyspace = schema.getNullable(keyspaceName); + if (null == keyspace) + throw ire("Keyspace '%s' doesn't exist", keyspaceName); + + TableMetadata table = keyspace.tables.getNullable(tableName); + if (null == table) + throw ire("Base table '%s' doesn't exist", tableName); + + if (keyspace.hasTable(viewName)) + throw ire("Cannot create materialized view '%s' - a table with the same name already exists", viewName); + + if (keyspace.hasView(viewName)) + { + if (ifNotExists) + return schema; + + throw new AlreadyExistsException(keyspaceName, viewName); + } + + /* + * Base table validation + */ + + if (table.isCounter()) + throw ire("Materialized views are not supported on counter tables"); + + if (table.isView()) + throw ire("Materialized views cannot be created against other materialized views"); + + if (table.params.gcGraceSeconds == 0) + { + throw ire("Cannot create materialized view '%s' for base table " + + "'%s' with gc_grace_seconds of 0, since this value is " + + "used to TTL undelivered updates. Setting gc_grace_seconds" + + " too low might cause undelivered updates to expire " + + "before being replayed.", + viewName, tableName); + } + + /* + * Process SELECT clause + */ + + Set<ColumnIdentifier> selectedColumns = new HashSet<>(); + + if (rawColumns.isEmpty()) // SELECT * + table.columns().forEach(c -> selectedColumns.add(c.name)); + + rawColumns.forEach(selector -> + { + if (null != selector.alias) + throw ire("Cannot use aliases when defining a materialized view (got %s)", selector); + + if (!(selector.selectable instanceof Selectable.RawIdentifier)) + throw ire("Can only select columns by name when defining a materialized view (got %s)", selector.selectable); + + // will throw IRE if the column doesn't exist in the base table + ColumnMetadata column = (ColumnMetadata) selector.selectable.prepare(table); + + selectedColumns.add(column.name); + }); + + selectedColumns.stream() + .map(table::getColumn) + .filter(ColumnMetadata::isStatic) + .findAny() + .ifPresent(c -> { throw ire("Cannot include static column '%s' in materialized view '%s'", c, viewName); }); + + /* + * Process PRIMARY KEY columns and CLUSTERING ORDER BY clause + */ + + if (partitionKeyColumns.isEmpty()) + throw ire("Must provide at least one partition key column for materialized view '%s'", viewName); + + HashSet<ColumnIdentifier> primaryKeyColumns = new HashSet<>(); + + concat(partitionKeyColumns, clusteringColumns).forEach(name -> + { + ColumnMetadata column = table.getColumn(name); + if (null == column || !selectedColumns.contains(name)) + throw ire("Unknown column '%s' referenced in PRIMARY KEY for materialized view '%s'", name, viewName); + + if (!primaryKeyColumns.add(name)) + throw ire("Duplicate column '%s' in PRIMARY KEY clause for materialized view '%s'", name, viewName); + + AbstractType<?> type = column.type; + + if (type.isMultiCell()) + { + if (type.isCollection()) + throw ire("Invalid non-frozen collection type '%s' for PRIMARY KEY column '%s'", type, name); + else + throw ire("Invalid non-frozen user-defined type '%s' for PRIMARY KEY column '%s'", type, name); + } + + if (type.isCounter()) + throw ire("counter type is not supported for PRIMARY KEY column '%s'", name); + + if (type.referencesDuration()) + throw ire("duration type is not supported for PRIMARY KEY column '%s'", name); + }); + + // If we give a clustering order, we must explicitly do so for all aliases and in the order of the PK + if (!clusteringOrder.isEmpty() && !clusteringColumns.equals(new ArrayList<>(clusteringOrder.keySet()))) + throw ire("Clustering key columns must exactly match columns in CLUSTERING ORDER BY directive"); + + /* + * We need to include all of the primary key columns from the base table in order to make sure that we do not + * overwrite values in the view. We cannot support "collapsing" the base table into a smaller number of rows in + * the view because if we need to generate a tombstone, we have no way of knowing which value is currently being + * used in the view and whether or not to generate a tombstone. In order to not surprise our users, we require + * that they include all of the columns. We provide them with a list of all of the columns left to include. + */ + List<ColumnIdentifier> missingPrimaryKeyColumns = + Lists.newArrayList(filter(transform(table.primaryKeyColumns(), c -> c.name), c -> !primaryKeyColumns.contains(c))); + + if (!missingPrimaryKeyColumns.isEmpty()) + { + throw ire("Cannot create materialized view '%s' without primary key columns %s from base table '%s'", + viewName, join(", ", transform(missingPrimaryKeyColumns, ColumnIdentifier::toString)), tableName); + } + + Set<ColumnIdentifier> regularBaseTableColumnsInViewPrimaryKey = new HashSet<>(primaryKeyColumns); + transform(table.primaryKeyColumns(), c -> c.name).forEach(regularBaseTableColumnsInViewPrimaryKey::remove); + if (regularBaseTableColumnsInViewPrimaryKey.size() > 1) + { + throw ire("Cannot include more than one non-primary key column in materialized view primary key (got %s)", + join(", ", transform(regularBaseTableColumnsInViewPrimaryKey, ColumnIdentifier::toString))); + } + + /* + * Process WHERE clause + */ + + if (whereClause.containsCustomExpressions()) + throw ire("WHERE clause for materialized view '%s' cannot contain custom index expressions", viewName); + + StatementRestrictions restrictions = + new StatementRestrictions(StatementType.SELECT, + table, + whereClause, + VariableSpecifications.empty(), + false, + false, + true, + true); + + List<ColumnIdentifier> nonRestrictedPrimaryKeyColumns = + Lists.newArrayList(filter(primaryKeyColumns, name -> !restrictions.isRestricted(table.getColumn(name)))); + + if (!nonRestrictedPrimaryKeyColumns.isEmpty()) + { + throw ire("Primary key columns %s must be restricted with 'IS NOT NULL' or otherwise", + join(", ", transform(nonRestrictedPrimaryKeyColumns, ColumnIdentifier::toString))); + } + + // See CASSANDRA-13798 + Set<ColumnMetadata> restrictedNonPrimaryKeyColumns = restrictions.nonPKRestrictedColumns(false); + if (!restrictedNonPrimaryKeyColumns.isEmpty() && !Boolean.getBoolean("cassandra.mv.allow_filtering_nonkey_columns_unsafe")) + { + throw ire("Non-primary key columns can only be restricted with 'IS NOT NULL' (got: %s restricted illegally)", + join(",", transform(restrictedNonPrimaryKeyColumns, ColumnMetadata::toString))); + } + + /* + * Validate WITH params + */ + + attrs.validate(); + + if (attrs.hasOption(TableParams.Option.DEFAULT_TIME_TO_LIVE)) + { + throw ire("Cannot set default_time_to_live for a materialized view. " + + "Data in a materialized view always expire at the same time than " + + "the corresponding data in the parent table."); + } + + /* + * Build the thing + */ + + TableMetadata.Builder builder = TableMetadata.builder(keyspaceName, viewName); + + if (attrs.hasProperty(TableAttributes.ID)) + builder.id(attrs.getId()); + + builder.params(attrs.asNewTableParams()) + .kind(TableMetadata.Kind.VIEW); + + partitionKeyColumns.forEach(name -> builder.addPartitionKeyColumn(name, getType(table, name))); + clusteringColumns.forEach(name -> builder.addClusteringColumn(name, getType(table, name))); + + selectedColumns.stream() + .filter(name -> !primaryKeyColumns.contains(name)) + .forEach(name -> builder.addRegularColumn(name, getType(table, name))); + + ViewMetadata view = new ViewMetadata(table.id, table.name, rawColumns.isEmpty(), whereClause, builder.build()); + view.metadata.validate(); + + return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.views.with(view))); + } + + SchemaChange schemaChangeEvent(KeyspacesDiff diff) + { + return new SchemaChange(Change.CREATED, Target.TABLE, keyspaceName, viewName); + } + + public void authorize(ClientState client) + { + client.ensureTablePermission(keyspaceName, tableName, Permission.ALTER); + } + + private AbstractType<?> getType(TableMetadata table, ColumnIdentifier name) + { + AbstractType<?> type = table.getColumn(name).type; + boolean reverse = !clusteringOrder.getOrDefault(name, true); + + if (type.isReversed() && !reverse) + return ((ReversedType) type).baseType; + else if (!type.isReversed() && reverse) + return ReversedType.getInstance(type); + else + return type; + } + + @Override + Set<String> clientWarnings(KeyspacesDiff diff) + { + return ImmutableSet.of("Materialized views are experimental and are not recommended for production use."); + } + + @Override + public AuditLogContext getAuditLogContext() + { + return new AuditLogContext(AuditLogEntryType.CREATE_VIEW, keyspaceName, viewName); + } + + public final static class Raw extends CQLStatement.Raw + { + private final QualifiedName tableName; + private final QualifiedName viewName; + private final boolean ifNotExists; + + private final List<RawSelector> rawColumns; + private final List<ColumnIdentifier> clusteringColumns = new ArrayList<>(); + private List<ColumnIdentifier> partitionKeyColumns; + + private final WhereClause whereClause; + + private final LinkedHashMap<ColumnIdentifier, Boolean> clusteringOrder = new LinkedHashMap<>(); + public final TableAttributes attrs = new TableAttributes(); + + public Raw(QualifiedName tableName, QualifiedName viewName, List<RawSelector> rawColumns, WhereClause whereClause, boolean ifNotExists) + { + this.tableName = tableName; + this.viewName = viewName; + this.rawColumns = rawColumns; + this.whereClause = whereClause; + this.ifNotExists = ifNotExists; + } + + public CreateViewStatement prepare(ClientState state) + { + String keyspaceName = viewName.hasKeyspace() ? viewName.getKeyspace() : state.getKeyspace(); + + if (tableName.hasKeyspace() && !keyspaceName.equals(tableName.getKeyspace())) + throw ire("Cannot create a materialized view on a table in a different keyspace"); + + if (!bindVariables.isEmpty()) + throw ire("Bind variables are not allowed in CREATE MATERIALIZED VIEW statements"); + + if (null == partitionKeyColumns) + throw ire("No PRIMARY KEY specifed for view '%s' (exactly one required)", viewName); + + return new CreateViewStatement(keyspaceName, + tableName.getName(), + viewName.getName(), + + rawColumns, + partitionKeyColumns, + clusteringColumns, + + whereClause, + + clusteringOrder, + attrs, + + ifNotExists); + } + + public void setPartitionKeyColumns(List<ColumnIdentifier> columns) + { + partitionKeyColumns = columns; + } + + public void markClusteringColumn(ColumnIdentifier column) + { + clusteringColumns.add(column); + } + + public void extendClusteringOrder(ColumnIdentifier column, boolean ascending) + { + if (null != clusteringOrder.put(column, ascending)) + throw ire("Duplicate column '%s' in CLUSTERING ORDER BY clause for view '%s'", column, viewName); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java new file mode 100644 index 0000000..564f267 --- /dev/null +++ b/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.statements.schema; + +import java.util.Collection; +import java.util.List; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import org.apache.cassandra.audit.AuditLogContext; +import org.apache.cassandra.audit.AuditLogEntryType; +import org.apache.cassandra.auth.FunctionResource; +import org.apache.cassandra.auth.Permission; +import org.apache.cassandra.cql3.CQL3Type; +import org.apache.cassandra.cql3.CQLStatement; +import org.apache.cassandra.cql3.functions.Function; +import org.apache.cassandra.cql3.functions.FunctionName; +import org.apache.cassandra.cql3.functions.UDAggregate; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.schema.*; +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.transport.Event.SchemaChange; +import org.apache.cassandra.transport.Event.SchemaChange.Change; + +import static java.lang.String.format; +import static java.lang.String.join; +import static java.util.stream.Collectors.toList; + +import static com.google.common.collect.Iterables.transform; + +public final class DropAggregateStatement extends AlterSchemaStatement +{ + private final String aggregateName; + private final List<CQL3Type.Raw> arguments; + private final boolean argumentsSpeficied; + private final boolean ifExists; + + public DropAggregateStatement(String keyspaceName, + String aggregateName, + List<CQL3Type.Raw> arguments, + boolean argumentsSpeficied, + boolean ifExists) + { + super(keyspaceName); + this.aggregateName = aggregateName; + this.arguments = arguments; + this.argumentsSpeficied = argumentsSpeficied; + this.ifExists = ifExists; + } + + public Keyspaces apply(Keyspaces schema) + { + String name = + argumentsSpeficied + ? format("%s.%s(%s)", keyspaceName, aggregateName, join(", ", transform(arguments, CQL3Type.Raw::toString))) + : format("%s.%s", keyspaceName, aggregateName); + + KeyspaceMetadata keyspace = schema.getNullable(keyspaceName); + if (null == keyspace) + { + if (ifExists) + return schema; + + throw ire("Aggregate '%s' doesn't exist", name); + } + + Collection<Function> aggregates = keyspace.functions.get(new FunctionName(keyspaceName, aggregateName)); + if (aggregates.size() > 1 && !argumentsSpeficied) + { + throw ire("'DROP AGGREGATE %s' matches multiple function definitions; " + + "specify the argument types by issuing a statement like " + + "'DROP AGGREGATE %s (type, type, ...)'. You can use cqlsh " + + "'DESCRIBE AGGREGATE %s' command to find all overloads", + aggregateName, aggregateName, aggregateName); + } + + arguments.stream() + .filter(CQL3Type.Raw::isFrozen) + .findFirst() + .ifPresent(t -> { throw ire("Argument '%s' cannot be frozen; remove frozen<> modifier from '%s'", t, t); }); + + List<AbstractType<?>> argumentTypes = prepareArgumentTypes(keyspace.types); + + Predicate<Function> filter = Functions.Filter.UDA; + if (argumentsSpeficied) + filter = filter.and(f -> Functions.typesMatch(f.argTypes(), argumentTypes)); + + Function aggregate = aggregates.stream().filter(filter).findAny().orElse(null); + if (null == aggregate) + { + if (ifExists) + return schema; + + throw ire("Aggregate '%s' doesn't exist", name); + } + + return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.functions.without(aggregate))); + } + + SchemaChange schemaChangeEvent(KeyspacesDiff diff) + { + Functions dropped = diff.altered.get(0).udas.dropped; + assert dropped.size() == 1; + return SchemaChange.forAggregate(Change.DROPPED, (UDAggregate) dropped.iterator().next()); + } + + public void authorize(ClientState client) + { + KeyspaceMetadata keyspace = Schema.instance.getKeyspaceMetadata(keyspaceName); + if (null == keyspace) + return; + + Stream<Function> functions = keyspace.functions.get(new FunctionName(keyspaceName, aggregateName)).stream(); + if (argumentsSpeficied) + functions = functions.filter(f -> Functions.typesMatch(f.argTypes(), prepareArgumentTypes(keyspace.types))); + + functions.forEach(f -> client.ensurePermission(Permission.DROP, FunctionResource.function(f))); + } + + @Override + public AuditLogContext getAuditLogContext() + { + return new AuditLogContext(AuditLogEntryType.DROP_AGGREGATE, keyspaceName, aggregateName); + } + + private List<AbstractType<?>> prepareArgumentTypes(Types types) + { + return arguments.stream() + .map(t -> t.prepare(keyspaceName, types)) + .map(CQL3Type::getType) + .collect(toList()); + } + + public static final class Raw extends CQLStatement.Raw + { + private final FunctionName name; + private final List<CQL3Type.Raw> arguments; + private final boolean argumentsSpecified; + private final boolean ifExists; + + public Raw(FunctionName name, + List<CQL3Type.Raw> arguments, + boolean argumentsSpecified, + boolean ifExists) + { + this.name = name; + this.arguments = arguments; + this.argumentsSpecified = argumentsSpecified; + this.ifExists = ifExists; + } + + public DropAggregateStatement prepare(ClientState state) + { + String keyspaceName = name.hasKeyspace() ? name.keyspace : state.getKeyspace(); + return new DropAggregateStatement(keyspaceName, name.name, arguments, argumentsSpecified, ifExists); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java new file mode 100644 index 0000000..9433833 --- /dev/null +++ b/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.statements.schema; + +import java.util.Collection; +import java.util.List; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import org.apache.cassandra.audit.AuditLogContext; +import org.apache.cassandra.audit.AuditLogEntryType; +import org.apache.cassandra.auth.FunctionResource; +import org.apache.cassandra.auth.Permission; +import org.apache.cassandra.cql3.CQL3Type; +import org.apache.cassandra.cql3.CQLStatement; +import org.apache.cassandra.cql3.functions.*; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.schema.*; +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.transport.Event.SchemaChange; +import org.apache.cassandra.transport.Event.SchemaChange.Change; + +import static java.lang.String.format; +import static java.lang.String.join; +import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.toList; + +import static com.google.common.collect.Iterables.transform; + +public final class DropFunctionStatement extends AlterSchemaStatement +{ + private final String functionName; + private final Collection<CQL3Type.Raw> arguments; + private final boolean argumentsSpeficied; + private final boolean ifExists; + + public DropFunctionStatement(String keyspaceName, + String functionName, + Collection<CQL3Type.Raw> arguments, + boolean argumentsSpeficied, + boolean ifExists) + { + super(keyspaceName); + this.functionName = functionName; + this.arguments = arguments; + this.argumentsSpeficied = argumentsSpeficied; + this.ifExists = ifExists; + } + + public Keyspaces apply(Keyspaces schema) + { + String name = + argumentsSpeficied + ? format("%s.%s(%s)", keyspaceName, functionName, join(", ", transform(arguments, CQL3Type.Raw::toString))) + : format("%s.%s", keyspaceName, functionName); + + KeyspaceMetadata keyspace = schema.getNullable(keyspaceName); + if (null == keyspace) + { + if (ifExists) + return schema; + + throw ire("Function '%s' doesn't exist", name); + } + + Collection<Function> functions = keyspace.functions.get(new FunctionName(keyspaceName, functionName)); + if (functions.size() > 1 && !argumentsSpeficied) + { + throw ire("'DROP FUNCTION %s' matches multiple function definitions; " + + "specify the argument types by issuing a statement like " + + "'DROP FUNCTION %s (type, type, ...)'. You can use cqlsh " + + "'DESCRIBE FUNCTION %s' command to find all overloads", + functionName, functionName, functionName); + } + + arguments.stream() + .filter(CQL3Type.Raw::isFrozen) + .findFirst() + .ifPresent(t -> { throw ire("Argument '%s' cannot be frozen; remove frozen<> modifier from '%s'", t, t); }); + + List<AbstractType<?>> argumentTypes = prepareArgumentTypes(keyspace.types); + + Predicate<Function> filter = Functions.Filter.UDF; + if (argumentsSpeficied) + filter = filter.and(f -> Functions.typesMatch(f.argTypes(), argumentTypes)); + + Function function = functions.stream().filter(filter).findAny().orElse(null); + if (null == function) + { + if (ifExists) + return schema; + + throw ire("Function '%s' doesn't exist", name); + } + + String dependentAggregates = + keyspace.functions + .aggregatesUsingFunction(function) + .map(a -> a.name().toString()) + .collect(joining(", ")); + + if (!dependentAggregates.isEmpty()) + throw ire("Function '%s' is still referenced by aggregates %s", name, dependentAggregates); + + return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.functions.without(function))); + } + + SchemaChange schemaChangeEvent(KeyspacesDiff diff) + { + Functions dropped = diff.altered.get(0).udfs.dropped; + assert dropped.size() == 1; + return SchemaChange.forFunction(Change.DROPPED, (UDFunction) dropped.iterator().next()); + } + + public void authorize(ClientState client) + { + KeyspaceMetadata keyspace = Schema.instance.getKeyspaceMetadata(keyspaceName); + if (null == keyspace) + return; + + Stream<Function> functions = keyspace.functions.get(new FunctionName(keyspaceName, functionName)).stream(); + if (argumentsSpeficied) + functions = functions.filter(f -> Functions.typesMatch(f.argTypes(), prepareArgumentTypes(keyspace.types))); + + functions.forEach(f -> client.ensurePermission(Permission.DROP, FunctionResource.function(f))); + } + + @Override + public AuditLogContext getAuditLogContext() + { + return new AuditLogContext(AuditLogEntryType.DROP_FUNCTION, keyspaceName, functionName); + } + + private List<AbstractType<?>> prepareArgumentTypes(Types types) + { + return arguments.stream() + .map(t -> t.prepare(keyspaceName, types)) + .map(CQL3Type::getType) + .collect(toList()); + } + + public static final class Raw extends CQLStatement.Raw + { + private final FunctionName name; + private final List<CQL3Type.Raw> arguments; + private final boolean argumentsSpecified; + private final boolean ifExists; + + public Raw(FunctionName name, + List<CQL3Type.Raw> arguments, + boolean argumentsSpecified, + boolean ifExists) + { + this.name = name; + this.arguments = arguments; + this.argumentsSpecified = argumentsSpecified; + this.ifExists = ifExists; + } + + public DropFunctionStatement prepare(ClientState state) + { + String keyspaceName = name.hasKeyspace() ? name.keyspace : state.getKeyspace(); + return new DropFunctionStatement(keyspaceName, name.name, arguments, argumentsSpecified, ifExists); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
