http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa26c1cd/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index af6c712..78f54e8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -63,6 +63,7 @@ import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.parse.TableNode;
 import org.apache.phoenix.parse.TableNodeVisitor;
 import org.apache.phoenix.parse.TableWildcardParseNode;
+import org.apache.phoenix.parse.UDFParseNode;
 import org.apache.phoenix.parse.WildcardParseNode;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.ColumnRef;
@@ -688,7 +689,7 @@ public class JoinCompiler {
             if (isSubselect())
                 return 
SubselectRewriter.applyOrderBy(SubselectRewriter.applyPostFilters(subselect, 
preFilters, tableNode.getAlias()), orderBy, tableNode.getAlias());
 
-            return NODE_FACTORY.select(tableNode, select.getHint(), false, 
selectNodes, getPreFiltersCombined(), null, null, orderBy, null, 0, false, 
select.hasSequence(), Collections.<SelectStatement>emptyList());
+            return NODE_FACTORY.select(tableNode, select.getHint(), false, 
selectNodes, getPreFiltersCombined(), null, null, orderBy, null, 0, false, 
select.hasSequence(), Collections.<SelectStatement>emptyList(), 
select.getUdfParseNodes());
         }
 
         public boolean hasFilters() {
@@ -1177,7 +1178,7 @@ public class JoinCompiler {
             TableRef tableRef = table.getTableRef();
             List<ParseNode> groupBy = tableRef.equals(groupByTableRef) ? 
select.getGroupBy() : null;
             List<OrderByNode> orderBy = tableRef.equals(orderByTableRef) ? 
select.getOrderBy() : null;
-            SelectStatement stmt = 
getSubqueryForOptimizedPlan(select.getHint(), table.getDynamicColumns(), 
tableRef, join.getColumnRefs(), table.getPreFiltersCombined(), groupBy, 
orderBy, table.isWildCardSelect(), select.hasSequence());
+            SelectStatement stmt = 
getSubqueryForOptimizedPlan(select.getHint(), table.getDynamicColumns(), 
tableRef, join.getColumnRefs(), table.getPreFiltersCombined(), groupBy, 
orderBy, table.isWildCardSelect(), select.hasSequence(), 
select.getUdfParseNodes());
             QueryPlan plan = 
statement.getConnection().getQueryServices().getOptimizer().optimize(statement, 
stmt);
             if (!plan.getTableRef().equals(tableRef)) {
                 replacement.put(tableRef, plan.getTableRef());
@@ -1247,7 +1248,7 @@ public class JoinCompiler {
     }
 
     private static SelectStatement getSubqueryForOptimizedPlan(HintNode 
hintNode, List<ColumnDef> dynamicCols, TableRef tableRef, Map<ColumnRef, 
ColumnRefType> columnRefs, ParseNode where, List<ParseNode> groupBy,
-            List<OrderByNode> orderBy, boolean isWildCardSelect, boolean 
hasSequence) {
+            List<OrderByNode> orderBy, boolean isWildCardSelect, boolean 
hasSequence, Map<String, UDFParseNode> udfParseNodes) {
         String schemaName = tableRef.getTable().getSchemaName().getString();
         TableName tName = TableName.create(schemaName.length() == 0 ? null : 
schemaName, tableRef.getTable().getTableName().getString());
         List<AliasedNode> selectList = new ArrayList<AliasedNode>();
@@ -1267,7 +1268,7 @@ public class JoinCompiler {
         String tableAlias = tableRef.getTableAlias();
         TableNode from = NODE_FACTORY.namedTable(tableAlias == null ? null : 
'"' + tableAlias + '"', tName, dynamicCols);
 
-        return NODE_FACTORY.select(from, hintNode, false, selectList, where, 
groupBy, null, orderBy, null, 0, groupBy != null, hasSequence, 
Collections.<SelectStatement>emptyList());
+        return NODE_FACTORY.select(from, hintNode, false, selectList, where, 
groupBy, null, orderBy, null, 0, groupBy != null, hasSequence, 
Collections.<SelectStatement>emptyList(), udfParseNodes);
     }
 
     public static PTable joinProjectedTables(PTable left, PTable right, 
JoinType type) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa26c1cd/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index 0c586f0..fcbeb7e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -34,6 +34,7 @@ import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.AmbiguousColumnException;
@@ -123,6 +124,11 @@ public class PostDDLCompiler {
                             public List<TableRef> getTables() {
                                 return Collections.singletonList(tableRef);
                             }
+                            
+                            public java.util.List<PFunction> getFunctions() {
+                                return Collections.emptyList();
+                            };
+                            
                             @Override
                             public TableRef resolveTable(String schemaName, 
String tableName)
                                     throws SQLException {
@@ -135,6 +141,14 @@ public class PostDDLCompiler {
                                         : 
tableRef.getTable().getColumn(colName);
                                 return new ColumnRef(tableRef, 
column.getPosition());
                             }
+                            
+                            public PFunction resolveFunction(String 
functionName) throws SQLException {
+                                throw new UnsupportedOperationException();
+                            };
+
+                            public boolean hasUDFs() {
+                                return false;
+                            };
                         };
                         PhoenixStatement statement = new 
PhoenixStatement(connection);
                         StatementContext context = new 
StatementContext(statement, resolver, scan, new SequenceManager(statement));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa26c1cd/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
index e84ca2a..c39db09 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
@@ -458,7 +458,7 @@ public class ProjectionCompiler {
                 projectColumnFamily(table, scan, family);
             }
         }
-        return new RowProjector(projectedColumns, estimatedByteSize, 
isProjectEmptyKeyValue);
+        return new RowProjector(projectedColumns, estimatedByteSize, 
isProjectEmptyKeyValue, resolver.hasUDFs());
     }
 
     private static void projectAllColumnFamilies(PTable table, Scan scan) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa26c1cd/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 90891dc..58ae4b5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -21,6 +21,7 @@ import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
 
@@ -62,6 +63,7 @@ import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.SubqueryParseNode;
 import org.apache.phoenix.parse.TableNode;
+import org.apache.phoenix.parse.UDFParseNode;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -240,13 +242,13 @@ public class QueryCompiler {
                 context.setCurrentTable(table.getTableRef());
                 PTable projectedTable = 
table.createProjectedTable(!projectPKColumns, context);
                 TupleProjector.serializeProjectorIntoScan(context.getScan(), 
new TupleProjector(projectedTable));
-                
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable));
+                
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, 
context.getConnection(), subquery.getUdfParseNodes()));
                 table.projectColumns(context.getScan());
                 return compileSingleQuery(context, subquery, binds, 
asSubquery, !asSubquery);
             }
             QueryPlan plan = compileSubquery(subquery, false);
             PTable projectedTable = 
table.createProjectedTable(plan.getProjector());
-            
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable));
+            
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, 
context.getConnection(), subquery.getUdfParseNodes()));
             return new TupleProjectionPlan(plan, new 
TupleProjector(plan.getProjector()), 
table.compilePostFilterExpression(context));
         }
         
@@ -295,7 +297,7 @@ public class QueryCompiler {
                 } else {
                     tables[i] = null;
                 }
-                
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable));
+                
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, 
context.getConnection(), query.getUdfParseNodes()));
                 joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // 
place-holder
                 Pair<List<Expression>, List<Expression>> joinConditions = 
joinSpec.compileJoinConditions(context, subContext, true);
                 joinExpressions[i] = joinConditions.getFirst();
@@ -354,7 +356,7 @@ public class QueryCompiler {
                 tupleProjector = new TupleProjector(plan.getProjector());
             }
             context.setCurrentTable(rhsTableRef);
-            
context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable));
+            
context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable, 
context.getConnection(), rhs.getUdfParseNodes()));
             ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[] {new 
ImmutableBytesPtr(emptyByteArray)};
             Pair<List<Expression>, List<Expression>> joinConditions = 
lastJoinSpec.compileJoinConditions(lhsCtx, context, true);
             List<Expression> joinExpressions = joinConditions.getSecond();
@@ -364,7 +366,7 @@ public class QueryCompiler {
             int fieldPosition = needsMerge ? rhsProjTable.getColumns().size() 
- rhsProjTable.getPKColumns().size() : 0;
             PTable projectedTable = needsMerge ? 
JoinCompiler.joinProjectedTables(rhsProjTable, lhsTable, type == JoinType.Right 
? JoinType.Left : type) : rhsProjTable;
             TupleProjector.serializeProjectorIntoScan(context.getScan(), 
tupleProjector);
-            
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable));
+            
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, 
context.getConnection(), rhs.getUdfParseNodes()));
             QueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, 
asSubquery, !asSubquery && type == JoinType.Right);
             Expression postJoinFilterExpression = 
joinTable.compilePostFilterExpression(context, rhsTable);
             Integer limit = null;
@@ -413,7 +415,7 @@ public class QueryCompiler {
         int fieldPosition = needsMerge ? lhsProjTable.getColumns().size() - 
lhsProjTable.getPKColumns().size() : 0;
         PTable projectedTable = needsMerge ? 
JoinCompiler.joinProjectedTables(lhsProjTable, rhsProjTable, type == 
JoinType.Right ? JoinType.Left : type) : lhsProjTable;
 
-        ColumnResolver resolver = 
FromCompiler.getResolverForProjectedTable(projectedTable);
+        ColumnResolver resolver = 
FromCompiler.getResolverForProjectedTable(projectedTable, 
context.getConnection(), new HashMap<String,UDFParseNode>(1));
         TableRef tableRef = resolver.getTables().get(0);
         StatementContext subCtx = new StatementContext(statement, resolver, 
ScanUtil.newScan(originalScan), new SequenceManager(statement));
         subCtx.setCurrentTable(tableRef);
@@ -422,7 +424,7 @@ public class QueryCompiler {
         context.setResolver(resolver);
         TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), 
NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), 
tableRef.getTable().getTableName().getString()));
         ParseNode where = joinTable.getPostFiltersCombined();
-        SelectStatement select = asSubquery ? NODE_FACTORY.select(from, 
joinTable.getStatement().getHint(), false, Collections.<AliasedNode> 
emptyList(), where, null, null, orderBy, null, 0, false, 
joinTable.getStatement().hasSequence(), 
Collections.<SelectStatement>emptyList())
+        SelectStatement select = asSubquery ? NODE_FACTORY.select(from, 
joinTable.getStatement().getHint(), false, Collections.<AliasedNode> 
emptyList(), where, null, null, orderBy, null, 0, false, 
joinTable.getStatement().hasSequence(), 
Collections.<SelectStatement>emptyList(), 
joinTable.getStatement().getUdfParseNodes())
                 : NODE_FACTORY.select(joinTable.getStatement(), from, where);
         
         return compileSingleFlatQuery(context, select, binds, asSubquery, 
false, innerPlan, null, isInRowKeyOrder);
@@ -505,7 +507,7 @@ public class QueryCompiler {
         if (this.projectTuples) {
             projectedTable = 
TupleProjectionCompiler.createProjectedTable(select, context);
             if (projectedTable != null) {
-                
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable));
+                
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, 
context.getConnection(), select.getUdfParseNodes()));
             }
         }
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa26c1cd/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java
index 1b35e92..c60933e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java
@@ -52,9 +52,10 @@ public class RowProjector {
     private final int estimatedSize;
     private final boolean isProjectEmptyKeyValue;
     private final boolean cloneRequired;
+    private final boolean hasUDFs;
     
     public RowProjector(RowProjector projector, boolean 
isProjectEmptyKeyValue) {
-        this(projector.getColumnProjectors(), 
projector.getEstimatedRowByteSize(), isProjectEmptyKeyValue);
+        this(projector.getColumnProjectors(), 
projector.getEstimatedRowByteSize(), isProjectEmptyKeyValue, projector.hasUDFs);
     }
     /**
      * Construct RowProjector based on a list of ColumnProjectors.
@@ -64,6 +65,18 @@ public class RowProjector {
      * @param estimatedRowSize 
      */
     public RowProjector(List<? extends ColumnProjector> columnProjectors, int 
estimatedRowSize, boolean isProjectEmptyKeyValue) {
+        this(columnProjectors, estimatedRowSize, isProjectEmptyKeyValue, 
false);
+    }
+    /**
+     * Construct RowProjector based on a list of ColumnProjectors.
+     * @param columnProjectors ordered list of ColumnProjectors corresponding 
to projected columns in SELECT clause
+     * aggregating coprocessor. Only required in the case of an aggregate 
query with a limit clause and otherwise may
+     * be null.
+     * @param estimatedRowSize 
+     * @param isProjectEmptyKeyValue
+     * @param hasUDFs
+     */
+    public RowProjector(List<? extends ColumnProjector> columnProjectors, int 
estimatedRowSize, boolean isProjectEmptyKeyValue, boolean hasUDFs) {
         this.columnProjectors = Collections.unmodifiableList(columnProjectors);
         int position = columnProjectors.size();
         reverseIndex = ArrayListMultimap.<String, Integer>create();
@@ -82,15 +95,18 @@ public class RowProjector {
         this.someCaseSensitive = someCaseSensitive;
         this.estimatedSize = estimatedRowSize;
         this.isProjectEmptyKeyValue = isProjectEmptyKeyValue;
+        this.hasUDFs = hasUDFs;
         boolean hasPerInvocationExpression = false;
-        for (int i = 0; i < this.columnProjectors.size(); i++) {
-            Expression expression = 
this.columnProjectors.get(i).getExpression();
-            if (expression.getDeterminism() == Determinism.PER_INVOCATION) {
-                hasPerInvocationExpression = true;
-                break;
+        if (!hasUDFs) {
+            for (int i = 0; i < this.columnProjectors.size(); i++) {
+                Expression expression = 
this.columnProjectors.get(i).getExpression();
+                if (expression.getDeterminism() == Determinism.PER_INVOCATION) 
{
+                    hasPerInvocationExpression = true;
+                    break;
+                }
             }
         }
-        this.cloneRequired = hasPerInvocationExpression;
+        this.cloneRequired = hasPerInvocationExpression || hasUDFs;
     }
 
     public RowProjector cloneIfNecessary() {
@@ -114,7 +130,7 @@ public class RowProjector {
         }
         return new RowProjector(clonedColProjectors, 
                 this.getEstimatedRowByteSize(),
-                this.isProjectEmptyKeyValue());
+                this.isProjectEmptyKeyValue(), this.hasUDFs);
     }
 
     public boolean isProjectEmptyKeyValue() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa26c1cd/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
index b9897b1..9b54c86 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
@@ -30,6 +30,7 @@ import org.apache.phoenix.parse.ComparisonParseNode;
 import org.apache.phoenix.parse.DerivedTableNode;
 import org.apache.phoenix.parse.FamilyWildcardParseNode;
 import org.apache.phoenix.parse.JoinTableNode;
+import org.apache.phoenix.parse.NamedNode;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.parse.LessThanOrEqualParseNode;
 import org.apache.phoenix.parse.NamedTableNode;
@@ -99,7 +100,7 @@ public class StatementNormalizer extends ParseNodeRewriter {
             if (selectNodes != normSelectNodes) {
                 statement = NODE_FACTORY.select(statement.getFrom(), 
statement.getHint(), statement.isDistinct(),
                         normSelectNodes, statement.getWhere(), 
statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(),
-                        statement.getLimit(), statement.getBindCount(), 
statement.isAggregate(), statement.hasSequence(), statement.getSelects());
+                        statement.getLimit(), statement.getBindCount(), 
statement.isAggregate(), statement.hasSequence(), statement.getSelects(), 
statement.getUdfParseNodes());
             }
         }
         
@@ -151,7 +152,7 @@ public class StatementNormalizer extends ParseNodeRewriter {
          }
          return super.visitLeave(node, nodes);
     }
-    
+
     @Override
     public ParseNode visitLeave(final BetweenParseNode node, List<ParseNode> 
nodes) throws SQLException {
        

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa26c1cd/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
index 1746d8a..123cb6a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
@@ -341,7 +341,7 @@ public class SubqueryRewriter extends ParseNodeRewriter {
                 groupbyNodes.set(i - 1, aliasedNode.getNode());
             }
             SelectStatement derivedTableStmt = NODE_FACTORY.select(subquery, 
subquery.isDistinct(), derivedTableSelect, where, derivedTableGroupBy, true);
-            subquery = 
NODE_FACTORY.select(NODE_FACTORY.derivedTable(derivedTableAlias, 
derivedTableStmt), subquery.getHint(), false, selectNodes, null, groupbyNodes, 
null, Collections.<OrderByNode> emptyList(), null, subquery.getBindCount(), 
true, false, Collections.<SelectStatement>emptyList());
+            subquery = 
NODE_FACTORY.select(NODE_FACTORY.derivedTable(derivedTableAlias, 
derivedTableStmt), subquery.getHint(), false, selectNodes, null, groupbyNodes, 
null, Collections.<OrderByNode> emptyList(), null, subquery.getBindCount(), 
true, false, Collections.<SelectStatement>emptyList(), 
subquery.getUdfParseNodes());
         }
         
         ParseNode onNode = conditionExtractor.getJoinCondition();
@@ -364,7 +364,7 @@ public class SubqueryRewriter extends ParseNodeRewriter {
             return select;
         
         // Wrap as a derived table.
-        return 
NODE_FACTORY.select(NODE_FACTORY.derivedTable(ParseNodeFactory.createTempAlias(),
 select), HintNode.EMPTY_HINT_NODE, false, select.getSelect(), null, null, 
null, null, null, select.getBindCount(), false, false, 
Collections.<SelectStatement> emptyList());
+        return 
NODE_FACTORY.select(NODE_FACTORY.derivedTable(ParseNodeFactory.createTempAlias(),
 select), HintNode.EMPTY_HINT_NODE, false, select.getSelect(), null, null, 
null, null, null, select.getBindCount(), false, false, 
Collections.<SelectStatement> emptyList(), select.getUdfParseNodes());
     }
     
     private List<AliasedNode> fixAliasedNodes(List<AliasedNode> nodes, boolean 
addSelectOne) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa26c1cd/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
index 6862802..5a91a17 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
@@ -205,7 +205,7 @@ public class SubselectRewriter extends ParseNodeRewriter {
         }
         
         return NODE_FACTORY.select(subselect.getFrom(), hintRewrite, 
isDistinctRewrite, selectNodesRewrite, whereRewrite, groupByRewrite, 
-            havingRewrite, orderByRewrite, limitRewrite, 
select.getBindCount(), isAggregateRewrite, select.hasSequence(), 
select.getSelects());
+            havingRewrite, orderByRewrite, limitRewrite, 
select.getBindCount(), isAggregateRewrite, select.hasSequence(), 
select.getSelects(), select.getUdfParseNodes());
     }
     
     private SelectStatement applyPostFilters(SelectStatement statement, 
List<ParseNode> postFilters) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa26c1cd/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 184387b..e036bc7 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -51,6 +51,17 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT_BYTE
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY_BYTES;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT_BYTES;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_BYTES;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE_BYTES;
 import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
 import static org.apache.phoenix.util.SchemaUtil.getVarChars;
@@ -61,6 +72,8 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
@@ -91,22 +104,29 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.GlobalCache.FunctionBytesPtr;
+import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
 import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest;
 import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
 import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
 import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheRequest;
 import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheResponse;
+import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateFunctionRequest;
 import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateTableRequest;
 import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropColumnRequest;
+import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropFunctionRequest;
 import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropTableRequest;
+import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetFunctionsRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetTableRequest;
 import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest;
 import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse;
 import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
 import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
+import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -115,7 +135,10 @@ import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.metrics.Metrics;
+import org.apache.phoenix.parse.PFunction;
+import org.apache.phoenix.parse.PFunction.FunctionArgument;
 import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
@@ -123,6 +146,7 @@ import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PColumnImpl;
+import org.apache.phoenix.schema.PMetaDataEntity;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
@@ -149,6 +173,7 @@ import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
@@ -156,6 +181,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.cache.Cache;
 import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
@@ -221,6 +247,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
     static {
         Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR);
     }
+
     private static final int TABLE_TYPE_INDEX = 
TABLE_KV_COLUMNS.indexOf(TABLE_TYPE_KV);
     private static final int TABLE_SEQ_NUM_INDEX = 
TABLE_KV_COLUMNS.indexOf(TABLE_SEQ_NUM_KV);
     private static final int COLUMN_COUNT_INDEX = 
TABLE_KV_COLUMNS.indexOf(COLUMN_COUNT_KV);
@@ -278,6 +305,52 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
     
     private static final int LINK_TYPE_INDEX = 0;
 
+    private static final KeyValue CLASS_NAME_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
CLASS_NAME_BYTES);
+    private static final KeyValue JAR_PATH_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
JAR_PATH_BYTES);
+    private static final KeyValue RETURN_TYPE_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
RETURN_TYPE_BYTES);
+    private static final KeyValue NUM_ARGS_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
NUM_ARGS_BYTES);
+    private static final KeyValue TYPE_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
TYPE_BYTES);
+    private static final KeyValue IS_CONSTANT_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
IS_CONSTANT_BYTES);
+    private static final KeyValue DEFAULT_VALUE_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
DEFAULT_VALUE_BYTES);
+    private static final KeyValue MIN_VALUE_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
MIN_VALUE_BYTES);
+    private static final KeyValue MAX_VALUE_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
MAX_VALUE_BYTES);
+    private static final KeyValue IS_ARRAY_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
IS_ARRAY_BYTES);
+
+    private static final List<KeyValue> FUNCTION_KV_COLUMNS = 
Arrays.<KeyValue>asList(
+        EMPTY_KEYVALUE_KV,
+        CLASS_NAME_KV,
+        JAR_PATH_KV,
+        RETURN_TYPE_KV,
+        NUM_ARGS_KV
+        );
+    static {
+        Collections.sort(FUNCTION_KV_COLUMNS, KeyValue.COMPARATOR);
+    }
+    
+    private static final int CLASS_NAME_INDEX = 
FUNCTION_KV_COLUMNS.indexOf(CLASS_NAME_KV);
+    private static final int JAR_PATH_INDEX = 
FUNCTION_KV_COLUMNS.indexOf(JAR_PATH_KV);
+    private static final int RETURN_TYPE_INDEX = 
FUNCTION_KV_COLUMNS.indexOf(RETURN_TYPE_KV);
+    private static final int NUM_ARGS_INDEX = 
FUNCTION_KV_COLUMNS.indexOf(NUM_ARGS_KV);
+
+    private static final List<KeyValue> FUNCTION_ARG_KV_COLUMNS = 
Arrays.<KeyValue>asList(
+        TYPE_KV,
+        IS_ARRAY_KV,
+        IS_CONSTANT_KV,
+        DEFAULT_VALUE_KV,
+        MIN_VALUE_KV,
+        MAX_VALUE_KV
+        );
+    static {
+        Collections.sort(FUNCTION_ARG_KV_COLUMNS, KeyValue.COMPARATOR);
+    }
+    
+    private static final int TYPE_INDEX = 
FUNCTION_ARG_KV_COLUMNS.indexOf(TYPE_KV);
+    private static final int IS_ARRAY_INDEX = 
FUNCTION_ARG_KV_COLUMNS.indexOf(IS_ARRAY_KV);
+    private static final int IS_CONSTANT_INDEX = 
FUNCTION_ARG_KV_COLUMNS.indexOf(IS_CONSTANT_KV);
+    private static final int DEFAULT_VALUE_INDEX = 
FUNCTION_ARG_KV_COLUMNS.indexOf(DEFAULT_VALUE_KV);
+    private static final int MIN_VALUE_INDEX = 
FUNCTION_ARG_KV_COLUMNS.indexOf(MIN_VALUE_KV);
+    private static final int MAX_VALUE_INDEX = 
FUNCTION_ARG_KV_COLUMNS.indexOf(MAX_VALUE_KV);
+    
     private static PName newPName(byte[] keyBuffer, int keyOffset, int 
keyLength) {
         if (keyLength <= 0) {
             return null;
@@ -369,9 +442,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
         Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, 
clientTimeStamp);
         RegionScanner scanner = region.getScanner(scan);
 
-        Cache<ImmutableBytesPtr,PTable> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
+        Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
         try {
-            PTable oldTable = metaDataCache.getIfPresent(cacheKey);
+            PTable oldTable = (PTable)metaDataCache.getIfPresent(cacheKey);
             long tableTimeStamp = oldTable == null ? MIN_TABLE_TIMESTAMP-1 : 
oldTable.getTimeStamp();
             PTable newTable;
             newTable = getTable(scanner, clientTimeStamp, tableTimeStamp);
@@ -394,6 +467,48 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
         }
     }
 
+    private List<PFunction> buildFunctions(List<byte[]> keys, HRegion region,
+            long clientTimeStamp) throws IOException, SQLException {
+        List<KeyRange> keyRanges = 
Lists.newArrayListWithExpectedSize(keys.size());
+        for (byte[] key : keys) {
+            byte[] stopKey = ByteUtil.concat(key, 
QueryConstants.SEPARATOR_BYTE_ARRAY);
+            ByteUtil.nextKey(stopKey, stopKey.length);
+            keyRanges.add(PVarbinary.INSTANCE.getKeyRange(key, true, stopKey, 
false));
+        }
+        Scan scan = new Scan();
+        scan.setTimeRange(MIN_TABLE_TIMESTAMP, clientTimeStamp);
+        ScanRanges scanRanges =
+                ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA,
+                    Collections.singletonList(keyRanges), 
ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
+        scanRanges.initializeScan(scan);
+        scan.setFilter(scanRanges.getSkipScanFilter());
+
+        RegionScanner scanner = region.getScanner(scan);
+
+        Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
+        List<PFunction> functions = new ArrayList<PFunction>();
+        PFunction function = null;
+        try {
+            for(int i = 0; i< keys.size(); i++) {
+                function = null;
+                function = getFunction(scanner);
+                if (function == null) {
+                    return null;
+                }
+                byte[] functionKey =
+                        SchemaUtil.getFunctionKey(
+                            function.getTenantId() == null ? 
ByteUtil.EMPTY_BYTE_ARRAY : function
+                                    .getTenantId().getBytes(), 
Bytes.toBytes(function
+                                    .getFunctionName()));
+                metaDataCache.put(new FunctionBytesPtr(functionKey), function);
+                functions.add(function);
+            }
+            return functions;
+        } finally {
+            scanner.close();
+        }
+    }
+
     private void addIndexToTable(PName tenantId, PName schemaName, PName 
indexName, PName tableName, long clientTimeStamp, List<PTable> indexes) throws 
IOException, SQLException {
         byte[] key = SchemaUtil.getTableKey(tenantId == null ? 
ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(), schemaName.getBytes(), 
indexName.getBytes());
         PTable indexTable = doGetTable(key, clientTimeStamp);
@@ -473,6 +588,61 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
         PColumn column = new PColumnImpl(colName, famName, dataType, 
maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, 
isViewReferenced, expressionStr);
         columns.add(column);
     }
+    
+    private void addArgumentToFunction(List<Cell> results, PName functionName, 
PName type,
+        Cell[] functionKeyValues, List<FunctionArgument> arguments, short 
argPosition) {
+        int i = 0;
+        int j = 0;
+        while (i < results.size() && j < FUNCTION_ARG_KV_COLUMNS.size()) {
+            Cell kv = results.get(i);
+            Cell searchKv = FUNCTION_ARG_KV_COLUMNS.get(j);
+            int cmp =
+                    Bytes.compareTo(kv.getQualifierArray(), 
kv.getQualifierOffset(),
+                        kv.getQualifierLength(), searchKv.getQualifierArray(),
+                        searchKv.getQualifierOffset(), 
searchKv.getQualifierLength());
+            if (cmp == 0) {
+                functionKeyValues[j++] = kv;
+                i++;
+            } else if (cmp > 0) {
+                functionKeyValues[j++] = null;
+            } else {
+                i++; // shouldn't happen - means unexpected KV in system table 
column row
+            }
+        }
+
+        Cell isArrayKv = functionKeyValues[IS_ARRAY_INDEX];
+        boolean isArrayType =
+                isArrayKv == null ? false : 
Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(
+                    isArrayKv.getValueArray(), isArrayKv.getValueOffset(),
+                    isArrayKv.getValueLength()));
+        Cell isConstantKv = functionKeyValues[IS_CONSTANT_INDEX];
+        boolean isConstant =
+                isConstantKv == null ? false : 
Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(
+                    isConstantKv.getValueArray(), 
isConstantKv.getValueOffset(),
+                    isConstantKv.getValueLength()));
+        Cell defaultValueKv = functionKeyValues[DEFAULT_VALUE_INDEX];
+        String defaultValue =
+                defaultValueKv == null ? null : (String) 
PVarchar.INSTANCE.toObject(
+                    defaultValueKv.getValueArray(), 
defaultValueKv.getValueOffset(),
+                    defaultValueKv.getValueLength());
+        Cell minValueKv = functionKeyValues[MIN_VALUE_INDEX];
+        String minValue =
+                minValueKv == null ? null : (String) 
PVarchar.INSTANCE.toObject(
+                    minValueKv.getValueArray(), minValueKv.getValueOffset(),
+                    minValueKv.getValueLength());
+        Cell maxValueKv = functionKeyValues[MAX_VALUE_INDEX];
+        String maxValue =
+                maxValueKv == null ? null : (String) 
PVarchar.INSTANCE.toObject(
+                    maxValueKv.getValueArray(), maxValueKv.getValueOffset(),
+                    maxValueKv.getValueLength());
+        FunctionArgument arg =
+                new FunctionArgument(type.getString(), isArrayType, isConstant,
+                        defaultValue == null ? null : 
LiteralExpression.newConstant(defaultValue),
+                        minValue == null ? null : 
LiteralExpression.newConstant(minValue),
+                        maxValue == null ? null : 
LiteralExpression.newConstant(maxValue),
+                        argPosition);
+        arguments.add(arg);
+    }
 
     private PTable getTable(RegionScanner scanner, long clientTimeStamp, long 
tableTimeStamp)
         throws IOException, SQLException {
@@ -646,6 +816,106 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
             disableWAL, multiTenant, storeNulls, viewType, viewIndexId, 
indexType, stats);
     }
 
+    private PFunction getFunction(RegionScanner scanner)
+            throws IOException, SQLException {
+        List<Cell> results = Lists.newArrayList();
+        scanner.next(results);
+        if (results.isEmpty()) {
+            return null;
+        }
+        Cell[] functionKeyValues = new Cell[FUNCTION_KV_COLUMNS.size()];
+        Cell[] functionArgKeyValues = new Cell[FUNCTION_ARG_KV_COLUMNS.size()];
+
+        // Create PFunction based on KeyValues from scan
+        Cell keyValue = results.get(0);
+        byte[] keyBuffer = keyValue.getRowArray();
+        int keyLength = keyValue.getRowLength();
+        int keyOffset = keyValue.getRowOffset();
+        PName tenantId = newPName(keyBuffer, keyOffset, keyLength);
+        int tenantIdLength = (tenantId == null) ? 0 : 
tenantId.getBytes().length;
+        if (tenantIdLength == 0) {
+            tenantId = null;
+        }
+        PName functionName =
+                newPName(keyBuffer, keyOffset + tenantIdLength + 1, keyLength 
- tenantIdLength - 1);
+        int functionNameLength = functionName.getBytes().length+1;
+        int offset = tenantIdLength + functionNameLength + 1;
+
+        long timeStamp = keyValue.getTimestamp();
+
+        int i = 0;
+        int j = 0;
+        while (i < results.size() && j < FUNCTION_KV_COLUMNS.size()) {
+            Cell kv = results.get(i);
+            Cell searchKv = FUNCTION_KV_COLUMNS.get(j);
+            int cmp =
+                    Bytes.compareTo(kv.getQualifierArray(), 
kv.getQualifierOffset(),
+                        kv.getQualifierLength(), searchKv.getQualifierArray(),
+                        searchKv.getQualifierOffset(), 
searchKv.getQualifierLength());
+            if (cmp == 0) {
+                timeStamp = Math.max(timeStamp, kv.getTimestamp()); // Find 
max timestamp of table
+                                                                    // header 
row
+                functionKeyValues[j++] = kv;
+                i++;
+            } else if (cmp > 0) {
+                timeStamp = Math.max(timeStamp, kv.getTimestamp());
+                functionKeyValues[j++] = null;
+            } else {
+                i++; // shouldn't happen - means unexpected KV in system table 
header row
+            }
+        }
+        // CLASS_NAME,NUM_ARGS and JAR_PATH are required.
+        if (functionKeyValues[CLASS_NAME_INDEX] == null || 
functionKeyValues[NUM_ARGS_INDEX] == null) {
+            throw new IllegalStateException(
+                    "Didn't find expected key values for function row in 
metadata row");
+        }
+
+        Cell classNameKv = functionKeyValues[CLASS_NAME_INDEX];
+        PName className = newPName(classNameKv.getValueArray(), 
classNameKv.getValueOffset(),
+            classNameKv.getValueLength());
+        Cell jarPathKv = functionKeyValues[JAR_PATH_INDEX];
+        PName jarPath = null;
+        if(jarPathKv != null) {
+            jarPath = newPName(jarPathKv.getValueArray(), 
jarPathKv.getValueOffset(),
+                jarPathKv.getValueLength());
+        }
+        Cell numArgsKv = functionKeyValues[NUM_ARGS_INDEX];
+        int numArgs =
+                
PInteger.INSTANCE.getCodec().decodeInt(numArgsKv.getValueArray(),
+                    numArgsKv.getValueOffset(), SortOrder.getDefault());
+        Cell returnTypeKv = functionKeyValues[RETURN_TYPE_INDEX];
+        PName returnType =
+                returnTypeKv == null ? null : 
newPName(returnTypeKv.getValueArray(),
+                    returnTypeKv.getValueOffset(), 
returnTypeKv.getValueLength());
+
+        List<FunctionArgument> arguments = 
Lists.newArrayListWithExpectedSize(numArgs);
+        for (int k = 0; k < numArgs; k++) {
+            results.clear();
+            scanner.next(results);
+            if (results.isEmpty()) {
+                break;
+            }
+            Cell typeKv = results.get(0);
+            int typeKeyLength = typeKv.getRowLength();
+            PName typeName =
+                    newPName(typeKv.getRowArray(), typeKv.getRowOffset() + 
offset, typeKeyLength
+                            - offset - 3);
+            
+            int argPositionOffset =  offset + typeName.getBytes().length + 1;
+            short argPosition = Bytes.toShort(typeKv.getRowArray(), 
typeKv.getRowOffset() + argPositionOffset, typeKeyLength
+                - argPositionOffset);
+            addArgumentToFunction(results, functionName, typeName, 
functionArgKeyValues, arguments, argPosition);
+        }
+        Collections.sort(arguments, new Comparator<FunctionArgument>() {
+            @Override
+            public int compare(FunctionArgument o1, FunctionArgument o2) {
+                return o1.getArgPosition() - o2.getArgPosition();
+            }
+        });
+        return new PFunction(tenantId, functionName.getString(), arguments, 
returnType.getString(),
+                className.getString(), jarPath == null ? null : 
jarPath.getString(), timeStamp);
+    }
+    
     private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, 
HRegion region,
         long clientTimeStamp) throws IOException {
         if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
@@ -663,7 +933,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
         if (!results.isEmpty() && results.get(0).getTimestamp() > 
clientTimeStamp) {
             Cell kv = results.get(0);
             if (kv.getTypeByte() == Type.Delete.getCode()) {
-                Cache<ImmutableBytesPtr, PTable> metaDataCache =
+                Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                         GlobalCache.getInstance(this.env).getMetaDataCache();
                 PTable table = newDeletedTableMarker(kv.getTimestamp());
                 metaDataCache.put(cacheKey, table);
@@ -673,20 +943,57 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
         return null;
     }
 
+    
+    private PFunction buildDeletedFunction(byte[] key, ImmutableBytesPtr 
cacheKey, HRegion region,
+        long clientTimeStamp) throws IOException {
+        if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
+            return null;
+        }
+
+        Scan scan = MetaDataUtil.newTableRowsScan(key, clientTimeStamp, 
HConstants.LATEST_TIMESTAMP);
+        scan.setFilter(new FirstKeyOnlyFilter());
+        scan.setRaw(true);
+        List<Cell> results = Lists.<Cell> newArrayList();
+        try (RegionScanner scanner = region.getScanner(scan);) {
+          scanner.next(results);
+        }
+        // HBase ignores the time range on a raw scan (HBASE-7362)
+        if (!results.isEmpty() && results.get(0).getTimestamp() > 
clientTimeStamp) {
+            Cell kv = results.get(0);
+            if (kv.getTypeByte() == Type.Delete.getCode()) {
+                Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
+                        GlobalCache.getInstance(this.env).getMetaDataCache();
+                PFunction function = 
newDeletedFunctionMarker(kv.getTimestamp());
+                metaDataCache.put(cacheKey, function);
+                return function;
+            }
+        }
+        return null;
+    }
+
+
     private static PTable newDeletedTableMarker(long timestamp) {
         return new PTableImpl(timestamp);
     }
 
+    private static PFunction newDeletedFunctionMarker(long timestamp) {
+        return new PFunction(timestamp);
+    }
+
     private static boolean isTableDeleted(PTable table) {
         return table.getName() == null;
     }
 
+    private static boolean isFunctionDeleted(PFunction function) {
+        return function.getFunctionName() == null;
+    }
+
     private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key,
         ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp)
         throws IOException, SQLException {
         HRegion region = env.getRegion();
-        Cache<ImmutableBytesPtr,PTable> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
-        PTable table = metaDataCache.getIfPresent(cacheKey);
+        Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
+        PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
         // We always cache the latest version - fault in if not in cache
         if (table != null || (table = buildTable(key, cacheKey, region, 
asOfTimeStamp)) != null) {
             return table;
@@ -700,6 +1007,29 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
         return null;
     }
 
+    private PFunction loadFunction(RegionCoprocessorEnvironment env, byte[] 
key,
+            ImmutableBytesPtr cacheKey, long clientTimeStamp, long 
asOfTimeStamp)
+            throws IOException, SQLException {
+            HRegion region = env.getRegion();
+            Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
+            PFunction function = 
(PFunction)metaDataCache.getIfPresent(cacheKey);
+            // We always cache the latest version - fault in if not in cache
+            if (function != null) {
+                return function;
+            }
+            ArrayList<byte[]> arrayList = new ArrayList<byte[]>(1);
+            arrayList.add(key);
+            List<PFunction> functions = buildFunctions(arrayList, region, 
asOfTimeStamp);
+            if(functions != null) return functions.get(0);
+            // if not found then check if newer table already exists and add 
delete marker for timestamp
+            // found
+            if (function == null
+                    && (function = buildDeletedFunction(key, cacheKey, region, 
clientTimeStamp)) != null) {
+                return function;
+            }
+            return null;
+        }
+
 
     @Override
     public void createTable(RpcController controller, CreateTableRequest 
request,
@@ -801,7 +1131,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
 
                 // Invalidate the cache - the next getTable call will add it
                 // TODO: consider loading the table that was just created 
here, patching up the parent table, and updating the cache
-                Cache<ImmutableBytesPtr,PTable> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
+                Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
                 if (parentCacheKey != null) {
                     metaDataCache.invalidate(parentCacheKey);
                 }
@@ -949,7 +1279,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
                     done.run(MetaDataMutationResult.toProto(result));
                     return;
                 }
-                Cache<ImmutableBytesPtr,PTable> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
+                Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
                 // Commit the list of deletion.
                 region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> 
emptySet());
                 long currentTime = 
MetaDataUtil.getClientTimeStamp(tableMetadata);
@@ -983,8 +1313,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
         HRegion region = env.getRegion();
         ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
 
-        Cache<ImmutableBytesPtr,PTable> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
-        PTable table = metaDataCache.getIfPresent(cacheKey);
+        Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
+        PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
 
         // We always cache the latest version - fault in if not in cache
         if (table != null
@@ -1131,8 +1461,8 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
                 List<ImmutableBytesPtr> invalidateList = new 
ArrayList<ImmutableBytesPtr>();
                 invalidateList.add(cacheKey);
-                Cache<ImmutableBytesPtr,PTable> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
-                PTable table = metaDataCache.getIfPresent(cacheKey);
+                Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
+                PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
                 if (logger.isDebugEnabled()) {
                     if (table == null) {
                         logger.debug("Table " + Bytes.toStringBinary(key)
@@ -1298,9 +1628,9 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
 
     private PTable doGetTable(byte[] key, long clientTimeStamp, RowLock 
rowLock) throws IOException, SQLException {
         ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
-        Cache<ImmutableBytesPtr, PTable> metaDataCache =
+        Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                 GlobalCache.getInstance(this.env).getMetaDataCache();
-        PTable table = metaDataCache.getIfPresent(cacheKey);
+        PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
         // We only cache the latest, so we'll end up building the table with 
every call if the
         // client connection has specified an SCN.
         // TODO: If we indicate to the client that we're returning an older 
version, but there's a
@@ -1332,7 +1662,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
         }
         try {
             // Try cache again in case we were waiting on a lock
-            table = metaDataCache.getIfPresent(cacheKey);
+            table = (PTable)metaDataCache.getIfPresent(cacheKey);
             // We only cache the latest, so we'll end up building the table 
with every call if the
             // client connection has specified an SCN.
             // TODO: If we indicate to the client that we're returning an 
older version, but there's
@@ -1357,6 +1687,64 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
         }
     }
 
+    private List<PFunction> doGetFunctions(List<byte[]> keys, long 
clientTimeStamp) throws IOException, SQLException {
+        Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
+                GlobalCache.getInstance(this.env).getMetaDataCache();
+        HRegion region = env.getRegion();
+        Collections.sort(keys, new Comparator<byte[]>() {
+            @Override
+            public int compare(byte[] o1, byte[] o2) {
+                return Bytes.compareTo(o1, o2);
+            }
+        });
+        /*
+         * Lock directly on key, though it may be an index table. This will 
just prevent a table
+         * from getting rebuilt too often.
+         */
+        List<RowLock> rowLocks = new ArrayList<HRegion.RowLock>(keys.size());;
+        try {
+            rowLocks = new ArrayList<HRegion.RowLock>(keys.size());
+            for (int i = 0; i < keys.size(); i++) {
+                HRegion.RowLock rowLock = region.getRowLock(keys.get(i));
+                if (rowLock == null) {
+                    throw new IOException("Failed to acquire lock on "
+                            + Bytes.toStringBinary(keys.get(i)));
+                }
+                rowLocks.add(rowLock);
+            }
+
+            List<PFunction> functionsAvailable = new 
ArrayList<PFunction>(keys.size());
+            int numFunctions = keys.size(); 
+            Iterator<byte[]> iterator = keys.iterator();
+            while(iterator.hasNext()) {
+                byte[] key = iterator.next();
+                PFunction function = (PFunction)metaDataCache.getIfPresent(new 
FunctionBytesPtr(key));
+                if (function != null && function.getTimeStamp() < 
clientTimeStamp) {
+                    if (isFunctionDeleted(function)) {
+                        return null;
+                    }
+                    functionsAvailable.add(function);
+                    iterator.remove();
+                }
+            }
+            if(functionsAvailable.size() == numFunctions) return 
functionsAvailable;
+
+            // Query for the latest table first, since it's not cached
+            List<PFunction> buildFunctions = buildFunctions(keys, region, 
clientTimeStamp);
+            if(buildFunctions == null || buildFunctions.isEmpty()) {
+                return null;
+            }
+            functionsAvailable.addAll(buildFunctions);
+            if(functionsAvailable.size() == numFunctions) return 
functionsAvailable;
+            return null;
+        } finally {
+            for (HRegion.RowLock lock : rowLocks) {
+                lock.release();
+            }
+            rowLocks.clear();
+        }
+    }
+
     @Override
     public void dropColumn(RpcController controller, DropColumnRequest request,
             RpcCallback<MetaDataResponse> done) {
@@ -1477,7 +1865,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
     public void clearCache(RpcController controller, ClearCacheRequest request,
             RpcCallback<ClearCacheResponse> done) {
         GlobalCache cache = GlobalCache.getInstance(this.env);
-        Cache<ImmutableBytesPtr, PTable> metaDataCache =
+        Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                 GlobalCache.getInstance(this.env).getMetaDataCache();
         metaDataCache.invalidateAll();
         cache.clearTenantCache();
@@ -1633,7 +2021,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                     }
                     region.mutateRowsWithLocks(tableMetadata, 
Collections.<byte[]> emptySet());
                     // Invalidate from cache
-                    Cache<ImmutableBytesPtr,PTable> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
+                    Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
                     metaDataCache.invalidate(cacheKey);
                     if(dataTableKey != null) {
                         metaDataCache.invalidate(new 
ImmutableBytesPtr(dataTableKey));
@@ -1668,6 +2056,18 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 EnvironmentEdgeManager.currentTimeMillis(), null);
     }
 
+    private static MetaDataMutationResult checkFunctionKeyInRegion(byte[] key, 
HRegion region) {
+        byte[] startKey = region.getStartKey();
+        byte[] endKey = region.getEndKey();
+        if (Bytes.compareTo(startKey, key) <= 0
+                && (Bytes.compareTo(HConstants.LAST_ROW, endKey) == 0 || 
Bytes.compareTo(key,
+                    endKey) < 0)) {
+            return null; // normal case;
+        }
+        return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_IN_REGION,
+                EnvironmentEdgeManager.currentTimeMillis(), null);
+    }
+
     /**
      * Certain operations, such as DROP TABLE are not allowed if there a table 
has child views.
      * This class wraps the Results of a scanning the Phoenix Metadata for 
child views for a specific table
@@ -1718,7 +2118,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
             byte[] tenantId = request.getTenantId().toByteArray();
             byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, 
tableName);
             ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
-            Cache<ImmutableBytesPtr, PTable> metaDataCache =
+            Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                     GlobalCache.getInstance(this.env).getMetaDataCache();
             metaDataCache.invalidate(cacheKey);
         } catch (Throwable t) {
@@ -1727,4 +2127,223 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                 
ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), 
t));
         }
     }
+
+    public void getFunctions(RpcController controller, GetFunctionsRequest 
request,
+            RpcCallback<MetaDataResponse> done) {
+        MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
+        byte[] tenantId = request.getTenantId().toByteArray();
+        List<String> functionNames = new 
ArrayList<>(request.getFunctionNamesCount());
+        try {
+            HRegion region = env.getRegion();
+            List<ByteString> functionNamesList = 
request.getFunctionNamesList();
+            List<Long> functionTimestampsList = 
request.getFunctionTimestampsList();
+            List<byte[]> keys = new 
ArrayList<byte[]>(request.getFunctionNamesCount());
+            List<Pair<byte[], Long>> functions = new ArrayList<Pair<byte[], 
Long>>(request.getFunctionNamesCount());
+            for(int i = 0; i< functionNamesList.size();i++) {
+                byte[] functionName = functionNamesList.get(i).toByteArray();
+                functionNames.add(Bytes.toString(functionName));
+                byte[] key = SchemaUtil.getFunctionKey(tenantId, functionName);
+                MetaDataMutationResult result = checkFunctionKeyInRegion(key, 
region);
+                if (result != null) {
+                    done.run(MetaDataMutationResult.toProto(result));
+                    return;
+                }
+                functions.add(new Pair<byte[], 
Long>(functionName,functionTimestampsList.get(i)));
+                keys.add(key);
+            }
+
+            long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+            List<PFunction> functionsAvailable = doGetFunctions(keys, 
request.getClientTimestamp());
+            if (functionsAvailable == null) {
+                
builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_NOT_FOUND);
+                builder.setMutationTime(currentTime);
+                done.run(builder.build());
+                return;
+            }
+            
builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_ALREADY_EXISTS);
+            builder.setMutationTime(currentTime);
+
+            for (PFunction function : functionsAvailable) {
+                builder.addFunction(PFunction.toProto(function));
+            }
+            done.run(builder.build());
+            return;
+        } catch (Throwable t) {
+            logger.error("getFunctions failed", t);
+            ProtobufUtil.setControllerException(controller,
+                ServerUtil.createIOException(functionNames.toString(), t));
+        }
+    }
+
+    @Override
+    public void createFunction(RpcController controller, CreateFunctionRequest 
request,
+            RpcCallback<MetaDataResponse> done) {
+        MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
+        byte[][] rowKeyMetaData = new byte[2][];
+        byte[] functionName = null;
+        try {
+            List<Mutation> functionMetaData = 
ProtobufUtil.getMutations(request);
+            boolean temporaryFunction = request.getTemporary();
+            MetaDataUtil.getTenantIdAndFunctionName(functionMetaData, 
rowKeyMetaData);
+            byte[] tenantIdBytes = 
rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+            functionName = 
rowKeyMetaData[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX];
+            byte[] lockKey = SchemaUtil.getFunctionKey(tenantIdBytes, 
functionName);
+            HRegion region = env.getRegion();
+            MetaDataMutationResult result = checkFunctionKeyInRegion(lockKey, 
region);
+            if (result != null) {
+                done.run(MetaDataMutationResult.toProto(result));
+                return;
+            }
+            List<RowLock> locks = Lists.newArrayList();
+            long clientTimeStamp = 
MetaDataUtil.getClientTimeStamp(functionMetaData);
+            try {
+                acquireLock(region, lockKey, locks);
+                // Get as of latest timestamp so we can detect if we have a 
newer function that already
+                // exists without making an additional query
+                ImmutableBytesPtr cacheKey = new FunctionBytesPtr(lockKey);
+                PFunction function =
+                        loadFunction(env, lockKey, cacheKey, clientTimeStamp, 
clientTimeStamp);
+                if (function != null) {
+                    if (function.getTimeStamp() < clientTimeStamp) {
+                        // If the function is older than the client time stamp 
and it's deleted,
+                        // continue
+                        if (!isFunctionDeleted(function)) {
+                            
builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_ALREADY_EXISTS);
+                            
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                            builder.addFunction(PFunction.toProto(function));
+                            done.run(builder.build());
+                            return;
+                        }
+                    } else {
+                        
builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_FUNCTION_FOUND);
+                        
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                        builder.addFunction(PFunction.toProto(function));
+                        done.run(builder.build());
+                        return;
+                    }
+                }
+                // Don't store function info for temporary functions.
+                if(!temporaryFunction) {
+                    region.mutateRowsWithLocks(functionMetaData, 
Collections.<byte[]> emptySet());
+                }
+
+                // Invalidate the cache - the next getFunction call will add it
+                // TODO: consider loading the function that was just created 
here, patching up the parent function, and updating the cache
+                Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
+                metaDataCache.invalidate(cacheKey);
+                // Get timeStamp from mutations - the above method sets it if 
it's unset
+                long currentTimeStamp = 
MetaDataUtil.getClientTimeStamp(functionMetaData);
+                
builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_NOT_FOUND);
+                builder.setMutationTime(currentTimeStamp);
+                done.run(builder.build());
+                return;
+            } finally {
+                region.releaseRowLocks(locks);
+            }
+        } catch (Throwable t) {
+          logger.error("createFunction failed", t);
+            ProtobufUtil.setControllerException(controller,
+                ServerUtil.createIOException(Bytes.toString(functionName), t));
+        }         
+    }
+
+    @Override
+    public void dropFunction(RpcController controller, DropFunctionRequest 
request,
+            RpcCallback<MetaDataResponse> done) {
+        byte[][] rowKeyMetaData = new byte[2][];
+        byte[] functionName = null;
+        try {
+            List<Mutation> functionMetaData = 
ProtobufUtil.getMutations(request);
+            MetaDataUtil.getTenantIdAndFunctionName(functionMetaData, 
rowKeyMetaData);
+            byte[] tenantIdBytes = 
rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+            functionName = 
rowKeyMetaData[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX];
+            byte[] lockKey = SchemaUtil.getFunctionKey(tenantIdBytes, 
functionName);
+            HRegion region = env.getRegion();
+            MetaDataMutationResult result = checkFunctionKeyInRegion(lockKey, 
region);
+            if (result != null) {
+                done.run(MetaDataMutationResult.toProto(result));
+                return;
+            }
+            List<RowLock> locks = Lists.newArrayList();
+            long clientTimeStamp = 
MetaDataUtil.getClientTimeStamp(functionMetaData);
+            try {
+                acquireLock(region, lockKey, locks);
+                ImmutableBytesPtr cacheKey = new FunctionBytesPtr(lockKey);
+                List<byte[]> keys = new ArrayList<byte[]>(1);
+                keys.add(lockKey);
+                List<ImmutableBytesPtr> invalidateList = new 
ArrayList<ImmutableBytesPtr>();
+
+                result = doDropFunction(clientTimeStamp, keys, 
functionMetaData, invalidateList);
+                if (result.getMutationCode() != 
MutationCode.FUNCTION_ALREADY_EXISTS) {
+                    done.run(MetaDataMutationResult.toProto(result));
+                    return;
+                }
+                region.mutateRowsWithLocks(functionMetaData, 
Collections.<byte[]> emptySet());
+
+                Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
+                long currentTime = 
MetaDataUtil.getClientTimeStamp(functionMetaData);
+                for(ImmutableBytesPtr ptr: invalidateList) {
+                    metaDataCache.invalidate(ptr);
+                    metaDataCache.put(ptr, 
newDeletedFunctionMarker(currentTime));
+                    
+                }
+
+                done.run(MetaDataMutationResult.toProto(result));
+                return;
+            } finally {
+                region.releaseRowLocks(locks);
+            }
+        } catch (Throwable t) {
+          logger.error("dropFunction failed", t);
+            ProtobufUtil.setControllerException(controller,
+                ServerUtil.createIOException(Bytes.toString(functionName), t));
+        }         
+    }
+
+    private MetaDataMutationResult doDropFunction(long clientTimeStamp, 
List<byte[]> keys, List<Mutation> functionMetaData, List<ImmutableBytesPtr> 
invalidateList)
+            throws IOException, SQLException {
+        List<byte[]> keysClone = new ArrayList<byte[]>(keys);
+        List<PFunction> functions = doGetFunctions(keysClone, clientTimeStamp);
+        // We didn't find a table at the latest timestamp, so either there is 
no table or
+        // there was a table, but it's been deleted. In either case we want to 
return.
+        if (functions == null || functions.isEmpty()) {
+            if (buildDeletedFunction(keys.get(0), new 
FunctionBytesPtr(keys.get(0)), env.getRegion(), clientTimeStamp) != null) {
+                return new 
MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS, 
EnvironmentEdgeManager.currentTimeMillis(), null);
+            }
+            return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, 
EnvironmentEdgeManager.currentTimeMillis(), null);
+        }
+
+        if (functions != null && !functions.isEmpty()) {
+            if (functions.get(0).getTimeStamp() < clientTimeStamp) {
+                // If the function is older than the client time stamp and 
it's deleted,
+                // continue
+                if (isFunctionDeleted(functions.get(0))) {
+                    return new 
MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND,
+                            EnvironmentEdgeManager.currentTimeMillis(), null);
+                }
+                invalidateList.add(new FunctionBytesPtr(keys.get(0)));
+                HRegion region = env.getRegion();
+                Scan scan = MetaDataUtil.newTableRowsScan(keys.get(0), 
MIN_TABLE_TIMESTAMP, clientTimeStamp);
+                List<Cell> results = Lists.newArrayList();
+                try (RegionScanner scanner = region.getScanner(scan);) {
+                  scanner.next(results);
+                  if (results.isEmpty()) { // Should not be possible
+                    return new 
MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, 
EnvironmentEdgeManager.currentTimeMillis(), null);
+                  }
+                  do {
+                    Cell kv = results.get(0);
+                    Delete delete = new Delete(kv.getRowArray(), 
kv.getRowOffset(), kv.getRowLength(), clientTimeStamp);
+                    functionMetaData.add(delete);
+                    results.clear();
+                    scanner.next(results);
+                  } while (!results.isEmpty());
+                }
+                return new 
MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS,
+                        EnvironmentEdgeManager.currentTimeMillis(), functions, 
true);
+            }
+        }
+        return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND,
+                EnvironmentEdgeManager.currentTimeMillis(), null);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa26c1cd/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 26a313d..cbbb3a0 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -17,13 +17,16 @@
  */
 package org.apache.phoenix.coprocessor;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
+import org.apache.phoenix.coprocessor.generated.PFunctionProtos;
 import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
@@ -86,6 +89,10 @@ public abstract class MetaDataProtocol extends 
MetaDataService {
         UNALLOWED_TABLE_MUTATION,
         NO_PK_COLUMNS,
         PARENT_TABLE_NOT_FOUND,
+        FUNCTION_ALREADY_EXISTS,
+        FUNCTION_NOT_FOUND,
+        NEWER_FUNCTION_FOUND,
+        FUNCTION_NOT_IN_REGION,
         NO_OP
     };
 
@@ -97,6 +104,7 @@ public abstract class MetaDataProtocol extends 
MetaDataService {
         private byte[] columnName;
         private byte[] familyName;
         private boolean wasUpdated;
+        private List<PFunction> functions = new ArrayList<PFunction>(1);
 
         public MetaDataMutationResult() {
         }
@@ -113,12 +121,19 @@ public abstract class MetaDataProtocol extends 
MetaDataService {
            this(returnCode, currentTime, table, Collections.<byte[]> 
emptyList());
         }
 
+        public MetaDataMutationResult(MutationCode returnCode, long 
currentTime, List<PFunction> functions, boolean wasUpdated) {
+            this.returnCode = returnCode;
+            this.mutationTime = currentTime;
+            this.functions = functions;
+            this.wasUpdated = wasUpdated;
+         }
+
         // For testing, so that connectionless can set wasUpdated so 
ColumnResolver doesn't complain
         public MetaDataMutationResult(MutationCode returnCode, long 
currentTime, PTable table, boolean wasUpdated) {
             this(returnCode, currentTime, table, Collections.<byte[]> 
emptyList());
             this.wasUpdated = wasUpdated;
          }
-
+        
         public MetaDataMutationResult(MutationCode returnCode, long 
currentTime, PTable table, List<byte[]> tableNamesToDelete) {
             this.returnCode = returnCode;
             this.mutationTime = currentTime;
@@ -145,6 +160,10 @@ public abstract class MetaDataProtocol extends 
MetaDataService {
         public void setTable(PTable table) {
             this.table = table;
         }
+        
+        public void setFunction(PFunction function) {
+            this.functions.add(function);
+        }
 
         public List<byte[]> getTableNamesToDelete() {
             return tableNamesToDelete;
@@ -158,6 +177,10 @@ public abstract class MetaDataProtocol extends 
MetaDataService {
             return familyName;
         }
 
+        public List<PFunction> getFunctions() {
+            return functions;
+        }
+
         public static MetaDataMutationResult 
constructFromProto(MetaDataResponse proto) {
           MetaDataMutationResult result = new MetaDataMutationResult();
           result.returnCode = 
MutationCode.values()[proto.getReturnCode().ordinal()];
@@ -166,6 +189,11 @@ public abstract class MetaDataProtocol extends 
MetaDataService {
             result.wasUpdated = true;
             result.table = PTableImpl.createFromProto(proto.getTable());
           }
+          if (proto.getFunctionCount() > 0) {
+              result.wasUpdated = true;
+              for(PFunctionProtos.PFunction function: proto.getFunctionList())
+              result.functions.add(PFunction.createFromProto(function));
+          }
           if (proto.getTablesToDeleteCount() > 0) {
             result.tableNamesToDelete =
                 
Lists.newArrayListWithExpectedSize(proto.getTablesToDeleteCount());

Reply via email to