http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java index 77d6a49..3396114 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java @@ -46,6 +46,18 @@ import edu.umd.cs.findbugs.annotations.NonNull; * </table> * </p> * <p> + * <b>Construct Query Metadata</b> + * <table border="1" style="width:100%"> + * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> + * <tr> <td>Node ID</td> <td>constructMetadata:nodeId</td> <td>The Node ID of the Query.</td> </tr> + * <tr> <td>Node ID</td> <td>constructMetadata:sparql</td> <td>The original SPARQL query that is being computed by this query.</td> </tr> + * <tr> <td>Node ID</td> <td>constructMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr> + * <tr> <td>Node ID</td> <td>constructMetadata:graph</td> <td>The construct graph used to project BindingSets to statements.</td> </tr> + * <tr> <td>Node ID</td> <td>constructMetadata:childNodeId</td> <td>The Node ID of the child who feeds this node.</td> </tr> + * <tr> <td>Node ID</td> <td>constructMetadata:statements</td> <td>The RDF statements produced by this construct query node.</td> </tr> + * </table> + * </p> + * <p> * <b>Filter Metadata</b> * <table border="1" style="width:100%"> * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> @@ -104,6 +116,7 @@ public class FluoQueryColumns { public static final String JOIN_METADATA_CF = "joinMetadata"; public static final String STATEMENT_PATTERN_METADATA_CF = "statementPatternMetadata"; public static final String AGGREGATION_METADATA_CF = "aggregationMetadata"; + public static final String CONSTRUCT_METADATA_CF = "constructMetadata"; /** * New triples that have been added to Rya are written as a row in this @@ -151,6 +164,14 @@ public class FluoQueryColumns { public static final Column QUERY_CHILD_NODE_ID = new Column(QUERY_METADATA_CF, "childNodeId"); public static final Column QUERY_BINDING_SET = new Column(QUERY_METADATA_CF, "bindingSet"); + // Construct Query Metadata columns. + public static final Column CONSTRUCT_NODE_ID = new Column(CONSTRUCT_METADATA_CF, "nodeId"); + public static final Column CONSTRUCT_VARIABLE_ORDER = new Column(CONSTRUCT_METADATA_CF, "variableOrder"); + public static final Column CONSTRUCT_GRAPH = new Column(CONSTRUCT_METADATA_CF, "graph"); + public static final Column CONSTRUCT_CHILD_NODE_ID = new Column(CONSTRUCT_METADATA_CF, "childNodeId"); + public static final Column CONSTRUCT_STATEMENTS = new Column(CONSTRUCT_METADATA_CF, "statements"); + public static final Column CONSTRUCT_SPARQL = new Column(CONSTRUCT_METADATA_CF, "sparql"); + // Filter Metadata columns. public static final Column FILTER_NODE_ID = new Column(FILTER_METADATA_CF, "nodeId"); public static final Column FILTER_VARIABLE_ORDER = new Column(FILTER_METADATA_CF, "veriableOrder"); @@ -201,6 +222,18 @@ public class FluoQueryColumns { QUERY_CHILD_NODE_ID)), /** + * The columns a {@link ConstructQueryMetadata} object's fields are stored within. + */ + CONSTRUCT_COLUMNS( + Arrays.asList(CONSTRUCT_NODE_ID, + CONSTRUCT_VARIABLE_ORDER, + CONSTRUCT_GRAPH, + CONSTRUCT_CHILD_NODE_ID, + CONSTRUCT_SPARQL, + CONSTRUCT_STATEMENTS)), + + + /** * The columns a {@link FilterMetadata} object's fields are stored within. */ FILTER_COLUMNS(
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java index dfc3333..5e9d654 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java @@ -31,7 +31,8 @@ import org.apache.fluo.api.client.SnapshotBase; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; -import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph; +import org.apache.rya.indexing.pcj.fluo.app.ConstructGraphSerializer; import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; @@ -39,6 +40,7 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import com.google.common.base.Charsets; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -103,6 +105,59 @@ public class FluoQueryMetadataDAO { } /** + * Write an instance of {@link ConstructQueryMetadata} to the Fluo table. + * + * @param tx - The transaction that will be used to commit the metadata. (not null) + * @param metadata - The Construct Query node metadata that will be written to the table. (not null) + */ + public void write(final TransactionBase tx, final ConstructQueryMetadata metadata) { + requireNonNull(tx); + requireNonNull(metadata); + + final String rowId = metadata.getNodeId(); + tx.set(rowId, FluoQueryColumns.CONSTRUCT_NODE_ID, rowId); + tx.set(rowId, FluoQueryColumns.CONSTRUCT_VARIABLE_ORDER, metadata.getVariableOrder().toString()); + tx.set(rowId, FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID, metadata.getChildNodeId() ); + tx.set(rowId, FluoQueryColumns.CONSTRUCT_SPARQL, metadata.getSparql()); + tx.set(rowId, FluoQueryColumns.CONSTRUCT_GRAPH, ConstructGraphSerializer.toConstructString(metadata.getConstructGraph())); + } + + /** + * Read an instance of {@link ConstructQueryMetadata} from the Fluo table. + * + * @param sx - The snapshot that will be used to read the metadata . (not null) + * @param nodeId - The nodeId of the Construct Query node that will be read. (not null) + * @return The {@link ConstructQueryMetadata} that was read from table. + */ + public ConstructQueryMetadata readConstructQueryMetadata(final SnapshotBase sx, final String nodeId) { + return readConstructQueryMetadataBuilder(sx, nodeId).build(); + } + + private ConstructQueryMetadata.Builder readConstructQueryMetadataBuilder(final SnapshotBase sx, final String nodeId) { + requireNonNull(sx); + requireNonNull(nodeId); + + // Fetch the values from the Fluo table. + final String rowId = nodeId; + final Map<Column, String> values = sx.gets(rowId, + FluoQueryColumns.CONSTRUCT_GRAPH, + FluoQueryColumns.CONSTRUCT_SPARQL, + FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID); + + final String graphString = values.get(FluoQueryColumns.CONSTRUCT_GRAPH); + final ConstructGraph graph = ConstructGraphSerializer.toConstructGraph(graphString); + final String childNodeId = values.get(FluoQueryColumns.CONSTRUCT_CHILD_NODE_ID); + final String sparql = values.get(FluoQueryColumns.CONSTRUCT_SPARQL); + + return ConstructQueryMetadata.builder() + .setNodeId(nodeId) + .setConstructGraph(graph) + .setSparql(sparql) + .setChildNodeId(childNodeId); + } + + + /** * Write an instance of {@link FilterMetadata} to the Fluo table. * * @param tx - The transaction that will be used to commit the metadata. (not null) @@ -376,13 +431,25 @@ public class FluoQueryMetadataDAO { requireNonNull(tx); requireNonNull(query); - // Store the Query ID so that it may be looked up from the original SPARQL string. - final String sparql = query.getQueryMetadata().getSparql(); - final String queryId = query.getQueryMetadata().getNodeId(); - tx.set(Bytes.of(sparql), FluoQueryColumns.QUERY_ID, Bytes.of(queryId)); - // Write the rest of the metadata objects. - write(tx, query.getQueryMetadata()); + switch(query.getQueryType()) { + case Construct: + ConstructQueryMetadata constructMetadata = query.getConstructQueryMetadata().get(); + // Store the Query ID so that it may be looked up from the original SPARQL string. + final String constructSparql = constructMetadata.getSparql(); + final String constructQueryId = constructMetadata.getNodeId(); + tx.set(Bytes.of(constructSparql), FluoQueryColumns.QUERY_ID, Bytes.of(constructQueryId)); + write(tx, constructMetadata); + break; + case Projection: + QueryMetadata queryMetadata = query.getQueryMetadata().get(); + // Store the Query ID so that it may be looked up from the original SPARQL string. + final String sparql = queryMetadata.getSparql(); + final String queryId = queryMetadata.getNodeId(); + tx.set(Bytes.of(sparql), FluoQueryColumns.QUERY_ID, Bytes.of(queryId)); + write(tx, queryMetadata); + break; + } for(final FilterMetadata filter : query.getFilterMetadata()) { write(tx, filter); @@ -423,50 +490,62 @@ public class FluoQueryMetadataDAO { requireNonNull(childNodeId); final NodeType childType = NodeType.fromNodeId(childNodeId).get(); - switch(childType) { - case QUERY: - // Add this node's metadata. - final QueryMetadata.Builder queryBuilder = readQueryMetadataBuilder(sx, childNodeId); - builder.setQueryMetadata(queryBuilder); - - // Add it's child's metadata. - addChildMetadata(sx, builder, queryBuilder.build().getChildNodeId()); - break; - - case JOIN: - // Add this node's metadata. - final JoinMetadata.Builder joinBuilder = readJoinMetadataBuilder(sx, childNodeId); - builder.addJoinMetadata(joinBuilder); - - // Add it's children's metadata. - final JoinMetadata joinMetadata = joinBuilder.build(); - addChildMetadata(sx, builder, joinMetadata.getLeftChildNodeId()); - addChildMetadata(sx, builder, joinMetadata.getRightChildNodeId()); - break; - - case FILTER: - // Add this node's metadata. - final FilterMetadata.Builder filterBuilder = readFilterMetadataBuilder(sx, childNodeId); - builder.addFilterMetadata(filterBuilder); - - // Add it's child's metadata. - addChildMetadata(sx, builder, filterBuilder.build().getChildNodeId()); - break; - - case STATEMENT_PATTERN: - // Add this node's metadata. - final StatementPatternMetadata.Builder spBuilder = readStatementPatternMetadataBuilder(sx, childNodeId); - builder.addStatementPatternBuilder(spBuilder); - break; - - case AGGREGATION: - // Add this node's metadata. - final AggregationMetadata.Builder aggregationBuilder = readAggregationMetadataBuilder(sx, childNodeId); - builder.addAggregateMetadata(aggregationBuilder); - - // Add it's child's metadata. - addChildMetadata(sx, builder, aggregationBuilder.build().getChildNodeId()); - break; + switch (childType) { + case QUERY: + // Add this node's metadata. + final QueryMetadata.Builder queryBuilder = readQueryMetadataBuilder(sx, childNodeId); + Preconditions.checkArgument(!builder.getQueryBuilder().isPresent()); + builder.setQueryMetadata(queryBuilder); + + // Add it's child's metadata. + addChildMetadata(sx, builder, queryBuilder.build().getChildNodeId()); + break; + + case CONSTRUCT: + final ConstructQueryMetadata.Builder constructBuilder = readConstructQueryMetadataBuilder(sx, childNodeId); + Preconditions.checkArgument(!builder.getQueryBuilder().isPresent()); + builder.setConstructQueryMetadata(constructBuilder); + + // Add it's child's metadata. + addChildMetadata(sx, builder, constructBuilder.build().getChildNodeId()); + break; + + case AGGREGATION: + // Add this node's metadata. + final AggregationMetadata.Builder aggregationBuilder = readAggregationMetadataBuilder(sx, childNodeId); + builder.addAggregateMetadata(aggregationBuilder); + + // Add it's child's metadata. + addChildMetadata(sx, builder, aggregationBuilder.build().getChildNodeId()); + break; + + case JOIN: + // Add this node's metadata. + final JoinMetadata.Builder joinBuilder = readJoinMetadataBuilder(sx, childNodeId); + builder.addJoinMetadata(joinBuilder); + + // Add it's children's metadata. + final JoinMetadata joinMetadata = joinBuilder.build(); + addChildMetadata(sx, builder, joinMetadata.getLeftChildNodeId()); + addChildMetadata(sx, builder, joinMetadata.getRightChildNodeId()); + break; + + case FILTER: + // Add this node's metadata. + final FilterMetadata.Builder filterBuilder = readFilterMetadataBuilder(sx, childNodeId); + builder.addFilterMetadata(filterBuilder); + + // Add it's child's metadata. + addChildMetadata(sx, builder, filterBuilder.build().getChildNodeId()); + break; + + case STATEMENT_PATTERN: + // Add this node's metadata. + final StatementPatternMetadata.Builder spBuilder = readStatementPatternMetadataBuilder(sx, childNodeId); + builder.addStatementPatternBuilder(spBuilder); + break; + default: + break; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java index 064cfe8..23ac286 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java @@ -129,7 +129,7 @@ public class QueryMetadata extends CommonNodeMetadata { @DefaultAnnotation(NonNull.class) public static final class Builder { - private final String nodeId; + private String nodeId; private VariableOrder varOrder; private String sparql; private String childNodeId; @@ -143,6 +143,7 @@ public class QueryMetadata extends CommonNodeMetadata { this.nodeId = checkNotNull(nodeId); } + /** * Set the variable order of binding sets that are emitted by this node. * http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java index 562470a..631ce60 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java @@ -19,7 +19,9 @@ package org.apache.rya.indexing.pcj.fluo.app.query; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkArgument; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AGGREGATION_PREFIX; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CONSTRUCT_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX; @@ -28,6 +30,7 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -35,6 +38,8 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; +import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph; +import org.apache.rya.indexing.pcj.fluo.app.ConstructProjection; import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater; import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; import org.apache.rya.indexing.pcj.fluo.app.NodeType; @@ -42,21 +47,33 @@ import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.Aggregatio import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.openrdf.model.Value; +import org.openrdf.model.impl.BNodeImpl; import org.openrdf.query.algebra.AggregateOperator; +import org.openrdf.query.algebra.BNodeGenerator; import org.openrdf.query.algebra.Extension; +import org.openrdf.query.algebra.ExtensionElem; import org.openrdf.query.algebra.Filter; import org.openrdf.query.algebra.Group; import org.openrdf.query.algebra.GroupElem; import org.openrdf.query.algebra.Join; import org.openrdf.query.algebra.LeftJoin; +import org.openrdf.query.algebra.MultiProjection; import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.ProjectionElem; +import org.openrdf.query.algebra.ProjectionElemList; import org.openrdf.query.algebra.QueryModelNode; +import org.openrdf.query.algebra.Reduced; import org.openrdf.query.algebra.StatementPattern; import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.UnaryTupleOperator; +import org.openrdf.query.algebra.ValueConstant; +import org.openrdf.query.algebra.ValueExpr; import org.openrdf.query.algebra.Var; import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; import org.openrdf.query.parser.ParsedQuery; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -158,16 +175,18 @@ public class SparqlFluoQueryBuilder { // Create the prefix of the id. This makes it a little bit more human readable. String prefix; - if(node instanceof StatementPattern) { + if (node instanceof StatementPattern) { prefix = SP_PREFIX; - } else if(node instanceof Filter) { + } else if (node instanceof Filter) { prefix = FILTER_PREFIX; - } else if(node instanceof Join || node instanceof LeftJoin) { + } else if (node instanceof Join || node instanceof LeftJoin) { prefix = JOIN_PREFIX; - } else if(node instanceof Projection) { + } else if (node instanceof Projection) { prefix = QUERY_PREFIX; } else if(node instanceof Extension) { prefix = AGGREGATION_PREFIX; + } else if (node instanceof Reduced) { + prefix = CONSTRUCT_PREFIX; } else { throw new IllegalArgumentException("Node must be of type {StatementPattern, Join, Filter, Extension, Projection} but was " + node.getClass()); } @@ -402,7 +421,7 @@ public class SparqlFluoQueryBuilder { final QueryModelNode child = node.getArg(); if(child == null) { - throw new IllegalArgumentException("Filter arg connot be null."); + throw new IllegalArgumentException("Projection arg connot be null."); } final String childNodeId = nodeIds.getOrMakeId(child); @@ -417,6 +436,60 @@ public class SparqlFluoQueryBuilder { // Walk to the next node. super.meet(node); } + + + public void meet(Reduced node) { + //create id, initialize ConstructQueryMetadata builder, register ConstructQueryMetadata + //builder with FluoQueryBuilder, and add metadata that we currently have + final String constructId = nodeIds.getOrMakeId(node); + final ConstructQueryMetadata.Builder constructBuilder = ConstructQueryMetadata.builder(); + constructBuilder.setNodeId(constructId); + fluoQueryBuilder.setConstructQueryMetadata(constructBuilder); + constructBuilder.setSparql(sparql); + + //get child node + QueryModelNode child = node.getArg(); + Preconditions.checkArgument(child instanceof Projection || child instanceof MultiProjection); + UnaryTupleOperator unary = (UnaryTupleOperator) child; + + //get ProjectionElemList to build ConstructGraph + final List<ProjectionElemList> projections = new ArrayList<>(); + if(unary instanceof Projection) { + projections.add(((Projection) unary).getProjectionElemList()); + } else { + projections.addAll(((MultiProjection)unary).getProjections()); + } + + //get ExtensionElems to build ConstructGraph + QueryModelNode grandChild = unary.getArg(); + Preconditions.checkArgument(grandChild instanceof Extension); + Extension extension = (Extension) grandChild; + final List<ExtensionElem> extensionElems = extension.getElements(); + final ConstructGraph graph = getConstructGraph(projections, extensionElems); + constructBuilder.setConstructGraph(graph); + + //set child to the next node we care about in Fluo + //if Extension's arg is a Group node, then it is an Aggregation, so set child to Extension + //otherwise set child to Extension's child (only care about Extensions if they are Aggregations) + if(extension.getArg() instanceof Group) { + child = extension; + } else { + child = extension.getArg(); + } + + //Set the child node in the ConstructQueryMetadataBuilder + String childNodeId = nodeIds.getOrMakeId(child); + constructBuilder.setChildNodeId(childNodeId); + + // Update the child node's metadata. + final Set<String> childVars = getVars((TupleExpr)child); + final VariableOrder childVarOrder = new VariableOrder(childVars); + setChildMetadata(childNodeId, childVarOrder, constructId); + + //fast forward visitor to next node we care about + child.visit(this); + } + /** * Update a query node's metadata to include it's binding set variable order @@ -433,57 +506,102 @@ public class SparqlFluoQueryBuilder { checkNotNull(parentNodeId); final NodeType childType = NodeType.fromNodeId(childNodeId).get(); - switch(childType) { - case STATEMENT_PATTERN: - StatementPatternMetadata.Builder spBuilder = fluoQueryBuilder.getStatementPatternBuilder(childNodeId).orNull(); - if(spBuilder == null) { - spBuilder = StatementPatternMetadata.builder(childNodeId); - fluoQueryBuilder.addStatementPatternBuilder(spBuilder); - } + switch (childType) { + case STATEMENT_PATTERN: + StatementPatternMetadata.Builder spBuilder = fluoQueryBuilder.getStatementPatternBuilder(childNodeId).orNull(); + if (spBuilder == null) { + spBuilder = StatementPatternMetadata.builder(childNodeId); + fluoQueryBuilder.addStatementPatternBuilder(spBuilder); + } - spBuilder.setVarOrder(childVarOrder); - spBuilder.setParentNodeId(parentNodeId); - break; + spBuilder.setVarOrder(childVarOrder); + spBuilder.setParentNodeId(parentNodeId); + break; - case JOIN: - JoinMetadata.Builder joinBuilder = fluoQueryBuilder.getJoinBuilder(childNodeId).orNull(); - if(joinBuilder == null) { - joinBuilder = JoinMetadata.builder(childNodeId); - fluoQueryBuilder.addJoinMetadata(joinBuilder); - } + case JOIN: + JoinMetadata.Builder joinBuilder = fluoQueryBuilder.getJoinBuilder(childNodeId).orNull(); + if (joinBuilder == null) { + joinBuilder = JoinMetadata.builder(childNodeId); + fluoQueryBuilder.addJoinMetadata(joinBuilder); + } - joinBuilder.setVariableOrder(childVarOrder); - joinBuilder.setParentNodeId(parentNodeId); - break; + joinBuilder.setVariableOrder(childVarOrder); + joinBuilder.setParentNodeId(parentNodeId); + break; - case FILTER: - FilterMetadata.Builder filterBuilder = fluoQueryBuilder.getFilterBuilder(childNodeId).orNull(); - if(filterBuilder == null) { - filterBuilder = FilterMetadata.builder(childNodeId); - fluoQueryBuilder.addFilterMetadata(filterBuilder); - } + case FILTER: + FilterMetadata.Builder filterBuilder = fluoQueryBuilder.getFilterBuilder(childNodeId).orNull(); + if (filterBuilder == null) { + filterBuilder = FilterMetadata.builder(childNodeId); + fluoQueryBuilder.addFilterMetadata(filterBuilder); + } - filterBuilder.setVarOrder(childVarOrder); - filterBuilder.setParentNodeId(parentNodeId); - break; + filterBuilder.setVarOrder(childVarOrder); + filterBuilder.setParentNodeId(parentNodeId); + break; - case AGGREGATION: - AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(childNodeId).orNull(); - if(aggregationBuilder == null) { - aggregationBuilder = AggregationMetadata.builder(childNodeId); - fluoQueryBuilder.addAggregateMetadata(aggregationBuilder); - } + case AGGREGATION: + AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(childNodeId).orNull(); + if (aggregationBuilder == null) { + aggregationBuilder = AggregationMetadata.builder(childNodeId); + fluoQueryBuilder.addAggregateMetadata(aggregationBuilder); + } - aggregationBuilder.setVariableOrder(childVarOrder); - aggregationBuilder.setParentNodeId(parentNodeId); - break; + aggregationBuilder.setVariableOrder(childVarOrder); + aggregationBuilder.setParentNodeId(parentNodeId); + break; - case QUERY: - throw new IllegalArgumentException("QUERY nodes do not have children."); - default: - throw new IllegalArgumentException("Unsupported NodeType: " + childType); + case QUERY: + throw new IllegalArgumentException("A QUERY node cannot be the child of another node."); + case CONSTRUCT: + throw new IllegalArgumentException("A CONSTRUCT node cannot be the child of another node."); + default: + throw new IllegalArgumentException("Unsupported NodeType: " + childType); + } + } + + private ConstructGraph getConstructGraph(List<ProjectionElemList> projections, List<ExtensionElem> extensionElems) { + Map<String, Value> valueMap = new HashMap<>(); + //create valueMap to associate source names with Values + for(ExtensionElem elem: extensionElems) { + String name = elem.getName(); + ValueExpr expr = elem.getExpr(); + if(expr instanceof ValueConstant) { + Value value = ((ValueConstant) expr).getValue(); + valueMap.put(name, value); + } else if(expr instanceof BNodeGenerator) { + valueMap.put(name, new BNodeImpl(UUID.randomUUID().toString())); + } + } + + Set<ConstructProjection> constructProj = new HashSet<>(); + //build ConstructProjection for each ProjectionElemList + for(ProjectionElemList list: projections) { + validateProjectionElemList(list); + List<Var> vars = new ArrayList<>(); + for(ProjectionElem elem: list.getElements()) { + String sourceName = elem.getSourceName(); + Var var = new Var(sourceName); + if(valueMap.containsKey(sourceName)) { + var.setValue(valueMap.get(sourceName)); + } + vars.add(var); + } + constructProj.add(new ConstructProjection(vars.get(0), vars.get(1), vars.get(2))); } + + return new ConstructGraph(constructProj); + } + + private void validateProjectionElemList(ProjectionElemList list) { + List<ProjectionElem> elements = list.getElements(); + checkArgument(elements.size() == 3); + checkArgument(elements.get(0).getTargetName().equals("subject")); + checkArgument(elements.get(1).getTargetName().equals("predicate")); + checkArgument(elements.get(2).getTargetName().equals("object")); } + + /** * Get the non-constant variables from a {@link TupleExpr}. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTest.java new file mode 100644 index 0000000..94c8571 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTest.java @@ -0,0 +1,145 @@ +package org.apache.rya.indexing.pcj.fluo.app; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.UnsupportedEncodingException; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; +import org.openrdf.query.algebra.helpers.StatementPatternCollector; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import com.google.common.collect.Sets; + +public class ConstructGraphTest { + + private ValueFactory vf = new ValueFactoryImpl(); + + @Test + public void testConstructGraph() throws MalformedQueryException, UnsupportedEncodingException { + String query = "select ?x where { ?x <uri:talksTo> <uri:Bob>. ?y <uri:worksAt> ?z }"; + + SPARQLParser parser = new SPARQLParser(); + ParsedQuery pq = parser.parseQuery(query, null); + List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr()); + ConstructGraph graph = new ConstructGraph(patterns); + + QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("x", vf.createURI("uri:Joe")); + bs.addBinding("y", vf.createURI("uri:Bob")); + bs.addBinding("z", vf.createURI("uri:BurgerShack")); + VisibilityBindingSet vBs = new VisibilityBindingSet(bs,"FOUO"); + Set<RyaStatement> statements = graph.createGraphFromBindingSet(vBs); + + RyaStatement statement1 = new RyaStatement(new RyaURI("uri:Joe"), new RyaURI("uri:talksTo"), new RyaURI("uri:Bob")); + RyaStatement statement2 = new RyaStatement(new RyaURI("uri:Bob"), new RyaURI("uri:worksAt"), new RyaURI("uri:BurgerShack")); + Set<RyaStatement> expected = Sets.newHashSet(Arrays.asList(statement1, statement2)); + expected.forEach(x-> x.setColumnVisibility("FOUO".getBytes())); + ConstructGraphTestUtils.ryaStatementSetsEqualIgnoresTimestamp(expected, statements); + } + + @Test + public void testConstructGraphBNode() throws MalformedQueryException { + String query = "select ?x where { _:b <uri:talksTo> ?x. _:b <uri:worksAt> ?z }"; + + SPARQLParser parser = new SPARQLParser(); + ParsedQuery pq = parser.parseQuery(query, null); + List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr()); + ConstructGraph graph = new ConstructGraph(patterns); + + QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("x", vf.createURI("uri:Joe")); + bs.addBinding("z", vf.createURI("uri:BurgerShack")); + VisibilityBindingSet vBs = new VisibilityBindingSet(bs, "FOUO"); + Set<RyaStatement> statements = graph.createGraphFromBindingSet(vBs); + Set<RyaStatement> statements2 = graph.createGraphFromBindingSet(vBs); + + RyaURI subject = null; + for(RyaStatement statement: statements) { + RyaURI subjURI = statement.getSubject(); + if(subject == null) { + subject = subjURI; + } else { + assertEquals(subjURI, subject); + } + } + RyaURI subject2 = null; + for(RyaStatement statement: statements2) { + RyaURI subjURI = statement.getSubject(); + if(subject2 == null) { + subject2 = subjURI; + } else { + assertEquals(subjURI, subject2); + } + } + + assertTrue(!subject.equals(subject2)); + + ConstructGraphTestUtils.ryaStatementsEqualIgnoresBlankNode(statements, statements2); + } + + + @Test + public void testConstructGraphSerializer() throws MalformedQueryException { + + String query = "select ?x where { ?x <uri:talksTo> <uri:Bob>. ?y <uri:worksAt> ?z }"; + + SPARQLParser parser = new SPARQLParser(); + ParsedQuery pq = parser.parseQuery(query, null); + List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr()); + ConstructGraph graph = new ConstructGraph(patterns); + + String constructString = ConstructGraphSerializer.toConstructString(graph); + ConstructGraph deserialized = ConstructGraphSerializer.toConstructGraph(constructString); + + assertEquals(graph, deserialized); + + } + + @Test + public void testConstructGraphSerializerBlankNode() throws MalformedQueryException { + + String query = "select ?x where { _:b <uri:talksTo> ?x. _:b <uri:worksAt> ?y }"; + + SPARQLParser parser = new SPARQLParser(); + ParsedQuery pq = parser.parseQuery(query, null); + List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr()); + ConstructGraph graph = new ConstructGraph(patterns); + + String constructString = ConstructGraphSerializer.toConstructString(graph); + ConstructGraph deserialized = ConstructGraphSerializer.toConstructGraph(constructString); + + assertEquals(graph, deserialized); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTestUtils.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTestUtils.java new file mode 100644 index 0000000..a12b6de --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructGraphTestUtils.java @@ -0,0 +1,126 @@ +package org.apache.rya.indexing.pcj.fluo.app; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaSubGraph; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.resolver.RyaToRdfConversions; +import org.junit.Assert; +import org.openrdf.model.Statement; + +import com.google.common.base.Objects; + +public class ConstructGraphTestUtils { + + public static void ryaStatementSetsEqualIgnoresTimestamp(Set<RyaStatement> statements1, Set<RyaStatement> statements2) { + Assert.assertEquals(new VisibilityStatementSet(statements1), new VisibilityStatementSet(statements2)); + } + + public static void subGraphsEqualIgnoresTimestamp(Set<RyaSubGraph> subgraph1, Set<RyaSubGraph> subgraph2) { + Set<VisibilityStatementSet> set1 = new HashSet<>(); + Set<VisibilityStatementSet> set2 = new HashSet<>(); + subgraph1.forEach(x->set1.add(new VisibilityStatementSet(x.getStatements()))); + subgraph2.forEach(x->set2.add(new VisibilityStatementSet(x.getStatements()))); + Assert.assertEquals(set1, set2); + } + + public static void subGraphsEqualIgnoresBlankNode(Set<RyaSubGraph> subgraph1, Set<RyaSubGraph> subgraph2) { + Map<Integer, RyaSubGraph> subGraphMap = new HashMap<>(); + subgraph1.forEach(x->subGraphMap.put(getKey(x), x)); + subgraph2.forEach(x->ryaStatementsEqualIgnoresBlankNode(x.getStatements(), subGraphMap.get(getKey(x)).getStatements())); + } + + private static int getKey(RyaSubGraph subgraph) { + int key = 0; + for(RyaStatement statement: subgraph.getStatements()) { + key += statement.getObject().hashCode(); + } + return key; + } + + public static void ryaStatementsEqualIgnoresBlankNode(Set<RyaStatement> statements1, Set<RyaStatement> statements2) { + Map<String, RyaURI> bNodeMap = new HashMap<>(); + statements1.forEach(x-> bNodeMap.put(x.getPredicate().getData(), x.getSubject())); + statements2.forEach(x -> x.setSubject(bNodeMap.get(x.getPredicate().getData()))); + ryaStatementSetsEqualIgnoresTimestamp(statements1, statements2); + } + + + /** + * Class used for comparing Sets of RyaStatements while ignoring timestamps. + * It is assumed that all RyaStatements in the Set used to construct this class + * have the same visibility. + */ + public static class VisibilityStatementSet { + + private Set<Statement> statements; + private String visibility; + + public VisibilityStatementSet(Set<RyaStatement> statements) { + this.statements = new HashSet<>(); + statements.forEach(x -> { + this.statements.add(RyaToRdfConversions.convertStatement(x)); + if (visibility == null) { + if (x.getColumnVisibility() != null) { + visibility = new String(x.getColumnVisibility()); + } else { + this.visibility = ""; + } + } + }); + } + + public VisibilityStatementSet(RyaSubGraph subgraph) { + this(subgraph.getStatements()); + } + + @Override + public boolean equals(Object o) { + if(this == o) { + return true; + } + + if(o instanceof VisibilityStatementSet) { + VisibilityStatementSet that = (VisibilityStatementSet) o; + return Objects.equal(this.visibility, that.visibility) && Objects.equal(this.statements, that.statements); + } + + return false; + } + + @Override + public int hashCode() { + return Objects.hashCode(visibility, statements); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + return builder.append("Visiblity Statement Set \n").append(" Statements: " + statements + "\n") + .append(" Visibilities: " + visibility + " \n").toString(); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjectionTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjectionTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjectionTest.java new file mode 100644 index 0000000..080031e --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjectionTest.java @@ -0,0 +1,112 @@ +package org.apache.rya.indexing.pcj.fluo.app; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import static org.junit.Assert.assertEquals; + +import java.io.UnsupportedEncodingException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.junit.Test; +import org.openrdf.model.BNode; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; +import org.openrdf.query.algebra.helpers.StatementPatternCollector; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; + +public class ConstructProjectionTest { + + private static final ValueFactory vf = new ValueFactoryImpl(); + + @Test + public void testConstructProjectionProjectSubj() throws MalformedQueryException, UnsupportedEncodingException { + String query = "select ?x where { ?x <uri:talksTo> <uri:Bob> }"; + + SPARQLParser parser = new SPARQLParser(); + ParsedQuery pq = parser.parseQuery(query, null); + List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr()); + ConstructProjection projection = new ConstructProjection(patterns.get(0)); + + QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("x", vf.createURI("uri:Joe")); + VisibilityBindingSet vBs = new VisibilityBindingSet(bs, "FOUO"); + RyaStatement statement = projection.projectBindingSet(vBs, new HashMap<>()); + + RyaStatement expected = new RyaStatement(new RyaURI("uri:Joe"), new RyaURI("uri:talksTo"), new RyaURI("uri:Bob")); + expected.setColumnVisibility("FOUO".getBytes("UTF-8")); + expected.setTimestamp(statement.getTimestamp()); + + assertEquals(expected, statement); + } + + @Test + public void testConstructProjectionProjPred() throws MalformedQueryException { + String query = "select ?p where { <uri:Joe> ?p <uri:Bob> }"; + + SPARQLParser parser = new SPARQLParser(); + ParsedQuery pq = parser.parseQuery(query, null); + List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr()); + ConstructProjection projection = new ConstructProjection(patterns.get(0)); + + QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("p", vf.createURI("uri:worksWith")); + VisibilityBindingSet vBs = new VisibilityBindingSet(bs); + RyaStatement statement = projection.projectBindingSet(vBs, new HashMap<>()); + + RyaStatement expected = new RyaStatement(new RyaURI("uri:Joe"), new RyaURI("uri:worksWith"), new RyaURI("uri:Bob")); + expected.setTimestamp(statement.getTimestamp()); + expected.setColumnVisibility(new byte[0]); + + assertEquals(expected, statement); + } + + @Test + public void testConstructProjectionBNodes() throws MalformedQueryException { + String query = "select ?o where { _:b <uri:talksTo> ?o }"; + + SPARQLParser parser = new SPARQLParser(); + ParsedQuery pq = parser.parseQuery(query, null); + List<StatementPattern> patterns = StatementPatternCollector.process(pq.getTupleExpr()); + ConstructProjection projection = new ConstructProjection(patterns.get(0)); + + QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("o", vf.createURI("uri:Bob")); + VisibilityBindingSet vBs = new VisibilityBindingSet(bs); + BNode bNode = vf.createBNode(); + Map<String, BNode> bNodeMap = new HashMap<>(); + bNodeMap.put("-anon-1", bNode); + RyaStatement statement = projection.projectBindingSet(vBs,bNodeMap); + + RyaStatement expected = new RyaStatement(RdfToRyaConversions.convertResource(bNode), new RyaURI("uri:talksTo"), new RyaURI("uri:Bob")); + expected.setTimestamp(statement.getTimestamp()); + expected.setColumnVisibility(new byte[0]); + + assertEquals(expected, statement); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java index 4ad5189..60e1bc1 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverterTest.java @@ -38,10 +38,8 @@ public class FluoStringConverterTest { // Setup a StatementPattern that represents "?x <http://worksAt> <http://Chipotle>." final Var subject = new Var("x"); final Var predicate = new Var("-const-http://worksAt", new URIImpl("http://worksAt")); - predicate.setAnonymous(true); predicate.setConstant(true); final Var object = new Var("-const-http://Chipotle", new URIImpl("http://Chipotle")); - object.setAnonymous(true); object.setConstant(true); final StatementPattern pattern = new StatementPattern(subject, predicate, object); @@ -69,10 +67,8 @@ public class FluoStringConverterTest { // Enusre it converted to the expected result. final Var subject = new Var("x"); final Var predicate = new Var("-const-http://worksAt", new URIImpl("http://worksAt")); - predicate.setAnonymous(true); predicate.setConstant(true); final Var object = new Var("-const-http://Chipotle", new URIImpl("http://Chipotle")); - object.setAnonymous(true); object.setConstant(true); final StatementPattern expected = new StatementPattern(subject, predicate, object); @@ -89,7 +85,6 @@ public class FluoStringConverterTest { // Ensure it converted to the expected result. final Var expected = new Var("-const-http://Chipotle", new URIImpl("http://Chipotle")); - expected.setAnonymous(true); expected.setConstant(true); assertEquals(expected, var); @@ -105,7 +100,6 @@ public class FluoStringConverterTest { // Ensure it converted to the expected result. final Var expected = new Var("-const-5", new LiteralImpl("5", XMLSchema.INTEGER)); - expected.setAnonymous(true); expected.setConstant(true); assertEquals(expected, result); @@ -121,7 +115,6 @@ public class FluoStringConverterTest { // Ensure it converted to the expected result. final Var expected = new Var("-const-Chipotle", new LiteralImpl("Chipotle", XMLSchema.STRING)); - expected.setAnonymous(true); expected.setConstant(true); assertEquals(expected, result); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/RyaSubGraphKafkaSerDeTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/RyaSubGraphKafkaSerDeTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/RyaSubGraphKafkaSerDeTest.java new file mode 100644 index 0000000..8b9feaf --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/RyaSubGraphKafkaSerDeTest.java @@ -0,0 +1,57 @@ +package org.apache.rya.indexing.pcj.fluo.app; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import static org.junit.Assert.assertEquals; + +import java.util.UUID; + +import org.apache.rya.api.domain.RyaSubGraph; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe; +import org.junit.Test; +import org.openrdf.model.vocabulary.XMLSchema; + +public class RyaSubGraphKafkaSerDeTest { + + private static final RyaSubGraphKafkaSerDe serializer = new RyaSubGraphKafkaSerDe(); + + @Test + public void serializationTestWithURI() { + RyaSubGraph bundle = new RyaSubGraph(UUID.randomUUID().toString()); + bundle.addStatement(new RyaStatement(new RyaURI("uri:123"), new RyaURI("uri:234"), new RyaURI("uri:345"))); + bundle.addStatement(new RyaStatement(new RyaURI("uri:345"), new RyaURI("uri:567"), new RyaURI("uri:789"))); + byte[] bundleBytes = serializer.toBytes(bundle); + RyaSubGraph deserializedBundle = serializer.fromBytes(bundleBytes); + assertEquals(bundle, deserializedBundle); + } + + + @Test + public void serializationTestWithLiteral() { + RyaSubGraph bundle = new RyaSubGraph(UUID.randomUUID().toString()); + bundle.addStatement(new RyaStatement(new RyaURI("uri:123"), new RyaURI("uri:234"), new RyaType(XMLSchema.INTEGER, "345"))); + bundle.addStatement(new RyaStatement(new RyaURI("uri:345"), new RyaURI("uri:567"), new RyaType(XMLSchema.INTEGER, "789"))); + byte[] bundleBytes = serializer.toBytes(bundle); + RyaSubGraph deserializedBundle = serializer.fromBytes(bundleBytes); + assertEquals(bundle, deserializedBundle); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java index 74193cf..b9c10d4 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java @@ -28,7 +28,7 @@ import java.util.Map; import java.util.Properties; import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters; -import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaResultExporterFactory; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterFactory; import org.junit.Test; /** @@ -93,7 +93,7 @@ public class KafkaExportParametersTest { @Test public void testKafkaResultExporterFactory() { - KafkaResultExporterFactory factory = new KafkaResultExporterFactory(); + KafkaBindingSetExporterFactory factory = new KafkaBindingSetExporterFactory(); assertNotNull(factory); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java index e1c386d..99ccc58 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java @@ -25,6 +25,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; import org.apache.commons.lang3.StringUtils; import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport; +import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; @@ -55,13 +56,26 @@ public class QueryReportRenderer { final FluoQuery metadata = queryReport.getFluoQuery(); - final QueryMetadata queryMetadata = metadata.getQueryMetadata(); - builder.appendItem( new ReportItem("QUERY NODE") ); - builder.appendItem( new ReportItem("Node ID", queryMetadata.getNodeId()) ); - builder.appendItem( new ReportItem("Variable Order", queryMetadata.getVariableOrder().toString()) ); - builder.appendItem( new ReportItem("SPARQL", prettyFormatSparql( queryMetadata.getSparql()) ) ); - builder.appendItem( new ReportItem("Child Node ID", queryMetadata.getChildNodeId()) ); - builder.appendItem( new ReportItem("Count", "" + queryReport.getCount(queryMetadata.getNodeId())) ); + switch (metadata.getQueryType()) { + case Projection: + final QueryMetadata queryMetadata = metadata.getQueryMetadata().get(); + builder.appendItem(new ReportItem("QUERY NODE")); + builder.appendItem(new ReportItem("Node ID", queryMetadata.getNodeId())); + builder.appendItem(new ReportItem("Variable Order", queryMetadata.getVariableOrder().toString())); + builder.appendItem(new ReportItem("SPARQL", prettyFormatSparql(queryMetadata.getSparql()))); + builder.appendItem(new ReportItem("Child Node ID", queryMetadata.getChildNodeId())); + builder.appendItem(new ReportItem("Count", "" + queryReport.getCount(queryMetadata.getNodeId()))); + break; + case Construct: + final ConstructQueryMetadata constructMetadata = metadata.getConstructQueryMetadata().get(); + builder.appendItem(new ReportItem("CONSTRUCT QUERY NODE")); + builder.appendItem(new ReportItem("Node ID", constructMetadata.getNodeId())); + builder.appendItem(new ReportItem("Variable Order", constructMetadata.getVariableOrder().toString())); + builder.appendItem(new ReportItem("SPARQL", prettyFormatSparql(constructMetadata.getSparql()))); + builder.appendItem(new ReportItem("Child Node ID", constructMetadata.getChildNodeId())); + builder.appendItem(new ReportItem("Construct Graph", constructMetadata.getConstructGraph().toString())); + builder.appendItem(new ReportItem("Count", "" + queryReport.getCount(constructMetadata.getNodeId()))); + } for(final FilterMetadata filterMetadata : metadata.getFilterMetadata()) { builder.appendItem( new ReportItem("") ); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml index 9263362..85edb11 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml @@ -1,100 +1,98 @@ <?xml version="1.0" encoding="utf-8"?> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor - license agreements. See the NOTICE file distributed with this work for additional - information regarding copyright ownership. The ASF licenses this file to - you under the Apache License, Version 2.0 (the "License"); you may not use - this file except in compliance with the License. You may obtain a copy of - the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required - by applicable law or agreed to in writing, software distributed under the - License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - OF ANY KIND, either express or implied. See the License for the specific - language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + you under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <groupId>org.apache.rya</groupId> - <artifactId>rya.pcj.fluo.parent</artifactId> - <version>3.2.11-incubating-SNAPSHOT</version> - </parent> + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.parent</artifactId> + <version>3.2.11-incubating-SNAPSHOT</version> + </parent> - <modelVersion>4.0.0</modelVersion> - <artifactId>rya.pcj.fluo.integration</artifactId> + <modelVersion>4.0.0</modelVersion> + <artifactId>rya.pcj.fluo.integration</artifactId> - <name>Apache Rya PCJ Fluo Integration Tests</name> - <description>Integration tests for the Rya Fluo application.</description> + <name>Apache Rya PCJ Fluo Integration Tests</name> + <description>Integration tests for the Rya Fluo application.</description> - <dependencies> - <!-- Rya Runtime Dependencies. --> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.pcj.fluo.api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.pcj.fluo.client</artifactId> - </dependency> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.indexing</artifactId> - </dependency> - <dependency> - <groupId>org.apache.fluo</groupId> - <artifactId>fluo-api</artifactId> - </dependency> + <dependencies> + <!-- Rya Runtime Dependencies. --> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.client</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.indexing</artifactId> + </dependency> + <dependency> + <groupId>org.apache.fluo</groupId> + <artifactId>fluo-api</artifactId> + </dependency> - <!-- Testing dependencies. --> - <dependency> - <groupId>org.apache.fluo</groupId> - <artifactId>fluo-mini</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.fluo</groupId> - <artifactId>fluo-api</artifactId> - </dependency> + <!-- Testing dependencies. --> + <dependency> + <groupId>org.apache.fluo</groupId> + <artifactId>fluo-mini</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka-clients</artifactId> - <version>0.10.1.0</version> - </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.11</artifactId> - <version>0.10.1.0</version> - <exclusions> - <exclusion> - <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - </exclusions> - </dependency> - <!-- Testing dependencies. --> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.11</artifactId> - <version>0.10.1.0</version> - <classifier>test</classifier> - <exclusions> - <exclusion> - <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.fluo</groupId> - <artifactId>fluo-recipes-test</artifactId> - <scope>test</scope> - </dependency> - </dependencies> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>0.10.1.0</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>0.10.1.0</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + <!-- Testing dependencies. --> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>0.10.1.0</version> + <classifier>test</classifier> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.fluo</groupId> + <artifactId>fluo-recipes-test</artifactId> + <scope>test</scope> + </dependency> + </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java new file mode 100644 index 0000000..124569b --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java @@ -0,0 +1,126 @@ +package org.apache.rya.indexing.pcj.fluo; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaSubGraph; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.resolver.RyaToRdfConversions; +import org.junit.Assert; +import org.openrdf.model.Statement; + +import com.google.common.base.Objects; + +public class ConstructGraphTestUtils { + + public static void ryaStatementSetsEqualIgnoresTimestamp(Set<RyaStatement> statements1, Set<RyaStatement> statements2) { + Assert.assertEquals(new VisibilityStatementSet(statements1), new VisibilityStatementSet(statements2)); + } + + public static void subGraphsEqualIgnoresTimestamp(Set<RyaSubGraph> subgraph1, Set<RyaSubGraph> subgraph2) { + Set<VisibilityStatementSet> set1 = new HashSet<>(); + Set<VisibilityStatementSet> set2 = new HashSet<>(); + subgraph1.forEach(x->set1.add(new VisibilityStatementSet(x.getStatements()))); + subgraph2.forEach(x->set2.add(new VisibilityStatementSet(x.getStatements()))); + Assert.assertEquals(set1, set2); + } + + public static void subGraphsEqualIgnoresBlankNode(Set<RyaSubGraph> subgraph1, Set<RyaSubGraph> subgraph2) { + Map<Integer, RyaSubGraph> subGraphMap = new HashMap<>(); + subgraph1.forEach(x->subGraphMap.put(getKey(x), x)); + subgraph2.forEach(x->ryaStatementsEqualIgnoresBlankNode(x.getStatements(), subGraphMap.get(getKey(x)).getStatements())); + } + + private static int getKey(RyaSubGraph subgraph) { + int key = 0; + for(RyaStatement statement: subgraph.getStatements()) { + key += statement.getObject().hashCode(); + } + return key; + } + + public static void ryaStatementsEqualIgnoresBlankNode(Set<RyaStatement> statements1, Set<RyaStatement> statements2) { + Map<String, RyaURI> bNodeMap = new HashMap<>(); + statements1.forEach(x-> bNodeMap.put(x.getPredicate().getData(), x.getSubject())); + statements2.forEach(x -> x.setSubject(bNodeMap.get(x.getPredicate().getData()))); + ryaStatementSetsEqualIgnoresTimestamp(statements1, statements2); + } + + + /** + * Class used for comparing Sets of RyaStatements while ignoring timestamps. + * It is assumed that all RyaStatements in the Set used to construct this class + * have the same visibility. + */ + public static class VisibilityStatementSet { + + private Set<Statement> statements; + private String visibility; + + public VisibilityStatementSet(Set<RyaStatement> statements) { + this.statements = new HashSet<>(); + statements.forEach(x -> { + this.statements.add(RyaToRdfConversions.convertStatement(x)); + if (visibility == null) { + if (x.getColumnVisibility() != null) { + visibility = new String(x.getColumnVisibility()); + } else { + this.visibility = ""; + } + } + }); + } + + public VisibilityStatementSet(RyaSubGraph subgraph) { + this(subgraph.getStatements()); + } + + @Override + public boolean equals(Object o) { + if(this == o) { + return true; + } + + if(o instanceof VisibilityStatementSet) { + VisibilityStatementSet that = (VisibilityStatementSet) o; + return Objects.equal(this.visibility, that.visibility) && Objects.equal(this.statements, that.statements); + } + + return false; + } + + @Override + public int hashCode() { + return Objects.hashCode(visibility, statements); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + return builder.append("Visiblity Statement Set \n").append(" Statements: " + statements + "\n") + .append(" Visibilities: " + visibility + " \n").toString(); + } + + } + +}