http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java index 7a73b41..f44db6c 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java @@ -25,10 +25,12 @@ import edu.umd.cs.findbugs.annotations.NonNull; import org.apache.commons.lang3.StringUtils; import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType; import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; import org.apache.rya.indexing.pcj.fluo.client.util.Report.ReportItem; @@ -56,26 +58,41 @@ public class QueryReportRenderer { final FluoQuery metadata = queryReport.getFluoQuery(); - switch (metadata.getQueryType()) { - case Projection: - final QueryMetadata queryMetadata = metadata.getQueryMetadata().get(); - builder.appendItem(new ReportItem("QUERY NODE")); - builder.appendItem(new ReportItem("Node ID", queryMetadata.getNodeId())); - builder.appendItem(new ReportItem("Variable Order", queryMetadata.getVariableOrder().toString())); - builder.appendItem(new ReportItem("SPARQL", prettyFormatSparql(queryMetadata.getSparql()))); - builder.appendItem(new ReportItem("Child Node ID", queryMetadata.getChildNodeId())); - builder.appendItem(new ReportItem("Count", "" + queryReport.getCount(queryMetadata.getNodeId()))); - break; - case Construct: + QueryMetadata queryMetadata = metadata.getQueryMetadata(); + builder.appendItem( new ReportItem("") ); + + builder.appendItem(new ReportItem("QUERY NODE")); + builder.appendItem(new ReportItem("Node ID", queryMetadata.getNodeId())); + builder.appendItem(new ReportItem("Variable Order", queryMetadata.getVariableOrder().toString())); + builder.appendItem( new ReportItem("SPARQL", queryMetadata.getSparql()) ); + builder.appendItem(new ReportItem("Child Node ID", queryMetadata.getChildNodeId())); + builder.appendItem(new ReportItem("Count", "" + queryReport.getCount(queryMetadata.getNodeId()))); + + + + if (metadata.getQueryType() == QueryType.Construct) { + builder.appendItem( new ReportItem("") ); + final ConstructQueryMetadata constructMetadata = metadata.getConstructQueryMetadata().get(); builder.appendItem(new ReportItem("CONSTRUCT QUERY NODE")); builder.appendItem(new ReportItem("Node ID", constructMetadata.getNodeId())); builder.appendItem(new ReportItem("Variable Order", constructMetadata.getVariableOrder().toString())); - builder.appendItem(new ReportItem("SPARQL", prettyFormatSparql(constructMetadata.getSparql()))); + builder.appendItem( 
new ReportItem("Parent Node ID", constructMetadata.getParentNodeId()) ); builder.appendItem(new ReportItem("Child Node ID", constructMetadata.getChildNodeId())); builder.appendItem(new ReportItem("Construct Graph", constructMetadata.getConstructGraph().toString())); builder.appendItem(new ReportItem("Count", "" + queryReport.getCount(constructMetadata.getNodeId()))); } + + for (ProjectionMetadata projectionMetadata : metadata.getProjectionMetadata()) { + builder.appendItem( new ReportItem("") ); + + builder.appendItem(new ReportItem("PROJECTION NODE")); + builder.appendItem(new ReportItem("Node ID", projectionMetadata.getNodeId())); + builder.appendItem(new ReportItem("Variable Order", projectionMetadata.getVariableOrder().toString())); + builder.appendItem( new ReportItem("Parent Node ID", projectionMetadata.getParentNodeId()) ); + builder.appendItem(new ReportItem("Child Node ID", projectionMetadata.getChildNodeId())); + builder.appendItem(new ReportItem("Count", "" + queryReport.getCount(projectionMetadata.getNodeId()))); + } for(final FilterMetadata filterMetadata : metadata.getFilterMetadata()) { builder.appendItem( new ReportItem("") );
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java index c8dc737..f25b573 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java +++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java @@ -33,7 +33,7 @@ import org.apache.rya.api.domain.RyaType; import org.apache.rya.api.domain.RyaURI; import org.apache.rya.api.persist.RyaDAOException; import org.apache.rya.api.resolver.RyaToRdfConversions; -import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.storage.PcjException; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; @@ -179,7 +179,7 @@ public class FluoAndHistoricPcjsDemo implements Demo { pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain it. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, ryaTablePrefix); + new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, ryaTablePrefix); } catch (MalformedQueryException | PcjException | RyaDAOException e) { throw new DemoExecutionException("Error while using Fluo to compute and export historic matches, so the demo can not continue. 
Exiting.", e); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java index 124569b..bbdcfec 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ConstructGraphTestUtils.java @@ -30,6 +30,7 @@ import org.junit.Assert; import org.openrdf.model.Statement; import com.google.common.base.Objects; +import com.google.common.base.Preconditions; public class ConstructGraphTestUtils { @@ -47,8 +48,18 @@ public class ConstructGraphTestUtils { public static void subGraphsEqualIgnoresBlankNode(Set<RyaSubGraph> subgraph1, Set<RyaSubGraph> subgraph2) { Map<Integer, RyaSubGraph> subGraphMap = new HashMap<>(); - subgraph1.forEach(x->subGraphMap.put(getKey(x), x)); - subgraph2.forEach(x->ryaStatementsEqualIgnoresBlankNode(x.getStatements(), subGraphMap.get(getKey(x)).getStatements())); + for(RyaSubGraph subgraph: subgraph1) { + int key = getKey(subgraph); + subGraphMap.put(key, subgraph); + } + + for(RyaSubGraph subgraph: subgraph2) { + int key = getKey(subgraph); + RyaSubGraph sub = subGraphMap.get(key); + Preconditions.checkNotNull(sub); + Set<RyaStatement> statements = sub.getStatements(); + ryaStatementsEqualIgnoresBlankNode(subgraph.getStatements(), statements); + } } private static int getKey(RyaSubGraph subgraph) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java index d5c0e5f..263a19e 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java @@ -68,7 +68,7 @@ public class GetPcjMetadataIT extends RyaExportITBase { try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); + new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); // Fetch the PCJ's Metadata through the GetPcjMetadata interactor. final String queryId = new ListQueryIds().listQueryIds(fluoClient).get(0); @@ -95,7 +95,7 @@ public class GetPcjMetadataIT extends RyaExportITBase { "?x <http://worksAt> <http://Chipotle>." 
+ "}"; final String q1PcjId = pcjStorage.createPcj(q1Sparql); - final CreatePcj createPcj = new CreatePcj(); + final CreateFluoPcj createPcj = new CreateFluoPcj(); createPcj.withRyaIntegration(q1PcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); final String q2Sparql = http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java index 965a7b9..c0f0f16 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java @@ -85,7 +85,7 @@ public class GetQueryReportIT extends RyaExportITBase { try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); + new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); // Stream the data into Fluo. new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); @@ -106,7 +106,7 @@ public class GetQueryReportIT extends RyaExportITBase { final FluoQuery fluoQuery = report.getFluoQuery(); - final String queryNodeId = fluoQuery.getQueryMetadata().get().getNodeId(); + final String queryNodeId = fluoQuery.getQueryMetadata().getNodeId(); expectedCounts.put(queryNodeId, BigInteger.valueOf(8)); final String filterNodeId = fluoQuery.getFilterMetadata().iterator().next().getNodeId(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java index d403404..315dddb 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java @@ -20,6 +20,8 @@ package org.apache.rya.indexing.pcj.fluo.app.query; import static org.junit.Assert.assertEquals; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.concurrent.TimeUnit; @@ -28,11 +30,12 @@ import org.apache.fluo.api.client.FluoFactory; import org.apache.fluo.api.client.Snapshot; import org.apache.fluo.api.client.Transaction; import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.ExportStrategy; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QueryType; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; import 
org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery.QueryType; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; -import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; import org.junit.Test; @@ -113,7 +116,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { // Create the object that will be serialized. final JoinMetadata.Builder builder = JoinMetadata.builder("nodeId"); - builder.setVariableOrder(new VariableOrder("g;y;s")); + builder.setVarOrder(new VariableOrder("g;y;s")); builder.setJoinType(JoinType.NATURAL_JOIN); builder.setParentNodeId("parentNodeId"); builder.setLeftChildNodeId("leftChildNodeId"); @@ -143,10 +146,13 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); // Create the object that will be serialized. - final QueryMetadata.Builder builder = QueryMetadata.builder("nodeId"); - builder.setVariableOrder(new VariableOrder("y;s;d")); + String queryId = NodeType.generateNewFluoIdForType(NodeType.QUERY); + final QueryMetadata.Builder builder = QueryMetadata.builder(queryId); + builder.setQueryType(QueryType.Projection); + builder.setVarOrder(new VariableOrder("y;s;d")); builder.setSparql("sparql string"); builder.setChildNodeId("childNodeId"); + builder.setExportStrategies(new HashSet<>(Arrays.asList(ExportStrategy.Kafka))); final QueryMetadata originalMetadata = builder.build(); try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { @@ -159,7 +165,37 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { // Read it from the Fluo table. QueryMetadata storedMetdata = null; try(Snapshot sx = fluoClient.newSnapshot()) { - storedMetdata = dao.readQueryMetadata(sx, "nodeId"); + storedMetdata = dao.readQueryMetadata(sx, queryId); + } + + // Ensure the deserialized object is the same as the serialized one. + assertEquals(originalMetadata, storedMetdata); + } + } + + @Test + public void projectionMetadataTest() { + final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); + + // Create the object that will be serialized. + final ProjectionMetadata.Builder builder = ProjectionMetadata.builder("nodeId"); + builder.setVarOrder(new VariableOrder("y;s;d")); + builder.setProjectedVars(new VariableOrder("x;y;z")); + builder.setChildNodeId("childNodeId"); + builder.setParentNodeId("parentNodeId"); + final ProjectionMetadata originalMetadata = builder.build(); + + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Write it to the Fluo table. + try(Transaction tx = fluoClient.newTransaction()) { + dao.write(tx, originalMetadata); + tx.commit(); + } + + // Read it from the Fluo table. + ProjectionMetadata storedMetdata = null; + try(Snapshot sx = fluoClient.newSnapshot()) { + storedMetdata = dao.readProjectionMetadata(sx, "nodeId"); } // Ensure the deserialized object is the same as the serialized one. @@ -180,8 +216,9 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { // Create the object that will be serialized. 
final ConstructQueryMetadata.Builder builder = ConstructQueryMetadata.builder(); builder.setNodeId("nodeId"); - builder.setSparql(query); builder.setChildNodeId("childNodeId"); + builder.setParentNodeId("parentNodeId"); + builder.setVarOrder(new VariableOrder("a;b;c")); builder.setConstructGraph(new ConstructGraph(patterns)); final ConstructQueryMetadata originalMetadata = builder.build(); @@ -209,7 +246,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { // Create the object that will be serialized. final AggregationMetadata originalMetadata = AggregationMetadata.builder("nodeId") - .setVariableOrder(new VariableOrder("totalCount")) + .setVarOrder(new VariableOrder("totalCount")) .setParentNodeId("parentNodeId") .setChildNodeId("childNodeId") .setGroupByVariableOrder(new VariableOrder("a", "b", "c")) @@ -240,7 +277,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { // Create the object that will be serialized. final AggregationMetadata originalMetadata = AggregationMetadata.builder("nodeId") - .setVariableOrder(new VariableOrder("totalCount")) + .setVarOrder(new VariableOrder("totalCount")) .setParentNodeId("parentNodeId") .setChildNodeId("childNodeId") .addAggregation(new AggregationElement(AggregationType.COUNT, "count", "totalCount")) @@ -315,8 +352,10 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { "?worker <http://worksAt> <http://Chipotle>. " + "}"; - final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null); - final FluoQuery originalQuery = new SparqlFluoQueryBuilder().make(query, new NodeIds()); + SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder(); + builder.setSparql(sparql); + builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY)); + final FluoQuery originalQuery = builder.build(); assertEquals(QueryType.Projection, originalQuery.getQueryType()); assertEquals(false, originalQuery.getConstructQueryMetadata().isPresent()); @@ -331,7 +370,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { // Read it from the Fluo table. FluoQuery storedQuery = null; try(Snapshot sx = fluoClient.newSnapshot()) { - storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().get().getNodeId()); + storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId()); } // Ensure the deserialized object is the same as the serialized one. @@ -354,11 +393,102 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { "?worker <http://worksAt> <http://Chipotle>. " + "}"; - final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null); - final FluoQuery originalQuery = new SparqlFluoQueryBuilder().make(query, new NodeIds()); + SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder(); + builder.setSparql(sparql); + builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY)); + final FluoQuery originalQuery = builder.build(); + + assertEquals(QueryType.Construct, originalQuery.getQueryType()); + assertEquals(true, originalQuery.getConstructQueryMetadata().isPresent()); + + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Write it to the Fluo table. + try(Transaction tx = fluoClient.newTransaction()) { + dao.write(tx, originalQuery); + tx.commit(); + } + + // Read it from the Fluo table. 
+ FluoQuery storedQuery = null; + try(Snapshot sx = fluoClient.newSnapshot()) { + storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId()); + } + + // Ensure the deserialized object is the same as the serialized one. + assertEquals(originalQuery, storedQuery); + } + } + + + @Test + public void fluoNestedQueryTest() throws MalformedQueryException { + final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); + + // Create the object that will be serialized. + final String sparql = + "SELECT ?id ?type ?location ?averagePrice ?vendor {" + + "FILTER(?averagePrice > 4) " + + "?type <urn:purchasedFrom> ?vendor ." + + "{SELECT ?type ?location (avg(?price) as ?averagePrice) {" + + "?id <urn:type> ?type . " + + "?id <urn:location> ?location ." + + "?id <urn:price> ?price ." + + "} " + + "GROUP BY ?type ?location }}"; + + + SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder(); + builder.setSparql(sparql); + builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY)); + final FluoQuery originalQuery = builder.build(); + + assertEquals(QueryType.Projection, originalQuery.getQueryType()); + + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Write it to the Fluo table. + try(Transaction tx = fluoClient.newTransaction()) { + dao.write(tx, originalQuery); + tx.commit(); + } + + // Read it from the Fluo table. + FluoQuery storedQuery = null; + try(Snapshot sx = fluoClient.newSnapshot()) { + storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId()); + } + + // Ensure the deserialized object is the same as the serialized one. + assertEquals(originalQuery, storedQuery); + } + } + + @Test + public void fluoNestedConstructQueryTest() throws MalformedQueryException { + final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); + + // Create the object that will be serialized. + final String sparql = "CONSTRUCT { " + + "_:b a <urn:highSpeedTrafficArea> . " + + "_:b <urn:hasCount> ?obsCount . " + + "_:b <urn:hasLocation> ?location ." + + "_:b <urn:hasAverageVelocity> ?avgVelocity ." + + "} WHERE { " + + "FILTER(?obsCount > 1) " + + "{ " + + "SELECT ?location (count(?obs) AS ?obsCount) (avg(?velocity) AS ?avgVelocity) " + + "WHERE { " + + "FILTER(?velocity > 75) " + + "?obs <urn:hasVelocity> ?velocity. " + + "?obs <urn:hasLocation> ?location. " + + "}GROUP BY ?location }}"; + + + SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder(); + builder.setSparql(sparql); + builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY)); + final FluoQuery originalQuery = builder.build(); assertEquals(QueryType.Construct, originalQuery.getQueryType()); - assertEquals(false, originalQuery.getQueryMetadata().isPresent()); try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Write it to the Fluo table. @@ -370,11 +500,12 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { // Read it from the Fluo table. FluoQuery storedQuery = null; try(Snapshot sx = fluoClient.newSnapshot()) { - storedQuery = dao.readFluoQuery(sx, originalQuery.getConstructQueryMetadata().get().getNodeId()); + storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId()); } // Ensure the deserialized object is the same as the serialized one. 
assertEquals(originalQuery, storedQuery); } } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java index 0cd7cfb..1707308 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java @@ -41,7 +41,7 @@ import org.apache.fluo.core.client.FluoClientImpl; import org.apache.log4j.Logger; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaURI; -import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side; @@ -91,7 +91,7 @@ public class BatchDeleteIT extends RyaExportITBase { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain the PCJ. - String queryId = new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()); + String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()); List<String> ids = getNodeIdStrings(fluoClient, queryId); List<String> prefixes = Arrays.asList("urn:subject_1", "urn:object", "urn:subject_1", "urn:subject_1"); @@ -130,7 +130,7 @@ public class BatchDeleteIT extends RyaExportITBase { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain the PCJ. - String queryId = new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()); + String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()); List<String> ids = getNodeIdStrings(fluoClient, queryId); String joinId = ids.get(1); @@ -176,7 +176,7 @@ public class BatchDeleteIT extends RyaExportITBase { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain the PCJ. 
- String queryId = new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()); + String queryId = new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()); List<String> ids = getNodeIdStrings(fluoClient, queryId); String joinId = ids.get(1); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java index 0f2d892..7c4caa4 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java @@ -35,7 +35,7 @@ import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Span; import org.apache.rya.api.client.RyaClient; import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; -import org.apache.rya.indexing.pcj.fluo.api.DeletePcj; +import org.apache.rya.indexing.pcj.fluo.api.DeleteFluoPcj; import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; import org.junit.Test; import org.openrdf.model.Statement; @@ -79,10 +79,10 @@ public class CreateDeleteIT extends RyaExportITBase { try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Ensure the data was loaded. final List<Bytes> rows = getFluoTableEntries(fluoClient); - assertEquals(17, rows.size()); + assertEquals(20, rows.size()); // Delete the PCJ from the Fluo application. - new DeletePcj(1).deletePcj(fluoClient, pcjId); + new DeleteFluoPcj(1).deletePcj(fluoClient, pcjId); // Ensure all data related to the query has been removed. final List<Bytes> empty_rows = getFluoTableEntries(fluoClient); @@ -111,10 +111,10 @@ public class CreateDeleteIT extends RyaExportITBase { try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Ensure the data was loaded. final List<Bytes> rows = getFluoTableEntries(fluoClient); - assertEquals(10, rows.size()); + assertEquals(12, rows.size()); // Delete the PCJ from the Fluo application. - new DeletePcj(1).deletePcj(fluoClient, pcjId); + new DeleteFluoPcj(1).deletePcj(fluoClient, pcjId); // Ensure all data related to the query has been removed. 
final List<Bytes> empty_rows = getFluoTableEntries(fluoClient); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java index f330825..d623043 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java @@ -29,7 +29,7 @@ import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaURI; -import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; @@ -102,7 +102,7 @@ public class InputIT extends RyaExportITBase { try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); + new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); // Verify the end results of the query match the expected results. super.getMiniFluo().waitForObservers(); @@ -162,7 +162,7 @@ public class InputIT extends RyaExportITBase { try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); + new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); // Ensure the query has no results yet. super.getMiniFluo().waitForObservers(); @@ -228,7 +228,7 @@ public class InputIT extends RyaExportITBase { try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); + new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); // Ensure Alice is a match. super.getMiniFluo().waitForObservers(); @@ -317,7 +317,7 @@ public class InputIT extends RyaExportITBase { try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); + new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); // Ensure Alice is a match. 
super.getMiniFluo().waitForObservers(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java index ab7610d..3ee07a7 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java @@ -29,6 +29,9 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.core.client.FluoClientImpl; +import org.apache.fluo.recipes.test.FluoITHelper; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -244,6 +247,10 @@ public class KafkaExportIT extends KafkaExportITBase { // Create the PCJ in Fluo and load the statements into Rya. final String pcjId = loadData(sparql, statements); + + try(FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration())) { + FluoITHelper.printFluoTable(fluo); + } // Create the expected results of the SPARQL query once the PCJ has been computed. final MapBindingSet expectedResult = new MapBindingSet(); @@ -350,7 +357,7 @@ public class KafkaExportIT extends KafkaExportITBase { } @Test - public void groupByManyBindings_avaerages() throws Exception { + public void groupByManyBindings_averages() throws Exception { // A query that groups what is aggregated by two of the keys. final String sparql = "SELECT ?type ?location (avg(?price) as ?averagePrice) {" + @@ -425,6 +432,160 @@ public class KafkaExportIT extends KafkaExportITBase { assertEquals(expectedResults, results); } + + @Test + public void nestedGroupByManyBindings_averages() throws Exception { + // A query that groups what is aggregated by two of the keys. + final String sparql = + "SELECT ?type ?location ?averagePrice {" + + "FILTER(?averagePrice > 4) " + + "{SELECT ?type ?location (avg(?price) as ?averagePrice) {" + + "?id <urn:type> ?type . " + + "?id <urn:location> ?location ." + + "?id <urn:price> ?price ." + + "} " + + "GROUP BY ?type ?location }}"; + + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final Collection<Statement> statements = Sets.newHashSet( + // American items that will be averaged. 
+ vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:type"), vf.createLiteral("apple")), + vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:location"), vf.createLiteral("USA")), + vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:price"), vf.createLiteral(2.50)), + + vf.createStatement(vf.createURI("urn:2"), vf.createURI("urn:type"), vf.createLiteral("cheese")), + vf.createStatement(vf.createURI("urn:2"), vf.createURI("urn:location"), vf.createLiteral("USA")), + vf.createStatement(vf.createURI("urn:2"), vf.createURI("urn:price"), vf.createLiteral(4.25)), + + vf.createStatement(vf.createURI("urn:3"), vf.createURI("urn:type"), vf.createLiteral("cheese")), + vf.createStatement(vf.createURI("urn:3"), vf.createURI("urn:location"), vf.createLiteral("USA")), + vf.createStatement(vf.createURI("urn:3"), vf.createURI("urn:price"), vf.createLiteral(5.25)), + + // French items that will be averaged. + vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:type"), vf.createLiteral("cheese")), + vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:location"), vf.createLiteral("France")), + vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:price"), vf.createLiteral(8.5)), + + vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:type"), vf.createLiteral("cigarettes")), + vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:location"), vf.createLiteral("France")), + vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:price"), vf.createLiteral(3.99)), + + vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:type"), vf.createLiteral("cigarettes")), + vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:location"), vf.createLiteral("France")), + vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:price"), vf.createLiteral(4.99))); + + // Create the PCJ in Fluo and load the statements into Rya. + final String pcjId = loadData(sparql, statements); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final Set<VisibilityBindingSet> expectedResults = new HashSet<>(); + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("type", vf.createLiteral("cheese", XMLSchema.STRING)); + bs.addBinding("location", vf.createLiteral("France", XMLSchema.STRING)); + bs.addBinding("averagePrice", vf.createLiteral("8.5", XMLSchema.DECIMAL)); + expectedResults.add( new VisibilityBindingSet(bs)); + + bs = new MapBindingSet(); + bs.addBinding("type", vf.createLiteral("cigarettes", XMLSchema.STRING)); + bs.addBinding("location", vf.createLiteral("France", XMLSchema.STRING)); + bs.addBinding("averagePrice", vf.createLiteral("4.49", XMLSchema.DECIMAL)); + expectedResults.add( new VisibilityBindingSet(bs) ); + + bs = new MapBindingSet(); + bs.addBinding("type", vf.createLiteral("cheese", XMLSchema.STRING)); + bs.addBinding("location", vf.createLiteral("USA", XMLSchema.STRING)); + bs.addBinding("averagePrice", vf.createLiteral("4.75", XMLSchema.DECIMAL)); + expectedResults.add( new VisibilityBindingSet(bs) ); + + // Verify the end results of the query match the expected results. + final Set<VisibilityBindingSet> results = readGroupedResults(pcjId, new VariableOrder("type", "location")); + System.out.println(results); + assertEquals(expectedResults, results); + } + + + @Test + public void nestedWithJoinGroupByManyBindings_averages() throws Exception { + + // A query that groups what is aggregated by two of the keys. 
+ final String sparql = + "SELECT ?type ?location ?averagePrice ?milkType {" + + "FILTER(?averagePrice > 4) " + + "?type <urn:hasMilkType> ?milkType ." + + "{SELECT ?type ?location (avg(?price) as ?averagePrice) {" + + "?id <urn:type> ?type . " + + "?id <urn:location> ?location ." + + "?id <urn:price> ?price ." + + "} " + + "GROUP BY ?type ?location }}"; + + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final Collection<Statement> statements = Sets.newHashSet( + + vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:type"), vf.createURI("urn:blue")), + vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:location"), vf.createLiteral("France")), + vf.createStatement(vf.createURI("urn:1"), vf.createURI("urn:price"), vf.createLiteral(8.5)), + vf.createStatement(vf.createURI("urn:blue"), vf.createURI("urn:hasMilkType"), vf.createLiteral("cow", XMLSchema.STRING)), + + vf.createStatement(vf.createURI("urn:2"), vf.createURI("urn:type"), vf.createURI("urn:american")), + vf.createStatement(vf.createURI("urn:2"), vf.createURI("urn:location"), vf.createLiteral("USA")), + vf.createStatement(vf.createURI("urn:2"), vf.createURI("urn:price"), vf.createLiteral(.99)), + + vf.createStatement(vf.createURI("urn:3"), vf.createURI("urn:type"), vf.createURI("urn:cheddar")), + vf.createStatement(vf.createURI("urn:3"), vf.createURI("urn:location"), vf.createLiteral("USA")), + vf.createStatement(vf.createURI("urn:3"), vf.createURI("urn:price"), vf.createLiteral(5.25)), + + // French items that will be averaged. + vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:type"), vf.createURI("urn:goat")), + vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:location"), vf.createLiteral("France")), + vf.createStatement(vf.createURI("urn:4"), vf.createURI("urn:price"), vf.createLiteral(6.5)), + vf.createStatement(vf.createURI("urn:goat"), vf.createURI("urn:hasMilkType"), vf.createLiteral("goat", XMLSchema.STRING)), + + vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:type"), vf.createURI("urn:fontina")), + vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:location"), vf.createLiteral("Italy")), + vf.createStatement(vf.createURI("urn:5"), vf.createURI("urn:price"), vf.createLiteral(3.99)), + vf.createStatement(vf.createURI("urn:fontina"), vf.createURI("urn:hasMilkType"), vf.createLiteral("cow", XMLSchema.STRING)), + + vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:type"), vf.createURI("urn:fontina")), + vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:location"), vf.createLiteral("Italy")), + vf.createStatement(vf.createURI("urn:6"), vf.createURI("urn:price"), vf.createLiteral(4.99))); + + // Create the PCJ in Fluo and load the statements into Rya. + final String pcjId = loadData(sparql, statements); + + // Create the expected results of the SPARQL query once the PCJ has been computed. 
+ final Set<VisibilityBindingSet> expectedResults = new HashSet<>(); + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("type", vf.createURI("urn:blue")); + bs.addBinding("location", vf.createLiteral("France", XMLSchema.STRING)); + bs.addBinding("averagePrice", vf.createLiteral("8.5", XMLSchema.DECIMAL)); + bs.addBinding("milkType", vf.createLiteral("cow", XMLSchema.STRING)); + expectedResults.add( new VisibilityBindingSet(bs)); + + bs = new MapBindingSet(); + bs.addBinding("type", vf.createURI("urn:goat")); + bs.addBinding("location", vf.createLiteral("France", XMLSchema.STRING)); + bs.addBinding("averagePrice", vf.createLiteral("6.5", XMLSchema.DECIMAL)); + bs.addBinding("milkType", vf.createLiteral("goat", XMLSchema.STRING)); + expectedResults.add( new VisibilityBindingSet(bs) ); + + bs = new MapBindingSet(); + bs.addBinding("type", vf.createURI("urn:fontina")); + bs.addBinding("location", vf.createLiteral("Italy", XMLSchema.STRING)); + bs.addBinding("averagePrice", vf.createLiteral("4.49", XMLSchema.DECIMAL)); + bs.addBinding("milkType", vf.createLiteral("cow", XMLSchema.STRING)); + expectedResults.add( new VisibilityBindingSet(bs) ); + + // Verify the end results of the query match the expected results. + final Set<VisibilityBindingSet> results = readGroupedResults(pcjId, new VariableOrder("type", "location")); + System.out.println(results); + assertEquals(expectedResults, results); + } + private Set<VisibilityBindingSet> readAllResults(final String pcjId) throws Exception { requireNonNull(pcjId); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java index 7a4ed8d..ca8de0d 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java @@ -43,24 +43,28 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.accumulo.AccumuloRyaDAO; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaSubGraph; +import org.apache.rya.api.domain.RyaType; import org.apache.rya.api.domain.RyaURI; import org.apache.rya.api.resolver.RdfToRyaConversions; import org.apache.rya.indexing.pcj.fluo.ConstructGraphTestUtils; -import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters; import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe; import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver; import 
org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase; +import org.junit.Assert; import org.junit.Test; import org.openrdf.model.Statement; import org.openrdf.model.ValueFactory; import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.RDF; +import org.openrdf.model.vocabulary.XMLSchema; import com.google.common.collect.Sets; @@ -83,6 +87,7 @@ public class KafkaRyaSubGraphExportIT extends KafkaExportITBase { observers.add(new ObserverSpecification(JoinObserver.class.getName())); observers.add(new ObserverSpecification(FilterObserver.class.getName())); observers.add(new ObserverSpecification(AggregationObserver.class.getName())); + observers.add(new ObserverSpecification(ProjectionObserver.class.getName())); // Configure the export observer to export new PCJ results to the mini // accumulo cluster. @@ -285,6 +290,77 @@ public class KafkaRyaSubGraphExportIT extends KafkaExportITBase { ConstructGraphTestUtils.subGraphsEqualIgnoresBlankNode(expectedResults, results); } + + @Test + public void nestedConstructQuery() throws Exception { + // A query that groups what is aggregated by one of the keys. + final String sparql = "CONSTRUCT { " + + "_:b a <urn:highSpeedTrafficArea> . " + + "_:b <urn:hasCount> ?obsCount . " + + "_:b <urn:hasLocation> ?location ." + + "_:b <urn:hasAverageVelocity> ?avgVelocity ." + + "} WHERE { " + + "FILTER(?obsCount > 1) " + + "{ " + + "SELECT ?location (count(?obs) AS ?obsCount) (avg(?velocity) AS ?avgVelocity) " + + "WHERE { " + + "FILTER(?velocity > 75) " + + "?obs <urn:hasVelocity> ?velocity. " + + "?obs <urn:hasLocation> ?location. " + + "}GROUP BY ?location }}"; + + // Create the Statements that will be loaded into Rya. 
+ final ValueFactory vf = new ValueFactoryImpl(); + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:obs1"), vf.createURI("urn:hasVelocity"), vf.createLiteral(77)), + vf.createStatement(vf.createURI("urn:obs1"), vf.createURI("urn:hasLocation"), vf.createLiteral("OldTown")), + vf.createStatement(vf.createURI("urn:obs2"), vf.createURI("urn:hasVelocity"), vf.createLiteral(81)), + vf.createStatement(vf.createURI("urn:obs2"), vf.createURI("urn:hasLocation"), vf.createLiteral("OldTown")), + vf.createStatement(vf.createURI("urn:obs3"), vf.createURI("urn:hasVelocity"), vf.createLiteral(70)), + vf.createStatement(vf.createURI("urn:obs3"), vf.createURI("urn:hasLocation"), vf.createLiteral("OldTown")), + vf.createStatement(vf.createURI("urn:obs5"), vf.createURI("urn:hasVelocity"), vf.createLiteral(87)), + vf.createStatement(vf.createURI("urn:obs5"), vf.createURI("urn:hasLocation"), vf.createLiteral("Rosslyn")), + vf.createStatement(vf.createURI("urn:obs6"), vf.createURI("urn:hasVelocity"), vf.createLiteral(81)), + vf.createStatement(vf.createURI("urn:obs6"), vf.createURI("urn:hasLocation"), vf.createLiteral("Rosslyn")), + vf.createStatement(vf.createURI("urn:obs7"), vf.createURI("urn:hasVelocity"), vf.createLiteral(67)), + vf.createStatement(vf.createURI("urn:obs7"), vf.createURI("urn:hasLocation"), vf.createLiteral("Clarendon")), + vf.createStatement(vf.createURI("urn:obs8"), vf.createURI("urn:hasVelocity"), vf.createLiteral(77)), + vf.createStatement(vf.createURI("urn:obs8"), vf.createURI("urn:hasLocation"), vf.createLiteral("Ballston")), + vf.createStatement(vf.createURI("urn:obs9"), vf.createURI("urn:hasVelocity"), vf.createLiteral(87)), + vf.createStatement(vf.createURI("urn:obs9"), vf.createURI("urn:hasLocation"), vf.createLiteral("FallsChurch"))); + + // Create the PCJ in Fluo and load the statements into Rya. + final String pcjId = loadStatements(sparql, statements); + + // Verify the end results of the query match the expected results. 
+ final Set<RyaSubGraph> results = readAllResults(pcjId); + + RyaStatement statement1 = new RyaStatement(new RyaURI("urn:obs1"), new RyaURI("urn:hasCount"), new RyaType(XMLSchema.INTEGER, "2")); + RyaStatement statement2 = new RyaStatement(new RyaURI("urn:obs1"), new RyaURI("urn:hasAverageVelocity"), new RyaType(XMLSchema.DECIMAL, "84")); + RyaStatement statement3 = new RyaStatement(new RyaURI("urn:obs1"), new RyaURI("urn:hasLocation"), new RyaType("Rosslyn")); + RyaStatement statement4 = new RyaStatement(new RyaURI("urn:obs1"), new RyaURI(RDF.TYPE.toString()), new RyaURI("urn:highSpeedTrafficArea")); + RyaStatement statement5 = new RyaStatement(new RyaURI("urn:obs2"), new RyaURI("urn:hasCount"), new RyaType(XMLSchema.INTEGER, "2")); + RyaStatement statement6 = new RyaStatement(new RyaURI("urn:obs2"), new RyaURI("urn:hasAverageVelocity"), new RyaType(XMLSchema.DECIMAL, "79")); + RyaStatement statement7 = new RyaStatement(new RyaURI("urn:obs2"), new RyaURI("urn:hasLocation"), new RyaType("OldTown")); + RyaStatement statement8 = new RyaStatement(new RyaURI("urn:obs2"), new RyaURI(RDF.TYPE.toString()), new RyaURI("urn:highSpeedTrafficArea")); + + final Set<RyaSubGraph> expectedResults = new HashSet<>(); + + RyaSubGraph subGraph1 = new RyaSubGraph(pcjId); + Set<RyaStatement> stmnts1 = new HashSet<>(Arrays.asList(statement1, statement2, statement3, statement4)); + subGraph1.setStatements(stmnts1); + expectedResults.add(subGraph1); + + RyaSubGraph subGraph2 = new RyaSubGraph(pcjId); + Set<RyaStatement> stmnts2 = new HashSet<>(Arrays.asList(statement5, statement6, statement7, statement8)); + subGraph2.setStatements(stmnts2); + expectedResults.add(subGraph2); + + Assert.assertEquals(expectedResults.size(), results.size()); + ConstructGraphTestUtils.subGraphsEqualIgnoresBlankNode(expectedResults, results);; + } + + protected KafkaConsumer<String, RyaSubGraph> makeRyaSubGraphConsumer(final String TopicName) { // setup consumer final Properties consumerProps = new Properties(); @@ -330,9 +406,9 @@ public class KafkaRyaSubGraphExportIT extends KafkaExportITBase { FluoClient client = null; try { - CreatePcj createPcj = new CreatePcj(); + CreateFluoPcj createPcj = new CreateFluoPcj(); client = new FluoClientImpl(super.getFluoConfiguration()); - FluoQuery fluoQuery = createPcj.createFluoPcj(client, sparql); + String id = createPcj.createPcj(sparql, client).getQueryId(); AccumuloRyaDAO dao = getRyaDAO(); dao.add(statements.iterator()); @@ -341,7 +417,7 @@ public class KafkaRyaSubGraphExportIT extends KafkaExportITBase { super.getMiniFluo().waitForObservers(); // FluoITHelper.printFluoTable(client); - return fluoQuery.getConstructQueryMetadata().get().getNodeId(); + return id; } finally { if (client != null) { client.close();
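Taken together, the test and demo changes in this commit migrate callers from CreatePcj/DeletePcj to the renamed CreateFluoPcj/DeleteFluoPcj, and from SparqlFluoQueryBuilder's old make(ParsedQuery, NodeIds) entry point to a setter-style builder keyed by a generated Fluo query id. A condensed sketch of the new call pattern, assembled only from calls that appear in these diffs (the class, method, and variable names here are illustrative, and the exact checked exceptions are elided behind throws Exception):

import org.apache.accumulo.core.client.Connector;
import org.apache.fluo.api.client.FluoClient;
import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
import org.apache.rya.indexing.pcj.fluo.api.DeleteFluoPcj;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;

final class FluoPcjMigrationSketch {
    static void maintainAndDelete(final String sparql, final PrecomputedJoinStorage pcjStorage,
            final FluoClient fluoClient, final Connector accumuloConn, final String ryaInstance) throws Exception {
        // Register the PCJ with storage, then tell the Fluo app to maintain it (formerly CreatePcj).
        final String pcjId = pcjStorage.createPcj(sparql);
        final String queryId = new CreateFluoPcj()
                .withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, ryaInstance);
        System.out.println("Maintaining query node " + queryId);

        // Building a FluoQuery directly now goes through setters plus a generated query id
        // instead of make(ParsedQuery, NodeIds).
        final SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder();
        builder.setSparql(sparql);
        builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
        final FluoQuery fluoQuery = builder.build();
        System.out.println("Query type: " + fluoQuery.getQueryType());

        // Tearing the query down uses the renamed DeleteFluoPcj.
        new DeleteFluoPcj(1).deletePcj(fluoClient, pcjId);
    }
}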