Repository: incubator-rya Updated Branches: refs/heads/master 6ce0b00b4 -> e387818ba
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java index 85c5030..6ecec02 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java @@ -36,7 +36,7 @@ import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.core.client.FluoClientImpl; import org.apache.rya.api.client.RyaClient; import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; -import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; @@ -68,25 +68,25 @@ import com.google.common.collect.Sets; */ public class QueryIT extends RyaExportITBase { - private enum ExporterType {Pcj, Periodic}; - + private enum ExporterType { + Pcj, Periodic + }; + @Test public void optionalStatements() throws Exception { // A query that has optional statement patterns. This query is looking for all // people who have Law degrees and any BAR exams they have passed (though they // do not have to have passed any). - final String sparql = - "SELECT ?person ?exam " + - "WHERE {" + - "?person <http://hasDegreeIn> <http://Law> . " + - "OPTIONAL {?person <http://passedExam> ?exam } . " + - "}"; + final String sparql = "SELECT ?person ?exam " + "WHERE {" + "?person <http://hasDegreeIn> <http://Law> . " + + "OPTIONAL {?person <http://passedExam> ?exam } . " + "}"; // Create the Statements that will be loaded into Rya. final ValueFactory vf = new ValueFactoryImpl(); final Collection<Statement> statements = Sets.newHashSet( - vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://hasDegreeIn"), vf.createURI("http://Computer Science")), - vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://passedExam"), vf.createURI("http://Certified Ethical Hacker")), + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://hasDegreeIn"), + vf.createURI("http://Computer Science")), + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://passedExam"), + vf.createURI("http://Certified Ethical Hacker")), vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://hasDegreeIn"), vf.createURI("http://Law")), vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://passedExam"), vf.createURI("http://MBE")), vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://passedExam"), vf.createURI("http://BAR-Kansas")), @@ -121,16 +121,10 @@ public class QueryIT extends RyaExportITBase { // A query that find people who live in the USA, have been recruited by Geek Squad, // and are skilled with computers. The resulting binding set includes everybody who // was involved in the recruitment process. - final String sparql = - "SELECT ?recruiter ?candidate ?leader " + - "{ " + - "?recruiter <http://recruiterFor> <http://GeekSquad>. " + - "?candidate <http://skilledWith> <http://Computers>. " + - "?candidate <http://livesIn> \"USA\". " + - "?leader <http://leaderOf> <http://GeekSquad>. " + - "?recruiter <http://talksTo> ?candidate. " + - "?candidate <http://talksTo> ?leader. " + - "}"; + final String sparql = "SELECT ?recruiter ?candidate ?leader " + "{ " + "?recruiter <http://recruiterFor> <http://GeekSquad>. " + + "?candidate <http://skilledWith> <http://Computers>. " + "?candidate <http://livesIn> \"USA\". " + + "?leader <http://leaderOf> <http://GeekSquad>. " + "?recruiter <http://talksTo> ?candidate. " + + "?candidate <http://talksTo> ?leader. " + "}"; // Create the Statements that will be loaded into Rya. final ValueFactory vf = new ValueFactoryImpl(); @@ -139,11 +133,11 @@ public class QueryIT extends RyaExportITBase { vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://leaderOf"), vf.createURI("http://GeekSquad")), vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://leaderOf"), vf.createURI("http://GeekSquad")), - // Recruiters + // Recruiters vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://recruiterFor"), vf.createURI("http://GeekSquad")), vf.createStatement(vf.createURI("http://David"), vf.createURI("http://recruiterFor"), vf.createURI("http://GeekSquad")), - // Candidates + // Candidates vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://skilledWith"), vf.createURI("http://Computers")), vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://livesIn"), vf.createLiteral("USA")), vf.createStatement(vf.createURI("http://Frank"), vf.createURI("http://skilledWith"), vf.createURI("http://Computers")), @@ -155,7 +149,7 @@ public class QueryIT extends RyaExportITBase { vf.createStatement(vf.createURI("http://Ivan"), vf.createURI("http://skilledWith"), vf.createURI("http://Computers")), vf.createStatement(vf.createURI("http://Ivan"), vf.createURI("http://livesIn"), vf.createLiteral("USA")), - // Candidates the recruiters talk to. + // Candidates the recruiters talk to. vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://George")), vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://Harry")), @@ -163,7 +157,7 @@ public class QueryIT extends RyaExportITBase { vf.createStatement(vf.createURI("http://David"), vf.createURI("http://talksTo"), vf.createURI("http://Frank")), vf.createStatement(vf.createURI("http://David"), vf.createURI("http://talksTo"), vf.createURI("http://Ivan")), - // Recruits that talk to leaders. + // Recruits that talk to leaders. vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://talksTo"), vf.createURI("http://Alice")), vf.createStatement(vf.createURI("http://George"), vf.createURI("http://talksTo"), vf.createURI("http://Alice")), vf.createStatement(vf.createURI("http://Harry"), vf.createURI("http://talksTo"), vf.createURI("http://Bob")), @@ -196,15 +190,9 @@ public class QueryIT extends RyaExportITBase { @Test public void withURIFilters() throws Exception { - final String sparql = - "SELECT ?customer ?worker ?city " + - "{ " + - "FILTER(?customer = <http://Alice>) " + - "FILTER(?city = <http://London>) " + - "?customer <http://talksTo> ?worker. " + - "?worker <http://livesIn> ?city. " + - "?worker <http://worksAt> <http://Chipotle>. " + - "}"; + final String sparql = "SELECT ?customer ?worker ?city " + "{ " + "FILTER(?customer = <http://Alice>) " + + "FILTER(?city = <http://London>) " + "?customer <http://talksTo> ?worker. " + "?worker <http://livesIn> ?city. " + + "?worker <http://worksAt> <http://Chipotle>. " + "}"; // Create the Statements that will be loaded into Rya. final ValueFactory vf = new ValueFactoryImpl(); @@ -213,19 +201,19 @@ public class QueryIT extends RyaExportITBase { vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://livesIn"), vf.createURI("http://London")), vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), - vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Charlie")), + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Charlie")), vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://livesIn"), vf.createURI("http://London")), vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), - vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://David")), + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://David")), vf.createStatement(vf.createURI("http://David"), vf.createURI("http://livesIn"), vf.createURI("http://London")), vf.createStatement(vf.createURI("http://David"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), - vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")), vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://livesIn"), vf.createURI("http://Leeds")), vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle")), - vf.createStatement(vf.createURI("http://Frank"), vf.createURI("http://talksTo"), vf.createURI("http://Alice")), + vf.createStatement(vf.createURI("http://Frank"), vf.createURI("http://talksTo"), vf.createURI("http://Alice")), vf.createStatement(vf.createURI("http://Frank"), vf.createURI("http://livesIn"), vf.createURI("http://London")), vf.createStatement(vf.createURI("http://Frank"), vf.createURI("http://worksAt"), vf.createURI("http://Chipotle"))); @@ -256,13 +244,8 @@ public class QueryIT extends RyaExportITBase { @Test public void withNumericFilters() throws Exception { - final String sparql = - "SELECT ?name ?age " + - "{" + - "FILTER(?age < 30) ." + - "?name <http://hasAge> ?age." + - "?name <http://playsSport> \"Soccer\" " + - "}"; + final String sparql = "SELECT ?name ?age " + "{" + "FILTER(?age < 30) ." + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + "}"; // Create the Statements that will be loaded into Rya. final ValueFactory vf = new ValueFactoryImpl(); @@ -273,7 +256,7 @@ public class QueryIT extends RyaExportITBase { vf.createStatement(vf.createURI("http://David"), vf.createURI("http://hasAge"), vf.createLiteral(16)), vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://hasAge"), vf.createLiteral(35)), - vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://playsSport"), vf.createLiteral("Soccer")), + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://playsSport"), vf.createLiteral("Soccer")), vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://playsSport"), vf.createLiteral("Soccer")), vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://playsSport"), vf.createLiteral("Basketball")), vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://playsSport"), vf.createLiteral("Soccer")), @@ -298,14 +281,8 @@ public class QueryIT extends RyaExportITBase { @Test public void withCustomFilters() throws Exception { - final String sparql = - "prefix ryafunc: <tag:rya.apache.org,2017:function#> " + - "SELECT ?name ?age " + - "{ " + - "FILTER( ryafunc:isTeen(?age) ) . " + - "?name <http://hasAge> ?age . " + - "?name <http://playsSport> \"Soccer\" . " + - "}"; + final String sparql = "prefix ryafunc: <tag:rya.apache.org,2017:function#> " + "SELECT ?name ?age " + "{ " + + "FILTER( ryafunc:isTeen(?age) ) . " + "?name <http://hasAge> ?age . " + "?name <http://playsSport> \"Soccer\" . " + "}"; // Register a custom Filter. final Function fooFunction = new Function() { @@ -335,10 +312,12 @@ public class QueryIT extends RyaExportITBase { final double doubleValue = literal.doubleValue(); return BooleanLiteralImpl.valueOf(doubleValue < TEEN_THRESHOLD); } else { - throw new ValueExprEvaluationException("unexpected datatype (expect decimal/int or floating) for function operand: " + args[0]); + throw new ValueExprEvaluationException( + "unexpected datatype (expect decimal/int or floating) for function operand: " + args[0]); } } else { - throw new ValueExprEvaluationException("unexpected input value (expect non-null and numeric) for function: " + args[0]); + throw new ValueExprEvaluationException( + "unexpected input value (expect non-null and numeric) for function: " + args[0]); } } else { throw new ValueExprEvaluationException("unexpected input value (expect literal) for function: " + args[0]); @@ -358,7 +337,7 @@ public class QueryIT extends RyaExportITBase { vf.createStatement(vf.createURI("http://David"), vf.createURI("http://hasAge"), vf.createLiteral(16)), vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://hasAge"), vf.createLiteral(35)), - vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://playsSport"), vf.createLiteral("Soccer")), + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://playsSport"), vf.createLiteral("Soccer")), vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://playsSport"), vf.createLiteral("Soccer")), vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://playsSport"), vf.createLiteral("Basketball")), vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://playsSport"), vf.createLiteral("Soccer")), @@ -387,29 +366,32 @@ public class QueryIT extends RyaExportITBase { final String dtPredUri = "http://www.w3.org/2006/time#inXSDDateTime"; final String dtPred = "<" + dtPredUri + ">"; - final String sparql = - "PREFIX time: <http://www.w3.org/2006/time#> " + - "PREFIX xml: <http://www.w3.org/2001/XMLSchema#> " + - "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> " + - "SELECT ?event ?time " + - "WHERE { " + - "?event " + dtPred + " ?time . " + - "FILTER(?time > '2001-01-01T01:01:03-08:00'^^xml:dateTime) " + - "}"; + final String sparql = "PREFIX time: <http://www.w3.org/2006/time#> " + "PREFIX xml: <http://www.w3.org/2001/XMLSchema#> " + + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> " + "SELECT ?event ?time " + "WHERE { " + "?event " + dtPred + " ?time . " + + "FILTER(?time > '2001-01-01T01:01:03-08:00'^^xml:dateTime) " + "}"; // Create the Statements that will be loaded into Rya. final ValueFactory vf = new ValueFactoryImpl(); final DatatypeFactory dtf = DatatypeFactory.newInstance(); final Collection<Statement> statements = Sets.newHashSet( - vf.createStatement(vf.createURI("http://eventz"), vf.createURI("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), vf.createURI("http://www.w3.org/2006/time#Instant")), - vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T01:01:01-08:00"))), // 1 second - vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T04:01:02.000-05:00"))), // 2 second - vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T01:01:03-08:00"))), // 3 seconds - vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T01:01:04-08:00"))), // 4 seconds - vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T09:01:05Z"))), // 5 seconds - vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2006-01-01T05:00:00.000Z"))), - vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2007-01-01T05:00:00.000Z"))), - vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2008-01-01T05:00:00.000Z")))); + vf.createStatement(vf.createURI("http://eventz"), vf.createURI("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), + vf.createURI("http://www.w3.org/2006/time#Instant")), + vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), + vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T01:01:01-08:00"))), // 1 second + vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), + vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T04:01:02.000-05:00"))), // 2 second + vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), + vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T01:01:03-08:00"))), // 3 seconds + vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), + vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T01:01:04-08:00"))), // 4 seconds + vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), + vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T09:01:05Z"))), // 5 seconds + vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), + vf.createLiteral(dtf.newXMLGregorianCalendar("2006-01-01T05:00:00.000Z"))), + vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), + vf.createLiteral(dtf.newXMLGregorianCalendar("2007-01-01T05:00:00.000Z"))), + vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), + vf.createLiteral(dtf.newXMLGregorianCalendar("2008-01-01T05:00:00.000Z")))); // Create the expected results of the SPARQL query once the PCJ has been computed. final Set<BindingSet> expectedResults = new HashSet<>(); @@ -442,97 +424,99 @@ public class QueryIT extends RyaExportITBase { // Verify the end results of the query match the expected results. runTest(sparql, statements, expectedResults, ExporterType.Pcj); } - - + @Test public void periodicQueryTestWithoutAggregation() throws Exception { - String query = "prefix function: <http://org.apache.rya/function#> " //n - + "prefix time: <http://www.w3.org/2006/time#> " //n - + "select ?id where {" //n - + "Filter(function:periodic(?time, 2, .5, time:hours)) " //n - + "?obs <uri:hasTime> ?time. " //n - + "?obs <uri:hasId> ?id }"; //n + String query = "prefix function: <http://org.apache.rya/function#> " // n + + "prefix time: <http://www.w3.org/2006/time#> " // n + + "select ?id where {" // n + + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n + + "?obs <uri:hasTime> ?time. " // n + + "?obs <uri:hasId> ?id }"; // n // Create the Statements that will be loaded into Rya. final ValueFactory vf = new ValueFactoryImpl(); final DatatypeFactory dtf = DatatypeFactory.newInstance(); ZonedDateTime time = ZonedDateTime.now(); long currentTime = time.toInstant().toEpochMilli(); - + ZonedDateTime zTime1 = time.minusMinutes(30); String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); - + ZonedDateTime zTime2 = zTime1.minusMinutes(30); String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); - + ZonedDateTime zTime3 = zTime2.minusMinutes(30); String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); - + ZonedDateTime zTime4 = zTime3.minusMinutes(30); String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT); - + final Collection<Statement> statements = Sets.newHashSet( - vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")), - vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")), - vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")), - vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), - vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4")) - ); + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4"))); // Create the expected results of the SPARQL query once the PCJ has been computed. final Set<BindingSet> expectedResults = new HashSet<>(); long period = 1800000; - long binId = (currentTime/period)*period; - + long binId = (currentTime / period) * period; + MapBindingSet bs = new MapBindingSet(); bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId)); expectedResults.add(bs); - + bs = new MapBindingSet(); bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); expectedResults.add(bs); - + bs = new MapBindingSet(); bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); - bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + 2 * period)); expectedResults.add(bs); - + bs = new MapBindingSet(); bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); - bs.addBinding("periodicBinId", vf.createLiteral(binId + 3*period)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + 3 * period)); expectedResults.add(bs); bs = new MapBindingSet(); bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId)); expectedResults.add(bs); - + bs = new MapBindingSet(); bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); expectedResults.add(bs); - + bs = new MapBindingSet(); bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); - bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + 2 * period)); expectedResults.add(bs); - + bs = new MapBindingSet(); bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId)); expectedResults.add(bs); - + bs = new MapBindingSet(); bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); expectedResults.add(bs); - + bs = new MapBindingSet(); bs.addBinding("id", vf.createLiteral("id_4", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId)); @@ -541,183 +525,189 @@ public class QueryIT extends RyaExportITBase { // Verify the end results of the query match the expected results. runTest(query, statements, expectedResults, ExporterType.Periodic); } - - + @Test public void periodicQueryTestWithAggregation() throws Exception { - String query = "prefix function: <http://org.apache.rya/function#> " //n - + "prefix time: <http://www.w3.org/2006/time#> " //n - + "select (count(?obs) as ?total) where {" //n - + "Filter(function:periodic(?time, 2, .5, time:hours)) " //n - + "?obs <uri:hasTime> ?time. " //n - + "?obs <uri:hasId> ?id }"; //n + String query = "prefix function: <http://org.apache.rya/function#> " // n + + "prefix time: <http://www.w3.org/2006/time#> " // n + + "select (count(?obs) as ?total) where {" // n + + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n + + "?obs <uri:hasTime> ?time. " // n + + "?obs <uri:hasId> ?id }"; // n // Create the Statements that will be loaded into Rya. final ValueFactory vf = new ValueFactoryImpl(); final DatatypeFactory dtf = DatatypeFactory.newInstance(); ZonedDateTime time = ZonedDateTime.now(); long currentTime = time.toInstant().toEpochMilli(); - + ZonedDateTime zTime1 = time.minusMinutes(30); String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); - + ZonedDateTime zTime2 = zTime1.minusMinutes(30); String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); - + ZonedDateTime zTime3 = zTime2.minusMinutes(30); String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); - + ZonedDateTime zTime4 = zTime3.minusMinutes(30); String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT); - + final Collection<Statement> statements = Sets.newHashSet( - vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")), - vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")), - vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")), - vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), - vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4")) - ); + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4"))); // Create the expected results of the SPARQL query once the PCJ has been computed. final Set<BindingSet> expectedResults = new HashSet<>(); long period = 1800000; - long binId = (currentTime/period)*period; - + long binId = (currentTime / period) * period; + MapBindingSet bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("4", XMLSchema.INTEGER)); bs.addBinding("periodicBinId", vf.createLiteral(binId)); expectedResults.add(bs); - + bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("3", XMLSchema.INTEGER)); bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); expectedResults.add(bs); - + bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); - bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + 2 * period)); expectedResults.add(bs); - + bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); - bs.addBinding("periodicBinId", vf.createLiteral(binId + 3*period)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + 3 * period)); expectedResults.add(bs); - // Verify the end results of the query match the expected results. runTest(query, statements, expectedResults, ExporterType.Periodic); } - + @Test public void periodicQueryTestWithAggregationAndGroupBy() throws Exception { - String query = "prefix function: <http://org.apache.rya/function#> " //n - + "prefix time: <http://www.w3.org/2006/time#> " //n - + "select ?id (count(?obs) as ?total) where {" //n - + "Filter(function:periodic(?time, 2, .5, time:hours)) " //n - + "?obs <uri:hasTime> ?time. " //n - + "?obs <uri:hasId> ?id } group by ?id"; //n + String query = "prefix function: <http://org.apache.rya/function#> " // n + + "prefix time: <http://www.w3.org/2006/time#> " // n + + "select ?id (count(?obs) as ?total) where {" // n + + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n + + "?obs <uri:hasTime> ?time. " // n + + "?obs <uri:hasId> ?id } group by ?id"; // n // Create the Statements that will be loaded into Rya. final ValueFactory vf = new ValueFactoryImpl(); final DatatypeFactory dtf = DatatypeFactory.newInstance(); ZonedDateTime time = ZonedDateTime.now(); long currentTime = time.toInstant().toEpochMilli(); - + ZonedDateTime zTime1 = time.minusMinutes(30); String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); - + ZonedDateTime zTime2 = zTime1.minusMinutes(30); String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); - + ZonedDateTime zTime3 = zTime2.minusMinutes(30); String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); - + ZonedDateTime zTime4 = zTime3.minusMinutes(30); String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT); - + final Collection<Statement> statements = Sets.newHashSet( - vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")), - vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")), - vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")), - vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4")), - vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")), - vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), - vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")) - ); + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2"))); // Create the expected results of the SPARQL query once the PCJ has been computed. final Set<BindingSet> expectedResults = new HashSet<>(); long period = 1800000; - long binId = (currentTime/period)*period; - + long binId = (currentTime / period) * period; + MapBindingSet bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId)); expectedResults.add(bs); - + bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId)); expectedResults.add(bs); - + bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId)); expectedResults.add(bs); - + bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); bs.addBinding("id", vf.createLiteral("id_4", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId)); expectedResults.add(bs); - + bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); expectedResults.add(bs); - + bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); expectedResults.add(bs); - + bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); expectedResults.add(bs); - + bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); - bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + 2 * period)); expectedResults.add(bs); - + bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); - bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + 2 * period)); expectedResults.add(bs); - + bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); - bs.addBinding("periodicBinId", vf.createLiteral(binId + 3*period)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + 3 * period)); expectedResults.add(bs); // Verify the end results of the query match the expected results. @@ -725,10 +715,190 @@ public class QueryIT extends RyaExportITBase { } + @Test + public void nestedPeriodicQueryTestWithAggregationAndGroupBy() throws Exception { + String query = "prefix function: <http://org.apache.rya/function#> " // n + + "prefix time: <http://www.w3.org/2006/time#> " // n + + "select ?location ?total " + + "where { Filter(?total > 1) {" + + "select ?location (count(?obs) as ?total) where {" // n + + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n + + "?obs <uri:hasTime> ?time. " // n + + "?obs <uri:hasLoc> ?location } group by ?location }}"; // n + + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final DatatypeFactory dtf = DatatypeFactory.newInstance(); + ZonedDateTime time = ZonedDateTime.now(); + long currentTime = time.toInstant().toEpochMilli(); + + ZonedDateTime zTime1 = time.minusMinutes(30); + String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime2 = zTime1.minusMinutes(30); + String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime3 = zTime2.minusMinutes(30); + String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime4 = zTime3.minusMinutes(30); + String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT); + + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasLoc"), vf.createLiteral("loc_1")), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasLoc"), vf.createLiteral("loc_2")), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasLoc"), vf.createLiteral("loc_3")), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasLoc"), vf.createLiteral("loc_4")), + vf.createStatement(vf.createURI("urn:obs_5"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), + vf.createStatement(vf.createURI("urn:obs_5"), vf.createURI("uri:hasLoc"), vf.createLiteral("loc_1")), + vf.createStatement(vf.createURI("urn:obs_6"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_6"), vf.createURI("uri:hasLoc"), vf.createLiteral("loc_2"))); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final Set<BindingSet> expectedResults = new HashSet<>(); + + long period = 1800000; + long binId = (currentTime / period) * period; + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); + bs.addBinding("location", vf.createLiteral("loc_1", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); + bs.addBinding("location", vf.createLiteral("loc_2", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId)); + expectedResults.add(bs); + + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); + bs.addBinding("location", vf.createLiteral("loc_2", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); + expectedResults.add(bs); + + // Verify the end results of the query match the expected results. + runTest(query, statements, expectedResults, ExporterType.Periodic); + } - + @Test + public void nestedJoinPeriodicQueryWithAggregationAndGroupBy() throws Exception { + String query = "prefix function: <http://org.apache.rya/function#> " // n + + "prefix time: <http://www.w3.org/2006/time#> " // n + + "select ?location ?total ?population " + + "where { Filter(?total > 1)" + + "?location <uri:hasPopulation> ?population . {" + + "select ?location (count(?obs) as ?total) where {" // n + + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n + + "?obs <uri:hasTime> ?time. " // n + + "?obs <uri:hasLoc> ?location } group by ?location }}"; // n + + // Create the Statements that will be loaded into Rya. + final ValueFactory vf = new ValueFactoryImpl(); + final DatatypeFactory dtf = DatatypeFactory.newInstance(); + ZonedDateTime time = ZonedDateTime.now(); + long currentTime = time.toInstant().toEpochMilli(); + + ZonedDateTime zTime1 = time.minusMinutes(30); + String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime2 = zTime1.minusMinutes(30); + String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime3 = zTime2.minusMinutes(30); + String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime4 = zTime3.minusMinutes(30); + String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT); + + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasLoc"), vf.createURI("uri:loc_1")), + vf.createStatement(vf.createURI("uri:loc_1"), vf.createURI("uri:hasPopulation"), vf.createLiteral(3500)), + vf.createStatement(vf.createURI("uri:loc_2"), vf.createURI("uri:hasPopulation"), vf.createLiteral(8000)), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasLoc"), vf.createURI("uri:loc_2")), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasLoc"), vf.createURI("uri:loc_3")), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasLoc"), vf.createURI("uri:loc_4")), + vf.createStatement(vf.createURI("urn:obs_5"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), + vf.createStatement(vf.createURI("urn:obs_5"), vf.createURI("uri:hasLoc"), vf.createURI("uri:loc_1")), + vf.createStatement(vf.createURI("urn:obs_6"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_6"), vf.createURI("uri:hasLoc"), vf.createURI("uri:loc_2"))); + + // Create the expected results of the SPARQL query once the PCJ has been computed. + final Set<BindingSet> expectedResults = new HashSet<>(); + + long period = 1800000; + long binId = (currentTime / period) * period; + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); + bs.addBinding("location", vf.createURI("uri:loc_1")); + bs.addBinding("population", vf.createLiteral("3500", XMLSchema.INTEGER)); + bs.addBinding("periodicBinId", vf.createLiteral(binId)); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); + bs.addBinding("location", vf.createURI("uri:loc_2")); + bs.addBinding("population", vf.createLiteral("8000", XMLSchema.INTEGER)); + bs.addBinding("periodicBinId", vf.createLiteral(binId)); + expectedResults.add(bs); + - public void runTest(final String sparql, final Collection<Statement> statements, final Collection<BindingSet> expectedResults, ExporterType type ) throws Exception { + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); + bs.addBinding("location", vf.createURI("uri:loc_2")); + bs.addBinding("population", vf.createLiteral("8000", XMLSchema.INTEGER)); + bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); + expectedResults.add(bs); + + // Verify the end results of the query match the expected results. + runTest(query, statements, expectedResults, ExporterType.Periodic); + } + + @Test(expected= IllegalArgumentException.class) + public void nestedConstructPeriodicQueryWithAggregationAndGroupBy() throws Exception { + String query = "prefix function: <http://org.apache.rya/function#> " // n + + "prefix time: <http://www.w3.org/2006/time#> " // n + + "construct{?location a <uri:highObservationArea> } " + + "where { Filter(?total > 1)" + + "?location <uri:hasPopulation> ?population . {" + + "select ?location (count(?obs) as ?total) where {" // n + + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n + + "?obs <uri:hasTime> ?time. " // n + + "?obs <uri:hasLoc> ?location } group by ?location }}"; // n + + + final Collection<Statement> statements = Sets.newHashSet(); + final Set<BindingSet> expectedResults = new HashSet<>(); + + // Verify the end results of the query match the expected results. + runTest(query, statements, expectedResults, ExporterType.Periodic); + } + + public void runTest(final String sparql, final Collection<Statement> statements, final Collection<BindingSet> expectedResults, + ExporterType type) throws Exception { requireNonNull(sparql); requireNonNull(statements); requireNonNull(expectedResults); @@ -754,9 +924,10 @@ public class QueryIT extends RyaExportITBase { PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(accumuloConn, getRyaInstanceName()); String periodicId = periodicStorage.createPeriodicQuery(sparql); try (FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration())) { - new CreatePcj().createPcj(periodicId, sparql, fluo); + new CreateFluoPcj().createPcj(periodicId, sparql, fluo); } addStatementsAndWait(statements); + final Set<BindingSet> results = Sets.newHashSet(); try (CloseableIterator<BindingSet> resultIter = periodicStorage.listResults(periodicId, Optional.empty())) { while (resultIter.hasNext()) { @@ -767,9 +938,9 @@ public class QueryIT extends RyaExportITBase { break; } } - + private void addStatementsAndWait(final Collection<Statement> statements) throws RepositoryException, Exception { - // Write the data to Rya. + // Write the data to Rya. final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection(); ryaConn.begin(); ryaConn.add(statements); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java index 12c69ca..15ff1b4 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java @@ -28,7 +28,7 @@ import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaURI; -import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; @@ -110,7 +110,7 @@ public class RyaExportIT extends RyaExportITBase { try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); + new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); // Stream the data into Fluo. new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java index e6d287e..5cd3ab1 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java @@ -28,7 +28,7 @@ import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.rya.accumulo.AccumuloRyaDAO; import org.apache.rya.indexing.external.PrecomputedJoinIndexer; -import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; @@ -99,7 +99,7 @@ public class RyaInputIncrementalUpdateIT extends RyaExportITBase { try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); + new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); // Verify the end results of the query match the expected results. super.getMiniFluo().waitForObservers(); @@ -165,7 +165,7 @@ public class RyaInputIncrementalUpdateIT extends RyaExportITBase { try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); + new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); super.getMiniFluo().waitForObservers(); @@ -236,7 +236,7 @@ public class RyaInputIncrementalUpdateIT extends RyaExportITBase { try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); + new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); super.getMiniFluo().waitForObservers(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java index 3f51311..e83a894 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java @@ -28,7 +28,7 @@ import org.apache.accumulo.core.client.Connector; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; @@ -60,7 +60,7 @@ public class StreamingTestIT extends RyaExportITBase { final String pcjId = pcjStorage.createPcj(sparql); // Task the Fluo app with the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); + new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); // Add Statements to the Fluo app. log.info("Adding Join Pairs..."); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java index ab42e89..eab99b8 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java @@ -32,7 +32,7 @@ import org.apache.rya.api.RdfCloudTripleStoreConfiguration; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.resolver.RdfToRyaConversions; import org.apache.rya.indexing.accumulo.ConfigUtils; -import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; @@ -107,7 +107,7 @@ public class HistoricStreamingVisibilityIT extends RyaExportITBase { final String pcjId = pcjStorage.createPcj(sparql); try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); + new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); } // Verify the end results of the query match the expected results. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java index dc2f859..2497793 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java @@ -49,7 +49,7 @@ import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaURI; import org.apache.rya.indexing.accumulo.ConfigUtils; import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig; -import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; @@ -220,7 +220,7 @@ public class PcjVisibilityIT extends RyaExportITBase { try( final FluoClient fluoClient = FluoFactory.newClient( super.getFluoConfiguration() )) { // Create the PCJ in Fluo. - new CreatePcj().withRyaIntegration(pcjId, rootStorage, fluoClient, accumuloConn, getRyaInstanceName()); + new CreateFluoPcj().withRyaIntegration(pcjId, rootStorage, fluoClient, accumuloConn, getRyaInstanceName()); // Stream the data into Fluo. for(final RyaStatement statement : streamedTriples.keySet()) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java index 85da422..c828a20 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java @@ -37,6 +37,7 @@ import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.fluo.api.config.ObserverSpecification; import org.apache.fluo.recipes.test.AccumuloExportITBase; +import org.apache.fluo.recipes.test.FluoITHelper; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -59,6 +60,7 @@ import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver; @@ -72,7 +74,6 @@ import org.openrdf.model.Statement; import org.openrdf.repository.sail.SailRepositoryConnection; import org.openrdf.sail.Sail; - import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode; import kafka.server.KafkaConfig; @@ -117,6 +118,7 @@ public class KafkaExportITBase extends AccumuloExportITBase { observers.add(new ObserverSpecification(JoinObserver.class.getName())); observers.add(new ObserverSpecification(FilterObserver.class.getName())); observers.add(new ObserverSpecification(AggregationObserver.class.getName())); + observers.add(new ObserverSpecification(ProjectionObserver.class.getName())); // Configure the export observer to export new PCJ results to the mini // accumulo cluster. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java index 6feadff..9c5732f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java @@ -32,6 +32,7 @@ import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.PeriodicQueryObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver; @@ -59,6 +60,7 @@ public class RyaExportITBase extends FluoITBase { observers.add(new ObserverSpecification(FilterObserver.class.getName())); observers.add(new ObserverSpecification(AggregationObserver.class.getName())); observers.add(new ObserverSpecification(PeriodicQueryObserver.class.getName())); + observers.add(new ObserverSpecification(ProjectionObserver.class.getName())); // Configure the export observer to export new PCJ results to the mini accumulo cluster. final HashMap<String, String> exportParams = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java index 1902248..4d1bc75 100644 --- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java +++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java @@ -25,7 +25,8 @@ import java.util.concurrent.TimeUnit; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.core.client.FluoClientImpl; import org.apache.fluo.recipes.test.AccumuloExportITBase; -import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; +import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor; import org.apache.rya.periodic.notification.notification.TimestampedNotification; import org.apache.rya.periodic.notification.recovery.PeriodicNotificationProvider; @@ -49,11 +50,11 @@ public class PeriodicNotificationProviderIT extends AccumuloExportITBase { BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); PeriodicNotificationCoordinatorExecutor coord = new PeriodicNotificationCoordinatorExecutor(2, notifications); PeriodicNotificationProvider provider = new PeriodicNotificationProvider(); - CreatePcj pcj = new CreatePcj(); + CreateFluoPcj pcj = new CreateFluoPcj(); String id = null; try(FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) { - id = pcj.createPcj(sparql, fluo); + id = pcj.createPcj(sparql, fluo).getQueryId(); provider.processRegisteredNotifications(coord, fluo.newSnapshot()); } @@ -61,7 +62,7 @@ public class PeriodicNotificationProviderIT extends AccumuloExportITBase { Assert.assertEquals(5000, notification.getInitialDelay()); Assert.assertEquals(15000, notification.getPeriod()); Assert.assertEquals(TimeUnit.MILLISECONDS, notification.getTimeUnit()); - Assert.assertEquals(id, notification.getId()); + Assert.assertEquals(FluoQueryUtils.convertFluoQueryIdToPcjId(id), notification.getId()); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java index 7f71b52..6aade52 100644 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java @@ -21,8 +21,9 @@ package org.apache.rya.periodic.notification.api; import java.util.Optional; import org.apache.fluo.api.client.FluoClient; -import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode; +import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; @@ -31,6 +32,8 @@ import org.apache.rya.periodic.notification.notification.PeriodicNotification; import org.openrdf.query.MalformedQueryException; import org.openrdf.query.algebra.evaluation.function.Function; +import com.google.common.base.Preconditions; + /** * Object that creates a Periodic Query. A Periodic Query is any query * requesting periodic updates about events that occurred within a given @@ -79,8 +82,9 @@ public class CreatePeriodicQuery { Optional<PeriodicQueryNode> optNode = PeriodicQueryUtil.getPeriodicNode(sparql); if(optNode.isPresent()) { PeriodicQueryNode periodicNode = optNode.get(); - CreatePcj createPcj = new CreatePcj(); - String queryId = createPcj.createPcj(sparql, fluoClient); + CreateFluoPcj createPcj = new CreateFluoPcj(); + String queryId = createPcj.createPcj(sparql, fluoClient).getQueryId(); + queryId = FluoQueryUtils.convertFluoQueryIdToPcjId(queryId); periodicStorage.createPeriodicQuery(queryId, sparql); PeriodicNotification notification = PeriodicNotification.builder().id(queryId).period(periodicNode.getPeriod()) .timeUnit(periodicNode.getUnit()).build(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java index 8e8b1a2..27e06f0 100644 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java @@ -126,11 +126,14 @@ public class PeriodicNotificationProvider { id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.AGGREGATION_PARENT_NODE_ID).toString()); break; case CONSTRUCT: - id = sx.get(Bytes.of(nodeId), FluoQueryColumns.CONSTRUCT_NODE_ID).toString(); - id = id.split(IncrementalUpdateConstants.CONSTRUCT_PREFIX)[1]; + id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID).toString()); + break; + case PROJECTION: + id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.PROJECTION_PARENT_NODE_ID).toString()); break; default: - throw new RuntimeException("Invalid NodeType."); + throw new IllegalArgumentException("Invalid node type"); + } return id; }
