http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java index af6c712..78f54e8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java @@ -63,6 +63,7 @@ import org.apache.phoenix.parse.TableName; import org.apache.phoenix.parse.TableNode; import org.apache.phoenix.parse.TableNodeVisitor; import org.apache.phoenix.parse.TableWildcardParseNode; +import org.apache.phoenix.parse.UDFParseNode; import org.apache.phoenix.parse.WildcardParseNode; import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.ColumnRef; @@ -688,7 +689,7 @@ public class JoinCompiler { if (isSubselect()) return SubselectRewriter.applyOrderBy(SubselectRewriter.applyPostFilters(subselect, preFilters, tableNode.getAlias()), orderBy, tableNode.getAlias()); - return NODE_FACTORY.select(tableNode, select.getHint(), false, selectNodes, getPreFiltersCombined(), null, null, orderBy, null, 0, false, select.hasSequence(), Collections.<SelectStatement>emptyList()); + return NODE_FACTORY.select(tableNode, select.getHint(), false, selectNodes, getPreFiltersCombined(), null, null, orderBy, null, 0, false, select.hasSequence(), Collections.<SelectStatement>emptyList(), select.getUdfParseNodes()); } public boolean hasFilters() { @@ -1177,7 +1178,7 @@ public class JoinCompiler { TableRef tableRef = table.getTableRef(); List<ParseNode> groupBy = tableRef.equals(groupByTableRef) ? select.getGroupBy() : null; List<OrderByNode> orderBy = tableRef.equals(orderByTableRef) ? select.getOrderBy() : null; - SelectStatement stmt = getSubqueryForOptimizedPlan(select.getHint(), table.getDynamicColumns(), tableRef, join.getColumnRefs(), table.getPreFiltersCombined(), groupBy, orderBy, table.isWildCardSelect(), select.hasSequence()); + SelectStatement stmt = getSubqueryForOptimizedPlan(select.getHint(), table.getDynamicColumns(), tableRef, join.getColumnRefs(), table.getPreFiltersCombined(), groupBy, orderBy, table.isWildCardSelect(), select.hasSequence(), select.getUdfParseNodes()); QueryPlan plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, stmt); if (!plan.getTableRef().equals(tableRef)) { replacement.put(tableRef, plan.getTableRef()); @@ -1247,7 +1248,7 @@ public class JoinCompiler { } private static SelectStatement getSubqueryForOptimizedPlan(HintNode hintNode, List<ColumnDef> dynamicCols, TableRef tableRef, Map<ColumnRef, ColumnRefType> columnRefs, ParseNode where, List<ParseNode> groupBy, - List<OrderByNode> orderBy, boolean isWildCardSelect, boolean hasSequence) { + List<OrderByNode> orderBy, boolean isWildCardSelect, boolean hasSequence, Map<String, UDFParseNode> udfParseNodes) { String schemaName = tableRef.getTable().getSchemaName().getString(); TableName tName = TableName.create(schemaName.length() == 0 ? null : schemaName, tableRef.getTable().getTableName().getString()); List<AliasedNode> selectList = new ArrayList<AliasedNode>(); @@ -1267,7 +1268,7 @@ public class JoinCompiler { String tableAlias = tableRef.getTableAlias(); TableNode from = NODE_FACTORY.namedTable(tableAlias == null ? null : '"' + tableAlias + '"', tName, dynamicCols); - return NODE_FACTORY.select(from, hintNode, false, selectList, where, groupBy, null, orderBy, null, 0, groupBy != null, hasSequence, Collections.<SelectStatement>emptyList()); + return NODE_FACTORY.select(from, hintNode, false, selectList, where, groupBy, null, orderBy, null, 0, groupBy != null, hasSequence, Collections.<SelectStatement>emptyList(), udfParseNodes); } public static PTable joinProjectedTables(PTable left, PTable right, JoinType type) throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java index 0c586f0..fcbeb7e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java @@ -34,6 +34,7 @@ import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixParameterMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.SelectStatement; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.AmbiguousColumnException; @@ -123,6 +124,11 @@ public class PostDDLCompiler { public List<TableRef> getTables() { return Collections.singletonList(tableRef); } + + public java.util.List<PFunction> getFunctions() { + return Collections.emptyList(); + }; + @Override public TableRef resolveTable(String schemaName, String tableName) throws SQLException { @@ -135,6 +141,14 @@ public class PostDDLCompiler { : tableRef.getTable().getColumn(colName); return new ColumnRef(tableRef, column.getPosition()); } + + public PFunction resolveFunction(String functionName) throws SQLException { + throw new UnsupportedOperationException(); + }; + + public boolean hasUDFs() { + return false; + }; }; PhoenixStatement statement = new PhoenixStatement(connection); StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java index e84ca2a..c39db09 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java @@ -458,7 +458,7 @@ public class ProjectionCompiler { projectColumnFamily(table, scan, family); } } - return new RowProjector(projectedColumns, estimatedByteSize, isProjectEmptyKeyValue); + return new RowProjector(projectedColumns, estimatedByteSize, isProjectEmptyKeyValue, resolver.hasUDFs()); } private static void projectAllColumnFamilies(PTable table, Scan scan) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index 3100664..e877e03 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -21,6 +21,7 @@ import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Set; @@ -62,6 +63,7 @@ import org.apache.phoenix.parse.SQLParser; import org.apache.phoenix.parse.SelectStatement; import org.apache.phoenix.parse.SubqueryParseNode; import org.apache.phoenix.parse.TableNode; +import org.apache.phoenix.parse.UDFParseNode; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.AmbiguousColumnException; import org.apache.phoenix.schema.ColumnNotFoundException; @@ -240,13 +242,13 @@ public class QueryCompiler { context.setCurrentTable(table.getTableRef()); PTable projectedTable = table.createProjectedTable(!projectPKColumns, context); TupleProjector.serializeProjectorIntoScan(context.getScan(), new TupleProjector(projectedTable)); - context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable)); + context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), subquery.getUdfParseNodes())); table.projectColumns(context.getScan()); return compileSingleQuery(context, subquery, binds, asSubquery, !asSubquery); } QueryPlan plan = compileSubquery(subquery, false); PTable projectedTable = table.createProjectedTable(plan.getProjector()); - context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable)); + context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), subquery.getUdfParseNodes())); return new TupleProjectionPlan(plan, new TupleProjector(plan.getProjector()), table.compilePostFilterExpression(context)); } @@ -295,7 +297,7 @@ public class QueryCompiler { } else { tables[i] = null; } - context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable)); + context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), query.getUdfParseNodes())); joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContext, true); joinExpressions[i] = joinConditions.getFirst(); @@ -354,7 +356,7 @@ public class QueryCompiler { tupleProjector = new TupleProjector(plan.getProjector()); } context.setCurrentTable(rhsTableRef); - context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable)); + context.setResolver(FromCompiler.getResolverForProjectedTable(rhsProjTable, context.getConnection(), rhs.getUdfParseNodes())); ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[] {new ImmutableBytesPtr(emptyByteArray)}; Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, true); List<Expression> joinExpressions = joinConditions.getSecond(); @@ -364,7 +366,7 @@ public class QueryCompiler { int fieldPosition = needsMerge ? rhsProjTable.getColumns().size() - rhsProjTable.getPKColumns().size() : 0; PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(rhsProjTable, lhsTable, type == JoinType.Right ? JoinType.Left : type) : rhsProjTable; TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector); - context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable)); + context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), rhs.getUdfParseNodes())); QueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right); Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable); Integer limit = null; @@ -413,7 +415,7 @@ public class QueryCompiler { int fieldPosition = needsMerge ? lhsProjTable.getColumns().size() - lhsProjTable.getPKColumns().size() : 0; PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(lhsProjTable, rhsProjTable, type == JoinType.Right ? JoinType.Left : type) : lhsProjTable; - ColumnResolver resolver = FromCompiler.getResolverForProjectedTable(projectedTable); + ColumnResolver resolver = FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), new HashMap<String,UDFParseNode>(1)); TableRef tableRef = resolver.getTables().get(0); StatementContext subCtx = new StatementContext(statement, resolver, ScanUtil.newScan(originalScan), new SequenceManager(statement)); subCtx.setCurrentTable(tableRef); @@ -422,7 +424,7 @@ public class QueryCompiler { context.setResolver(resolver); TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString())); ParseNode where = joinTable.getPostFiltersCombined(); - SelectStatement select = asSubquery ? NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false, Collections.<AliasedNode> emptyList(), where, null, null, orderBy, null, 0, false, joinTable.getStatement().hasSequence(), Collections.<SelectStatement>emptyList()) + SelectStatement select = asSubquery ? NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false, Collections.<AliasedNode> emptyList(), where, null, null, orderBy, null, 0, false, joinTable.getStatement().hasSequence(), Collections.<SelectStatement>emptyList(), joinTable.getStatement().getUdfParseNodes()) : NODE_FACTORY.select(joinTable.getStatement(), from, where); return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder); @@ -505,7 +507,7 @@ public class QueryCompiler { if (this.projectTuples) { projectedTable = TupleProjectionCompiler.createProjectedTable(select, context); if (projectedTable != null) { - context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable)); + context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), select.getUdfParseNodes())); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java index 1b35e92..c60933e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java @@ -52,9 +52,10 @@ public class RowProjector { private final int estimatedSize; private final boolean isProjectEmptyKeyValue; private final boolean cloneRequired; + private final boolean hasUDFs; public RowProjector(RowProjector projector, boolean isProjectEmptyKeyValue) { - this(projector.getColumnProjectors(), projector.getEstimatedRowByteSize(), isProjectEmptyKeyValue); + this(projector.getColumnProjectors(), projector.getEstimatedRowByteSize(), isProjectEmptyKeyValue, projector.hasUDFs); } /** * Construct RowProjector based on a list of ColumnProjectors. @@ -64,6 +65,18 @@ public class RowProjector { * @param estimatedRowSize */ public RowProjector(List<? extends ColumnProjector> columnProjectors, int estimatedRowSize, boolean isProjectEmptyKeyValue) { + this(columnProjectors, estimatedRowSize, isProjectEmptyKeyValue, false); + } + /** + * Construct RowProjector based on a list of ColumnProjectors. + * @param columnProjectors ordered list of ColumnProjectors corresponding to projected columns in SELECT clause + * aggregating coprocessor. Only required in the case of an aggregate query with a limit clause and otherwise may + * be null. + * @param estimatedRowSize + * @param isProjectEmptyKeyValue + * @param hasUDFs + */ + public RowProjector(List<? extends ColumnProjector> columnProjectors, int estimatedRowSize, boolean isProjectEmptyKeyValue, boolean hasUDFs) { this.columnProjectors = Collections.unmodifiableList(columnProjectors); int position = columnProjectors.size(); reverseIndex = ArrayListMultimap.<String, Integer>create(); @@ -82,15 +95,18 @@ public class RowProjector { this.someCaseSensitive = someCaseSensitive; this.estimatedSize = estimatedRowSize; this.isProjectEmptyKeyValue = isProjectEmptyKeyValue; + this.hasUDFs = hasUDFs; boolean hasPerInvocationExpression = false; - for (int i = 0; i < this.columnProjectors.size(); i++) { - Expression expression = this.columnProjectors.get(i).getExpression(); - if (expression.getDeterminism() == Determinism.PER_INVOCATION) { - hasPerInvocationExpression = true; - break; + if (!hasUDFs) { + for (int i = 0; i < this.columnProjectors.size(); i++) { + Expression expression = this.columnProjectors.get(i).getExpression(); + if (expression.getDeterminism() == Determinism.PER_INVOCATION) { + hasPerInvocationExpression = true; + break; + } } } - this.cloneRequired = hasPerInvocationExpression; + this.cloneRequired = hasPerInvocationExpression || hasUDFs; } public RowProjector cloneIfNecessary() { @@ -114,7 +130,7 @@ public class RowProjector { } return new RowProjector(clonedColProjectors, this.getEstimatedRowByteSize(), - this.isProjectEmptyKeyValue()); + this.isProjectEmptyKeyValue(), this.hasUDFs); } public boolean isProjectEmptyKeyValue() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java index b9897b1..9b54c86 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java @@ -30,6 +30,7 @@ import org.apache.phoenix.parse.ComparisonParseNode; import org.apache.phoenix.parse.DerivedTableNode; import org.apache.phoenix.parse.FamilyWildcardParseNode; import org.apache.phoenix.parse.JoinTableNode; +import org.apache.phoenix.parse.NamedNode; import org.apache.phoenix.parse.JoinTableNode.JoinType; import org.apache.phoenix.parse.LessThanOrEqualParseNode; import org.apache.phoenix.parse.NamedTableNode; @@ -99,7 +100,7 @@ public class StatementNormalizer extends ParseNodeRewriter { if (selectNodes != normSelectNodes) { statement = NODE_FACTORY.select(statement.getFrom(), statement.getHint(), statement.isDistinct(), normSelectNodes, statement.getWhere(), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), - statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects()); + statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes()); } } @@ -151,7 +152,7 @@ public class StatementNormalizer extends ParseNodeRewriter { } return super.visitLeave(node, nodes); } - + @Override public ParseNode visitLeave(final BetweenParseNode node, List<ParseNode> nodes) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java index 1746d8a..123cb6a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java @@ -341,7 +341,7 @@ public class SubqueryRewriter extends ParseNodeRewriter { groupbyNodes.set(i - 1, aliasedNode.getNode()); } SelectStatement derivedTableStmt = NODE_FACTORY.select(subquery, subquery.isDistinct(), derivedTableSelect, where, derivedTableGroupBy, true); - subquery = NODE_FACTORY.select(NODE_FACTORY.derivedTable(derivedTableAlias, derivedTableStmt), subquery.getHint(), false, selectNodes, null, groupbyNodes, null, Collections.<OrderByNode> emptyList(), null, subquery.getBindCount(), true, false, Collections.<SelectStatement>emptyList()); + subquery = NODE_FACTORY.select(NODE_FACTORY.derivedTable(derivedTableAlias, derivedTableStmt), subquery.getHint(), false, selectNodes, null, groupbyNodes, null, Collections.<OrderByNode> emptyList(), null, subquery.getBindCount(), true, false, Collections.<SelectStatement>emptyList(), subquery.getUdfParseNodes()); } ParseNode onNode = conditionExtractor.getJoinCondition(); @@ -364,7 +364,7 @@ public class SubqueryRewriter extends ParseNodeRewriter { return select; // Wrap as a derived table. - return NODE_FACTORY.select(NODE_FACTORY.derivedTable(ParseNodeFactory.createTempAlias(), select), HintNode.EMPTY_HINT_NODE, false, select.getSelect(), null, null, null, null, null, select.getBindCount(), false, false, Collections.<SelectStatement> emptyList()); + return NODE_FACTORY.select(NODE_FACTORY.derivedTable(ParseNodeFactory.createTempAlias(), select), HintNode.EMPTY_HINT_NODE, false, select.getSelect(), null, null, null, null, null, select.getBindCount(), false, false, Collections.<SelectStatement> emptyList(), select.getUdfParseNodes()); } private List<AliasedNode> fixAliasedNodes(List<AliasedNode> nodes, boolean addSelectOne) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java index 6862802..5a91a17 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java @@ -205,7 +205,7 @@ public class SubselectRewriter extends ParseNodeRewriter { } return NODE_FACTORY.select(subselect.getFrom(), hintRewrite, isDistinctRewrite, selectNodesRewrite, whereRewrite, groupByRewrite, - havingRewrite, orderByRewrite, limitRewrite, select.getBindCount(), isAggregateRewrite, select.hasSequence(), select.getSelects()); + havingRewrite, orderByRewrite, limitRewrite, select.getBindCount(), isAggregateRewrite, select.hasSequence(), select.getSelects(), select.getUdfParseNodes()); } private SelectStatement applyPostFilters(SelectStatement statement, List<ParseNode> postFilters) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index fab1ad0..cd10007 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -51,6 +51,17 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT_BYTE import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE_BYTES; import static org.apache.phoenix.schema.PTableType.INDEX; import static org.apache.phoenix.util.SchemaUtil.getVarCharLength; import static org.apache.phoenix.util.SchemaUtil.getVarChars; @@ -61,6 +72,8 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; import java.util.List; import java.util.Set; @@ -91,22 +104,29 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion.RowLock; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.cache.GlobalCache; +import org.apache.phoenix.cache.GlobalCache.FunctionBytesPtr; +import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.coprocessor.generated.MetaDataProtos; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheResponse; +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateFunctionRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateTableRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropColumnRequest; +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropFunctionRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropTableRequest; +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetFunctionsRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetTableRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest; +import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; @@ -115,7 +135,10 @@ import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.metrics.Metrics; +import org.apache.phoenix.parse.PFunction; +import org.apache.phoenix.parse.PFunction.FunctionArgument; import org.apache.phoenix.protobuf.ProtobufUtil; +import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.AmbiguousColumnException; import org.apache.phoenix.schema.ColumnFamilyNotFoundException; @@ -123,6 +146,7 @@ import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PColumnImpl; +import org.apache.phoenix.schema.PMetaDataEntity; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; @@ -149,6 +173,7 @@ import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.KeyValueUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; import org.slf4j.Logger; @@ -156,6 +181,7 @@ import org.slf4j.LoggerFactory; import com.google.common.cache.Cache; import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import com.google.protobuf.Service; @@ -220,6 +246,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso static { Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR); } + private static final int TABLE_TYPE_INDEX = TABLE_KV_COLUMNS.indexOf(TABLE_TYPE_KV); private static final int TABLE_SEQ_NUM_INDEX = TABLE_KV_COLUMNS.indexOf(TABLE_SEQ_NUM_KV); private static final int COLUMN_COUNT_INDEX = TABLE_KV_COLUMNS.indexOf(COLUMN_COUNT_KV); @@ -277,6 +304,52 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private static final int LINK_TYPE_INDEX = 0; + private static final KeyValue CLASS_NAME_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, CLASS_NAME_BYTES); + private static final KeyValue JAR_PATH_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, JAR_PATH_BYTES); + private static final KeyValue RETURN_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, RETURN_TYPE_BYTES); + private static final KeyValue NUM_ARGS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, NUM_ARGS_BYTES); + private static final KeyValue TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TYPE_BYTES); + private static final KeyValue IS_CONSTANT_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_CONSTANT_BYTES); + private static final KeyValue DEFAULT_VALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DEFAULT_VALUE_BYTES); + private static final KeyValue MIN_VALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MIN_VALUE_BYTES); + private static final KeyValue MAX_VALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MAX_VALUE_BYTES); + private static final KeyValue IS_ARRAY_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_ARRAY_BYTES); + + private static final List<KeyValue> FUNCTION_KV_COLUMNS = Arrays.<KeyValue>asList( + EMPTY_KEYVALUE_KV, + CLASS_NAME_KV, + JAR_PATH_KV, + RETURN_TYPE_KV, + NUM_ARGS_KV + ); + static { + Collections.sort(FUNCTION_KV_COLUMNS, KeyValue.COMPARATOR); + } + + private static final int CLASS_NAME_INDEX = FUNCTION_KV_COLUMNS.indexOf(CLASS_NAME_KV); + private static final int JAR_PATH_INDEX = FUNCTION_KV_COLUMNS.indexOf(JAR_PATH_KV); + private static final int RETURN_TYPE_INDEX = FUNCTION_KV_COLUMNS.indexOf(RETURN_TYPE_KV); + private static final int NUM_ARGS_INDEX = FUNCTION_KV_COLUMNS.indexOf(NUM_ARGS_KV); + + private static final List<KeyValue> FUNCTION_ARG_KV_COLUMNS = Arrays.<KeyValue>asList( + TYPE_KV, + IS_ARRAY_KV, + IS_CONSTANT_KV, + DEFAULT_VALUE_KV, + MIN_VALUE_KV, + MAX_VALUE_KV + ); + static { + Collections.sort(FUNCTION_ARG_KV_COLUMNS, KeyValue.COMPARATOR); + } + + private static final int TYPE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(TYPE_KV); + private static final int IS_ARRAY_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(IS_ARRAY_KV); + private static final int IS_CONSTANT_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(IS_CONSTANT_KV); + private static final int DEFAULT_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(DEFAULT_VALUE_KV); + private static final int MIN_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(MIN_VALUE_KV); + private static final int MAX_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(MAX_VALUE_KV); + private static PName newPName(byte[] keyBuffer, int keyOffset, int keyLength) { if (keyLength <= 0) { return null; @@ -368,9 +441,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp); RegionScanner scanner = region.getScanner(scan); - Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); + Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); try { - PTable oldTable = metaDataCache.getIfPresent(cacheKey); + PTable oldTable = (PTable)metaDataCache.getIfPresent(cacheKey); long tableTimeStamp = oldTable == null ? MIN_TABLE_TIMESTAMP-1 : oldTable.getTimeStamp(); PTable newTable; newTable = getTable(scanner, clientTimeStamp, tableTimeStamp); @@ -393,6 +466,48 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } } + private List<PFunction> buildFunctions(List<byte[]> keys, HRegion region, + long clientTimeStamp) throws IOException, SQLException { + List<KeyRange> keyRanges = Lists.newArrayListWithExpectedSize(keys.size()); + for (byte[] key : keys) { + byte[] stopKey = ByteUtil.concat(key, QueryConstants.SEPARATOR_BYTE_ARRAY); + ByteUtil.nextKey(stopKey, stopKey.length); + keyRanges.add(PVarbinary.INSTANCE.getKeyRange(key, true, stopKey, false)); + } + Scan scan = new Scan(); + scan.setTimeRange(MIN_TABLE_TIMESTAMP, clientTimeStamp); + ScanRanges scanRanges = + ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, + Collections.singletonList(keyRanges), ScanUtil.SINGLE_COLUMN_SLOT_SPAN); + scanRanges.initializeScan(scan); + scan.setFilter(scanRanges.getSkipScanFilter()); + + RegionScanner scanner = region.getScanner(scan); + + Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); + List<PFunction> functions = new ArrayList<PFunction>(); + PFunction function = null; + try { + for(int i = 0; i< keys.size(); i++) { + function = null; + function = getFunction(scanner); + if (function == null) { + return null; + } + byte[] functionKey = + SchemaUtil.getFunctionKey( + function.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY : function + .getTenantId().getBytes(), Bytes.toBytes(function + .getFunctionName())); + metaDataCache.put(new FunctionBytesPtr(functionKey), function); + functions.add(function); + } + return functions; + } finally { + scanner.close(); + } + } + private void addIndexToTable(PName tenantId, PName schemaName, PName indexName, PName tableName, long clientTimeStamp, List<PTable> indexes) throws IOException, SQLException { byte[] key = SchemaUtil.getTableKey(tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(), schemaName.getBytes(), indexName.getBytes()); PTable indexTable = doGetTable(key, clientTimeStamp); @@ -473,6 +588,61 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr); columns.add(column); } + + private void addArgumentToFunction(List<Cell> results, PName functionName, PName type, + Cell[] functionKeyValues, List<FunctionArgument> arguments, short argPosition) { + int i = 0; + int j = 0; + while (i < results.size() && j < FUNCTION_ARG_KV_COLUMNS.size()) { + Cell kv = results.get(i); + Cell searchKv = FUNCTION_ARG_KV_COLUMNS.get(j); + int cmp = + Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), + kv.getQualifierLength(), searchKv.getQualifierArray(), + searchKv.getQualifierOffset(), searchKv.getQualifierLength()); + if (cmp == 0) { + functionKeyValues[j++] = kv; + i++; + } else if (cmp > 0) { + functionKeyValues[j++] = null; + } else { + i++; // shouldn't happen - means unexpected KV in system table column row + } + } + + Cell isArrayKv = functionKeyValues[IS_ARRAY_INDEX]; + boolean isArrayType = + isArrayKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject( + isArrayKv.getValueArray(), isArrayKv.getValueOffset(), + isArrayKv.getValueLength())); + Cell isConstantKv = functionKeyValues[IS_CONSTANT_INDEX]; + boolean isConstant = + isConstantKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject( + isConstantKv.getValueArray(), isConstantKv.getValueOffset(), + isConstantKv.getValueLength())); + Cell defaultValueKv = functionKeyValues[DEFAULT_VALUE_INDEX]; + String defaultValue = + defaultValueKv == null ? null : (String) PVarchar.INSTANCE.toObject( + defaultValueKv.getValueArray(), defaultValueKv.getValueOffset(), + defaultValueKv.getValueLength()); + Cell minValueKv = functionKeyValues[MIN_VALUE_INDEX]; + String minValue = + minValueKv == null ? null : (String) PVarchar.INSTANCE.toObject( + minValueKv.getValueArray(), minValueKv.getValueOffset(), + minValueKv.getValueLength()); + Cell maxValueKv = functionKeyValues[MAX_VALUE_INDEX]; + String maxValue = + maxValueKv == null ? null : (String) PVarchar.INSTANCE.toObject( + maxValueKv.getValueArray(), maxValueKv.getValueOffset(), + maxValueKv.getValueLength()); + FunctionArgument arg = + new FunctionArgument(type.getString(), isArrayType, isConstant, + defaultValue == null ? null : LiteralExpression.newConstant(defaultValue), + minValue == null ? null : LiteralExpression.newConstant(minValue), + maxValue == null ? null : LiteralExpression.newConstant(maxValue), + argPosition); + arguments.add(arg); + } private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableTimeStamp) throws IOException, SQLException { @@ -646,6 +816,106 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats); } + private PFunction getFunction(RegionScanner scanner) + throws IOException, SQLException { + List<Cell> results = Lists.newArrayList(); + scanner.next(results); + if (results.isEmpty()) { + return null; + } + Cell[] functionKeyValues = new Cell[FUNCTION_KV_COLUMNS.size()]; + Cell[] functionArgKeyValues = new Cell[FUNCTION_ARG_KV_COLUMNS.size()]; + + // Create PFunction based on KeyValues from scan + Cell keyValue = results.get(0); + byte[] keyBuffer = keyValue.getRowArray(); + int keyLength = keyValue.getRowLength(); + int keyOffset = keyValue.getRowOffset(); + PName tenantId = newPName(keyBuffer, keyOffset, keyLength); + int tenantIdLength = (tenantId == null) ? 0 : tenantId.getBytes().length; + if (tenantIdLength == 0) { + tenantId = null; + } + PName functionName = + newPName(keyBuffer, keyOffset + tenantIdLength + 1, keyLength - tenantIdLength - 1); + int functionNameLength = functionName.getBytes().length+1; + int offset = tenantIdLength + functionNameLength + 1; + + long timeStamp = keyValue.getTimestamp(); + + int i = 0; + int j = 0; + while (i < results.size() && j < FUNCTION_KV_COLUMNS.size()) { + Cell kv = results.get(i); + Cell searchKv = FUNCTION_KV_COLUMNS.get(j); + int cmp = + Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), + kv.getQualifierLength(), searchKv.getQualifierArray(), + searchKv.getQualifierOffset(), searchKv.getQualifierLength()); + if (cmp == 0) { + timeStamp = Math.max(timeStamp, kv.getTimestamp()); // Find max timestamp of table + // header row + functionKeyValues[j++] = kv; + i++; + } else if (cmp > 0) { + timeStamp = Math.max(timeStamp, kv.getTimestamp()); + functionKeyValues[j++] = null; + } else { + i++; // shouldn't happen - means unexpected KV in system table header row + } + } + // CLASS_NAME,NUM_ARGS and JAR_PATH are required. + if (functionKeyValues[CLASS_NAME_INDEX] == null || functionKeyValues[NUM_ARGS_INDEX] == null) { + throw new IllegalStateException( + "Didn't find expected key values for function row in metadata row"); + } + + Cell classNameKv = functionKeyValues[CLASS_NAME_INDEX]; + PName className = newPName(classNameKv.getValueArray(), classNameKv.getValueOffset(), + classNameKv.getValueLength()); + Cell jarPathKv = functionKeyValues[JAR_PATH_INDEX]; + PName jarPath = null; + if(jarPathKv != null) { + jarPath = newPName(jarPathKv.getValueArray(), jarPathKv.getValueOffset(), + jarPathKv.getValueLength()); + } + Cell numArgsKv = functionKeyValues[NUM_ARGS_INDEX]; + int numArgs = + PInteger.INSTANCE.getCodec().decodeInt(numArgsKv.getValueArray(), + numArgsKv.getValueOffset(), SortOrder.getDefault()); + Cell returnTypeKv = functionKeyValues[RETURN_TYPE_INDEX]; + PName returnType = + returnTypeKv == null ? null : newPName(returnTypeKv.getValueArray(), + returnTypeKv.getValueOffset(), returnTypeKv.getValueLength()); + + List<FunctionArgument> arguments = Lists.newArrayListWithExpectedSize(numArgs); + for (int k = 0; k < numArgs; k++) { + results.clear(); + scanner.next(results); + if (results.isEmpty()) { + break; + } + Cell typeKv = results.get(0); + int typeKeyLength = typeKv.getRowLength(); + PName typeName = + newPName(typeKv.getRowArray(), typeKv.getRowOffset() + offset, typeKeyLength + - offset - 3); + + int argPositionOffset = offset + typeName.getBytes().length + 1; + short argPosition = Bytes.toShort(typeKv.getRowArray(), typeKv.getRowOffset() + argPositionOffset, typeKeyLength + - argPositionOffset); + addArgumentToFunction(results, functionName, typeName, functionArgKeyValues, arguments, argPosition); + } + Collections.sort(arguments, new Comparator<FunctionArgument>() { + @Override + public int compare(FunctionArgument o1, FunctionArgument o2) { + return o1.getArgPosition() - o2.getArgPosition(); + } + }); + return new PFunction(tenantId, functionName.getString(), arguments, returnType.getString(), + className.getString(), jarPath == null ? null : jarPath.getString(), timeStamp); + } + private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region, long clientTimeStamp) throws IOException { if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) { @@ -663,7 +933,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if (!results.isEmpty() && results.get(0).getTimestamp() > clientTimeStamp) { Cell kv = results.get(0); if (kv.getTypeByte() == Type.Delete.getCode()) { - Cache<ImmutableBytesPtr, PTable> metaDataCache = + Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); PTable table = newDeletedTableMarker(kv.getTimestamp()); metaDataCache.put(cacheKey, table); @@ -673,20 +943,57 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return null; } + + private PFunction buildDeletedFunction(byte[] key, ImmutableBytesPtr cacheKey, HRegion region, + long clientTimeStamp) throws IOException { + if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) { + return null; + } + + Scan scan = MetaDataUtil.newTableRowsScan(key, clientTimeStamp, HConstants.LATEST_TIMESTAMP); + scan.setFilter(new FirstKeyOnlyFilter()); + scan.setRaw(true); + List<Cell> results = Lists.<Cell> newArrayList(); + try (RegionScanner scanner = region.getScanner(scan);) { + scanner.next(results); + } + // HBase ignores the time range on a raw scan (HBASE-7362) + if (!results.isEmpty() && results.get(0).getTimestamp() > clientTimeStamp) { + Cell kv = results.get(0); + if (kv.getTypeByte() == Type.Delete.getCode()) { + Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = + GlobalCache.getInstance(this.env).getMetaDataCache(); + PFunction function = newDeletedFunctionMarker(kv.getTimestamp()); + metaDataCache.put(cacheKey, function); + return function; + } + } + return null; + } + + private static PTable newDeletedTableMarker(long timestamp) { return new PTableImpl(timestamp); } + private static PFunction newDeletedFunctionMarker(long timestamp) { + return new PFunction(timestamp); + } + private static boolean isTableDeleted(PTable table) { return table.getName() == null; } + private static boolean isFunctionDeleted(PFunction function) { + return function.getFunctionName() == null; + } + private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key, ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp) throws IOException, SQLException { HRegion region = env.getRegion(); - Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); - PTable table = metaDataCache.getIfPresent(cacheKey); + Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); + PTable table = (PTable)metaDataCache.getIfPresent(cacheKey); // We always cache the latest version - fault in if not in cache if (table != null || (table = buildTable(key, cacheKey, region, asOfTimeStamp)) != null) { return table; @@ -700,6 +1007,29 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return null; } + private PFunction loadFunction(RegionCoprocessorEnvironment env, byte[] key, + ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp) + throws IOException, SQLException { + HRegion region = env.getRegion(); + Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); + PFunction function = (PFunction)metaDataCache.getIfPresent(cacheKey); + // We always cache the latest version - fault in if not in cache + if (function != null) { + return function; + } + ArrayList<byte[]> arrayList = new ArrayList<byte[]>(1); + arrayList.add(key); + List<PFunction> functions = buildFunctions(arrayList, region, asOfTimeStamp); + if(functions != null) return functions.get(0); + // if not found then check if newer table already exists and add delete marker for timestamp + // found + if (function == null + && (function = buildDeletedFunction(key, cacheKey, region, clientTimeStamp)) != null) { + return function; + } + return null; + } + @Override public void createTable(RpcController controller, CreateTableRequest request, @@ -801,7 +1131,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // Invalidate the cache - the next getTable call will add it // TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache - Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); + Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); if (parentCacheKey != null) { metaDataCache.invalidate(parentCacheKey); } @@ -950,7 +1280,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso done.run(MetaDataMutationResult.toProto(result)); return; } - Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); + Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); // Commit the list of deletion. region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet()); long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata); @@ -984,8 +1314,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso HRegion region = env.getRegion(); ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key); - Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); - PTable table = metaDataCache.getIfPresent(cacheKey); + Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); + PTable table = (PTable)metaDataCache.getIfPresent(cacheKey); // We always cache the latest version - fault in if not in cache if (table != null @@ -1132,8 +1462,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key); List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>(); invalidateList.add(cacheKey); - Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); - PTable table = metaDataCache.getIfPresent(cacheKey); + Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); + PTable table = (PTable)metaDataCache.getIfPresent(cacheKey); if (logger.isDebugEnabled()) { if (table == null) { logger.debug("Table " + Bytes.toStringBinary(key) @@ -1299,9 +1629,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private PTable doGetTable(byte[] key, long clientTimeStamp, RowLock rowLock) throws IOException, SQLException { ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key); - Cache<ImmutableBytesPtr, PTable> metaDataCache = + Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); - PTable table = metaDataCache.getIfPresent(cacheKey); + PTable table = (PTable)metaDataCache.getIfPresent(cacheKey); // We only cache the latest, so we'll end up building the table with every call if the // client connection has specified an SCN. // TODO: If we indicate to the client that we're returning an older version, but there's a @@ -1333,7 +1663,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } try { // Try cache again in case we were waiting on a lock - table = metaDataCache.getIfPresent(cacheKey); + table = (PTable)metaDataCache.getIfPresent(cacheKey); // We only cache the latest, so we'll end up building the table with every call if the // client connection has specified an SCN. // TODO: If we indicate to the client that we're returning an older version, but there's @@ -1358,6 +1688,64 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } } + private List<PFunction> doGetFunctions(List<byte[]> keys, long clientTimeStamp) throws IOException, SQLException { + Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = + GlobalCache.getInstance(this.env).getMetaDataCache(); + HRegion region = env.getRegion(); + Collections.sort(keys, new Comparator<byte[]>() { + @Override + public int compare(byte[] o1, byte[] o2) { + return Bytes.compareTo(o1, o2); + } + }); + /* + * Lock directly on key, though it may be an index table. This will just prevent a table + * from getting rebuilt too often. + */ + List<RowLock> rowLocks = new ArrayList<HRegion.RowLock>(keys.size());; + try { + rowLocks = new ArrayList<HRegion.RowLock>(keys.size()); + for (int i = 0; i < keys.size(); i++) { + HRegion.RowLock rowLock = region.getRowLock(keys.get(i)); + if (rowLock == null) { + throw new IOException("Failed to acquire lock on " + + Bytes.toStringBinary(keys.get(i))); + } + rowLocks.add(rowLock); + } + + List<PFunction> functionsAvailable = new ArrayList<PFunction>(keys.size()); + int numFunctions = keys.size(); + Iterator<byte[]> iterator = keys.iterator(); + while(iterator.hasNext()) { + byte[] key = iterator.next(); + PFunction function = (PFunction)metaDataCache.getIfPresent(new FunctionBytesPtr(key)); + if (function != null && function.getTimeStamp() < clientTimeStamp) { + if (isFunctionDeleted(function)) { + return null; + } + functionsAvailable.add(function); + iterator.remove(); + } + } + if(functionsAvailable.size() == numFunctions) return functionsAvailable; + + // Query for the latest table first, since it's not cached + List<PFunction> buildFunctions = buildFunctions(keys, region, clientTimeStamp); + if(buildFunctions == null || buildFunctions.isEmpty()) { + return null; + } + functionsAvailable.addAll(buildFunctions); + if(functionsAvailable.size() == numFunctions) return functionsAvailable; + return null; + } finally { + for (HRegion.RowLock lock : rowLocks) { + lock.release(); + } + rowLocks.clear(); + } + } + @Override public void dropColumn(RpcController controller, DropColumnRequest request, RpcCallback<MetaDataResponse> done) { @@ -1478,7 +1866,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso public void clearCache(RpcController controller, ClearCacheRequest request, RpcCallback<ClearCacheResponse> done) { GlobalCache cache = GlobalCache.getInstance(this.env); - Cache<ImmutableBytesPtr, PTable> metaDataCache = + Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); metaDataCache.invalidateAll(); cache.clearTenantCache(); @@ -1635,7 +2023,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet()); // Invalidate from cache - Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); + Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); metaDataCache.invalidate(cacheKey); if(dataTableKey != null) { metaDataCache.invalidate(new ImmutableBytesPtr(dataTableKey)); @@ -1670,6 +2058,18 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso EnvironmentEdgeManager.currentTimeMillis(), null); } + private static MetaDataMutationResult checkFunctionKeyInRegion(byte[] key, HRegion region) { + byte[] startKey = region.getStartKey(); + byte[] endKey = region.getEndKey(); + if (Bytes.compareTo(startKey, key) <= 0 + && (Bytes.compareTo(HConstants.LAST_ROW, endKey) == 0 || Bytes.compareTo(key, + endKey) < 0)) { + return null; // normal case; + } + return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_IN_REGION, + EnvironmentEdgeManager.currentTimeMillis(), null); + } + /** * Certain operations, such as DROP TABLE are not allowed if there a table has child views. This class wraps the * Results of a scanning the Phoenix Metadata for child views for a specific table and stores an additional flag for @@ -1720,7 +2120,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso byte[] tenantId = request.getTenantId().toByteArray(); byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName); ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key); - Cache<ImmutableBytesPtr, PTable> metaDataCache = + Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); metaDataCache.invalidate(cacheKey); } catch (Throwable t) { @@ -1729,5 +2129,222 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t)); } } + @Override + public void getFunctions(RpcController controller, GetFunctionsRequest request, + RpcCallback<MetaDataResponse> done) { + MetaDataResponse.Builder builder = MetaDataResponse.newBuilder(); + byte[] tenantId = request.getTenantId().toByteArray(); + List<String> functionNames = new ArrayList<>(request.getFunctionNamesCount()); + try { + HRegion region = env.getRegion(); + List<ByteString> functionNamesList = request.getFunctionNamesList(); + List<Long> functionTimestampsList = request.getFunctionTimestampsList(); + List<byte[]> keys = new ArrayList<byte[]>(request.getFunctionNamesCount()); + List<Pair<byte[], Long>> functions = new ArrayList<Pair<byte[], Long>>(request.getFunctionNamesCount()); + for(int i = 0; i< functionNamesList.size();i++) { + byte[] functionName = functionNamesList.get(i).toByteArray(); + functionNames.add(Bytes.toString(functionName)); + byte[] key = SchemaUtil.getFunctionKey(tenantId, functionName); + MetaDataMutationResult result = checkFunctionKeyInRegion(key, region); + if (result != null) { + done.run(MetaDataMutationResult.toProto(result)); + return; + } + functions.add(new Pair<byte[], Long>(functionName,functionTimestampsList.get(i))); + keys.add(key); + } + + long currentTime = EnvironmentEdgeManager.currentTimeMillis(); + List<PFunction> functionsAvailable = doGetFunctions(keys, request.getClientTimestamp()); + if (functionsAvailable == null) { + builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_NOT_FOUND); + builder.setMutationTime(currentTime); + done.run(builder.build()); + return; + } + builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_ALREADY_EXISTS); + builder.setMutationTime(currentTime); + + for (PFunction function : functionsAvailable) { + builder.addFunction(PFunction.toProto(function)); + } + done.run(builder.build()); + return; + } catch (Throwable t) { + logger.error("getFunctions failed", t); + ProtobufUtil.setControllerException(controller, + ServerUtil.createIOException(functionNames.toString(), t)); + } + } + @Override + public void createFunction(RpcController controller, CreateFunctionRequest request, + RpcCallback<MetaDataResponse> done) { + MetaDataResponse.Builder builder = MetaDataResponse.newBuilder(); + byte[][] rowKeyMetaData = new byte[2][]; + byte[] functionName = null; + try { + List<Mutation> functionMetaData = ProtobufUtil.getMutations(request); + boolean temporaryFunction = request.getTemporary(); + MetaDataUtil.getTenantIdAndFunctionName(functionMetaData, rowKeyMetaData); + byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; + functionName = rowKeyMetaData[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX]; + byte[] lockKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionName); + HRegion region = env.getRegion(); + MetaDataMutationResult result = checkFunctionKeyInRegion(lockKey, region); + if (result != null) { + done.run(MetaDataMutationResult.toProto(result)); + return; + } + List<RowLock> locks = Lists.newArrayList(); + long clientTimeStamp = MetaDataUtil.getClientTimeStamp(functionMetaData); + try { + acquireLock(region, lockKey, locks); + // Get as of latest timestamp so we can detect if we have a newer function that already + // exists without making an additional query + ImmutableBytesPtr cacheKey = new FunctionBytesPtr(lockKey); + PFunction function = + loadFunction(env, lockKey, cacheKey, clientTimeStamp, clientTimeStamp); + if (function != null) { + if (function.getTimeStamp() < clientTimeStamp) { + // If the function is older than the client time stamp and it's deleted, + // continue + if (!isFunctionDeleted(function)) { + builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_ALREADY_EXISTS); + builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis()); + builder.addFunction(PFunction.toProto(function)); + done.run(builder.build()); + return; + } + } else { + builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_FUNCTION_FOUND); + builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis()); + builder.addFunction(PFunction.toProto(function)); + done.run(builder.build()); + return; + } + } + // Don't store function info for temporary functions. + if(!temporaryFunction) { + region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet()); + } + + // Invalidate the cache - the next getFunction call will add it + // TODO: consider loading the function that was just created here, patching up the parent function, and updating the cache + Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); + metaDataCache.invalidate(cacheKey); + // Get timeStamp from mutations - the above method sets it if it's unset + long currentTimeStamp = MetaDataUtil.getClientTimeStamp(functionMetaData); + builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_NOT_FOUND); + builder.setMutationTime(currentTimeStamp); + done.run(builder.build()); + return; + } finally { + region.releaseRowLocks(locks); + } + } catch (Throwable t) { + logger.error("createFunction failed", t); + ProtobufUtil.setControllerException(controller, + ServerUtil.createIOException(Bytes.toString(functionName), t)); + } + } + + @Override + public void dropFunction(RpcController controller, DropFunctionRequest request, + RpcCallback<MetaDataResponse> done) { + byte[][] rowKeyMetaData = new byte[2][]; + byte[] functionName = null; + try { + List<Mutation> functionMetaData = ProtobufUtil.getMutations(request); + MetaDataUtil.getTenantIdAndFunctionName(functionMetaData, rowKeyMetaData); + byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX]; + functionName = rowKeyMetaData[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX]; + byte[] lockKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionName); + HRegion region = env.getRegion(); + MetaDataMutationResult result = checkFunctionKeyInRegion(lockKey, region); + if (result != null) { + done.run(MetaDataMutationResult.toProto(result)); + return; + } + List<RowLock> locks = Lists.newArrayList(); + long clientTimeStamp = MetaDataUtil.getClientTimeStamp(functionMetaData); + try { + acquireLock(region, lockKey, locks); + ImmutableBytesPtr cacheKey = new FunctionBytesPtr(lockKey); + List<byte[]> keys = new ArrayList<byte[]>(1); + keys.add(lockKey); + List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>(); + + result = doDropFunction(clientTimeStamp, keys, functionMetaData, invalidateList); + if (result.getMutationCode() != MutationCode.FUNCTION_ALREADY_EXISTS) { + done.run(MetaDataMutationResult.toProto(result)); + return; + } + region.mutateRowsWithLocks(functionMetaData, Collections.<byte[]> emptySet()); + + Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); + long currentTime = MetaDataUtil.getClientTimeStamp(functionMetaData); + for(ImmutableBytesPtr ptr: invalidateList) { + metaDataCache.invalidate(ptr); + metaDataCache.put(ptr, newDeletedFunctionMarker(currentTime)); + + } + + done.run(MetaDataMutationResult.toProto(result)); + return; + } finally { + region.releaseRowLocks(locks); + } + } catch (Throwable t) { + logger.error("dropFunction failed", t); + ProtobufUtil.setControllerException(controller, + ServerUtil.createIOException(Bytes.toString(functionName), t)); + } + } + + private MetaDataMutationResult doDropFunction(long clientTimeStamp, List<byte[]> keys, List<Mutation> functionMetaData, List<ImmutableBytesPtr> invalidateList) + throws IOException, SQLException { + List<byte[]> keysClone = new ArrayList<byte[]>(keys); + List<PFunction> functions = doGetFunctions(keysClone, clientTimeStamp); + // We didn't find a table at the latest timestamp, so either there is no table or + // there was a table, but it's been deleted. In either case we want to return. + if (functions == null || functions.isEmpty()) { + if (buildDeletedFunction(keys.get(0), new FunctionBytesPtr(keys.get(0)), env.getRegion(), clientTimeStamp) != null) { + return new MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS, EnvironmentEdgeManager.currentTimeMillis(), null); + } + return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null); + } + + if (functions != null && !functions.isEmpty()) { + if (functions.get(0).getTimeStamp() < clientTimeStamp) { + // If the function is older than the client time stamp and it's deleted, + // continue + if (isFunctionDeleted(functions.get(0))) { + return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, + EnvironmentEdgeManager.currentTimeMillis(), null); + } + invalidateList.add(new FunctionBytesPtr(keys.get(0))); + HRegion region = env.getRegion(); + Scan scan = MetaDataUtil.newTableRowsScan(keys.get(0), MIN_TABLE_TIMESTAMP, clientTimeStamp); + List<Cell> results = Lists.newArrayList(); + try (RegionScanner scanner = region.getScanner(scan);) { + scanner.next(results); + if (results.isEmpty()) { // Should not be possible + return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null); + } + do { + Cell kv = results.get(0); + Delete delete = new Delete(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), clientTimeStamp); + functionMetaData.add(delete); + results.clear(); + scanner.next(results); + } while (!results.isEmpty()); + } + return new MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS, + EnvironmentEdgeManager.currentTimeMillis(), functions, true); + } + } + return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, + EnvironmentEdgeManager.currentTimeMillis(), null); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/66bd3e35/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java index 3ef6e80..2cca4bc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java @@ -17,13 +17,16 @@ */ package org.apache.phoenix.coprocessor; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import org.apache.phoenix.coprocessor.generated.MetaDataProtos; +import org.apache.phoenix.coprocessor.generated.PFunctionProtos; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService; import org.apache.phoenix.hbase.index.util.VersionUtil; +import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; @@ -87,6 +90,10 @@ public abstract class MetaDataProtocol extends MetaDataService { UNALLOWED_TABLE_MUTATION, NO_PK_COLUMNS, PARENT_TABLE_NOT_FOUND, + FUNCTION_ALREADY_EXISTS, + FUNCTION_NOT_FOUND, + NEWER_FUNCTION_FOUND, + FUNCTION_NOT_IN_REGION, NO_OP }; @@ -98,6 +105,7 @@ public abstract class MetaDataProtocol extends MetaDataService { private byte[] columnName; private byte[] familyName; private boolean wasUpdated; + private List<PFunction> functions = new ArrayList<PFunction>(1); public MetaDataMutationResult() { } @@ -114,12 +122,19 @@ public abstract class MetaDataProtocol extends MetaDataService { this(returnCode, currentTime, table, Collections.<byte[]> emptyList()); } + public MetaDataMutationResult(MutationCode returnCode, long currentTime, List<PFunction> functions, boolean wasUpdated) { + this.returnCode = returnCode; + this.mutationTime = currentTime; + this.functions = functions; + this.wasUpdated = wasUpdated; + } + // For testing, so that connectionless can set wasUpdated so ColumnResolver doesn't complain public MetaDataMutationResult(MutationCode returnCode, long currentTime, PTable table, boolean wasUpdated) { this(returnCode, currentTime, table, Collections.<byte[]> emptyList()); this.wasUpdated = wasUpdated; } - + public MetaDataMutationResult(MutationCode returnCode, long currentTime, PTable table, List<byte[]> tableNamesToDelete) { this.returnCode = returnCode; this.mutationTime = currentTime; @@ -146,6 +161,10 @@ public abstract class MetaDataProtocol extends MetaDataService { public void setTable(PTable table) { this.table = table; } + + public void setFunction(PFunction function) { + this.functions.add(function); + } public List<byte[]> getTableNamesToDelete() { return tableNamesToDelete; @@ -159,6 +178,10 @@ public abstract class MetaDataProtocol extends MetaDataService { return familyName; } + public List<PFunction> getFunctions() { + return functions; + } + public static MetaDataMutationResult constructFromProto(MetaDataResponse proto) { MetaDataMutationResult result = new MetaDataMutationResult(); result.returnCode = MutationCode.values()[proto.getReturnCode().ordinal()]; @@ -167,6 +190,11 @@ public abstract class MetaDataProtocol extends MetaDataService { result.wasUpdated = true; result.table = PTableImpl.createFromProto(proto.getTable()); } + if (proto.getFunctionCount() > 0) { + result.wasUpdated = true; + for(PFunctionProtos.PFunction function: proto.getFunctionList()) + result.functions.add(PFunction.createFromProto(function)); + } if (proto.getTablesToDeleteCount() > 0) { result.tableNamesToDelete = Lists.newArrayListWithExpectedSize(proto.getTablesToDeleteCount());
