http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java index 05dfd32..f330825 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java @@ -29,12 +29,12 @@ import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaURI; -import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; import org.junit.Test; import org.openrdf.model.Statement; import org.openrdf.model.ValueFactory;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java index 219e079..ab7610d 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java @@ -32,9 +32,9 @@ import java.util.UUID; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.rya.indexing.pcj.fluo.KafkaExportITBase; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase; import org.junit.Test; import org.openrdf.model.Statement; import org.openrdf.model.ValueFactory; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java index c8167c7..7a4ed8d 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java @@ -46,7 +46,6 @@ import org.apache.rya.api.domain.RyaSubGraph; import org.apache.rya.api.domain.RyaURI; import org.apache.rya.api.resolver.RdfToRyaConversions; import org.apache.rya.indexing.pcj.fluo.ConstructGraphTestUtils; -import org.apache.rya.indexing.pcj.fluo.KafkaExportITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters; import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe; @@ -57,6 +56,7 @@ import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; +import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase; import org.junit.Test; import org.openrdf.model.Statement; import org.openrdf.model.ValueFactory; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java index f815a55..85c5030 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java @@ -22,20 +22,27 @@ import static java.util.Objects.requireNonNull; import static org.junit.Assert.assertEquals; import java.math.BigDecimal; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; import java.util.Collection; import java.util.HashSet; +import java.util.Optional; import java.util.Set; import javax.xml.datatype.DatatypeFactory; import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.core.client.FluoClientImpl; import org.apache.rya.api.client.RyaClient; -import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; -import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; +import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; +import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; import org.junit.Test; import org.openrdf.model.Literal; import org.openrdf.model.Statement; @@ -51,6 +58,7 @@ import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException; import org.openrdf.query.algebra.evaluation.function.Function; import org.openrdf.query.algebra.evaluation.function.FunctionRegistry; import org.openrdf.query.impl.MapBindingSet; +import org.openrdf.repository.RepositoryException; import org.openrdf.repository.sail.SailRepositoryConnection; import com.google.common.collect.Sets; @@ -60,6 +68,8 @@ import com.google.common.collect.Sets; */ public class QueryIT extends RyaExportITBase { + private enum ExporterType {Pcj, Periodic}; + @Test public void optionalStatements() throws Exception { // A query that has optional statement patterns. This query is looking for all @@ -100,7 +110,7 @@ public class QueryIT extends RyaExportITBase { expectedResults.add(bs); // Verify the end results of the query match the expected results. - runTest(sparql, statements, expectedResults); + runTest(sparql, statements, expectedResults, ExporterType.Pcj); } /** @@ -181,7 +191,7 @@ public class QueryIT extends RyaExportITBase { expectedResults.add(bs); // Verify the end results of the query match the expected results. - runTest(sparql, statements, expectedResults); + runTest(sparql, statements, expectedResults, ExporterType.Pcj); } @Test @@ -241,7 +251,7 @@ public class QueryIT extends RyaExportITBase { expectedResults.add(bs); // Verify the end results of the query match the expected results. - runTest(sparql, statements, expectedResults); + runTest(sparql, statements, expectedResults, ExporterType.Pcj); } @Test @@ -283,7 +293,7 @@ public class QueryIT extends RyaExportITBase { expectedResults.add(bs); // Verify the end results of the query match the expected results. - runTest(sparql, statements, expectedResults); + runTest(sparql, statements, expectedResults, ExporterType.Pcj); } @Test @@ -368,7 +378,7 @@ public class QueryIT extends RyaExportITBase { expectedResults.add(bs); // Verify the end results of the query match the expected results. - runTest(sparql, statements, expectedResults); + runTest(sparql, statements, expectedResults, ExporterType.Pcj); } @Test @@ -430,10 +440,295 @@ public class QueryIT extends RyaExportITBase { expectedResults.add(bs); // Verify the end results of the query match the expected results. - runTest(sparql, statements, expectedResults); + runTest(sparql, statements, expectedResults, ExporterType.Pcj); } + + + @Test + public void periodicQueryTestWithoutAggregation() throws Exception { + String query = "prefix function: <http://org.apache.rya/function#> " //n + + "prefix time: <http://www.w3.org/2006/time#> " //n + + "select ?id where {" //n + + "Filter(function:periodic(?time, 2, .5, time:hours)) " //n + + "?obs <uri:hasTime> ?time. " //n + + "?obs <uri:hasId> ?id }"; //n + + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final DatatypeFactory dtf = DatatypeFactory.newInstance(); + ZonedDateTime time = ZonedDateTime.now(); + long currentTime = time.toInstant().toEpochMilli(); + + ZonedDateTime zTime1 = time.minusMinutes(30); + String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime2 = zTime1.minusMinutes(30); + String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime3 = zTime2.minusMinutes(30); + String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime4 = zTime3.minusMinutes(30); + String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT); + + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4")) + ); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final Set<BindingSet> expectedResults = new HashSet<>(); + + long period = 1800000; + long binId = (currentTime/period)*period; + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + 3*period)); + expectedResults.add(bs); - public void runTest(final String sparql, final Collection<Statement> statements, final Collection<BindingSet> expectedResults) throws Exception { + bs = new MapBindingSet(); + bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("id", vf.createLiteral("id_4", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId)); + expectedResults.add(bs); + + // Verify the end results of the query match the expected results. + runTest(query, statements, expectedResults, ExporterType.Periodic); + } + + + @Test + public void periodicQueryTestWithAggregation() throws Exception { + String query = "prefix function: <http://org.apache.rya/function#> " //n + + "prefix time: <http://www.w3.org/2006/time#> " //n + + "select (count(?obs) as ?total) where {" //n + + "Filter(function:periodic(?time, 2, .5, time:hours)) " //n + + "?obs <uri:hasTime> ?time. " //n + + "?obs <uri:hasId> ?id }"; //n + + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final DatatypeFactory dtf = DatatypeFactory.newInstance(); + ZonedDateTime time = ZonedDateTime.now(); + long currentTime = time.toInstant().toEpochMilli(); + + ZonedDateTime zTime1 = time.minusMinutes(30); + String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime2 = zTime1.minusMinutes(30); + String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime3 = zTime2.minusMinutes(30); + String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime4 = zTime3.minusMinutes(30); + String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT); + + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4")) + ); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final Set<BindingSet> expectedResults = new HashSet<>(); + + long period = 1800000; + long binId = (currentTime/period)*period; + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("4", XMLSchema.INTEGER)); + bs.addBinding("periodicBinId", vf.createLiteral(binId)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("3", XMLSchema.INTEGER)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + 3*period)); + expectedResults.add(bs); + + + // Verify the end results of the query match the expected results. + runTest(query, statements, expectedResults, ExporterType.Periodic); + } + + @Test + public void periodicQueryTestWithAggregationAndGroupBy() throws Exception { + String query = "prefix function: <http://org.apache.rya/function#> " //n + + "prefix time: <http://www.w3.org/2006/time#> " //n + + "select ?id (count(?obs) as ?total) where {" //n + + "Filter(function:periodic(?time, 2, .5, time:hours)) " //n + + "?obs <uri:hasTime> ?time. " //n + + "?obs <uri:hasId> ?id } group by ?id"; //n + + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final DatatypeFactory dtf = DatatypeFactory.newInstance(); + ZonedDateTime time = ZonedDateTime.now(); + long currentTime = time.toInstant().toEpochMilli(); + + ZonedDateTime zTime1 = time.minusMinutes(30); + String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime2 = zTime1.minusMinutes(30); + String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime3 = zTime2.minusMinutes(30); + String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime4 = zTime3.minusMinutes(30); + String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT); + + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4")), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")) + ); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final Set<BindingSet> expectedResults = new HashSet<>(); + + long period = 1800000; + long binId = (currentTime/period)*period; + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_4", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + 3*period)); + expectedResults.add(bs); + + // Verify the end results of the query match the expected results. + runTest(query, statements, expectedResults, ExporterType.Periodic); + } + + + + + + public void runTest(final String sparql, final Collection<Statement> statements, final Collection<BindingSet> expectedResults, ExporterType type ) throws Exception { requireNonNull(sparql); requireNonNull(statements); requireNonNull(expectedResults); @@ -443,9 +738,38 @@ public class QueryIT extends RyaExportITBase { final RyaClient ryaClient = AccumuloRyaClientFactory.build(createConnectionDetails(), accumuloConn); - ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql); - - // Write the data to Rya. + switch (type) { + case Pcj: + ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql); + addStatementsAndWait(statements); + // Fetch the value that is stored within the PCJ table. + try (final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName())) { + final String pcjId = pcjStorage.listPcjs().get(0); + final Set<BindingSet> results = Sets.newHashSet(pcjStorage.listResults(pcjId)); + // Ensure the result of the query matches the expected result. + assertEquals(expectedResults, results); + } + break; + case Periodic: + PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(accumuloConn, getRyaInstanceName()); + String periodicId = periodicStorage.createPeriodicQuery(sparql); + try (FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration())) { + new CreatePcj().createPcj(periodicId, sparql, fluo); + } + addStatementsAndWait(statements); + final Set<BindingSet> results = Sets.newHashSet(); + try (CloseableIterator<BindingSet> resultIter = periodicStorage.listResults(periodicId, Optional.empty())) { + while (resultIter.hasNext()) { + results.add(resultIter.next()); + } + } + assertEquals(expectedResults, results); + break; + } + } + + private void addStatementsAndWait(final Collection<Statement> statements) throws RepositoryException, Exception { + // Write the data to Rya. final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection(); ryaConn.begin(); ryaConn.add(statements); @@ -454,14 +778,5 @@ public class QueryIT extends RyaExportITBase { // Wait for the Fluo application to finish computing the end result. super.getMiniFluo().waitForObservers(); - - // Fetch the value that is stored within the PCJ table. - try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName())) { - final String pcjId = pcjStorage.listPcjs().get(0); - final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) ); - - // Ensure the result of the query matches the expected result. - assertEquals(expectedResults, results); - } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java index 9c21afd..12c69ca 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java @@ -28,11 +28,11 @@ import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaURI; -import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; import org.junit.Test; import org.openrdf.model.ValueFactory; import org.openrdf.model.impl.ValueFactoryImpl; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java index a8d470f..e6d287e 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java @@ -28,12 +28,12 @@ import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.rya.accumulo.AccumuloRyaDAO; import org.apache.rya.indexing.external.PrecomputedJoinIndexer; -import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; +import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; import org.junit.Test; import org.openrdf.model.Statement; import org.openrdf.model.ValueFactory; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java index 72759bb..3f51311 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java @@ -28,11 +28,11 @@ import org.apache.accumulo.core.client.Connector; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; import org.junit.Test; import org.openrdf.model.Resource; import org.openrdf.model.Statement; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java index 150492f..ab42e89 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java @@ -32,10 +32,10 @@ import org.apache.rya.api.RdfCloudTripleStoreConfiguration; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.resolver.RdfToRyaConversions; import org.apache.rya.indexing.accumulo.ConfigUtils; -import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; import org.junit.Assert; import org.junit.Test; import org.openrdf.model.Statement; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java index 46bc7b0..dc2f859 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java @@ -44,19 +44,18 @@ import org.apache.hadoop.io.Text; import org.apache.rya.accumulo.AccumuloRdfConfiguration; import org.apache.rya.api.RdfCloudTripleStoreConfiguration; import org.apache.rya.api.client.RyaClient; -import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaURI; import org.apache.rya.indexing.accumulo.ConfigUtils; import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig; -import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory; +import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; import org.apache.rya.rdftriplestore.RyaSailRepository; import org.apache.rya.sail.config.RyaSailFactory; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.test.base/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.test.base/pom.xml new file mode 100644 index 0000000..67bd0f0 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/pom.xml @@ -0,0 +1,108 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.parent</artifactId> + <version>3.2.11-incubating-SNAPSHOT</version> + </parent> + + <artifactId>rya.pcj.fluo.test.base</artifactId> + + <name>Apache Rya Integration Base</name> + <description>Base classes for Integration tests.</description> + + <dependencies> + <!-- Rya Runtime Dependencies. --> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.client</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.indexing</artifactId> + </dependency> + <dependency> + <groupId>org.apache.fluo</groupId> + <artifactId>fluo-api</artifactId> + </dependency> + + <!-- Testing dependencies. --> + <dependency> + <groupId>org.apache.fluo</groupId> + <artifactId>fluo-mini</artifactId> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.fluo</groupId> + <artifactId>fluo-api</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>0.10.1.0</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>0.10.1.0</version> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + <!-- Testing dependencies. --> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>0.10.1.0</version> + <classifier>test</classifier> + <scope>compile</scope> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.fluo</groupId> + <artifactId>fluo-recipes-test</artifactId> + <scope>compile</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java new file mode 100644 index 0000000..b9be828 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/kafka/base/KafkaITBase.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.base; + +import java.nio.file.Files; +import java.util.Properties; + +import org.I0Itec.zkclient.ZkClient; +import org.junit.After; +import org.junit.Before; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.MockTime; +import kafka.utils.TestUtils; +import kafka.utils.Time; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import kafka.zk.EmbeddedZookeeper; + +public class KafkaITBase { + + private static final String ZKHOST = "127.0.0.1"; + private static final String BROKERHOST = "127.0.0.1"; + private static final String BROKERPORT = "9092"; + private KafkaServer kafkaServer; + private EmbeddedZookeeper zkServer; + private ZkClient zkClient; + + @Before + public void setupKafka() throws Exception { + + // Setup Kafka. + zkServer = new EmbeddedZookeeper(); + final String zkConnect = ZKHOST + ":" + zkServer.port(); + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); + ZkUtils.apply(zkClient, false); + + // setup Broker + final Properties brokerProps = new Properties(); + brokerProps.setProperty("zookeeper.connect", zkConnect); + brokerProps.setProperty("broker.id", "0"); + brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); + brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); + final KafkaConfig config = new KafkaConfig(brokerProps); + final Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock); + } + + /** + * Close all the Kafka mini server and mini-zookeeper + * + * @see org.apache.rya.indexing.pcj.fluo.ITBase#shutdownMiniResources() + */ + @After + public void teardownKafka() { + kafkaServer.shutdown(); + zkClient.close(); + zkServer.shutdown(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java new file mode 100644 index 0000000..32ee962 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/FluoITBase.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.pcj.fluo.test.base; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.net.UnknownHostException; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.rya.accumulo.MiniAccumuloClusterInstance; +import org.apache.rya.accumulo.MiniAccumuloSingleton; +import org.apache.rya.accumulo.RyaTestInstanceRule; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloInstall; +import org.apache.zookeeper.ClientCnxn; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.RepositoryException; +import org.openrdf.sail.Sail; +import org.openrdf.sail.SailException; + +import org.apache.fluo.api.client.FluoAdmin; +import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; +import org.apache.fluo.api.config.FluoConfiguration; +import org.apache.fluo.api.mini.MiniFluo; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.Install; +import org.apache.rya.api.client.Install.DuplicateInstanceNameException; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig; +import org.apache.rya.rdftriplestore.RyaSailRepository; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; + +/** + * Integration tests that ensure the Fluo application processes PCJs results + * correctly. + * <p> + * This class is being ignored because it doesn't contain any unit tests. + */ +public abstract class FluoITBase { + private static final Logger log = Logger.getLogger(FluoITBase.class); + + // Mini Accumulo Cluster + private static MiniAccumuloClusterInstance clusterInstance = MiniAccumuloSingleton.getInstance(); + private static MiniAccumuloCluster cluster; + + private static String instanceName = null; + private static String zookeepers = null; + + protected static Connector accumuloConn = null; + + // Fluo data store and connections. + protected MiniFluo fluo = null; + protected FluoConfiguration fluoConfig = null; + protected FluoClient fluoClient = null; + + // Rya data store and connections. + protected RyaSailRepository ryaRepo = null; + protected RepositoryConnection ryaConn = null; + + @Rule + public RyaTestInstanceRule testInstance = new RyaTestInstanceRule(false); + + @BeforeClass + public static void beforeClass() throws Exception { + Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR); + + // Setup and start the Mini Accumulo. + cluster = clusterInstance.getCluster(); + + // Store a connector to the Mini Accumulo. + instanceName = cluster.getInstanceName(); + zookeepers = cluster.getZooKeepers(); + + final Instance instance = new ZooKeeperInstance(instanceName, zookeepers); + accumuloConn = instance.getConnector(clusterInstance.getUsername(), new PasswordToken(clusterInstance.getPassword())); + } + + @Before + public void setupMiniResources() throws Exception { + // Initialize the Mini Fluo that will be used to store created queries. + fluoConfig = createFluoConfig(); + preFluoInitHook(); + FluoFactory.newAdmin(fluoConfig).initialize(new FluoAdmin.InitializationOptions() + .setClearTable(true) + .setClearZookeeper(true)); + postFluoInitHook(); + fluo = FluoFactory.newMiniFluo(fluoConfig); + fluoClient = FluoFactory.newClient(fluo.getClientConfiguration()); + + // Initialize the Rya that will be used by the tests. + ryaRepo = setupRya(); + ryaConn = ryaRepo.getConnection(); + } + + @After + public void shutdownMiniResources() { + if (ryaConn != null) { + try { + log.info("Shutting down Rya Connection."); + ryaConn.close(); + log.info("Rya Connection shut down."); + } catch (final Exception e) { + log.error("Could not shut down the Rya Connection.", e); + } + } + + if (ryaRepo != null) { + try { + log.info("Shutting down Rya Repo."); + ryaRepo.shutDown(); + log.info("Rya Repo shut down."); + } catch (final Exception e) { + log.error("Could not shut down the Rya Repo.", e); + } + } + + if (fluoClient != null) { + try { + log.info("Shutting down Fluo Client."); + fluoClient.close(); + log.info("Fluo Client shut down."); + } catch (final Exception e) { + log.error("Could not shut down the Fluo Client.", e); + } + } + + if (fluo != null) { + try { + log.info("Shutting down Mini Fluo."); + fluo.close(); + log.info("Mini Fluo shut down."); + } catch (final Exception e) { + log.error("Could not shut down the Mini Fluo.", e); + } + } + } + + protected void preFluoInitHook() throws Exception { + + } + + protected void postFluoInitHook() throws Exception { + + } + + protected MiniAccumuloCluster getMiniAccumuloCluster() { + return cluster; + } + + protected MiniFluo getMiniFluo() { + return fluo; + } + + public RyaSailRepository getRyaSailRepository() { + return ryaRepo; + } + + public Connector getAccumuloConnector() { + return accumuloConn; + } + + public String getRyaInstanceName() { + return testInstance.getRyaInstanceName(); + } + + protected String getUsername() { + return clusterInstance.getUsername(); + } + + protected String getPassword() { + return clusterInstance.getPassword(); + } + + protected FluoConfiguration getFluoConfiguration() { + return fluoConfig; + } + + public AccumuloConnectionDetails createConnectionDetails() { + return new AccumuloConnectionDetails( + clusterInstance.getUsername(), + clusterInstance.getPassword().toCharArray(), + clusterInstance.getInstanceName(), + clusterInstance.getZookeepers()); + } + + private FluoConfiguration createFluoConfig() { + // Configure how the mini fluo will run. + final FluoConfiguration config = new FluoConfiguration(); + config.setMiniStartAccumulo(false); + config.setAccumuloInstance(instanceName); + config.setAccumuloUser(clusterInstance.getUsername()); + config.setAccumuloPassword(clusterInstance.getPassword()); + config.setInstanceZookeepers(zookeepers + "/fluo"); + config.setAccumuloZookeepers(zookeepers); + + config.setApplicationName(getRyaInstanceName()); + config.setAccumuloTable("fluo" + getRyaInstanceName()); + return config; + } + + /** + * Sets up a Rya instance. + */ + protected RyaSailRepository setupRya() + throws AccumuloException, AccumuloSecurityException, RepositoryException, RyaDAOException, + NumberFormatException, UnknownHostException, InferenceEngineException, AlreadyInitializedException, + RyaDetailsRepositoryException, DuplicateInstanceNameException, RyaClientException, SailException { + checkNotNull(instanceName); + checkNotNull(zookeepers); + + // Setup Rya configuration values. + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix(getRyaInstanceName()); + conf.setDisplayQueryPlan(true); + conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, false); + conf.set(ConfigUtils.CLOUDBASE_USER, clusterInstance.getUsername()); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, clusterInstance.getPassword()); + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, clusterInstance.getInstanceName()); + conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, clusterInstance.getZookeepers()); + conf.set(ConfigUtils.USE_PCJ, "true"); + conf.set(ConfigUtils.FLUO_APP_NAME, getRyaInstanceName()); + conf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString()); + conf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString()); + conf.set(ConfigUtils.CLOUDBASE_AUTHS, ""); + + // Install the test instance of Rya. + final Install install = new AccumuloInstall(createConnectionDetails(), accumuloConn); + + final InstallConfiguration installConfig = InstallConfiguration.builder() + .setEnableTableHashPrefix(true) + .setEnableEntityCentricIndex(true) + .setEnableFreeTextIndex(true) + .setEnableTemporalIndex(true) + .setEnablePcjIndex(true) + .setEnableGeoIndex(true) + .setFluoPcjAppName(getRyaInstanceName()) + .build(); + install.install(getRyaInstanceName(), installConfig); + + // Connect to the instance of Rya that was just installed. + final Sail sail = RyaSailFactory.getInstance(conf); + final RyaSailRepository ryaRepo = new RyaSailRepository(sail); + + return ryaRepo; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java new file mode 100644 index 0000000..85da422 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.pcj.fluo.test.base; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertEquals; + +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + +import org.I0Itec.zkclient.ZkClient; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.fluo.api.config.ObserverSpecification; +import org.apache.fluo.recipes.test.AccumuloExportITBase; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.accumulo.AccumuloRyaDAO; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe; +import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.rdftriplestore.RyaSailRepository; +import org.apache.rya.sail.config.RyaSailFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.openrdf.model.Statement; +import org.openrdf.repository.sail.SailRepositoryConnection; +import org.openrdf.sail.Sail; + + +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.MockTime; +import kafka.utils.TestUtils; +import kafka.utils.Time; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import kafka.zk.EmbeddedZookeeper; + +/** + * The base Integration Test class used for Fluo applications that export to a + * Kakfa topic. + */ +public class KafkaExportITBase extends AccumuloExportITBase { + + protected static final String RYA_INSTANCE_NAME = "test_"; + + private static final String ZKHOST = "127.0.0.1"; + private static final String BROKERHOST = "127.0.0.1"; + private static final String BROKERPORT = "9092"; + private ZkUtils zkUtils; + private KafkaServer kafkaServer; + private EmbeddedZookeeper zkServer; + private ZkClient zkClient; + + // The Rya instance statements are written to that will be fed into the Fluo + // app. + private RyaSailRepository ryaSailRepo = null; + private AccumuloRyaDAO dao = null; + + /** + * Add info about the Kafka queue/topic to receive the export. + */ + @Override + protected void preFluoInitHook() throws Exception { + // Setup the observers that will be used by the Fluo PCJ Application. + final List<ObserverSpecification> observers = new ArrayList<>(); + observers.add(new ObserverSpecification(TripleObserver.class.getName())); + observers.add(new ObserverSpecification(StatementPatternObserver.class.getName())); + observers.add(new ObserverSpecification(JoinObserver.class.getName())); + observers.add(new ObserverSpecification(FilterObserver.class.getName())); + observers.add(new ObserverSpecification(AggregationObserver.class.getName())); + + // Configure the export observer to export new PCJ results to the mini + // accumulo cluster. + final HashMap<String, String> exportParams = new HashMap<>(); + + final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams); + kafkaParams.setExportToKafka(true); + + // Configure the Kafka Producer + final Properties producerConfig = new Properties(); + producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); + producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer"); + kafkaParams.addAllProducerConfig(producerConfig); + + final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams); + observers.add(exportObserverConfig); + + //create construct query observer and tell it not to export to Kafka + //it will only add results back into Fluo + HashMap<String, String> constructParams = new HashMap<>(); + final KafkaExportParameters kafkaConstructParams = new KafkaExportParameters(constructParams); + kafkaConstructParams.setExportToKafka(true); + + // Configure the Kafka Producer + final Properties constructProducerConfig = new Properties(); + constructProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); + constructProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + constructProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RyaSubGraphKafkaSerDe.class.getName()); + kafkaConstructParams.addAllProducerConfig(constructProducerConfig); + + final ObserverSpecification constructExportObserverConfig = new ObserverSpecification(ConstructQueryResultObserver.class.getName(), + constructParams); + observers.add(constructExportObserverConfig); + + // Add the observers to the Fluo Configuration. + super.getFluoConfiguration().addObservers(observers); + } + + /** + * setup mini kafka and call the super to setup mini fluo + */ + @Before + public void setupKafka() throws Exception { + // Install an instance of Rya on the Accumulo cluster. + installRyaInstance(); + + // Setup Kafka. + zkServer = new EmbeddedZookeeper(); + final String zkConnect = ZKHOST + ":" + zkServer.port(); + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); + zkUtils = ZkUtils.apply(zkClient, false); + + // setup Broker + final Properties brokerProps = new Properties(); + brokerProps.setProperty("zookeeper.connect", zkConnect); + brokerProps.setProperty("broker.id", "0"); + brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); + brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); + final KafkaConfig config = new KafkaConfig(brokerProps); + final Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock); + } + + @After + public void teardownRya() { + final MiniAccumuloCluster cluster = getMiniAccumuloCluster(); + final String instanceName = cluster.getInstanceName(); + final String zookeepers = cluster.getZooKeepers(); + + // Uninstall the instance of Rya. + final RyaClient ryaClient = AccumuloRyaClientFactory.build( + new AccumuloConnectionDetails(ACCUMULO_USER, ACCUMULO_PASSWORD.toCharArray(), instanceName, zookeepers), + super.getAccumuloConnector()); + + try { + ryaClient.getUninstall().uninstall(RYA_INSTANCE_NAME); + // Shutdown the repo. + if(ryaSailRepo != null) {ryaSailRepo.shutDown();} + if(dao != null ) {dao.destroy();} + } catch (Exception e) { + System.out.println("Encountered the following Exception when shutting down Rya: " + e.getMessage()); + } + } + + private void installRyaInstance() throws Exception { + final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster(); + final String instanceName = cluster.getInstanceName(); + final String zookeepers = cluster.getZooKeepers(); + + // Install the Rya instance to the mini accumulo cluster. + final RyaClient ryaClient = AccumuloRyaClientFactory.build( + new AccumuloConnectionDetails(ACCUMULO_USER, ACCUMULO_PASSWORD.toCharArray(), instanceName, zookeepers), + super.getAccumuloConnector()); + + ryaClient.getInstall().install(RYA_INSTANCE_NAME, + InstallConfiguration.builder().setEnableTableHashPrefix(false).setEnableFreeTextIndex(false) + .setEnableEntityCentricIndex(false).setEnableGeoIndex(false).setEnableTemporalIndex(false).setEnablePcjIndex(true) + .setFluoPcjAppName(super.getFluoConfiguration().getApplicationName()).build()); + + // Connect to the Rya instance that was just installed. + final AccumuloRdfConfiguration conf = makeConfig(instanceName, zookeepers); + final Sail sail = RyaSailFactory.getInstance(conf); + dao = RyaSailFactory.getAccumuloDAOWithUpdatedConfig(conf); + ryaSailRepo = new RyaSailRepository(sail); + } + + protected AccumuloRdfConfiguration makeConfig(final String instanceName, final String zookeepers) { + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix(RYA_INSTANCE_NAME); + + // Accumulo connection information. + conf.setAccumuloUser(AccumuloExportITBase.ACCUMULO_USER); + conf.setAccumuloPassword(AccumuloExportITBase.ACCUMULO_PASSWORD); + conf.setAccumuloInstance(super.getAccumuloConnector().getInstance().getInstanceName()); + conf.setAccumuloZookeepers(super.getAccumuloConnector().getInstance().getZooKeepers()); + conf.setAuths(""); + + // PCJ configuration information. + conf.set(ConfigUtils.USE_PCJ, "true"); + conf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true"); + conf.set(ConfigUtils.FLUO_APP_NAME, super.getFluoConfiguration().getApplicationName()); + conf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString()); + conf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString()); + + conf.setDisplayQueryPlan(true); + + return conf; + } + + /** + * @return A {@link RyaSailRepository} that is connected to the Rya instance + * that statements are loaded into. + */ + protected RyaSailRepository getRyaSailRepository() throws Exception { + return ryaSailRepo; + } + + /** + * @return A {@link AccumuloRyaDAO} so that RyaStatements with distinct + * visibilities can be added to the Rya Instance + */ + protected AccumuloRyaDAO getRyaDAO() { + return dao; + } + + /** + * Close all the Kafka mini server and mini-zookeeper + */ + @After + public void teardownKafka() { + if(kafkaServer != null) {kafkaServer.shutdown();} + if(zkClient != null) {zkClient.close();} + if(zkServer != null) {zkServer.shutdown();} + } + + /** + * Test kafka without rya code to make sure kafka works in this environment. + * If this test fails then its a testing environment issue, not with Rya. + * Source: https://github.com/asmaier/mini-kafka + */ + @Test + public void embeddedKafkaTest() throws Exception { + // create topic + final String topic = "testTopic"; + AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + + // setup producer + final Properties producerProps = new Properties(); + producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); + producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); + producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + final KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps); + + // setup consumer + final Properties consumerProps = new Properties(); + consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); + consumerProps.setProperty("group.id", "group0"); + consumerProps.setProperty("client.id", "consumer0"); + consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); + consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + + // to make sure the consumer starts from the beginning of the topic + consumerProps.put("auto.offset.reset", "earliest"); + + final KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(Arrays.asList(topic)); + + // send message + final ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(topic, 42, "test-message".getBytes(StandardCharsets.UTF_8)); + producer.send(data); + producer.close(); + + // starting consumer + final ConsumerRecords<Integer, byte[]> records = consumer.poll(3000); + assertEquals(1, records.count()); + final Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator(); + final ConsumerRecord<Integer, byte[]> record = recordIterator.next(); + assertEquals(42, (int) record.key()); + assertEquals("test-message", new String(record.value(), StandardCharsets.UTF_8)); + consumer.close(); + } + + protected KafkaConsumer<Integer, VisibilityBindingSet> makeConsumer(final String TopicName) { + // setup consumer + final Properties consumerProps = new Properties(); + consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); + consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0"); + consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0"); + consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.IntegerDeserializer"); + consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer"); + + // to make sure the consumer starts from the beginning of the topic + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + final KafkaConsumer<Integer, VisibilityBindingSet> consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(Arrays.asList(TopicName)); + return consumer; + } + + protected String loadData(final String sparql, final Collection<Statement> statements) throws Exception { + requireNonNull(sparql); + requireNonNull(statements); + + // Register the PCJ with Rya. + final Instance accInstance = super.getAccumuloConnector().getInstance(); + final Connector accumuloConn = super.getAccumuloConnector(); + + final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails(ACCUMULO_USER, + ACCUMULO_PASSWORD.toCharArray(), accInstance.getInstanceName(), accInstance.getZooKeepers()), accumuloConn); + + final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql); + + // Write the data to Rya. + final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection(); + ryaConn.begin(); + ryaConn.add(statements); + ryaConn.commit(); + ryaConn.close(); + + // Wait for the Fluo application to finish computing the end result. + super.getMiniFluo().waitForObservers(); + + // The PCJ Id is the topic name the results will be written to. + return pcjId; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java new file mode 100644 index 0000000..6feadff --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.pcj.fluo.test.base; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.fluo.api.config.ObserverSpecification; +import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchObserver; +import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters; +import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.PeriodicQueryObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver; +import org.junit.BeforeClass; + +/** + * The base Integration Test class used for Fluo applications that export to a Rya PCJ Index. + */ +public class RyaExportITBase extends FluoITBase { + + @BeforeClass + public static void setupLogging() { + BasicConfigurator.configure(); + Logger.getRootLogger().setLevel(Level.ERROR); + } + + @Override + protected void preFluoInitHook() throws Exception { + // Setup the observers that will be used by the Fluo PCJ Application. + final List<ObserverSpecification> observers = new ArrayList<>(); + observers.add(new ObserverSpecification(BatchObserver.class.getName())); + observers.add(new ObserverSpecification(TripleObserver.class.getName())); + observers.add(new ObserverSpecification(StatementPatternObserver.class.getName())); + observers.add(new ObserverSpecification(JoinObserver.class.getName())); + observers.add(new ObserverSpecification(FilterObserver.class.getName())); + observers.add(new ObserverSpecification(AggregationObserver.class.getName())); + observers.add(new ObserverSpecification(PeriodicQueryObserver.class.getName())); + + // Configure the export observer to export new PCJ results to the mini accumulo cluster. + final HashMap<String, String> exportParams = new HashMap<>(); + final RyaExportParameters ryaParams = new RyaExportParameters(exportParams); + ryaParams.setExportToRya(true); + ryaParams.setRyaInstanceName(getRyaInstanceName()); + ryaParams.setAccumuloInstanceName(super.getMiniAccumuloCluster().getInstanceName()); + ryaParams.setZookeeperServers(super.getMiniAccumuloCluster().getZooKeepers()); + ryaParams.setExporterUsername(getUsername()); + ryaParams.setExporterPassword(getPassword()); + + final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams); + observers.add(exportObserverConfig); + + // Add the observers to the Fluo Configuration. + super.getFluoConfiguration().addObservers(observers); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pom.xml b/extras/rya.pcj.fluo/pom.xml index 54a22fc..6979768 100644 --- a/extras/rya.pcj.fluo/pom.xml +++ b/extras/rya.pcj.fluo/pom.xml @@ -38,6 +38,7 @@ <module>pcj.fluo.app</module> <module>pcj.fluo.client</module> <module>pcj.fluo.integration</module> + <module>pcj.fluo.test.base</module> <module>pcj.fluo.demo</module> </modules> <profiles> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml b/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml new file mode 100644 index 0000000..bcd60aa --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml @@ -0,0 +1,77 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with this + work for additional information regarding copyright ownership. The ASF licenses + this file to you under the Apache License, Version 2.0 (the "License"); you + may not use this file except in compliance with the License. You may obtain + a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless + required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. --> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.service</artifactId> + <version>3.2.11-incubating-SNAPSHOT</version> + </parent> + + <artifactId>rya.periodic.service.integration.tests</artifactId> + + <name>Apache Rya Periodic Service Integration Tests</name> + <description>Integration Tests for Rya Periodic Service</description> + + <dependencies> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.test.base</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <artifactId>log4j-1.2-api</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-api</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-core</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.service.notification</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <artifactId>logback-classic</artifactId> + <groupId>ch.qos.logback</groupId> + </exclusion> + <exclusion> + <artifactId>logback-core</artifactId> + <groupId>ch.qos.logback</groupId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <encoding>UTF-8</encoding> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file
