http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ProjectionMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ProjectionMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ProjectionMetadata.java new file mode 100644 index 0000000..5a337c7 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ProjectionMetadata.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; + +import com.google.common.base.Objects; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; +import net.jcip.annotations.Immutable; + +/** + * Metadata that is specific to a Projection. + */ +@Immutable +@DefaultAnnotation(NonNull.class) +public class ProjectionMetadata extends CommonNodeMetadata { + + private final String childNodeId; + private final String parentNodeId; + private final VariableOrder projectedVars; + + /** + * Constructs an instance of {@link ProjectionMetadata}. + * + * @param nodeId - The ID the Fluo app uses to reference this node. (not null) + * @param varOrder - The order in which binding values are written in the row to identify this result. (not null) + * @param childNodeId - The node whose results are projected to the query's SELECT variables. 
(not null) + * @param parentNodeId - The parent node of this projection (not null) + * @param projectedVars - The variables that the results are projected onto (not null) + */ + public ProjectionMetadata( + final String nodeId, + final VariableOrder varOrder, + final String childNodeId, + final String parentNodeId, + final VariableOrder projectedVars) { + super(nodeId, varOrder); + this.childNodeId = checkNotNull(childNodeId); + this.parentNodeId = checkNotNull(parentNodeId); + this.projectedVars = checkNotNull(projectedVars); + } + + /** + * @return The node whose results are projected to the query's SELECT variables. + */ + public String getChildNodeId() { + return childNodeId; + } + + /** + * @return The parent node of this projection node + */ + public String getParentNodeId() { + return parentNodeId; + } + + /** + * @return The variables that results are projected onto + */ + public VariableOrder getProjectedVars() { + return projectedVars; + } + + @Override + public int hashCode() { + return Objects.hashCode( + super.getNodeId(), + super.getVariableOrder(), + projectedVars, + childNodeId, + parentNodeId); + } + + @Override + public boolean equals(final Object o) { + if(this == o) { + return true; + } + + if(o instanceof ProjectionMetadata) { + if(super.equals(o)) { + final ProjectionMetadata projectionMetadata = (ProjectionMetadata)o; + return new EqualsBuilder() + .append(childNodeId, projectionMetadata.childNodeId) + .append(parentNodeId, projectionMetadata.parentNodeId) + .append(projectedVars, projectionMetadata.projectedVars) + .isEquals(); + } + return false; + } + + return false; + } + + @Override + public String toString() { + return new StringBuilder() + .append("ProjectionMetadata {\n") + .append(" Node ID: " + super.getNodeId() + "\n") + .append(" Projection Variables: " + projectedVars + "\n") + .append(" Variable Order: " + super.getVariableOrder() + "\n") + .append(" Child Node ID: " + childNodeId + "\n") + .append(" Parent Node ID: " + 
parentNodeId + "\n") + .append("}") + .toString(); + } + + /** + * Creates a new {@link Builder} for this class. + * + * @param nodeId - The ID the Fluo app uses to reference this node. (not null) + * @return A new {@link Builder} for this class. + */ + public static Builder builder(final String nodeId) { + return new Builder(nodeId); + } + + /** + * Builds instances of {@link ProjectionMetadata}. + */ + @DefaultAnnotation(NonNull.class) + public static final class Builder implements CommonNodeMetadata.Builder { + + private String nodeId; + private VariableOrder varOrder; + private String childNodeId; + private String parentNodeId; + private VariableOrder projectedVars; + + /** + * Constructs an instance of {@link Builder}. + * + * @param nodeId - The ID the Fluo app uses to reference this node. (not null) + */ + public Builder(final String nodeId) { + this.nodeId = checkNotNull(nodeId); + } + + public String getNodeId() { + return nodeId; + } + + /** + * Set the variable order of binding sets that are emitted by this node. + * + * @param varOrder - The order in which result values are written to the row to identify this result + * @return This builder so that method invocations may be chained. + */ + public Builder setVarOrder(@Nullable final VariableOrder varOrder) { + this.varOrder = varOrder; + return this; + } + + /** + * @return the variable order of binding sets that are emitted by this node + */ + public VariableOrder getVariableOrder() { + return varOrder; + } + + /** + * Set the node whose results are projected to the query's SELECT variables. + * + * @param childNodeId - The node whose results are projected to the query's SELECT variables. + * @return This builder so that method invocations may be chained. + */ + public Builder setChildNodeId(@Nullable final String childNodeId) { + this.childNodeId = childNodeId; + return this; + } + + public String getChildNodeId() { + return childNodeId; + } + + /** + * Set the parent node of this projection node. 
+ * + * @param parentNodeId - The parent node of this projection node + * @return This builder so that method invocations may be chained. + */ + public Builder setParentNodeId(@Nullable final String parentNodeId) { + this.parentNodeId = parentNodeId; + return this; + } + + public String getParentNodeId() { + return parentNodeId; + } + + /** + * @param projectedVars - Variables that results are projected onto + * @return This builder so that method invocations may be chained. + */ + public Builder setProjectedVars(VariableOrder projectedVars) { + this.projectedVars = projectedVars; + return this; + } + + /** + * @return The variables that results are projected onto + */ + public VariableOrder getProjectionVars() { + return projectedVars; + } + + /** + * @return An instance of {@link ProjectionMetadata} built using this builder's values. + */ + public ProjectionMetadata build() { + return new ProjectionMetadata(nodeId, varOrder, childNodeId, parentNodeId, projectedVars); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorBase.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorBase.java new file mode 100644 index 0000000..b45c56c --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorBase.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import org.apache.rya.indexing.pcj.fluo.app.NodeType; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; + +/** + * Base visitor class for navigating a {@link FluoQuery.Builder}. + * The visit methods in this class provide the basic functionality + * for navigating between the Builders that make up the FluoQuery.Builder. 
+ * + */ +public abstract class QueryBuilderVisitorBase { + + private FluoQuery.Builder fluoBuilder; + + public QueryBuilderVisitorBase(FluoQuery.Builder fluoBuilder) { + this.fluoBuilder = Preconditions.checkNotNull(fluoBuilder); + } + + public void visit() { + this.visit(fluoBuilder.getQueryBuilder()); + } + + /** + * Visits the {@link FluoQuery.Builder} starting at the Metadata builder node with the given id + * @param nodeId - id of the node this visitor will start at + */ + public void visit(String nodeId) { + visitNode(nodeId); + } + + public void visit(QueryMetadata.Builder queryBuilder) { + visitNode(queryBuilder.getChildNodeId()); + } + + public void visit(ConstructQueryMetadata.Builder constructBuilder) { + visitNode(constructBuilder.getChildNodeId()); + } + + public void visit(ProjectionMetadata.Builder projectionBuilder) { + visitNode(projectionBuilder.getChildNodeId()); + } + + public void visit(PeriodicQueryMetadata.Builder periodicBuilder) { + visitNode(periodicBuilder.getChildNodeId()); + } + + public void visit(FilterMetadata.Builder filterBuilder) { + visitNode(filterBuilder.getChildNodeId()); + } + + public void visit(JoinMetadata.Builder joinBuilder) { + visitNode(joinBuilder.getLeftChildNodeId()); + visitNode(joinBuilder.getRightChildNodeId()); + } + + public void visit(AggregationMetadata.Builder aggregationBuilder) { + visitNode(aggregationBuilder.getChildNodeId()); + } + + public void visit(StatementPatternMetadata.Builder statementPatternBuilder) {} + + public void visitNode(String nodeId) { + Optional<NodeType> type = NodeType.fromNodeId(nodeId); + try { + switch(type.get()) { + case AGGREGATION: + visit(fluoBuilder.getAggregateBuilder(nodeId).get()); + break; + case CONSTRUCT: + visit(fluoBuilder.getConstructQueryBuilder(nodeId).get()); + break; + case FILTER: + visit(fluoBuilder.getFilterBuilder(nodeId).get()); + break; + case JOIN: + visit(fluoBuilder.getJoinBuilder(nodeId).get()); + break; + case PERIODIC_QUERY: + 
visit(fluoBuilder.getPeriodicQueryBuilder(nodeId).get()); + break; + case PROJECTION: + visit(fluoBuilder.getProjectionBuilder(nodeId).get()); + break; + case QUERY: + visit(fluoBuilder.getQueryBuilder(nodeId).get()); + break; + case STATEMENT_PATTERN: + visit(fluoBuilder.getStatementPatternBuilder(nodeId).get()); + break; + default: + throw new RuntimeException(); + } + } catch(Exception e) { + throw new IllegalArgumentException("Invalid Fluo Query."); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java index d017724..fe130fb 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java @@ -20,18 +20,24 @@ package org.apache.rya.indexing.pcj.fluo.app.query; import static com.google.common.base.Preconditions.checkNotNull; -import edu.umd.cs.findbugs.annotations.Nullable; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; -import net.jcip.annotations.Immutable; +import java.util.Set; import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import com.google.common.base.Objects; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import 
edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; +import net.jcip.annotations.Immutable; + /** - * Metadata that is specific to a Projection. + * Metadata for a query registered with Fluo. This metadata is for the topmost node + * in the {@link FluoQuery}, and it includes information about how to export results + * for the query. */ @Immutable @DefaultAnnotation(NonNull.class) @@ -39,6 +45,9 @@ public class QueryMetadata extends CommonNodeMetadata { private final String sparql; private final String childNodeId; + private final Set<ExportStrategy> exportStrategy; + private final QueryType queryType; + private final String exportId; /** * Constructs an instance of {@link QueryMetadata}. @@ -47,15 +56,29 @@ public class QueryMetadata extends CommonNodeMetadata { * @param varOrder - The variable order of binding sets that are emitted by this node. (not null) * @param sparql - The SPARQL query whose results are being updated by the Fluo app. (not null) * @param childNodeId - The node whose results are projected to the query's SELECT variables. 
(not null) + * @param exportStrategy - Set of export strategies used for emitting results from Rya-Fluo app */ public QueryMetadata( final String nodeId, final VariableOrder varOrder, final String sparql, - final String childNodeId) { + final String childNodeId, + final Set<ExportStrategy> exportStrategy, + final QueryType queryType) { super(nodeId, varOrder); this.sparql = checkNotNull(sparql); this.childNodeId = checkNotNull(childNodeId); + this.exportStrategy = checkNotNull(exportStrategy); + this.queryType = checkNotNull(queryType); + String[] idSplit = nodeId.split("_"); + if(idSplit.length != 2) { + throw new IllegalArgumentException("Invalid Query Node Id"); + } + this.exportId = idSplit[1]; + } + + public String getExportId() { + return exportId; } /** @@ -71,14 +94,30 @@ public class QueryMetadata extends CommonNodeMetadata { public String getChildNodeId() { return childNodeId; } - + + /** + * @return strategies used for exporting results from Rya-Fluo Application + */ + public Set<ExportStrategy> getExportStrategies() { + return exportStrategy; + } + + /** + * @return the {@link QueryType} of this query + */ + public QueryType getQueryType() { + return queryType; + } + @Override public int hashCode() { return Objects.hashCode( super.getNodeId(), super.getVariableOrder(), sparql, - childNodeId); + childNodeId, + exportStrategy, + queryType); } @Override @@ -93,6 +132,8 @@ public class QueryMetadata extends CommonNodeMetadata { return new EqualsBuilder() .append(sparql, queryMetadata.sparql) .append(childNodeId, queryMetadata.childNodeId) + .append(exportStrategy, queryMetadata.exportStrategy) + .append(queryType, queryMetadata.queryType) .isEquals(); } return false; @@ -109,6 +150,8 @@ public class QueryMetadata extends CommonNodeMetadata { .append(" Variable Order: " + super.getVariableOrder() + "\n") .append(" Child Node ID: " + childNodeId + "\n") .append(" SPARQL: " + sparql + "\n") + .append(" Query Type: " + queryType + "\n") + .append(" Export 
Strategies: " + exportStrategy + "\n") .append("}") .toString(); } @@ -127,12 +170,14 @@ public class QueryMetadata extends CommonNodeMetadata { * Builds instances of {@link QueryMetadata}. */ @DefaultAnnotation(NonNull.class) - public static final class Builder { + public static final class Builder implements CommonNodeMetadata.Builder { private String nodeId; private VariableOrder varOrder; private String sparql; private String childNodeId; + private Set<ExportStrategy> exportStrategies; + private QueryType queryType; /** * Constructs an instance of {@link Builder}. @@ -154,7 +199,7 @@ public class QueryMetadata extends CommonNodeMetadata { * @param varOrder - The variable order of binding sets that are emitted by this node. * @return This builder so that method invocations may be chained. */ - public Builder setVariableOrder(@Nullable final VariableOrder varOrder) { + public Builder setVarOrder(@Nullable final VariableOrder varOrder) { this.varOrder = varOrder; return this; } @@ -188,6 +233,37 @@ public class QueryMetadata extends CommonNodeMetadata { return this; } + /** + * Sets export strategies used for emitting results from Rya Fluo app + * @param export - Set of export strategies + * @return This builder so that method invocations may be chained + */ + public Builder setExportStrategies(Set<ExportStrategy> export) { + this.exportStrategies = export; + return this; + } + + /** + * Set query type for the given query + * @param queryType - {@link QueryType} of the given query + * @return This builder so that method invocations may be chained + */ + public Builder setQueryType(QueryType queryType) { + this.queryType = queryType; + return this; + } + + /** + * @return QueryType for the given query + */ + public QueryType getQueryType() { + return queryType; + } + + + /** + * @return id of the child node of this node + */ public String getChildNodeId() { return childNodeId; } @@ -196,7 +272,7 @@ public class QueryMetadata extends CommonNodeMetadata { * @return 
An instance of {@link QueryMetadata} build using this builder's values. */ public QueryMetadata build() { - return new QueryMetadata(nodeId, varOrder, sparql, childNodeId); + return new QueryMetadata(nodeId, varOrder, sparql, childNodeId, exportStrategies, queryType); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorBase.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorBase.java new file mode 100644 index 0000000..ce9b02c --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorBase.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import org.apache.rya.indexing.pcj.fluo.app.NodeType; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; + +public abstract class QueryMetadataVisitorBase { + + private FluoQuery fluoQuery; + + public QueryMetadataVisitorBase(FluoQuery fluoQuery) { + this.fluoQuery = Preconditions.checkNotNull(fluoQuery); + } + + public void visit() { + visit(fluoQuery.getQueryMetadata()); + } + + /** + * Visits the {@link FluoQuery} starting at the Metadata node with the given id + * @param nodeId - id of the node this visitor will start at + */ + public void visit(String nodeId) { + visitNode(nodeId); + } + + public void visit(QueryMetadata queryMetadata) { + visitNode(queryMetadata.getChildNodeId()); + } + + public void visit(ConstructQueryMetadata constructMetadata) { + visitNode(constructMetadata.getChildNodeId()); + } + + public void visit(ProjectionMetadata projectionMetadata) { + visitNode(projectionMetadata.getChildNodeId()); + } + + public void visit(PeriodicQueryMetadata periodicMetadata) { + visitNode(periodicMetadata.getChildNodeId()); + } + + public void visit(FilterMetadata filterMetadata) { + visitNode(filterMetadata.getChildNodeId()); + } + + public void visit(JoinMetadata joinMetadata) { + visitNode(joinMetadata.getLeftChildNodeId()); + visitNode(joinMetadata.getRightChildNodeId()); + } + + public void visit(AggregationMetadata aggregationMetadata) { + visitNode(aggregationMetadata.getChildNodeId()); + } + + public void visit(StatementPatternMetadata statementPatternMetadata) {} + + public void visitNode(String nodeId) { + Optional<NodeType> type = NodeType.fromNodeId(nodeId); + try { + switch(type.get()) { + case AGGREGATION: + visit(fluoQuery.getAggregationMetadata(nodeId).get()); + break; + case CONSTRUCT: + visit(fluoQuery.getConstructQueryMetadata(nodeId).get()); + break; + case FILTER: + visit(fluoQuery.getFilterMetadata(nodeId).get()); + break; + case JOIN: + 
visit(fluoQuery.getJoinMetadata(nodeId).get()); + break; + case PERIODIC_QUERY: + visit(fluoQuery.getPeriodicQueryMetadata(nodeId).get()); + break; + case PROJECTION: + visit(fluoQuery.getProjectionMetadata(nodeId).get()); + break; + case QUERY: + visit(fluoQuery.getQueryMetadata(nodeId).get()); + break; + case STATEMENT_PATTERN: + visit(fluoQuery.getStatementPatternMetadata(nodeId).get()); + break; + default: + throw new RuntimeException(); + } + } catch(Exception e) { + throw new IllegalArgumentException("Invalid Fluo Query."); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java index 8e348f2..6c03be1 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java @@ -25,10 +25,11 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CO import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX; -import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PROJECTION_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX; 
import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -42,16 +43,22 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph; import org.apache.rya.indexing.pcj.fluo.app.ConstructProjection; import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType; import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer; import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer.FilterParseException; +import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; +import org.apache.rya.indexing.pcj.fluo.app.util.VariableOrderUpdateVisitor.UpdateAction; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.openrdf.model.Value; import org.openrdf.model.impl.BNodeImpl; +import org.openrdf.query.MalformedQueryException; import org.openrdf.query.algebra.AggregateOperator; import org.openrdf.query.algebra.BNodeGenerator; import org.openrdf.query.algebra.Extension; @@ -75,6 +82,7 @@ import org.openrdf.query.algebra.ValueExpr; import org.openrdf.query.algebra.Var; import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; import com.google.common.base.Preconditions; import 
com.google.common.collect.Lists; @@ -90,32 +98,86 @@ import net.jcip.annotations.Immutable; */ public class SparqlFluoQueryBuilder { + private String sparql; + private TupleExpr te; + private String queryId; + private NodeIds nodeIds; + //Default behavior is to export to Kafka - subject to change when user can + //specify their own export strategy + private Set<ExportStrategy> exportStrategies = new HashSet<>(Arrays.asList(ExportStrategy.Kafka)); + + public SparqlFluoQueryBuilder setSparql(String sparql) { + this.sparql = Preconditions.checkNotNull(sparql); + return this; + } + + public SparqlFluoQueryBuilder setTupleExpr(TupleExpr te) { + this.te = Preconditions.checkNotNull(te); + return this; + } + /** - * Creates the {@link FluoQuery} metadata that is required by the Fluo - * application to process a SPARQL query. - * - * @param parsedQuery - The query metadata will be derived from. (not null) - * @param nodeIds - The NodeIds object is passed in so that other parts - * of the application may look up which ID is associated with each - * node of the query. - * @return A {@link FluoQuery} object loaded with metadata built from the - * {@link ParsedQuery}. + * Sets the FluoQuery id as generated by {@link NodeType#generateNewFluoIdForType(NodeType)} or + * {@link NodeType#generateNewIdForType(NodeType, String)}, where NodeType is of type Query. 
+ * @param queryId for the {@link FluoQuery} + * @return SparqlFluoQueryBuilder for chaining method calls */ - public FluoQuery make(final ParsedQuery parsedQuery, final NodeIds nodeIds) { - checkNotNull(parsedQuery); - - final String sparql = parsedQuery.getSourceString(); - final FluoQuery.Builder fluoQueryBuilder = FluoQuery.builder(); - - final NewQueryVisitor visitor = new NewQueryVisitor(sparql, fluoQueryBuilder, nodeIds); - TupleExpr te = parsedQuery.getTupleExpr(); + public SparqlFluoQueryBuilder setFluoQueryId(String queryId) { + this.queryId = Preconditions.checkNotNull(queryId); + return this; + } + + public SparqlFluoQueryBuilder setNodeIds(NodeIds nodeIds) { + this.nodeIds = Preconditions.checkNotNull(nodeIds); + return this; + } + + public SparqlFluoQueryBuilder setExportStrategies(Set<ExportStrategy> exportStrategies) { + this.exportStrategies = exportStrategies; + return this; + } + + public FluoQuery build() { + Preconditions.checkNotNull(sparql); + Preconditions.checkNotNull(queryId); + Preconditions.checkNotNull(exportStrategies); + + if(nodeIds == null) { + nodeIds = new NodeIds(); + } + + if(te == null) { + SPARQLParser parser = new SPARQLParser(); + ParsedQuery pq; + try { + pq = parser.parseQuery(sparql, null); + } catch (MalformedQueryException e) { + throw new RuntimeException(e); + } + te = pq.getTupleExpr(); + } + PeriodicQueryUtil.placePeriodicQueryNode(te); + String childNodeId = nodeIds.getOrMakeId(te); + + final FluoQuery.Builder fluoQueryBuilder = FluoQuery.builder(); + QueryMetadata.Builder queryBuilder = QueryMetadata.builder(queryId); + //sets {@link QueryType} and VariableOrder + setVarOrderAndQueryType(queryBuilder, te); + queryBuilder.setSparql(sparql); + queryBuilder.setChildNodeId(childNodeId); + queryBuilder.setExportStrategies(exportStrategies); + fluoQueryBuilder.setQueryMetadata(queryBuilder); + + setChildMetadata(fluoQueryBuilder, childNodeId, queryBuilder.getVariableOrder(), queryId); + + final NewQueryVisitor visitor = 
new NewQueryVisitor(fluoQueryBuilder, nodeIds); te.visit( visitor ); - + final FluoQuery fluoQuery = fluoQueryBuilder.build(); return fluoQuery; } - + /** * A data structure that creates and keeps track of Node IDs for the nodes * of a {@link ParsedQuery}. This structure should only be used while creating @@ -187,7 +249,7 @@ public class SparqlFluoQueryBuilder { } else if (node instanceof Join || node instanceof LeftJoin) { prefix = JOIN_PREFIX; } else if (node instanceof Projection) { - prefix = QUERY_PREFIX; + prefix = PROJECTION_PREFIX; } else if(node instanceof Extension) { prefix = AGGREGATION_PREFIX; } else if (node instanceof Reduced) { @@ -214,7 +276,6 @@ public class SparqlFluoQueryBuilder { private final NodeIds nodeIds; private final FluoQuery.Builder fluoQueryBuilder; - private final String sparql; /** * Constructs an instance of {@link NewQueryVisitor}. @@ -227,8 +288,7 @@ public class SparqlFluoQueryBuilder { * of the application may look up which ID is associated with each * node of the query. */ - public NewQueryVisitor(final String sparql, final FluoQuery.Builder fluoQueryBuilder, final NodeIds nodeIds) { - this.sparql = checkNotNull(sparql); + public NewQueryVisitor(final FluoQuery.Builder fluoQueryBuilder, final NodeIds nodeIds) { this.fluoQueryBuilder = checkNotNull(fluoQueryBuilder); this.nodeIds = checkNotNull(nodeIds); } @@ -256,6 +316,7 @@ public class SparqlFluoQueryBuilder { } else { groupByVariableOrder = new VariableOrder(); } + // The aggregations that need to be performed are the Group Elements. 
final List<AggregationElement> aggregations = new ArrayList<>(); @@ -289,15 +350,21 @@ public class SparqlFluoQueryBuilder { aggregationBuilder.setChildNodeId(childNodeId); aggregationBuilder.setGroupByVariableOrder(groupByVariableOrder); + + Set<String> aggregationVars = getVarsToDelete(groupByVariableOrder.getVariableOrders(), aggregationBuilder.getVariableOrder().getVariableOrders()); + FluoQueryUtils.updateVarOrders(fluoQueryBuilder, UpdateAction.DeleteVariable, Lists.newArrayList(aggregationVars), aggregationId); + for(final AggregationElement aggregation : aggregations) { aggregationBuilder.addAggregation(aggregation); } + + // Update the child node's metadata. final Set<String> childVars = getVars(child); final VariableOrder childVarOrder = new VariableOrder(childVars); - setChildMetadata(childNodeId, childVarOrder, aggregationId); + setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, aggregationId); } // Walk to the next node. @@ -369,11 +436,11 @@ public class SparqlFluoQueryBuilder { // Create or update the left child's variable order and parent node id. final VariableOrder leftVarOrder = varOrders.getLeftVarOrder(); - setChildMetadata(leftChildNodeId, leftVarOrder, joinNodeId); + setChildMetadata(fluoQueryBuilder, leftChildNodeId, leftVarOrder, joinNodeId); // Create or update the right child's variable order and parent node id. final VariableOrder rightVarOrder = varOrders.getRightVarOrder(); - setChildMetadata(rightChildNodeId, rightVarOrder, joinNodeId); + setChildMetadata(fluoQueryBuilder, rightChildNodeId, rightVarOrder, joinNodeId); } @Override @@ -407,7 +474,7 @@ public class SparqlFluoQueryBuilder { // Update the child node's metadata. final Set<String> childVars = getVars((TupleExpr)child); final VariableOrder childVarOrder = new VariableOrder(childVars); - setChildMetadata(childNodeId, childVarOrder, filterId); + setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, filterId); // Walk to the next node. 
super.meet(node); @@ -442,12 +509,12 @@ public class SparqlFluoQueryBuilder { // Update the child node's metadata. final Set<String> childVars = getVars((TupleExpr) child); final VariableOrder childVarOrder = new VariableOrder(childVars); - setChildMetadata(childNodeId, childVarOrder, periodicId); + setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, periodicId); // update variable order of this node and all ancestors to // include BIN_ID binding as // first variable in the ordering - PeriodicQueryUtil.updateVarOrdersToIncludeBin(fluoQueryBuilder, periodicId); + FluoQueryUtils.updateVarOrders(fluoQueryBuilder, UpdateAction.AddVariable, Arrays.asList(IncrementalUpdateConstants.PERIODIC_BIN_ID), periodicId); // Walk to the next node. node.getArg().visit(this); } @@ -458,13 +525,12 @@ public class SparqlFluoQueryBuilder { public void meet(final Projection node) { // Create a builder for this node populated with the metadata. final String queryId = nodeIds.getOrMakeId(node); - final VariableOrder queryVarOrder = new VariableOrder(node.getBindingNames()); - - final QueryMetadata.Builder queryBuilder = QueryMetadata.builder(queryId); - fluoQueryBuilder.setQueryMetadata(queryBuilder); - queryBuilder.setSparql(sparql); - queryBuilder.setVariableOrder(queryVarOrder); + ProjectionMetadata.Builder projectionBuilder = fluoQueryBuilder.getProjectionBuilder(queryId).orNull(); + if (projectionBuilder == null) { + projectionBuilder = ProjectionMetadata.builder(queryId); + fluoQueryBuilder.addProjectionBuilder(projectionBuilder); + } final QueryModelNode child = node.getArg(); if(child == null) { @@ -472,13 +538,14 @@ public class SparqlFluoQueryBuilder { } final String childNodeId = nodeIds.getOrMakeId(child); - queryBuilder.setChildNodeId(childNodeId); + projectionBuilder.setChildNodeId(childNodeId); + projectionBuilder.setProjectedVars(projectionBuilder.getVariableOrder()); // Update the child node's metadata. 
final Set<String> childVars = getVars((TupleExpr)child); final VariableOrder childVarOrder = new VariableOrder(childVars); - setChildMetadata(childNodeId, childVarOrder, queryId); + setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, queryId); // Walk to the next node. super.meet(node); @@ -489,10 +556,13 @@ public class SparqlFluoQueryBuilder { //create id, initialize ConstructQueryMetadata builder, register ConstructQueryMetadata //builder with FluoQueryBuilder, and add metadata that we currently have final String constructId = nodeIds.getOrMakeId(node); - final ConstructQueryMetadata.Builder constructBuilder = ConstructQueryMetadata.builder(); - constructBuilder.setNodeId(constructId); - fluoQueryBuilder.setConstructQueryMetadata(constructBuilder); - constructBuilder.setSparql(sparql); + + ConstructQueryMetadata.Builder constructBuilder = fluoQueryBuilder.getConstructQueryBuilder().orNull(); + if(constructBuilder == null) { + constructBuilder = ConstructQueryMetadata.builder(); + constructBuilder.setNodeId(constructId); + fluoQueryBuilder.setConstructQueryMetadata(constructBuilder); + } //get child node QueryModelNode child = node.getArg(); @@ -531,96 +601,12 @@ public class SparqlFluoQueryBuilder { // Update the child node's metadata. final Set<String> childVars = getVars((TupleExpr)child); final VariableOrder childVarOrder = new VariableOrder(childVars); - setChildMetadata(childNodeId, childVarOrder, constructId); + setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, constructId); //fast forward visitor to next node we care about child.visit(this); } - - /** - * Update a query node's metadata to include it's binding set variable order - * and it's parent node id. This information is only known when handling - * the parent node. - * - * @param childNodeId - The node ID of the child node. - * @param childVarOrder - The variable order of the child node's binding sets. 
- * @param parentNodeId - The node ID that consumes the child's binding sets. - */ - private void setChildMetadata(final String childNodeId, final VariableOrder childVarOrder, final String parentNodeId) { - checkNotNull(childNodeId); - checkNotNull(childVarOrder); - checkNotNull(parentNodeId); - - final NodeType childType = NodeType.fromNodeId(childNodeId).get(); - switch (childType) { - case STATEMENT_PATTERN: - StatementPatternMetadata.Builder spBuilder = fluoQueryBuilder.getStatementPatternBuilder(childNodeId).orNull(); - if (spBuilder == null) { - spBuilder = StatementPatternMetadata.builder(childNodeId); - fluoQueryBuilder.addStatementPatternBuilder(spBuilder); - } - - spBuilder.setVarOrder(childVarOrder); - spBuilder.setParentNodeId(parentNodeId); - break; - - case JOIN: - JoinMetadata.Builder joinBuilder = fluoQueryBuilder.getJoinBuilder(childNodeId).orNull(); - if (joinBuilder == null) { - joinBuilder = JoinMetadata.builder(childNodeId); - fluoQueryBuilder.addJoinMetadata(joinBuilder); - } - - joinBuilder.setVariableOrder(childVarOrder); - joinBuilder.setParentNodeId(parentNodeId); - break; - - case FILTER: - FilterMetadata.Builder filterBuilder = fluoQueryBuilder.getFilterBuilder(childNodeId).orNull(); - if (filterBuilder == null) { - filterBuilder = FilterMetadata.builder(childNodeId); - fluoQueryBuilder.addFilterMetadata(filterBuilder); - } - - filterBuilder.setVarOrder(childVarOrder); - filterBuilder.setParentNodeId(parentNodeId); - break; - - case AGGREGATION: - AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(childNodeId).orNull(); - if (aggregationBuilder == null) { - aggregationBuilder = AggregationMetadata.builder(childNodeId); - fluoQueryBuilder.addAggregateMetadata(aggregationBuilder); - } - - aggregationBuilder.setVariableOrder(childVarOrder); - aggregationBuilder.setParentNodeId(parentNodeId); - break; - - case QUERY: - throw new IllegalArgumentException("A QUERY node cannot be the child of another node."); 
- - case CONSTRUCT: - throw new IllegalArgumentException("A CONSTRUCT node cannot be the child of another node."); - - case PERIODIC_QUERY: - PeriodicQueryMetadata.Builder periodicQueryBuilder = fluoQueryBuilder.getPeriodicQueryBuilder().orNull(); - if (periodicQueryBuilder == null) { - periodicQueryBuilder = PeriodicQueryMetadata.builder(); - periodicQueryBuilder.setNodeId(childNodeId); - fluoQueryBuilder.addPeriodicQueryMetadata(periodicQueryBuilder); - } - periodicQueryBuilder.setVarOrder(childVarOrder); - periodicQueryBuilder.setParentNodeId(parentNodeId); - break; - - default: - throw new IllegalArgumentException("Unsupported NodeType: " + childType); - - } - } - private ConstructGraph getConstructGraph(List<ProjectionElemList> projections, List<ExtensionElem> extensionElems) { Map<String, Value> valueMap = new HashMap<>(); //create valueMap to associate source names with Values @@ -654,6 +640,13 @@ public class SparqlFluoQueryBuilder { return new ConstructGraph(constructProj); } + private Set<String> getVarsToDelete(Collection<String> groupByVars, Collection<String> varOrderVars) { + Set<String> groupBySet = Sets.newHashSet(groupByVars); + Set<String> varOrderSet = Sets.newHashSet(varOrderVars); + + return Sets.difference(varOrderSet, groupBySet); + } + private void validateProjectionElemList(ProjectionElemList list) { List<ProjectionElem> elements = list.getElements(); checkArgument(elements.size() == 3); @@ -662,8 +655,6 @@ public class SparqlFluoQueryBuilder { checkArgument(elements.get(2).getTargetName().equals("object")); } - - /** * Get the non-constant variables from a {@link TupleExpr}. 
* @@ -764,4 +755,199 @@ public class SparqlFluoQueryBuilder { return shifted; } } + + private void setVarOrderAndQueryType(QueryMetadata.Builder builder, TupleExpr te) { + QueryMetadataLocator locator = new QueryMetadataLocator(); + try { + te.visit(locator); + } catch (Exception e) { + throw new RuntimeException(e); + } + + builder.setVarOrder(locator.getVarOrder()); + builder.setQueryType(locator.getQueryType()); + } + + public static class QueryMetadataLocator extends QueryModelVisitorBase<Exception> { + + private VariableOrder varOrder; + private QueryType queryType; + + public VariableOrder getVarOrder() { + return varOrder; + } + + public QueryType getQueryType() { + return queryType; + } + + public void meet(Projection node) throws Exception { + Set<String> bindingNames = node.getBindingNames(); + if(varOrder == null) { + varOrder = new VariableOrder(bindingNames); + } + + if(queryType == null) { + queryType = QueryType.Projection; + } + super.meet(node); + } + + public void meet(Reduced node) throws Exception { + if(varOrder == null) { + varOrder = getConstructGraphVarOrder(node); + } + + if(queryType == null) { + queryType = QueryType.Construct; + } + super.meet(node); + } + + public void meetOther(final QueryModelNode node) throws Exception { + if (node instanceof PeriodicQueryNode) { + queryType = QueryType.Periodic; + } else { + super.meetOther(node); + } + } + } + + private static VariableOrder getConstructGraphVarOrder(Reduced node) { + + //get child node + QueryModelNode child = node.getArg(); + Preconditions.checkArgument(child instanceof Projection || child instanceof MultiProjection); + UnaryTupleOperator unary = (UnaryTupleOperator) child; + + //get ProjectionElemList to build ConstructGraph + final List<ProjectionElemList> projections = new ArrayList<>(); + if(unary instanceof Projection) { + projections.add(((Projection) unary).getProjectionElemList()); + } else { + projections.addAll(((MultiProjection)unary).getProjections()); + } + + return 
getConstructGraphVarOrder(projections); + } + + private static VariableOrder getConstructGraphVarOrder(List<ProjectionElemList> projections) { + Set<String> varOrders = new HashSet<>(); + + for(ProjectionElemList elems: projections) { + for(ProjectionElem elem: elems.getElements()) { + String name = elem.getSourceName(); + if(!name.startsWith("-const-") && !name.startsWith("-anon-")) { + varOrders.add(name); + } + } + } + + return new VariableOrder(varOrders); + } + + + /** + * Update a query node's metadata to include its binding set variable order + * and its parent node id. This information is only known when handling + * the parent node. + * + * @param fluoQueryBuilder - Builder whose metadata is updated + * @param childNodeId - The node ID of the child node. + * @param childVarOrder - The variable order of the child node's binding sets. + * @param parentNodeId - The node ID that consumes the child's binding sets. + */ + private static void setChildMetadata(final FluoQuery.Builder fluoQueryBuilder, final String childNodeId, final VariableOrder childVarOrder, final String parentNodeId) { + checkNotNull(childNodeId); + checkNotNull(childVarOrder); + checkNotNull(parentNodeId); + + final NodeType childType = NodeType.fromNodeId(childNodeId).get(); + switch (childType) { + case STATEMENT_PATTERN: + StatementPatternMetadata.Builder spBuilder = fluoQueryBuilder.getStatementPatternBuilder(childNodeId).orNull(); + if (spBuilder == null) { + spBuilder = StatementPatternMetadata.builder(childNodeId); + fluoQueryBuilder.addStatementPatternBuilder(spBuilder); + } + + spBuilder.setVarOrder(childVarOrder); + spBuilder.setParentNodeId(parentNodeId); + break; + + case JOIN: + JoinMetadata.Builder joinBuilder = fluoQueryBuilder.getJoinBuilder(childNodeId).orNull(); + if (joinBuilder == null) { + joinBuilder = JoinMetadata.builder(childNodeId); + fluoQueryBuilder.addJoinMetadata(joinBuilder); + } + + joinBuilder.setVarOrder(childVarOrder); + 
joinBuilder.setParentNodeId(parentNodeId); + break; + + case FILTER: + FilterMetadata.Builder filterBuilder = fluoQueryBuilder.getFilterBuilder(childNodeId).orNull(); + if (filterBuilder == null) { + filterBuilder = FilterMetadata.builder(childNodeId); + fluoQueryBuilder.addFilterMetadata(filterBuilder); + } + + filterBuilder.setVarOrder(childVarOrder); + filterBuilder.setParentNodeId(parentNodeId); + break; + + case AGGREGATION: + AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(childNodeId).orNull(); + if (aggregationBuilder == null) { + aggregationBuilder = AggregationMetadata.builder(childNodeId); + fluoQueryBuilder.addAggregateMetadata(aggregationBuilder); + } + + aggregationBuilder.setVarOrder(childVarOrder); + aggregationBuilder.setParentNodeId(parentNodeId); + break; + + case PROJECTION: + ProjectionMetadata.Builder projectionBuilder = fluoQueryBuilder.getProjectionBuilder(childNodeId).orNull(); + if(projectionBuilder == null) { + projectionBuilder = ProjectionMetadata.builder(childNodeId); + fluoQueryBuilder.addProjectionBuilder(projectionBuilder); + } + + projectionBuilder.setVarOrder(childVarOrder); + projectionBuilder.setParentNodeId(parentNodeId); + break; + + case QUERY: + throw new IllegalArgumentException("A QUERY node cannot be the child of another node."); + + case CONSTRUCT: + ConstructQueryMetadata.Builder constructBuilder = fluoQueryBuilder.getConstructQueryBuilder().orNull(); + if(constructBuilder == null) { + constructBuilder = ConstructQueryMetadata.builder(); + constructBuilder.setNodeId(childNodeId); + fluoQueryBuilder.setConstructQueryMetadata(constructBuilder); + } + + Preconditions.checkArgument(childNodeId.equals(constructBuilder.getNodeId())); + constructBuilder.setVarOrder(childVarOrder); + constructBuilder.setParentNodeId(parentNodeId); + break; + + case PERIODIC_QUERY: + PeriodicQueryMetadata.Builder periodicQueryBuilder = fluoQueryBuilder.getPeriodicQueryBuilder().orNull(); + if 
(periodicQueryBuilder == null) { + periodicQueryBuilder = PeriodicQueryMetadata.builder(); + periodicQueryBuilder.setNodeId(childNodeId); + fluoQueryBuilder.addPeriodicQueryMetadata(periodicQueryBuilder); + } + periodicQueryBuilder.setVarOrder(childVarOrder); + periodicQueryBuilder.setParentNodeId(parentNodeId); + break; + + default: + throw new IllegalArgumentException("Unsupported NodeType: " + childType); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternMetadata.java index 7de10d5..beead93 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternMetadata.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternMetadata.java @@ -127,7 +127,7 @@ public class StatementPatternMetadata extends CommonNodeMetadata { * Builds instances of {@link StatementPatternMetadata}. */ @DefaultAnnotation(NonNull.class) - public static final class Builder { + public static final class Builder implements CommonNodeMetadata.Builder { private final String nodeId; private VariableOrder varOrder; @@ -160,6 +160,11 @@ public class StatementPatternMetadata extends CommonNodeMetadata { this.varOrder = varOrder; return this; } + + @Override + public VariableOrder getVariableOrder() { + return varOrder; + } /** * Sets the statement pattern new statements are matched against. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java new file mode 100644 index 0000000..303f9bb --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoQueryUtils.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import java.util.List; + +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; +import org.apache.rya.indexing.pcj.fluo.app.util.VariableOrderUpdateVisitor.UpdateAction; + +import com.google.common.base.Preconditions; + +/** + * Utility class for manipulating components of a {@link FluoQuery}. 
+ * + */ +public class FluoQueryUtils { + + /** + * Updates the {@link VariableOrder}s of a given {@link FluoQuery.Builder}. + * @param builder - builder whose VariableOrders will be updated + * @param action - add or delete variables + * @param variables - variables to be added or deleted + * @param stopNodeId - node to stop at + * @return - FluoQuery.Builder with updated VariableOrders + */ + public static FluoQuery.Builder updateVarOrders(FluoQuery.Builder builder, UpdateAction action, List<String> variables, String stopNodeId) { + VariableOrderUpdateVisitor visitor = new VariableOrderUpdateVisitor(builder, action, variables, stopNodeId); + visitor.visit(); + + return builder; + } + + /** + * Converts the fluo query id to a pcj id + * @param fluoQueryId - query id of the form query_prefix + _ + UUID + * @return the pcjid which consists of only the UUID portion of the fluo query id + */ + public static String convertFluoQueryIdToPcjId(String fluoQueryId) { + Preconditions.checkNotNull(fluoQueryId); + String[] queryIdParts = fluoQueryId.split(IncrementalUpdateConstants.QUERY_PREFIX + "_"); + Preconditions.checkArgument(queryIdParts.length == 2 && queryIdParts[1]!= null && queryIdParts[1].length() > 0); + return queryIdParts[1]; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java index fd24af2..406ba4c 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java +++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java @@ -20,7 +20,6 @@ package org.apache.rya.indexing.pcj.fluo.app.util; import static com.google.common.base.Preconditions.checkArgument; -import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.Set; @@ -30,13 +29,8 @@ import org.apache.fluo.api.client.SnapshotBase; import org.apache.fluo.api.data.Bytes; import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; import org.apache.rya.indexing.pcj.fluo.app.NodeType; -import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode; -import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.openrdf.model.Literal; import org.openrdf.model.URI; import org.openrdf.model.Value; @@ -194,72 +188,6 @@ public class PeriodicQueryUtil { } /** - * Adds the variable "periodicBinId" to the beginning of all {@link VariableOrder}s for the - * Metadata nodes that appear above the PeriodicQueryNode. This ensures that the binId is - * written first in the Row so that bins can be easily scanned and deleted. 
- * @param builder - * @param nodeId - */ - public static void updateVarOrdersToIncludeBin(FluoQuery.Builder builder, String nodeId) { - NodeType type = NodeType.fromNodeId(nodeId).orNull(); - if (type == null) { - throw new IllegalArgumentException("NodeId must be associated with an existing MetadataBuilder."); - } - switch (type) { - case AGGREGATION: - AggregationMetadata.Builder aggBuilder = builder.getAggregateBuilder(nodeId).orNull(); - if (aggBuilder != null) { - VariableOrder varOrder = aggBuilder.getVariableOrder(); - VariableOrder groupOrder = aggBuilder.getGroupByVariableOrder(); - // update varOrder with BIN_ID - List<String> orderList = new ArrayList<>(varOrder.getVariableOrders()); - orderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID); - aggBuilder.setVariableOrder(new VariableOrder(orderList)); - // update groupVarOrder with BIN_ID - List<String> groupOrderList = new ArrayList<>(groupOrder.getVariableOrders()); - groupOrderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID); - aggBuilder.setGroupByVariableOrder(new VariableOrder(groupOrderList)); - // recursive call to update the VariableOrders of all ancestors - // of this node - updateVarOrdersToIncludeBin(builder, aggBuilder.getParentNodeId()); - } else { - throw new IllegalArgumentException("There is no AggregationMetadata.Builder for the indicated Id."); - } - break; - case PERIODIC_QUERY: - PeriodicQueryMetadata.Builder periodicBuilder = builder.getPeriodicQueryBuilder().orNull(); - if (periodicBuilder != null && periodicBuilder.getNodeId().equals(nodeId)) { - VariableOrder varOrder = periodicBuilder.getVarOrder(); - List<String> orderList = new ArrayList<>(varOrder.getVariableOrders()); - orderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID); - periodicBuilder.setVarOrder(new VariableOrder(orderList)); - // recursive call to update the VariableOrders of all ancestors - // of this node - updateVarOrdersToIncludeBin(builder, periodicBuilder.getParentNodeId()); - } else { - 
throw new IllegalArgumentException( - "PeriodicQueryMetadata.Builder id does not match the indicated id. A query cannot have more than one PeriodicQueryMetadata Node."); - } - break; - case QUERY: - QueryMetadata.Builder queryBuilder = builder.getQueryBuilder().orNull(); - if (queryBuilder != null && queryBuilder.getNodeId().equals(nodeId)) { - VariableOrder varOrder = queryBuilder.getVariableOrder(); - List<String> orderList = new ArrayList<>(varOrder.getVariableOrders()); - orderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID); - queryBuilder.setVariableOrder(new VariableOrder(orderList)); - } else { - throw new IllegalArgumentException( - "QueryMetadata.Builder id does not match the indicated id. A query cannot have more than one QueryMetadata Node."); - } - break; - default: - throw new IllegalArgumentException( - "Incorrectly positioned PeriodicQueryNode. The PeriodicQueryNode can only be positioned below Projections, Extensions, and ConstructQueryNodes."); - } - } - - /** * Collects all Metadata node Ids that are ancestors of the PeriodicQueryNode and contain the variable * {@link IncrementalUpdateConstants#PERIODIC_BIN_ID}. 
* @param sx - Fluo Snapshot for scanning Fluo @@ -277,6 +205,10 @@ public class PeriodicQueryUtil { case PERIODIC_QUERY: ids.add(nodeId); break; + case PROJECTION: + ids.add(nodeId); + getPeriodicQueryNodeAncestorIds(sx, sx.get( Bytes.of(nodeId), FluoQueryColumns.PROJECTION_CHILD_NODE_ID).toString(), ids); + break; case QUERY: ids.add(nodeId); getPeriodicQueryNodeAncestorIds(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.QUERY_CHILD_NODE_ID).toString(), ids); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/VariableOrderUpdateVisitor.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/VariableOrderUpdateVisitor.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/VariableOrderUpdateVisitor.java new file mode 100644 index 0000000..f433849 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/VariableOrderUpdateVisitor.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.QueryBuilderVisitorBase; +import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; + +import com.google.common.base.Preconditions; + +/** + * Visitor that traverses a {@link FluoQuery.Builder} and performs the indicated {@link UpdateAction} + * on the {@link VariableOrder}s of each node using a provided list of variables. The visitor + * either adds the provided list of variables to the VariableOrder of each node or deletes the + * provided variables from the VariableOrder of each node. + * + */ +public class VariableOrderUpdateVisitor extends QueryBuilderVisitorBase { + + /** + * Enum class indicating whether to add or delete variables from + * the VariableOrders of nodes in the FluoQuery. 
+ * + */ + public static enum UpdateAction { + AddVariable, DeleteVariable + }; + + private UpdateAction action; + private List<String> variables; + private String stopNodeId; + + /** + * Creates a VariableOrderUpdateVisitor to update the variables in a given FluoQuery.Builder + * @param fluoBuilder - builder whose VariableOrder will be updated + * @param action - either add or delete + * @param variables - variables to be added or deleted + * @param stopNodeId - indicates the builder node to stop at + */ + public VariableOrderUpdateVisitor(FluoQuery.Builder fluoBuilder, UpdateAction action, List<String> variables, String stopNodeId) { + super(fluoBuilder); + this.action = Preconditions.checkNotNull(action); + this.variables = Preconditions.checkNotNull(variables); + this.stopNodeId = Preconditions.checkNotNull(stopNodeId); + } + + public void visit(QueryMetadata.Builder builder) { + builder.setVarOrder(updateOrder(builder.getVariableOrder())); + if(!atStopNode(builder.getNodeId())) { + super.visit(builder); + } + } + + public void visit(ProjectionMetadata.Builder builder) { + builder.setVarOrder(updateOrder(builder.getVariableOrder())); + if(action == UpdateAction.AddVariable) { + builder.setProjectedVars(updateOrder(builder.getProjectionVars())); + } + if(!atStopNode(builder.getNodeId())) { + super.visit(builder); + } + } + + public void visit(ConstructQueryMetadata.Builder builder) { + builder.setVarOrder(updateOrder(builder.getVariableOrder())); + if(!atStopNode(builder.getNodeId())) { + super.visit(builder); + } + } + + public void visit(FilterMetadata.Builder builder) { + builder.setVarOrder(updateOrder(builder.getVariableOrder())); + if(!atStopNode(builder.getNodeId())) { + super.visit(builder); + } + } + + public void visit(PeriodicQueryMetadata.Builder builder) { + builder.setVarOrder(updateOrder(builder.getVariableOrder())); + if(!atStopNode(builder.getNodeId())) { + super.visit(builder); + } + } + + public void visit(JoinMetadata.Builder builder) { + 
builder.setVarOrder(updateOrder(builder.getVariableOrder())); + if(!atStopNode(builder.getNodeId())) { + super.visit(builder); + } + } + + public void visit(AggregationMetadata.Builder builder) { + builder.setVarOrder(updateOrder(builder.getVariableOrder())); + builder.setGroupByVariableOrder(updateOrder(builder.getGroupByVariableOrder())); + if(!atStopNode(builder.getNodeId())) { + super.visit(builder); + } + } + + public void visit(StatementPatternMetadata.Builder builder) { + if(!atStopNode(builder.getNodeId())) { + super.visit(builder); + } + } + + boolean atStopNode(String nodeId) { + return nodeId.equals(stopNodeId); + } + + private VariableOrder updateOrder(VariableOrder varOrder) { + + switch (action) { + case AddVariable: + varOrder = addBindingToOrder(varOrder); + break; + case DeleteVariable: + varOrder = deleteBindingFromOrder(varOrder); + break; + } + return varOrder; + } + + private VariableOrder addBindingToOrder(VariableOrder varOrder) { + List<String> orderList = new ArrayList<>(varOrder.getVariableOrders()); + orderList.addAll(0, variables); + return new VariableOrder(orderList); + } + + private VariableOrder deleteBindingFromOrder(VariableOrder varOrder) { + List<String> vars = new ArrayList<>(); + varOrder.getVariableOrders().forEach(x -> { + if (!variables.contains(x) || x.equals(IncrementalUpdateConstants.PERIODIC_BIN_ID)) { + vars.add(x); + } + }); + return new VariableOrder(vars); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java index c8ca6af..b40ba3f 100644 --- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java @@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; -import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil.PeriodicQueryNodeRelocator; import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil.PeriodicQueryNodeVisitor; @@ -162,17 +162,17 @@ public class PeriodicQueryUtilTest { + "?obs <uri:hasTime> ?time. " //n + "?obs <uri:hasLattitude> ?lat }"; //n - SPARQLParser parser = new SPARQLParser(); - ParsedQuery pq = parser.parseQuery(query, null); SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder(); - FluoQuery fluoQuery = builder.make(pq, new NodeIds()); + builder.setSparql(query); + builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY)); + FluoQuery fluoQuery = builder.build(); PeriodicQueryMetadata periodicMeta = fluoQuery.getPeriodicQueryMetadata().orNull(); Assert.assertEquals(true, periodicMeta != null); VariableOrder periodicVars = periodicMeta.getVariableOrder(); Assert.assertEquals(IncrementalUpdateConstants.PERIODIC_BIN_ID, periodicVars.getVariableOrders().get(0)); - QueryMetadata queryMeta = fluoQuery.getQueryMetadata().get(); + QueryMetadata queryMeta = fluoQuery.getQueryMetadata(); VariableOrder queryVars = queryMeta.getVariableOrder(); Assert.assertEquals(IncrementalUpdateConstants.PERIODIC_BIN_ID, queryVars.getVariableOrders().get(0)); 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java new file mode 100644 index 0000000..b432868 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryBuilderVisitorTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.junit.Assert; +import org.junit.Test; + /** + * Tests the order in which a QueryBuilderVisitorBase subclass visits the builders registered with a FluoQuery.Builder. + */ +public class QueryBuilderVisitorTest { + /** + * Registers a chain of builders (query, projection, join, left and right statement patterns) linked through their child node IDs and asserts that the visitor records the node IDs in exactly that order. + */ + @Test + public void builderTest() { + + FluoQuery.Builder fluoBuilder = FluoQuery.builder(); + + String queryId = NodeType.generateNewFluoIdForType(NodeType.QUERY); + String projectionId = NodeType.generateNewFluoIdForType(NodeType.PROJECTION); + String joinId = NodeType.generateNewFluoIdForType(NodeType.JOIN); + String leftSp = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN); + String rightSp = NodeType.generateNewFluoIdForType(NodeType.STATEMENT_PATTERN); + + List<String> expected = Arrays.asList(queryId, projectionId, joinId, leftSp, rightSp); + + QueryMetadata.Builder queryBuilder = QueryMetadata.builder(queryId); + queryBuilder.setChildNodeId(projectionId); + + ProjectionMetadata.Builder projectionBuilder = ProjectionMetadata.builder(projectionId); + projectionBuilder.setChildNodeId(joinId); + + JoinMetadata.Builder joinBuilder = JoinMetadata.builder(joinId); + joinBuilder.setLeftChildNodeId(leftSp); + joinBuilder.setRightChildNodeId(rightSp); + + StatementPatternMetadata.Builder left = StatementPatternMetadata.builder(leftSp); + StatementPatternMetadata.Builder right = StatementPatternMetadata.builder(rightSp); + + fluoBuilder.setQueryMetadata(queryBuilder); + fluoBuilder.addProjectionBuilder(projectionBuilder); + fluoBuilder.addJoinMetadata(joinBuilder); + fluoBuilder.addStatementPatternBuilder(left); + fluoBuilder.addStatementPatternBuilder(right); + + QueryBuilderPrinter printer = new QueryBuilderPrinter(fluoBuilder); + printer.visit(); + Assert.assertEquals(expected, printer.getIds()); + } + + /** + * Visitor that prints the node ID of every visited builder to standard output and appends it to a list. Query, projection and join builders are then passed on to super.visit; statement pattern builders are not. + */ + public static class QueryBuilderPrinter extends QueryBuilderVisitorBase { + + private List<String> ids = new ArrayList<>(); + /** + * @return the node IDs recorded so far, in the order they were visited. This is the internal list itself, not a copy. + */ + public List<String> getIds() { + 
return ids; + } + + public QueryBuilderPrinter(FluoQuery.Builder builder) { + super(builder); + } + + public void visit(QueryMetadata.Builder queryBuilder) { + System.out.println(queryBuilder.getNodeId()); + ids.add(queryBuilder.getNodeId()); + super.visit(queryBuilder); + } + + public void visit(ProjectionMetadata.Builder projectionBuilder) { + System.out.println(projectionBuilder.getNodeId()); + ids.add(projectionBuilder.getNodeId()); + super.visit(projectionBuilder); + } + + public void visit(JoinMetadata.Builder joinBuilder) { + System.out.println(joinBuilder.getNodeId()); + ids.add(joinBuilder.getNodeId()); + super.visit(joinBuilder); + } + /** + * Prints and records the statement pattern's node ID. Unlike the other visit methods in this class, it does not call super.visit. + */ + public void visit(StatementPatternMetadata.Builder statementBuilder) { + System.out.println(statementBuilder.getNodeId()); + ids.add(statementBuilder.getNodeId()); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java new file mode 100644 index 0000000..5c89a75 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadataVisitorTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.junit.Assert; +import org.junit.Test; + /** + * Tests the order in which a QueryMetadataVisitorBase subclass visits the metadata nodes of a FluoQuery built from a SPARQL string. + */ +public class QueryMetadataVisitorTest { + /** + * Builds a FluoQuery from a grouped count query with a periodic filter, follows the child node IDs to derive the expected order (query, projection, aggregation, periodic query, join, left and right statement patterns) and asserts that the visitor records the node IDs in that order. + * + * NOTE(review): the expected list contains no filter node ID although the visitor also records FilterMetadata nodes; presumably the periodic Filter is represented by the PeriodicQueryMetadata node - confirm. + */ + @Test + public void builderTest() { + String query = "prefix function: <http://org.apache.rya/function#> " // n + + "prefix time: <http://www.w3.org/2006/time#> " // n + + "select ?id (count(?obs) as ?total) where {" // n + + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n + + "?obs <uri:hasTime> ?time. 
" // n + + "?obs <uri:hasId> ?id } group by ?id"; // n + + SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder(); + builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY)); + builder.setSparql(query); + FluoQuery fluoQuery = builder.build(); + + QueryMetadata queryMetadata = fluoQuery.getQueryMetadata(); + String queryId = queryMetadata.getNodeId(); + String projectionId = queryMetadata.getChildNodeId(); + String aggId = fluoQuery.getProjectionMetadata(projectionId).get().getChildNodeId(); + String periodicId = fluoQuery.getAggregationMetadata(aggId).get().getChildNodeId(); + String joinId = fluoQuery.getPeriodicQueryMetadata(periodicId).get().getChildNodeId(); + String leftSp = fluoQuery.getJoinMetadata(joinId).get().getLeftChildNodeId(); + String rightSp = fluoQuery.getJoinMetadata(joinId).get().getRightChildNodeId(); + + List<String> expected = Arrays.asList(queryId, projectionId, aggId, periodicId, joinId, leftSp, rightSp); + QueryMetadataVisitor visitor = new QueryMetadataVisitor(fluoQuery); + visitor.visit(); + + Assert.assertEquals(expected, visitor.getIds()); + } + + /** + * Visitor that appends the node ID of every visited metadata object to a list. All node types except statement patterns are then passed on to super.visit. + */ + public static class QueryMetadataVisitor extends QueryMetadataVisitorBase { + + private List<String> ids = new ArrayList<>(); + /** + * @return the node IDs recorded so far, in the order they were visited. This is the internal list itself, not a copy. + */ + public List<String> getIds() { + return ids; + } + + public QueryMetadataVisitor(FluoQuery metadata) { + super(metadata); + } + + public void visit(QueryMetadata metadata) { + ids.add(metadata.getNodeId()); + super.visit(metadata); + } + + public void visit(ProjectionMetadata metadata) { + ids.add(metadata.getNodeId()); + super.visit(metadata); + } + + public void visit(JoinMetadata metadata) { + ids.add(metadata.getNodeId()); + super.visit(metadata); + } + /** + * Records the statement pattern's node ID. Unlike the other visit methods in this class, it does not call super.visit. + */ + public void visit(StatementPatternMetadata metadata) { + ids.add(metadata.getNodeId()); + } + + public void visit(PeriodicQueryMetadata metadata) { + ids.add(metadata.getNodeId()); + super.visit(metadata); + } + + public void visit(FilterMetadata metadata) { + 
ids.add(metadata.getNodeId()); + super.visit(metadata); + } + + public void visit(AggregationMetadata metadata) { + ids.add(metadata.getNodeId()); + super.visit(metadata); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java index 854798d..3f335f4 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java @@ -41,7 +41,7 @@ import org.apache.logging.log4j.Logger; import org.apache.rya.accumulo.AccumuloRdfConfiguration; import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine; import org.apache.rya.api.persist.RyaDAOException; -import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand; import org.apache.rya.indexing.pcj.fluo.client.util.ParsedQueryRequest; import org.apache.rya.indexing.pcj.storage.PcjException; @@ -124,7 +124,7 @@ public class NewQueryCommand implements PcjAdminClientCommand { log.trace("SPARQL Query: " + request.getQuery()); log.trace("Var Orders: " + request.getVarOrders()); log.trace("Loading these values into the Fluo app."); - final CreatePcj createPcj = new CreatePcj(); + final CreateFluoPcj createPcj = new CreateFluoPcj(); try { // Create the PCJ in Rya. final String sparql = request.getQuery();
