http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java index f3f486c..747f6e5 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java @@ -23,26 +23,29 @@ import static org.junit.Assert.assertEquals; import java.util.HashSet; import java.util.Set; +import org.apache.accumulo.core.client.Connector; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; import org.apache.rya.api.domain.RyaStatement; -import org.apache.rya.indexing.pcj.fluo.ITBase; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.junit.Test; -import org.openrdf.model.impl.URIImpl; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; import org.openrdf.query.BindingSet; -import org.openrdf.query.impl.BindingImpl; +import org.openrdf.query.impl.MapBindingSet; import com.google.common.base.Optional; import com.google.common.collect.Sets; /** * Performs integration tests over the Fluo application geared towards Rya PCJ exporting. - * <p> - * These tests are being ignore so that they will not run as unit tests while building the application. */ -public class RyaExportIT extends ITBase { +public class RyaExportIT extends RyaExportITBase { @Test public void resultsExported() throws Exception { @@ -57,59 +60,69 @@ public class RyaExportIT extends ITBase { "}"; // Triples that will be streamed into Fluo after the PCJ has been created. + final ValueFactory vf = new ValueFactoryImpl(); final Set<RyaStatement> streamedTriples = Sets.newHashSet( - makeRyaStatement("http://Alice", "http://talksTo", "http://Bob"), - makeRyaStatement("http://Bob", "http://livesIn", "http://London"), - makeRyaStatement("http://Bob", "http://worksAt", "http://Chipotle"), + new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://talksTo"), new RyaURI("http://Bob")), + new RyaStatement(new RyaURI("http://Bob"), new RyaURI("http://livesIn"), new RyaURI("http://London")), + new RyaStatement(new RyaURI("http://Bob"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle")), - makeRyaStatement("http://Alice", "http://talksTo", "http://Charlie"), - makeRyaStatement("http://Charlie", "http://livesIn", "http://London"), - makeRyaStatement("http://Charlie", "http://worksAt", "http://Chipotle"), + new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://talksTo"), new RyaURI("http://Charlie")), + new RyaStatement(new RyaURI("http://Charlie"), new RyaURI("http://livesIn"), new RyaURI("http://London")), + new RyaStatement(new RyaURI("http://Charlie"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle")), - makeRyaStatement("http://Alice", "http://talksTo", "http://David"), - makeRyaStatement("http://David", "http://livesIn", "http://London"), - makeRyaStatement("http://David", "http://worksAt", "http://Chipotle"), + new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://talksTo"), new RyaURI("http://David")), + new RyaStatement(new RyaURI("http://David"), new RyaURI("http://livesIn"), new RyaURI("http://London")), + new RyaStatement(new RyaURI("http://David"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle")), - makeRyaStatement("http://Alice", "http://talksTo", "http://Eve"), - makeRyaStatement("http://Eve", "http://livesIn", "http://Leeds"), - makeRyaStatement("http://Eve", "http://worksAt", "http://Chipotle"), + new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://talksTo"), new RyaURI("http://Eve")), + new RyaStatement(new RyaURI("http://Eve"), new RyaURI("http://livesIn"), new RyaURI("http://Leeds")), + new RyaStatement(new RyaURI("http://Eve"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle")), - makeRyaStatement("http://Frank", "http://talksTo", "http://Alice"), - makeRyaStatement("http://Frank", "http://livesIn", "http://London"), - makeRyaStatement("http://Frank", "http://worksAt", "http://Chipotle")); + new RyaStatement(new RyaURI("http://Frank"), new RyaURI("http://talksTo"), new RyaURI("http://Alice")), + new RyaStatement(new RyaURI("http://Frank"), new RyaURI("http://livesIn"), new RyaURI("http://London")), + new RyaStatement(new RyaURI("http://Frank"), new RyaURI("http://worksAt"), new RyaURI("http://Chipotle"))); // The expected results of the SPARQL query once the PCJ has been computed. final Set<BindingSet> expected = new HashSet<>(); - expected.add(makeBindingSet( - new BindingImpl("customer", new URIImpl("http://Alice")), - new BindingImpl("worker", new URIImpl("http://Bob")), - new BindingImpl("city", new URIImpl("http://London")))); - expected.add(makeBindingSet( - new BindingImpl("customer", new URIImpl("http://Alice")), - new BindingImpl("worker", new URIImpl("http://Charlie")), - new BindingImpl("city", new URIImpl("http://London")))); - expected.add(makeBindingSet( - new BindingImpl("customer", new URIImpl("http://Alice")), - new BindingImpl("worker", new URIImpl("http://David")), - new BindingImpl("city", new URIImpl("http://London")))); + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("customer", vf.createURI("http://Alice")); + bs.addBinding("worker", vf.createURI("http://Bob")); + bs.addBinding("city", vf.createURI("http://London")); + expected.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("customer", vf.createURI("http://Alice")); + bs.addBinding("worker", vf.createURI("http://Charlie")); + bs.addBinding("city", vf.createURI("http://London")); + expected.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("customer", vf.createURI("http://Alice")); + bs.addBinding("worker", vf.createURI("http://David")); + bs.addBinding("city", vf.createURI("http://London")); + expected.add(bs); // Create the PCJ table. + final Connector accumuloConn = super.getAccumuloConnector(); final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); final String pcjId = pcjStorage.createPcj(sparql); - // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); - // Stream the data into Fluo. - new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); + // Stream the data into Fluo. + new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); - // Fetch the exported results from Accumulo once the observers finish working. - fluo.waitForObservers(); + // Fetch the exported results from Accumulo once the observers finish working. + super.getMiniFluo().waitForObservers(); - // Fetch expected results from the PCJ table that is in Accumulo. - final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) ); + // Fetch expected results from the PCJ table that is in Accumulo. + final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) ); - // Verify the end results of the query match the expected results. - assertEquals(expected, results); + // Verify the end results of the query match the expected results. + assertEquals(expected, results); + } } -} +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java index fd70a19..3c74b13 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java @@ -23,86 +23,104 @@ import static org.junit.Assert.assertEquals; import java.util.HashSet; import java.util.Set; +import org.apache.accumulo.core.client.Connector; import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; import org.apache.rya.accumulo.AccumuloRyaDAO; import org.apache.rya.indexing.external.PrecomputedJoinIndexer; -import org.apache.rya.indexing.pcj.fluo.ITBase; +import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; import org.junit.Test; import org.openrdf.model.Statement; -import org.openrdf.model.impl.URIImpl; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; import org.openrdf.query.BindingSet; -import org.openrdf.query.impl.BindingImpl; +import org.openrdf.query.impl.MapBindingSet; import org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.sail.SailRepositoryConnection; import com.google.common.collect.Sets; - /** - * This test ensures that the correct updates are pushed by Fluo - * to the external PCJ table as triples are added to Rya through - * the {@link RepositoryConnection}. The key difference between these - * tests and those in {@link InputIT} is that streaming triples are added through - * the RepositoryConnection and not through the {@link FluoClient}. These tests are - * designed to verify that the {@link AccumuloRyaDAO} has been integrated - * with the {@link PrecomputedJoinIndexer} and that the associated {@link PrecomputedJoinUpdater} updates - * Fluo accordingly. - * + * This test ensures that the correct updates are pushed by Fluo to the external PCJ table as triples are added to Rya + * through the {@link RepositoryConnection}. The key difference between these tests and those in {@link InputIT} is + * that streaming triples are added through the RepositoryConnection and not through the {@link FluoClient}. These + * tests are designed to verify that the {@link AccumuloRyaDAO} has been integrated with the {@link PrecomputedJoinIndexer} + * and that the associated {@link PrecomputedJoinUpdater} updates Fluo accordingly. */ - -public class RyaInputIncrementalUpdateIT extends ITBase { +public class RyaInputIncrementalUpdateIT extends RyaExportITBase { /** * Ensure historic matches are included in the result. */ @Test public void streamResultsThroughRya() throws Exception { - // A query that finds people who talk to Eve and work at Chipotle. - final String sparql = "SELECT ?x " + "WHERE { " + "?x <http://talksTo> <http://Eve>. " - + "?x <http://worksAt> <http://Chipotle>." + "}"; + final String sparql = + "SELECT ?x " + "WHERE { " + + "?x <http://talksTo> <http://Eve>. " + + "?x <http://worksAt> <http://Chipotle>." + + "}"; // Triples that are loaded into Rya before the PCJ is created. + final ValueFactory vf = new ValueFactoryImpl(); final Set<Statement> historicTriples = Sets.newHashSet( - makeStatement("http://Alice", "http://talksTo", "http://Eve"), - makeStatement("http://Bob", "http://talksTo", "http://Eve"), - makeStatement("http://Charlie", "http://talksTo", "http://Eve"), + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), + vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), - makeStatement("http://Eve", "http://helps", "http://Kevin"), + vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://helps"), vf.createURI("http://Kevin")), - makeStatement("http://Bob", "http://worksAt", "http://Chipotle"), - makeStatement("http://Charlie", "http://worksAt", "http://Chipotle"), - makeStatement("http://Eve", "http://worksAt", "http://Chipotle"), - makeStatement("http://David", "http://worksAt", "http://Chipotle")); + vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), + vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), + vf.createStatement(vf.createURI("http://David"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle"))); // The expected results of the SPARQL query once the PCJ has been // computed. final Set<BindingSet> expected = new HashSet<>(); - expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Bob")))); - expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Charlie")))); + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("x", vf.createURI("http://Bob")); + expected.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("x", vf.createURI("http://Charlie")); + expected.add(bs); // Create the PCJ table. + final Connector accumuloConn = super.getAccumuloConnector(); final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); final String pcjId = pcjStorage.createPcj(sparql); - // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); - // Verify the end results of the query match the expected results. - fluo.waitForObservers(); + // Verify the end results of the query match the expected results. + super.getMiniFluo().waitForObservers(); - // Load the historic data into Rya. - for (final Statement triple : historicTriples) { - ryaConn.add(triple); - } + // Load the historic data into Rya. + final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection(); + for (final Statement triple : historicTriples) { + ryaConn.add(triple); + } + + super.getMiniFluo().waitForObservers(); + + final Set<BindingSet> results = new HashSet<>(); + try(CloseableIterator<BindingSet> resultIt = pcjStorage.listResults(pcjId)) { + while(resultIt.hasNext()) { + results.add( resultIt.next() ); + } + } - fluo.waitForObservers(); - - final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql); - assertEquals(expected, results); + assertEquals(expected, results); + } } /** @@ -115,97 +133,146 @@ public class RyaInputIncrementalUpdateIT extends ITBase { @Test public void historicThenStreamedResults() throws Exception { // A query that finds people who talk to Eve and work at Chipotle. - final String sparql = "SELECT ?x " + "WHERE { " + "?x <http://talksTo> <http://Eve>. " - + "?x <http://worksAt> <http://Chipotle>." + "}"; + final String sparql = + "SELECT ?x " + "WHERE { " + + "?x <http://talksTo> <http://Eve>. " + + "?x <http://worksAt> <http://Chipotle>." + + "}"; // Triples that are loaded into Rya before the PCJ is created. + final ValueFactory vf = new ValueFactoryImpl(); final Set<Statement> historicTriples = Sets.newHashSet( - makeStatement("http://Alice", "http://talksTo", "http://Eve"), - makeStatement("http://Alice", "http://worksAt", "http://Chipotle"), - makeStatement("http://Joe", "http://worksAt", "http://Chipotle")); + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), + vf.createStatement(vf.createURI("http://Joe"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle"))); // Triples that will be streamed into Fluo after the PCJ has been final Set<Statement> streamedTriples = Sets.newHashSet( - makeStatement("http://Frank", "http://talksTo", "http://Eve"), - makeStatement("http://Joe", "http://talksTo", "http://Eve"), - makeStatement("http://Frank", "http://worksAt", "http://Chipotle")); + vf.createStatement(vf.createURI("http://Frank"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), + vf.createStatement(vf.createURI("http://Joe"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), + vf.createStatement(vf.createURI("http://Frank"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle"))); // Load the historic data into Rya. + final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection(); for (final Statement triple : historicTriples) { ryaConn.add(triple); } // Create the PCJ table. + final Connector accumuloConn = super.getAccumuloConnector(); final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); final String pcjId = pcjStorage.createPcj(sparql); - // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); - fluo.waitForObservers(); + super.getMiniFluo().waitForObservers(); - // Load the streaming data into Rya. - for (final Statement triple : streamedTriples) { - ryaConn.add(triple); - } + // Load the streaming data into Rya. + for (final Statement triple : streamedTriples) { + ryaConn.add(triple); + } - // Ensure Alice is a match. - fluo.waitForObservers(); - final Set<BindingSet> expected = new HashSet<>(); - expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Alice")))); - expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Frank")))); - expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Joe")))); + // Ensure Alice is a match. + super.getMiniFluo().waitForObservers(); - Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql); - assertEquals(expected, results); + final Set<BindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("x", vf.createURI("http://Alice")); + expected.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("x", vf.createURI("http://Frank")); + expected.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("x", vf.createURI("http://Joe")); + expected.add(bs); + + final Set<BindingSet> results = new HashSet<>(); + try(CloseableIterator<BindingSet> resultIt = pcjStorage.listResults(pcjId)) { + while(resultIt.hasNext()) { + results.add( resultIt.next() ); + } + } + + assertEquals(expected, results); + } } @Test public void historicAndStreamMultiVariables() throws Exception { - // A query that finds people who talk to Eve and work at Chipotle. - // A query that finds people who talk to Eve and work at Chipotle. - final String sparql = "SELECT ?x ?y " + "WHERE { " + "?x <http://talksTo> ?y. " - + "?x <http://worksAt> <http://Chipotle>." + "}"; + // A query that finds people who talk to other people and work at Chipotle. + final String sparql = + "SELECT ?x ?y " + "WHERE { " + + "?x <http://talksTo> ?y. " + + "?x <http://worksAt> <http://Chipotle>." + + "}"; // Triples that are loaded into Rya before the PCJ is created. + final ValueFactory vf = new ValueFactoryImpl(); final Set<Statement> historicTriples = Sets.newHashSet( - makeStatement("http://Alice", "http://talksTo", "http://Eve"), - makeStatement("http://Alice", "http://worksAt", "http://Chipotle"), - makeStatement("http://Joe", "http://worksAt", "http://Chipotle")); + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), + vf.createStatement(vf.createURI("http://Joe"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle"))); // Triples that will be streamed into Fluo after the PCJ has been final Set<Statement> streamedTriples = Sets.newHashSet( - makeStatement("http://Frank", "http://talksTo", "http://Betty"), - makeStatement("http://Joe", "http://talksTo", "http://Alice"), - makeStatement("http://Frank", "http://worksAt", "http://Chipotle")); + vf.createStatement(vf.createURI("http://Frank"), vf.createURI("http://talksTo"), vf.createURI("http://Betty")), + vf.createStatement(vf.createURI("http://Joe"), vf.createURI("http://talksTo"), vf.createURI("http://Alice")), + vf.createStatement(vf.createURI("http://Frank"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle"))); // Load the historic data into Rya. + final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection(); for (final Statement triple : historicTriples) { ryaConn.add(triple); } // Create the PCJ table. + final Connector accumuloConn = super.getAccumuloConnector(); final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); final String pcjId = pcjStorage.createPcj(sparql); - // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); - fluo.waitForObservers(); + super.getMiniFluo().waitForObservers(); - // Load the streaming data into Rya. - for (final Statement triple : streamedTriples) { - ryaConn.add(triple); - } + // Load the streaming data into Rya. + for (final Statement triple : streamedTriples) { + ryaConn.add(triple); + } - // Ensure Alice is a match. - fluo.waitForObservers(); - final Set<BindingSet> expected = new HashSet<>(); - expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Alice")), new BindingImpl("y", new URIImpl("http://Eve")))); - expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Frank")), new BindingImpl("y", new URIImpl("http://Betty")))); - expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Joe")), new BindingImpl("y", new URIImpl("http://Alice")))); + // Ensure Alice is a match. + super.getMiniFluo().waitForObservers(); + + final Set<BindingSet> expected = new HashSet<>(); + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("x", vf.createURI("http://Alice")); + bs.addBinding("y", vf.createURI("http://Eve")); + expected.add(bs); - Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql); - assertEquals(expected, results); + bs = new MapBindingSet(); + bs.addBinding("x", vf.createURI("http://Frank")); + bs.addBinding("y", vf.createURI("http://Betty")); + expected.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("x", vf.createURI("http://Joe")); + bs.addBinding("y", vf.createURI("http://Alice")); + expected.add(bs); + + final Set<BindingSet> results = new HashSet<>(); + try(CloseableIterator<BindingSet> resultIt = pcjStorage.listResults(pcjId)) { + while(resultIt.hasNext()) { + results.add( resultIt.next() ); + } + } + + assertEquals(expected, results); + } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java index 092e8a9..52d6caa 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java @@ -18,24 +18,21 @@ */ package org.apache.rya.indexing.pcj.fluo.integration; +import static org.junit.Assert.assertEquals; + import java.util.HashSet; import java.util.Set; import java.util.UUID; -import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.client.Connector; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; import org.apache.log4j.Logger; -import org.apache.rya.accumulo.AccumuloRdfConfiguration; -import org.apache.rya.api.RdfCloudTripleStoreConfiguration; -import org.apache.rya.indexing.accumulo.ConfigUtils; -import org.apache.rya.indexing.pcj.fluo.ITBase; +import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; -import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables; -import org.apache.rya.sail.config.RyaSailFactory; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import org.openrdf.model.Resource; import org.openrdf.model.Statement; @@ -43,95 +40,58 @@ import org.openrdf.model.impl.LiteralImpl; import org.openrdf.model.impl.StatementImpl; import org.openrdf.model.impl.URIImpl; import org.openrdf.query.BindingSet; -import org.openrdf.repository.RepositoryException; -import org.openrdf.repository.sail.SailRepository; -import org.openrdf.repository.sail.SailRepositoryConnection; -import org.openrdf.sail.Sail; -import org.openrdf.sail.SailException; +public class StreamingTestIT extends RyaExportITBase { -public class StreamingTestIT extends ITBase { + private static final Logger log = Logger.getLogger(StreamingTestIT.class); - private static final Logger log = Logger.getLogger(ITBase.class); - private static String query = "select ?name ?uuid where { ?uuid <http://pred1> ?name ; <http://pred2> \"literal\".}"; - private static String uuidPrefix = "http://uuid_"; - private static String name = "number_"; - private static String pred1 = "http://pred1"; - private static String pred2 = "http://pred2"; - - private PcjTables pcjTables = new PcjTables(); - private String pcjTableName; - - private Sail sail; - private SailRepository repo; - private SailRepositoryConnection conn; - - - @Before - public void init() throws Exception { - AccumuloRdfConfiguration conf = makeConfig(instanceName, zookeepers); - conf.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, "U"); - conf.set(RdfCloudTripleStoreConfiguration.CONF_CV, "U"); - accumuloConn.securityOperations().changeUserAuthorizations("root", new Authorizations("U")); - sail = RyaSailFactory.getInstance(conf); - repo = new SailRepository(sail); - conn = repo.getConnection(); - } - - @After - public void close() throws RepositoryException, SailException { - conn.close(); - repo.shutDown(); - sail.shutDown(); - } - - @Test public void testRandomStreamingIngest() throws Exception { - - pcjTableName = createPcj(query); - log.info("Adding Join Pairs..."); - addRandomQueryStatementPairs(100); - Assert.assertEquals(100, countPcjs()); - - } - - private String createPcj(String pcj) throws Exception { - accumuloConn.securityOperations().changeUserAuthorizations("root", new Authorizations("U")); - // Create the PCJ table. - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); - final String pcjId = pcjStorage.createPcj(pcj); - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); - String tableName = RYA_INSTANCE_NAME + "INDEX_" + pcjId; - - return tableName; + final String sparql = + "select ?name ?uuid where { " + + "?uuid <http://pred1> ?name ; " + + "<http://pred2> \"literal\"." + + "}"; + + try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Create the PCJ table. + final Connector accumuloConn = super.getAccumuloConnector(); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final String pcjId = pcjStorage.createPcj(sparql); + + // Task the Fluo app with the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + + // Add Statements to the Fluo app. + log.info("Adding Join Pairs..."); + addRandomQueryStatementPairs(100); + + super.getMiniFluo().waitForObservers(); + + int resultCount = 0; + try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) { + while(resultsIt.hasNext()) { + resultCount++; + resultsIt.next(); + } + } + + // Show the correct number of Binding Sets were created for the PCJ. + assertEquals(100, resultCount); + } } - - private void addRandomQueryStatementPairs(int numPairs) throws Exception { - Set<Statement> statementPairs = new HashSet<>(); + private void addRandomQueryStatementPairs(final int numPairs) throws Exception { + final Set<Statement> statementPairs = new HashSet<>(); for (int i = 0; i < numPairs; i++) { - String uri = uuidPrefix + UUID.randomUUID().toString(); - Statement statement1 = new StatementImpl(new URIImpl(uri), new URIImpl(pred1), - new LiteralImpl(name + (i + 1))); - Statement statement2 = new StatementImpl(new URIImpl(uri), new URIImpl(pred2), new LiteralImpl("literal")); + final String uri = "http://uuid_" + UUID.randomUUID().toString(); + final Statement statement1 = new StatementImpl(new URIImpl(uri), new URIImpl("http://pred1"), + new LiteralImpl("number_" + (i + 1))); + final Statement statement2 = new StatementImpl(new URIImpl(uri), new URIImpl("http://pred2"), new LiteralImpl("literal")); statementPairs.add(statement1); statementPairs.add(statement2); } - conn.add(statementPairs, new Resource[0]); - fluo.waitForObservers(); - } - - private int countPcjs() throws Exception { - Iterable<BindingSet> bindingsets = pcjTables.listResults(accumuloConn, pcjTableName, new Authorizations("U")); - int count = 0; - for (BindingSet bs : bindingsets) { -// System.out.println(bs); - count++; - } -// IncUpdateDAO.printAll(fluoClient); - return count; + super.getRyaSailRepository().getConnection().add(statementPairs, new Resource[0]); + super.getMiniFluo().waitForObservers(); } - - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java index 6f4596f..30b6842 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java @@ -22,23 +22,27 @@ import java.io.UnsupportedEncodingException; import java.util.HashSet; import java.util.Set; +import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.security.Authorizations; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; import org.apache.rya.accumulo.AccumuloRdfConfiguration; import org.apache.rya.accumulo.AccumuloRyaDAO; import org.apache.rya.api.RdfCloudTripleStoreConfiguration; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.resolver.RdfToRyaConversions; import org.apache.rya.indexing.accumulo.ConfigUtils; -import org.apache.rya.indexing.pcj.fluo.ITBase; +import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.junit.Assert; import org.junit.Test; import org.openrdf.model.Statement; -import org.openrdf.model.impl.URIImpl; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; import org.openrdf.query.BindingSet; -import org.openrdf.query.impl.BindingImpl; +import org.openrdf.query.impl.MapBindingSet; import com.google.common.collect.Sets; @@ -47,7 +51,7 @@ import com.google.common.collect.Sets; * <p> * These tests are being ignore so that they will not run as unit tests while building the application. */ -public class HistoricStreamingVisibilityIT extends ITBase { +public class HistoricStreamingVisibilityIT extends RyaExportITBase { /** * Ensure historic matches are included in the result. @@ -61,70 +65,74 @@ public class HistoricStreamingVisibilityIT extends ITBase { "?x <http://talksTo> <http://Eve>. " + "?x <http://worksAt> <http://Chipotle>." + "}"; - + + final Connector accumuloConn = super.getAccumuloConnector(); accumuloConn.securityOperations().changeUserAuthorizations(ACCUMULO_USER, new Authorizations("U","V","W")); - AccumuloRyaDAO dao = new AccumuloRyaDAO(); + final AccumuloRyaDAO dao = new AccumuloRyaDAO(); dao.setConnector(accumuloConn); dao.setConf(makeConfig()); dao.init(); // Triples that are loaded into Rya before the PCJ is created. + final ValueFactory vf = new ValueFactoryImpl(); + final Set<RyaStatement> historicTriples = Sets.newHashSet( - makeRyaStatement(makeStatement("http://Alice", "http://talksTo", "http://Eve"),"U"), - makeRyaStatement(makeStatement("http://Bob", "http://talksTo", "http://Eve"),"V"), - makeRyaStatement(makeStatement("http://Charlie", "http://talksTo", "http://Eve"),"W"), + makeRyaStatement(vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")),"U"), + makeRyaStatement(vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")),"V"), + makeRyaStatement(vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")),"W"), - makeRyaStatement(makeStatement("http://Eve", "http://helps", "http://Kevin"), "U"), + makeRyaStatement(vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://helps"), vf.createURI("http://Kevin")), "U"), - makeRyaStatement(makeStatement("http://Bob", "http://worksAt", "http://Chipotle"), "W"), - makeRyaStatement(makeStatement("http://Charlie", "http://worksAt", "http://Chipotle"), "V"), - makeRyaStatement(makeStatement("http://Eve", "http://worksAt", "http://Chipotle"), "U"), - makeRyaStatement(makeStatement("http://David", "http://worksAt", "http://Chipotle"), "V")); + makeRyaStatement(vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), "W"), + makeRyaStatement(vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), "V"), + makeRyaStatement(vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), "U"), + makeRyaStatement(vf.createStatement(vf.createURI("http://David"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), "V")); dao.add(historicTriples.iterator()); dao.flush(); - + // The expected results of the SPARQL query once the PCJ has been computed. final Set<BindingSet> expected = new HashSet<>(); - expected.add(makeBindingSet( - new BindingImpl("x", new URIImpl("http://Bob")))); - expected.add(makeBindingSet( - new BindingImpl("x", new URIImpl("http://Charlie")))); - + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("x", vf.createURI("http://Bob")); + expected.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("x", vf.createURI("http://Charlie")); + expected.add(bs); + // Create the PCJ table. final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); final String pcjId = pcjStorage.createPcj(sparql); - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + } // Verify the end results of the query match the expected results. - fluo.waitForObservers(); - Set<BindingSet> results = Sets.newHashSet(pcjStorage.listResults(pcjId)); + super.getMiniFluo().waitForObservers(); + + final Set<BindingSet> results = Sets.newHashSet(pcjStorage.listResults(pcjId)); Assert.assertEquals(expected, results); } - - + private AccumuloRdfConfiguration makeConfig() { final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); conf.setTablePrefix(RYA_INSTANCE_NAME); // Accumulo connection information. conf.set(ConfigUtils.CLOUDBASE_USER, ACCUMULO_USER); conf.set(ConfigUtils.CLOUDBASE_PASSWORD, ACCUMULO_PASSWORD); - conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instanceName); - conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zookeepers); + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, super.getMiniAccumuloCluster().getInstanceName()); + conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, super.getMiniAccumuloCluster().getZooKeepers()); conf.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, "U,V,W"); return conf; } - - - private static RyaStatement makeRyaStatement(Statement statement, String visibility) throws UnsupportedEncodingException { - - RyaStatement ryaStatement = RdfToRyaConversions.convertStatement(statement); + + private static RyaStatement makeRyaStatement(final Statement statement, final String visibility) throws UnsupportedEncodingException { + final RyaStatement ryaStatement = RdfToRyaConversions.convertStatement(statement); ryaStatement.setColumnVisibility(visibility.getBytes("UTF-8")); return ryaStatement; - } - - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java index ccc2c20..e7ced90 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java @@ -37,6 +37,9 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; import org.apache.hadoop.io.Text; import org.apache.rya.accumulo.AccumuloRdfConfiguration; import org.apache.rya.api.RdfCloudTripleStoreConfiguration; @@ -44,11 +47,13 @@ import org.apache.rya.api.client.RyaClient; import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaURI; import org.apache.rya.indexing.accumulo.ConfigUtils; -import org.apache.rya.indexing.pcj.fluo.ITBase; +import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory; import org.apache.rya.rdftriplestore.RyaSailRepository; @@ -56,10 +61,9 @@ import org.apache.rya.sail.config.RyaSailFactory; import org.junit.Test; import org.openrdf.model.URI; import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.URIImpl; import org.openrdf.model.impl.ValueFactoryImpl; import org.openrdf.query.BindingSet; -import org.openrdf.query.impl.BindingImpl; +import org.openrdf.query.impl.MapBindingSet; import org.openrdf.repository.RepositoryConnection; import org.openrdf.sail.Sail; @@ -70,7 +74,7 @@ import com.google.common.base.Optional; * Integration tests that ensure the Fluo Application properly exports PCJ * results with the correct Visibility values. */ -public class PcjVisibilityIT extends ITBase { +public class PcjVisibilityIT extends RyaExportITBase { private static final ValueFactory VF = new ValueFactoryImpl(); @@ -94,6 +98,10 @@ public class PcjVisibilityIT extends ITBase { "?worker <" + WORKS_AT + "> <" + BURGER_JOINT + ">. " + "}"; + final Connector accumuloConn = super.getAccumuloConnector(); + final String instanceName = super.getMiniAccumuloCluster().getInstanceName(); + final String zookeepers = super.getMiniAccumuloCluster().getZooKeepers(); + final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails( ACCUMULO_USER, ACCUMULO_PASSWORD.toCharArray(), @@ -103,7 +111,7 @@ public class PcjVisibilityIT extends ITBase { final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql); // Grant the root user the "u" authorization. - accumuloConn.securityOperations().changeUserAuthorizations(ACCUMULO_USER, new Authorizations("u")); + super.getAccumuloConnector().securityOperations().changeUserAuthorizations(ACCUMULO_USER, new Authorizations("u")); // Setup a connection to the Rya instance that uses the "u" authorizations. This ensures // any statements that are inserted will have the "u" authorization on them and that the @@ -127,7 +135,7 @@ public class PcjVisibilityIT extends ITBase { ryaConn.add(VF.createStatement(BOB, WORKS_AT, BURGER_JOINT)); // Wait for Fluo to finish processing. - fluo.waitForObservers(); + super.getMiniFluo().waitForObservers(); // Fetch the exported result and show that its column visibility has been simplified. final String pcjTableName = new PcjTableNameFactory().makeTableName(RYA_INSTANCE_NAME, pcjId); @@ -171,41 +179,46 @@ public class PcjVisibilityIT extends ITBase { // Triples that will be streamed into Fluo after the PCJ has been created. final Map<RyaStatement, String> streamedTriples = new HashMap<>(); - addStatementVisibilityEntry(streamedTriples, makeRyaStatement("http://Alice", "http://talksTo", "http://Bob"), "A&B"); - addStatementVisibilityEntry(streamedTriples, makeRyaStatement("http://Bob", "http://livesIn", "http://London"), "A"); - addStatementVisibilityEntry(streamedTriples, makeRyaStatement("http://Bob", "http://worksAt", "http://Chipotle"), "B"); + addStatementVisibilityEntry(streamedTriples, new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://talksTo"),new RyaURI("http://Bob")), "A&B"); + addStatementVisibilityEntry(streamedTriples, new RyaStatement(new RyaURI("http://Bob"), new RyaURI("http://livesIn"),new RyaURI("http://London")), "A"); + addStatementVisibilityEntry(streamedTriples, new RyaStatement(new RyaURI("http://Bob"), new RyaURI("http://worksAt"),new RyaURI("http://Chipotle")), "B"); + + addStatementVisibilityEntry(streamedTriples, new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://talksTo"),new RyaURI("http://Charlie")), "B&C"); + addStatementVisibilityEntry(streamedTriples, new RyaStatement(new RyaURI("http://Charlie"), new RyaURI("http://livesIn"),new RyaURI("http://London")), "B"); + addStatementVisibilityEntry(streamedTriples, new RyaStatement(new RyaURI("http://Charlie"), new RyaURI("http://worksAt"),new RyaURI("http://Chipotle")), "C"); - addStatementVisibilityEntry(streamedTriples, makeRyaStatement("http://Alice", "http://talksTo", "http://Charlie"), "B&C"); - addStatementVisibilityEntry(streamedTriples, makeRyaStatement("http://Charlie", "http://livesIn", "http://London"), "B"); - addStatementVisibilityEntry(streamedTriples, makeRyaStatement("http://Charlie", "http://worksAt", "http://Chipotle"), "C"); + addStatementVisibilityEntry(streamedTriples, new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://talksTo"),new RyaURI("http://David")), "C&D"); + addStatementVisibilityEntry(streamedTriples, new RyaStatement(new RyaURI("http://David"), new RyaURI("http://livesIn"),new RyaURI("http://London")), "C"); + addStatementVisibilityEntry(streamedTriples, new RyaStatement(new RyaURI("http://David"), new RyaURI("http://worksAt"),new RyaURI("http://Chipotle")), "D"); - addStatementVisibilityEntry(streamedTriples, makeRyaStatement("http://Alice", "http://talksTo", "http://David"), "C&D"); - addStatementVisibilityEntry(streamedTriples, makeRyaStatement("http://David", "http://livesIn", "http://London"), "C"); - addStatementVisibilityEntry(streamedTriples, makeRyaStatement("http://David", "http://worksAt", "http://Chipotle"), "D"); + addStatementVisibilityEntry(streamedTriples, new RyaStatement(new RyaURI("http://Alice"), new RyaURI("http://talksTo"),new RyaURI("http://Eve")), "D&E"); + addStatementVisibilityEntry(streamedTriples, new RyaStatement(new RyaURI("http://Eve"), new RyaURI("http://livesIn"),new RyaURI("http://Leeds")), "D"); + addStatementVisibilityEntry(streamedTriples, new RyaStatement(new RyaURI("http://Eve"), new RyaURI("http://worksAt"),new RyaURI("http://Chipotle")), "E"); - addStatementVisibilityEntry(streamedTriples, makeRyaStatement("http://Alice", "http://talksTo", "http://Eve"), "D&E"); - addStatementVisibilityEntry(streamedTriples, makeRyaStatement("http://Eve", "http://livesIn", "http://Leeds"), "D"); - addStatementVisibilityEntry(streamedTriples, makeRyaStatement("http://Eve", "http://worksAt", "http://Chipotle"), "E"); + addStatementVisibilityEntry(streamedTriples, new RyaStatement(new RyaURI("http://Frank"), new RyaURI("http://talksTo"),new RyaURI("http://Alice")), ""); + addStatementVisibilityEntry(streamedTriples, new RyaStatement(new RyaURI("http://Frank"), new RyaURI("http://livesIn"),new RyaURI("http://London")), ""); + addStatementVisibilityEntry(streamedTriples, new RyaStatement(new RyaURI("http://Frank"), new RyaURI("http://worksAt"),new RyaURI("http://Chipotle")), ""); - addStatementVisibilityEntry(streamedTriples, makeRyaStatement("http://Frank", "http://talksTo", "http://Alice"), ""); - addStatementVisibilityEntry(streamedTriples, makeRyaStatement("http://Frank", "http://livesIn", "http://London"), ""); - addStatementVisibilityEntry(streamedTriples, makeRyaStatement("http://Frank", "http://worksAt", "http://Chipotle"), ""); + final Connector accumuloConn = super.getAccumuloConnector(); // Create the PCJ Table in Accumulo. final PrecomputedJoinStorage rootStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); final String pcjId = rootStorage.createPcj(sparql); - // Create the PCJ in Fluo. - new CreatePcj().withRyaIntegration(pcjId, rootStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); - // Stream the data into Fluo. - for(final RyaStatement statement : streamedTriples.keySet()) { - final Optional<String> visibility = Optional.of(streamedTriples.get(statement)); - new InsertTriples().insert(fluoClient, statement, visibility); + try( final FluoClient fluoClient = FluoFactory.newClient( super.getFluoConfiguration() )) { + // Create the PCJ in Fluo. + new CreatePcj().withRyaIntegration(pcjId, rootStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + + // Stream the data into Fluo. + for(final RyaStatement statement : streamedTriples.keySet()) { + final Optional<String> visibility = Optional.of(streamedTriples.get(statement)); + new InsertTriples().insert(fluoClient, statement, visibility); + } } // Fetch the exported results from Accumulo once the observers finish working. - fluo.waitForObservers(); + super.getMiniFluo().waitForObservers(); setupTestUsers(accumuloConn, RYA_INSTANCE_NAME, pcjId); @@ -213,80 +226,98 @@ public class PcjVisibilityIT extends ITBase { final Set<BindingSet> rootResults = toSet( rootStorage.listResults(pcjId)); final Set<BindingSet> rootExpected = Sets.newHashSet(); - rootExpected.add( makeBindingSet( - new BindingImpl("customer", new URIImpl("http://Alice")), - new BindingImpl("worker", new URIImpl("http://Bob")), - new BindingImpl("city", new URIImpl("http://London")))); - rootExpected.add( makeBindingSet( - new BindingImpl("customer", new URIImpl("http://Alice")), - new BindingImpl("worker", new URIImpl("http://Charlie")), - new BindingImpl("city", new URIImpl("http://London")))); - rootExpected.add( makeBindingSet( - new BindingImpl("customer", new URIImpl("http://Alice")), - new BindingImpl("worker", new URIImpl("http://Eve")), - new BindingImpl("city", new URIImpl("http://Leeds")))); - rootExpected.add( makeBindingSet( - new BindingImpl("customer", new URIImpl("http://Alice")), - new BindingImpl("worker", new URIImpl("http://David")), - new BindingImpl("city", new URIImpl("http://London")))); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("customer", VF.createURI("http://Alice")); + bs.addBinding("worker", VF.createURI("http://Bob")); + bs.addBinding("city", VF.createURI("http://London")); + rootExpected.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("customer", VF.createURI("http://Alice")); + bs.addBinding("worker", VF.createURI("http://Charlie")); + bs.addBinding("city", VF.createURI("http://London")); + rootExpected.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("customer", VF.createURI("http://Alice")); + bs.addBinding("worker", VF.createURI("http://Eve")); + bs.addBinding("city", VF.createURI("http://Leeds")); + rootExpected.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("customer", VF.createURI("http://Alice")); + bs.addBinding("worker", VF.createURI("http://David")); + bs.addBinding("city", VF.createURI("http://London")); + rootExpected.add(bs); assertEquals(rootExpected, rootResults); + final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster(); + // Verify AB final Connector abConn = cluster.getConnector("abUser", "password"); - final PrecomputedJoinStorage abStorage = new AccumuloPcjStorage(abConn, RYA_INSTANCE_NAME); - final Set<BindingSet> abResults = toSet( abStorage.listResults(pcjId) ); + try(final PrecomputedJoinStorage abStorage = new AccumuloPcjStorage(abConn, RYA_INSTANCE_NAME)) { + final Set<BindingSet> abResults = toSet( abStorage.listResults(pcjId) ); - final Set<BindingSet> abExpected = Sets.newHashSet(); - abExpected.add( makeBindingSet( - new BindingImpl("customer", new URIImpl("http://Alice")), - new BindingImpl("worker", new URIImpl("http://Bob")), - new BindingImpl("city", new URIImpl("http://London")))); + final Set<BindingSet> abExpected = Sets.newHashSet(); + bs = new MapBindingSet(); + bs.addBinding("customer", VF.createURI("http://Alice")); + bs.addBinding("worker", VF.createURI("http://Bob")); + bs.addBinding("city", VF.createURI("http://London")); + abExpected.add(bs); - assertEquals(abExpected, abResults); + assertEquals(abExpected, abResults); + } // Verify ABC final Connector abcConn = cluster.getConnector("abcUser", "password"); - final PrecomputedJoinStorage abcStorage = new AccumuloPcjStorage(abcConn, RYA_INSTANCE_NAME); - final Set<BindingSet> abcResults = toSet( abcStorage.listResults(pcjId) ); - - final Set<BindingSet> abcExpected = Sets.newHashSet(); - abcExpected.add(makeBindingSet( - new BindingImpl("customer", new URIImpl("http://Alice")), - new BindingImpl("worker", new URIImpl("http://Bob")), - new BindingImpl("city", new URIImpl("http://London")))); - abcExpected.add(makeBindingSet( - new BindingImpl("customer", new URIImpl("http://Alice")), - new BindingImpl("worker", new URIImpl("http://Charlie")), - new BindingImpl("city", new URIImpl("http://London")))); - - assertEquals(abcExpected, abcResults); + try(final PrecomputedJoinStorage abcStorage = new AccumuloPcjStorage(abcConn, RYA_INSTANCE_NAME)) { + final Set<BindingSet> abcResults = toSet( abcStorage.listResults(pcjId) ); + + final Set<BindingSet> abcExpected = Sets.newHashSet(); + bs = new MapBindingSet(); + bs.addBinding("customer", VF.createURI("http://Alice")); + bs.addBinding("worker", VF.createURI("http://Bob")); + bs.addBinding("city", VF.createURI("http://London")); + abcExpected.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("customer", VF.createURI("http://Alice")); + bs.addBinding("worker", VF.createURI("http://Charlie")); + bs.addBinding("city", VF.createURI("http://London")); + abcExpected.add(bs); + + assertEquals(abcExpected, abcResults); + } // Verify ADE final Connector adeConn = cluster.getConnector("adeUser", "password"); - final PrecomputedJoinStorage adeStorage = new AccumuloPcjStorage(adeConn, RYA_INSTANCE_NAME); - final Set<BindingSet> adeResults = toSet( adeStorage.listResults(pcjId) ); + try(final PrecomputedJoinStorage adeStorage = new AccumuloPcjStorage(adeConn, RYA_INSTANCE_NAME)) { + final Set<BindingSet> adeResults = toSet( adeStorage.listResults(pcjId) ); - final Set<BindingSet> adeExpected = Sets.newHashSet(); - adeExpected.add(makeBindingSet( - new BindingImpl("customer", new URIImpl("http://Alice")), - new BindingImpl("worker", new URIImpl("http://Eve")), - new BindingImpl("city", new URIImpl("http://Leeds")))); + final Set<BindingSet> adeExpected = Sets.newHashSet(); + bs = new MapBindingSet(); + bs.addBinding("customer", VF.createURI("http://Alice")); + bs.addBinding("worker", VF.createURI("http://Eve")); + bs.addBinding("city", VF.createURI("http://Leeds")); + adeExpected.add(bs); - assertEquals(adeExpected, adeResults); + assertEquals(adeExpected, adeResults); + } // Verify no auths. final Connector noAuthConn = cluster.getConnector("noAuth", "password"); - final PrecomputedJoinStorage noAuthStorage = new AccumuloPcjStorage(noAuthConn, RYA_INSTANCE_NAME); - final Set<BindingSet> noAuthResults = toSet( noAuthStorage.listResults(pcjId) ); - assertTrue( noAuthResults.isEmpty() ); + try(final PrecomputedJoinStorage noAuthStorage = new AccumuloPcjStorage(noAuthConn, RYA_INSTANCE_NAME)) { + final Set<BindingSet> noAuthResults = toSet( noAuthStorage.listResults(pcjId) ); + assertTrue( noAuthResults.isEmpty() ); + } } private void setupTestUsers(final Connector accumuloConn, final String ryaInstanceName, final String pcjId) throws AccumuloException, AccumuloSecurityException { final PasswordToken pass = new PasswordToken("password"); final SecurityOperations secOps = accumuloConn.securityOperations(); - // XXX We need the table name so that we can update security for the users. + // We need the table name so that we can update security for the users. final String pcjTableName = new PcjTableNameFactory().makeTableName(ryaInstanceName, pcjId); // Give the 'roor' user authorizations to see everything. @@ -317,11 +348,15 @@ public class PcjVisibilityIT extends ITBase { triplesMap.put(statement, visibility); } - private Set<BindingSet> toSet(final Iterable<BindingSet> bindingSets) { + private Set<BindingSet> toSet(final CloseableIterator<BindingSet> bindingSets) throws Exception { final Set<BindingSet> set = new HashSet<>(); - for(final BindingSet bindingSet : bindingSets) { - set.add( bindingSet ); + try { + while(bindingSets.hasNext()) { + set.add( bindingSets.next() ); + } + } finally { + bindingSets.close(); } return set; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml b/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml index 8aa257b..885a076 100644 --- a/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml +++ b/extras/rya.pcj.fluo/rya.pcj.functions.geo/pom.xml @@ -82,11 +82,8 @@ under the License. <scope>test</scope> </dependency> <dependency> - <!-- BaseIT exists here, needs test jar to be generated. --> - <groupId>org.apache.rya</groupId> - <artifactId>rya.pcj.fluo.integration</artifactId> - <version>${project.version}</version> - <classifier>tests</classifier> + <groupId>org.apache.fluo</groupId> + <artifactId>fluo-recipes-test</artifactId> <scope>test</scope> </dependency> </dependencies> @@ -125,5 +122,42 @@ under the License. </configuration> </plugin> </plugins> + <pluginManagement> + <plugins> + <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.--> + <plugin> + <groupId>org.eclipse.m2e</groupId> + <artifactId>lifecycle-mapping</artifactId> + <version>1.0.0</version> + <configuration> + <lifecycleMappingMetadata> + <pluginExecutions> + <pluginExecution> + <pluginExecutionFilter> + <groupId> + org.codehaus.mojo + </groupId> + <artifactId> + properties-maven-plugin + </artifactId> + <versionRange> + [1.0.0,) + </versionRange> + <goals> + <goal> + set-system-properties + </goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore></ignore> + </action> + </pluginExecution> + </pluginExecutions> + </lifecycleMappingMetadata> + </configuration> + </plugin> + </plugins> + </pluginManagement> </build> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java new file mode 100644 index 0000000..5fe999f --- /dev/null +++ b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.fluo.api.config.ObserverSpecification; +import org.apache.fluo.recipes.test.AccumuloExportITBase; +import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig; +import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters; +import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver; +import org.apache.rya.rdftriplestore.RyaSailRepository; +import org.apache.rya.sail.config.RyaSailFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.openrdf.sail.Sail; + +/** + * The base Integration Test class used for Fluo applications that export to a Rya PCJ Index. + */ +public class RyaExportITBase extends AccumuloExportITBase { + + protected static final String RYA_INSTANCE_NAME = "test_"; + + private RyaSailRepository ryaSailRepo = null; + + public RyaExportITBase() { + // Indicates that MiniFluo should be started before each test. + super(true); + } + + @BeforeClass + public static void setupLogging() { + BasicConfigurator.configure(); + Logger.getRootLogger().setLevel(Level.ERROR); + } + + @Override + protected void preFluoInitHook() throws Exception { + // Setup the observers that will be used by the Fluo PCJ Application. + final List<ObserverSpecification> observers = new ArrayList<>(); + observers.add(new ObserverSpecification(TripleObserver.class.getName())); + observers.add(new ObserverSpecification(StatementPatternObserver.class.getName())); + observers.add(new ObserverSpecification(JoinObserver.class.getName())); + observers.add(new ObserverSpecification(FilterObserver.class.getName())); + observers.add(new ObserverSpecification(AggregationObserver.class.getName())); + + // Configure the export observer to export new PCJ results to the mini accumulo cluster. + final HashMap<String, String> exportParams = new HashMap<>(); + final RyaExportParameters ryaParams = new RyaExportParameters(exportParams); + ryaParams.setExportToRya(true); + ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME); + ryaParams.setAccumuloInstanceName(super.getMiniAccumuloCluster().getInstanceName()); + ryaParams.setZookeeperServers(super.getMiniAccumuloCluster().getZooKeepers()); + ryaParams.setExporterUsername(ACCUMULO_USER); + ryaParams.setExporterPassword(ACCUMULO_PASSWORD); + + final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams); + observers.add(exportObserverConfig); + + // Add the observers to the Fluo Configuration. + super.getFluoConfiguration().addObservers(observers); + } + + @Before + public void setupRya() throws Exception { + final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster(); + final String instanceName = cluster.getInstanceName(); + final String zookeepers = cluster.getZooKeepers(); + + // Install the Rya instance to the mini accumulo cluster. + final RyaClient ryaClient = AccumuloRyaClientFactory.build( + new AccumuloConnectionDetails( + ACCUMULO_USER, + ACCUMULO_PASSWORD.toCharArray(), + instanceName, + zookeepers), + super.getAccumuloConnector()); + + ryaClient.getInstall().install(RYA_INSTANCE_NAME, InstallConfiguration.builder() + .setEnableTableHashPrefix(false) + .setEnableFreeTextIndex(false) + .setEnableEntityCentricIndex(false) + .setEnableGeoIndex(false) + .setEnableTemporalIndex(false) + .setEnablePcjIndex(true) + .setFluoPcjAppName( super.getFluoConfiguration().getApplicationName() ) + .build()); + + // Connect to the Rya instance that was just installed. + final AccumuloRdfConfiguration conf = makeConfig(instanceName, zookeepers); + final Sail sail = RyaSailFactory.getInstance(conf); + ryaSailRepo = new RyaSailRepository(sail); + } + + @After + public void teardownRya() throws Exception { + final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster(); + final String instanceName = cluster.getInstanceName(); + final String zookeepers = cluster.getZooKeepers(); + + // Uninstall the instance of Rya. + final RyaClient ryaClient = AccumuloRyaClientFactory.build( + new AccumuloConnectionDetails( + ACCUMULO_USER, + ACCUMULO_PASSWORD.toCharArray(), + instanceName, + zookeepers), + super.getAccumuloConnector()); + + ryaClient.getUninstall().uninstall(RYA_INSTANCE_NAME); + + // Shutdown the repo. + ryaSailRepo.shutDown(); + } + + /** + * @return A {@link RyaSailRepository} that is connected to the Rya instance that statements are loaded into. + */ + protected RyaSailRepository getRyaSailRepository() throws Exception { + return ryaSailRepo; + } + + protected AccumuloRdfConfiguration makeConfig(final String instanceName, final String zookeepers) { + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix(RYA_INSTANCE_NAME); + + // Accumulo connection information. + conf.setAccumuloUser(AccumuloExportITBase.ACCUMULO_USER); + conf.setAccumuloPassword(AccumuloExportITBase.ACCUMULO_PASSWORD); + conf.setAccumuloInstance(super.getAccumuloConnector().getInstance().getInstanceName()); + conf.setAccumuloZookeepers(super.getAccumuloConnector().getInstance().getZooKeepers()); + conf.setAuths(""); + + // PCJ configuration information. + conf.set(ConfigUtils.USE_PCJ, "true"); + conf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true"); + conf.set(ConfigUtils.FLUO_APP_NAME, super.getFluoConfiguration().getApplicationName()); + conf.set(ConfigUtils.PCJ_STORAGE_TYPE, + PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString()); + conf.set(ConfigUtils.PCJ_UPDATER_TYPE, + PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString()); + + conf.setDisplayQueryPlan(true); + + return conf; + } +} \ No newline at end of file
