Support pure user-defined functions. Patch by Robert Stupp; reviewed by Tyler Hobbs for CASSANDRA-7395.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/25411bf1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/25411bf1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/25411bf1 Branch: refs/heads/trunk Commit: 25411bf1d15a35bf17002cf7664173357c6dc6cf Parents: 2f25e6e Author: Robert Stupp <sn...@snazy.de> Authored: Fri Aug 8 14:11:01 2014 -0500 Committer: Tyler Hobbs <ty...@datastax.com> Committed: Fri Aug 8 14:11:01 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + build.xml | 2 +- pylib/cqlshlib/cql3handling.py | 2 +- src/java/org/apache/cassandra/auth/Auth.java | 12 + .../org/apache/cassandra/config/CFMetaData.java | 14 +- .../org/apache/cassandra/config/KSMetaData.java | 1 + .../org/apache/cassandra/config/UFMetaData.java | 309 +++++++++++++++++++ .../cassandra/cql3/AssignementTestable.java | 4 +- src/java/org/apache/cassandra/cql3/Cql.g | 73 ++++- .../org/apache/cassandra/cql3/TypeCast.java | 2 +- .../cassandra/cql3/functions/FunctionCall.java | 26 +- .../cassandra/cql3/functions/Functions.java | 7 +- .../statements/CreateFunctionStatement.java | 180 +++++++++++ .../cql3/statements/DropFunctionStatement.java | 94 ++++++ .../statements/SchemaAlteringStatement.java | 6 +- .../cassandra/cql3/statements/Selectable.java | 9 +- .../cassandra/cql3/statements/Selection.java | 21 +- .../cql3/udf/UDFFunctionOverloads.java | 87 ++++++ .../apache/cassandra/cql3/udf/UDFRegistry.java | 146 +++++++++ .../apache/cassandra/cql3/udf/UDFunction.java | 178 +++++++++++ .../org/apache/cassandra/db/DefsTables.java | 83 +++++ .../org/apache/cassandra/db/SystemKeyspace.java | 14 +- .../cassandra/service/CassandraDaemon.java | 4 + .../cassandra/service/IMigrationListener.java | 4 + .../cassandra/service/MigrationManager.java | 42 +++ .../org/apache/cassandra/transport/Event.java | 40 +-- .../org/apache/cassandra/transport/Server.java | 12 + 
test/unit/org/apache/cassandra/cql3/UFTest.java | 186 +++++++++++ 28 files changed, 1517 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b33399b..f6285fe 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0 + * Support pure user-defined functions (CASSANDRA-7395) * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416) * Move sstable RandomAccessReader to nio2, which allows using the FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050) http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index e904ca2..057bf3d 100644 --- a/build.xml +++ b/build.xml @@ -1461,7 +1461,7 @@ </java> </target> - <target name="javadoc" depends="init" description="Create javadoc"> + <target name="javadoc" depends="init" description="Create javadoc" unless="no-javadoc"> <create-javadoc destdir="${javadoc.dir}"> <filesets> <fileset dir="${build.src.java}" defaultexcludes="yes"> http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/pylib/cqlshlib/cql3handling.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py index 72461db..d912c67 100644 --- a/pylib/cqlshlib/cql3handling.py +++ b/pylib/cqlshlib/cql3handling.py @@ -212,7 +212,7 @@ JUNK ::= /([ \t\r\f\v]+|(--|[/][/])[^\n\r]*([\n\r]|$)|[/][*].*?[*][/])/ ; <mapLiteral> ::= "{" <term> ":" <term> ( "," <term> ":" <term> )* "}" ; -<functionName> ::= <identifier> +<functionName> ::= ( <identifier> ":" ":" )? 
<identifier> ; <statementBody> ::= <useStatement> http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/auth/Auth.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java index 7c532b0..bc23d05 100644 --- a/src/java/org/apache/cassandra/auth/Auth.java +++ b/src/java/org/apache/cassandra/auth/Auth.java @@ -291,6 +291,10 @@ public class Auth { } + public void onDropFunction(String namespace, String functionName) + { + } + public void onCreateKeyspace(String ksName) { } @@ -303,6 +307,10 @@ public class Auth { } + public void onCreateFunction(String namespace, String functionName) + { + } + public void onUpdateKeyspace(String ksName) { } @@ -314,5 +322,9 @@ public class Auth public void onUpdateUserType(String ksName, String userType) { } + + public void onUpdateFunction(String namespace, String functionName) + { + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index 5a347f7..37f586d 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -213,6 +213,19 @@ public final class CFMetaData + "PRIMARY KEY (keyspace_name, type_name)" + ") WITH COMMENT='Defined user types' AND gc_grace_seconds=604800"); + public static final CFMetaData SchemaFunctionsCf = compile("CREATE TABLE " + SystemKeyspace.SCHEMA_FUNCTIONS_CF + " (" + + "namespace text," + + "name text," + + "signature text," + + "argument_names list<text>," + + "argument_types list<text>," + + "return_type text," + + "deterministic boolean," + + "language text," + + "body text," + + "primary key ((namespace, name), 
signature)" + + ") WITH COMMENT='user defined functions' AND gc_grace_seconds=604800"); + public static final CFMetaData HintsCf = compile("CREATE TABLE " + SystemKeyspace.HINTS_CF + " (" + "target_id uuid," + "hint_id timeuuid," @@ -331,7 +344,6 @@ public final class CFMetaData + "PRIMARY KEY (id)" + ") WITH COMMENT='show all compaction history' AND DEFAULT_TIME_TO_LIVE=604800"); - public static class SpeculativeRetry { public enum RetryType http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/config/KSMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java index 8c99191..64ac3ff 100644 --- a/src/java/org/apache/cassandra/config/KSMetaData.java +++ b/src/java/org/apache/cassandra/config/KSMetaData.java @@ -101,6 +101,7 @@ public final class KSMetaData CFMetaData.SchemaColumnsCf, CFMetaData.SchemaTriggersCf, CFMetaData.SchemaUserTypesCf, + CFMetaData.SchemaFunctionsCf, CFMetaData.CompactionLogCf, CFMetaData.CompactionHistoryCf, CFMetaData.PaxosCf, http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/config/UFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/UFMetaData.java b/src/java/org/apache/cassandra/config/UFMetaData.java new file mode 100644 index 0000000..18484f3 --- /dev/null +++ b/src/java/org/apache/cassandra/config/UFMetaData.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.config; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.builder.ToStringBuilder; + +import org.antlr.runtime.ANTLRStringStream; +import org.antlr.runtime.CharStream; +import org.antlr.runtime.CommonTokenStream; +import org.antlr.runtime.RecognitionException; +import org.antlr.runtime.TokenStream; +import org.apache.cassandra.cql3.AssignementTestable; +import org.apache.cassandra.cql3.CQL3Type; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.ColumnSpecification; +import org.apache.cassandra.cql3.CqlLexer; +import org.apache.cassandra.cql3.CqlParser; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.cql3.udf.UDFFunctionOverloads; +import org.apache.cassandra.cql3.udf.UDFRegistry; +import org.apache.cassandra.db.CFRowAdder; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RangeTombstone; +import org.apache.cassandra.db.Row; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.composites.Composite; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.exceptions.ConfigurationException; +import 
org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.exceptions.SyntaxException; + +/** + * Defined (and loaded) user functions. + * <p/> + * In practice, because user functions are global, we have only one instance of + * this class that retrieve through the Schema class. + */ +public final class UFMetaData +{ + public final String namespace; + public final String functionName; + public final String qualifiedName; + public final String returnType; + public final List<String> argumentNames; + public final List<String> argumentTypes; + public final String language; + public final String body; + public final boolean deterministic; + + public final String signature; + public final List<CQL3Type> cqlArgumentTypes; + public final CQL3Type cqlReturnType; + + static final CompositeType partKey = (CompositeType) CFMetaData.SchemaFunctionsCf.getKeyValidator(); + + // TODO tracking "valid" status via an exception field is really bad style - but we need some way to mark a function as "dead" + public InvalidRequestException invalid; + + public UFMetaData(String namespace, String functionName, boolean deterministic, List<String> argumentNames, + List<String> argumentTypes, String returnType, String language, String body) + { + this.namespace = namespace != null ? namespace.toLowerCase() : ""; + this.functionName = functionName.toLowerCase(); + this.qualifiedName = qualifiedName(namespace, functionName); + this.returnType = returnType; + this.argumentNames = argumentNames; + this.argumentTypes = argumentTypes; + this.language = language == null ? 
"class" : language.toLowerCase(); + this.body = body; + this.deterministic = deterministic; + + this.cqlArgumentTypes = new ArrayList<>(argumentTypes.size()); + InvalidRequestException inv = null; + CQL3Type rt = null; + try + { + rt = parseCQLType(returnType); + for (String argumentType : argumentTypes) + cqlArgumentTypes.add(parseCQLType(argumentType)); + } + catch (InvalidRequestException e) + { + inv = e; + } + this.invalid = inv; + this.cqlReturnType = rt; + + StringBuilder signature = new StringBuilder(); + signature.append(qualifiedName); + for (String argumentType : argumentTypes) + { + signature.append(','); + signature.append(argumentType); + } + this.signature = signature.toString(); + } + + public boolean compatibleArgs(String ksName, String cfName, List<? extends AssignementTestable> providedArgs) + { + int cnt = cqlArgumentTypes.size(); + if (cnt != providedArgs.size()) + return false; + for (int i = 0; i < cnt; i++) + { + AssignementTestable provided = providedArgs.get(i); + + if (provided == null) + continue; + + AbstractType<?> argType = cqlArgumentTypes.get(i).getType(); + + ColumnSpecification expected = makeArgSpec(ksName, cfName, argType, i); + if (!provided.isAssignableTo(ksName, expected)) + return false; + } + + return true; + } + + public ColumnSpecification makeArgSpec(String ksName, String cfName, AbstractType<?> argType, int i) + { + return new ColumnSpecification(ksName, + cfName, + new ColumnIdentifier("arg" + i + "(" + qualifiedName + ")", true), argType); + } + + private static CQL3Type parseCQLType(String cqlType) + throws InvalidRequestException + { + CharStream stream = new ANTLRStringStream(cqlType); + CqlLexer lexer = new CqlLexer(stream); + + TokenStream tokenStream = new CommonTokenStream(lexer); + CqlParser parser = new CqlParser(tokenStream); + try + { + CQL3Type.Raw rawType = parser.comparatorType(); + // TODO CASSANDRA-7563 use appropiate keyspace here ... 
keyspace must be fully qualified + CQL3Type t = rawType.prepare(null); + // TODO CASSANDRA-7563 support "complex" types (UDT, tuples, collections), remove catch-NPE below + if (!(t instanceof CQL3Type.Native)) + throw new InvalidRequestException("non-native CQL type '" + cqlType + "' not supported"); + return t; + } + catch (NullPointerException | InvalidRequestException | RecognitionException e) + { + throw new InvalidRequestException("invalid CQL type '" + cqlType + "'"); + } + } + + public static String qualifiedName(String namespace, String functionName) + { + if (namespace == null) + return "::" + functionName; + return (namespace + "::" + functionName).toLowerCase(); + } + + public static Mutation dropFunction(long timestamp, String namespace, String functionName) + { + UDFFunctionOverloads sigMap = UDFRegistry.getFunctionSigMap(UFMetaData.qualifiedName(namespace, functionName)); + if (sigMap == null || sigMap.isEmpty()) + return null; + + Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, partKey.decompose(namespace, functionName)); + ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_FUNCTIONS_CF); + + int ldt = (int) (System.currentTimeMillis() / 1000); + for (UFMetaData f : sigMap.values()) + udfRemove(timestamp, cf, ldt, f); + + return mutation; + } + + private static Composite udfSignatureKey(UFMetaData function) + { + return CFMetaData.SchemaFunctionsCf.comparator.make(function.signature); + } + + private static void udfRemove(long timestamp, ColumnFamily cf, int ldt, UFMetaData f) + { + Composite prefix = udfSignatureKey(f); + cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt)); + } + + public static Mutation createOrReplaceFunction(long timestamp, UFMetaData f) + throws ConfigurationException, SyntaxException + { + Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, partKey.decompose(f.namespace, f.functionName)); + ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_FUNCTIONS_CF); + + Composite prefix = 
udfSignatureKey(f); + CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp); + + adder.resetCollection("argument_names"); + adder.resetCollection("argument_types"); + adder.add("name", f.functionName); + adder.add("return_type", f.returnType); + adder.add("language", f.language); + adder.add("body", f.body); + adder.add("deterministic", f.deterministic); + + for (String argName : f.argumentNames) + adder.addListEntry("argument_names", argName); + for (String argType : f.argumentTypes) + adder.addListEntry("argument_types", argType); + + return mutation; + } + + public static UFMetaData fromSchema(UntypedResultSet.Row row) + { + String namespace = row.getString("namespace"); + String name = row.getString("name"); + List<String> argumentNames = row.getList("argument_names", UTF8Type.instance); + List<String> argumentTypes = row.getList("argument_types", UTF8Type.instance); + String returnType = row.getString("return_type"); + boolean deterministic = row.getBoolean("deterministic"); + String language = row.getString("language"); + String body = row.getString("body"); + + return new UFMetaData(namespace, name, deterministic, argumentNames, argumentTypes, returnType, language, body); + } + + public static Map<String, UFMetaData> fromSchema(Row row) + { + UntypedResultSet results = QueryProcessor.resultify("SELECT * FROM system." + SystemKeyspace.SCHEMA_FUNCTIONS_CF, row); + Map<String, UFMetaData> udfs = new HashMap<>(results.size()); + for (UntypedResultSet.Row result : results) + { + UFMetaData udf = fromSchema(result); + udfs.put(udf.signature, udf); + } + return udfs; + } + + public boolean equals(Object o) + { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + UFMetaData that = (UFMetaData) o; + if (!signature.equals(that.signature)) + return false; + if (deterministic != that.deterministic) + return false; + if (argumentNames != null ? 
!argumentNames.equals(that.argumentNames) : that.argumentNames != null) + return false; + if (body != null ? !body.equals(that.body) : that.body != null) + return false; + if (!namespace.equals(that.namespace)) + return false; + if (!language.equals(that.language)) + return false; + if (returnType != null ? !returnType.equals(that.returnType) : that.returnType != null) + return false; + + return true; + } + + public int hashCode() + { + int result = signature.hashCode(); + result = 31 * result + (returnType != null ? returnType.hashCode() : 0); + result = 31 * result + (argumentNames != null ? argumentNames.hashCode() : 0); + result = 31 * result + (argumentTypes.hashCode()); + result = 31 * result + (language.hashCode()); + result = 31 * result + (body != null ? body.hashCode() : 0); + result = 31 * result + (deterministic ? 1 : 0); + return result; + } + + public String toString() + { + return new ToStringBuilder(this) + .append("signature", signature) + .append("returnType", returnType) + .append("deterministic", deterministic) + .append("language", language) + .append("body", body) + .toString(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/AssignementTestable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/AssignementTestable.java b/src/java/org/apache/cassandra/cql3/AssignementTestable.java index 2253cf7..02b3013 100644 --- a/src/java/org/apache/cassandra/cql3/AssignementTestable.java +++ b/src/java/org/apache/cassandra/cql3/AssignementTestable.java @@ -17,12 +17,10 @@ */ package org.apache.cassandra.cql3; -import org.apache.cassandra.exceptions.InvalidRequestException; - public interface AssignementTestable { /** * @return whether this object can be assigned to the provided receiver */ - public boolean isAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException; + public boolean 
isAssignableTo(String keyspace, ColumnSpecification receiver); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/Cql.g ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g index 268bce5..96a668b 100644 --- a/src/java/org/apache/cassandra/cql3/Cql.g +++ b/src/java/org/apache/cassandra/cql3/Cql.g @@ -242,6 +242,8 @@ cqlStatement returns [ParsedStatement stmt] | st25=createTypeStatement { $stmt = st25; } | st26=alterTypeStatement { $stmt = st26; } | st27=dropTypeStatement { $stmt = st27; } + | st28=createFunctionStatement { $stmt = st28; } + | st29=dropFunctionStatement { $stmt = st29; } ; /* @@ -298,7 +300,8 @@ unaliasedSelector returns [Selectable s] : ( c=cident { tmp = c; } | K_WRITETIME '(' c=cident ')' { tmp = new Selectable.WritetimeOrTTL(c, true); } | K_TTL '(' c=cident ')' { tmp = new Selectable.WritetimeOrTTL(c, false); } - | f=functionName args=selectionFunctionArgs { tmp = new Selectable.WithFunction(f, args); } + | f=functionName args=selectionFunctionArgs { tmp = new Selectable.WithFunction("", f, args); } + | bn=udfName '::' fn=udfName args=selectionFunctionArgs { tmp = new Selectable.WithFunction(bn, fn, args); } ) ( '.' fi=cident { tmp = new Selectable.WithFieldSelection(tmp, fi); } )* { $s = tmp; } ; @@ -485,6 +488,48 @@ batchStatementObjective returns [ModificationStatement.Parsed statement] | d=deleteStatement { $statement = d; } ; +createFunctionStatement returns [CreateFunctionStatement expr] + @init { + boolean orReplace = false; + boolean ifNotExists = false; + + boolean deterministic = true; + String language = "CLASS"; + String bodyOrClassName = null; + List<CreateFunctionStatement.Argument> args = new ArrayList<CreateFunctionStatement.Argument>(); + } + : K_CREATE (K_OR K_REPLACE { orReplace = true; })? + ((K_NON { deterministic = false; })? K_DETERMINISTIC)? 
+ K_FUNCTION + (K_IF K_NOT K_EXISTS { ifNotExists = true; })? + ( bn=udfName '::' )? + fn=udfName + '(' + ( + k=cident v=comparatorType { args.add(new CreateFunctionStatement.Argument(k, v)); } + ( ',' k=cident v=comparatorType { args.add(new CreateFunctionStatement.Argument(k, v)); } )* + )? + ')' + K_RETURNS + rt=comparatorType + ( + ( { language="CLASS"; } cls = STRING_LITERAL { bodyOrClassName = $cls.text; } ) + | ( K_LANGUAGE l = IDENT { language=$l.text; } K_BODY body = ((~K_END_BODY)*) { bodyOrClassName = $body.text; } K_END_BODY ) + ) + { $expr = new CreateFunctionStatement(bn, fn, language, bodyOrClassName, deterministic, rt, args, orReplace, ifNotExists); } + ; + +dropFunctionStatement returns [DropFunctionStatement expr] + @init { + boolean ifExists = false; + } + : K_DROP K_FUNCTION + (K_IF K_EXISTS { ifExists = true; } )? + ( bn=udfName '::' )? + fn=udfName + { $expr = new DropFunctionStatement(bn, fn, ifExists); } + ; + /** * CREATE KEYSPACE [IF NOT EXISTS] <KEYSPACE> WITH attr1 = value1 AND attr2 = value2; */ @@ -917,6 +962,11 @@ functionName returns [String s] | K_TOKEN { $s = "token"; } ; +udfName returns [String s] + : f=IDENT { $s = $f.text; } + | u=unreserved_function_keyword { $s = u; } + ; + functionArgs returns [List<Term.Raw> a] : '(' ')' { $a = Collections.emptyList(); } | '(' t1=term { List<Term.Raw> args = new ArrayList<Term.Raw>(); args.add(t1); } @@ -926,7 +976,8 @@ functionArgs returns [List<Term.Raw> a] term returns [Term.Raw term] : v=value { $term = v; } - | f=functionName args=functionArgs { $term = new FunctionCall.Raw(f, args); } + | f=functionName args=functionArgs { $term = new FunctionCall.Raw("", f, args); } + | bn=udfName '::' fn=udfName args=functionArgs { $term = new FunctionCall.Raw(bn, fn, args); } | '(' c=comparatorType ')' t=term { $term = new TypeCast(c, t); } ; @@ -1180,10 +1231,16 @@ basic_unreserved_keyword returns [String str] | K_DISTINCT | K_CONTAINS | K_STATIC + | K_FUNCTION + | K_RETURNS + | K_LANGUAGE + | 
K_NON + | K_DETERMINISTIC + | K_BODY + | K_END_BODY ) { $str = $k.text; } ; - // Case-insensitive keywords K_SELECT: S E L E C T; K_FROM: F R O M; @@ -1287,6 +1344,16 @@ K_TUPLE: T U P L E; K_TRIGGER: T R I G G E R; K_STATIC: S T A T I C; +K_FUNCTION: F U N C T I O N; +K_RETURNS: R E T U R N S; +K_LANGUAGE: L A N G U A G E; +K_NON: N O N; +K_OR: O R; +K_REPLACE: R E P L A C E; +K_DETERMINISTIC: D E T E R M I N I S T I C; +K_END_BODY: E N D '_' B O D Y; +K_BODY: B O D Y; + // Case-insensitive alpha characters fragment A: ('a'|'A'); fragment B: ('b'|'B'); http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/TypeCast.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/TypeCast.java b/src/java/org/apache/cassandra/cql3/TypeCast.java index e325e4d..3250e3b 100644 --- a/src/java/org/apache/cassandra/cql3/TypeCast.java +++ b/src/java/org/apache/cassandra/cql3/TypeCast.java @@ -46,7 +46,7 @@ public class TypeCast implements Term.Raw return new ColumnSpecification(receiver.ksName, receiver.cfName, new ColumnIdentifier(toString(), true), type.prepare(keyspace).getType()); } - public boolean isAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException + public boolean isAssignableTo(String keyspace, ColumnSpecification receiver) { try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java index a0c7447..fe2c2ee 100644 --- a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java +++ b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java @@ -22,6 +22,8 @@ import java.util.ArrayList; import java.util.List; import 
org.apache.cassandra.cql3.*; +import org.apache.cassandra.cql3.udf.UDFunction; +import org.apache.cassandra.cql3.udf.UDFRegistry; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.CollectionType; import org.apache.cassandra.db.marshal.ListType; @@ -93,18 +95,33 @@ public class FunctionCall extends Term.NonTerminal public static class Raw implements Term.Raw { + private final String namespace; private final String functionName; private final List<Term.Raw> terms; - public Raw(String functionName, List<Term.Raw> terms) + public Raw(String namespace, String functionName, List<Term.Raw> terms) { + this.namespace = namespace; this.functionName = functionName; this.terms = terms; } public Term prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException { - Function fun = Functions.get(keyspace, functionName, terms, receiver); + Function fun = null; + if (namespace.isEmpty()) + fun = Functions.get(keyspace, functionName, terms, receiver); + + if (fun == null) + { + UDFunction udf = UDFRegistry.resolveFunction(namespace, functionName, receiver.ksName, receiver.cfName, terms); + if (udf != null) + // got a user defined function to call + fun = udf.create(terms); + } + + if (fun == null) + throw new InvalidRequestException(String.format("Unknown function %s called", namespace.isEmpty() ? 
functionName : namespace + "::" + functionName)); List<Term> parameters = new ArrayList<Term>(terms.size()); boolean allTerminal = true; @@ -149,10 +166,13 @@ public class FunctionCall extends Term.NonTerminal public String toString() { StringBuilder sb = new StringBuilder(); + if (!namespace.isEmpty()) + sb.append(namespace).append("::"); sb.append(functionName).append("("); for (int i = 0; i < terms.size(); i++) { - if (i > 0) sb.append(", "); + if (i > 0) + sb.append(", "); sb.append(terms.get(i)); } return sb.append(")").toString(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/functions/Functions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/Functions.java b/src/java/org/apache/cassandra/cql3/functions/Functions.java index 605e7b3..03dd13d 100644 --- a/src/java/org/apache/cassandra/cql3/functions/Functions.java +++ b/src/java/org/apache/cassandra/cql3/functions/Functions.java @@ -62,6 +62,11 @@ public abstract class Functions declared.put("blobasvarchar", AbstractFunction.factory(BytesConversionFcts.BlobAsVarcharFact)); } + public static boolean contains(String functionName) + { + return declared.containsKey(functionName); + } + public static AbstractType<?> getReturnType(String functionName, String ksName, String cfName) { List<Function.Factory> factories = declared.get(functionName.toLowerCase()); @@ -82,7 +87,7 @@ public abstract class Functions { List<Function.Factory> factories = declared.get(name.toLowerCase()); if (factories.isEmpty()) - throw new InvalidRequestException(String.format("Unknown CQL3 function %s called", name)); + return null; // Fast path if there is not choice if (factories.size() == 1) http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java ---------------------------------------------------------------------- diff 
--git a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
new file mode 100644
index 0000000..094c318
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.UFMetaData;
+import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.functions.Functions;
+import org.apache.cassandra.cql3.udf.UDFRegistry;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.Event;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+/**
+ * A <code>CREATE FUNCTION</code> statement parsed from a CQL query.
+ * <p/>
+ * Executing the statement registers the function with the local {@link UDFRegistry} and then
+ * announces it through {@link MigrationManager}.
+ */
+public final class CreateFunctionStatement extends SchemaAlteringStatement
+{
+    final boolean orReplace;
+    final boolean ifNotExists;
+    final String namespace;
+    final String functionName;
+    final String qualifiedName;
+    final String language;
+    final String body;
+    final boolean deterministic;
+    final CQL3Type.Raw returnType;
+    final List<Argument> arguments;
+
+    // Built by doExecute(); null while there is nothing to announce (not executed yet, or IF NOT EXISTS no-op).
+    private UFMetaData ufMeta;
+
+    /**
+     * Creates the statement from the values extracted by the parser.
+     *
+     * @param namespace function namespace; <code>null</code> is stored as the empty string
+     * @param functionName unqualified function name, must not be <code>null</code>
+     * @param language function language, must not be <code>null</code>
+     * @param body function body, must not be <code>null</code>
+     * @param deterministic value of the function's deterministic flag
+     * @param returnType declared return type, must not be <code>null</code>
+     * @param arguments declared arguments, must not be <code>null</code>
+     * @param orReplace whether an already existing function may be replaced
+     * @param ifNotExists whether an already existing function turns the statement into a no-op
+     */
+    public CreateFunctionStatement(String namespace, String functionName, String language, String body, boolean deterministic,
+                                   CQL3Type.Raw returnType, List<Argument> arguments, boolean orReplace, boolean ifNotExists)
+    {
+        super();
+        this.namespace = namespace != null ? namespace : "";
+        this.functionName = functionName;
+        this.qualifiedName = UFMetaData.qualifiedName(namespace, functionName);
+        this.language = language;
+        this.body = body;
+        this.deterministic = deterministic;
+        this.returnType = returnType;
+        this.arguments = arguments;
+        assert functionName != null : "null function name";
+        assert language != null : "null function language";
+        assert body != null : "null function body";
+        assert returnType != null : "null function returnType";
+        assert arguments != null : "null function arguments";
+        this.orReplace = orReplace;
+        this.ifNotExists = ifNotExists;
+    }
+
+    public void checkAccess(ClientState state) throws UnauthorizedException
+    {
+        // TODO CASSANDRA-7557 (function DDL permission)
+
+        state.hasAllKeyspacesAccess(Permission.CREATE);
+    }
+
+    /**
+     * The <code>CqlParser</code> only goes as far as extracting the keyword arguments
+     * from these statements, so this method is responsible for processing and
+     * validating.
+     *
+     * @throws org.apache.cassandra.exceptions.InvalidRequestException if arguments are missing or unacceptable
+     */
+    public void validate(ClientState state) throws RequestValidationException
+    {
+        if (!namespace.isEmpty() && !namespace.matches("\\w+"))
+            throw new InvalidRequestException(String.format("\"%s\" is not a valid function name", qualifiedName));
+        if (!functionName.matches("\\w+"))
+            throw new InvalidRequestException(String.format("\"%s\" is not a valid function name", qualifiedName));
+        if (namespace.length() > Schema.NAME_LENGTH)
+            throw new InvalidRequestException(String.format("UDF namespace names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, qualifiedName));
+        if (functionName.length() > Schema.NAME_LENGTH)
+            throw new InvalidRequestException(String.format("UDF function names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, qualifiedName));
+    }
+
+    /**
+     * @return always <code>null</code>; this statement does not provide a schema change event
+     */
+    public Event.SchemaChange changeEvent()
+    {
+        return null;
+    }
+
+    public ResultMessage executeInternal(QueryState state, QueryOptions options)
+    {
+        try
+        {
+            doExecute();
+            return super.executeInternal(state, options);
+        }
+        catch (RequestValidationException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public ResultMessage execute(QueryState state, QueryOptions options) throws RequestValidationException
+    {
+        doExecute();
+        return super.execute(state, options);
+    }
+
+    /**
+     * Registers the function with the local {@link UDFRegistry} and remembers its metadata for
+     * {@link #announceMigration(boolean)}.
+     * <p/>
+     * With <code>IF NOT EXISTS</code> an already existing function makes the statement a no-op;
+     * without it the function must either not exist yet or <code>OR REPLACE</code> must have been given.
+     *
+     * @throws RequestValidationException if the function exists and may not be replaced, if the name is
+     * reserved by a built-in CQL function, or if the registry rejects the function
+     */
+    private void doExecute() throws RequestValidationException
+    {
+        // forget the outcome of an earlier execution of this statement instance
+        this.ufMeta = null;
+
+        boolean exists = UDFRegistry.hasFunction(qualifiedName);
+        if (exists && ifNotExists)
+            return; // IF NOT EXISTS: keep the existing function, nothing to create or announce
+        if (exists && !orReplace)
+            throw new InvalidRequestException(String.format("Function '%s' already exists.", qualifiedName));
+
+        if (namespace.isEmpty() && Functions.contains(functionName))
+            throw new InvalidRequestException(String.format("Function name '%s' is reserved by CQL.", qualifiedName));
+
+        List<Argument> args = arguments;
+        List<String> argumentNames = new ArrayList<>(args.size());
+        List<String> argumentTypes = new ArrayList<>(args.size());
+        for (Argument arg : args)
+        {
+            argumentNames.add(arg.getName().toString());
+            argumentTypes.add(arg.getType().toString());
+        }
+        this.ufMeta = new UFMetaData(namespace, functionName, deterministic, argumentNames, argumentTypes,
+                                     returnType.toString(), language, body);
+
+        UDFRegistry.tryCreateFunction(ufMeta);
+    }
+
+    public void announceMigration(boolean isLocalOnly) throws RequestValidationException
+    {
+        // doExecute() built no metadata (e.g. IF NOT EXISTS on an existing function): nothing to announce
+        if (ufMeta == null)
+            return;
+
+        MigrationManager.announceNewFunction(ufMeta, isLocalOnly);
+    }
+
+    public static final class Argument
+    {
+        final ColumnIdentifier name;
+        final CQL3Type.Raw type;
+
+        public Argument(ColumnIdentifier name, CQL3Type.Raw type)
+        {
+            this.name = name;
+            this.type = type;
+        }
+
+        public ColumnIdentifier getName()
+        {
+            return name;
+        }
+
+        public CQL3Type.Raw getType()
+        {
+            return type;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
new file mode 100644
index 0000000..7627ab4
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/DropFunctionStatement.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.UFMetaData;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.transport.Event;
+
+/**
+ * Parsed form of a CQL <code>DROP FUNCTION</code> statement.
+ * <p/>
+ * The drop is announced through {@link MigrationManager}; this class validates the function
+ * name and decides whether a rejected drop is reported to the caller.
+ */
+public final class DropFunctionStatement extends SchemaAlteringStatement
+{
+    private final String namespace;
+    private final String functionName;
+    private final String qualifiedName;
+    private final boolean ifExists;
+
+    /**
+     * @param namespace function namespace; <code>null</code> is stored as the empty string
+     * @param functionName unqualified name of the function to drop
+     * @param ifExists when <code>true</code>, an {@link InvalidRequestException} raised while announcing
+     *                 the drop is suppressed
+     */
+    public DropFunctionStatement(String namespace, String functionName, boolean ifExists)
+    {
+        super();
+        this.qualifiedName = UFMetaData.qualifiedName(namespace, functionName);
+        this.namespace = (namespace != null) ? namespace : "";
+        this.functionName = functionName;
+        this.ifExists = ifExists;
+    }
+
+    /**
+     * Delegates to <code>ClientState.hasAllKeyspacesAccess</code> with {@link Permission#DROP}.
+     */
+    public void checkAccess(ClientState state) throws UnauthorizedException
+    {
+        // TODO CASSANDRA-7557 (function DDL permission)
+
+        state.hasAllKeyspacesAccess(Permission.DROP);
+    }
+
+    /**
+     * Rejects a non-empty namespace or a function name that does not match <code>\w+</code>, and
+     * names longer than {@link Schema#NAME_LENGTH}.
+     *
+     * @throws org.apache.cassandra.exceptions.InvalidRequestException if a name is unacceptable
+     */
+    public void validate(ClientState state) throws RequestValidationException
+    {
+        boolean namespaceValid = namespace.isEmpty() || namespace.matches("\\w+");
+        if (!namespaceValid || !functionName.matches("\\w+"))
+            throw new InvalidRequestException(String.format("\"%s\" is not a valid function name", qualifiedName));
+
+        if (namespace.length() > Schema.NAME_LENGTH)
+            throw new InvalidRequestException(String.format("UDF namespace names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, qualifiedName));
+        if (functionName.length() > Schema.NAME_LENGTH)
+            throw new InvalidRequestException(String.format("UDF function names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, qualifiedName));
+    }
+
+    /**
+     * @return always <code>null</code>
+     */
+    public Event.SchemaChange changeEvent()
+    {
+        return null;
+    }
+
+    // no execute() - drop propagated via MigrationManager
+
+    /**
+     * Announces the drop; with <code>ifExists</code> set, a rejected drop is silently ignored.
+     */
+    public void announceMigration(boolean isLocalOnly) throws RequestValidationException
+    {
+        try
+        {
+            MigrationManager.announceFunctionDrop(namespace, functionName, isLocalOnly);
+        }
+        catch (InvalidRequestException e)
+        {
+            // only report the failure when the caller did not ask for IF EXISTS semantics
+            if (ifExists)
+                return;
+            throw e;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java index e70aac9..876568a 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java @@ -70,7 +70,8 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL public ResultMessage execute(QueryState state, QueryOptions options) throws RequestValidationException { announceMigration(false); - return new ResultMessage.SchemaChange(changeEvent()); + Event.SchemaChange ce = changeEvent(); + return ce == null ? new ResultMessage.Void() : new ResultMessage.SchemaChange(ce); } public ResultMessage executeInternal(QueryState state, QueryOptions options) @@ -78,7 +79,8 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL try { announceMigration(true); - return new ResultMessage.SchemaChange(changeEvent()); + Event.SchemaChange ce = changeEvent(); + return ce == null ?
new ResultMessage.Void() : new ResultMessage.SchemaChange(ce); } catch (RequestValidationException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/statements/Selectable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/Selectable.java b/src/java/org/apache/cassandra/cql3/statements/Selectable.java index 448301c..ab0a5a3 100644 --- a/src/java/org/apache/cassandra/cql3/statements/Selectable.java +++ b/src/java/org/apache/cassandra/cql3/statements/Selectable.java @@ -44,11 +44,13 @@ public interface Selectable public static class WithFunction implements Selectable { + public final String namespace; public final String functionName; public final List<Selectable> args; - public WithFunction(String functionName, List<Selectable> args) + public WithFunction(String namespace, String functionName, List<Selectable> args) { + this.namespace = namespace; this.functionName = functionName; this.args = args; } @@ -57,10 +59,13 @@ public interface Selectable public String toString() { StringBuilder sb = new StringBuilder(); + if (!namespace.isEmpty()) + sb.append(namespace).append("::"); sb.append(functionName).append("("); for (int i = 0; i < args.size(); i++) { - if (i > 0) sb.append(", "); + if (i > 0) + sb.append(", "); sb.append(args.get(i)); } return sb.append(")").toString(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/statements/Selection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/Selection.java b/src/java/org/apache/cassandra/cql3/statements/Selection.java index 0f0cb62..325ef15 100644 --- a/src/java/org/apache/cassandra/cql3/statements/Selection.java +++ b/src/java/org/apache/cassandra/cql3/statements/Selection.java @@ -29,6 +29,8 @@ import org.apache.cassandra.cql3.functions.Function; 
import org.apache.cassandra.cql3.functions.Functions; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.udf.UDFunction; +import org.apache.cassandra.cql3.udf.UDFRegistry; import org.apache.cassandra.db.Cell; import org.apache.cassandra.db.CounterCell; import org.apache.cassandra.db.ExpiringCell; @@ -156,13 +158,26 @@ public abstract class Selection else { Selectable.WithFunction withFun = (Selectable.WithFunction)raw.selectable; - List<Selector> args = new ArrayList<Selector>(withFun.args.size()); + List<Selector> args = new ArrayList<>(withFun.args.size()); for (Selectable rawArg : withFun.args) args.add(makeSelector(cfm, new RawSelector(rawArg, null), defs, null)); + // resolve built-in functions before user defined functions AbstractType<?> returnType = Functions.getReturnType(withFun.functionName, cfm.ksName, cfm.cfName); if (returnType == null) - throw new InvalidRequestException(String.format("Unknown function '%s'", withFun.functionName)); + { + UDFunction userFun = UDFRegistry.resolveFunction(withFun.namespace, withFun.functionName, cfm.ksName, cfm.cfName, args); + if (userFun != null) + { + // got a user defined function to call + Function fun = userFun.create(args); + ColumnSpecification spec = makeFunctionSpec(cfm, withFun, fun.returnType(), raw.alias); + if (metadata != null) + metadata.add(spec); + return new FunctionSelector(userFun.create(args), args); + } + throw new InvalidRequestException(String.format("Unknown function '%s'", withFun.namespace.isEmpty() ? 
withFun.functionName : withFun.namespace + "::" + withFun.functionName)); + } ColumnSpecification spec = makeFunctionSpec(cfm, withFun, returnType, raw.alias); Function fun = Functions.get(cfm.ksName, withFun.functionName, args, spec); if (metadata != null) @@ -193,7 +208,7 @@ public abstract class Selection ColumnIdentifier alias) throws InvalidRequestException { if (returnType == null) - throw new InvalidRequestException(String.format("Unknown function %s called in selection clause", fun.functionName)); + throw new InvalidRequestException(String.format("Unknown function %s called in selection clause", fun.namespace.isEmpty() ? fun.functionName : fun.namespace +"::"+fun.functionName)); return new ColumnSpecification(cfm.ksName, cfm.cfName, http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/udf/UDFFunctionOverloads.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/udf/UDFFunctionOverloads.java b/src/java/org/apache/cassandra/cql3/udf/UDFFunctionOverloads.java new file mode 100644 index 0000000..aa6892a --- /dev/null +++ b/src/java/org/apache/cassandra/cql3/udf/UDFFunctionOverloads.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3.udf; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.cassandra.config.UFMetaData; +import org.apache.cassandra.cql3.AssignementTestable; +import org.apache.cassandra.exceptions.InvalidRequestException; + +public final class UDFFunctionOverloads +{ + final Map<String, UFMetaData> signatureMap = new ConcurrentHashMap<>(); + final Map<String, UDFunction> udfInstances = new ConcurrentHashMap<>(); + + public void addAndInit(UFMetaData uf, boolean addIfInvalid) + { + try + { + UDFunction UDFunction = new UDFunction(uf); + udfInstances.put(uf.signature, UDFunction); + } + catch (InvalidRequestException e) + { + uf.invalid = e; + } + + if (uf.invalid == null || addIfInvalid) + signatureMap.put(uf.signature, uf); + } + + public void remove(UFMetaData uf) + { + signatureMap.remove(uf.signature); + udfInstances.remove(uf.signature); + } + + public Collection<UFMetaData> values() + { + return signatureMap.values(); + } + + public boolean isEmpty() + { + return signatureMap.isEmpty(); + } + + public UDFunction resolveFunction(String ksName, String cfName, List<? extends AssignementTestable> args) + throws InvalidRequestException + { + for (UFMetaData candidate : signatureMap.values()) + { + // Currently the UDF implementation must use concrete types (like Double, Integer) instead of base types (like Number). + // To support handling of base types it is necessary to construct new, temporary instances of UDFFunction with the + // signature for the current request in UDFFunction#argsType + UDFFunction#returnType. + // Additionally we need the requested return type (AssignementTestable) has a parameter for this method. 
+ if (candidate.compatibleArgs(ksName, cfName, args)) + { + + // TODO CASSANDRA-7557 (specific per-function EXECUTE permission ??) + + if (candidate.invalid != null) + throw new InvalidRequestException(candidate.invalid.getMessage()); + return udfInstances.get(candidate.signature); + } + } + return null; + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/udf/UDFRegistry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/udf/UDFRegistry.java b/src/java/org/apache/cassandra/cql3/udf/UDFRegistry.java
new file mode 100644
index 0000000..cb3f1a1
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/udf/UDFRegistry.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.udf;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.UFMetaData;
+import org.apache.cassandra.cql3.AssignementTestable;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.functions.Functions;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+/**
+ * Central registry for user defined functions (CASSANDRA-7395).
+ * <p/>
+ * UDFs are maintained in {@code system.schema_functions} table and distributed to all nodes.
+ * <p/>
+ * UDFs are not maintained in {@link org.apache.cassandra.cql3.functions.Functions} class to have a strict
+ * distinction between 'core CQL' functions provided by Cassandra and functions provided by the user.
+ * 'Core CQL' functions have precedence over UDFs.
+ */
+public class UDFRegistry
+{
+    private static final Logger logger = LoggerFactory.getLogger(UDFRegistry.class);
+
+    static final String SELECT_CQL = "SELECT namespace, name, signature, deterministic, argument_names, argument_types, " +
+                                     "return_type, language, body FROM " +
+                                     Keyspace.SYSTEM_KS + '.' + SystemKeyspace.SCHEMA_FUNCTIONS_CF;
+
+    // qualified function name -> all overloads registered under that name
+    private static final ConcurrentMap<String, UDFFunctionOverloads> functions = new ConcurrentHashMap<>();
+
+    public static void init()
+    {
+        refreshInitial();
+    }
+
+    /**
+     * Initial loading of all existing UDFs.
+     */
+    public static void refreshInitial()
+    {
+        logger.debug("Refreshing UDFs");
+        for (UntypedResultSet.Row row : QueryProcessor.executeOnceInternal(SELECT_CQL))
+        {
+            UFMetaData uf = UFMetaData.fromSchema(row);
+            UDFFunctionOverloads sigMap = overloadsFor(uf.qualifiedName);
+
+            // NOTE(review): CreateFunctionStatement tests the unqualified name of namespace-less functions
+            // against Functions.contains(); this check uses the qualified name - confirm that is intended.
+            if (Functions.contains(uf.qualifiedName))
+                logger.warn("The UDF '{}' cannot be used because it uses the same name as the CQL " +
+                            "function with the same name. You should drop this function but can do a " +
+                            "'DESCRIBE FUNCTION {};' in cqlsh before to get more information about it.",
+                            uf.functionName, uf.functionName);
+
+            // add the function to the registry even if it is invalid (to be able to drop it)
+            sigMap.addAndInit(uf, true);
+
+            if (uf.invalid != null)
+                logger.error("Loaded invalid UDF : {}", uf.invalid.getMessage());
+        }
+    }
+
+    /**
+     * Tells whether at least one overload is registered under the given name.
+     *
+     * @param qualifiedName qualified function name; it is lower-cased before the lookup
+     * @return <code>true</code> if a non-empty set of overloads is registered for the name
+     */
+    public static boolean hasFunction(String qualifiedName)
+    {
+        // a fixed locale keeps the lookup key independent of the JVM default locale (e.g. Turkish 'I')
+        UDFFunctionOverloads sigMap = functions.get(qualifiedName.toLowerCase(Locale.US));
+        return sigMap != null && !sigMap.isEmpty();
+    }
+
+    /**
+     * Looks up the overloads registered for the qualified name and lets them pick the function for the arguments.
+     *
+     * @return the resolved function, or <code>null</code> if no overloads are registered or none of them matches
+     */
+    public static UDFunction resolveFunction(String namespace, String functionName, String ksName, String cfName,
+                                             List<? extends AssignementTestable> args)
+    throws InvalidRequestException
+    {
+        UDFFunctionOverloads sigMap = functions.get(UFMetaData.qualifiedName(namespace, functionName));
+        if (sigMap != null)
+            return sigMap.resolveFunction(ksName, cfName, args);
+        return null;
+    }
+
+    public static void migrateDropFunction(UFMetaData uf)
+    {
+        UDFFunctionOverloads sigMap = functions.get(uf.qualifiedName);
+        if (sigMap == null)
+            return;
+
+        sigMap.remove(uf);
+    }
+
+    public static void migrateUpdateFunction(UFMetaData uf)
+    {
+        migrateAddFunction(uf);
+    }
+
+    public static void migrateAddFunction(UFMetaData uf)
+    {
+        addFunction(uf, true);
+    }
+
+    /**
+     * Used by {@link org.apache.cassandra.cql3.statements.CreateFunctionStatement} to create or replace a new function.
+     *
+     * @throws InvalidRequestException if the function was marked invalid while being initialized
+     */
+    public static void tryCreateFunction(UFMetaData ufMeta) throws InvalidRequestException
+    {
+        addFunction(ufMeta, false);
+
+        if (ufMeta.invalid != null)
+            throw ufMeta.invalid;
+    }
+
+    private static void addFunction(UFMetaData uf, boolean addIfInvalid)
+    {
+        overloadsFor(uf.qualifiedName).addAndInit(uf, addIfInvalid);
+    }
+
+    /**
+     * Returns the overloads registered under the qualified name, creating the entry if needed.
+     * putIfAbsent makes sure that two threads registering the same name end up with the same instance.
+     */
+    private static UDFFunctionOverloads overloadsFor(String qualifiedName)
+    {
+        UDFFunctionOverloads sigMap = functions.get(qualifiedName);
+        if (sigMap != null)
+            return sigMap;
+
+        UDFFunctionOverloads created = new UDFFunctionOverloads();
+        UDFFunctionOverloads existing = functions.putIfAbsent(qualifiedName, created);
+        return existing != null ? existing : created;
+    }
+
+    public static UDFFunctionOverloads getFunctionSigMap(String qualifiedName)
+    {
+        return functions.get(qualifiedName);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/cql3/udf/UDFunction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/udf/UDFunction.java b/src/java/org/apache/cassandra/cql3/udf/UDFunction.java new file mode 100644 index 0000000..4866c22 --- /dev/null +++ b/src/java/org/apache/cassandra/cql3/udf/UDFunction.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.cassandra.cql3.udf; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.UFMetaData; +import org.apache.cassandra.cql3.AssignementTestable; +import org.apache.cassandra.cql3.functions.Function; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.exceptions.InvalidRequestException; + +/** + * UDFunction contains the <i>invokable</i> instance of a user defined function. + * Currently (as of CASSANDRA-7395) only {@code public static} methods in a {@link public} class + * can be invoked. + * CASSANDRA-7562 will introduce Java source code UDFs and CASSANDRA-7526 will introduce JSR-223 scripting languages. + * Invocations of UDFs are routed via this class. + */ +public class UDFunction +{ + private static final Logger logger = LoggerFactory.getLogger(UDFunction.class); + + public final UFMetaData meta; + + public final Method method; + + UDFunction(UFMetaData meta) throws InvalidRequestException + { + this.meta = meta; + + Method m; + switch (meta.language) + { + case "class": + m = resolveClassMethod(); + break; + default: + throw new InvalidRequestException("Invalid UDF language " + meta.language + " for '" + meta.qualifiedName + '\''); + } + this.method = m; + } + + private Method resolveClassMethod() throws InvalidRequestException + { + Class<?> jReturnType = meta.cqlReturnType.getType().getSerializer().getType(); + Class<?> paramTypes[] = new Class[meta.cqlArgumentTypes.size()]; + for (int i = 0; i < paramTypes.length; i++) + paramTypes[i] = meta.cqlArgumentTypes.get(i).getType().getSerializer().getType(); + + String className; + String methodName; + int i = meta.body.indexOf('#'); + if (i != -1) + { + methodName = meta.body.substring(i + 1); + className = 
meta.body.substring(0, i); + } + else + { + methodName = meta.functionName; + className = meta.body; + } + try + { + Class<?> cls = Class.forName(className, false, Thread.currentThread().getContextClassLoader()); + + Method method = cls.getMethod(methodName, paramTypes); + + if (!jReturnType.isAssignableFrom(method.getReturnType())) + { + throw new InvalidRequestException("Method " + className + '.' + methodName + '(' + Arrays.toString(paramTypes) + ") " + + "has incompatible return type " + method.getReturnType() + " (not assignable to " + jReturnType + ')'); + } + + return method; + } + catch (ClassNotFoundException e) + { + throw new InvalidRequestException("Class " + className + " does not exist"); + } + catch (NoSuchMethodException e) + { + throw new InvalidRequestException("Method " + className + '.' + methodName + '(' + Arrays.toString(paramTypes) + ") does not exist"); + } + } + + public Function create(List<? extends AssignementTestable> providedArgs) throws InvalidRequestException + { + final int argCount = providedArgs.size(); + final List<AbstractType<?>> argsType = new ArrayList<>(argCount); + final AbstractType<?> returnType = meta.cqlReturnType.getType(); + for (int i = 0; i < argCount; i++) + { + AbstractType<?> argType = meta.cqlArgumentTypes.get(i).getType(); + argsType.add(argType); + } + + return new Function() + { + public String name() + { + return meta.qualifiedName; + } + + public List<AbstractType<?>> argsType() + { + return argsType; + } + + public AbstractType<?> returnType() + { + return returnType; + } + + public ByteBuffer execute(List<ByteBuffer> parameters) throws InvalidRequestException + { + Object[] parms = new Object[argCount]; + for (int i = 0; i < parms.length; i++) + { + ByteBuffer bb = parameters.get(i); + if (bb != null) + { + AbstractType<?> argType = argsType.get(i); + parms[i] = argType.compose(bb); + } + } + + Object result; + try + { + result = method.invoke(null, parms); + @SuppressWarnings("unchecked") ByteBuffer r = 
result != null ? ((AbstractType) returnType).decompose(result) : null; + return r; + } + catch (InvocationTargetException e) + { + Throwable c = e.getCause(); + logger.error("Invocation of UDF {} failed", meta.qualifiedName, c); + throw new InvalidRequestException("Invocation of UDF " + meta.qualifiedName + " failed: " + c); + } + catch (IllegalAccessException e) + { + throw new InvalidRequestException("UDF " + meta.qualifiedName + " invocation failed: " + e); + } + } + + public boolean isPure() + { + return meta.deterministic; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/db/DefsTables.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java index fc43c27..33a112a 100644 --- a/src/java/org/apache/cassandra/db/DefsTables.java +++ b/src/java/org/apache/cassandra/db/DefsTables.java @@ -24,6 +24,8 @@ import java.util.*; import com.google.common.collect.Iterables; import com.google.common.collect.MapDifference; import com.google.common.collect.Maps; +import org.apache.cassandra.config.UFMetaData; +import org.apache.cassandra.cql3.udf.UDFRegistry; import org.apache.cassandra.db.commitlog.CommitLog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -177,6 +179,7 @@ public class DefsTables Map<DecoratedKey, ColumnFamily> oldKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_CF, keyspaces); Map<DecoratedKey, ColumnFamily> oldColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, keyspaces); Map<DecoratedKey, ColumnFamily> oldTypes = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_USER_TYPES_CF, keyspaces); + Map<DecoratedKey, ColumnFamily> oldFunctions = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_FUNCTIONS_CF); for (Mutation mutation : mutations) mutation.apply(); @@ -188,10 +191,12 @@ public class DefsTables 
Map<DecoratedKey, ColumnFamily> newKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_CF, keyspaces); Map<DecoratedKey, ColumnFamily> newColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, keyspaces); Map<DecoratedKey, ColumnFamily> newTypes = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_USER_TYPES_CF, keyspaces); + Map<DecoratedKey, ColumnFamily> newFunctions = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_FUNCTIONS_CF); Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces); mergeColumnFamilies(oldColumnFamilies, newColumnFamilies); mergeTypes(oldTypes, newTypes); + mergeFunctions(oldFunctions, newFunctions); // it is safe to drop a keyspace only when all nested ColumnFamilies where deleted for (String keyspaceToDrop : keyspacesToDrop) @@ -377,6 +382,54 @@ public class DefsTables } } + private static void mergeFunctions(Map<DecoratedKey, ColumnFamily> old, Map<DecoratedKey, ColumnFamily> updated) + { + MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(old, updated); + + // New namespace with functions + for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet()) + { + ColumnFamily cfFunctions = entry.getValue(); + if (!cfFunctions.hasColumns()) + continue; + + for (UFMetaData uf : UFMetaData.fromSchema(new Row(entry.getKey(), cfFunctions)).values()) + addFunction(uf); + } + + for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> modifiedEntry : diff.entriesDiffering().entrySet()) + { + DecoratedKey namespace = modifiedEntry.getKey(); + ColumnFamily prevCFFunctions = modifiedEntry.getValue().leftValue(); // state before external modification + ColumnFamily newCFFunctions = modifiedEntry.getValue().rightValue(); // updated state + + if (!prevCFFunctions.hasColumns()) // whole namespace was deleted and now it's re-created + { + for (UFMetaData uf : UFMetaData.fromSchema(new Row(namespace, newCFFunctions)).values()) + addFunction(uf); 
+ } + else if (!newCFFunctions.hasColumns()) // whole namespace is deleted + { + for (UFMetaData uf : UFMetaData.fromSchema(new Row(namespace, prevCFFunctions)).values()) + dropFunction(uf); + } + else // has modifications in the functions, need to perform nested diff to determine what was really changed + { + MapDifference<String, UFMetaData> functionsDiff = Maps.difference(UFMetaData.fromSchema(new Row(namespace, prevCFFunctions)), + UFMetaData.fromSchema(new Row(namespace, newCFFunctions))); + + for (UFMetaData function : functionsDiff.entriesOnlyOnRight().values()) + addFunction(function); + + for (UFMetaData function : functionsDiff.entriesOnlyOnLeft().values()) + dropFunction(function); + + for (MapDifference.ValueDifference<UFMetaData> tdiff : functionsDiff.entriesDiffering().values()) + updateFunction(tdiff.rightValue()); // use the most recent value + } + } + } + private static void addKeyspace(KSMetaData ksm) { assert Schema.instance.getKSMetaData(ksm.name) == null; @@ -425,6 +478,16 @@ public class DefsTables MigrationManager.instance.notifyCreateUserType(ut); } + private static void addFunction(UFMetaData uf) + { + logger.info("Loading {}", uf); + + UDFRegistry.migrateAddFunction(uf); + + if (!StorageService.instance.isClientMode()) + MigrationManager.instance.notifyCreateFunction(uf); + } + private static void updateKeyspace(KSMetaData newState) { KSMetaData oldKsm = Schema.instance.getKSMetaData(newState.name); @@ -467,6 +530,16 @@ public class DefsTables MigrationManager.instance.notifyUpdateUserType(ut); } + private static void updateFunction(UFMetaData uf) + { + logger.info("Updating {}", uf); + + UDFRegistry.migrateUpdateFunction(uf); + + if (!StorageService.instance.isClientMode()) + MigrationManager.instance.notifyUpdateFunction(uf); + } + private static void dropKeyspace(String ksName) { KSMetaData ksm = Schema.instance.getKSMetaData(ksName); @@ -546,6 +619,16 @@ public class DefsTables MigrationManager.instance.notifyDropUserType(ut); } + 
private static void dropFunction(UFMetaData uf) + { + logger.info("Drop {}", uf); + + UDFRegistry.migrateDropFunction(uf); + + if (!StorageService.instance.isClientMode()) + MigrationManager.instance.notifyDropFunction(uf); + } + private static KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm, CFMetaData toExclude) { // clone ksm but do not include the new def http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 3c647b6..8b62740 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -80,6 +80,7 @@ public class SystemKeyspace public static final String SCHEMA_COLUMNS_CF = "schema_columns"; public static final String SCHEMA_TRIGGERS_CF = "schema_triggers"; public static final String SCHEMA_USER_TYPES_CF = "schema_usertypes"; + public static final String SCHEMA_FUNCTIONS_CF = "schema_functions"; public static final String COMPACTION_LOG = "compactions_in_progress"; public static final String PAXOS_CF = "paxos"; public static final String SSTABLE_ACTIVITY_CF = "sstable_activity"; @@ -91,7 +92,8 @@ public class SystemKeyspace SCHEMA_COLUMNFAMILIES_CF, SCHEMA_COLUMNS_CF, SCHEMA_TRIGGERS_CF, - SCHEMA_USER_TYPES_CF); + SCHEMA_USER_TYPES_CF, + SCHEMA_FUNCTIONS_CF); private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords; @@ -769,6 +771,16 @@ public class SystemKeyspace } } + public static Map<DecoratedKey, ColumnFamily> getSchema(String cfName) + { + Map<DecoratedKey, ColumnFamily> schema = new HashMap<>(); + + for (Row schemaEntity : SystemKeyspace.serializedSchema(cfName)) + schema.put(schemaEntity.key, schemaEntity.cf); + + return schema; + } + public static Map<DecoratedKey, ColumnFamily> getSchema(String 
schemaCfName, Set<String> keyspaces) { Map<DecoratedKey, ColumnFamily> schema = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 5c88cb1..71cba23 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -33,6 +33,7 @@ import javax.management.StandardMBean; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.cassandra.cql3.udf.UDFRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -372,6 +373,9 @@ public class CassandraDaemon if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress())) waitForGossipToSettle(); + // UDF + UDFRegistry.init(); + // Thift InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress(); int rpcPort = DatabaseDescriptor.getRpcPort(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/service/IMigrationListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/IMigrationListener.java b/src/java/org/apache/cassandra/service/IMigrationListener.java index 4d142bd..b4eb392 100644 --- a/src/java/org/apache/cassandra/service/IMigrationListener.java +++ b/src/java/org/apache/cassandra/service/IMigrationListener.java @@ -22,12 +22,16 @@ public interface IMigrationListener public void onCreateKeyspace(String ksName); public void onCreateColumnFamily(String ksName, String cfName); public void onCreateUserType(String ksName, String typeName); + public void onCreateFunction(String namespace, String functionName); public void onUpdateKeyspace(String ksName); 
public void onUpdateColumnFamily(String ksName, String cfName); public void onUpdateUserType(String ksName, String typeName); + public void onUpdateFunction(String namespace, String functionName); public void onDropKeyspace(String ksName); public void onDropColumnFamily(String ksName, String cfName); public void onDropUserType(String ksName, String typeName); + public void onDropFunction(String namespace, String functionName); + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index 5dd2534..28e3e39 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -29,6 +29,9 @@ import java.util.concurrent.*; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; +import org.apache.cassandra.config.UFMetaData; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.exceptions.SyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -173,6 +176,24 @@ public class MigrationManager listener.onCreateUserType(ut.keyspace, ut.getNameAsString()); } + public void notifyCreateFunction(UFMetaData uf) + { + for (IMigrationListener listener : listeners) + listener.onCreateFunction(uf.namespace, uf.functionName); + } + + public void notifyUpdateFunction(UFMetaData uf) + { + for (IMigrationListener listener : listeners) + listener.onUpdateFunction(uf.namespace, uf.functionName); + } + + public void notifyDropFunction(UFMetaData uf) + { + for (IMigrationListener listener : listeners) + listener.onDropFunction(uf.namespace, uf.functionName); + } + public void notifyUpdateKeyspace(KSMetaData ksm) { for (IMigrationListener listener : 
listeners) @@ -352,6 +373,27 @@ public class MigrationManager announce(addSerializedKeyspace(UTMetaData.dropFromSchema(droppedType, FBUtilities.timestampMicros()), droppedType.keyspace), announceLocally); } + public static void announceFunctionDrop(String namespace, String functionName, boolean announceLocally) throws InvalidRequestException + { + Mutation mutation = UFMetaData.dropFunction(FBUtilities.timestampMicros(), namespace, functionName); + if (mutation == null) + throw new InvalidRequestException(String.format("Cannot drop non existing function '%s'.", functionName)); + + logger.info(String.format("Drop Function '%s::%s'", namespace, functionName)); + announce(mutation, announceLocally); + } + + public static void announceNewFunction(UFMetaData function, boolean announceLocally) + throws ConfigurationException, SyntaxException + { + Mutation mutation = UFMetaData.createOrReplaceFunction(FBUtilities.timestampMicros(), function); + if (mutation == null) + throw new ConfigurationException(String.format("Function '%s' already exists.", function.qualifiedName)); + + logger.info(String.format("Create Function '%s'", function)); + announce(mutation, announceLocally); + } + /** * actively announce a new version to active hosts via rpc * @param schema The schema mutation to be applied http://git-wip-us.apache.org/repos/asf/cassandra/blob/25411bf1/src/java/org/apache/cassandra/transport/Event.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java index b7c5e68..85943cf 100644 --- a/src/java/org/apache/cassandra/transport/Event.java +++ b/src/java/org/apache/cassandra/transport/Event.java @@ -208,18 +208,18 @@ public abstract class Event public final Change change; public final Target target; - public final String keyspace; - public final String tableOrType; + public final String keyOrNamespace; + public final String 
tableOrTypeOrFunction; - public SchemaChange(Change change, Target target, String keyspace, String tableOrType) + public SchemaChange(Change change, Target target, String keyOrNamespace, String tableOrTypeOrFunction) { super(Type.SCHEMA_CHANGE); this.change = change; this.target = target; - this.keyspace = keyspace; - this.tableOrType = tableOrType; + this.keyOrNamespace = keyOrNamespace; + this.tableOrTypeOrFunction = tableOrTypeOrFunction; if (target != Target.KEYSPACE) - assert this.tableOrType != null : "Table or type should be set for non-keyspace schema change events"; + assert this.tableOrTypeOrFunction != null : "Table or type should be set for non-keyspace schema change events"; } public SchemaChange(Change change, String keyspace) @@ -252,9 +252,9 @@ public abstract class Event { CBUtil.writeEnumValue(change, dest); CBUtil.writeEnumValue(target, dest); - CBUtil.writeString(keyspace, dest); + CBUtil.writeString(keyOrNamespace, dest); if (target != Target.KEYSPACE) - CBUtil.writeString(tableOrType, dest); + CBUtil.writeString(tableOrTypeOrFunction, dest); } else { @@ -263,14 +263,14 @@ public abstract class Event // For the v1/v2 protocol, we have no way to represent type changes, so we simply say the keyspace // was updated. See CASSANDRA-7617. CBUtil.writeEnumValue(Change.UPDATED, dest); - CBUtil.writeString(keyspace, dest); + CBUtil.writeString(keyOrNamespace, dest); CBUtil.writeString("", dest); } else { CBUtil.writeEnumValue(change, dest); - CBUtil.writeString(keyspace, dest); - CBUtil.writeString(target == Target.KEYSPACE ? "" : tableOrType, dest); + CBUtil.writeString(keyOrNamespace, dest); + CBUtil.writeString(target == Target.KEYSPACE ? 
"" : tableOrTypeOrFunction, dest); } } } @@ -281,10 +281,10 @@ public abstract class Event { int size = CBUtil.sizeOfEnumValue(change) + CBUtil.sizeOfEnumValue(target) - + CBUtil.sizeOfString(keyspace); + + CBUtil.sizeOfString(keyOrNamespace); if (target != Target.KEYSPACE) - size += CBUtil.sizeOfString(tableOrType); + size += CBUtil.sizeOfString(tableOrTypeOrFunction); return size; } @@ -293,25 +293,25 @@ public abstract class Event if (target == Target.TYPE) { return CBUtil.sizeOfEnumValue(Change.UPDATED) - + CBUtil.sizeOfString(keyspace) + + CBUtil.sizeOfString(keyOrNamespace) + CBUtil.sizeOfString(""); } return CBUtil.sizeOfEnumValue(change) - + CBUtil.sizeOfString(keyspace) - + CBUtil.sizeOfString(target == Target.KEYSPACE ? "" : tableOrType); + + CBUtil.sizeOfString(keyOrNamespace) + + CBUtil.sizeOfString(target == Target.KEYSPACE ? "" : tableOrTypeOrFunction); } } @Override public String toString() { - return change + " " + target + " " + keyspace + (tableOrType == null ? "" : "." + tableOrType); + return change + " " + target + " " + keyOrNamespace + (tableOrTypeOrFunction == null ? "" : "." + tableOrTypeOrFunction); } @Override public int hashCode() { - return Objects.hashCode(change, target, keyspace, tableOrType); + return Objects.hashCode(change, target, keyOrNamespace, tableOrTypeOrFunction); } @Override @@ -323,8 +323,8 @@ public abstract class Event SchemaChange scc = (SchemaChange)other; return Objects.equal(change, scc.change) && Objects.equal(target, scc.target) - && Objects.equal(keyspace, scc.keyspace) - && Objects.equal(tableOrType, scc.tableOrType); + && Objects.equal(keyOrNamespace, scc.keyOrNamespace) + && Objects.equal(tableOrTypeOrFunction, scc.tableOrTypeOrFunction); } } }