13426: CREATE OR REPLACE AGGREGATE
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b30c6da2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b30c6da2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b30c6da2

Branch: refs/heads/13426
Commit: b30c6da2da0b69faaa303bc2245b0b7b9a641701
Parents: aaddbd4
Author: Aleksey Yeschenko <[email protected]>
Authored: Fri May 19 13:21:35 2017 +0100
Committer: Aleksey Yeschenko <[email protected]>
Committed: Wed May 31 18:51:15 2017 +0100

----------------------------------------------------------------------
 src/antlr/Cql.g | 1 +
 src/antlr/Parser.g | 4 +-
 .../apache/cassandra/auth/FunctionResource.java | 5 +
 .../statements/CreateAggregateStatement.java | 256 -------------------
 .../schema/CreateAggregateStatement.java | 136 ++++++++--
 .../schema/CreateFunctionStatement.java | 1 -
 .../org/apache/cassandra/schema/Functions.java | 2 +-
 .../cassandra/schema/MigrationManager.java | 8 -
 .../apache/cassandra/schema/SchemaKeyspace.java | 12 +-
 .../validation/operations/AggregationTest.java | 41 ++-
 10 files changed, 145 insertions(+), 321 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b30c6da2/src/antlr/Cql.g
----------------------------------------------------------------------
diff --git a/src/antlr/Cql.g b/src/antlr/Cql.g
index e0d5b3f..576a65d 100644
--- a/src/antlr/Cql.g
+++ b/src/antlr/Cql.g
@@ -48,6 +48,7 @@ import Parser,Lexer;
 import org.apache.cassandra.cql3.statements.*;
 import org.apache.cassandra.cql3.statements.schema.AlterKeyspaceStatement;
 import org.apache.cassandra.cql3.statements.schema.AlterViewStatement;
+ import org.apache.cassandra.cql3.statements.schema.CreateAggregateStatement;
 import org.apache.cassandra.cql3.statements.schema.CreateIndexStatement;
 import org.apache.cassandra.cql3.statements.schema.CreateKeyspaceStatement;
 import 
org.apache.cassandra.cql3.statements.schema.CreateTableStatement; http://git-wip-us.apache.org/repos/asf/cassandra/blob/b30c6da2/src/antlr/Parser.g ---------------------------------------------------------------------- diff --git a/src/antlr/Parser.g b/src/antlr/Parser.g index e93dac9..1b230c6 100644 --- a/src/antlr/Parser.g +++ b/src/antlr/Parser.g @@ -630,7 +630,7 @@ batchStatementObjective returns [ModificationStatement.Parsed statement] | d=deleteStatement { $statement = d; } ; -createAggregateStatement returns [CreateAggregateStatement expr] +createAggregateStatement returns [CreateAggregateStatement.Raw stmt] @init { boolean orReplace = false; boolean ifNotExists = false; @@ -655,7 +655,7 @@ createAggregateStatement returns [CreateAggregateStatement expr] ( K_INITCOND ival = term )? - { $expr = new CreateAggregateStatement(fn, argsTypes, sfunc, stype, ffunc, ival, orReplace, ifNotExists); } + { $stmt = new CreateAggregateStatement.Raw(fn, argsTypes, stype, sfunc, ffunc, ival, orReplace, ifNotExists); } ; dropAggregateStatement returns [DropAggregateStatement.Raw stmt] http://git-wip-us.apache.org/repos/asf/cassandra/blob/b30c6da2/src/java/org/apache/cassandra/auth/FunctionResource.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/auth/FunctionResource.java b/src/java/org/apache/cassandra/auth/FunctionResource.java index 3e112cb..79497ef 100644 --- a/src/java/org/apache/cassandra/auth/FunctionResource.java +++ b/src/java/org/apache/cassandra/auth/FunctionResource.java @@ -162,6 +162,11 @@ public class FunctionResource implements IResource return new FunctionResource(keyspace, name, abstractTypes); } + public static FunctionResource functionFromCql(FunctionName name, List<CQL3Type.Raw> argTypes) + { + return functionFromCql(name.keyspace, name.name, argTypes); + } + /** * Parses a resource name into a FunctionResource instance. 
* http://git-wip-us.apache.org/repos/asf/cassandra/blob/b30c6da2/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java deleted file mode 100644 index 878195f..0000000 --- a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.cql3.statements; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Objects; -import java.util.List; - -import org.apache.cassandra.auth.*; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.schema.Schema; -import org.apache.cassandra.cql3.*; -import org.apache.cassandra.cql3.functions.*; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.exceptions.*; -import org.apache.cassandra.serializers.MarshalException; -import org.apache.cassandra.service.ClientState; -import org.apache.cassandra.schema.MigrationManager; -import org.apache.cassandra.service.QueryState; -import org.apache.cassandra.transport.Event; -import org.apache.cassandra.transport.ProtocolVersion; - -/** - * A {@code CREATE AGGREGATE} statement parsed from a CQL query. - */ -public final class CreateAggregateStatement extends SchemaAlteringStatement -{ - private final boolean orReplace; - private final boolean ifNotExists; - private FunctionName functionName; - private FunctionName stateFunc; - private FunctionName finalFunc; - private final CQL3Type.Raw stateTypeRaw; - - private final List<CQL3Type.Raw> argRawTypes; - private final Term.Raw ival; - - private List<AbstractType<?>> argTypes; - private AbstractType<?> returnType; - private ScalarFunction stateFunction; - private ScalarFunction finalFunction; - private ByteBuffer initcond; - - public CreateAggregateStatement(FunctionName functionName, - List<CQL3Type.Raw> argRawTypes, - String stateFunc, - CQL3Type.Raw stateType, - String finalFunc, - Term.Raw ival, - boolean orReplace, - boolean ifNotExists) - { - this.functionName = functionName; - this.argRawTypes = argRawTypes; - this.stateFunc = new FunctionName(functionName.keyspace, stateFunc); - this.finalFunc = finalFunc != null ? 
new FunctionName(functionName.keyspace, finalFunc) : null; - this.stateTypeRaw = stateType; - this.ival = ival; - this.orReplace = orReplace; - this.ifNotExists = ifNotExists; - } - - public CreateAggregateStatement prepare(ClientState state) - { - argTypes = new ArrayList<>(argRawTypes.size()); - for (CQL3Type.Raw rawType : argRawTypes) - argTypes.add(prepareType("arguments", rawType)); - - AbstractType<?> stateType = prepareType("state type", stateTypeRaw); - - List<AbstractType<?>> stateArgs = stateArguments(stateType, argTypes); - - Function f = Schema.instance.findFunction(stateFunc, stateArgs).orElse(null); - if (!(f instanceof ScalarFunction)) - throw new InvalidRequestException("State function " + stateFuncSig(stateFunc, stateTypeRaw, argRawTypes) + " does not exist or is not a scalar function"); - stateFunction = (ScalarFunction)f; - - AbstractType<?> stateReturnType = stateFunction.returnType(); - if (!stateReturnType.equals(stateType)) - throw new InvalidRequestException("State function " + stateFuncSig(stateFunction.name(), stateTypeRaw, argRawTypes) + " return type must be the same as the first argument type - check STYPE, argument and return types"); - - if (finalFunc != null) - { - List<AbstractType<?>> finalArgs = Collections.<AbstractType<?>>singletonList(stateType); - f = Schema.instance.findFunction(finalFunc, finalArgs).orElse(null); - if (!(f instanceof ScalarFunction)) - throw new InvalidRequestException("Final function " + finalFunc + '(' + stateTypeRaw + ") does not exist or is not a scalar function"); - finalFunction = (ScalarFunction) f; - returnType = finalFunction.returnType(); - } - else - { - returnType = stateReturnType; - } - - if (ival != null) - { - initcond = Terms.asBytes(functionName.keyspace, ival.toString(), stateType); - - if (initcond != null) - { - try - { - stateType.validate(initcond); - } - catch (MarshalException e) - { - throw new InvalidRequestException(String.format("Invalid value for INITCOND of type %s%s", 
stateType.asCQL3Type(), - e.getMessage() == null ? "" : String.format(" (%s)", e.getMessage()))); - } - } - - // Sanity check that converts the initcond to a CQL literal and parse it back to avoid getting in CASSANDRA-11064. - String initcondAsCql = stateType.asCQL3Type().toCQLLiteral(initcond, ProtocolVersion.CURRENT); - assert Objects.equals(initcond, Terms.asBytes(functionName.keyspace, initcondAsCql, stateType)); - - if (Constants.NULL_LITERAL != ival && UDHelper.isNullOrEmpty(stateType, initcond)) - throw new InvalidRequestException("INITCOND must not be empty for all types except TEXT, ASCII, BLOB"); - } - - return this; - } - - private AbstractType<?> prepareType(String typeName, CQL3Type.Raw rawType) - { - if (rawType.isFrozen()) - throw new InvalidRequestException(String.format("The function %s should not be frozen; remove the frozen<> modifier", typeName)); - - AbstractType<?> type = rawType.prepare(functionName.keyspace).getType(); - return type; - } - - public void setKeyspace(ClientState state) throws InvalidRequestException - { - if (!functionName.hasKeyspace() && state.getRawKeyspace() != null) - functionName = new FunctionName(state.getKeyspace(), functionName.name); - - if (!functionName.hasKeyspace()) - throw new InvalidRequestException("Functions must be fully qualified with a keyspace name if a keyspace is not set for the session"); - - Schema.validateKeyspaceNotSystem(functionName.keyspace); - - stateFunc = new FunctionName(functionName.keyspace, stateFunc.name); - if (finalFunc != null) - finalFunc = new FunctionName(functionName.keyspace, finalFunc.name); - } - - protected void grantPermissionsToCreator(QueryState state) - { - try - { - IResource resource = FunctionResource.function(functionName.keyspace, functionName.name, argTypes); - DatabaseDescriptor.getAuthorizer().grant(AuthenticatedUser.SYSTEM_USER, - resource.applicablePermissions(), - resource, - RoleResource.role(state.getClientState().getUser().getName())); - } - catch 
(RequestExecutionException e) - { - throw new RuntimeException(e); - } - } - - public void authorize(ClientState state) throws UnauthorizedException, InvalidRequestException - { - if (Schema.instance.findFunction(functionName, argTypes).isPresent() && orReplace) - state.ensurePermission(Permission.ALTER, FunctionResource.function(functionName.keyspace, - functionName.name, - argTypes)); - else - state.ensurePermission(Permission.CREATE, FunctionResource.keyspace(functionName.keyspace)); - - state.ensurePermission(Permission.EXECUTE, stateFunction); - - if (finalFunction != null) - state.ensurePermission(Permission.EXECUTE, finalFunction); - } - - public void validate(ClientState state) throws InvalidRequestException - { - if (ifNotExists && orReplace) - throw new InvalidRequestException("Cannot use both 'OR REPLACE' and 'IF NOT EXISTS' directives"); - - if (Schema.instance.getKeyspaceMetadata(functionName.keyspace) == null) - throw new InvalidRequestException(String.format("Cannot add aggregate '%s' to non existing keyspace '%s'.", functionName.name, functionName.keyspace)); - } - - public Event.SchemaChange announceMigration(QueryState queryState, boolean isLocalOnly) throws RequestValidationException - { - Function old = Schema.instance.findFunction(functionName, argTypes).orElse(null); - boolean replaced = old != null; - if (replaced) - { - if (ifNotExists) - return null; - if (!orReplace) - throw new InvalidRequestException(String.format("Function %s already exists", old)); - if (!(old instanceof AggregateFunction)) - throw new InvalidRequestException(String.format("Aggregate %s can only replace an aggregate", old)); - - // Means we're replacing the function. 
We still need to validate that 1) it's not a native function and 2) that the return type - // matches (or that could break existing code badly) - if (old.isNative()) - throw new InvalidRequestException(String.format("Cannot replace native aggregate %s", old)); - if (!old.returnType().isValueCompatibleWith(returnType)) - throw new InvalidRequestException(String.format("Cannot replace aggregate %s, the new return type %s is not compatible with the return type %s of existing function", - functionName, returnType.asCQL3Type(), old.returnType().asCQL3Type())); - } - - if (!stateFunction.isCalledOnNullInput() && initcond == null) - throw new InvalidRequestException(String.format("Cannot create aggregate %s without INITCOND because state function %s does not accept 'null' arguments", functionName, stateFunc)); - - UDAggregate udAggregate = new UDAggregate(functionName, argTypes, returnType, stateFunction, finalFunction, initcond); - - MigrationManager.announceNewAggregate(udAggregate, isLocalOnly); - - return new Event.SchemaChange(replaced ? 
Event.SchemaChange.Change.UPDATED : Event.SchemaChange.Change.CREATED, - Event.SchemaChange.Target.AGGREGATE, - udAggregate.name().keyspace, udAggregate.name().name, AbstractType.asCQLTypeStringList(udAggregate.argTypes())); - } - - private static String stateFuncSig(FunctionName stateFuncName, CQL3Type.Raw stateTypeRaw, List<CQL3Type.Raw> argRawTypes) - { - StringBuilder sb = new StringBuilder(); - sb.append(stateFuncName.toString()).append('(').append(stateTypeRaw); - for (CQL3Type.Raw argRawType : argRawTypes) - sb.append(", ").append(argRawType); - sb.append(')'); - return sb.toString(); - } - - private static List<AbstractType<?>> stateArguments(AbstractType<?> stateType, List<AbstractType<?>> argTypes) - { - List<AbstractType<?>> r = new ArrayList<>(argTypes.size() + 1); - r.add(stateType); - r.addAll(argTypes); - return r; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b30c6da2/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java index 04e3c81..ef0df86 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java @@ -21,25 +21,33 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Set; +import com.google.common.base.Objects; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import org.apache.cassandra.auth.FunctionResource; import org.apache.cassandra.auth.IResource; +import org.apache.cassandra.auth.Permission; import org.apache.cassandra.cql3.*; import org.apache.cassandra.cql3.functions.*; import org.apache.cassandra.db.marshal.AbstractType; +import 
org.apache.cassandra.schema.Functions.FunctionsDiff; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; +import org.apache.cassandra.schema.Schema; import org.apache.cassandra.serializers.MarshalException; +import org.apache.cassandra.service.ClientState; import org.apache.cassandra.transport.Event.SchemaChange; +import org.apache.cassandra.transport.Event.SchemaChange.Change; +import org.apache.cassandra.transport.Event.SchemaChange.Target; import org.apache.cassandra.transport.ProtocolVersion; import static java.lang.String.format; import static java.lang.String.join; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; +import static java.util.stream.Collectors.toList; import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterables.transform; @@ -76,17 +84,6 @@ public final class CreateAggregateStatement extends AlterSchemaStatement this.ifNotExists = ifNotExists; } - SchemaChange schemaChangeEvent(KeyspacesDiff diff) - { - throw new UnsupportedOperationException(); - } - - @Override - Set<IResource> createdResources(KeyspacesDiff diff) - { - return ImmutableSet.of(FunctionResource.functionFromCql(keyspaceName, aggregateName, rawArgumentTypes)); - } - public Keyspaces apply(Keyspaces schema) { if (ifNotExists && orReplace) @@ -108,17 +105,20 @@ public final class CreateAggregateStatement extends AlterSchemaStatement * Resolve the state function */ - // TODO replace Lists.transform use - List<AbstractType<?>> argumentTypes = Lists.transform(rawArgumentTypes, t -> t.prepare(keyspaceName, keyspace.types).getType()); + List<AbstractType<?>> argumentTypes = + rawArgumentTypes.stream() + .map(t -> t.prepare(keyspaceName, keyspace.types).getType()) + .collect(toList()); AbstractType<?> stateType = rawStateType.prepare(keyspaceName, keyspace.types).getType(); 
List<AbstractType<?>> stateFunctionArguments = Lists.newArrayList(concat(singleton(stateType), argumentTypes)); - Function stateFunction = keyspace.functions.find(stateFunctionName, stateFunctionArguments).orElse(null); - if (null == stateFunction) - throw ire("State function %s does not exist", stateFunctionString()); + Function stateFunction = + keyspace.functions + .find(stateFunctionName, stateFunctionArguments) + .orElseThrow(() -> ire("State function %s doesn't exist", stateFunctionString())); if (stateFunction.isAggregate()) - throw ire("State function %s is not a scalar function", stateFunctionString()); + throw ire("State function %s isn't a scalar function", stateFunctionString()); if (!stateFunction.returnType().equals(stateType)) { @@ -137,10 +137,10 @@ public final class CreateAggregateStatement extends AlterSchemaStatement { finalFunction = keyspace.functions.find(finalFunctionName, singletonList(stateType)).orElse(null); if (null == finalFunction) - throw ire("Final function %s does not exist", finalFunctionString()); + throw ire("Final function %s doesn't exist", finalFunctionString()); if (finalFunction.isAggregate()) - throw ire("Final function %s is not a scalar function", finalFunctionString()); + throw ire("Final function %s isn't a scalar function", finalFunctionString()); // override return type with that of the final function returnType = finalFunction.returnType(); @@ -150,11 +150,10 @@ public final class CreateAggregateStatement extends AlterSchemaStatement * Validate initial condition */ - // TODO: WTF? 
ByteBuffer initialValue = null; if (null != rawInitialValue) { - initialValue = Terms.asBytes(keyspaceName, initialValue.toString(), stateType); + initialValue = Terms.asBytes(keyspaceName, rawInitialValue.toString(), stateType); if (null != initialValue) { @@ -170,7 +169,7 @@ public final class CreateAggregateStatement extends AlterSchemaStatement // Converts initcond to a CQL literal and parse it back to avoid another CASSANDRA-11064 String initialValueString = stateType.asCQL3Type().toCQLLiteral(initialValue, ProtocolVersion.CURRENT); - assert !Terms.asBytes(keyspaceName, initialValueString, stateType).equals(initialValue); + assert Objects.equal(initialValue, Terms.asBytes(keyspaceName, initialValueString, stateType)); if (Constants.NULL_LITERAL != rawInitialValue && UDHelper.isNullOrEmpty(stateType, initialValue)) throw ire("INITCOND must not be empty for all types except TEXT, ASCII, BLOB"); @@ -201,7 +200,7 @@ public final class CreateAggregateStatement extends AlterSchemaStatement if (!returnType.isCompatibleWith(existingAggregate.returnType())) { - throw ire("Cannot replace aggregate '%s', the new return type %s is not compatible with the return type %s of existing function", + throw ire("Cannot replace aggregate '%s', the new return type %s isn't compatible with the return type %s of existing function", aggregateName, returnType.asCQL3Type(), existingAggregate.returnType().asCQL3Type()); @@ -219,6 +218,51 @@ public final class CreateAggregateStatement extends AlterSchemaStatement return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.functions.withAddedOrUpdated(aggregate))); } + SchemaChange schemaChangeEvent(KeyspacesDiff diff) + { + assert diff.altered.size() == 1; + FunctionsDiff<UDAggregate> udasDiff = diff.altered.get(0).udas; + + assert udasDiff.created.size() + udasDiff.altered.size() == 1; + boolean created = !udasDiff.created.isEmpty(); + + return new SchemaChange(created ? 
Change.CREATED : Change.UPDATED, + Target.AGGREGATE, + keyspaceName, + aggregateName, + rawArgumentTypes.stream().map(CQL3Type.Raw::toString).collect(toList())); + } + + public void authorize(ClientState client) + { + FunctionName name = new FunctionName(keyspaceName, aggregateName); + + if (Schema.instance.findFunction(name, Lists.transform(rawArgumentTypes, t -> t.prepare(keyspaceName).getType())).isPresent() && orReplace) + client.ensurePermission(Permission.ALTER, FunctionResource.functionFromCql(keyspaceName, aggregateName, rawArgumentTypes)); + else + client.ensurePermission(Permission.CREATE, FunctionResource.keyspace(keyspaceName)); + + FunctionResource stateFunction = + FunctionResource.functionFromCql(stateFunctionName, Lists.newArrayList(concat(singleton(rawStateType), rawArgumentTypes))); + client.ensurePermission(Permission.EXECUTE, stateFunction); + + if (null != finalFunctionName) + client.ensurePermission(Permission.EXECUTE, FunctionResource.functionFromCql(finalFunctionName, singletonList(rawStateType))); + } + + @Override + Set<IResource> createdResources(KeyspacesDiff diff) + { + assert diff.altered.size() == 1; + FunctionsDiff<UDAggregate> udasDiff = diff.altered.get(0).udas; + + assert udasDiff.created.size() + udasDiff.altered.size() == 1; + + return udasDiff.created.isEmpty() + ? 
ImmutableSet.of() + : ImmutableSet.of(FunctionResource.functionFromCql(keyspaceName, aggregateName, rawArgumentTypes)); + } + private String stateFunctionString() { return format("%s(%s)", stateFunctionName, join(", ", transform(concat(singleton(rawStateType), rawArgumentTypes), Object::toString))); @@ -228,4 +272,50 @@ public final class CreateAggregateStatement extends AlterSchemaStatement { return format("%s(%s)", finalFunctionName, rawStateType); } + + public static final class Raw extends CQLStatement.Raw + { + private final FunctionName aggregateName; + private final List<CQL3Type.Raw> rawArgumentTypes; + private final CQL3Type.Raw rawStateType; + private final String stateFunctionName; + private final String finalFunctionName; + private final Term.Raw rawInitialValue; + private final boolean orReplace; + private final boolean ifNotExists; + + public Raw(FunctionName aggregateName, + List<CQL3Type.Raw> rawArgumentTypes, + CQL3Type.Raw rawStateType, + String stateFunctionName, + String finalFunctionName, + Term.Raw rawInitialValue, + boolean orReplace, + boolean ifNotExists) + { + this.aggregateName = aggregateName; + this.rawArgumentTypes = rawArgumentTypes; + this.rawStateType = rawStateType; + this.stateFunctionName = stateFunctionName; + this.finalFunctionName = finalFunctionName; + this.rawInitialValue = rawInitialValue; + this.orReplace = orReplace; + this.ifNotExists = ifNotExists; + } + + public CreateAggregateStatement prepare(ClientState state) + { + String keyspaceName = aggregateName.hasKeyspace() ? aggregateName.keyspace : state.getKeyspace(); + + return new CreateAggregateStatement(keyspaceName, + aggregateName.name, + rawArgumentTypes, + rawStateType, + new FunctionName(keyspaceName, stateFunctionName), + null != finalFunctionName ? 
new FunctionName(keyspaceName, finalFunctionName) : null, + rawInitialValue, + orReplace, + ifNotExists); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b30c6da2/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java index 00b98f5..78fe9d6 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java @@ -167,7 +167,6 @@ public final class CreateFunctionStatement extends AlterSchemaStatement { FunctionName name = new FunctionName(keyspaceName, functionName); - // TODO: replace lists.transform use if (Schema.instance.findFunction(name, Lists.transform(rawArgumentTypes, t -> t.prepare(keyspaceName).getType())).isPresent() && orReplace) client.ensurePermission(Permission.ALTER, FunctionResource.functionFromCql(keyspaceName, functionName, rawArgumentTypes)); else http://git-wip-us.apache.org/repos/asf/cassandra/blob/b30c6da2/src/java/org/apache/cassandra/schema/Functions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/Functions.java b/src/java/org/apache/cassandra/schema/Functions.java index db5e871..d5a2782 100644 --- a/src/java/org/apache/cassandra/schema/Functions.java +++ b/src/java/org/apache/cassandra/schema/Functions.java @@ -245,7 +245,7 @@ public final class Functions implements Iterable<Function> public Functions withAddedOrUpdated(Function function) { - return builder().add(Iterables.filter(this, f -> !(f.name().equals(function.name())) && Functions.typesMatch(f.argTypes(), function.argTypes()))) + return builder().add(Iterables.filter(this, f -> 
!(f.name().equals(function.name()) && Functions.typesMatch(f.argTypes(), function.argTypes())))) .add(function) .build(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b30c6da2/src/java/org/apache/cassandra/schema/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java b/src/java/org/apache/cassandra/schema/MigrationManager.java index a9f69f4..e970552 100644 --- a/src/java/org/apache/cassandra/schema/MigrationManager.java +++ b/src/java/org/apache/cassandra/schema/MigrationManager.java @@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.cql3.functions.UDAggregate; import org.apache.cassandra.cql3.functions.UDFunction; import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.UserType; @@ -307,13 +306,6 @@ public class MigrationManager announce(SchemaKeyspace.makeCreateFunctionMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); } - public static void announceNewAggregate(UDAggregate udf, boolean announceLocally) - { - logger.info("Create aggregate function '{}'", udf.name()); - KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(udf.name().keyspace); - announce(SchemaKeyspace.makeCreateAggregateMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); - } - static void announceKeyspaceUpdate(KeyspaceMetadata ksm) { ksm.validate(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b30c6da2/src/java/org/apache/cassandra/schema/SchemaKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index 19b0bb7..6be9df0 100644 --- 
a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -292,7 +292,11 @@ public final class SchemaKeyspace kd.views.dropped.forEach(v -> addDropViewToSchemaMutation(v, builder)); kd.views.altered.forEach(vd -> addAlterViewToSchemaMutation(vd.before, vd.after, builder)); + kd.udfs.created.forEach(f -> addFunctionToSchemaMutation((UDFunction) f, builder)); kd.udfs.dropped.forEach(f -> addDropFunctionToSchemaMutation((UDFunction) f, builder)); + + kd.udas.created.forEach(a -> addAggregateToSchemaMutation((UDAggregate) a, builder)); + kd.udas.altered.forEach(ad -> addAggregateToSchemaMutation(ad.after, builder)); kd.udas.dropped.forEach(a -> addDropAggregateToSchemaMutation((UDAggregate) a, builder)); mutations.put(ks.name, builder.build()); @@ -846,14 +850,6 @@ public final class SchemaKeyspace builder.update(Functions).row(function.name().name, function.argumentsList()).delete(); } - static Mutation.SimpleBuilder makeCreateAggregateMutation(KeyspaceMetadata keyspace, UDAggregate aggregate, long timestamp) - { - // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). 
- Mutation.SimpleBuilder builder = makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp); - addAggregateToSchemaMutation(aggregate, builder); - return builder; - } - private static void addAggregateToSchemaMutation(UDAggregate aggregate, Mutation.SimpleBuilder builder) { builder.update(Aggregates) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b30c6da2/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java index 5489e4d..3e14854 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java @@ -41,18 +41,15 @@ import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.spi.TurboFilterList; import ch.qos.logback.classic.turbo.ReconfigureOnChangeFilter; import ch.qos.logback.classic.turbo.TurboFilter; -import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.cql3.UntypedResultSet.Row; -import org.apache.cassandra.cql3.functions.UDAggregate; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.TypeParser; import org.apache.cassandra.exceptions.FunctionExecutionException; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.transport.Event; import org.apache.cassandra.transport.ProtocolVersion; @@ -448,7 +445,7 @@ public class AggregationTest extends CQLTester 
schemaChange("CREATE OR REPLACE AGGREGATE " + a + "(double) " + "SFUNC " + shortFunctionName(f) + " " + "STYPE double " + - "INITCOND 0"); + "INITCOND 1"); assertLastSchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.AGGREGATE, KEYSPACE, parseFunctionName(a).name, @@ -689,37 +686,37 @@ public class AggregationTest extends CQLTester "LANGUAGE java " + "AS 'return a.toString();'"); - assertInvalidMessage("does not exist or is not a scalar function", + assertInvalidMessage("doesn't exist", "CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(double)" + "SFUNC " + shortFunctionName(fState) + " " + "STYPE double " + "FINALFUNC " + shortFunctionName(fFinal)); - assertInvalidMessage("does not exist or is not a scalar function", + assertInvalidMessage("doesn't exist", "CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(int)" + "SFUNC " + shortFunctionName(fState) + " " + "STYPE double " + "FINALFUNC " + shortFunctionName(fFinal)); - assertInvalidMessage("does not exist or is not a scalar function", + assertInvalidMessage("doesn't exist", "CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(double)" + "SFUNC " + shortFunctionName(fState) + " " + "STYPE int " + "FINALFUNC " + shortFunctionName(fFinal)); - assertInvalidMessage("does not exist or is not a scalar function", + assertInvalidMessage("doesn't exist", "CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(double)" + "SFUNC " + shortFunctionName(fState) + " " + "STYPE int"); - assertInvalidMessage("does not exist or is not a scalar function", + assertInvalidMessage("doesn't exist", "CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(int)" + "SFUNC " + shortFunctionName(fState) + " " + "STYPE double"); - assertInvalidMessage("does not exist or is not a scalar function", + assertInvalidMessage("doesn't exist", "CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(double)" + "SFUNC " + shortFunctionName(fState2) + " " + "STYPE double " + "FINALFUNC " + shortFunctionName(fFinal)); - assertInvalidMessage("does not exist or is not a 
scalar function", + assertInvalidMessage("doesn't exist", "CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(double)" + "SFUNC " + shortFunctionName(fState) + " " + "STYPE double " + @@ -745,13 +742,13 @@ public class AggregationTest extends CQLTester "LANGUAGE java " + "AS 'return a.toString();'"); - assertInvalidMessage("does not exist or is not a scalar function", + assertInvalidMessage("doesn't exist", "CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(int)" + "SFUNC " + shortFunctionName(fState) + "_not_there " + "STYPE int " + "FINALFUNC " + shortFunctionName(fFinal)); - assertInvalidMessage("does not exist or is not a scalar function", + assertInvalidMessage("doesn't exist", "CREATE AGGREGATE " + KEYSPACE + ".aggrInvalid(int)" + "SFUNC " + shortFunctionName(fState) + " " + "STYPE int " + @@ -834,7 +831,7 @@ public class AggregationTest extends CQLTester @Test public void testJavaAggregateWithoutStateOrFinal() throws Throwable { - assertInvalidMessage("does not exist or is not a scalar function", + assertInvalidMessage("doesn't exist", "CREATE AGGREGATE " + KEYSPACE + ".jSumFooNE1(int) " + "SFUNC jSumFooNEstate " + "STYPE int"); @@ -847,7 +844,7 @@ public class AggregationTest extends CQLTester "LANGUAGE java " + "AS 'return Integer.valueOf(a + b);'"); - assertInvalidMessage("does not exist or is not a scalar function", + assertInvalidMessage("doesn't exist", "CREATE AGGREGATE " + KEYSPACE + ".jSumFooNE2(int) " + "SFUNC " + shortFunctionName(f) + " " + "STYPE int " + @@ -1173,12 +1170,12 @@ public class AggregationTest extends CQLTester "SFUNC " + shortFunctionName(fState) + " " + "STYPE int "); - assertInvalidMessage("does not exist or is not a scalar function", + assertInvalidMessage("doesn't exist", "CREATE AGGREGATE " + KEYSPACE + ".aggInv(int) " + "SFUNC " + shortFunctionName(a) + " " + "STYPE int "); - assertInvalidMessage("does not exist or is not a scalar function", + assertInvalidMessage("isn't a scalar function", "CREATE AGGREGATE " + KEYSPACE + 
".aggInv(int) " + "SFUNC " + shortFunctionName(fState) + " " + "STYPE int " + @@ -1444,7 +1441,7 @@ public class AggregationTest extends CQLTester "LANGUAGE java " + "AS 'return state;'"); - assertInvalidMessage("The function state type should not be frozen", + assertInvalidMessage("cannot be frozen", "CREATE AGGREGATE %s(set<int>) " + "SFUNC " + parseFunctionName(fState).name + ' ' + "STYPE frozen<set<int>> " + @@ -1496,7 +1493,7 @@ public class AggregationTest extends CQLTester "LANGUAGE java " + "AS 'return state;'"); - assertInvalidMessage("The function state type should not be frozen", + assertInvalidMessage("cannot be frozen", "CREATE AGGREGATE %s(list<int>) " + "SFUNC " + parseFunctionName(fState).name + ' ' + "STYPE frozen<list<int>> " + @@ -1545,7 +1542,7 @@ public class AggregationTest extends CQLTester "LANGUAGE java " + "AS 'return state;'"); - assertInvalidMessage("The function state type should not be frozen", + assertInvalidMessage("cannot be frozen", "CREATE AGGREGATE %s(map<int, int>) " + "SFUNC " + parseFunctionName(fState).name + ' ' + "STYPE frozen<map<int, int>> " + @@ -1594,7 +1591,7 @@ public class AggregationTest extends CQLTester "LANGUAGE java " + "AS 'return state;'"); - assertInvalidMessage("The function state type should not be frozen", + assertInvalidMessage("cannot be frozen", "CREATE AGGREGATE %s(tuple<int, int>) " + "SFUNC " + parseFunctionName(fState).name + ' ' + "STYPE frozen<tuple<int, int>> " + @@ -1644,7 +1641,7 @@ public class AggregationTest extends CQLTester "LANGUAGE java " + "AS 'return state;'"); - assertInvalidMessage("The function state type should not be frozen", + assertInvalidMessage("cannot be frozen", "CREATE AGGREGATE %s(" + myType + ") " + "SFUNC " + parseFunctionName(fState).name + ' ' + "STYPE frozen<" + myType + "> " + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
