Repository: incubator-rya
Updated Branches:
  refs/heads/master 646d21b4e -> 60090ad52


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
index 3ed1844..452dd27 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
@@ -18,18 +18,22 @@
  */
 package org.apache.rya.indexing.pcj.fluo;
 
+import static java.util.Objects.requireNonNull;
 import static org.junit.Assert.assertEquals;
 
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.recipes.test.AccumuloExportITBase;
@@ -40,7 +44,9 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
 import org.apache.rya.api.client.Install.InstallConfiguration;
 import org.apache.rya.api.client.RyaClient;
 import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
@@ -48,7 +54,9 @@ import 
org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
 import org.apache.rya.indexing.accumulo.ConfigUtils;
 import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
 import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
 import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
+import 
org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
@@ -60,8 +68,11 @@ import org.apache.rya.sail.config.RyaSailFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.repository.sail.SailRepositoryConnection;
 import org.openrdf.sail.Sail;
 
+
 import kafka.admin.AdminUtils;
 import kafka.admin.RackAwareMode;
 import kafka.server.KafkaConfig;
@@ -74,7 +85,8 @@ import kafka.utils.ZkUtils;
 import kafka.zk.EmbeddedZookeeper;
 
 /**
- * The base Integration Test class used for Fluo applications that export to a 
Kakfa topic.
+ * The base Integration Test class used for Fluo applications that export to a
+ * Kafka topic.
  */
 public class KafkaExportITBase extends AccumuloExportITBase {
 
@@ -88,8 +100,10 @@ public class KafkaExportITBase extends AccumuloExportITBase 
{
     private EmbeddedZookeeper zkServer;
     private ZkClient zkClient;
 
-    // The Rya instance statements are written to that will be fed into the 
Fluo app.
+    // The Rya instance statements are written to that will be fed into the 
Fluo
+    // app.
     private RyaSailRepository ryaSailRepo = null;
+    private AccumuloRyaDAO dao = null;
 
     /**
      * Add info about the Kafka queue/topic to receive the export.
@@ -104,7 +118,8 @@ public class KafkaExportITBase extends AccumuloExportITBase 
{
         observers.add(new 
ObserverSpecification(FilterObserver.class.getName()));
         observers.add(new 
ObserverSpecification(AggregationObserver.class.getName()));
 
-        // Configure the export observer to export new PCJ results to the mini 
accumulo cluster.
+        // Configure the export observer to export new PCJ results to the mini
+        // accumulo cluster.
         final HashMap<String, String> exportParams = new HashMap<>();
 
         final KafkaExportParameters kafkaParams = new 
KafkaExportParameters(exportParams);
@@ -114,11 +129,29 @@ public class KafkaExportITBase extends 
AccumuloExportITBase {
         final Properties producerConfig = new Properties();
         producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST 
+ ":" + BROKERPORT);
         producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                
"org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
         kafkaParams.addAllProducerConfig(producerConfig);
 
         final ObserverSpecification exportObserverConfig = new 
ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
         observers.add(exportObserverConfig);
+        
+        //create construct query observer and tell it to export to Kafka
+        //using the construct-specific producer configuration set up below
+        HashMap<String, String> constructParams = new HashMap<>();
+        final KafkaExportParameters kafkaConstructParams = new 
KafkaExportParameters(constructParams);
+        kafkaConstructParams.setExportToKafka(true);
+        
+        // Configure the Kafka Producer
+        final Properties constructProducerConfig = new Properties();
+        constructProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
BROKERHOST + ":" + BROKERPORT);
+        
constructProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+        
constructProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
RyaSubGraphKafkaSerDe.class.getName());
+        kafkaConstructParams.addAllProducerConfig(constructProducerConfig);
+
+        final ObserverSpecification constructExportObserverConfig = new 
ObserverSpecification(ConstructQueryResultObserver.class.getName(),
+                constructParams);
+        observers.add(constructExportObserverConfig);
 
         // Add the observers to the Fluo Configuration.
         super.getFluoConfiguration().addObservers(observers);
@@ -150,24 +183,24 @@ public class KafkaExportITBase extends 
AccumuloExportITBase {
     }
 
     @After
-    public void teardownRya() throws Exception {
+    public void teardownRya() {
         final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
         final String instanceName = cluster.getInstanceName();
         final String zookeepers = cluster.getZooKeepers();
 
         // Uninstall the instance of Rya.
         final RyaClient ryaClient = AccumuloRyaClientFactory.build(
-                new AccumuloConnectionDetails(
-                    ACCUMULO_USER,
-                    ACCUMULO_PASSWORD.toCharArray(),
-                    instanceName,
-                    zookeepers),
+                new AccumuloConnectionDetails(ACCUMULO_USER, 
ACCUMULO_PASSWORD.toCharArray(), instanceName, zookeepers),
                 super.getAccumuloConnector());
 
-        ryaClient.getUninstall().uninstall(RYA_INSTANCE_NAME);
-
-        // Shutdown the repo.
-        ryaSailRepo.shutDown();
+        try {
+            ryaClient.getUninstall().uninstall(RYA_INSTANCE_NAME);
+            // Shutdown the repo.
+            if(ryaSailRepo != null) {ryaSailRepo.shutDown();}
+            if(dao != null ) {dao.destroy();}
+        } catch (Exception e) {
+            System.out.println("Encountered the following Exception when 
shutting down Rya: " + e.getMessage());
+        }
     }
 
     private void installRyaInstance() throws Exception {
@@ -177,26 +210,18 @@ public class KafkaExportITBase extends 
AccumuloExportITBase {
 
         // Install the Rya instance to the mini accumulo cluster.
         final RyaClient ryaClient = AccumuloRyaClientFactory.build(
-                new AccumuloConnectionDetails(
-                    ACCUMULO_USER,
-                    ACCUMULO_PASSWORD.toCharArray(),
-                    instanceName,
-                    zookeepers),
+                new AccumuloConnectionDetails(ACCUMULO_USER, 
ACCUMULO_PASSWORD.toCharArray(), instanceName, zookeepers),
                 super.getAccumuloConnector());
 
-        ryaClient.getInstall().install(RYA_INSTANCE_NAME, 
InstallConfiguration.builder()
-                .setEnableTableHashPrefix(false)
-                .setEnableFreeTextIndex(false)
-                .setEnableEntityCentricIndex(false)
-                .setEnableGeoIndex(false)
-                .setEnableTemporalIndex(false)
-                .setEnablePcjIndex(true)
-                .setFluoPcjAppName( 
super.getFluoConfiguration().getApplicationName() )
-                .build());
+        ryaClient.getInstall().install(RYA_INSTANCE_NAME,
+                
InstallConfiguration.builder().setEnableTableHashPrefix(false).setEnableFreeTextIndex(false)
+                        
.setEnableEntityCentricIndex(false).setEnableGeoIndex(false).setEnableTemporalIndex(false).setEnablePcjIndex(true)
+                        
.setFluoPcjAppName(super.getFluoConfiguration().getApplicationName()).build());
 
         // Connect to the Rya instance that was just installed.
         final AccumuloRdfConfiguration conf = makeConfig(instanceName, 
zookeepers);
         final Sail sail = RyaSailFactory.getInstance(conf);
+        dao = RyaSailFactory.getAccumuloDAOWithUpdatedConfig(conf);
         ryaSailRepo = new RyaSailRepository(sail);
     }
 
@@ -211,15 +236,12 @@ public class KafkaExportITBase extends 
AccumuloExportITBase {
         
conf.setAccumuloZookeepers(super.getAccumuloConnector().getInstance().getZooKeepers());
         conf.setAuths("");
 
-
         // PCJ configuration information.
         conf.set(ConfigUtils.USE_PCJ, "true");
         conf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true");
         conf.set(ConfigUtils.FLUO_APP_NAME, 
super.getFluoConfiguration().getApplicationName());
-        conf.set(ConfigUtils.PCJ_STORAGE_TYPE,
-                
PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
-        conf.set(ConfigUtils.PCJ_UPDATER_TYPE,
-                
PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
+        conf.set(ConfigUtils.PCJ_STORAGE_TYPE, 
PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
+        conf.set(ConfigUtils.PCJ_UPDATER_TYPE, 
PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
 
         conf.setDisplayQueryPlan(true);
 
@@ -227,20 +249,29 @@ public class KafkaExportITBase extends 
AccumuloExportITBase {
     }
 
     /**
-     * @return A {@link RyaSailRepository} that is connected to the Rya 
instance that statements are loaded into.
+     * @return A {@link RyaSailRepository} that is connected to the Rya 
instance
+     *         that statements are loaded into.
      */
     protected RyaSailRepository getRyaSailRepository() throws Exception {
         return ryaSailRepo;
     }
 
     /**
+     * @return An {@link AccumuloRyaDAO} so that RyaStatements with distinct
+     *         visibilities can be added to the Rya Instance
+     */
+    protected AccumuloRyaDAO getRyaDAO() {
+        return dao;
+    }
+
+    /**
      * Close all the Kafka mini server and mini-zookeeper
      */
     @After
     public void teardownKafka() {
-        kafkaServer.shutdown();
-        zkClient.close();
-        zkServer.shutdown();
+        if(kafkaServer != null) {kafkaServer.shutdown();}
+        if(zkClient != null) {zkClient.close();}
+        if(zkServer != null) {zkServer.shutdown();}
     }
 
     /**
@@ -257,7 +288,7 @@ public class KafkaExportITBase extends AccumuloExportITBase 
{
         // setup producer
         final Properties producerProps = new Properties();
         producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + 
BROKERPORT);
-        
producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
+        producerProps.setProperty("key.serializer", 
"org.apache.kafka.common.serialization.IntegerSerializer");
         producerProps.setProperty("value.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
         final KafkaProducer<Integer, byte[]> producer = new 
KafkaProducer<>(producerProps);
 
@@ -266,7 +297,7 @@ public class KafkaExportITBase extends AccumuloExportITBase 
{
         consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + 
BROKERPORT);
         consumerProps.setProperty("group.id", "group0");
         consumerProps.setProperty("client.id", "consumer0");
-        
consumerProps.setProperty("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer");
+        consumerProps.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.IntegerDeserializer");
         consumerProps.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
 
         // to make sure the consumer starts from the beginning of the topic
@@ -296,8 +327,10 @@ public class KafkaExportITBase extends 
AccumuloExportITBase {
         consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
BROKERHOST + ":" + BROKERPORT);
         consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
         consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
"consumer0");
-        
consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.IntegerDeserializer");
-        
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
+        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.IntegerDeserializer");
+        
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                
"org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");
 
         // to make sure the consumer starts from the beginning of the topic
         consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -306,4 +339,32 @@ public class KafkaExportITBase extends 
AccumuloExportITBase {
         consumer.subscribe(Arrays.asList(TopicName));
         return consumer;
     }
+
+    protected String loadData(final String sparql, final Collection<Statement> 
statements) throws Exception {
+        requireNonNull(sparql);
+        requireNonNull(statements);
+
+        // Register the PCJ with Rya.
+        final Instance accInstance = 
super.getAccumuloConnector().getInstance();
+        final Connector accumuloConn = super.getAccumuloConnector();
+
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(new 
AccumuloConnectionDetails(ACCUMULO_USER,
+                ACCUMULO_PASSWORD.toCharArray(), 
accInstance.getInstanceName(), accInstance.getZooKeepers()), accumuloConn);
+
+        final String pcjId = 
ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql);
+
+        // Write the data to Rya.
+        final SailRepositoryConnection ryaConn = 
getRyaSailRepository().getConnection();
+        ryaConn.begin();
+        ryaConn.add(statements);
+        ryaConn.commit();
+        ryaConn.close();
+
+        // Wait for the Fluo application to finish computing the end result.
+        super.getMiniFluo().waitForObservers();
+
+        // The PCJ Id is the topic name the results will be written to.
+        return pcjId;
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
index 84b6343..4eab0f6 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
@@ -26,8 +26,10 @@ import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.log4j.BasicConfigurator;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
 import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
 import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
+import 
org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
@@ -68,6 +70,13 @@ public class RyaExportITBase extends FluoITBase {
 
         final ObserverSpecification exportObserverConfig = new 
ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
         observers.add(exportObserverConfig);
+        
+        final KafkaExportParameters kafkaParams = new 
KafkaExportParameters(exportParams);
+        kafkaParams.setExportToKafka(false);
+
+        final ObserverSpecification constructExportObserverConfig = new 
ObserverSpecification(ConstructQueryResultObserver.class.getName(),
+                exportParams);
+        observers.add(constructExportObserverConfig);
 
         // Add the observers to the Fluo Configuration.
         super.getFluoConfiguration().addObservers(observers);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
index d56c23a..d19646e 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
@@ -106,7 +106,7 @@ public class GetQueryReportIT extends RyaExportITBase {
 
             final FluoQuery fluoQuery = report.getFluoQuery();
 
-            final String queryNodeId = 
fluoQuery.getQueryMetadata().getNodeId();
+            final String queryNodeId = 
fluoQuery.getQueryMetadata().get().getNodeId();
             expectedCounts.put(queryNodeId, BigInteger.valueOf(8));
 
             final String filterNodeId = 
fluoQuery.getFilterMetadata().iterator().next().getNodeId();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
index 082f46d..accabbf 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java
@@ -20,22 +20,30 @@ package org.apache.rya.indexing.pcj.fluo.app.query;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.List;
+
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.Transaction;
 import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
+import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
 import 
org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
 import 
org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery.QueryType;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
 import 
org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.junit.Test;
 import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.helpers.StatementPatternCollector;
 import org.openrdf.query.parser.ParsedQuery;
 import org.openrdf.query.parser.sparql.SPARQLParser;
 import org.openrdf.repository.RepositoryException;
 
+import com.google.common.base.Optional;
+
 /**
  * Integration tests the methods of {@link FluoQueryMetadataDAO}.
  */
@@ -160,6 +168,42 @@ public class FluoQueryMetadataDAOIT extends 
RyaExportITBase {
             assertEquals(originalMetadata, storedMetdata);
         }
     }
+    
+    @Test
+    public void constructQueryMetadataTest() throws MalformedQueryException {
+        
+        String query = "select ?x ?y where {?x <uri:p1> ?y. ?y <uri:p2> 
<uri:o1> }";
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(query, null);
+        List<StatementPattern> patterns = 
StatementPatternCollector.process(pq.getTupleExpr());
+        
+        final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+        // Create the object that will be serialized.
+        final ConstructQueryMetadata.Builder builder = 
ConstructQueryMetadata.builder();
+        builder.setNodeId("nodeId");
+        builder.setSparql(query);
+        builder.setChildNodeId("childNodeId");
+        builder.setConstructGraph(new ConstructGraph(patterns));
+        final ConstructQueryMetadata originalMetadata = builder.build();
+
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalMetadata);
+                tx.commit();
+            }
+
+            // Read it from the Fluo table.
+            ConstructQueryMetadata storedMetdata = null;
+            try(Snapshot sx = fluoClient.newSnapshot()) {
+                storedMetdata = dao.readConstructQueryMetadata(sx, "nodeId");
+            }
+
+            // Ensure the deserialized object is the same as the serialized 
one.
+            assertEquals(originalMetadata, storedMetdata);
+        }
+    }
 
     @Test
     public void aggregationMetadataTest_withGroupByVarOrders() {
@@ -242,6 +286,9 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase 
{
         final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null);
         final FluoQuery originalQuery = new 
SparqlFluoQueryBuilder().make(query, new NodeIds());
 
+        assertEquals(QueryType.Projection, originalQuery.getQueryType());
+        assertEquals(false, 
originalQuery.getConstructQueryMetadata().isPresent());
+        
         try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
             // Write it to the Fluo table.
             try(Transaction tx = fluoClient.newTransaction()) {
@@ -249,12 +296,51 @@ public class FluoQueryMetadataDAOIT extends 
RyaExportITBase {
                 tx.commit();
             }
 
-            // Read it from the Fluo table.
-            FluoQuery storedQuery = null;
-            try(Snapshot sx = fluoClient.newSnapshot()) {
-                storedQuery = dao.readFluoQuery(sx, 
originalQuery.getQueryMetadata().getNodeId());
+        // Read it from the Fluo table.
+        FluoQuery storedQuery = null;
+        try(Snapshot sx = fluoClient.newSnapshot()) {
+            storedQuery = dao.readFluoQuery(sx, 
originalQuery.getQueryMetadata().get().getNodeId());
+        }
+
+            // Ensure the deserialized object is the same as the serialized 
one.
+            assertEquals(originalQuery, storedQuery);
+        }
+    }
+    
+    @Test
+    public void fluoConstructQueryTest() throws MalformedQueryException {
+        final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
+        // Create the object that will be serialized.
+        final String sparql =
+                "CONSTRUCT { ?customer <http://travelsTo> <http://England> .  
?customer <http://friendsWith> ?worker }" +
+                "WHERE { " +
+                  "FILTER(?customer = <http://Alice>) " +
+                  "FILTER(?city = <http://London>) " +
+                  "?customer <http://talksTo> ?worker. " +
+                  "?worker <http://livesIn> ?city. " +
+                  "?worker <http://worksAt> <http://Chipotle>. " +
+                "}";
+
+        final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null);
+        final FluoQuery originalQuery = new 
SparqlFluoQueryBuilder().make(query, new NodeIds());
+        
+        assertEquals(QueryType.Construct, originalQuery.getQueryType());
+        assertEquals(false, originalQuery.getQueryMetadata().isPresent());
+
+        try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
+            // Write it to the Fluo table.
+            try(Transaction tx = fluoClient.newTransaction()) {
+                dao.write(tx, originalQuery);
+                tx.commit();
             }
 
+        // Read it from the Fluo table.
+        FluoQuery storedQuery = null;
+        try(Snapshot sx = fluoClient.newSnapshot()) {
+            storedQuery = dao.readFluoQuery(sx, 
originalQuery.getConstructQueryMetadata().get().getNodeId());
+        }
+
             // Ensure the deserialized object is the same as the serialized 
one.
             assertEquals(originalQuery, storedQuery);
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index 349d391..414fa70 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -121,6 +121,7 @@ public class CreateDeleteIT extends RyaExportITBase {
             assertEquals(0, empty_rows.size());
         }
     }
+    
 
     private String loadData(final String sparql, final Collection<Statement> 
statements) throws Exception {
         requireNonNull(sparql);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
index 7fa28ab..219e079 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
@@ -29,14 +29,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.rya.api.client.RyaClient;
-import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
-import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
 import org.apache.rya.indexing.pcj.fluo.KafkaExportITBase;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
@@ -47,7 +42,6 @@ import org.openrdf.model.impl.ValueFactoryImpl;
 import org.openrdf.model.vocabulary.XMLSchema;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.impl.MapBindingSet;
-import org.openrdf.repository.sail.SailRepositoryConnection;
 
 import com.google.common.collect.Sets;
 
@@ -427,38 +421,10 @@ public class KafkaExportIT extends KafkaExportITBase {
 
         // Verify the end results of the query match the expected results.
         final Set<VisibilityBindingSet> results = readGroupedResults(pcjId, 
new VariableOrder("type", "location"));
+        System.out.println(results);
         assertEquals(expectedResults, results);
     }
 
-    private String loadData(final String sparql, final Collection<Statement> 
statements) throws Exception {
-        requireNonNull(sparql);
-        requireNonNull(statements);
-
-        // Register the PCJ with Rya.
-        final Instance accInstance = 
super.getAccumuloConnector().getInstance();
-        final Connector accumuloConn = super.getAccumuloConnector();
-
-        final RyaClient ryaClient = AccumuloRyaClientFactory.build(new 
AccumuloConnectionDetails(
-                ACCUMULO_USER,
-                ACCUMULO_PASSWORD.toCharArray(),
-                accInstance.getInstanceName(),
-                accInstance.getZooKeepers()), accumuloConn);
-
-        final String pcjId = 
ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql);
-
-        // Write the data to Rya.
-        final SailRepositoryConnection ryaConn = 
super.getRyaSailRepository().getConnection();
-        ryaConn.begin();
-        ryaConn.add(statements);
-        ryaConn.commit();
-        ryaConn.close();
-
-        // Wait for the Fluo application to finish computing the end result.
-        super.getMiniFluo().waitForObservers();
-
-        // The PCJ Id is the topic name the results will be written to.
-        return pcjId;
-    }
 
     private Set<VisibilityBindingSet> readAllResults(final String pcjId) 
throws Exception {
         requireNonNull(pcjId);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
new file mode 100644
index 0000000..c8167c7
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaRyaSubGraphExportIT.java
@@ -0,0 +1,352 @@
+package org.apache.rya.indexing.pcj.fluo.integration;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.config.ObserverSpecification;
+import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.fluo.ConstructGraphTestUtils;
+import org.apache.rya.indexing.pcj.fluo.KafkaExportITBase;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
+import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
+import 
org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+import com.google.common.collect.Sets;
+
+public class KafkaRyaSubGraphExportIT extends KafkaExportITBase {
+
+    private static final String BROKERHOST = "127.0.0.1";
+    private static final String BROKERPORT = "9092";
+
+    /**
+     * Registers the observers used by the Fluo PCJ application and configures
+     * the {@link ConstructQueryResultObserver} to export its results to Kafka.
+     * <p>
+     * The Kafka producer is pointed at {@code BROKERHOST:BROKERPORT} and uses
+     * {@link StringSerializer} for keys and {@link RyaSubGraphKafkaSerDe} for
+     * values.
+     *
+     * @see 
org.apache.rya.indexing.pcj.fluo.ITBase#setExportParameters(java.util.HashMap)
+     */
+    @Override
+    protected void preFluoInitHook() throws Exception {
+        // Set up the observers that will be used by the Fluo PCJ Application.
+        final List<ObserverSpecification> observers = new ArrayList<>();
+        observers.add(new 
ObserverSpecification(TripleObserver.class.getName()));
+        observers.add(new 
ObserverSpecification(StatementPatternObserver.class.getName()));
+        observers.add(new ObserverSpecification(JoinObserver.class.getName()));
+        observers.add(new 
ObserverSpecification(FilterObserver.class.getName()));
+        observers.add(new 
ObserverSpecification(AggregationObserver.class.getName()));
+
+        // Configure the export observer to publish construct query results to
+        // Kafka.
+        final HashMap<String, String> exportParams = new HashMap<>();
+
+        final KafkaExportParameters kafkaParams = new 
KafkaExportParameters(exportParams);
+        kafkaParams.setExportToKafka(true);
+
+        // Configure the Kafka producer: broker address plus key/value serializers.
+        final Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST 
+ ":" + BROKERPORT);
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
RyaSubGraphKafkaSerDe.class.getName());
+        kafkaParams.addAllProducerConfig(producerConfig);
+
+        final ObserverSpecification exportObserverConfig = new 
ObserverSpecification(ConstructQueryResultObserver.class.getName(),
+                exportParams);
+        observers.add(exportObserverConfig);
+
+        // Add the observers to the Fluo Configuration.
+        super.getFluoConfiguration().addObservers(observers);
+    }
+
+    @Test
+    public void basicConstructQuery() throws Exception {
+        // Construct travelsTo/friendsWith statements for customers who talk to a burgerShack worker.
+        final String sparql = "CONSTRUCT { ?customer <urn:travelsTo> ?city . 
?customer <urn:friendsWith> ?worker }" + "WHERE { "
+                + "?customer <urn:talksTo> ?worker. " + "?worker <urn:livesIn> 
?city. " + "?worker <urn:worksAt> <urn:burgerShack>. " + "}";
+
+        // Create the Statements that will be loaded into Rya.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:Joe"), 
vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")),
+                vf.createStatement(vf.createURI("urn:Bob"), 
vf.createURI("urn:livesIn"), vf.createURI("urn:London")),
+                vf.createStatement(vf.createURI("urn:Bob"), 
vf.createURI("urn:worksAt"), vf.createURI("urn:burgerShack")));
+
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadStatements(sparql, statements);
+
+        // Read the subgraphs that were exported to the Kafka topic named by the PCJ id.
+        final Set<RyaSubGraph> results = readAllResults(pcjId);
+        
+        final Set<RyaSubGraph> expectedResults = new HashSet<>();
+        RyaSubGraph subGraph = new RyaSubGraph(pcjId);
+        RyaStatement statement1 = new RyaStatement(new RyaURI("urn:Joe"), new 
RyaURI("urn:travelsTo"), new RyaURI("urn:London"));
+        RyaStatement statement2 = new RyaStatement(new RyaURI("urn:Joe"), new 
RyaURI("urn:friendsWith"), new RyaURI("urn:Bob"));
+        // When no visibility is given, Fluo stores an empty byte array, whereas
+        // RyaStatement defaults the visibility to null. Set the expected
+        // visibility to an empty byte array so that RyaStatement's equals
+        // will return true.
+        statement1.setColumnVisibility(new byte[0]);
+        statement2.setColumnVisibility(new byte[0]);
+
+        Set<RyaStatement> stmnts = new HashSet<>(Arrays.asList(statement1, 
statement2));
+        subGraph.setStatements(stmnts);
+        expectedResults.add(subGraph);
+
+        
ConstructGraphTestUtils.subGraphsEqualIgnoresTimestamp(expectedResults, 
results);
+    }
+
+    @Test
+    public void basicConstructQueryWithVis() throws Exception {
+        // Construct travelsTo/friendsWith statements for customers who talk to a burgerShack worker.
+        final String sparql = "CONSTRUCT { ?customer <urn:travelsTo> ?city . 
?customer <urn:friendsWith> ?worker }" + "WHERE { "
+                + "?customer <urn:talksTo> ?worker. " + "?worker <urn:livesIn> 
?city. " + "?worker <urn:worksAt> <urn:burgerShack>. " + "}";
+
+        // Create the Statements, each with its own column visibility, that will be loaded into Rya.
+        RyaStatement statement1 = new RyaStatement(new RyaURI("urn:Joe"), new 
RyaURI("urn:talksTo"), new RyaURI("urn:Bob"));
+        RyaStatement statement2 = new RyaStatement(new RyaURI("urn:Bob"), new 
RyaURI("urn:livesIn"), new RyaURI("urn:London"));
+        RyaStatement statement3 = new RyaStatement(new RyaURI("urn:Bob"), new 
RyaURI("urn:worksAt"), new RyaURI("urn:burgerShack"));
+        statement1.setColumnVisibility("U&W".getBytes("UTF-8"));
+        statement2.setColumnVisibility("V".getBytes("UTF-8"));
+        statement3.setColumnVisibility("W".getBytes("UTF-8"));
+        
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadRyaStatements(sparql, 
Arrays.asList(statement1, statement2, statement3));
+
+        // Read the subgraphs that were exported to the Kafka topic named by the PCJ id.
+        final Set<RyaSubGraph> results = readAllResults(pcjId);
+        // Create the expected results of the SPARQL query once the PCJ has 
been
+        // computed.
+        final Set<RyaSubGraph> expectedResults = new HashSet<>();
+        RyaSubGraph subGraph = new RyaSubGraph(pcjId);
+        RyaStatement statement4 = new RyaStatement(new RyaURI("urn:Joe"), new 
RyaURI("urn:travelsTo"), new RyaURI("urn:London"));
+        RyaStatement statement5 = new RyaStatement(new RyaURI("urn:Joe"), new 
RyaURI("urn:friendsWith"), new RyaURI("urn:Bob"));
+        // The constructed statements are expected to carry the combined
+        // visibility of the three input statements (U&W, V and W), i.e. U&V&W,
+        // so set it explicitly on the expected statements for RyaStatement's
+        // equals to return true.
+        statement4.setColumnVisibility("U&V&W".getBytes("UTF-8"));
+        statement5.setColumnVisibility("U&V&W".getBytes("UTF-8"));
+
+        Set<RyaStatement> stmnts = new HashSet<>(Arrays.asList(statement4, 
statement5));
+        subGraph.setStatements(stmnts);
+        expectedResults.add(subGraph);
+
+        
ConstructGraphTestUtils.subGraphsEqualIgnoresTimestamp(expectedResults, 
results);
+    }
+
+    
+    @Test
+    public void constructQueryWithVisAndMultipleSubGraphs() throws Exception {
+        // Construct travelsTo/friendsWith statements for customers who talk to a burgerShack worker.
+        final String sparql = "CONSTRUCT { ?customer <urn:travelsTo> ?city . 
?customer <urn:friendsWith> ?worker }" + "WHERE { "
+                + "?customer <urn:talksTo> ?worker. " + "?worker <urn:livesIn> 
?city. " + "?worker <urn:worksAt> <urn:burgerShack>. " + "}";
+
+        // Create two independent groups of Statements (Joe/Bob and John/Evan) that will be loaded into Rya.
+        RyaStatement statement1 = new RyaStatement(new RyaURI("urn:Joe"), new 
RyaURI("urn:talksTo"), new RyaURI("urn:Bob"));
+        RyaStatement statement2 = new RyaStatement(new RyaURI("urn:Bob"), new 
RyaURI("urn:livesIn"), new RyaURI("urn:London"));
+        RyaStatement statement3 = new RyaStatement(new RyaURI("urn:Bob"), new 
RyaURI("urn:worksAt"), new RyaURI("urn:burgerShack"));
+        RyaStatement statement4 = new RyaStatement(new RyaURI("urn:John"), new 
RyaURI("urn:talksTo"), new RyaURI("urn:Evan"));
+        RyaStatement statement5 = new RyaStatement(new RyaURI("urn:Evan"), new 
RyaURI("urn:livesIn"), new RyaURI("urn:SanFrancisco"));
+        RyaStatement statement6 = new RyaStatement(new RyaURI("urn:Evan"), new 
RyaURI("urn:worksAt"), new RyaURI("urn:burgerShack"));
+        statement1.setColumnVisibility("U&W".getBytes("UTF-8"));
+        statement2.setColumnVisibility("V".getBytes("UTF-8"));
+        statement3.setColumnVisibility("W".getBytes("UTF-8"));
+        statement4.setColumnVisibility("A&B".getBytes("UTF-8"));
+        statement5.setColumnVisibility("B".getBytes("UTF-8"));
+        statement6.setColumnVisibility("C".getBytes("UTF-8"));
+        
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadRyaStatements(sparql, 
Arrays.asList(statement1, statement2, statement3, statement4, statement5, 
statement6));
+
+        // Read the subgraphs that were exported to the Kafka topic named by the PCJ id.
+        final Set<RyaSubGraph> results = readAllResults(pcjId);
+        // Create the expected results of the SPARQL query once the PCJ has 
been
+        // computed.
+        RyaStatement statement7 = new RyaStatement(new RyaURI("urn:Joe"), new 
RyaURI("urn:travelsTo"), new RyaURI("urn:London"));
+        RyaStatement statement8 = new RyaStatement(new RyaURI("urn:Joe"), new 
RyaURI("urn:friendsWith"), new RyaURI("urn:Bob"));
+        RyaStatement statement9 = new RyaStatement(new RyaURI("urn:John"), new 
RyaURI("urn:travelsTo"), new RyaURI("urn:SanFrancisco"));
+        RyaStatement statement10 = new RyaStatement(new RyaURI("urn:John"), 
new RyaURI("urn:friendsWith"), new RyaURI("urn:Evan"));
+        statement7.setColumnVisibility("U&V&W".getBytes("UTF-8"));
+        statement8.setColumnVisibility("U&V&W".getBytes("UTF-8"));
+        statement9.setColumnVisibility("A&B&C".getBytes("UTF-8"));
+        statement10.setColumnVisibility("A&B&C".getBytes("UTF-8"));
+
+        final Set<RyaSubGraph> expectedResults = new HashSet<>();
+
+        RyaSubGraph subGraph1 = new RyaSubGraph(pcjId);
+        Set<RyaStatement> stmnts1 = new HashSet<>(Arrays.asList(statement7, 
statement8));
+        subGraph1.setStatements(stmnts1);
+        expectedResults.add(subGraph1);
+        
+        RyaSubGraph subGraph2 = new RyaSubGraph(pcjId);
+        Set<RyaStatement> stmnts2 = new HashSet<>(Arrays.asList(statement9, 
statement10));
+        subGraph2.setStatements(stmnts2);
+        expectedResults.add(subGraph2);
+
+        
ConstructGraphTestUtils.subGraphsEqualIgnoresTimestamp(expectedResults, 
results);
+    }
+    
+    @Test
+    public void constructQueryWithBlankNodesAndMultipleSubGraphs() throws 
Exception {
+        // Construct travelsTo/friendsWith statements whose subject is the blank node _:b.
+        final String sparql = "CONSTRUCT { _:b <urn:travelsTo> ?city . _:b 
<urn:friendsWith> ?worker }" + "WHERE { "
+                + "?customer <urn:talksTo> ?worker. " + "?worker <urn:livesIn> 
?city. " + "?worker <urn:worksAt> <urn:burgerShack>. " + "}";
+
+        // Create two independent groups of Statements (Joe/Bob and John/Evan) that will be loaded into Rya.
+        RyaStatement statement1 = new RyaStatement(new RyaURI("urn:Joe"), new 
RyaURI("urn:talksTo"), new RyaURI("urn:Bob"));
+        RyaStatement statement2 = new RyaStatement(new RyaURI("urn:Bob"), new 
RyaURI("urn:livesIn"), new RyaURI("urn:London"));
+        RyaStatement statement3 = new RyaStatement(new RyaURI("urn:Bob"), new 
RyaURI("urn:worksAt"), new RyaURI("urn:burgerShack"));
+        RyaStatement statement4 = new RyaStatement(new RyaURI("urn:John"), new 
RyaURI("urn:talksTo"), new RyaURI("urn:Evan"));
+        RyaStatement statement5 = new RyaStatement(new RyaURI("urn:Evan"), new 
RyaURI("urn:livesIn"), new RyaURI("urn:SanFrancisco"));
+        RyaStatement statement6 = new RyaStatement(new RyaURI("urn:Evan"), new 
RyaURI("urn:worksAt"), new RyaURI("urn:burgerShack"));
+        statement1.setColumnVisibility("U&W".getBytes("UTF-8"));
+        statement2.setColumnVisibility("V".getBytes("UTF-8"));
+        statement3.setColumnVisibility("W".getBytes("UTF-8"));
+        statement4.setColumnVisibility("A&B".getBytes("UTF-8"));
+        statement5.setColumnVisibility("B".getBytes("UTF-8"));
+        statement6.setColumnVisibility("C".getBytes("UTF-8"));
+        
+        // Create the PCJ in Fluo and load the statements into Rya.
+        final String pcjId = loadRyaStatements(sparql, 
Arrays.asList(statement1, statement2, statement3, statement4, statement5, 
statement6));
+
+        // Read the subgraphs that were exported to the Kafka topic named by the PCJ id.
+        final Set<RyaSubGraph> results = readAllResults(pcjId);
+        // Create the expected results of the SPARQL query once the PCJ has 
been
+        // computed.
+        RyaStatement statement7 = new RyaStatement(new RyaURI("urn:Joe"), new 
RyaURI("urn:travelsTo"), new RyaURI("urn:London"));
+        RyaStatement statement8 = new RyaStatement(new RyaURI("urn:Joe"), new 
RyaURI("urn:friendsWith"), new RyaURI("urn:Bob"));
+        RyaStatement statement9 = new RyaStatement(new RyaURI("urn:John"), new 
RyaURI("urn:travelsTo"), new RyaURI("urn:SanFrancisco"));
+        RyaStatement statement10 = new RyaStatement(new RyaURI("urn:John"), 
new RyaURI("urn:friendsWith"), new RyaURI("urn:Evan"));
+        statement7.setColumnVisibility("U&V&W".getBytes("UTF-8"));
+        statement8.setColumnVisibility("U&V&W".getBytes("UTF-8"));
+        statement9.setColumnVisibility("A&B&C".getBytes("UTF-8"));
+        statement10.setColumnVisibility("A&B&C".getBytes("UTF-8"));
+
+        final Set<RyaSubGraph> expectedResults = new HashSet<>();
+
+        RyaSubGraph subGraph1 = new RyaSubGraph(pcjId);
+        Set<RyaStatement> stmnts1 = new HashSet<>(Arrays.asList(statement7, 
statement8));
+        subGraph1.setStatements(stmnts1);
+        expectedResults.add(subGraph1);
+        
+        RyaSubGraph subGraph2 = new RyaSubGraph(pcjId);
+        Set<RyaStatement> stmnts2 = new HashSet<>(Arrays.asList(statement9, 
statement10));
+        subGraph2.setStatements(stmnts2);
+        expectedResults.add(subGraph2);
+
+        
ConstructGraphTestUtils.subGraphsEqualIgnoresBlankNode(expectedResults, 
results);
+    }
+    
+    protected KafkaConsumer<String, RyaSubGraph> makeRyaSubGraphConsumer(final 
String TopicName) {
+        // Configure a consumer of String keys and RyaSubGraph values against BROKERHOST:BROKERPORT.
+        final Properties consumerProps = new Properties();
+        consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
BROKERHOST + ":" + BROKERPORT);
+        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
+        consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
"consumer0");
+        
consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+        
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
RyaSubGraphKafkaSerDe.class.getName());
+
+        // Reset to the earliest offset so the consumer starts from the beginning of the topic.
+        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        final KafkaConsumer<String, RyaSubGraph> consumer = new 
KafkaConsumer<>(consumerProps);
+        consumer.subscribe(Arrays.asList(TopicName));
+        return consumer;
+    }
+
+    private Set<RyaSubGraph> readAllResults(final String pcjId) throws 
Exception {
+        requireNonNull(pcjId);
+
+        // Collect the values returned by a single poll(5000) of the topic named by pcjId.
+        final Set<RyaSubGraph> results = new HashSet<>();
+
+        try (final KafkaConsumer<String, RyaSubGraph> consumer = 
makeRyaSubGraphConsumer(pcjId)) {
+            final ConsumerRecords<String, RyaSubGraph> records = 
consumer.poll(5000);
+            final Iterator<ConsumerRecord<String, RyaSubGraph>> recordIterator 
= records.iterator();
+            while (recordIterator.hasNext()) {
+                results.add(recordIterator.next().value());
+            }
+        }
+
+        return results;
+    }
+    // Converts the openrdf Statements to RyaStatements and delegates to loadRyaStatements.
+    protected String loadStatements(final String sparql, final 
Collection<Statement> statements) throws Exception {
+        return loadRyaStatements(sparql, statements.stream().map(x -> 
RdfToRyaConversions.convertStatement(x)).collect(Collectors.toSet()));
+    }
+    
+
+    protected String loadRyaStatements(final String sparql, final 
Collection<RyaStatement> statements) throws Exception {
+        requireNonNull(sparql);
+        requireNonNull(statements);
+        FluoClient client = null;
+
+        try {
+            CreatePcj createPcj = new CreatePcj();
+            client = new FluoClientImpl(super.getFluoConfiguration());
+            FluoQuery fluoQuery = createPcj.createFluoPcj(client, sparql);
+
+            AccumuloRyaDAO dao = getRyaDAO();
+            dao.add(statements.iterator());
+
+            // Wait for the Fluo application to finish computing the end 
result.
+            super.getMiniFluo().waitForObservers();
+
+            // Return the construct query node id; the tests use it as the Kafka topic to read from.
+            return fluoQuery.getConstructQueryMetadata().get().getNodeId();
+        } finally {
+            if (client != null) {
+                client.close();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/FunctionAdapter.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/FunctionAdapter.java
 
b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/FunctionAdapter.java
index cf9dfef..2fbe334 100644
--- 
a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/FunctionAdapter.java
+++ 
b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/main/java/org/apache/rya/indexing/pcj/functions/geo/FunctionAdapter.java
@@ -44,7 +44,6 @@ class FunctionAdapter implements Function {
 
     @Override
     public Value evaluate(ValueFactory valueFactory, Value... args) throws 
ValueExprEvaluationException {
-        System.out.println("Evaluate: Valuefactory=" + valueFactory);
         // need a Adapter for org.eclipse.rdf4j.model.ValueFactory
         org.eclipse.rdf4j.model.ValueFactory rdf4jValueFactory = 
org.eclipse.rdf4j.model.impl.SimpleValueFactory.getInstance();
         // org.eclipse.rdf4j.model.ValueFactory rdf4jValueFactory = new 
ValueFactoryAdapter(valueFactory);
@@ -61,7 +60,6 @@ class FunctionAdapter implements Function {
             org.eclipse.rdf4j.model.Literal vLiteral = 
(org.eclipse.rdf4j.model.Literal) v;
             org.openrdf.model.URI vType = 
valueFactory.createURI(vLiteral.getDatatype().stringValue());
             org.openrdf.model.Literal theReturnValue = 
valueFactory.createLiteral(vLiteral.getLabel(), vType);
-            System.out.println("Function RETURNS:" + theReturnValue + " 
class:" + theReturnValue.getClass() + " rdf4j=" + v + " class:" + v.getClass());
             return theReturnValue;
         }
         //

Reply via email to