Repository: incubator-rya
Updated Branches:
  refs/heads/master 6ce0b00b4 -> e387818ba


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
index 85c5030..6ecec02 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
@@ -36,7 +36,7 @@ import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.core.client.FluoClientImpl;
 import org.apache.rya.api.client.RyaClient;
 import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
@@ -68,25 +68,25 @@ import com.google.common.collect.Sets;
  */
 public class QueryIT extends RyaExportITBase {
 
-    private enum ExporterType {Pcj, Periodic};
-    
+    private enum ExporterType {
+        Pcj, Periodic
+    };
+
     @Test
     public void optionalStatements() throws Exception {
         // A query that has optional statement patterns. This query is looking for all
         // people who have Law degrees and any BAR exams they have passed (though they
         // do not have to have passed any).
-        final String sparql =
-                "SELECT ?person ?exam " +
-                "WHERE {" +
-                    "?person <http://hasDegreeIn> <http://Law> . " +
-                    "OPTIONAL {?person <http://passedExam> ?exam } . " +
-                "}";
+        final String sparql = "SELECT ?person ?exam " + "WHERE {" + "?person <http://hasDegreeIn> <http://Law> . "
+                + "OPTIONAL {?person <http://passedExam> ?exam } . " + "}";
 
         // Create the Statements that will be loaded into Rya.
         final ValueFactory vf = new ValueFactoryImpl();
         final Collection<Statement> statements = Sets.newHashSet(
-                vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://hasDegreeIn"), vf.createURI("http://Computer Science")),
-                vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://passedExam"), vf.createURI("http://Certified Ethical Hacker")),
+                vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://hasDegreeIn"),
+                        vf.createURI("http://Computer Science")),
+                vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://passedExam"),
+                        vf.createURI("http://Certified Ethical Hacker")),
                 vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://hasDegreeIn"), vf.createURI("http://Law")),
                 vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://passedExam"), vf.createURI("http://MBE")),
                 vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://passedExam"), vf.createURI("http://BAR-Kansas")),
@@ -121,16 +121,10 @@ public class QueryIT extends RyaExportITBase {
         // A query that find people who live in the USA, have been recruited by Geek Squad,
         // and are skilled with computers. The resulting binding set includes everybody who
         // was involved in the recruitment process.
-        final String sparql =
-                "SELECT ?recruiter ?candidate ?leader " +
-                "{ " +
-                  "?recruiter <http://recruiterFor> <http://GeekSquad>. " +
-                  "?candidate <http://skilledWith> <http://Computers>. " +
-                  "?candidate <http://livesIn> \"USA\". " +
-                  "?leader <http://leaderOf> <http://GeekSquad>. " +
-                  "?recruiter <http://talksTo> ?candidate. " +
-                  "?candidate <http://talksTo> ?leader. " +
-                "}";
+        final String sparql = "SELECT ?recruiter ?candidate ?leader " + "{ " + "?recruiter <http://recruiterFor> <http://GeekSquad>. "
+                + "?candidate <http://skilledWith> <http://Computers>. " + "?candidate <http://livesIn> \"USA\". "
+                + "?leader <http://leaderOf> <http://GeekSquad>. " + "?recruiter <http://talksTo> ?candidate. "
+                + "?candidate <http://talksTo> ?leader. " + "}";
 
         // Create the Statements that will be loaded into Rya.
         final ValueFactory vf = new ValueFactoryImpl();
@@ -139,11 +133,11 @@ public class QueryIT extends RyaExportITBase {
                 vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://leaderOf";), vf.createURI("http://GeekSquad";)),
                 vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://leaderOf";), vf.createURI("http://GeekSquad";)),
 
-                // Recruiters
+        // Recruiters
                 vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://recruiterFor";), vf.createURI("http://GeekSquad";)),
                 vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://recruiterFor";), vf.createURI("http://GeekSquad";)),
 
-                // Candidates
+        // Candidates
                 vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://skilledWith";), vf.createURI("http://Computers";)),
                 vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://livesIn";), vf.createLiteral("USA")),
                 vf.createStatement(vf.createURI("http://Frank";), 
vf.createURI("http://skilledWith";), vf.createURI("http://Computers";)),
@@ -155,7 +149,7 @@ public class QueryIT extends RyaExportITBase {
                 vf.createStatement(vf.createURI("http://Ivan";), 
vf.createURI("http://skilledWith";), vf.createURI("http://Computers";)),
                 vf.createStatement(vf.createURI("http://Ivan";), 
vf.createURI("http://livesIn";), vf.createLiteral("USA")),
 
-                // Candidates the recruiters talk to.
+        // Candidates the recruiters talk to.
                 vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
                 vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://talksTo";), vf.createURI("http://George";)),
                 vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://talksTo";), vf.createURI("http://Harry";)),
@@ -163,7 +157,7 @@ public class QueryIT extends RyaExportITBase {
                 vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://talksTo";), vf.createURI("http://Frank";)),
                 vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://talksTo";), vf.createURI("http://Ivan";)),
 
-                // Recruits that talk to leaders.
+        // Recruits that talk to leaders.
                 vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://talksTo";), vf.createURI("http://Alice";)),
                 vf.createStatement(vf.createURI("http://George";), 
vf.createURI("http://talksTo";), vf.createURI("http://Alice";)),
                 vf.createStatement(vf.createURI("http://Harry";), 
vf.createURI("http://talksTo";), vf.createURI("http://Bob";)),
@@ -196,15 +190,9 @@ public class QueryIT extends RyaExportITBase {
 
     @Test
     public void withURIFilters() throws Exception {
-        final String sparql =
-                "SELECT ?customer ?worker ?city " +
-                "{ " +
-                  "FILTER(?customer = <http://Alice>) " +
-                  "FILTER(?city = <http://London>) " +
-                  "?customer <http://talksTo> ?worker. " +
-                  "?worker <http://livesIn> ?city. " +
-                  "?worker <http://worksAt> <http://Chipotle>. " +
-                "}";
+        final String sparql = "SELECT ?customer ?worker ?city " + "{ " + "FILTER(?customer = <http://Alice>) "
+                + "FILTER(?city = <http://London>) " + "?customer <http://talksTo> ?worker. " + "?worker <http://livesIn> ?city. "
+                + "?worker <http://worksAt> <http://Chipotle>. " + "}";
 
         // Create the Statements that will be loaded into Rya.
         final ValueFactory vf = new ValueFactoryImpl();
@@ -213,19 +201,19 @@ public class QueryIT extends RyaExportITBase {
                 vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://livesIn";), vf.createURI("http://London";)),
                 vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
 
-                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://Charlie";)),
+        vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://Charlie";)),
                 vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://livesIn";), vf.createURI("http://London";)),
                 vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
 
-                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://David";)),
+        vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://David";)),
                 vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://livesIn";), vf.createURI("http://London";)),
                 vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
 
-                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
+        vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://talksTo";), vf.createURI("http://Eve";)),
                 vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://livesIn";), vf.createURI("http://Leeds";)),
                 vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)),
 
-                vf.createStatement(vf.createURI("http://Frank";), 
vf.createURI("http://talksTo";), vf.createURI("http://Alice";)),
+        vf.createStatement(vf.createURI("http://Frank";), 
vf.createURI("http://talksTo";), vf.createURI("http://Alice";)),
                 vf.createStatement(vf.createURI("http://Frank";), 
vf.createURI("http://livesIn";), vf.createURI("http://London";)),
                 vf.createStatement(vf.createURI("http://Frank";), 
vf.createURI("http://worksAt";), vf.createURI("http://Chipotle";)));
 
@@ -256,13 +244,8 @@ public class QueryIT extends RyaExportITBase {
 
     @Test
     public void withNumericFilters() throws Exception {
-        final String sparql =
-                "SELECT ?name ?age " +
-                "{" +
-                  "FILTER(?age < 30) ." +
-                  "?name <http://hasAge> ?age." +
-                  "?name <http://playsSport> \"Soccer\" " +
-                "}";
+        final String sparql = "SELECT ?name ?age " + "{" + "FILTER(?age < 30) ." + "?name <http://hasAge> ?age."
+                + "?name <http://playsSport> \"Soccer\" " + "}";
 
         // Create the Statements that will be loaded into Rya.
         final ValueFactory vf = new ValueFactoryImpl();
@@ -273,7 +256,7 @@ public class QueryIT extends RyaExportITBase {
                 vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://hasAge";), vf.createLiteral(16)),
                 vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://hasAge";), vf.createLiteral(35)),
 
-                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://playsSport";), vf.createLiteral("Soccer")),
+        vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://playsSport";), vf.createLiteral("Soccer")),
                 vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://playsSport";), vf.createLiteral("Soccer")),
                 vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://playsSport";), vf.createLiteral("Basketball")),
                 vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://playsSport";), vf.createLiteral("Soccer")),
@@ -298,14 +281,8 @@ public class QueryIT extends RyaExportITBase {
 
     @Test
     public void withCustomFilters() throws Exception {
-        final String sparql =
-                "prefix ryafunc: <tag:rya.apache.org,2017:function#> " +
-                "SELECT ?name ?age "  +
-                "{ "  +
-                    "FILTER( ryafunc:isTeen(?age) ) . "  +
-                    "?name <http://hasAge> ?age . "  +
-                    "?name <http://playsSport> \"Soccer\" . "  +
-                "}";
+        final String sparql = "prefix ryafunc: <tag:rya.apache.org,2017:function#> " + "SELECT ?name ?age " + "{ "
+                + "FILTER( ryafunc:isTeen(?age) ) . " + "?name <http://hasAge> ?age . " + "?name <http://playsSport> \"Soccer\" . " + "}";
 
         // Register a custom Filter.
         final Function fooFunction = new Function() {
@@ -335,10 +312,12 @@ public class QueryIT extends RyaExportITBase {
                             final double doubleValue = literal.doubleValue();
                             return BooleanLiteralImpl.valueOf(doubleValue < TEEN_THRESHOLD);
                         } else {
-                            throw new ValueExprEvaluationException("unexpected datatype (expect decimal/int or floating) for function operand: " + args[0]);
+                            throw new ValueExprEvaluationException(
+                                    "unexpected datatype (expect decimal/int or floating) for function operand: " + args[0]);
                         }
                     } else {
-                        throw new ValueExprEvaluationException("unexpected input value (expect non-null and numeric) for function: " + args[0]);
+                        throw new ValueExprEvaluationException(
+                                "unexpected input value (expect non-null and numeric) for function: " + args[0]);
                     }
                 } else {
                     throw new ValueExprEvaluationException("unexpected input value (expect literal) for function: " + args[0]);
@@ -358,7 +337,7 @@ public class QueryIT extends RyaExportITBase {
                 vf.createStatement(vf.createURI("http://David";), 
vf.createURI("http://hasAge";), vf.createLiteral(16)),
                 vf.createStatement(vf.createURI("http://Eve";), 
vf.createURI("http://hasAge";), vf.createLiteral(35)),
 
-                vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://playsSport";), vf.createLiteral("Soccer")),
+        vf.createStatement(vf.createURI("http://Alice";), 
vf.createURI("http://playsSport";), vf.createLiteral("Soccer")),
                 vf.createStatement(vf.createURI("http://Bob";), 
vf.createURI("http://playsSport";), vf.createLiteral("Soccer")),
                 vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://playsSport";), vf.createLiteral("Basketball")),
                 vf.createStatement(vf.createURI("http://Charlie";), 
vf.createURI("http://playsSport";), vf.createLiteral("Soccer")),
@@ -387,29 +366,32 @@ public class QueryIT extends RyaExportITBase {
         final String dtPredUri = "http://www.w3.org/2006/time#inXSDDateTime";
         final String dtPred = "<" + dtPredUri + ">";
 
-        final String sparql =
-                "PREFIX time: <http://www.w3.org/2006/time#> " +
-                "PREFIX xml: <http://www.w3.org/2001/XMLSchema#> "  +
-                "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> " +
-                "SELECT ?event ?time "  +
-                "WHERE { "  +
-                    "?event " + dtPred + " ?time . " +
-                    "FILTER(?time > '2001-01-01T01:01:03-08:00'^^xml:dateTime) " +
-                "}";
+        final String sparql = "PREFIX time: <http://www.w3.org/2006/time#> " + "PREFIX xml: <http://www.w3.org/2001/XMLSchema#> "
+                + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> " + "SELECT ?event ?time " + "WHERE { " + "?event " + dtPred + " ?time . "
+                + "FILTER(?time > '2001-01-01T01:01:03-08:00'^^xml:dateTime) " + "}";
 
         // Create the Statements that will be loaded into Rya.
         final ValueFactory vf = new ValueFactoryImpl();
         final DatatypeFactory dtf = DatatypeFactory.newInstance();
         final Collection<Statement> statements = Sets.newHashSet(
-                vf.createStatement(vf.createURI("http://eventz"), vf.createURI("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), vf.createURI("http://www.w3.org/2006/time#Instant")),
-                vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T01:01:01-08:00"))), // 1 second
-                vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T04:01:02.000-05:00"))), // 2 second
-                vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T01:01:03-08:00"))), // 3 seconds
-                vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T01:01:04-08:00"))), // 4 seconds
-                vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T09:01:05Z"))), // 5 seconds
-                vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2006-01-01T05:00:00.000Z"))),
-                vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2007-01-01T05:00:00.000Z"))),
-                vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri), vf.createLiteral(dtf.newXMLGregorianCalendar("2008-01-01T05:00:00.000Z"))));
+                vf.createStatement(vf.createURI("http://eventz"), vf.createURI("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
+                        vf.createURI("http://www.w3.org/2006/time#Instant")),
+                vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T01:01:01-08:00"))), // 1 second
+                vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T04:01:02.000-05:00"))), // 2 second
+                vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T01:01:03-08:00"))), // 3 seconds
+                vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T01:01:04-08:00"))), // 4 seconds
+                vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T09:01:05Z"))), // 5 seconds
+                vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar("2006-01-01T05:00:00.000Z"))),
+                vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar("2007-01-01T05:00:00.000Z"))),
+                vf.createStatement(vf.createURI("http://eventz"), vf.createURI(dtPredUri),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar("2008-01-01T05:00:00.000Z"))));
 
         // Create the expected results of the SPARQL query once the PCJ has been computed.
         final Set<BindingSet> expectedResults = new HashSet<>();
@@ -442,97 +424,99 @@ public class QueryIT extends RyaExportITBase {
         // Verify the end results of the query match the expected results.
         runTest(sparql, statements, expectedResults, ExporterType.Pcj);
     }
-    
-    
+
     @Test
     public void periodicQueryTestWithoutAggregation() throws Exception {
-        String query = "prefix function: <http://org.apache.rya/function#> " //n
-                + "prefix time: <http://www.w3.org/2006/time#> " //n
-                + "select ?id where {" //n
-                + "Filter(function:periodic(?time, 2, .5, time:hours)) " //n
-                + "?obs <uri:hasTime> ?time. " //n
-                + "?obs <uri:hasId> ?id }"; //n
+        String query = "prefix function: <http://org.apache.rya/function#> " // n
+                + "prefix time: <http://www.w3.org/2006/time#> " // n
+                + "select ?id where {" // n
+                + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n
+                + "?obs <uri:hasTime> ?time. " // n
+                + "?obs <uri:hasId> ?id }"; // n
 
         // Create the Statements that will be loaded into Rya.
         final ValueFactory vf = new ValueFactoryImpl();
         final DatatypeFactory dtf = DatatypeFactory.newInstance();
         ZonedDateTime time = ZonedDateTime.now();
         long currentTime = time.toInstant().toEpochMilli();
-        
+
         ZonedDateTime zTime1 = time.minusMinutes(30);
         String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
-        
+
         ZonedDateTime zTime2 = zTime1.minusMinutes(30);
         String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
-        
+
         ZonedDateTime zTime3 = zTime2.minusMinutes(30);
         String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
-        
+
         ZonedDateTime zTime4 = zTime3.minusMinutes(30);
         String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT);
-        
+
         final Collection<Statement> statements = Sets.newHashSet(
-                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasTime"), 
vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
                 vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
-                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasTime"), 
vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
                 vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_2")),
-                vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasTime"), 
vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
                 vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_3")),
-                vf.createStatement(vf.createURI("urn:obs_4"), 
vf.createURI("uri:hasTime"), 
vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
-                vf.createStatement(vf.createURI("urn:obs_4"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_4"))
-                );
+                vf.createStatement(vf.createURI("urn:obs_4"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
+                vf.createStatement(vf.createURI("urn:obs_4"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_4")));
 
         // Create the expected results of the SPARQL query once the PCJ has 
been computed.
         final Set<BindingSet> expectedResults = new HashSet<>();
 
         long period = 1800000;
-        long binId = (currentTime/period)*period;
-        
+        long binId = (currentTime / period) * period;
+
         MapBindingSet bs = new MapBindingSet();
         bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId));
         expectedResults.add(bs);
-        
+
         bs = new MapBindingSet();
         bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
         expectedResults.add(bs);
-        
+
         bs = new MapBindingSet();
         bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
-        bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + 2 * period));
         expectedResults.add(bs);
-        
+
         bs = new MapBindingSet();
         bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
-        bs.addBinding("periodicBinId", vf.createLiteral(binId + 3*period));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + 3 * period));
         expectedResults.add(bs);
 
         bs = new MapBindingSet();
         bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId));
         expectedResults.add(bs);
-        
+
         bs = new MapBindingSet();
         bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
         expectedResults.add(bs);
-        
+
         bs = new MapBindingSet();
         bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
-        bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + 2 * period));
         expectedResults.add(bs);
-        
+
         bs = new MapBindingSet();
         bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId));
         expectedResults.add(bs);
-        
+
         bs = new MapBindingSet();
         bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
         expectedResults.add(bs);
-        
+
         bs = new MapBindingSet();
         bs.addBinding("id", vf.createLiteral("id_4", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId));
@@ -541,183 +525,189 @@ public class QueryIT extends RyaExportITBase {
         // Verify the end results of the query match the expected results.
         runTest(query, statements, expectedResults, ExporterType.Periodic);
     }
-    
-    
+
     @Test
     public void periodicQueryTestWithAggregation() throws Exception {
-        String query = "prefix function: <http://org.apache.rya/function#> " 
//n
-                + "prefix time: <http://www.w3.org/2006/time#> " //n
-                + "select (count(?obs) as ?total) where {" //n
-                + "Filter(function:periodic(?time, 2, .5, time:hours)) " //n
-                + "?obs <uri:hasTime> ?time. " //n
-                + "?obs <uri:hasId> ?id }"; //n
+        String query = "prefix function: <http://org.apache.rya/function#> " 
// n
+                + "prefix time: <http://www.w3.org/2006/time#> " // n
+                + "select (count(?obs) as ?total) where {" // n
+                + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n
+                + "?obs <uri:hasTime> ?time. " // n
+                + "?obs <uri:hasId> ?id }"; // n
 
         // Create the Statements that will be loaded into Rya.
         final ValueFactory vf = new ValueFactoryImpl();
         final DatatypeFactory dtf = DatatypeFactory.newInstance();
         ZonedDateTime time = ZonedDateTime.now();
         long currentTime = time.toInstant().toEpochMilli();
-        
+
         ZonedDateTime zTime1 = time.minusMinutes(30);
         String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
-        
+
         ZonedDateTime zTime2 = zTime1.minusMinutes(30);
         String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
-        
+
         ZonedDateTime zTime3 = zTime2.minusMinutes(30);
         String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
-        
+
         ZonedDateTime zTime4 = zTime3.minusMinutes(30);
         String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT);
-        
+
         final Collection<Statement> statements = Sets.newHashSet(
-                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasTime"), 
vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
                 vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
-                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasTime"), 
vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
                 vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_2")),
-                vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasTime"), 
vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
                 vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_3")),
-                vf.createStatement(vf.createURI("urn:obs_4"), 
vf.createURI("uri:hasTime"), 
vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
-                vf.createStatement(vf.createURI("urn:obs_4"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_4"))
-                );
+                vf.createStatement(vf.createURI("urn:obs_4"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
+                vf.createStatement(vf.createURI("urn:obs_4"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_4")));
 
         // Create the expected results of the SPARQL query once the PCJ has 
been computed.
         final Set<BindingSet> expectedResults = new HashSet<>();
 
         long period = 1800000;
-        long binId = (currentTime/period)*period;
-        
+        long binId = (currentTime / period) * period;
+
         MapBindingSet bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("4", XMLSchema.INTEGER));
         bs.addBinding("periodicBinId", vf.createLiteral(binId));
         expectedResults.add(bs);
-        
+
         bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("3", XMLSchema.INTEGER));
         bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
         expectedResults.add(bs);
-        
+
         bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
-        bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + 2 * period));
         expectedResults.add(bs);
-        
+
         bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
-        bs.addBinding("periodicBinId", vf.createLiteral(binId + 3*period));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + 3 * period));
         expectedResults.add(bs);
 
-
         // Verify the end results of the query match the expected results.
         runTest(query, statements, expectedResults, ExporterType.Periodic);
     }
-    
+
     @Test
     public void periodicQueryTestWithAggregationAndGroupBy() throws Exception {
-        String query = "prefix function: <http://org.apache.rya/function#> " 
//n
-                + "prefix time: <http://www.w3.org/2006/time#> " //n
-                + "select ?id (count(?obs) as ?total) where {" //n
-                + "Filter(function:periodic(?time, 2, .5, time:hours)) " //n
-                + "?obs <uri:hasTime> ?time. " //n
-                + "?obs <uri:hasId> ?id } group by ?id"; //n
+        String query = "prefix function: <http://org.apache.rya/function#> " 
// n
+                + "prefix time: <http://www.w3.org/2006/time#> " // n
+                + "select ?id (count(?obs) as ?total) where {" // n
+                + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n
+                + "?obs <uri:hasTime> ?time. " // n
+                + "?obs <uri:hasId> ?id } group by ?id"; // n
 
         // Create the Statements that will be loaded into Rya.
         final ValueFactory vf = new ValueFactoryImpl();
         final DatatypeFactory dtf = DatatypeFactory.newInstance();
         ZonedDateTime time = ZonedDateTime.now();
         long currentTime = time.toInstant().toEpochMilli();
-        
+
         ZonedDateTime zTime1 = time.minusMinutes(30);
         String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
-        
+
         ZonedDateTime zTime2 = zTime1.minusMinutes(30);
         String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
-        
+
         ZonedDateTime zTime3 = zTime2.minusMinutes(30);
         String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
-        
+
         ZonedDateTime zTime4 = zTime3.minusMinutes(30);
         String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT);
-        
+
         final Collection<Statement> statements = Sets.newHashSet(
-                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasTime"), 
vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
                 vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
-                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasTime"), 
vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
                 vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_2")),
-                vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasTime"), 
vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
                 vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_3")),
-                vf.createStatement(vf.createURI("urn:obs_4"), 
vf.createURI("uri:hasTime"), 
vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
+                vf.createStatement(vf.createURI("urn:obs_4"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
                 vf.createStatement(vf.createURI("urn:obs_4"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_4")),
-                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasTime"), 
vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
+                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
                 vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
-                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasTime"), 
vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
-                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_2"))
-                );
+                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_2")));
 
         // Create the expected results of the SPARQL query once the PCJ has 
been computed.
         final Set<BindingSet> expectedResults = new HashSet<>();
 
         long period = 1800000;
-        long binId = (currentTime/period)*period;
-        
+        long binId = (currentTime / period) * period;
+
         MapBindingSet bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
         bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId));
         expectedResults.add(bs);
-        
+
         bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
         bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId));
         expectedResults.add(bs);
-        
+
         bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
         bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId));
         expectedResults.add(bs);
-        
+
         bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
         bs.addBinding("id", vf.createLiteral("id_4", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId));
         expectedResults.add(bs);
-        
+
         bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
         bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
         expectedResults.add(bs);
-        
+
         bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
         bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
         expectedResults.add(bs);
-        
+
         bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
         bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING));
         bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
         expectedResults.add(bs);
-        
+
         bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
         bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
-        bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + 2 * period));
         expectedResults.add(bs);
-        
+
         bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
         bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
-        bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + 2 * period));
         expectedResults.add(bs);
-        
+
         bs = new MapBindingSet();
         bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
         bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
-        bs.addBinding("periodicBinId", vf.createLiteral(binId + 3*period));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + 3 * period));
         expectedResults.add(bs);
 
         // Verify the end results of the query match the expected results.
@@ -725,10 +715,190 @@ public class QueryIT extends RyaExportITBase {
     }
     
     
+    @Test
+    public void nestedPeriodicQueryTestWithAggregationAndGroupBy() throws 
Exception {
+        String query = "prefix function: <http://org.apache.rya/function#> " 
// n
+                + "prefix time: <http://www.w3.org/2006/time#> " // n
+                + "select ?location ?total "
+                + "where { Filter(?total > 1) {"
+                + "select ?location (count(?obs) as ?total) where {" // n
+                + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n
+                + "?obs <uri:hasTime> ?time. " // n
+                + "?obs <uri:hasLoc> ?location } group by ?location }}"; // n
+
+        // Create the Statements that will be loaded into Rya: six observations (obs_1..obs_6), each with a time and a location literal.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final DatatypeFactory dtf = DatatypeFactory.newInstance();
+        ZonedDateTime time = ZonedDateTime.now();
+        long currentTime = time.toInstant().toEpochMilli(); // epoch milliseconds; the expected bin ids below are derived from this
+
+        ZonedDateTime zTime1 = time.minusMinutes(30); // time1..time4 step back from "now" in 30-minute increments
+        String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime2 = zTime1.minusMinutes(30);
+        String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime3 = zTime2.minusMinutes(30);
+        String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime4 = zTime3.minusMinutes(30);
+        String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT);
+
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasLoc"), vf.createLiteral("loc_1")),
+                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasLoc"), vf.createLiteral("loc_2")),
+                vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasLoc"), vf.createLiteral("loc_3")),
+                vf.createStatement(vf.createURI("urn:obs_4"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
+                vf.createStatement(vf.createURI("urn:obs_4"), 
vf.createURI("uri:hasLoc"), vf.createLiteral("loc_4")),
+                vf.createStatement(vf.createURI("urn:obs_5"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
+                vf.createStatement(vf.createURI("urn:obs_5"), 
vf.createURI("uri:hasLoc"), vf.createLiteral("loc_1")),
+                vf.createStatement(vf.createURI("urn:obs_6"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_6"), 
vf.createURI("uri:hasLoc"), vf.createLiteral("loc_2")));
+
+        // Create the expected results of the SPARQL query once the PCJ has 
been computed.
+        final Set<BindingSet> expectedResults = new HashSet<>(); // every expected total is 2: the outer Filter(?total > 1) drops single-observation groups
+
+        long period = 1800000; // 30 minutes in milliseconds, i.e. the .5 hour period of the periodic filter
+        long binId = (currentTime / period) * period; // currentTime rounded down to a multiple of period
+
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
+        bs.addBinding("location", vf.createLiteral("loc_1", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId));
+        expectedResults.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
+        bs.addBinding("location", vf.createLiteral("loc_2", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId));
+        expectedResults.add(bs);
+
+
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
+        bs.addBinding("location", vf.createLiteral("loc_2", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
+        expectedResults.add(bs);
+
+        // Verify the end results of the query match the expected results.
+        runTest(query, statements, expectedResults, ExporterType.Periodic);
+    }
     
-    
+    @Test
+    public void nestedJoinPeriodicQueryWithAggregationAndGroupBy() throws 
Exception {
+        String query = "prefix function: <http://org.apache.rya/function#> " 
// n
+                + "prefix time: <http://www.w3.org/2006/time#> " // n
+                + "select ?location ?total ?population "
+                + "where { Filter(?total > 1)"
+                + "?location <uri:hasPopulation> ?population . {"
+                + "select ?location (count(?obs) as ?total) where {" // n
+                + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n
+                + "?obs <uri:hasTime> ?time. " // n
+                + "?obs <uri:hasLoc> ?location } group by ?location }}"; // n
+
+        // Create the Statements that will be loaded into Rya: six observations with URI locations, plus populations for uri:loc_1 and uri:loc_2 only.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final DatatypeFactory dtf = DatatypeFactory.newInstance();
+        ZonedDateTime time = ZonedDateTime.now();
+        long currentTime = time.toInstant().toEpochMilli(); // epoch milliseconds; the expected bin ids below are derived from this
+
+        ZonedDateTime zTime1 = time.minusMinutes(30); // time1..time4 step back from "now" in 30-minute increments
+        String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime2 = zTime1.minusMinutes(30);
+        String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime3 = zTime2.minusMinutes(30);
+        String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime4 = zTime3.minusMinutes(30);
+        String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT);
+
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasLoc"), vf.createURI("uri:loc_1")),
+                vf.createStatement(vf.createURI("uri:loc_1"), 
vf.createURI("uri:hasPopulation"), vf.createLiteral(3500)),
+                vf.createStatement(vf.createURI("uri:loc_2"), 
vf.createURI("uri:hasPopulation"), vf.createLiteral(8000)),
+                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasLoc"), vf.createURI("uri:loc_2")),
+                vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasLoc"), vf.createURI("uri:loc_3")),
+                vf.createStatement(vf.createURI("urn:obs_4"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
+                vf.createStatement(vf.createURI("urn:obs_4"), 
vf.createURI("uri:hasLoc"), vf.createURI("uri:loc_4")),
+                vf.createStatement(vf.createURI("urn:obs_5"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
+                vf.createStatement(vf.createURI("urn:obs_5"), 
vf.createURI("uri:hasLoc"), vf.createURI("uri:loc_1")),
+                vf.createStatement(vf.createURI("urn:obs_6"), 
vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_6"), 
vf.createURI("uri:hasLoc"), vf.createURI("uri:loc_2")));
+
+        // Create the expected results of the SPARQL query once the PCJ has 
been computed.
+        final Set<BindingSet> expectedResults = new HashSet<>(); // every expected total is 2: the outer Filter(?total > 1) drops single-observation groups
+
+        long period = 1800000; // 30 minutes in milliseconds, i.e. the .5 hour period of the periodic filter
+        long binId = (currentTime / period) * period; // currentTime rounded down to a multiple of period
+
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
+        bs.addBinding("location", vf.createURI("uri:loc_1"));
+        bs.addBinding("population", vf.createLiteral("3500", 
XMLSchema.INTEGER));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId));
+        expectedResults.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
+        bs.addBinding("location", vf.createURI("uri:loc_2"));
+        bs.addBinding("population", vf.createLiteral("8000", 
XMLSchema.INTEGER));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId));
+        expectedResults.add(bs);
+
 
-    public void runTest(final String sparql, final Collection<Statement> 
statements, final Collection<BindingSet> expectedResults, ExporterType type ) 
throws Exception {
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
+        bs.addBinding("location", vf.createURI("uri:loc_2"));
+        bs.addBinding("population", vf.createLiteral("8000", 
XMLSchema.INTEGER));
+        bs.addBinding("periodicBinId", vf.createLiteral(binId + period));
+        expectedResults.add(bs);
+
+        // Verify the end results of the query match the expected results.
+        runTest(query, statements, expectedResults, ExporterType.Periodic);
+    }
+
+    @Test(expected= IllegalArgumentException.class)
+    public void nestedConstructPeriodicQueryWithAggregationAndGroupBy() throws 
Exception {
+        String query = "prefix function: <http://org.apache.rya/function#> " 
// n
+                + "prefix time: <http://www.w3.org/2006/time#> " // n
+                + "construct{?location a <uri:highObservationArea> } "
+                + "where { Filter(?total > 1)"
+                + "?location <uri:hasPopulation> ?population . {"
+                + "select ?location (count(?obs) as ?total) where {" // n
+                + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n
+                + "?obs <uri:hasTime> ?time. " // n
+                + "?obs <uri:hasLoc> ?location } group by ?location }}"; // n
+
+
+        final Collection<Statement> statements = Sets.newHashSet();
+        final Set<BindingSet> expectedResults = new HashSet<>();
+
+        // No results are compared here: the test passes only if an IllegalArgumentException is thrown (see @Test above), presumably by runTest rejecting the CONSTRUCT form - confirm.
+        runTest(query, statements, expectedResults, ExporterType.Periodic);
+    }
+
+    public void runTest(final String sparql, final Collection<Statement> 
statements, final Collection<BindingSet> expectedResults,
+            ExporterType type) throws Exception {
         requireNonNull(sparql);
         requireNonNull(statements);
         requireNonNull(expectedResults);
@@ -754,9 +924,10 @@ public class QueryIT extends RyaExportITBase {
             PeriodicQueryResultStorage periodicStorage = new 
AccumuloPeriodicQueryResultStorage(accumuloConn, getRyaInstanceName());
             String periodicId = periodicStorage.createPeriodicQuery(sparql);
             try (FluoClient fluo = new 
FluoClientImpl(super.getFluoConfiguration())) {
-                new CreatePcj().createPcj(periodicId, sparql, fluo);
+                new CreateFluoPcj().createPcj(periodicId, sparql, fluo);
             }
             addStatementsAndWait(statements);
+            
             final Set<BindingSet> results = Sets.newHashSet();
             try (CloseableIterator<BindingSet> resultIter = 
periodicStorage.listResults(periodicId, Optional.empty())) {
                 while (resultIter.hasNext()) {
@@ -767,9 +938,9 @@ public class QueryIT extends RyaExportITBase {
             break;
         }
     }
-    
+
     private void addStatementsAndWait(final Collection<Statement> statements) 
throws RepositoryException, Exception {
-     // Write the data to Rya.
+        // Write the data to Rya.
         final SailRepositoryConnection ryaConn = 
super.getRyaSailRepository().getConnection();
         ryaConn.begin();
         ryaConn.add(statements);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
index 12c69ca..15ff1b4 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
@@ -28,7 +28,7 @@ import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
@@ -110,7 +110,7 @@ public class RyaExportIT extends RyaExportITBase {
 
         try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
             // Tell the Fluo app to maintain the PCJ.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, getRyaInstanceName());
+            new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, 
fluoClient, accumuloConn, getRyaInstanceName());
 
             // Stream the data into Fluo.
             new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
index e6d287e..5cd3ab1 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
@@ -28,7 +28,7 @@ import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.rya.accumulo.AccumuloRyaDAO;
 import org.apache.rya.indexing.external.PrecomputedJoinIndexer;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
@@ -99,7 +99,7 @@ public class RyaInputIncrementalUpdateIT extends 
RyaExportITBase {
 
         try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
             // Tell the Fluo app to maintain the PCJ.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, getRyaInstanceName());
+            new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, 
fluoClient, accumuloConn, getRyaInstanceName());
 
             // Verify the end results of the query match the expected results.
             super.getMiniFluo().waitForObservers();
@@ -165,7 +165,7 @@ public class RyaInputIncrementalUpdateIT extends 
RyaExportITBase {
 
         try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
             // Tell the Fluo app to maintain the PCJ.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, getRyaInstanceName());
+            new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, 
fluoClient, accumuloConn, getRyaInstanceName());
 
             super.getMiniFluo().waitForObservers();
 
@@ -236,7 +236,7 @@ public class RyaInputIncrementalUpdateIT extends 
RyaExportITBase {
 
         try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
             // Tell the Fluo app to maintain the PCJ.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, getRyaInstanceName());
+            new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, 
fluoClient, accumuloConn, getRyaInstanceName());
 
             super.getMiniFluo().waitForObservers();
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
index 3f51311..e83a894 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
@@ -28,7 +28,7 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
@@ -60,7 +60,7 @@ public class StreamingTestIT extends RyaExportITBase {
                final String pcjId = pcjStorage.createPcj(sparql);
 
                // Task the Fluo app with the PCJ.
-               new CreatePcj().withRyaIntegration(pcjId, pcjStorage, 
fluoClient, accumuloConn, getRyaInstanceName());
+               new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, 
fluoClient, accumuloConn, getRyaInstanceName());
 
                // Add Statements to the Fluo app.
                log.info("Adding Join Pairs...");

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
index ab42e89..eab99b8 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
@@ -32,7 +32,7 @@ import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.resolver.RdfToRyaConversions;
 import org.apache.rya.indexing.accumulo.ConfigUtils;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
@@ -107,7 +107,7 @@ public class HistoricStreamingVisibilityIT extends 
RyaExportITBase {
         final String pcjId = pcjStorage.createPcj(sparql);
 
         try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, getRyaInstanceName());
+            new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, 
fluoClient, accumuloConn, getRyaInstanceName());
         }
 
         // Verify the end results of the query match the expected results.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
index dc2f859..2497793 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
@@ -49,7 +49,7 @@ import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaURI;
 import org.apache.rya.indexing.accumulo.ConfigUtils;
 import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
@@ -220,7 +220,7 @@ public class PcjVisibilityIT extends RyaExportITBase {
 
         try( final FluoClient fluoClient = FluoFactory.newClient( 
super.getFluoConfiguration() )) {
             // Create the PCJ in Fluo.
-            new CreatePcj().withRyaIntegration(pcjId, rootStorage, fluoClient, 
accumuloConn, getRyaInstanceName());
+            new CreateFluoPcj().withRyaIntegration(pcjId, rootStorage, 
fluoClient, accumuloConn, getRyaInstanceName());
 
             // Stream the data into Fluo.
             for(final RyaStatement statement : streamedTriples.keySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
 
b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
index 85da422..c828a20 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
@@ -37,6 +37,7 @@ import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.recipes.test.AccumuloExportITBase;
+import org.apache.fluo.recipes.test.FluoITHelper;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -59,6 +60,7 @@ import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
@@ -72,7 +74,6 @@ import org.openrdf.model.Statement;
 import org.openrdf.repository.sail.SailRepositoryConnection;
 import org.openrdf.sail.Sail;
 
-
 import kafka.admin.AdminUtils;
 import kafka.admin.RackAwareMode;
 import kafka.server.KafkaConfig;
@@ -117,6 +118,7 @@ public class KafkaExportITBase extends AccumuloExportITBase {
         observers.add(new ObserverSpecification(JoinObserver.class.getName()));
         observers.add(new ObserverSpecification(FilterObserver.class.getName()));
         observers.add(new ObserverSpecification(AggregationObserver.class.getName()));
+        observers.add(new ObserverSpecification(ProjectionObserver.class.getName()));
 
         // Configure the export observer to export new PCJ results to the mini
         // accumulo cluster.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java
index 6feadff..9c5732f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/RyaExportITBase.java
@@ -32,6 +32,7 @@ import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.PeriodicQueryObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
@@ -59,6 +60,7 @@ public class RyaExportITBase extends FluoITBase {
         observers.add(new ObserverSpecification(FilterObserver.class.getName()));
         observers.add(new ObserverSpecification(AggregationObserver.class.getName()));
         observers.add(new ObserverSpecification(PeriodicQueryObserver.class.getName()));
+        observers.add(new ObserverSpecification(ProjectionObserver.class.getName()));
 
         // Configure the export observer to export new PCJ results to the mini accumulo cluster.
         final HashMap<String, String> exportParams = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
index 1902248..4d1bc75 100644
--- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
+++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
@@ -25,7 +25,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.core.client.FluoClientImpl;
 import org.apache.fluo.recipes.test.AccumuloExportITBase;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
 import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
 import org.apache.rya.periodic.notification.notification.TimestampedNotification;
 import org.apache.rya.periodic.notification.recovery.PeriodicNotificationProvider;
@@ -49,11 +50,11 @@ public class PeriodicNotificationProviderIT extends AccumuloExportITBase {
         BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
         PeriodicNotificationCoordinatorExecutor coord = new PeriodicNotificationCoordinatorExecutor(2, notifications);
         PeriodicNotificationProvider provider = new PeriodicNotificationProvider();
-        CreatePcj pcj = new CreatePcj();
+        CreateFluoPcj pcj = new CreateFluoPcj();
         
         String id = null;
         try(FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) {
-            id = pcj.createPcj(sparql, fluo);
+            id = pcj.createPcj(sparql, fluo).getQueryId();
             provider.processRegisteredNotifications(coord, fluo.newSnapshot());
         }
         
@@ -61,7 +62,7 @@ public class PeriodicNotificationProviderIT extends AccumuloExportITBase {
         Assert.assertEquals(5000, notification.getInitialDelay());
         Assert.assertEquals(15000, notification.getPeriod());
         Assert.assertEquals(TimeUnit.MILLISECONDS, notification.getTimeUnit());
-        Assert.assertEquals(id, notification.getId());
+        Assert.assertEquals(FluoQueryUtils.convertFluoQueryIdToPcjId(id), notification.getId());
         
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
index 7f71b52..6aade52 100644
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java
@@ -21,8 +21,9 @@ package org.apache.rya.periodic.notification.api;
 import java.util.Optional;
 
 import org.apache.fluo.api.client.FluoClient;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
 import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
 import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
@@ -31,6 +32,8 @@ import org.apache.rya.periodic.notification.notification.PeriodicNotification;
 import org.openrdf.query.MalformedQueryException;
 import org.openrdf.query.algebra.evaluation.function.Function;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Object that creates a Periodic Query.  A Periodic Query is any query
  * requesting periodic updates about events that occurred within a given
@@ -79,8 +82,9 @@ public class CreatePeriodicQuery {
             Optional<PeriodicQueryNode> optNode = PeriodicQueryUtil.getPeriodicNode(sparql);
             if(optNode.isPresent()) {
                 PeriodicQueryNode periodicNode = optNode.get();
-                CreatePcj createPcj = new CreatePcj();
-                String queryId = createPcj.createPcj(sparql, fluoClient);
+                CreateFluoPcj createPcj = new CreateFluoPcj();
+                String queryId = createPcj.createPcj(sparql, fluoClient).getQueryId();
+                queryId = FluoQueryUtils.convertFluoQueryIdToPcjId(queryId);
                 periodicStorage.createPeriodicQuery(queryId, sparql);
                 PeriodicNotification notification = PeriodicNotification.builder().id(queryId).period(periodicNode.getPeriod())
                         .timeUnit(periodicNode.getUnit()).build();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
index 8e8b1a2..27e06f0 100644
--- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
+++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
@@ -126,11 +126,14 @@ public class PeriodicNotificationProvider {
             id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.AGGREGATION_PARENT_NODE_ID).toString());
             break;
         case CONSTRUCT:
-            id = sx.get(Bytes.of(nodeId), FluoQueryColumns.CONSTRUCT_NODE_ID).toString();
-            id = id.split(IncrementalUpdateConstants.CONSTRUCT_PREFIX)[1];
+            id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID).toString());
+            break;
+        case PROJECTION:
+            id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.PROJECTION_PARENT_NODE_ID).toString());
             break;
         default:
-            throw new RuntimeException("Invalid NodeType.");
+            throw new IllegalArgumentException("Invalid node type");
+        
         }
         return id;
     }

Reply via email to