13426: CREATE OR REPLACE AGGREGATE

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b30c6da2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b30c6da2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b30c6da2

Branch: refs/heads/13426
Commit: b30c6da2da0b69faaa303bc2245b0b7b9a641701
Parents: aaddbd4
Author: Aleksey Yeschenko <[email protected]>
Authored: Fri May 19 13:21:35 2017 +0100
Committer: Aleksey Yeschenko <[email protected]>
Committed: Wed May 31 18:51:15 2017 +0100

----------------------------------------------------------------------
 src/antlr/Cql.g                                 |   1 +
 src/antlr/Parser.g                              |   4 +-
 .../apache/cassandra/auth/FunctionResource.java |   5 +
 .../statements/CreateAggregateStatement.java    | 256 -------------------
 .../schema/CreateAggregateStatement.java        | 136 ++++++++--
 .../schema/CreateFunctionStatement.java         |   1 -
 .../org/apache/cassandra/schema/Functions.java  |   2 +-
 .../cassandra/schema/MigrationManager.java      |   8 -
 .../apache/cassandra/schema/SchemaKeyspace.java |  12 +-
 .../validation/operations/AggregationTest.java  |  41 ++-
 10 files changed, 145 insertions(+), 321 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b30c6da2/src/antlr/Cql.g
----------------------------------------------------------------------
diff --git a/src/antlr/Cql.g b/src/antlr/Cql.g
index e0d5b3f..576a65d 100644
--- a/src/antlr/Cql.g
+++ b/src/antlr/Cql.g
@@ -48,6 +48,7 @@ import Parser,Lexer;
     import org.apache.cassandra.cql3.statements.*;
     import org.apache.cassandra.cql3.statements.schema.AlterKeyspaceStatement;
     import org.apache.cassandra.cql3.statements.schema.AlterViewStatement;
+    import 
org.apache.cassandra.cql3.statements.schema.CreateAggregateStatement;
     import org.apache.cassandra.cql3.statements.schema.CreateIndexStatement;
     import org.apache.cassandra.cql3.statements.schema.CreateKeyspaceStatement;
     import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b30c6da2/src/antlr/Parser.g
----------------------------------------------------------------------
diff --git a/src/antlr/Parser.g b/src/antlr/Parser.g
index e93dac9..1b230c6 100644
--- a/src/antlr/Parser.g
+++ b/src/antlr/Parser.g
@@ -630,7 +630,7 @@ batchStatementObjective returns 
[ModificationStatement.Parsed statement]
     | d=deleteStatement  { $statement = d; }
     ;
 
-createAggregateStatement returns [CreateAggregateStatement expr]
+createAggregateStatement returns [CreateAggregateStatement.Raw stmt]
     @init {
         boolean orReplace = false;
         boolean ifNotExists = false;
@@ -655,7 +655,7 @@ createAggregateStatement returns [CreateAggregateStatement 
expr]
       (
         K_INITCOND ival = term
       )?
-      { $expr = new CreateAggregateStatement(fn, argsTypes, sfunc, stype, 
ffunc, ival, orReplace, ifNotExists); }
+      { $stmt = new CreateAggregateStatement.Raw(fn, argsTypes, stype, sfunc, 
ffunc, ival, orReplace, ifNotExists); }
     ;
 
 dropAggregateStatement returns [DropAggregateStatement.Raw stmt]

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b30c6da2/src/java/org/apache/cassandra/auth/FunctionResource.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/FunctionResource.java 
b/src/java/org/apache/cassandra/auth/FunctionResource.java
index 3e112cb..79497ef 100644
--- a/src/java/org/apache/cassandra/auth/FunctionResource.java
+++ b/src/java/org/apache/cassandra/auth/FunctionResource.java
@@ -162,6 +162,11 @@ public class FunctionResource implements IResource
         return new FunctionResource(keyspace, name, abstractTypes);
     }
 
+    public static FunctionResource functionFromCql(FunctionName name, 
List<CQL3Type.Raw> argTypes)
+    {
+        return functionFromCql(name.keyspace, name.name, argTypes);
+    }
+
     /**
      * Parses a resource name into a FunctionResource instance.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b30c6da2/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
deleted file mode 100644
index 878195f..0000000
--- 
a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.cql3.statements;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Objects;
-import java.util.List;
-
-import org.apache.cassandra.auth.*;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.cql3.functions.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.schema.MigrationManager;
-import org.apache.cassandra.service.QueryState;
-import org.apache.cassandra.transport.Event;
-import org.apache.cassandra.transport.ProtocolVersion;
-
-/**
- * A {@code CREATE AGGREGATE} statement parsed from a CQL query.
- */
-public final class CreateAggregateStatement extends SchemaAlteringStatement
-{
-    private final boolean orReplace;
-    private final boolean ifNotExists;
-    private FunctionName functionName;
-    private FunctionName stateFunc;
-    private FunctionName finalFunc;
-    private final CQL3Type.Raw stateTypeRaw;
-
-    private final List<CQL3Type.Raw> argRawTypes;
-    private final Term.Raw ival;
-
-    private List<AbstractType<?>> argTypes;
-    private AbstractType<?> returnType;
-    private ScalarFunction stateFunction;
-    private ScalarFunction finalFunction;
-    private ByteBuffer initcond;
-
-    public CreateAggregateStatement(FunctionName functionName,
-                                    List<CQL3Type.Raw> argRawTypes,
-                                    String stateFunc,
-                                    CQL3Type.Raw stateType,
-                                    String finalFunc,
-                                    Term.Raw ival,
-                                    boolean orReplace,
-                                    boolean ifNotExists)
-    {
-        this.functionName = functionName;
-        this.argRawTypes = argRawTypes;
-        this.stateFunc = new FunctionName(functionName.keyspace, stateFunc);
-        this.finalFunc = finalFunc != null ? new 
FunctionName(functionName.keyspace, finalFunc) : null;
-        this.stateTypeRaw = stateType;
-        this.ival = ival;
-        this.orReplace = orReplace;
-        this.ifNotExists = ifNotExists;
-    }
-
-    public CreateAggregateStatement prepare(ClientState state)
-    {
-        argTypes = new ArrayList<>(argRawTypes.size());
-        for (CQL3Type.Raw rawType : argRawTypes)
-            argTypes.add(prepareType("arguments", rawType));
-
-        AbstractType<?> stateType = prepareType("state type", stateTypeRaw);
-
-        List<AbstractType<?>> stateArgs = stateArguments(stateType, argTypes);
-
-        Function f = Schema.instance.findFunction(stateFunc, 
stateArgs).orElse(null);
-        if (!(f instanceof ScalarFunction))
-            throw new InvalidRequestException("State function " + 
stateFuncSig(stateFunc, stateTypeRaw, argRawTypes) + " does not exist or is not 
a scalar function");
-        stateFunction = (ScalarFunction)f;
-
-        AbstractType<?> stateReturnType = stateFunction.returnType();
-        if (!stateReturnType.equals(stateType))
-            throw new InvalidRequestException("State function " + 
stateFuncSig(stateFunction.name(), stateTypeRaw, argRawTypes) + " return type 
must be the same as the first argument type - check STYPE, argument and return 
types");
-
-        if (finalFunc != null)
-        {
-            List<AbstractType<?>> finalArgs = 
Collections.<AbstractType<?>>singletonList(stateType);
-            f = Schema.instance.findFunction(finalFunc, 
finalArgs).orElse(null);
-            if (!(f instanceof ScalarFunction))
-                throw new InvalidRequestException("Final function " + 
finalFunc + '(' + stateTypeRaw + ") does not exist or is not a scalar 
function");
-            finalFunction = (ScalarFunction) f;
-            returnType = finalFunction.returnType();
-        }
-        else
-        {
-            returnType = stateReturnType;
-        }
-
-        if (ival != null)
-        {
-            initcond = Terms.asBytes(functionName.keyspace, ival.toString(), 
stateType);
-
-            if (initcond != null)
-            {
-                try
-                {
-                    stateType.validate(initcond);
-                }
-                catch (MarshalException e)
-                {
-                    throw new InvalidRequestException(String.format("Invalid 
value for INITCOND of type %s%s", stateType.asCQL3Type(),
-                                                                    
e.getMessage() == null ? "" : String.format(" (%s)", e.getMessage())));
-                }
-            }
-
-            // Sanity check that converts the initcond to a CQL literal and 
parse it back to avoid getting in CASSANDRA-11064.
-            String initcondAsCql = 
stateType.asCQL3Type().toCQLLiteral(initcond, ProtocolVersion.CURRENT);
-            assert Objects.equals(initcond, 
Terms.asBytes(functionName.keyspace, initcondAsCql, stateType));
-
-            if (Constants.NULL_LITERAL != ival && 
UDHelper.isNullOrEmpty(stateType, initcond))
-                throw new InvalidRequestException("INITCOND must not be empty 
for all types except TEXT, ASCII, BLOB");
-        }
-
-        return this;
-    }
-
-    private AbstractType<?> prepareType(String typeName, CQL3Type.Raw rawType)
-    {
-        if (rawType.isFrozen())
-            throw new InvalidRequestException(String.format("The function %s 
should not be frozen; remove the frozen<> modifier", typeName));
-
-        AbstractType<?> type = 
rawType.prepare(functionName.keyspace).getType();
-        return type;
-    }
-
-    public void setKeyspace(ClientState state) throws InvalidRequestException
-    {
-        if (!functionName.hasKeyspace() && state.getRawKeyspace() != null)
-            functionName = new FunctionName(state.getKeyspace(), 
functionName.name);
-
-        if (!functionName.hasKeyspace())
-            throw new InvalidRequestException("Functions must be fully 
qualified with a keyspace name if a keyspace is not set for the session");
-
-        Schema.validateKeyspaceNotSystem(functionName.keyspace);
-
-        stateFunc = new FunctionName(functionName.keyspace, stateFunc.name);
-        if (finalFunc != null)
-            finalFunc = new FunctionName(functionName.keyspace, 
finalFunc.name);
-    }
-
-    protected void grantPermissionsToCreator(QueryState state)
-    {
-        try
-        {
-            IResource resource = 
FunctionResource.function(functionName.keyspace, functionName.name, argTypes);
-            
DatabaseDescriptor.getAuthorizer().grant(AuthenticatedUser.SYSTEM_USER,
-                                                     
resource.applicablePermissions(),
-                                                     resource,
-                                                     
RoleResource.role(state.getClientState().getUser().getName()));
-        }
-        catch (RequestExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void authorize(ClientState state) throws UnauthorizedException, 
InvalidRequestException
-    {
-        if (Schema.instance.findFunction(functionName, argTypes).isPresent() 
&& orReplace)
-            state.ensurePermission(Permission.ALTER, 
FunctionResource.function(functionName.keyspace,
-                                                                               
functionName.name,
-                                                                               
argTypes));
-        else
-            state.ensurePermission(Permission.CREATE, 
FunctionResource.keyspace(functionName.keyspace));
-
-        state.ensurePermission(Permission.EXECUTE, stateFunction);
-
-        if (finalFunction != null)
-            state.ensurePermission(Permission.EXECUTE, finalFunction);
-    }
-
-    public void validate(ClientState state) throws InvalidRequestException
-    {
-        if (ifNotExists && orReplace)
-            throw new InvalidRequestException("Cannot use both 'OR REPLACE' 
and 'IF NOT EXISTS' directives");
-
-        if (Schema.instance.getKeyspaceMetadata(functionName.keyspace) == null)
-            throw new InvalidRequestException(String.format("Cannot add 
aggregate '%s' to non existing keyspace '%s'.", functionName.name, 
functionName.keyspace));
-    }
-
-    public Event.SchemaChange announceMigration(QueryState queryState, boolean 
isLocalOnly) throws RequestValidationException
-    {
-        Function old = Schema.instance.findFunction(functionName, 
argTypes).orElse(null);
-        boolean replaced = old != null;
-        if (replaced)
-        {
-            if (ifNotExists)
-                return null;
-            if (!orReplace)
-                throw new InvalidRequestException(String.format("Function %s 
already exists", old));
-            if (!(old instanceof AggregateFunction))
-                throw new InvalidRequestException(String.format("Aggregate %s 
can only replace an aggregate", old));
-
-            // Means we're replacing the function. We still need to validate 
that 1) it's not a native function and 2) that the return type
-            // matches (or that could break existing code badly)
-            if (old.isNative())
-                throw new InvalidRequestException(String.format("Cannot 
replace native aggregate %s", old));
-            if (!old.returnType().isValueCompatibleWith(returnType))
-                throw new InvalidRequestException(String.format("Cannot 
replace aggregate %s, the new return type %s is not compatible with the return 
type %s of existing function",
-                                                                functionName, 
returnType.asCQL3Type(), old.returnType().asCQL3Type()));
-        }
-
-        if (!stateFunction.isCalledOnNullInput() && initcond == null)
-            throw new InvalidRequestException(String.format("Cannot create 
aggregate %s without INITCOND because state function %s does not accept 'null' 
arguments", functionName, stateFunc));
-
-        UDAggregate udAggregate = new UDAggregate(functionName, argTypes, 
returnType, stateFunction, finalFunction, initcond);
-
-        MigrationManager.announceNewAggregate(udAggregate, isLocalOnly);
-
-        return new Event.SchemaChange(replaced ? 
Event.SchemaChange.Change.UPDATED : Event.SchemaChange.Change.CREATED,
-                                      Event.SchemaChange.Target.AGGREGATE,
-                                      udAggregate.name().keyspace, 
udAggregate.name().name, 
AbstractType.asCQLTypeStringList(udAggregate.argTypes()));
-    }
-
-    private static String stateFuncSig(FunctionName stateFuncName, 
CQL3Type.Raw stateTypeRaw, List<CQL3Type.Raw> argRawTypes)
-    {
-        StringBuilder sb = new StringBuilder();
-        sb.append(stateFuncName.toString()).append('(').append(stateTypeRaw);
-        for (CQL3Type.Raw argRawType : argRawTypes)
-            sb.append(", ").append(argRawType);
-        sb.append(')');
-        return sb.toString();
-    }
-
-    private static List<AbstractType<?>> stateArguments(AbstractType<?> 
stateType, List<AbstractType<?>> argTypes)
-    {
-        List<AbstractType<?>> r = new ArrayList<>(argTypes.size() + 1);
-        r.add(stateType);
-        r.addAll(argTypes);
-        return r;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b30c6da2/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java
index 04e3c81..ef0df86 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java
@@ -21,25 +21,33 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 
 import org.apache.cassandra.auth.FunctionResource;
 import org.apache.cassandra.auth.IResource;
+import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.*;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.schema.Functions.FunctionsDiff;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.SchemaChange.Change;
+import org.apache.cassandra.transport.Event.SchemaChange.Target;
 import org.apache.cassandra.transport.ProtocolVersion;
 
 import static java.lang.String.format;
 import static java.lang.String.join;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.toList;
 
 import static com.google.common.collect.Iterables.concat;
 import static com.google.common.collect.Iterables.transform;
@@ -76,17 +84,6 @@ public final class CreateAggregateStatement extends 
AlterSchemaStatement
         this.ifNotExists = ifNotExists;
     }
 
-    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    Set<IResource> createdResources(KeyspacesDiff diff)
-    {
-        return ImmutableSet.of(FunctionResource.functionFromCql(keyspaceName, 
aggregateName, rawArgumentTypes));
-    }
-
     public Keyspaces apply(Keyspaces schema)
     {
         if (ifNotExists && orReplace)
@@ -108,17 +105,20 @@ public final class CreateAggregateStatement extends 
AlterSchemaStatement
          * Resolve the state function
          */
 
-        // TODO replace Lists.transform use
-        List<AbstractType<?>> argumentTypes = 
Lists.transform(rawArgumentTypes, t -> t.prepare(keyspaceName, 
keyspace.types).getType());
+        List<AbstractType<?>> argumentTypes =
+            rawArgumentTypes.stream()
+                            .map(t -> t.prepare(keyspaceName, 
keyspace.types).getType())
+                            .collect(toList());
         AbstractType<?> stateType = rawStateType.prepare(keyspaceName, 
keyspace.types).getType();
         List<AbstractType<?>> stateFunctionArguments = 
Lists.newArrayList(concat(singleton(stateType), argumentTypes));
 
-        Function stateFunction = keyspace.functions.find(stateFunctionName, 
stateFunctionArguments).orElse(null);
-        if (null == stateFunction)
-            throw ire("State function %s does not exist", 
stateFunctionString());
+        Function stateFunction =
+            keyspace.functions
+                    .find(stateFunctionName, stateFunctionArguments)
+                    .orElseThrow(() -> ire("State function %s doesn't exist", 
stateFunctionString()));
 
         if (stateFunction.isAggregate())
-            throw ire("State function %s is not a scalar function", 
stateFunctionString());
+            throw ire("State function %s isn't a scalar function", 
stateFunctionString());
 
         if (!stateFunction.returnType().equals(stateType))
         {
@@ -137,10 +137,10 @@ public final class CreateAggregateStatement extends 
AlterSchemaStatement
         {
             finalFunction = keyspace.functions.find(finalFunctionName, 
singletonList(stateType)).orElse(null);
             if (null == finalFunction)
-                throw ire("Final function %s does not exist", 
finalFunctionString());
+                throw ire("Final function %s doesn't exist", 
finalFunctionString());
 
             if (finalFunction.isAggregate())
-                throw ire("Final function %s is not a scalar function", 
finalFunctionString());
+                throw ire("Final function %s isn't a scalar function", 
finalFunctionString());
 
             // override return type with that of the final function
             returnType = finalFunction.returnType();
@@ -150,11 +150,10 @@ public final class CreateAggregateStatement extends 
AlterSchemaStatement
          * Validate initial condition
          */
 
-        // TODO: WTF?
         ByteBuffer initialValue = null;
         if (null != rawInitialValue)
         {
-            initialValue = Terms.asBytes(keyspaceName, 
initialValue.toString(), stateType);
+            initialValue = Terms.asBytes(keyspaceName, 
rawInitialValue.toString(), stateType);
 
             if (null != initialValue)
             {
@@ -170,7 +169,7 @@ public final class CreateAggregateStatement extends 
AlterSchemaStatement
 
             // Converts initcond to a CQL literal and parse it back to avoid 
another CASSANDRA-11064
             String initialValueString = 
stateType.asCQL3Type().toCQLLiteral(initialValue, ProtocolVersion.CURRENT);
-            assert !Terms.asBytes(keyspaceName, initialValueString, 
stateType).equals(initialValue);
+            assert Objects.equal(initialValue, Terms.asBytes(keyspaceName, 
initialValueString, stateType));
 
             if (Constants.NULL_LITERAL != rawInitialValue && 
UDHelper.isNullOrEmpty(stateType, initialValue))
                 throw ire("INITCOND must not be empty for all types except 
TEXT, ASCII, BLOB");
@@ -201,7 +200,7 @@ public final class CreateAggregateStatement extends 
AlterSchemaStatement
 
             if (!returnType.isCompatibleWith(existingAggregate.returnType()))
             {
-                throw ire("Cannot replace aggregate '%s', the new return type 
%s is not compatible with the return type %s of existing function",
+                throw ire("Cannot replace aggregate '%s', the new return type 
%s isn't compatible with the return type %s of existing function",
                           aggregateName,
                           returnType.asCQL3Type(),
                           existingAggregate.returnType().asCQL3Type());
@@ -219,6 +218,51 @@ public final class CreateAggregateStatement extends 
AlterSchemaStatement
         return 
schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.functions.withAddedOrUpdated(aggregate)));
     }
 
+    SchemaChange schemaChangeEvent(KeyspacesDiff diff)
+    {
+        assert diff.altered.size() == 1;
+        FunctionsDiff<UDAggregate> udasDiff = diff.altered.get(0).udas;
+
+        assert udasDiff.created.size() + udasDiff.altered.size() == 1;
+        boolean created = !udasDiff.created.isEmpty();
+
+        return new SchemaChange(created ? Change.CREATED : Change.UPDATED,
+                                Target.AGGREGATE,
+                                keyspaceName,
+                                aggregateName,
+                                
rawArgumentTypes.stream().map(CQL3Type.Raw::toString).collect(toList()));
+    }
+
+    public void authorize(ClientState client)
+    {
+        FunctionName name = new FunctionName(keyspaceName, aggregateName);
+
+        if (Schema.instance.findFunction(name, 
Lists.transform(rawArgumentTypes, t -> 
t.prepare(keyspaceName).getType())).isPresent() && orReplace)
+            client.ensurePermission(Permission.ALTER, 
FunctionResource.functionFromCql(keyspaceName, aggregateName, 
rawArgumentTypes));
+        else
+            client.ensurePermission(Permission.CREATE, 
FunctionResource.keyspace(keyspaceName));
+
+        FunctionResource stateFunction =
+            FunctionResource.functionFromCql(stateFunctionName, 
Lists.newArrayList(concat(singleton(rawStateType), rawArgumentTypes)));
+        client.ensurePermission(Permission.EXECUTE, stateFunction);
+
+        if (null != finalFunctionName)
+            client.ensurePermission(Permission.EXECUTE, 
FunctionResource.functionFromCql(finalFunctionName, 
singletonList(rawStateType)));
+    }
+
+    @Override
+    Set<IResource> createdResources(KeyspacesDiff diff)
+    {
+        assert diff.altered.size() == 1;
+        FunctionsDiff<UDAggregate> udasDiff = diff.altered.get(0).udas;
+
+        assert udasDiff.created.size() + udasDiff.altered.size() == 1;
+
+        return udasDiff.created.isEmpty()
+             ? ImmutableSet.of()
+             : ImmutableSet.of(FunctionResource.functionFromCql(keyspaceName, 
aggregateName, rawArgumentTypes));
+    }
+
     private String stateFunctionString()
     {
         return format("%s(%s)", stateFunctionName, join(", ", 
transform(concat(singleton(rawStateType), rawArgumentTypes), 
Object::toString)));
@@ -228,4 +272,50 @@ public final class CreateAggregateStatement extends 
AlterSchemaStatement
     {
         return format("%s(%s)", finalFunctionName, rawStateType);
     }
+
+    public static final class Raw extends CQLStatement.Raw
+    {
+        private final FunctionName aggregateName;
+        private final List<CQL3Type.Raw> rawArgumentTypes;
+        private final CQL3Type.Raw rawStateType;
+        private final String stateFunctionName;
+        private final String finalFunctionName;
+        private final Term.Raw rawInitialValue;
+        private final boolean orReplace;
+        private final boolean ifNotExists;
+
+        public Raw(FunctionName aggregateName,
+                   List<CQL3Type.Raw> rawArgumentTypes,
+                   CQL3Type.Raw rawStateType,
+                   String stateFunctionName,
+                   String finalFunctionName,
+                   Term.Raw rawInitialValue,
+                   boolean orReplace,
+                   boolean ifNotExists)
+        {
+            this.aggregateName = aggregateName;
+            this.rawArgumentTypes = rawArgumentTypes;
+            this.rawStateType = rawStateType;
+            this.stateFunctionName = stateFunctionName;
+            this.finalFunctionName = finalFunctionName;
+            this.rawInitialValue = rawInitialValue;
+            this.orReplace = orReplace;
+            this.ifNotExists = ifNotExists;
+        }
+
+        public CreateAggregateStatement prepare(ClientState state)
+        {
+            String keyspaceName = aggregateName.hasKeyspace() ? 
aggregateName.keyspace : state.getKeyspace();
+
+            return new CreateAggregateStatement(keyspaceName,
+                                                aggregateName.name,
+                                                rawArgumentTypes,
+                                                rawStateType,
+                                                new FunctionName(keyspaceName, 
stateFunctionName),
+                                                null != finalFunctionName ? 
new FunctionName(keyspaceName, finalFunctionName) : null,
+                                                rawInitialValue,
+                                                orReplace,
+                                                ifNotExists);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b30c6da2/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java
index 00b98f5..78fe9d6 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java
@@ -167,7 +167,6 @@ public final class CreateFunctionStatement extends 
AlterSchemaStatement
     {
         FunctionName name = new FunctionName(keyspaceName, functionName);
 
-        // TODO: replace lists.transform use
         if (Schema.instance.findFunction(name, 
Lists.transform(rawArgumentTypes, t -> 
t.prepare(keyspaceName).getType())).isPresent() && orReplace)
             client.ensurePermission(Permission.ALTER, 
FunctionResource.functionFromCql(keyspaceName, functionName, rawArgumentTypes));
         else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b30c6da2/src/java/org/apache/cassandra/schema/Functions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Functions.java 
b/src/java/org/apache/cassandra/schema/Functions.java
index db5e871..d5a2782 100644
--- a/src/java/org/apache/cassandra/schema/Functions.java
+++ b/src/java/org/apache/cassandra/schema/Functions.java
@@ -245,7 +245,7 @@ public final class Functions implements Iterable<Function>
 
     public Functions withAddedOrUpdated(Function function)
     {
-        return builder().add(Iterables.filter(this, f -> 
!(f.name().equals(function.name())) && Functions.typesMatch(f.argTypes(), 
function.argTypes())))
+        return builder().add(Iterables.filter(this, f -> 
!(f.name().equals(function.name()) && Functions.typesMatch(f.argTypes(), 
function.argTypes()))))
                         .add(function)
                         .build();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b30c6da2/src/java/org/apache/cassandra/schema/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java 
b/src/java/org/apache/cassandra/schema/MigrationManager.java
index a9f69f4..e970552 100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.cql3.functions.UDAggregate;
 import org.apache.cassandra.cql3.functions.UDFunction;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.UserType;
@@ -307,13 +306,6 @@ public class MigrationManager
         announce(SchemaKeyspace.makeCreateFunctionMutation(ksm, udf, 
FBUtilities.timestampMicros()), announceLocally);
     }
 
-    public static void announceNewAggregate(UDAggregate udf, boolean 
announceLocally)
-    {
-        logger.info("Create aggregate function '{}'", udf.name());
-        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(udf.name().keyspace);
-        announce(SchemaKeyspace.makeCreateAggregateMutation(ksm, udf, 
FBUtilities.timestampMicros()), announceLocally);
-    }
-
     static void announceKeyspaceUpdate(KeyspaceMetadata ksm)
     {
         ksm.validate();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b30c6da2/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java 
b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 19b0bb7..6be9df0 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -292,7 +292,11 @@ public final class SchemaKeyspace
             kd.views.dropped.forEach(v -> addDropViewToSchemaMutation(v, 
builder));
             kd.views.altered.forEach(vd -> 
addAlterViewToSchemaMutation(vd.before, vd.after, builder));
 
+            kd.udfs.created.forEach(f -> 
addFunctionToSchemaMutation((UDFunction) f, builder));
             kd.udfs.dropped.forEach(f -> 
addDropFunctionToSchemaMutation((UDFunction) f, builder));
+
+            kd.udas.created.forEach(a -> 
addAggregateToSchemaMutation((UDAggregate) a, builder));
+            kd.udas.altered.forEach(ad -> 
addAggregateToSchemaMutation(ad.after, builder));
             kd.udas.dropped.forEach(a -> 
addDropAggregateToSchemaMutation((UDAggregate) a, builder));
 
             mutations.put(ks.name, builder.build());
@@ -846,14 +850,6 @@ public final class SchemaKeyspace
         builder.update(Functions).row(function.name().name, 
function.argumentsList()).delete();
     }
 
-    static Mutation.SimpleBuilder makeCreateAggregateMutation(KeyspaceMetadata 
keyspace, UDAggregate aggregate, long timestamp)
-    {
-        // Include the serialized keyspace in case the target node missed a 
CREATE KEYSPACE migration (see CASSANDRA-5631).
-        Mutation.SimpleBuilder builder = 
makeCreateKeyspaceMutation(keyspace.name, keyspace.params, timestamp);
-        addAggregateToSchemaMutation(aggregate, builder);
-        return builder;
-    }
-
     private static void addAggregateToSchemaMutation(UDAggregate aggregate, 
Mutation.SimpleBuilder builder)
     {
         builder.update(Aggregates)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b30c6da2/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
 
b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
index 5489e4d..3e14854 100644
--- 
a/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
+++ 
b/test/unit/org/apache/cassandra/cql3/validation/operations/AggregationTest.java
@@ -41,18 +41,15 @@ import ch.qos.logback.classic.LoggerContext;
 import ch.qos.logback.classic.spi.TurboFilterList;
 import ch.qos.logback.classic.turbo.ReconfigureOnChangeFilter;
 import ch.qos.logback.classic.turbo.TurboFilter;
-import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.UntypedResultSet.Row;
-import org.apache.cassandra.cql3.functions.UDAggregate;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.exceptions.FunctionExecutionException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.transport.Event;
 import org.apache.cassandra.transport.ProtocolVersion;
@@ -448,7 +445,7 @@ public class AggregationTest extends CQLTester
         schemaChange("CREATE OR REPLACE AGGREGATE " + a + "(double) " +
                      "SFUNC " + shortFunctionName(f) + " " +
                      "STYPE double " +
-                     "INITCOND 0");
+                     "INITCOND 1");
 
         assertLastSchemaChange(Event.SchemaChange.Change.UPDATED, 
Event.SchemaChange.Target.AGGREGATE,
                                KEYSPACE, parseFunctionName(a).name,
@@ -689,37 +686,37 @@ public class AggregationTest extends CQLTester
                                         "LANGUAGE java " +
                                         "AS 'return a.toString();'");
 
-        assertInvalidMessage("does not exist or is not a scalar function",
+        assertInvalidMessage("doesn't exist",
                              "CREATE AGGREGATE " + KEYSPACE + 
".aggrInvalid(double)" +
                              "SFUNC " + shortFunctionName(fState) + " " +
                              "STYPE double " +
                              "FINALFUNC " + shortFunctionName(fFinal));
-        assertInvalidMessage("does not exist or is not a scalar function",
+        assertInvalidMessage("doesn't exist",
                              "CREATE AGGREGATE " + KEYSPACE + 
".aggrInvalid(int)" +
                              "SFUNC " + shortFunctionName(fState) + " " +
                              "STYPE double " +
                              "FINALFUNC " + shortFunctionName(fFinal));
-        assertInvalidMessage("does not exist or is not a scalar function",
+        assertInvalidMessage("doesn't exist",
                              "CREATE AGGREGATE " + KEYSPACE + 
".aggrInvalid(double)" +
                              "SFUNC " + shortFunctionName(fState) + " " +
                              "STYPE int " +
                              "FINALFUNC " + shortFunctionName(fFinal));
-        assertInvalidMessage("does not exist or is not a scalar function",
+        assertInvalidMessage("doesn't exist",
                              "CREATE AGGREGATE " + KEYSPACE + 
".aggrInvalid(double)" +
                              "SFUNC " + shortFunctionName(fState) + " " +
                              "STYPE int");
-        assertInvalidMessage("does not exist or is not a scalar function",
+        assertInvalidMessage("doesn't exist",
                              "CREATE AGGREGATE " + KEYSPACE + 
".aggrInvalid(int)" +
                              "SFUNC " + shortFunctionName(fState) + " " +
                              "STYPE double");
 
-        assertInvalidMessage("does not exist or is not a scalar function",
+        assertInvalidMessage("doesn't exist",
                              "CREATE AGGREGATE " + KEYSPACE + 
".aggrInvalid(double)" +
                              "SFUNC " + shortFunctionName(fState2) + " " +
                              "STYPE double " +
                              "FINALFUNC " + shortFunctionName(fFinal));
 
-        assertInvalidMessage("does not exist or is not a scalar function",
+        assertInvalidMessage("doesn't exist",
                              "CREATE AGGREGATE " + KEYSPACE + 
".aggrInvalid(double)" +
                              "SFUNC " + shortFunctionName(fState) + " " +
                              "STYPE double " +
@@ -745,13 +742,13 @@ public class AggregationTest extends CQLTester
                                        "LANGUAGE java " +
                                        "AS 'return a.toString();'");
 
-        assertInvalidMessage("does not exist or is not a scalar function",
+        assertInvalidMessage("doesn't exist",
                              "CREATE AGGREGATE " + KEYSPACE + 
".aggrInvalid(int)" +
                              "SFUNC " + shortFunctionName(fState) + 
"_not_there " +
                              "STYPE int " +
                              "FINALFUNC " + shortFunctionName(fFinal));
 
-        assertInvalidMessage("does not exist or is not a scalar function",
+        assertInvalidMessage("doesn't exist",
                              "CREATE AGGREGATE " + KEYSPACE + 
".aggrInvalid(int)" +
                              "SFUNC " + shortFunctionName(fState) + " " +
                              "STYPE int " +
@@ -834,7 +831,7 @@ public class AggregationTest extends CQLTester
     @Test
     public void testJavaAggregateWithoutStateOrFinal() throws Throwable
     {
-        assertInvalidMessage("does not exist or is not a scalar function",
+        assertInvalidMessage("doesn't exist",
                              "CREATE AGGREGATE " + KEYSPACE + 
".jSumFooNE1(int) " +
                              "SFUNC jSumFooNEstate " +
                              "STYPE int");
@@ -847,7 +844,7 @@ public class AggregationTest extends CQLTester
                                   "LANGUAGE java " +
                                   "AS 'return Integer.valueOf(a + b);'");
 
-        assertInvalidMessage("does not exist or is not a scalar function",
+        assertInvalidMessage("doesn't exist",
                              "CREATE AGGREGATE " + KEYSPACE + 
".jSumFooNE2(int) " +
                              "SFUNC " + shortFunctionName(f) + " " +
                              "STYPE int " +
@@ -1173,12 +1170,12 @@ public class AggregationTest extends CQLTester
                                    "SFUNC " + shortFunctionName(fState) + " " +
                                    "STYPE int ");
 
-        assertInvalidMessage("does not exist or is not a scalar function",
+        assertInvalidMessage("doesn't exist",
                              "CREATE AGGREGATE " + KEYSPACE + ".aggInv(int) " +
                              "SFUNC " + shortFunctionName(a) + " " +
                              "STYPE int ");
 
-        assertInvalidMessage("does not exist or is not a scalar function",
+        assertInvalidMessage("isn't a scalar function",
                              "CREATE AGGREGATE " + KEYSPACE + ".aggInv(int) " +
                              "SFUNC " + shortFunctionName(fState) + " " +
                              "STYPE int " +
@@ -1444,7 +1441,7 @@ public class AggregationTest extends CQLTester
                                        "LANGUAGE java " +
                                        "AS 'return state;'");
 
-        assertInvalidMessage("The function state type should not be frozen",
+        assertInvalidMessage("cannot be frozen",
                              "CREATE AGGREGATE %s(set<int>) " +
                              "SFUNC " + parseFunctionName(fState).name + ' ' +
                              "STYPE frozen<set<int>> " +
@@ -1496,7 +1493,7 @@ public class AggregationTest extends CQLTester
                                        "LANGUAGE java " +
                                        "AS 'return state;'");
 
-        assertInvalidMessage("The function state type should not be frozen",
+        assertInvalidMessage("cannot be frozen",
                              "CREATE AGGREGATE %s(list<int>) " +
                              "SFUNC " + parseFunctionName(fState).name + ' ' +
                              "STYPE frozen<list<int>> " +
@@ -1545,7 +1542,7 @@ public class AggregationTest extends CQLTester
                                        "LANGUAGE java " +
                                        "AS 'return state;'");
 
-        assertInvalidMessage("The function state type should not be frozen",
+        assertInvalidMessage("cannot be frozen",
                              "CREATE AGGREGATE %s(map<int, int>) " +
                              "SFUNC " + parseFunctionName(fState).name + ' ' +
                              "STYPE frozen<map<int, int>> " +
@@ -1594,7 +1591,7 @@ public class AggregationTest extends CQLTester
                                        "LANGUAGE java " +
                                        "AS 'return state;'");
 
-        assertInvalidMessage("The function state type should not be frozen",
+        assertInvalidMessage("cannot be frozen",
                              "CREATE AGGREGATE %s(tuple<int, int>) " +
                              "SFUNC " + parseFunctionName(fState).name + ' ' +
                              "STYPE frozen<tuple<int, int>> " +
@@ -1644,7 +1641,7 @@ public class AggregationTest extends CQLTester
                                        "LANGUAGE java " +
                                        "AS 'return state;'");
 
-        assertInvalidMessage("The function state type should not be frozen",
+        assertInvalidMessage("cannot be frozen",
                              "CREATE AGGREGATE %s(" + myType + ") " +
                              "SFUNC " + parseFunctionName(fState).name + ' ' +
                              "STYPE frozen<" + myType + "> " +


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to