http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java index 3bb54c4..3a42a23 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java @@ -24,58 +24,34 @@ import java.math.BigInteger; import java.util.ArrayList; import java.util.List; -import org.apache.rya.indexing.pcj.fluo.ITBase; -import org.junit.Test; - -import com.google.common.base.Optional; - -import org.apache.fluo.api.client.FluoAdmin; -import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException; -import org.apache.fluo.api.client.FluoAdmin.TableExistsException; +import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; -import org.apache.fluo.api.config.FluoConfiguration; import org.apache.fluo.api.config.ObserverSpecification; -import org.apache.fluo.api.mini.MiniFluo; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; +import org.junit.Test; + +import com.google.common.base.Optional; /** * Tests the methods of {@link CountStatements}. */ -public class CountStatementsIT extends ITBase { +public class CountStatementsIT extends RyaExportITBase { /** * Overriden so that no Observers will be started. This ensures whatever * statements are inserted as part of the test will not be consumed. - * - * @return A Mini Fluo cluster. */ @Override - protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException { + protected void preFluoInitHook() throws Exception { // Setup the observers that will be used by the Fluo PCJ Application. final List<ObserverSpecification> observers = new ArrayList<>(); - // Configure how the mini fluo will run. - final FluoConfiguration config = new FluoConfiguration(); - config.setMiniStartAccumulo(false); - config.setAccumuloInstance(instanceName); - config.setAccumuloUser(ACCUMULO_USER); - config.setAccumuloPassword(ACCUMULO_PASSWORD); - config.setInstanceZookeepers(zookeepers + "/fluo"); - config.setAccumuloZookeepers(zookeepers); - - config.setApplicationName(FLUO_APP_NAME); - config.setAccumuloTable("fluo" + FLUO_APP_NAME); - - config.addObservers(observers); - - FluoFactory.newAdmin(config).initialize( - new FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true) ); - final MiniFluo miniFluo = FluoFactory.newMiniFluo(config); - return miniFluo; + // Add the observers to the Fluo Configuration. + super.getFluoConfiguration().addObservers(observers); } - @Test public void countStatements() { // Insert some Triples into the Fluo app. @@ -86,12 +62,14 @@ public class CountStatementsIT extends ITBase { triples.add( RyaStatement.builder().setSubject(new RyaURI("http://David")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Bob")).build() ); triples.add( RyaStatement.builder().setSubject(new RyaURI("http://Eve")).setPredicate(new RyaURI("http://talksTo")).setObject(new RyaURI("http://Bob")).build() ); - new InsertTriples().insert(fluoClient, triples, Optional.<String>absent()); + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + new InsertTriples().insert(fluoClient, triples, Optional.<String>absent()); - // Load some statements into the Fluo app. - final BigInteger count = new CountStatements().countStatements(fluoClient); + // Load some statements into the Fluo app. + final BigInteger count = new CountStatements().countStatements(fluoClient); - // Ensure the count matches the expected values. - assertEquals(BigInteger.valueOf(5), count); + // Ensure the count matches the expected values. + assertEquals(BigInteger.valueOf(5), count); + } } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java index 82b61bd..0aceaa3 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java @@ -26,8 +26,11 @@ import java.util.Set; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; import org.apache.rya.api.persist.RyaDAOException; -import org.apache.rya.indexing.pcj.fluo.ITBase; +import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInAccumuloException; import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInFluoException; import org.apache.rya.indexing.pcj.storage.PcjException; @@ -47,7 +50,7 @@ import com.google.common.collect.Sets; /** * Integration tests the methods of {@link GetPcjMetadata}. */ -public class GetPcjMetadataIT extends ITBase { +public class GetPcjMetadataIT extends RyaExportITBase { @Test public void getMetadataByQueryId() throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, RyaDAOException { @@ -59,56 +62,60 @@ public class GetPcjMetadataIT extends ITBase { "}"; // Create the PCJ table. + final Connector accumuloConn = super.getAccumuloConnector(); final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); final String pcjId = pcjStorage.createPcj(sparql); - // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); - // Fetch the PCJ's Metadata through the GetPcjMetadata interactor. - final String queryId = new ListQueryIds().listQueryIds(fluoClient).get(0); - final PcjMetadata metadata = new GetPcjMetadata().getMetadata(pcjStorage, fluoClient, queryId); + // Fetch the PCJ's Metadata through the GetPcjMetadata interactor. + final String queryId = new ListQueryIds().listQueryIds(fluoClient).get(0); + final PcjMetadata metadata = new GetPcjMetadata().getMetadata(pcjStorage, fluoClient, queryId); - // Ensure the command returns the correct metadata. - final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql); - final PcjMetadata expected = new PcjMetadata(sparql, 0L, varOrders); - assertEquals(expected, metadata); + // Ensure the command returns the correct metadata. + final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql); + final PcjMetadata expected = new PcjMetadata(sparql, 0L, varOrders); + assertEquals(expected, metadata); + } } @Test public void getAllMetadata() throws MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, AccumuloException, AccumuloSecurityException, RyaDAOException { - - final CreatePcj createPcj = new CreatePcj(); - + final Connector accumuloConn = super.getAccumuloConnector(); final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); - // Add a couple of queries to Accumulo. - final String q1Sparql = - "SELECT ?x " + - "WHERE { " + - "?x <http://talksTo> <http://Eve>. " + - "?x <http://worksAt> <http://Chipotle>." + - "}"; - final String q1PcjId = pcjStorage.createPcj(q1Sparql); - createPcj.withRyaIntegration(q1PcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); - - final String q2Sparql = - "SELECT ?x ?y " + - "WHERE { " + - "?x <http://talksTo> ?y. " + - "?y <http://worksAt> <http://Chipotle>." + - "}"; - final String q2PcjId = pcjStorage.createPcj(q2Sparql); - createPcj.withRyaIntegration(q2PcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); - - // Ensure the command returns the correct metadata. - final Set<PcjMetadata> expected = new HashSet<>(); - final Set<VariableOrder> q1VarOrders = new ShiftVarOrderFactory().makeVarOrders(q1Sparql); - final Set<VariableOrder> q2VarOrders = new ShiftVarOrderFactory().makeVarOrders(q2Sparql); - expected.add(new PcjMetadata(q1Sparql, 0L, q1VarOrders)); - expected.add(new PcjMetadata(q2Sparql, 0L, q2VarOrders)); - - final Map<String, PcjMetadata> metadata = new GetPcjMetadata().getMetadata(pcjStorage, fluoClient); - assertEquals(expected, Sets.newHashSet( metadata.values() )); + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Add a couple of queries to Accumulo. + final String q1Sparql = + "SELECT ?x " + + "WHERE { " + + "?x <http://talksTo> <http://Eve>. " + + "?x <http://worksAt> <http://Chipotle>." + + "}"; + final String q1PcjId = pcjStorage.createPcj(q1Sparql); + final CreatePcj createPcj = new CreatePcj(); + createPcj.withRyaIntegration(q1PcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + + final String q2Sparql = + "SELECT ?x ?y " + + "WHERE { " + + "?x <http://talksTo> ?y. " + + "?y <http://worksAt> <http://Chipotle>." + + "}"; + final String q2PcjId = pcjStorage.createPcj(q2Sparql); + createPcj.withRyaIntegration(q2PcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + + // Ensure the command returns the correct metadata. + final Set<PcjMetadata> expected = new HashSet<>(); + final Set<VariableOrder> q1VarOrders = new ShiftVarOrderFactory().makeVarOrders(q1Sparql); + final Set<VariableOrder> q2VarOrders = new ShiftVarOrderFactory().makeVarOrders(q2Sparql); + expected.add(new PcjMetadata(q1Sparql, 0L, q1VarOrders)); + expected.add(new PcjMetadata(q2Sparql, 0L, q2VarOrders)); + + final Map<String, PcjMetadata> metadata = new GetPcjMetadata().getMetadata(pcjStorage, fluoClient); + assertEquals(expected, Sets.newHashSet( metadata.values() )); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java index 85c31a0..10f2319 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java @@ -26,8 +26,12 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; +import org.apache.accumulo.core.client.Connector; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; import org.apache.rya.api.domain.RyaStatement; -import org.apache.rya.indexing.pcj.fluo.ITBase; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; @@ -42,7 +46,7 @@ import com.google.common.collect.Sets; /** * Integration tests the methods of {@link GetQueryReportl}. */ -public class GetQueryReportIT extends ITBase { +public class GetQueryReportIT extends RyaExportITBase { @Test public void getReport() throws Exception { @@ -56,69 +60,72 @@ public class GetQueryReportIT extends ITBase { // Triples that will be streamed into Fluo after the PCJ has been created. final Set<RyaStatement> streamedTriples = Sets.newHashSet( - makeRyaStatement("http://Alice", "http://worksAt", "http://Taco Shop"), - makeRyaStatement("http://Alice", "http://worksAt", "http://Burger Join"), - makeRyaStatement("http://Alice", "http://worksAt", "http://Pastery Shop"), - makeRyaStatement("http://Alice", "http://worksAt", "http://Burrito Place"), - makeRyaStatement("http://Alice", "http://livesIn", "http://Lost County"), - makeRyaStatement("http://Alice", "http://livesIn", "http://Big City"), - makeRyaStatement("http://Bob", "http://worksAt", "http://Burrito Place"), - makeRyaStatement("http://Bob", "http://livesIn", "http://Big City"), - makeRyaStatement("http://Charlie", "http://worksAt", "http://Burrito Place"), - makeRyaStatement("http://Charlie", "http://livesIn", "http://Big City"), - makeRyaStatement("http://David", "http://worksAt", "http://Burrito Place"), - makeRyaStatement("http://David", "http://livesIn", "http://Lost County"), - makeRyaStatement("http://Eve", "http://worksAt", "http://Burrito Place"), - makeRyaStatement("http://Eve", "http://livesIn", "http://Big City"), - makeRyaStatement("http://Frank", "http://worksAt", "http://Burrito Place"), - makeRyaStatement("http://Frank", "http://livesIn", "http://Lost County")); + new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://worksAt"), new RyaURI("http://Taco Shop")), + new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://worksAt"), new RyaURI("http://Burger Join")), + new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://worksAt"), new RyaURI("http://Pastery Shop")), + new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://worksAt"), new RyaURI("http://Burrito Place")), + new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://livesIn"), new RyaURI("http://Lost County")), + new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://livesIn"), new RyaURI("http://Big City")), + new RyaStatement(new RyaURI("http://Bob"), new RyaURI("http://worksAt"), new RyaURI("http://Burrito Place")), + new RyaStatement(new RyaURI("http://Bob"), new RyaURI("http://livesIn"), new RyaURI("http://Big City")), + new RyaStatement(new RyaURI("http://Charlie"), new RyaURI("http://worksAt"), new RyaURI("http://Burrito Place")), + new RyaStatement(new RyaURI("http://Charlie"), new RyaURI("http://livesIn"), new RyaURI("http://Big City")), + new RyaStatement(new RyaURI("http://David"), new RyaURI("http://worksAt"), new RyaURI("http://Burrito Place")), + new RyaStatement(new RyaURI("http://David"), new RyaURI("http://livesIn"), new RyaURI("http://Lost County")), + new RyaStatement(new RyaURI("http://Eve"), new RyaURI("http://worksAt"), new RyaURI("http://Burrito Place")), + new RyaStatement(new RyaURI("http://Eve"), new RyaURI("http://livesIn"), new RyaURI("http://Big City")), + new RyaStatement(new RyaURI("http://Frank"), new RyaURI("http://worksAt"), new RyaURI("http://Burrito Place")), + new RyaStatement(new RyaURI("http://Frank"), new RyaURI("http://livesIn"), new RyaURI("http://Lost County"))); // Create the PCJ table. + final Connector accumuloConn = super.getAccumuloConnector(); final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); final String pcjId = pcjStorage.createPcj(sparql); - // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); - // Stream the data into Fluo. - new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); + // Stream the data into Fluo. + new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); - // Wait for the results to finish processing. - fluo.waitForObservers(); + // Wait for the results to finish processing. + super.getMiniFluo().waitForObservers(); - // Fetch the report. - final Map<String, PcjMetadata> metadata = new GetPcjMetadata().getMetadata(pcjStorage, fluoClient); - final Set<String> queryIds = metadata.keySet(); - assertEquals(1, queryIds.size()); - final String queryId = queryIds.iterator().next(); + // Fetch the report. + final Map<String, PcjMetadata> metadata = new GetPcjMetadata().getMetadata(pcjStorage, fluoClient); + final Set<String> queryIds = metadata.keySet(); + assertEquals(1, queryIds.size()); + final String queryId = queryIds.iterator().next(); - final QueryReport report = new GetQueryReport().getReport(fluoClient, queryId); + final QueryReport report = new GetQueryReport().getReport(fluoClient, queryId); - // Build the expected counts map. - final Map<String, BigInteger> expectedCounts = new HashMap<>(); + // Build the expected counts map. + final Map<String, BigInteger> expectedCounts = new HashMap<>(); - final FluoQuery fluoQuery = report.getFluoQuery(); + final FluoQuery fluoQuery = report.getFluoQuery(); - final String queryNodeId = fluoQuery.getQueryMetadata().getNodeId(); - expectedCounts.put(queryNodeId, BigInteger.valueOf(8)); + final String queryNodeId = fluoQuery.getQueryMetadata().getNodeId(); + expectedCounts.put(queryNodeId, BigInteger.valueOf(8)); - final String filterNodeId = fluoQuery.getFilterMetadata().iterator().next().getNodeId(); - expectedCounts.put(filterNodeId, BigInteger.valueOf(8)); + final String filterNodeId = fluoQuery.getFilterMetadata().iterator().next().getNodeId(); + expectedCounts.put(filterNodeId, BigInteger.valueOf(8)); - final String joinNodeId = fluoQuery.getJoinMetadata().iterator().next().getNodeId(); - expectedCounts.put(joinNodeId, BigInteger.valueOf(13)); + final String joinNodeId = fluoQuery.getJoinMetadata().iterator().next().getNodeId(); + expectedCounts.put(joinNodeId, BigInteger.valueOf(13)); - final Iterator<StatementPatternMetadata> patterns = fluoQuery.getStatementPatternMetadata().iterator(); - final StatementPatternMetadata sp1 = patterns.next(); - final StatementPatternMetadata sp2 = patterns.next(); - if(sp1.getStatementPattern().contains("http://worksAt")) { - expectedCounts.put(sp1.getNodeId(), BigInteger.valueOf(9)); - expectedCounts.put(sp2.getNodeId(), BigInteger.valueOf(7)); - } else { - expectedCounts.put(sp2.getNodeId(), BigInteger.valueOf(9)); - expectedCounts.put(sp1.getNodeId(), BigInteger.valueOf(7)); - } + final Iterator<StatementPatternMetadata> patterns = fluoQuery.getStatementPatternMetadata().iterator(); + final StatementPatternMetadata sp1 = patterns.next(); + final StatementPatternMetadata sp2 = patterns.next(); + if(sp1.getStatementPattern().contains("http://worksAt")) { + expectedCounts.put(sp1.getNodeId(), BigInteger.valueOf(9)); + expectedCounts.put(sp2.getNodeId(), BigInteger.valueOf(7)); + } else { + expectedCounts.put(sp2.getNodeId(), BigInteger.valueOf(9)); + expectedCounts.put(sp1.getNodeId(), BigInteger.valueOf(7)); + } - assertEquals(expectedCounts, report.getCounts()); + assertEquals(expectedCounts, report.getCounts()); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java index 19bc272..ec301ba 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java @@ -26,17 +26,18 @@ import java.util.List; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.TableExistsException; -import org.apache.rya.indexing.pcj.fluo.ITBase; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; +import org.apache.fluo.api.client.Transaction; +import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.junit.Test; import com.beust.jcommander.internal.Lists; -import org.apache.fluo.api.client.Transaction; - /** * Integration tests the methods of {@link ListQueryIds}. */ -public class ListQueryIdsIT extends ITBase { +public class ListQueryIdsIT extends RyaExportITBase { /** * This test ensures that when there are PCJ tables in Accumulo as well as @@ -45,18 +46,20 @@ public class ListQueryIdsIT extends ITBase { */ @Test public void getQueryIds() throws AccumuloException, AccumuloSecurityException, TableExistsException { - // Store a few SPARQL/Query ID pairs in the Fluo table. - try(Transaction tx = fluoClient.newTransaction()) { - tx.set("SPARQL_3", QUERY_ID, "ID_3"); - tx.set("SPARQL_1", QUERY_ID, "ID_1"); - tx.set("SPARQL_4", QUERY_ID, "ID_4"); - tx.set("SPARQL_2", QUERY_ID, "ID_2"); - tx.commit(); - } + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Store a few SPARQL/Query ID pairs in the Fluo table. + try(Transaction tx = fluoClient.newTransaction()) { + tx.set("SPARQL_3", QUERY_ID, "ID_3"); + tx.set("SPARQL_1", QUERY_ID, "ID_1"); + tx.set("SPARQL_4", QUERY_ID, "ID_4"); + tx.set("SPARQL_2", QUERY_ID, "ID_2"); + tx.commit(); + } - // Ensure the correct list of Query IDs is retured. - final List<String> expected = Lists.newArrayList("ID_1", "ID_2", "ID_3", "ID_4"); - final List<String> queryIds = new ListQueryIds().listQueryIds(fluoClient); - assertEquals(expected, queryIds); + // Ensure the correct list of Query IDs is retured. + final List<String> expected = Lists.newArrayList("ID_1", "ID_2", "ID_3", "ID_4"); + final List<String> queryIds = new ListQueryIds().listQueryIds(fluoClient); + assertEquals(expected, queryIds); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java index d5ed447..082f46d 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java @@ -20,7 +20,13 @@ package org.apache.rya.indexing.pcj.fluo.app.query; import static org.junit.Assert.assertEquals; -import org.apache.rya.indexing.pcj.fluo.ITBase; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; +import org.apache.fluo.api.client.Snapshot; +import org.apache.fluo.api.client.Transaction; +import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement; +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; @@ -30,13 +36,10 @@ import org.openrdf.query.parser.ParsedQuery; import org.openrdf.query.parser.sparql.SPARQLParser; import org.openrdf.repository.RepositoryException; -import org.apache.fluo.api.client.Snapshot; -import org.apache.fluo.api.client.Transaction; - /** * Integration tests the methods of {@link FluoQueryMetadataDAO}. */ -public class FluoQueryMetadataDAOIT extends ITBase { +public class FluoQueryMetadataDAOIT extends RyaExportITBase { @Test public void statementPatternMetadataTest() throws RepositoryException { @@ -49,20 +52,22 @@ public class FluoQueryMetadataDAOIT extends ITBase { builder.setParentNodeId("parentNodeId"); final StatementPatternMetadata originalMetadata = builder.build(); - // Write it to the Fluo table. - try(Transaction tx = fluoClient.newTransaction()) { - dao.write(tx, originalMetadata); - tx.commit(); - } + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Write it to the Fluo table. + try(Transaction tx = fluoClient.newTransaction()) { + dao.write(tx, originalMetadata); + tx.commit(); + } - // Read it from the Fluo table. - StatementPatternMetadata storedMetadata = null; - try(Snapshot sx = fluoClient.newSnapshot()) { - storedMetadata = dao.readStatementPatternMetadata(sx, "nodeId"); - } + // Read it from the Fluo table. + StatementPatternMetadata storedMetadata = null; + try(Snapshot sx = fluoClient.newSnapshot()) { + storedMetadata = dao.readStatementPatternMetadata(sx, "nodeId"); + } - // Ensure the deserialized object is the same as the serialized one. - assertEquals(originalMetadata, storedMetadata); + // Ensure the deserialized object is the same as the serialized one. + assertEquals(originalMetadata, storedMetadata); + } } @Test @@ -78,20 +83,22 @@ public class FluoQueryMetadataDAOIT extends ITBase { builder.setFilterIndexWithinSparql(2); final FilterMetadata originalMetadata = builder.build(); - // Write it to the Fluo table. - try(Transaction tx = fluoClient.newTransaction()) { - dao.write(tx, originalMetadata); - tx.commit(); - } + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Write it to the Fluo table. + try(Transaction tx = fluoClient.newTransaction()) { + dao.write(tx, originalMetadata); + tx.commit(); + } - // Read it from the Fluo table. - FilterMetadata storedMetadata = null; - try(Snapshot sx = fluoClient.newSnapshot()) { - storedMetadata = dao.readFilterMetadata(sx, "nodeId"); - } + // Read it from the Fluo table. + FilterMetadata storedMetadata = null; + try(Snapshot sx = fluoClient.newSnapshot()) { + storedMetadata = dao.readFilterMetadata(sx, "nodeId"); + } - // Ensure the deserialized object is the same as the serialized one. - assertEquals(originalMetadata, storedMetadata); + // Ensure the deserialized object is the same as the serialized one. + assertEquals(originalMetadata, storedMetadata); + } } @Test @@ -107,20 +114,22 @@ public class FluoQueryMetadataDAOIT extends ITBase { builder.setRightChildNodeId("rightChildNodeId"); final JoinMetadata originalMetadata = builder.build(); - // Write it to the Fluo table. - try(Transaction tx = fluoClient.newTransaction()) { - dao.write(tx, originalMetadata); - tx.commit(); - } + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Write it to the Fluo table. + try(Transaction tx = fluoClient.newTransaction()) { + dao.write(tx, originalMetadata); + tx.commit(); + } - // Read it from the Fluo table. - JoinMetadata storedMetadata = null; - try(Snapshot sx = fluoClient.newSnapshot()) { - storedMetadata = dao.readJoinMetadata(sx, "nodeId"); - } + // Read it from the Fluo table. + JoinMetadata storedMetadata = null; + try(Snapshot sx = fluoClient.newSnapshot()) { + storedMetadata = dao.readJoinMetadata(sx, "nodeId"); + } - // Ensure the deserialized object is the same as the serialized one. - assertEquals(originalMetadata, storedMetadata); + // Ensure the deserialized object is the same as the serialized one. + assertEquals(originalMetadata, storedMetadata); + } } @Test @@ -134,20 +143,85 @@ public class FluoQueryMetadataDAOIT extends ITBase { builder.setChildNodeId("childNodeId"); final QueryMetadata originalMetadata = builder.build(); - // Write it to the Fluo table. - try(Transaction tx = fluoClient.newTransaction()) { - dao.write(tx, originalMetadata); - tx.commit(); + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Write it to the Fluo table. + try(Transaction tx = fluoClient.newTransaction()) { + dao.write(tx, originalMetadata); + tx.commit(); + } + + // Read it from the Fluo table. + QueryMetadata storedMetdata = null; + try(Snapshot sx = fluoClient.newSnapshot()) { + storedMetdata = dao.readQueryMetadata(sx, "nodeId"); + } + + // Ensure the deserialized object is the same as the serialized one. + assertEquals(originalMetadata, storedMetdata); } + } + + @Test + public void aggregationMetadataTest_withGroupByVarOrders() { + final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); + + // Create the object that will be serialized. + final AggregationMetadata originalMetadata = AggregationMetadata.builder("nodeId") + .setVariableOrder(new VariableOrder("totalCount")) + .setParentNodeId("parentNodeId") + .setChildNodeId("childNodeId") + .setGroupByVariableOrder(new VariableOrder("a", "b", "c")) + .addAggregation(new AggregationElement(AggregationType.COUNT, "count", "totalCount")) + .addAggregation(new AggregationElement(AggregationType.AVERAGE, "privae", "avgPrice")) + .build(); + + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Write it to the Fluo table. + try(Transaction tx = fluoClient.newTransaction()) { + dao.write(tx, originalMetadata); + tx.commit(); + } - // Read it from the Fluo table. - QueryMetadata storedMetdata = null; - try(Snapshot sx = fluoClient.newSnapshot()) { - storedMetdata = dao.readQueryMetadata(sx, "nodeId"); + // Read it from the Fluo table. + AggregationMetadata storedMetadata = null; + try(Snapshot sx = fluoClient.newSnapshot()) { + storedMetadata = dao.readAggregationMetadata(sx, "nodeId"); + } + + // Ensure the deserialized object is the same as the serialized one. + assertEquals(originalMetadata, storedMetadata); } + } + + @Test + public void aggregationMetadataTest_noGroupByVarOrders() { + final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); + + // Create the object that will be serialized. + final AggregationMetadata originalMetadata = AggregationMetadata.builder("nodeId") + .setVariableOrder(new VariableOrder("totalCount")) + .setParentNodeId("parentNodeId") + .setChildNodeId("childNodeId") + .addAggregation(new AggregationElement(AggregationType.COUNT, "count", "totalCount")) + .addAggregation(new AggregationElement(AggregationType.AVERAGE, "privae", "avgPrice")) + .build(); + + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Write it to the Fluo table. + try(Transaction tx = fluoClient.newTransaction()) { + dao.write(tx, originalMetadata); + tx.commit(); + } + + // Read it from the Fluo table. + AggregationMetadata storedMetadata = null; + try(Snapshot sx = fluoClient.newSnapshot()) { + storedMetadata = dao.readAggregationMetadata(sx, "nodeId"); + } - // Ensure the deserialized object is the same as the serialized one. - assertEquals(originalMetadata, storedMetdata); + // Ensure the deserialized object is the same as the serialized one. + assertEquals(originalMetadata, storedMetadata); + } } @Test @@ -168,19 +242,21 @@ public class FluoQueryMetadataDAOIT extends ITBase { final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null); final FluoQuery originalQuery = new SparqlFluoQueryBuilder().make(query, new NodeIds()); - // Write it to the Fluo table. - try(Transaction tx = fluoClient.newTransaction()) { - dao.write(tx, originalQuery); - tx.commit(); - } + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Write it to the Fluo table. + try(Transaction tx = fluoClient.newTransaction()) { + dao.write(tx, originalQuery); + tx.commit(); + } - // Read it from the Fluo table. - FluoQuery storedQuery = null; - try(Snapshot sx = fluoClient.newSnapshot()) { - storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId()); - } + // Read it from the Fluo table. + FluoQuery storedQuery = null; + try(Snapshot sx = fluoClient.newSnapshot()) { + storedQuery = dao.readFluoQuery(sx, originalQuery.getQueryMetadata().getNodeId()); + } - // Ensure the deserialized object is the same as the serialized one. - assertEquals(originalQuery, storedQuery); + // Ensure the deserialized object is the same as the serialized one. + assertEquals(originalQuery, storedQuery); + } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java index b4c8d69..21d7db0 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java @@ -18,100 +18,152 @@ */ package org.apache.rya.indexing.pcj.fluo.integration; +import static java.util.Objects.requireNonNull; import static org.junit.Assert.assertEquals; import java.util.ArrayList; -import java.util.HashSet; +import java.util.Collection; import java.util.List; import java.util.Set; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; import org.apache.fluo.api.client.Snapshot; import org.apache.fluo.api.client.scanner.ColumnScanner; import org.apache.fluo.api.client.scanner.RowScanner; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Span; -import org.apache.rya.indexing.pcj.fluo.ITBase; -import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.fluo.api.DeletePcj; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.junit.Test; import org.openrdf.model.Statement; -import org.openrdf.model.impl.URIImpl; -import org.openrdf.query.BindingSet; -import org.openrdf.query.impl.BindingImpl; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.repository.sail.SailRepositoryConnection; import com.google.common.collect.Sets; -public class CreateDeleteIT extends ITBase { +/** + * Tests that ensure the PCJ delete support works. + */ +public class CreateDeleteIT extends RyaExportITBase { - /** - * Ensure historic matches are included in the result. - */ @Test - public void historicResults() throws Exception { + public void deletePCJ() throws Exception { // A query that finds people who talk to Eve and work at Chipotle. - final String sparql = "SELECT ?x " + "WHERE { " + "?x <http://talksTo> <http://Eve>. " - + "?x <http://worksAt> <http://Chipotle>." + "}"; + final String sparql = + "SELECT ?x " + "WHERE { " + + "?x <http://talksTo> <http://Eve>. " + + "?x <http://worksAt> <http://Chipotle>." + + "}"; // Triples that are loaded into Rya before the PCJ is created. - final Set<Statement> historicTriples = Sets.newHashSet( - makeStatement("http://Alice", "http://talksTo", "http://Eve"), - makeStatement("http://Bob", "http://talksTo", "http://Eve"), - makeStatement("http://Charlie", "http://talksTo", "http://Eve"), - - makeStatement("http://Eve", "http://helps", "http://Kevin"), - - makeStatement("http://Bob", "http://worksAt", "http://Chipotle"), - makeStatement("http://Charlie", "http://worksAt", "http://Chipotle"), - makeStatement("http://Eve", "http://worksAt", "http://Chipotle"), - makeStatement("http://David", "http://worksAt", "http://Chipotle")); - - // The expected results of the SPARQL query once the PCJ has been - // computed. - final Set<BindingSet> expected = new HashSet<>(); - expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Bob")))); - expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Charlie")))); - - // Load the historic data into Rya. - for (final Statement triple : historicTriples) { - ryaConn.add(triple); + final ValueFactory vf = new ValueFactoryImpl(); + final Set<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), + vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), + + vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://helps"), vf.createURI("http://Kevin")), + + vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), + vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), + vf.createStatement(vf.createURI("http://David"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle"))); + + // Create the PCJ in Fluo and load the statements into Rya. + final String pcjId = loadData(sparql, statements); + + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Ensure the data was loaded. + final List<Bytes> rows = getFluoTableEntries(fluoClient); + assertEquals(17, rows.size()); + + // Delete the PCJ from the Fluo application. + new DeletePcj(1).deletePcj(fluoClient, pcjId); + + // Ensure all data related to the query has been removed. + final List<Bytes> empty_rows = getFluoTableEntries(fluoClient); + assertEquals(0, empty_rows.size()); } + } + + @Test + public void deleteAggregation() throws Exception { + // A query that finds the maximum price for an item within the inventory. + final String sparql = + "SELECT (max(?price) as ?maxPrice) { " + + "?item <urn:price> ?price . " + + "}"; + + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:price"), vf.createLiteral(2.50)), + vf.createStatement(vf.createURI("urn:gum"), vf.createURI("urn:price"), vf.createLiteral(0.99)), + vf.createStatement(vf.createURI("urn:sandwich"), vf.createURI("urn:price"), vf.createLiteral(4.99))); + + // Create the PCJ in Fluo and load the statements into Rya. + final String pcjId = loadData(sparql, statements); + + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Ensure the data was loaded. + final List<Bytes> rows = getFluoTableEntries(fluoClient); + assertEquals(10, rows.size()); + + // Delete the PCJ from the Fluo application. + new DeletePcj(1).deletePcj(fluoClient, pcjId); + + // Ensure all data related to the query has been removed. + final List<Bytes> empty_rows = getFluoTableEntries(fluoClient); + assertEquals(0, empty_rows.size()); + } + } - // Create the PCJ table. - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); - final String pcjId = pcjStorage.createPcj(sparql); + private String loadData(final String sparql, final Collection<Statement> statements) throws Exception { + requireNonNull(sparql); + requireNonNull(statements); - // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + // Register the PCJ with Rya. + final Instance accInstance = super.getAccumuloConnector().getInstance(); + final Connector accumuloConn = super.getAccumuloConnector(); - // Verify the end results of the query match the expected results. - fluo.waitForObservers(); + final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails( + ACCUMULO_USER, + ACCUMULO_PASSWORD.toCharArray(), + accInstance.getInstanceName(), + accInstance.getZooKeepers()), accumuloConn); - final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql); - assertEquals(expected, results); + final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql); - List<Bytes> rows = getFluoTableEntries(fluoClient); - assertEquals(17, rows.size()); + // Write the data to Rya. + final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection(); + ryaConn.begin(); + ryaConn.add(statements); + ryaConn.commit(); + ryaConn.close(); - // Delete the PCJ from the Fluo application. - new DeletePcj(1).deletePcj(fluoClient, pcjId); + // Wait for the Fluo application to finish computing the end result. + super.getMiniFluo().waitForObservers(); - // Ensure all data related to the query has been removed. - List<Bytes> empty_rows = getFluoTableEntries(fluoClient); - assertEquals(0, empty_rows.size()); + // The PCJ Id is the topic name the results will be written to. + return pcjId; } - private List<Bytes> getFluoTableEntries(FluoClient fluoClient) { + private List<Bytes> getFluoTableEntries(final FluoClient fluoClient) { try (Snapshot snapshot = fluoClient.newSnapshot()) { - List<Bytes> rows = new ArrayList<>(); - RowScanner rscanner = snapshot.scanner().over(Span.prefix("")).byRow().build(); + final List<Bytes> rows = new ArrayList<>(); + final RowScanner rscanner = snapshot.scanner().over(Span.prefix("")).byRow().build(); - for(ColumnScanner cscanner: rscanner) { + for(final ColumnScanner cscanner: rscanner) { rows.add(cscanner.getRow()); } - + return rows; } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java index dcab997..ab97bbd 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java @@ -19,32 +19,37 @@ package org.apache.rya.indexing.pcj.fluo.integration; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import java.util.HashSet; import java.util.Set; +import org.apache.accumulo.core.client.Connector; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; import org.apache.rya.api.domain.RyaStatement; -import org.apache.rya.indexing.pcj.fluo.ITBase; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.junit.Test; import org.openrdf.model.Statement; -import org.openrdf.model.impl.URIImpl; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; import org.openrdf.query.BindingSet; -import org.openrdf.query.impl.BindingImpl; +import org.openrdf.query.impl.MapBindingSet; +import org.openrdf.repository.sail.SailRepositoryConnection; import com.google.common.base.Optional; import com.google.common.collect.Sets; /** * Performs integration tests over the Fluo application geared towards various types of input. - * <p> - * These tests are being ignore so that they will not run as unit tests while building the application. */ -public class InputIT extends ITBase { +public class InputIT extends RyaExportITBase { /** * Ensure historic matches are included in the result. @@ -53,49 +58,64 @@ public class InputIT extends ITBase { public void historicResults() throws Exception { // A query that finds people who talk to Eve and work at Chipotle. final String sparql = - "SELECT ?x " + - "WHERE { " + + "SELECT ?x WHERE { " + "?x <http://talksTo> <http://Eve>. " + "?x <http://worksAt> <http://Chipotle>." + "}"; // Triples that are loaded into Rya before the PCJ is created. + final ValueFactory vf = new ValueFactoryImpl(); final Set<Statement> historicTriples = Sets.newHashSet( - makeStatement("http://Alice", "http://talksTo", "http://Eve"), - makeStatement("http://Bob", "http://talksTo", "http://Eve"), - makeStatement("http://Charlie", "http://talksTo", "http://Eve"), + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), + vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), - makeStatement("http://Eve", "http://helps", "http://Kevin"), + vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://helps"), vf.createURI("http://Kevin")), - makeStatement("http://Bob", "http://worksAt", "http://Chipotle"), - makeStatement("http://Charlie", "http://worksAt", "http://Chipotle"), - makeStatement("http://Eve", "http://worksAt", "http://Chipotle"), - makeStatement("http://David", "http://worksAt", "http://Chipotle")); + vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), + vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), + vf.createStatement(vf.createURI("http://David"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle"))); // The expected results of the SPARQL query once the PCJ has been computed. final Set<BindingSet> expected = new HashSet<>(); - expected.add(makeBindingSet( - new BindingImpl("x", new URIImpl("http://Bob")))); - expected.add(makeBindingSet( - new BindingImpl("x", new URIImpl("http://Charlie")))); + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("x", vf.createURI("http://Bob")); + expected.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("x", vf.createURI("http://Charlie")); + expected.add(bs); // Load the historic data into Rya. + final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection(); for(final Statement triple : historicTriples) { ryaConn.add(triple); } + ryaConn.close(); // Create the PCJ table. + final Connector accumuloConn = super.getAccumuloConnector(); final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); final String pcjId = pcjStorage.createPcj(sparql); - // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + + // Verify the end results of the query match the expected results. + super.getMiniFluo().waitForObservers(); - // Verify the end results of the query match the expected results. - fluo.waitForObservers(); + final Set<BindingSet> results = new HashSet<>(); + try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) { + while(resultsIt.hasNext()) { + results.add( resultsIt.next() ); + } + } - final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql); - assertEquals(expected, results); + assertEquals(expected, results); + } } /** @@ -105,51 +125,67 @@ public class InputIT extends ITBase { public void streamedResults() throws Exception { // A query that finds people who talk to Eve and work at Chipotle. final String sparql = - "SELECT ?x " + - "WHERE { " + + "SELECT ?x WHERE { " + "?x <http://talksTo> <http://Eve>. " + "?x <http://worksAt> <http://Chipotle>." + "}"; // Triples that will be streamed into Fluo after the PCJ has been created. final Set<RyaStatement> streamedTriples = Sets.newHashSet( - makeRyaStatement("http://Alice", "http://talksTo", "http://Eve"), - makeRyaStatement("http://Bob", "http://talksTo", "http://Eve"), - makeRyaStatement("http://Charlie", "http://talksTo", "http://Eve"), + new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://talksTo"), new RyaURI("http://Eve")), + new RyaStatement(new RyaURI("http://Bob"), new RyaURI("http://talksTo"), new RyaURI("http://Eve")), + new RyaStatement(new RyaURI("http://Charlie"), new RyaURI("http://talksTo"), new RyaURI("http://Eve")), - makeRyaStatement("http://Eve", "http://helps", "http://Kevin"), + new RyaStatement(new RyaURI("http://Eve"), new RyaURI("http://helps"), new RyaURI("http://Kevin")), - makeRyaStatement("http://Bob", "http://worksAt", "http://Chipotle"), - makeRyaStatement("http://Charlie", "http://worksAt", "http://Chipotle"), - makeRyaStatement("http://Eve", "http://worksAt", "http://Chipotle"), - makeRyaStatement("http://David", "http://worksAt", "http://Chipotle")); + new RyaStatement(new RyaURI("http://Bob"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle")), + new RyaStatement(new RyaURI("http://Charlie"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle")), + new RyaStatement(new RyaURI("http://Eve"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle")), + new RyaStatement(new RyaURI("http://David"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle"))); // The expected results of the SPARQL query once the PCJ has been computed. + final ValueFactory vf = new ValueFactoryImpl(); final Set<BindingSet> expected = new HashSet<>(); - expected.add(makeBindingSet( - new BindingImpl("x", new URIImpl("http://Bob")))); - expected.add(makeBindingSet( - new BindingImpl("x", new URIImpl("http://Charlie")))); + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("x", vf.createURI("http://Bob")); + expected.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("x", vf.createURI("http://Charlie")); + expected.add(bs); // Create the PCJ table. + final Connector accumuloConn = super.getAccumuloConnector(); final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); final String pcjId = pcjStorage.createPcj(sparql); - // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); - // Ensure the query has no results yet. - fluo.waitForObservers(); - Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql); - assertTrue( results.isEmpty() ); + // Ensure the query has no results yet. + super.getMiniFluo().waitForObservers(); - // Stream the data into Fluo. - new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); + try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) { + assertFalse( resultsIt.hasNext() ); + } - // Verify the end results of the query match the expected results. - fluo.waitForObservers(); - results = getQueryBindingSetValues(fluoClient, sparql); - assertEquals(expected, results); + // Stream the data into Fluo. + new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); + + // Verify the end results of the query match the expected results. + super.getMiniFluo().waitForObservers(); + + final HashSet<BindingSet> results = new HashSet<>(); + try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) { + while(resultsIt.hasNext()) { + results.add( resultsIt.next() ); + } + } + + assertEquals(expected, results); + } } /** @@ -162,53 +198,75 @@ public class InputIT extends ITBase { public void historicThenStreamedResults() throws Exception { // A query that finds people who talk to Eve and work at Chipotle. final String sparql = - "SELECT ?x " + - "WHERE { " + + "SELECT ?x WHERE { " + "?x <http://talksTo> <http://Eve>. " + "?x <http://worksAt> <http://Chipotle>." + "}"; // Triples that are loaded into Rya before the PCJ is created. + final ValueFactory vf = new ValueFactoryImpl(); final Set<Statement> historicTriples = Sets.newHashSet( - makeStatement("http://Alice", "http://talksTo", "http://Eve"), - makeStatement("http://Alice", "http://worksAt", "http://Chipotle")); + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle"))); // Triples that will be streamed into Fluo after the PCJ has been created. final Set<RyaStatement> streamedTriples = Sets.newHashSet( - makeRyaStatement("http://Frank", "http://talksTo", "http://Eve"), - makeRyaStatement("http://Frank", "http://worksAt", "http://Chipotle")); + new RyaStatement(new RyaURI("http://Frank"), new RyaURI("http://talksTo"), new RyaURI("http://Eve")), + new RyaStatement(new RyaURI("http://Frank"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle"))); // Load the historic data into Rya. + final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection(); for(final Statement triple : historicTriples) { ryaConn.add(triple); } + ryaConn.close(); // Create the PCJ table. + final Connector accumuloConn = super.getAccumuloConnector(); final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); final String pcjId = pcjStorage.createPcj(sparql); - // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); - // Ensure Alice is a match. - fluo.waitForObservers(); - final Set<BindingSet> expected = new HashSet<>(); - expected.add(makeBindingSet( - new BindingImpl("x", new URIImpl("http://Alice")))); + // Ensure Alice is a match. + super.getMiniFluo().waitForObservers(); - Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql); - assertEquals(expected, results); + final Set<BindingSet> expected = new HashSet<>(); - // Stream the data into Fluo. - new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("x", vf.createURI("http://Alice")); + expected.add(bs); - // Verify the end results of the query also include Frank. - fluo.waitForObservers(); - expected.add(makeBindingSet( - new BindingImpl("x", new URIImpl("http://Frank")))); + Set<BindingSet> results = new HashSet<>(); + try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) { + while(resultsIt.hasNext()) { + results.add(resultsIt.next()); + } + } - results = getQueryBindingSetValues(fluoClient, sparql); - assertEquals(expected, results); + assertEquals(expected, results); + + // Stream the data into Fluo. + new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); + + // Verify the end results of the query also include Frank. + super.getMiniFluo().waitForObservers(); + + bs = new MapBindingSet(); + bs.addBinding("x", vf.createURI("http://Frank")); + expected.add(bs); + + results = new HashSet<>(); + try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) { + while(resultsIt.hasNext()) { + results.add(resultsIt.next()); + } + } + + assertEquals(expected, results); + } } /** @@ -222,50 +280,69 @@ public class InputIT extends ITBase { public void historicAndStreamConflict() throws Exception { // A query that finds people who talk to Eve and work at Chipotle. final String sparql = - "SELECT ?x " + - "WHERE { " + + "SELECT ?x WHERE { " + "?x <http://talksTo> <http://Eve>. " + "?x <http://worksAt> <http://Chipotle>." + "}"; // Triples that are loaded into Rya before the PCJ is created. + final ValueFactory vf = new ValueFactoryImpl(); final Set<Statement> historicTriples = Sets.newHashSet( - makeStatement("http://Alice", "http://talksTo", "http://Eve"), - makeStatement("http://Alice", "http://worksAt", "http://Chipotle")); + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle"))); // Triples that will be streamed into Fluo after the PCJ has been created. final Set<RyaStatement> streamedTriples = Sets.newHashSet( - makeRyaStatement("http://Alice", "http://talksTo", "http://Eve"), - makeRyaStatement("http://Alice", "http://worksAt", "http://Chipotle")); + new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://talksTo"), new RyaURI("http://Eve")), + new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle"))); // The expected final result. final Set<BindingSet> expected = new HashSet<>(); - expected.add(makeBindingSet( - new BindingImpl("x", new URIImpl("http://Alice")))); + + final MapBindingSet bs = new MapBindingSet(); + bs.addBinding("x", vf.createURI("http://Alice")); + expected.add(bs); // Load the historic data into Rya. + final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection(); for(final Statement triple : historicTriples) { ryaConn.add(triple); } + ryaConn.close(); // Create the PCJ table. + final Connector accumuloConn = super.getAccumuloConnector(); final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); final String pcjId = pcjStorage.createPcj(sparql); - // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); - - // Ensure Alice is a match. - fluo.waitForObservers(); - Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql); - assertEquals(expected, results); - - // Stream the same Alice triple into Fluo. - new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); - - // Verify the end results of the query is stiill only Alice. - fluo.waitForObservers(); - results = getQueryBindingSetValues(fluoClient, sparql); - assertEquals(expected, results); + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + + // Ensure Alice is a match. + super.getMiniFluo().waitForObservers(); + + Set<BindingSet> results = new HashSet<>(); + try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) { + while(resultsIt.hasNext()) { + results.add( resultsIt.next() ); + } + } + assertEquals(expected, results); + + // Stream the same Alice triple into Fluo. + new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); + + // Verify the end results of the query is stiill only Alice. + super.getMiniFluo().waitForObservers(); + + results = new HashSet<>(); + try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) { + while(resultsIt.hasNext()) { + results.add( resultsIt.next() ); + } + } + assertEquals(expected, results); + } } } \ No newline at end of file
