http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java index 5eb641e..291c84c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java @@ -22,6 +22,7 @@ import java.math.BigDecimal; import java.sql.SQLException; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -87,11 +88,11 @@ public class ParseNodeFactory { * * @since 0.1 */ - private static class BuiltInFunctionKey { + public static class BuiltInFunctionKey { private final String upperName; private final int argCount; - private BuiltInFunctionKey(String lowerName, int argCount) { + public BuiltInFunctionKey(String lowerName, int argCount) { this.upperName = lowerName; this.argCount = argCount; } @@ -180,9 +181,6 @@ public class ParseNodeFactory { public static BuiltInFunctionInfo get(String normalizedName, List<ParseNode> children) { initBuiltInFunctionMap(); BuiltInFunctionInfo info = BUILT_IN_FUNCTION_MAP.get(new BuiltInFunctionKey(normalizedName,children.size())); - if (info == null) { - throw new UnknownFunctionException(normalizedName); - } return info; } @@ -288,8 +286,8 @@ public class ParseNodeFactory { return new CreateTableStatement(tableName, props, columns, pkConstraint, splits, tableType, ifNotExists, baseTableName, tableTypeIdNode, bindCount); } - public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits, 
ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType,boolean async, int bindCount) { - return new CreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async, bindCount); + public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits, ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType,boolean async, int bindCount, Map<String, UDFParseNode> udfParseNodes) { + return new CreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async, bindCount, udfParseNodes); } public CreateSequenceStatement createSequence(TableName tableName, ParseNode startsWith, @@ -299,6 +297,14 @@ public class ParseNodeFactory { maxValue, cycle, ifNotExits, bindCount); } + public CreateFunctionStatement createFunction(PFunction functionInfo, boolean temporary) { + return new CreateFunctionStatement(functionInfo, temporary); + } + + public DropFunctionStatement dropFunction(String functionName, boolean ifExists) { + return new DropFunctionStatement(functionName, ifExists); + } + public DropSequenceStatement dropSequence(TableName tableName, boolean ifExits, int bindCount){ return new DropSequenceStatement(tableName, ifExits, bindCount); } @@ -388,6 +394,9 @@ public class ParseNodeFactory { public FunctionParseNode function(String name, List<ParseNode> args) { BuiltInFunctionInfo info = getInfo(name, args); + if (info == null) { + return new UDFParseNode(name, args, info); + } Constructor<? extends FunctionParseNode> ctor = info.getNodeCtor(); if (ctor == null) { return info.isAggregate() @@ -411,6 +420,9 @@ public class ParseNodeFactory { args.addAll(valueNodes); BuiltInFunctionInfo info = getInfo(name, args); + if(info==null) { + return new UDFParseNode(name,args,info); + } Constructor<? 
extends FunctionParseNode> ctor = info.getNodeCtor(); if (ctor == null) { return new AggregateFunctionWithinGroupParseNode(name, args, info); @@ -657,100 +669,100 @@ public class ParseNodeFactory { public SelectStatement select(TableNode from, HintNode hint, boolean isDistinct, List<AliasedNode> select, ParseNode where, List<ParseNode> groupBy, ParseNode having, List<OrderByNode> orderBy, LimitNode limit, int bindCount, boolean isAggregate, - boolean hasSequence, List<SelectStatement> selects) { + boolean hasSequence, List<SelectStatement> selects, Map<String, UDFParseNode> udfParseNodes) { return new SelectStatement(from, hint, isDistinct, select, where, groupBy == null ? Collections.<ParseNode>emptyList() : groupBy, having, - orderBy == null ? Collections.<OrderByNode>emptyList() : orderBy, limit, bindCount, isAggregate, hasSequence, selects == null ? Collections.<SelectStatement>emptyList() : selects); + orderBy == null ? Collections.<OrderByNode>emptyList() : orderBy, limit, bindCount, isAggregate, hasSequence, selects == null ? 
Collections.<SelectStatement>emptyList() : selects, udfParseNodes); } - public UpsertStatement upsert(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount) { - return new UpsertStatement(table, hint, columns, values, select, bindCount); + public UpsertStatement upsert(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes) { + return new UpsertStatement(table, hint, columns, values, select, bindCount, udfParseNodes); } - public DeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode node, List<OrderByNode> orderBy, LimitNode limit, int bindCount) { - return new DeleteStatement(table, hint, node, orderBy, limit, bindCount); + public DeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode node, List<OrderByNode> orderBy, LimitNode limit, int bindCount, Map<String, UDFParseNode> udfParseNodes) { + return new DeleteStatement(table, hint, node, orderBy, limit, bindCount, udfParseNodes); } public SelectStatement select(SelectStatement statement, ParseNode where) { return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(), where, statement.getGroupBy(), statement.getHaving(), - statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects()); + statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes()); } public SelectStatement select(SelectStatement statement, ParseNode where, ParseNode having) { return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(), where, statement.getGroupBy(), having, - statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), 
statement.isAggregate(), statement.hasSequence(), statement.getSelects()); + statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes()); } public SelectStatement select(SelectStatement statement, List<AliasedNode> select, ParseNode where, List<ParseNode> groupBy, ParseNode having, List<OrderByNode> orderBy) { return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), - select, where, groupBy, having, orderBy, statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects()); + select, where, groupBy, having, orderBy, statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes()); } public SelectStatement select(SelectStatement statement, TableNode table) { return select(table, statement.getHint(), statement.isDistinct(), statement.getSelect(), statement.getWhere(), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(), - statement.hasSequence(), statement.getSelects()); + statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes()); } public SelectStatement select(SelectStatement statement, TableNode table, ParseNode where) { return select(table, statement.getHint(), statement.isDistinct(), statement.getSelect(), where, statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(), - statement.hasSequence(), statement.getSelects()); + statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes()); } public SelectStatement select(SelectStatement statement, boolean isDistinct, List<AliasedNode> select) { return select(statement.getFrom(), statement.getHint(), isDistinct, select, 
statement.getWhere(), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(), - statement.hasSequence(), statement.getSelects()); + statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes()); } public SelectStatement select(SelectStatement statement, boolean isDistinct, List<AliasedNode> select, ParseNode where) { return select(statement.getFrom(), statement.getHint(), isDistinct, select, where, statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(), - statement.hasSequence(), statement.getSelects()); + statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes()); } public SelectStatement select(SelectStatement statement, boolean isDistinct, List<AliasedNode> select, ParseNode where, List<ParseNode> groupBy, boolean isAggregate) { return select(statement.getFrom(), statement.getHint(), isDistinct, select, where, groupBy, statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), isAggregate, - statement.hasSequence(), statement.getSelects()); + statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes()); } public SelectStatement select(SelectStatement statement, List<OrderByNode> orderBy) { return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(), statement.getWhere(), statement.getGroupBy(), statement.getHaving(), orderBy, statement.getLimit(), - statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects()); + statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes()); } public SelectStatement select(SelectStatement statement, HintNode hint) { return hint == null || hint.isEmpty() ? 
statement : select(statement.getFrom(), hint, statement.isDistinct(), statement.getSelect(), statement.getWhere(), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), statement.getLimit(), - statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects()); + statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes()); } public SelectStatement select(SelectStatement statement, HintNode hint, ParseNode where) { return select(statement.getFrom(), hint, statement.isDistinct(), statement.getSelect(), where, statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(), - statement.hasSequence(), statement.getSelects()); + statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes()); } public SelectStatement select(SelectStatement statement, List<OrderByNode> orderBy, LimitNode limit, int bindCount, boolean isAggregate) { return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(), statement.getWhere(), statement.getGroupBy(), statement.getHaving(), orderBy, limit, - bindCount, isAggregate || statement.isAggregate(), statement.hasSequence(), statement.getSelects()); + bindCount, isAggregate || statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes()); } public SelectStatement select(SelectStatement statement, LimitNode limit) { return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(), statement.getWhere(), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), limit, - statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects()); + statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes()); } 
public SelectStatement select(SelectStatement statement, List<OrderByNode> orderBy, LimitNode limit) { return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(), statement.getWhere(), statement.getGroupBy(), statement.getHaving(), orderBy, limit, - statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects()); + statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes()); } public SelectStatement select(List<SelectStatement> statements, List<OrderByNode> orderBy, LimitNode limit, int bindCount, boolean isAggregate) { @@ -762,8 +774,10 @@ public class ParseNodeFactory { // it will be done later at compile stage. Empty or different aliases // are ignored, since they cannot be referred by outer queries. List<String> aliases = Lists.<String> newArrayList(); + Map<String, UDFParseNode> udfParseNodes = new HashMap<String, UDFParseNode>(1); for (int i = 0; i < statements.size() && aliases.isEmpty(); i++) { SelectStatement subselect = statements.get(i); + udfParseNodes.putAll(subselect.getUdfParseNodes()); if (!subselect.hasWildcard()) { for (AliasedNode aliasedNode : subselect.getSelect()) { String alias = aliasedNode.getAlias(); @@ -786,7 +800,7 @@ public class ParseNodeFactory { } return select(null, HintNode.EMPTY_HINT_NODE, false, aliasedNodes, - null, null, null, orderBy, limit, bindCount, false, false, statements); + null, null, null, orderBy, limit, bindCount, false, false, statements, udfParseNodes); } public SubqueryParseNode subquery(SelectStatement select, boolean expectSingleRow) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java index 4ce893d..e48967b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java @@ -151,7 +151,7 @@ public class ParseNodeRewriter extends TraverseAllParseNodeVisitor<ParseNode> { return NODE_FACTORY.select(normFrom, statement.getHint(), statement.isDistinct(), normSelectNodes, normWhere, normGroupByNodes, normHaving, normOrderByNodes, statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), - statement.getSelects()); + statement.getSelects(), statement.getUdfParseNodes()); } private Map<String, ParseNode> getAliasMap() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java index 44b24af..362e98d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java @@ -20,7 +20,9 @@ package org.apache.phoenix.parse; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.expression.function.CountAggregateFunction; @@ -42,7 +44,7 @@ public class SelectStatement implements 
FilterableStatement { Collections.<AliasedNode>singletonList(new AliasedNode(null, LiteralParseNode.ONE)), null, Collections.<ParseNode>emptyList(), null, Collections.<OrderByNode>emptyList(), - null, 0, false, false, Collections.<SelectStatement>emptyList()); + null, 0, false, false, Collections.<SelectStatement>emptyList(), new HashMap<String, UDFParseNode>(1)); public static final SelectStatement COUNT_ONE = new SelectStatement( null, null, false, @@ -54,14 +56,14 @@ public class SelectStatement implements FilterableStatement { new BuiltInFunctionInfo(CountAggregateFunction.class, CountAggregateFunction.class.getAnnotation(BuiltInFunction.class))))), null, Collections.<ParseNode>emptyList(), null, Collections.<OrderByNode>emptyList(), - null, 0, true, false, Collections.<SelectStatement>emptyList()); + null, 0, true, false, Collections.<SelectStatement>emptyList(), new HashMap<String, UDFParseNode>(1)); public static SelectStatement create(SelectStatement select, HintNode hint) { if (select.getHint() == hint || hint.isEmpty()) { return select; } return new SelectStatement(select.getFrom(), hint, select.isDistinct(), select.getSelect(), select.getWhere(), select.getGroupBy(), select.getHaving(), - select.getOrderBy(), select.getLimit(), select.getBindCount(), select.isAggregate(), select.hasSequence(), select.getSelects()); + select.getOrderBy(), select.getLimit(), select.getBindCount(), select.isAggregate(), select.hasSequence(), select.getSelects(), select.getUdfParseNodes()); } public SelectStatement combine(ParseNode where) { @@ -73,13 +75,13 @@ public class SelectStatement implements FilterableStatement { } return new SelectStatement(this.getFrom(), this.getHint(), this.isDistinct(), this.getSelect(), where, this.getGroupBy(), this.getHaving(), - this.getOrderBy(), this.getLimit(), this.getBindCount(), this.isAggregate(), this.hasSequence(), this.selects); + this.getOrderBy(), this.getLimit(), this.getBindCount(), this.isAggregate(), this.hasSequence(), 
this.selects, this.udfParseNodes); } public static SelectStatement create(SelectStatement select, List<AliasedNode> selects) { return new SelectStatement(select.getFrom(), select.getHint(), select.isDistinct(), selects, select.getWhere(), select.getGroupBy(), select.getHaving(), - select.getOrderBy(), select.getLimit(), select.getBindCount(), select.isAggregate(), select.hasSequence(), select.getSelects()); + select.getOrderBy(), select.getLimit(), select.getBindCount(), select.isAggregate(), select.hasSequence(), select.getSelects(), select.getUdfParseNodes()); } // Copy constructor for sub select statements in a union @@ -87,7 +89,7 @@ public class SelectStatement implements FilterableStatement { List<OrderByNode> orderBy, LimitNode limit, boolean isAggregate) { return new SelectStatement(select.getFrom(), select.getHint(), select.isDistinct(), select.getSelect(), select.getWhere(), select.getGroupBy(), select.getHaving(), - orderBy, limit, select.getBindCount(), isAggregate, select.hasSequence(), select.getSelects()); + orderBy, limit, select.getBindCount(), isAggregate, select.hasSequence(), select.getSelects(), select.getUdfParseNodes()); } private final TableNode fromTable; @@ -104,6 +106,7 @@ public class SelectStatement implements FilterableStatement { private final boolean hasSequence; private final boolean hasWildcard; private final List<SelectStatement> selects = new ArrayList<SelectStatement>(); + private final Map<String, UDFParseNode> udfParseNodes; @Override public final String toString() { @@ -216,7 +219,7 @@ public class SelectStatement implements FilterableStatement { protected SelectStatement(TableNode from, HintNode hint, boolean isDistinct, List<AliasedNode> select, ParseNode where, List<ParseNode> groupBy, ParseNode having, List<OrderByNode> orderBy, LimitNode limit, - int bindCount, boolean isAggregate, boolean hasSequence, List<SelectStatement> selects) { + int bindCount, boolean isAggregate, boolean hasSequence, List<SelectStatement> 
selects, Map<String, UDFParseNode> udfParseNodes) { this.fromTable = from; this.hint = hint == null ? HintNode.EMPTY_HINT_NODE : hint; this.isDistinct = isDistinct; @@ -241,6 +244,7 @@ public class SelectStatement implements FilterableStatement { if (!selects.isEmpty()) { this.selects.addAll(selects); } + this.udfParseNodes = udfParseNodes; } @Override @@ -333,4 +337,8 @@ public class SelectStatement implements FilterableStatement { public boolean hasWildcard() { return hasWildcard; } + + public Map<String, UDFParseNode> getUdfParseNodes() { + return udfParseNodes; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/parse/UDFParseNode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/UDFParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/UDFParseNode.java new file mode 100644 index 0000000..c0b972f --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/UDFParseNode.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.parse; + +import java.util.List; + +public class UDFParseNode extends FunctionParseNode { + + public UDFParseNode(String name, List<ParseNode> children, BuiltInFunctionInfo info) { + super(name, children, info); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java index fc299d1..48698bd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpsertStatement.java @@ -19,15 +19,18 @@ package org.apache.phoenix.parse; import java.util.Collections; import java.util.List; +import java.util.Map; -public class UpsertStatement extends DMLStatement { +public class UpsertStatement extends DMLStatement { private final List<ColumnName> columns; private final List<ParseNode> values; private final SelectStatement select; private final HintNode hint; - public UpsertStatement(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount) { - super(table, bindCount); + public UpsertStatement(NamedTableNode table, HintNode hint, List<ColumnName> columns, + List<ParseNode> values, SelectStatement select, int bindCount, + Map<String, UDFParseNode> udfParseNodes) { + super(table, bindCount, udfParseNodes); this.columns = columns == null ? 
Collections.<ColumnName>emptyList() : columns; this.values = values; this.select = select; http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java index c12c64d..f4a60bc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/protobuf/ProtobufUtil.java @@ -71,6 +71,16 @@ public class ProtobufUtil { return getMutations(request.getTableMetadataMutationsList()); } + public static List<Mutation> getMutations(MetaDataProtos.DropFunctionRequest request) + throws IOException { + return getMutations(request.getTableMetadataMutationsList()); + } + + public static List<Mutation> getMutations(MetaDataProtos.CreateFunctionRequest request) + throws IOException { + return getMutations(request.getTableMetadataMutationsList()); + } + public static List<Mutation> getMutations(MetaDataProtos.DropTableRequest request) throws IOException { return getMutations(request.getTableMetadataMutationsList()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index 09705c6..dc51b10 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -35,6 +35,7 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import 
org.apache.phoenix.execute.MutationState; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableType; @@ -70,8 +71,10 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated public PhoenixConnection connect(String url, Properties info) throws SQLException; public MetaDataMutationResult getTable(PName tenantId, byte[] schemaName, byte[] tableName, long tableTimestamp, long clientTimetamp) throws SQLException; + public MetaDataMutationResult getFunctions(PName tenantId, List<Pair<byte[], Long>> functionNameAndTimeStampPairs, long clientTimestamp) throws SQLException; public MetaDataMutationResult createTable(List<Mutation> tableMetaData, byte[] tableName, PTableType tableType, Map<String,Object> tableProps, List<Pair<byte[],Map<String,Object>>> families, byte[][] splits) throws SQLException; public MetaDataMutationResult dropTable(List<Mutation> tableMetadata, PTableType tableType, boolean cascade) throws SQLException; + public MetaDataMutationResult dropFunction(List<Mutation> tableMetadata, boolean ifExists) throws SQLException; public MetaDataMutationResult addColumn(List<Mutation> tableMetaData, PTable table, Map<String, List<Pair<String,Object>>> properties, Set<String> colFamiliesForPColumnsToBeAdded) throws SQLException; public MetaDataMutationResult dropColumn(List<Mutation> tableMetadata, PTableType tableType) throws SQLException; public MetaDataMutationResult updateIndexState(List<Mutation> tableMetadata, String parentTableName) throws SQLException; @@ -93,6 +96,7 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated long currentSequenceValue(SequenceKey sequenceKey, long timestamp) throws SQLException; void returnSequences(List<SequenceKey> sequenceKeys, long timestamp, SQLException[] 
exceptions) throws SQLException; + MetaDataMutationResult createFunction(List<Mutation> functionData, PFunction function, boolean temporary) throws SQLException; void addConnection(PhoenixConnection connection) throws SQLException; void removeConnection(PhoenixConnection connection) throws SQLException; http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 49c946a..30b43d5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -87,9 +87,12 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheResponse; +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateFunctionRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateTableRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropColumnRequest; +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropFunctionRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropTableRequest; +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetFunctionsRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetTableRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest; import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse; @@ -110,9 +113,11 @@ import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; +import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.protobuf.ProtobufUtil; import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.EmptySequenceCacheException; +import org.apache.phoenix.schema.FunctionNotFoundException; import org.apache.phoenix.schema.MetaDataSplitPolicy; import org.apache.phoenix.schema.NewerTableAlreadyExistsException; import org.apache.phoenix.schema.PColumn; @@ -220,6 +225,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement QueryServicesOptions.DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE); return new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, maxSizeBytes); } + /** * Construct a ConnectionQueryServicesImpl that represents a connection to an HBase * cluster. @@ -562,6 +568,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (metadata == null) { throwConnectionClosedException(); } + return new PhoenixConnection(this, url, info, metadata); } @@ -691,12 +698,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // Setup split policy on Phoenix metadata table to ensure that the key values of a Phoenix table // stay on the same region. 
- if (SchemaUtil.isMetaTable(tableName)) { + if (SchemaUtil.isMetaTable(tableName) || SchemaUtil.isFunctionTable(tableName)) { if (!descriptor.hasCoprocessor(MetaDataEndpointImpl.class.getName())) { descriptor.addCoprocessor(MetaDataEndpointImpl.class.getName(), null, priority, null); } - if (!descriptor.hasCoprocessor(MetaDataRegionObserver.class.getName())) { - descriptor.addCoprocessor(MetaDataRegionObserver.class.getName(), null, priority + 1, null); + if(SchemaUtil.isMetaTable(tableName) ) { + if (!descriptor.hasCoprocessor(MetaDataRegionObserver.class.getName())) { + descriptor.addCoprocessor(MetaDataRegionObserver.class.getName(), null, priority + 1, null); + } } } else if (SchemaUtil.isSequenceTable(tableName)) { if (!descriptor.hasCoprocessor(SequenceRegionObserver.class.getName())) { @@ -992,23 +1001,33 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement */ private MetaDataMutationResult metaDataCoprocessorExec(byte[] tableKey, Batch.Call<MetaDataService, MetaDataResponse> callable) throws SQLException { + return metaDataCoprocessorExec(tableKey, callable, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + } + /** + * Invoke meta data coprocessor with one retry if the key was found to not be in the regions + * (due to a table split) + */ + private MetaDataMutationResult metaDataCoprocessorExec(byte[] tableKey, + Batch.Call<MetaDataService, MetaDataResponse> callable, byte[] tableName) throws SQLException { + try { boolean retried = false; while (true) { if (retried) { connection.relocateRegion( - TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES), + TableName.valueOf(tableName), tableKey); } - HTableInterface ht = this.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + HTableInterface ht = this.getTable(tableName); try { final Map<byte[], MetaDataResponse> results = ht.coprocessorService(MetaDataService.class, tableKey, tableKey, callable); assert(results.size() == 1); MetaDataResponse 
result = results.values().iterator().next(); - if (result.getReturnCode() == MetaDataProtos.MutationCode.TABLE_NOT_IN_REGION) { + if (result.getReturnCode() == MetaDataProtos.MutationCode.TABLE_NOT_IN_REGION + || result.getReturnCode() == MetaDataProtos.MutationCode.FUNCTION_NOT_IN_REGION) { if (retried) return MetaDataMutationResult.constructFromProto(result); retried = true; continue; @@ -1331,6 +1350,37 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement return result; } + @Override + public MetaDataMutationResult dropFunction(final List<Mutation> functionData, final boolean ifExists) throws SQLException { + byte[][] rowKeyMetadata = new byte[2][]; + byte[] key = functionData.get(0).getRow(); + SchemaUtil.getVarChars(key, rowKeyMetadata); + byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; + byte[] functionBytes = rowKeyMetadata[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX]; + byte[] functionKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionBytes); + + final MetaDataMutationResult result = metaDataCoprocessorExec(functionKey, + new Batch.Call<MetaDataService, MetaDataResponse>() { + @Override + public MetaDataResponse call(MetaDataService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<MetaDataResponse> rpcCallback = + new BlockingRpcCallback<MetaDataResponse>(); + DropFunctionRequest.Builder builder = DropFunctionRequest.newBuilder(); + for (Mutation m : functionData) { + MutationProto mp = ProtobufUtil.toProto(m); + builder.addTableMetadataMutations(mp.toByteString()); + } + builder.setIfExists(ifExists); + instance.dropFunction(controller, builder.build(), rpcCallback); + if(controller.getFailedOn() != null) { + throw controller.getFailedOn(); + } + return rpcCallback.get(); + } + }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES); + return result; + } private void invalidateTables(final List<byte[]> tableNamesToDelete) { if 
(tableNamesToDelete != null) { for ( byte[] tableName : tableNamesToDelete ) { @@ -1944,6 +1994,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT + " " + PLong.INSTANCE.getSqlTypeName()); } + try { + metaConnection.createStatement().executeUpdate( + QueryConstants.CREATE_FUNCTION_METADATA); + } catch (NewerTableAlreadyExistsException e) { + } catch (TableAlreadyExistsException e) { + } + } catch (Exception e) { if (e instanceof SQLException) { initializationException = (SQLException)e; @@ -2540,4 +2597,96 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public int getSequenceSaltBuckets() { return nSequenceSaltBuckets; } + + @Override + public PMetaData addFunction(PFunction function) throws SQLException { + synchronized (latestMetaDataLock) { + try { + throwConnectionClosedIfNullMetaData(); + // If existing function isn't older than new function, don't replace + // If a client opens a connection at an earlier timestamp, this can happen + PFunction existingFunction = latestMetaData.getFunction(new PTableKey(function.getTenantId(), function.getFunctionName())); + if (existingFunction.getTimeStamp() >= function.getTimeStamp()) { + return latestMetaData; + } + } catch (FunctionNotFoundException e) {} + latestMetaData = latestMetaData.addFunction(function); + latestMetaDataLock.notifyAll(); + return latestMetaData; + } + } + + @Override + public PMetaData removeFunction(PName tenantId, String function, long functionTimeStamp) + throws SQLException { + synchronized (latestMetaDataLock) { + throwConnectionClosedIfNullMetaData(); + latestMetaData = latestMetaData.removeFunction(tenantId, function, functionTimeStamp); + latestMetaDataLock.notifyAll(); + return latestMetaData; + } + } + + @Override + public MetaDataMutationResult getFunctions(PName tenantId, final List<Pair<byte[], Long>> functions, + final long clientTimestamp) throws SQLException { + final 
byte[] tenantIdBytes = tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(); + return metaDataCoprocessorExec(tenantIdBytes, + new Batch.Call<MetaDataService, MetaDataResponse>() { + @Override + public MetaDataResponse call(MetaDataService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<MetaDataResponse> rpcCallback = + new BlockingRpcCallback<MetaDataResponse>(); + GetFunctionsRequest.Builder builder = GetFunctionsRequest.newBuilder(); + builder.setTenantId(HBaseZeroCopyByteString.wrap(tenantIdBytes)); + for(Pair<byte[], Long> function: functions) { + builder.addFunctionNames(HBaseZeroCopyByteString.wrap(function.getFirst())); + builder.addFunctionTimestamps(function.getSecond().longValue()); + } + builder.setClientTimestamp(clientTimestamp); + + instance.getFunctions(controller, builder.build(), rpcCallback); + if(controller.getFailedOn() != null) { + throw controller.getFailedOn(); + } + return rpcCallback.get(); + } + }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES); + + } + + // TODO the mutations should be added to System functions table. 
+ @Override + public MetaDataMutationResult createFunction(final List<Mutation> functionData, + final PFunction function, final boolean temporary) throws SQLException { + byte[][] rowKeyMetadata = new byte[2][]; + Mutation m = MetaDataUtil.getPutOnlyTableHeaderRow(functionData); + byte[] key = m.getRow(); + SchemaUtil.getVarChars(key, rowKeyMetadata); + byte[] tenantIdBytes = rowKeyMetadata[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; + byte[] functionBytes = rowKeyMetadata[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX]; + byte[] functionKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionBytes); + MetaDataMutationResult result = metaDataCoprocessorExec(functionKey, + new Batch.Call<MetaDataService, MetaDataResponse>() { + @Override + public MetaDataResponse call(MetaDataService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<MetaDataResponse> rpcCallback = + new BlockingRpcCallback<MetaDataResponse>(); + CreateFunctionRequest.Builder builder = CreateFunctionRequest.newBuilder(); + for (Mutation m : functionData) { + MutationProto mp = ProtobufUtil.toProto(m); + builder.addTableMetadataMutations(mp.toByteString()); + } + builder.setTemporary(temporary); + instance.createFunction(controller, builder.build(), rpcCallback); + if(controller.getFailedOn() != null) { + throw controller.getFailedOn(); + } + return rpcCallback.get(); + } + }, PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES); + return result; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 742c38e..4d582be 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -20,6 +20,7 @@ package org.apache.phoenix.query; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE_BYTES; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -51,6 +52,8 @@ import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; +import org.apache.phoenix.parse.PFunction; +import org.apache.phoenix.schema.FunctionNotFoundException; import org.apache.phoenix.schema.NewerTableAlreadyExistsException; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PIndexState; @@ -104,6 +107,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple super(queryServices); userName = connInfo.getPrincipal(); metaData = newEmptyMetaData(); + // Use KeyValueBuilder that builds real KeyValues, as our test utils require this this.kvBuilder = GenericKeyValueBuilder.INSTANCE; } @@ -280,6 +284,11 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple // A TableAlreadyExistsException is not thrown, since the table only exists *after* this // fixed timestamp. 
} + + try { + metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_FUNCTION_METADATA); + } catch (NewerTableAlreadyExistsException ignore) { + } } catch (SQLException e) { sqlE = e; } finally { @@ -479,5 +488,46 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple return getProps().getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB, QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS); } - + + @Override + public MetaDataMutationResult createFunction(List<Mutation> functionData, PFunction function, boolean temporary) + throws SQLException { + return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, 0l, null); + } + + @Override + public PMetaData addFunction(PFunction function) throws SQLException { + return metaData = this.metaData.addFunction(function); + } + + @Override + public PMetaData removeFunction(PName tenantId, String function, long functionTimeStamp) + throws SQLException { + return metaData = this.metaData.removeFunction(tenantId, function, functionTimeStamp); + } + + @Override + public MetaDataMutationResult getFunctions(PName tenantId, + List<Pair<byte[], Long>> functionNameAndTimeStampPairs, long clientTimestamp) + throws SQLException { + List<PFunction> functions = new ArrayList<PFunction>(functionNameAndTimeStampPairs.size()); + for(Pair<byte[], Long> functionInfo: functionNameAndTimeStampPairs) { + try { + PFunction function2 = metaData.getFunction(new PTableKey(tenantId, Bytes.toString(functionInfo.getFirst()))); + functions.add(function2); + } catch (FunctionNotFoundException e) { + return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, 0, null); + } + } + if(functions.isEmpty()) { + return null; + } + return new MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS, 0, functions, true); + } + + @Override + public MetaDataMutationResult dropFunction(List<Mutation> tableMetadata, boolean ifExists) + throws SQLException { + return new 
MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS, 0, null); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index e2c9544..2a98cd5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -35,6 +35,7 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PMetaData; import org.apache.phoenix.schema.PName; @@ -250,4 +251,34 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple public int getSequenceSaltBuckets() { return getDelegate().getSequenceSaltBuckets(); } + + @Override + public MetaDataMutationResult createFunction(List<Mutation> functionData, PFunction function, boolean temporary) + throws SQLException { + return getDelegate().createFunction(functionData, function, temporary); + } + + @Override + public PMetaData addFunction(PFunction function) throws SQLException { + return getDelegate().addFunction(function); + } + + @Override + public PMetaData removeFunction(PName tenantId, String function, long functionTimeStamp) + throws SQLException { + return getDelegate().removeFunction(tenantId, function, functionTimeStamp); + } + + @Override + public MetaDataMutationResult getFunctions(PName tenantId, + List<Pair<byte[], 
Long>> functionNameAndTimeStampPairs, long clientTimestamp) + throws SQLException { + return getDelegate().getFunctions(tenantId, functionNameAndTimeStampPairs, clientTimestamp); + } + + @Override + public MetaDataMutationResult dropFunction(List<Mutation> tableMetadata, boolean ifExists) + throws SQLException { + return getDelegate().dropFunction(tableMetadata, ifExists); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java b/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java index ae37ac6..76e7593 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java @@ -20,6 +20,7 @@ package org.apache.phoenix.query; import java.sql.SQLException; import java.util.List; +import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PMetaData; import org.apache.phoenix.schema.PName; @@ -38,4 +39,6 @@ public interface MetaDataMutated { PMetaData removeTable(PName tenantId, String tableName, String parentTableName, long tableTimeStamp) throws SQLException; PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) throws SQLException; PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum) throws SQLException; + PMetaData addFunction(PFunction function) throws SQLException; + PMetaData removeFunction(PName tenantId, String function, long functionTimeStamp) throws SQLException; } 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index 6470b72..73d1123 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -86,6 +86,17 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_SEQUENCE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT; @@ -276,4 +287,29 @@ public interface QueryConstants { " CONSTRAINT 
" + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + "," + SEQUENCE_SCHEMA + "," + SEQUENCE_NAME + "))\n" + HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" + HColumnDescriptor.KEEP_DELETED_CELLS + "=" + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + "\n"; + + public static final String CREATE_FUNCTION_METADATA = + "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_FUNCTION_TABLE + "\"(\n" + + // Pk columns + TENANT_ID + " VARCHAR NULL," + + FUNCTION_NAME + " VARCHAR NOT NULL, \n" + + NUM_ARGS + " INTEGER, \n" + + // Function metadata (will be null for argument row) + CLASS_NAME + " VARCHAR, \n" + + JAR_PATH + " VARCHAR, \n" + + RETURN_TYPE + " VARCHAR, \n" + + // Argument metadata (will be null for function row) + TYPE + " VARCHAR, \n" + + ARG_POSITION + " VARBINARY, \n" + + IS_ARRAY + " BOOLEAN, \n" + + IS_CONSTANT + " BOOLEAN, \n" + + DEFAULT_VALUE + " VARCHAR, \n" + + MIN_VALUE + " VARCHAR, \n" + + MAX_VALUE + " VARCHAR, \n" + + " CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + ", " + FUNCTION_NAME + ", " + TYPE + ", " + ARG_POSITION + "))\n" + + HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" + + HColumnDescriptor.KEEP_DELETED_CELLS + "=" + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n"+ + // Install split policy to prevent a tenant's metadata from being split across regions. 
+ HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n"; + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 4b793d1..9183a70 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -103,6 +103,7 @@ public interface QueryServices extends SQLCloseable { public static final String REGIONSERVER_INFO_PORT_ATTRIB = "hbase.regionserver.info.port"; public static final String REGIONSERVER_LEASE_PERIOD_ATTRIB = "hbase.regionserver.lease.period"; public static final String RPC_TIMEOUT_ATTRIB = "hbase.rpc.timeout"; + public static final String DYNAMIC_JARS_DIR_KEY = "hbase.dynamic.jars.dir"; public static final String ZOOKEEPER_QUARUM_ATTRIB = "hbase.zookeeper.quorum"; public static final String ZOOKEEPER_PORT_ATTRIB = "hbase.zookeeper.property.clientPort"; public static final String ZOOKEEPER_ROOT_NODE_ATTRIB = "zookeeper.znode.parent"; @@ -165,6 +166,7 @@ public interface QueryServices extends SQLCloseable { public static final String METADATA_HANDLER_COUNT_ATTRIB = "phoenix.rpc.metadata.handler.count"; public static final String FORCE_ROW_KEY_ORDER_ATTRIB = "phoenix.query.force.rowkeyorder"; + public static final String ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB = "phoenix.functions.allowUserDefinedFunctions"; /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index b98c9ee..972bf26 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -18,6 +18,7 @@ package org.apache.phoenix.query; import static org.apache.phoenix.query.QueryServices.ALLOW_ONLINE_TABLE_SCHEMA_UPDATE; +import static org.apache.phoenix.query.QueryServices.ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB; import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME; import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB; import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB; @@ -195,6 +196,7 @@ public class QueryServicesOptions { public static final boolean DEFAULT_USE_BYTE_BASED_REGEX = false; public static final boolean DEFAULT_FORCE_ROW_KEY_ORDER = false; + public static final boolean DEFAULT_ALLOW_USER_DEFINED_FUNCTIONS = false; private final Configuration config; @@ -250,7 +252,7 @@ public class QueryServicesOptions { .setIfUnset(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED) .setIfUnset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY) .setIfUnset(USE_BYTE_BASED_REGEX_ATTRIB, DEFAULT_USE_BYTE_BASED_REGEX) - .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER) + .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER); ; // HBase sets this to 1, so we reset it to something more appropriate. 
// Hopefully HBase will change this, because we can't know if a user set http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionAlreadyExistsException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionAlreadyExistsException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionAlreadyExistsException.java new file mode 100644 index 0000000..91b9d07 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionAlreadyExistsException.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.schema; + +import java.sql.SQLException; + +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.parse.PFunction; + +public class FunctionAlreadyExistsException extends SQLException { + private static final long serialVersionUID = 1L; + private static SQLExceptionCode code = SQLExceptionCode.FUNCTION_ALREADY_EXIST; + private final PFunction function; + private final String functionName; + + public FunctionAlreadyExistsException(String functionName) { + this(functionName, null, null); + } + + public FunctionAlreadyExistsException(String functionName, String msg) { + this(functionName, msg, null); + } + + public FunctionAlreadyExistsException(String functionName, PFunction function) { + this(functionName, null, function); + } + + public FunctionAlreadyExistsException(String functionName, String msg, PFunction function) { + super(new SQLExceptionInfo.Builder(code).setFunctionName(functionName).setMessage(msg).build().toString(), + code.getSQLState(), code.getErrorCode()); + this.functionName = functionName; + this.function = function; + } + + public String getFunctionName() { + return functionName; + } + + public PFunction getFunction() { + return function; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionNotFoundException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionNotFoundException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionNotFoundException.java new file mode 100644 index 0000000..73e23be --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionNotFoundException.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; + +public class FunctionNotFoundException extends MetaDataEntityNotFoundException { + private static final long serialVersionUID = 1L; + private static SQLExceptionCode code = SQLExceptionCode.FUNCTION_UNDEFINED; + private final String functionName; + private final long timestamp; + + public FunctionNotFoundException(FunctionNotFoundException e, long timestamp) { + this(e.functionName, timestamp); + } + + public FunctionNotFoundException(String functionName) { + this(functionName, HConstants.LATEST_TIMESTAMP); + } + + public FunctionNotFoundException(String functionName, long timestamp) { + super(new SQLExceptionInfo.Builder(code).setFunctionName(functionName).build().toString(), + code.getSQLState(), code.getErrorCode(), null); + this.functionName = functionName; + this.timestamp = timestamp; + } + + public String getFunctionName() { + return functionName; + } + + public long getTimeStamp() { + return timestamp; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 22208f1..fcdb651 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -32,15 +32,20 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE; import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION; @@ -62,10 +67,20 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE; import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA; import java.io.IOException; +import java.lang.reflect.Constructor; +import java.sql.Array; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ParameterMetaData; @@ -75,6 +90,7 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.sql.Types; +import java.util.ArrayList; import java.util.BitSet; import java.util.Collection; import java.util.Collections; @@ -99,6 +115,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.DynamicClassLoader; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.ColumnResolver; import 
org.apache.phoenix.compile.ExplainPlan; @@ -120,6 +137,8 @@ import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.expression.Determinism; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.RowKeyColumnExpression; +import org.apache.phoenix.expression.function.FunctionExpression; +import org.apache.phoenix.expression.function.ScalarFunction; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; @@ -130,21 +149,27 @@ import org.apache.phoenix.parse.AddColumnStatement; import org.apache.phoenix.parse.AlterIndexStatement; import org.apache.phoenix.parse.ColumnDef; import org.apache.phoenix.parse.ColumnName; +import org.apache.phoenix.parse.CreateFunctionStatement; import org.apache.phoenix.parse.CreateIndexStatement; import org.apache.phoenix.parse.CreateSequenceStatement; import org.apache.phoenix.parse.CreateTableStatement; import org.apache.phoenix.parse.DropColumnStatement; +import org.apache.phoenix.parse.DropFunctionStatement; import org.apache.phoenix.parse.DropIndexStatement; import org.apache.phoenix.parse.DropSequenceStatement; import org.apache.phoenix.parse.DropTableStatement; +import org.apache.phoenix.parse.FunctionParseNode; import org.apache.phoenix.parse.IndexKeyConstraint; import org.apache.phoenix.parse.NamedTableNode; +import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.parse.PrimaryKeyConstraint; import org.apache.phoenix.parse.TableName; import org.apache.phoenix.parse.UpdateStatisticsStatement; +import org.apache.phoenix.parse.PFunction.FunctionArgument; import org.apache.phoenix.query.ConnectionQueryServices.Feature; +import org.apache.phoenix.query.HBaseFactoryProvider; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; 
import org.apache.phoenix.query.QueryServicesOptions; @@ -280,6 +305,28 @@ public class MetaDataClient { COLUMN_FAMILY + "," + ORDINAL_POSITION + ") VALUES (?, ?, ?, ?, ?, ?)"; + private static final String CREATE_FUNCTION = + "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_FUNCTION_TABLE + "\" ( " + + TENANT_ID +","+ + FUNCTION_NAME + "," + + NUM_ARGS + "," + + CLASS_NAME + "," + + JAR_PATH + "," + + RETURN_TYPE + + ") VALUES (?, ?, ?, ?, ?, ?)"; + private static final String INSERT_FUNCTION_ARGUMENT = + "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_FUNCTION_TABLE + "\" ( " + + TENANT_ID +","+ + FUNCTION_NAME + "," + + TYPE + "," + + ARG_POSITION +","+ + IS_ARRAY + "," + + IS_CONSTANT + "," + + DEFAULT_VALUE + "," + + MIN_VALUE + "," + + MAX_VALUE + + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + private final PhoenixConnection connection; @@ -315,6 +362,24 @@ public class MetaDataClient { return updateCache(tenantId, schemaName, tableName, false); } + /** + * Update the cache with the latest as of the connection scn. + * @param functionNames + * @return the timestamp from the server, negative if the function was added to the cache and positive otherwise + * @throws SQLException + */ + public MetaDataMutationResult updateCache(List<String> functionNames) throws SQLException { + return updateCache(functionNames, false); + } + + private MetaDataMutationResult updateCache(List<String> functionNames, boolean alwaysHitServer) throws SQLException { + return updateCache(connection.getTenantId(), functionNames, alwaysHitServer); + } + + public MetaDataMutationResult updateCache(PName tenantId, List<String> functionNames) throws SQLException { + return updateCache(tenantId, functionNames, false); + } + private long getClientTimeStamp() { Long scn = connection.getSCN(); long clientTimeStamp = scn == null ? 
HConstants.LATEST_TIMESTAMP : scn; @@ -394,6 +459,77 @@ public class MetaDataClient { return result; } + + private MetaDataMutationResult updateCache(PName tenantId, List<String> functionNames, + boolean alwaysHitServer) throws SQLException { // TODO: pass byte[] here + long clientTimeStamp = getClientTimeStamp(); + List<PFunction> functions = new ArrayList<PFunction>(functionNames.size()); + List<Long> functionTimeStamps = new ArrayList<Long>(functionNames.size()); + Iterator<String> iterator = functionNames.iterator(); + while (iterator.hasNext()) { + PFunction function = null; + try { + String functionName = iterator.next(); + function = + connection.getMetaDataCache().getFunction( + new PTableKey(tenantId, functionName)); + if (function != null && !alwaysHitServer + && function.getTimeStamp() == clientTimeStamp - 1) { + functions.add(function); + iterator.remove(); + continue; + } + if (function != null && function.getTimeStamp() != clientTimeStamp - 1) { + functionTimeStamps.add(function.getTimeStamp()); + } else { + functionTimeStamps.add(HConstants.LATEST_TIMESTAMP); + } + } catch (FunctionNotFoundException e) { + functionTimeStamps.add(HConstants.LATEST_TIMESTAMP); + } + } + // Don't bother with server call: we can't possibly find a newer function + if (functionNames.isEmpty()) { + return new MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS,QueryConstants.UNSET_TIMESTAMP,functions, true); + } + + int maxTryCount = tenantId == null ? 
1 : 2; + int tryCount = 0; + MetaDataMutationResult result; + + do { + List<Pair<byte[], Long>> functionsToFecth = new ArrayList<Pair<byte[], Long>>(functionNames.size()); + for(int i = 0; i< functionNames.size(); i++) { + functionsToFecth.add(new Pair<byte[], Long>(PVarchar.INSTANCE.toBytes(functionNames.get(i)), functionTimeStamps.get(i))); + } + result = connection.getQueryServices().getFunctions(tenantId, functionsToFecth, clientTimeStamp); + + MutationCode code = result.getMutationCode(); + // We found updated functions, so update our cache + if (result.getFunctions() != null && !result.getFunctions().isEmpty()) { + result.getFunctions().addAll(functions); + addFunctionToCache(result); + return result; + } else { + if (code == MutationCode.FUNCTION_ALREADY_EXISTS) { + result.getFunctions().addAll(functions); + addFunctionToCache(result); + return result; + } + if (code == MutationCode.FUNCTION_NOT_FOUND && tryCount + 1 == maxTryCount) { + for (Pair<byte[], Long> f : functionsToFecth) { + connection.removeFunction(tenantId, Bytes.toString(f.getFirst()), + f.getSecond()); + } + // TODO removeFunctions all together from cache when + throw new FunctionNotFoundException(functionNames.toString() + " not found"); + } + } + tenantId = null; // Try again with global tenantId + } while (++tryCount < maxTryCount); + + return result; + } /** * Fault in the physical table to the cache and add any indexes it has to the indexes @@ -540,6 +676,20 @@ public class MetaDataClient { colUpsert.execute(); } + private void addFunctionArgMutation(String functionName, FunctionArgument arg, PreparedStatement argUpsert, int position) throws SQLException { + argUpsert.setString(1, connection.getTenantId() == null ? 
null : connection.getTenantId().getString()); + argUpsert.setString(2, functionName); + argUpsert.setString(3, arg.getArgumentType()); + byte[] bytes = Bytes.toBytes((short)position); + argUpsert.setBytes(4, bytes); + argUpsert.setBoolean(5, arg.isArrayType()); + argUpsert.setBoolean(6, arg.isConstant()); + argUpsert.setString(7, arg.getDefaultValue() == null? null: (String)arg.getDefaultValue().getValue()); + argUpsert.setString(8, arg.getMinValue() == null? null: (String)arg.getMinValue().getValue()); + argUpsert.setString(9, arg.getMaxValue() == null? null: (String)arg.getMaxValue().getValue()); + argUpsert.execute(); + } + private PColumn newColumn(int position, ColumnDef def, PrimaryKeyConstraint pkConstraint, String defaultColumnFamily, boolean addingToPK) throws SQLException { try { ColumnName columnDefName = def.getColumnDefName(); @@ -954,7 +1104,7 @@ public class MetaDataClient { } while (true) { try { - ColumnResolver resolver = FromCompiler.getResolver(statement, connection); + ColumnResolver resolver = FromCompiler.getResolver(statement, connection, statement.getUdfParseNodes()); tableRef = resolver.getTables().get(0); PTable dataTable = tableRef.getTable(); boolean isTenantConnection = connection.getTenantId() != null; @@ -1198,6 +1348,57 @@ public class MetaDataClient { return new MutationState(1, connection); } + public MutationState createFunction(CreateFunctionStatement stmt) throws SQLException { + boolean wasAutoCommit = connection.getAutoCommit(); + connection.rollback(); + try { + PFunction function = new PFunction(stmt.getFunctionInfo(), stmt.isTemporary()); + connection.setAutoCommit(false); + String tenantIdStr = connection.getTenantId() == null ? 
null : connection.getTenantId().getString(); + List<Mutation> functionData = Lists.newArrayListWithExpectedSize(function.getFunctionArguments().size() + 1); + + List<FunctionArgument> args = function.getFunctionArguments(); + PreparedStatement argUpsert = connection.prepareStatement(INSERT_FUNCTION_ARGUMENT); + + for (int i = 0; i < args.size(); i++) { + FunctionArgument arg = args.get(i); + addFunctionArgMutation(function.getFunctionName(), arg, argUpsert, i); + } + functionData.addAll(connection.getMutationState().toMutations().next().getSecond()); + connection.rollback(); + + PreparedStatement functionUpsert = connection.prepareStatement(CREATE_FUNCTION); + functionUpsert.setString(1, tenantIdStr); + functionUpsert.setString(2, function.getFunctionName()); + functionUpsert.setInt(3, function.getFunctionArguments().size()); + functionUpsert.setString(4, function.getClassName()); + functionUpsert.setString(5, function.getJarPath()); + functionUpsert.setString(6, function.getReturnType()); + functionUpsert.execute(); + functionData.addAll(connection.getMutationState().toMutations().next().getSecond()); + connection.rollback(); + MetaDataMutationResult result = connection.getQueryServices().createFunction(functionData, function, stmt.isTemporary()); + MutationCode code = result.getMutationCode(); + switch(code) { + case FUNCTION_ALREADY_EXISTS: + throw new FunctionAlreadyExistsException(function.getFunctionName(), result + .getFunctions().get(0)); + case NEWER_FUNCTION_FOUND: + // Add function to ConnectionQueryServices so it's cached, but don't add + // it to this connection as we can't see it. 
+ throw new NewerFunctionAlreadyExistsException(function.getFunctionName(), result.getFunctions().get(0)); + default: + List<PFunction> functions = new ArrayList<PFunction>(1); + functions.add(function); + result = new MetaDataMutationResult(code, result.getMutationTime(), functions, true); + addFunctionToCache(result); + } + } finally { + connection.setAutoCommit(wasAutoCommit); + } + return new MutationState(1, connection); + } + private static ColumnDef findColumnDefOrNull(List<ColumnDef> colDefs, ColumnName colName) { for (ColumnDef colDef : colDefs) { if (colDef.getColumnDefName().getColumnName().equals(colName.getColumnName())) { @@ -1807,6 +2008,10 @@ public class MetaDataClient { return dropTable(schemaName, tableName, null, statement.getTableType(), statement.ifExists(), statement.cascade()); } + public MutationState dropFunction(DropFunctionStatement statement) throws SQLException { + return dropFunction(statement.getFunctionName(), statement.ifExists()); + } + public MutationState dropIndex(DropIndexStatement statement) throws SQLException { String schemaName = statement.getTableName().getSchemaName(); String tableName = statement.getIndexName().getName(); @@ -1814,6 +2019,46 @@ public class MetaDataClient { return dropTable(schemaName, tableName, parentTableName, PTableType.INDEX, statement.ifExists(), false); } + private MutationState dropFunction(String functionName, + boolean ifExists) throws SQLException { + connection.rollback(); + boolean wasAutoCommit = connection.getAutoCommit(); + try { + PName tenantId = connection.getTenantId(); + byte[] key = + SchemaUtil.getFunctionKey(tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY + : tenantId.getBytes(), Bytes.toBytes(functionName)); + Long scn = connection.getSCN(); + long clientTimeStamp = scn == null ? 
HConstants.LATEST_TIMESTAMP : scn; + try { + PFunction function = connection.getMetaDataCache().getFunction(new PTableKey(tenantId, functionName)); + if (function.isTemporaryFunction()) { + connection.removeFunction(tenantId, functionName, clientTimeStamp); + return new MutationState(0, connection); + } + } catch(FunctionNotFoundException e) { + + } + List<Mutation> functionMetaData = Lists.newArrayListWithExpectedSize(2); + Delete functionDelete = new Delete(key, clientTimeStamp); + functionMetaData.add(functionDelete); + MetaDataMutationResult result = connection.getQueryServices().dropFunction(functionMetaData, ifExists); + MutationCode code = result.getMutationCode(); + switch (code) { + case FUNCTION_NOT_FOUND: + if (!ifExists) { + throw new FunctionNotFoundException(functionName); + } + break; + default: + connection.removeFunction(tenantId, functionName, result.getMutationTime()); + break; + } + return new MutationState(0, connection); + } finally { + connection.setAutoCommit(wasAutoCommit); + } + } private MutationState dropTable(String schemaName, String tableName, String parentTableName, PTableType tableType, boolean ifExists, boolean cascade) throws SQLException { connection.rollback(); @@ -2658,7 +2903,14 @@ public class MetaDataClient { connection.addTable(table); return table; } - + + private List<PFunction> addFunctionToCache(MetaDataMutationResult result) throws SQLException { + for(PFunction function: result.getFunctions()) { + connection.addFunction(function); + } + return result.getFunctions(); + } + private void throwIfAlteringViewPK(ColumnDef col, PTable table) throws SQLException { if (col != null && col.isPK() && table.getType() == PTableType.VIEW) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_MODIFY_VIEW_PK)
