Repository: incubator-rya
Updated Branches:
  refs/heads/master 22bdc7a2e -> 78f958d2f


Closes #125; RYA-222-Fixed Column Visibility Bug for Results Streamed into Fluo


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/78f958d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/78f958d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/78f958d2

Branch: refs/heads/master
Commit: 78f958d2ff31e2c45fa58b2ea7b226c5598e576e
Parents: 22bdc7a
Author: Caleb Meier <caleb.me...@parsons.com>
Authored: Wed Nov 30 09:07:33 2016 -0800
Committer: pujav65 <puja...@gmail.com>
Committed: Thu Dec 15 14:43:59 2016 -0500

----------------------------------------------------------------------
 .../api/client/accumulo/AccumuloCreatePCJ.java  |  62 ++---
 .../indexing/external/fluo/FluoPcjUpdater.java  |  18 +-
 .../external/fluo/FluoPcjUpdaterSupplier.java   |  16 +-
 .../benchmark/query/PCJOptimizerBenchmark.java  |  18 ++
 .../rya/indexing/pcj/fluo/api/CreatePcj.java    | 256 +++++++++++--------
 .../indexing/pcj/fluo/api/InsertTriples.java    |  39 +++
 .../fluo/client/command/NewQueryCommand.java    |  15 +-
 .../pcj/fluo/demo/FluoAndHistoricPcjsDemo.java  |   6 +-
 .../indexing/pcj/fluo/api/GetPcjMetadataIT.java |  11 +-
 .../indexing/pcj/fluo/api/GetQueryReportIT.java |   5 +-
 .../pcj/fluo/integration/CreateDeleteIT.java    |   2 +-
 .../indexing/pcj/fluo/integration/InputIT.java  |  11 +-
 .../indexing/pcj/fluo/integration/QueryIT.java  |  11 +-
 .../pcj/fluo/integration/RyaExportIT.java       |   5 +-
 .../RyaInputIncrementalUpdateIT.java            |  14 +-
 .../pcj/fluo/integration/StreamingTestIT.java   |   2 +-
 .../HistoricStreamingVisibilityIT.java          | 130 ++++++++++
 .../pcj/fluo/visibility/PcjVisibilityIT.java    |  21 +-
 18 files changed, 422 insertions(+), 220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
 
b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
index 80ece33..ac8da66 100644
--- 
a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
+++ 
b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
@@ -18,27 +18,10 @@
  */
 package org.apache.rya.api.client.accumulo;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.Objects.requireNonNull;
 
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
 import org.apache.accumulo.core.client.Connector;
-import org.apache.rya.indexing.pcj.storage.PcjException;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
-import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.QueryEvaluationException;
-import org.openrdf.repository.RepositoryException;
-import org.openrdf.sail.SailException;
-
-import com.google.common.base.Optional;
-
 import org.apache.fluo.api.client.FluoClient;
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.accumulo.AccumuloRyaDAO;
 import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
 import org.apache.rya.api.client.CreatePCJ;
 import org.apache.rya.api.client.GetInstanceDetails;
@@ -54,8 +37,20 @@ import 
org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryExce
 import org.apache.rya.api.instance.RyaDetailsUpdater;
 import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator;
 import 
org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
-import org.apache.rya.rdftriplestore.RdfCloudTripleStore;
-import org.apache.rya.rdftriplestore.RyaSailRepository;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.pcj.storage.PcjException;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.sail.SailException;
+
+import com.google.common.base.Optional;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
 
 /**
  * An Accumulo implementation of the {@link CreatePCJ} command.
@@ -108,7 +103,7 @@ public class AccumuloCreatePCJ extends AccumuloCommand 
implements CreatePCJ {
             final String fluoAppName = 
fluoDetailsHolder.get().getUpdateAppName();
             try {
                 updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId);
-            } catch (RepositoryException | MalformedQueryException | 
SailException | QueryEvaluationException | PcjException e) {
+            } catch (RepositoryException | MalformedQueryException | 
SailException | QueryEvaluationException | PcjException | RyaDAOException e) {
                 throw new RyaClientException("Problem while initializing the 
Fluo application with the new PCJ.", e);
             }
 
@@ -138,7 +133,7 @@ public class AccumuloCreatePCJ extends AccumuloCommand 
implements CreatePCJ {
         return pcjId;
     }
 
-    private void updateFluoApp(final String ryaInstance, final String 
fluoAppName, final PrecomputedJoinStorage pcjStorage, final String pcjId) 
throws RepositoryException, MalformedQueryException, SailException, 
QueryEvaluationException, PcjException {
+    private void updateFluoApp(final String ryaInstance, final String 
fluoAppName, final PrecomputedJoinStorage pcjStorage, final String pcjId) 
throws RepositoryException, MalformedQueryException, SailException, 
QueryEvaluationException, PcjException, RyaDAOException {
         requireNonNull(pcjStorage);
         requireNonNull(pcjId);
 
@@ -151,32 +146,9 @@ public class AccumuloCreatePCJ extends AccumuloCommand 
implements CreatePCJ {
                 cd.getZookeepers(),
                 fluoAppName);
 
-        // Setup the Rya client that is able to talk to scan Rya's statements.
-        final RyaSailRepository ryaSailRepo = 
makeRyaRepository(getConnector(), ryaInstance);
-
         // Initialize the PCJ within the Fluo application.
         final org.apache.rya.indexing.pcj.fluo.api.CreatePcj fluoCreatePcj = 
new org.apache.rya.indexing.pcj.fluo.api.CreatePcj();
-        fluoCreatePcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaSailRepo);
+        fluoCreatePcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, 
getConnector(), ryaInstance);
     }
 
-    private static RyaSailRepository makeRyaRepository(final Connector 
connector, final String ryaInstance) throws RepositoryException {
-        checkNotNull(connector);
-        checkNotNull(ryaInstance);
-
-        // Setup Rya configuration values.
-        final AccumuloRdfConfiguration ryaConf = new 
AccumuloRdfConfiguration();
-        ryaConf.setTablePrefix( ryaInstance );
-
-        // Connect to the Rya repo using the provided Connector.
-        final AccumuloRyaDAO accumuloRyaDao = new AccumuloRyaDAO();
-        accumuloRyaDao.setConnector(connector);
-        accumuloRyaDao.setConf(ryaConf);
-
-        final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore();
-        ryaStore.setRyaDAO(accumuloRyaDao);
-
-        final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore);
-        ryaRepo.initialize();
-        return ryaRepo;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/indexing/src/main/java/org/apache/rya/indexing/external/fluo/FluoPcjUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/indexing/external/fluo/FluoPcjUpdater.java
 
b/extras/indexing/src/main/java/org/apache/rya/indexing/external/fluo/FluoPcjUpdater.java
index 39a3ca2..0e496ca 100644
--- 
a/extras/indexing/src/main/java/org/apache/rya/indexing/external/fluo/FluoPcjUpdater.java
+++ 
b/extras/indexing/src/main/java/org/apache/rya/indexing/external/fluo/FluoPcjUpdater.java
@@ -22,17 +22,14 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 
 import java.util.Collection;
 
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
+import org.apache.fluo.api.client.FluoClient;
 import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
 
-import com.google.common.base.Optional;
-
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.rya.api.domain.RyaStatement;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
 
 /**
  * Updates the PCJ indices by forwarding the statement additions/removals to
@@ -47,24 +44,21 @@ public class FluoPcjUpdater implements 
PrecomputedJoinUpdater {
 
     private final FluoClient fluoClient;
     private final InsertTriples insertTriples = new InsertTriples();
-    private final String statementVis;
 
     /**
      * Constructs an instance of {@link FluoPcjUpdater}.
      *
      * @param fluoClient - A connection to the Fluo table new statements will 
be
      *   inserted into and deleted from. (not null)
-     * @param statementVis - The visibility label that will be applied to all
      *   statements that are inserted via the Fluo PCJ updater. (not null)
      */
-    public FluoPcjUpdater(final FluoClient fluoClient, final String 
statementVis) {
+    public FluoPcjUpdater(final FluoClient fluoClient) {
         this.fluoClient = checkNotNull(fluoClient);
-        this.statementVis = checkNotNull(statementVis);
     }
 
     @Override
     public void addStatements(final Collection<RyaStatement> statements) 
throws PcjUpdateException {
-        insertTriples.insert(fluoClient, statements, 
Optional.of(statementVis));
+        insertTriples.insert(fluoClient, statements);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/indexing/src/main/java/org/apache/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java
 
b/extras/indexing/src/main/java/org/apache/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java
index 44a4b4a..7d2c181 100644
--- 
a/extras/indexing/src/main/java/org/apache/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java
+++ 
b/extras/indexing/src/main/java/org/apache/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java
@@ -25,23 +25,21 @@ import static 
org.apache.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMUL
 import static 
org.apache.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_USERNAME;
 import static 
org.apache.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_ZOOKEEPERS;
 import static 
org.apache.rya.indexing.external.fluo.FluoPcjUpdaterConfig.FLUO_APP_NAME;
-import static 
org.apache.rya.indexing.external.fluo.FluoPcjUpdaterConfig.STATEMENT_VISIBILITY;
+
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.config.FluoConfiguration;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
+import org.apache.hadoop.conf.Configuration;
 import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
 import 
org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
-
-import org.apache.hadoop.conf.Configuration;
 import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
 
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
 /**
  * Creates instances of {@link FluoPcjUpdater} using the values found in a 
{@link Configuration}.
  */
@@ -81,7 +79,6 @@ public class FluoPcjUpdaterSupplier implements 
Supplier<PrecomputedJoinUpdater>
         checkArgument(fluoUpdaterConfig.getAccumuloInstance().isPresent(), 
"Missing configuration: " + ACCUMULO_INSTANCE);
         checkArgument(fluoUpdaterConfig.getAccumuloUsername().isPresent(), 
"Missing configuration: " + ACCUMULO_USERNAME);
         checkArgument(fluoUpdaterConfig.getAccumuloPassword().isPresent(), 
"Missing configuration: " + ACCUMULO_PASSWORD);
-        checkArgument(fluoUpdaterConfig.getStatementVisibility().isPresent(), 
"Missing configuration: " + STATEMENT_VISIBILITY);
 
         // Fluo configuration values.
         final FluoConfiguration fluoClientConfig = new FluoConfiguration();
@@ -95,7 +92,6 @@ public class FluoPcjUpdaterSupplier implements 
Supplier<PrecomputedJoinUpdater>
         fluoClientConfig.setAccumuloPassword( 
fluoUpdaterConfig.getAccumuloPassword().get() );
 
         final FluoClient fluoClient = FluoFactory.newClient(fluoClientConfig);
-        final String statementVisibilities = 
fluoUpdaterConfig.getStatementVisibility().get();
-        return new FluoPcjUpdater(fluoClient, statementVisibilities);
+        return new FluoPcjUpdater(fluoClient);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java
 
b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java
index 38abf87..fd74e8b 100644
--- 
a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java
+++ 
b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java
@@ -16,6 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.rya.benchmark.query;
 
 import static com.google.common.base.Preconditions.checkArgument;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
index 1259a01..6567371 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
@@ -21,14 +21,27 @@ package org.apache.rya.indexing.pcj.fluo.api;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.Objects.requireNonNull;
-import static 
org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
 
+import java.io.UnsupportedEncodingException;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.api.persist.query.BatchRyaQuery;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
 import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
@@ -39,23 +52,18 @@ import 
org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
 import org.apache.rya.indexing.pcj.storage.PcjException;
 import org.apache.rya.indexing.pcj.storage.PcjMetadata;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
-import org.openrdf.query.Binding;
-import org.openrdf.query.BindingSet;
+import org.openrdf.model.Resource;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
 import org.openrdf.query.MalformedQueryException;
 import org.openrdf.query.QueryEvaluationException;
 import org.openrdf.query.algebra.StatementPattern;
-import org.openrdf.query.impl.MapBindingSet;
 import org.openrdf.query.parser.ParsedQuery;
 import org.openrdf.query.parser.sparql.SPARQLParser;
-import org.openrdf.repository.sail.SailRepository;
-import org.openrdf.sail.SailConnection;
 import org.openrdf.sail.SailException;
 
-import info.aduna.iteration.CloseableIteration;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.Transaction;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
 
 /**
  * Sets up a new Pre Computed Join (PCJ) in Fluo from a SPARQL query.
@@ -105,6 +113,7 @@ public class CreatePcj {
         this.spInsertBatchSize = spInsertBatchSize;
     }
 
+    
     /**
      * Tells the Fluo PCJ Updater application to maintain a new PCJ.
      * <p>
@@ -116,110 +125,145 @@ public class CreatePcj {
      * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. 
(not null)
      * @param pcjStorage - Provides access to the PCJ index. (not null)
      * @param fluo - A connection to the Fluo application that updates the PCJ 
index. (not null)
-     * @param rya - A connection to the Rya instance hosting the PCJ, (not 
null)
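+     * @param accumulo - A connection to the Accumulo instance that is queried for historic results. (not null)
+     * @param ryaInstance - The name of the Rya instance, used as the table prefix when querying. (not null)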
      *
      * @throws MalformedQueryException The SPARQL query stored for the {@code 
pcjId} is malformed.
      * @throws PcjException The PCJ Metadata for {@code pcjId} could not be 
read from {@code pcjStorage}.
      * @throws SailException Historic PCJ results could not be loaded because 
of a problem with {@code rya}.
      * @throws QueryEvaluationException Historic PCJ results could not be 
loaded because of a problem with {@code rya}.
      */
-    public void withRyaIntegration(
-            final String pcjId,
-            final PrecomputedJoinStorage pcjStorage,
-            final FluoClient fluo,
-            final SailRepository rya)
-                    throws MalformedQueryException, PcjException, 
SailException, QueryEvaluationException {
-        requireNonNull(pcjId);
-        requireNonNull(pcjStorage);
-        requireNonNull(fluo);
-        requireNonNull(rya);
-
-        // Keeps track of the IDs that are assigned to each of the query's 
nodes in Fluo.
-        // We use these IDs later when scanning Rya for historic Statement 
Pattern matches
-        // as well as setting up automatic exports.
-        final NodeIds nodeIds = new NodeIds();
-
-        // Parse the query's structure for the metadata that will be written 
to fluo.
-        final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
-        final String sparql = pcjMetadata.getSparql();
-        final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, 
null);
-        final FluoQuery fluoQuery = new 
SparqlFluoQueryBuilder().make(parsedQuery, nodeIds);
-
-        try(Transaction tx = fluo.newTransaction()) {
-            // Write the query's structure to Fluo.
-            new FluoQueryMetadataDAO().write(tx, fluoQuery);
-
-            // The results of the query are eventually exported to an instance 
of Rya, so store the Rya ID for the PCJ.
-            final String queryId = fluoQuery.getQueryMetadata().getNodeId();
-            tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId);
-            tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId);
-            
-            // Flush the changes to Fluo.
-            tx.commit();
-        }
-
-        // Get a connection to Rya. It's used to scan for Statement Pattern 
results.
-        final SailConnection ryaConn = rya.getSail().getConnection();
-
-        // Reuse the same set object while performing batch inserts.
-        final Set<BindingSet> batch = new HashSet<>();
-
-        // Iterate through each of the statement patterns and insert their 
historic matches into Fluo.
-        for(final StatementPatternMetadata patternMetadata : 
fluoQuery.getStatementPatternMetadata()) {
-            // Get an iterator over all of the binding sets that match the 
statement pattern.
-            final StatementPattern pattern = 
FluoStringConverter.toStatementPattern( patternMetadata.getStatementPattern() );
-            final CloseableIteration<? extends BindingSet, 
QueryEvaluationException> bindingSets = ryaConn.evaluate(pattern, null, null, 
false);
-
-            // Insert batches of the binding sets into Fluo.
-            while(bindingSets.hasNext()) {
-                if(batch.size() == spInsertBatchSize) {
-                    writeBatch(fluo, patternMetadata, batch);
-                    batch.clear();
-                }
-
-                batch.add( bindingSets.next() );
-            }
-
-            if(!batch.isEmpty()) {
-                writeBatch(fluo, patternMetadata, batch);
-                batch.clear();
-            }
-        }
-    }
+       public void withRyaIntegration(final String pcjId, final 
PrecomputedJoinStorage pcjStorage, final FluoClient fluo,
+                       final Connector accumulo, String ryaInstance )
+                                       throws MalformedQueryException, 
PcjException, SailException, QueryEvaluationException, RyaDAOException {
+               requireNonNull(pcjId);
+               requireNonNull(pcjStorage);
+               requireNonNull(fluo);
+               requireNonNull(accumulo);
+               requireNonNull(ryaInstance);
+               
+               //Create AccumuloRyaQueryEngine to query for historic results
+               AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+               conf.setTablePrefix(ryaInstance);
+               conf.setAuths(getAuths(accumulo));
+               AccumuloRyaQueryEngine queryEngine = new 
AccumuloRyaQueryEngine(accumulo, conf);
+               
 
-    /**
-     * Writes a batch of {@link BindingSet}s that match a statement pattern to 
Fluo.
-     *
-     * @param fluo - Creates transactions to Fluo. (not null)
-     * @param spMetadata - The Statement Pattern the batch matches. (not null)
-     * @param batch - A set of binding sets that are the result of the 
statement pattern. (not null)
-     */
-    private static void writeBatch(final FluoClient fluo, final 
StatementPatternMetadata spMetadata, final Set<BindingSet> batch) {
-        checkNotNull(fluo);
-        checkNotNull(spMetadata);
-        checkNotNull(batch);
+               // Keeps track of the IDs that are assigned to each of the 
query's nodes
+               // in Fluo.
+               // We use these IDs later when scanning Rya for historic 
Statement
+               // Pattern matches
+               // as well as setting up automatic exports.
+               final NodeIds nodeIds = new NodeIds();
+
+               // Parse the query's structure for the metadata that will be 
written to
+               // fluo.
+               final PcjMetadata pcjMetadata = 
pcjStorage.getPcjMetadata(pcjId);
+               final String sparql = pcjMetadata.getSparql();
+               final ParsedQuery parsedQuery = new 
SPARQLParser().parseQuery(sparql, null);
+               final FluoQuery fluoQuery = new 
SparqlFluoQueryBuilder().make(parsedQuery, nodeIds);
+
+               try (Transaction tx = fluo.newTransaction()) {
+                       // Write the query's structure to Fluo.
+                       new FluoQueryMetadataDAO().write(tx, fluoQuery);
+
+                       // The results of the query are eventually exported to 
an instance
+                       // of Rya, so store the Rya ID for the PCJ.
+                       final String queryId = 
fluoQuery.getQueryMetadata().getNodeId();
+                       tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId);
+                       tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, 
queryId);
 
-        final BindingSetStringConverter converter = new 
BindingSetStringConverter();
+                       // Flush the changes to Fluo.
+                       tx.commit();
+               }
 
-        try(Transaction tx = fluo.newTransaction()) {
-            // Get the node's variable order.
-            final String spNodeId = spMetadata.getNodeId();
-            final VariableOrder varOrder = spMetadata.getVariableOrder();
+               // Reuse the same set object while performing batch inserts.
+               final Set<RyaStatement> queryBatch = new HashSet<>();
 
-            for(final BindingSet bindingSet : batch) {
-                final MapBindingSet spBindingSet = new MapBindingSet();
-                for(final String var : varOrder) {
-                    final Binding binding = bindingSet.getBinding(var);
-                    spBindingSet.addBinding(binding);
-                }
+               // Iterate through each of the statement patterns and insert 
their
+               // historic matches into Fluo.
+               for (final StatementPatternMetadata patternMetadata : 
fluoQuery.getStatementPatternMetadata()) {
+                       // Convert the statement pattern into a RyaStatement that is added
+                       // to the batch query used to fetch the historic matches from Rya.
+                       final StatementPattern pattern = FluoStringConverter
+                                       
.toStatementPattern(patternMetadata.getStatementPattern());
+                       queryBatch.add(spToRyaStatement(pattern));
+               }
 
-                final String bindingSetStr = converter.convert(spBindingSet, 
varOrder);
+               Iterator<RyaStatement> triples = queryEngine.query(new 
BatchRyaQuery(queryBatch)).iterator();
+               Set<RyaStatement> triplesBatch = new HashSet<>();
 
-                // Write the binding set entry to Fluo for the statement 
pattern.
-                tx.set(spNodeId + NODEID_BS_DELIM + bindingSetStr, 
FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, bindingSetStr);
-            }
+               // Insert batches of the historic triples into Fluo.
+               while (triples.hasNext()) {
+                       if (triplesBatch.size() == spInsertBatchSize) {
+                               writeBatch(fluo, triplesBatch);
+                               triplesBatch.clear();
+                       }
 
-            tx.commit();
-        }
+                       triplesBatch.add(triples.next());
+               }
+
+               if (!triplesBatch.isEmpty()) {
+                       writeBatch(fluo, triplesBatch);
+                       triplesBatch.clear();
+               }
+       }
+    
+    
+    private static void writeBatch(final FluoClient fluo, final 
Set<RyaStatement> batch) {
+        checkNotNull(fluo);
+        checkNotNull(batch);
+        
+        new InsertTriples().insert(fluo, batch);
+
+    }
+    
+    
+    private static RyaStatement spToRyaStatement(StatementPattern sp) {
+    
+       Value subjVal = sp.getSubjectVar().getValue();
+       Value predVal = sp.getPredicateVar().getValue();
+       Value objVal = sp.getObjectVar().getValue();
+       
+       RyaURI subjURI = null;
+       RyaURI predURI = null;
+       RyaType objType = null;
+       
+       if(subjVal != null) {
+               if(!(subjVal instanceof Resource)) {
+                       throw new AssertionError("Subject must be a Resource.");
+               }
+               subjURI = RdfToRyaConversions.convertResource((Resource) 
subjVal);
+       }
+       
+               if (predVal != null) {
+                       if(!(predVal instanceof URI)) {
+                       throw new AssertionError("Predicate must be a URI.");
+               }
+                       predURI = RdfToRyaConversions.convertURI((URI) predVal);
+               }
+               
+               if (objVal != null ) {
+                       objType = RdfToRyaConversions.convertValue(objVal);
+               }
+               
+       return new RyaStatement(subjURI, predURI, objType);
     }
+    
+    
+    private String[] getAuths(Connector accumulo) {
+        Authorizations auths;
+               try {
+                       auths = 
accumulo.securityOperations().getUserAuthorizations(accumulo.whoami());
+                       List<byte[]> authList = auths.getAuthorizations();
+                String[] authArray = new String[authList.size()];
+                for(int i = 0; i < authList.size(); i++){
+                       authArray[i] = new String(authList.get(i), "UTF-8");
+                }
+                return authArray;
+               } catch (AccumuloException | AccumuloSecurityException | 
UnsupportedEncodingException e) {
+                       throw new RuntimeException("Cannot read authorizations 
for user: " + accumulo.whoami());
+               }
+   }
+    
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
index 9312523..1e86836 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java
@@ -88,6 +88,45 @@ public class InsertTriples {
             tx.commit();
         }
     }
+    
+    /**
+     * Writes a single statement to Fluo by handing it to the batch insert as a one-element collection.
+     *
+     * @param fluo - A connection to the Fluo table that will be updated. (not null)
+     * @param triple - The RyaStatement to insert. (not null)
+     */
+    public void insert(final FluoClient fluo, final RyaStatement triple) {
+        checkNotNull(fluo);
+        checkNotNull(triple);
+        final Collection<RyaStatement> batch = Collections.singleton(triple);
+        insert(fluo, batch);
+    }
+    
+    /**
+     * Insert a batch of RyaStatements into Fluo. Each statement's column visibility is stored as the value (empty if it has none).
+     *
+     * @param fluo - A connection to the Fluo table that will be updated. (not null)
+     * @param triples - The triples to insert. (not null)
+     */
+    public void insert(final FluoClient fluo, final Collection<RyaStatement> triples) {
+        checkNotNull(fluo);
+        checkNotNull(triples);
+
+        try(Transaction tx = fluo.newTransaction()) {
+            for(final RyaStatement triple : triples) {
+                final Optional<byte[]> visibility = Optional.fromNullable(triple.getColumnVisibility());
+                try {
+                    tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0])));
+                } catch (final TripleRowResolverException e) {
+                    // The bad statement is skipped; log the exception too so the cause can be diagnosed.
+                    log.error("Could not convert a Triple into the SPO format: " + triple, e);
+                }
+            }
+            tx.commit();
+        }
+    }
+    
+    
 
     /**
      * Converts a triple into a byte[] holding the Rya SPO representation of 
it.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
 
b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
index e612a07..43dac3c 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java
@@ -21,17 +21,26 @@ package org.apache.rya.indexing.pcj.fluo.client.command;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.commons.io.IOUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
+import org.apache.rya.api.persist.RyaDAOException;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand;
 import org.apache.rya.indexing.pcj.fluo.client.util.ParsedQueryRequest;
@@ -124,12 +133,14 @@ public class NewQueryCommand implements 
PcjAdminClientCommand {
             final String pcjId = pcjStorage.createPcj(sparql);
 
             // Tell the Fluo PCJ Updater app to maintain the PCJ.
-            createPcj.withRyaIntegration(pcjId, pcjStorage, fluo, rya);
+            createPcj.withRyaIntegration(pcjId, pcjStorage, fluo, accumulo, 
ryaTablePrefix);
 
-        } catch (MalformedQueryException | SailException | 
QueryEvaluationException | PcjException e) {
+        } catch (MalformedQueryException | SailException | 
QueryEvaluationException | PcjException | RyaDAOException e) {
             throw new ExecutionException("Could not create and load historic 
matches into the the Fluo app for the query.", e);
         }
 
         log.trace("Finished executing the New Query Command.");
     }
+  
+    
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
 
b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
index 09f2854..105f697 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
@@ -47,9 +47,11 @@ import com.google.common.collect.Sets;
 
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.mini.MiniFluo;
+import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaType;
 import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.persist.RyaDAOException;
 import org.apache.rya.api.resolver.RyaToRdfConversions;
 import org.apache.rya.rdftriplestore.RyaSailRepository;
 
@@ -177,9 +179,9 @@ public class FluoAndHistoricPcjsDemo implements Demo {
             pcjId = pcjStorage.createPcj(sparql);
 
             // Tell the Fluo app to maintain it.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, ryaTablePrefix);
 
-        } catch (MalformedQueryException | SailException | 
QueryEvaluationException | PcjException e) {
+        } catch (MalformedQueryException | SailException | 
QueryEvaluationException | PcjException | RyaDAOException e) {
             throw new DemoExecutionException("Error while using Fluo to 
compute and export historic matches, so the demo can not continue. Exiting.", 
e);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
index 94d974d..82b61bd 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
@@ -26,6 +26,7 @@ import java.util.Set;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.rya.api.persist.RyaDAOException;
 import org.apache.rya.indexing.pcj.fluo.ITBase;
 import 
org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInAccumuloException;
 import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInFluoException;
@@ -49,7 +50,7 @@ import com.google.common.collect.Sets;
 public class GetPcjMetadataIT extends ITBase {
 
     @Test
-    public void getMetadataByQueryId() throws RepositoryException, 
MalformedQueryException, SailException, QueryEvaluationException, PcjException, 
NotInFluoException, NotInAccumuloException {
+    public void getMetadataByQueryId() throws RepositoryException, 
MalformedQueryException, SailException, QueryEvaluationException, PcjException, 
NotInFluoException, NotInAccumuloException, RyaDAOException {
         final String sparql =
                 "SELECT ?x " +
                   "WHERE { " +
@@ -62,7 +63,7 @@ public class GetPcjMetadataIT extends ITBase {
         final String pcjId = pcjStorage.createPcj(sparql);
 
         // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
         // Fetch the PCJ's Metadata through the GetPcjMetadata interactor.
         final String queryId = new 
ListQueryIds().listQueryIds(fluoClient).get(0);
@@ -75,7 +76,7 @@ public class GetPcjMetadataIT extends ITBase {
     }
 
     @Test
-    public void getAllMetadata() throws MalformedQueryException, 
SailException, QueryEvaluationException, PcjException, NotInFluoException, 
NotInAccumuloException, AccumuloException, AccumuloSecurityException {
+    public void getAllMetadata() throws MalformedQueryException, 
SailException, QueryEvaluationException, PcjException, NotInFluoException, 
NotInAccumuloException, AccumuloException, AccumuloSecurityException, 
RyaDAOException {
 
         final CreatePcj createPcj = new CreatePcj();
 
@@ -89,7 +90,7 @@ public class GetPcjMetadataIT extends ITBase {
                   "?x <http://worksAt> <http://Chipotle>." +
                 "}";
         final String q1PcjId = pcjStorage.createPcj(q1Sparql);
-        createPcj.withRyaIntegration(q1PcjId, pcjStorage, fluoClient, ryaRepo);
+        createPcj.withRyaIntegration(q1PcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
         final String q2Sparql =
                 "SELECT ?x ?y " +
@@ -98,7 +99,7 @@ public class GetPcjMetadataIT extends ITBase {
                   "?y <http://worksAt> <http://Chipotle>." +
                 "}";
         final String q2PcjId = pcjStorage.createPcj(q2Sparql);
-        createPcj.withRyaIntegration(q2PcjId, pcjStorage, fluoClient, ryaRepo);
+        createPcj.withRyaIntegration(q2PcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
         // Ensure the command returns the correct metadata.
         final Set<PcjMetadata> expected = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
index 0fe44bd..85c31a0 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.indexing.pcj.fluo.ITBase;
 import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
@@ -38,8 +39,6 @@ import org.junit.Test;
 import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 
-import org.apache.rya.api.domain.RyaStatement;
-
 /**
  * Integration tests the methods of {@link GetQueryReportl}.
  */
@@ -79,7 +78,7 @@ public class GetQueryReportIT extends ITBase {
         final String pcjId = pcjStorage.createPcj(sparql);
 
         // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
         // Stream the data into Fluo.
         new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index d2ff98c..b4c8d69 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -84,7 +84,7 @@ public class CreateDeleteIT extends ITBase {
         final String pcjId = pcjStorage.createPcj(sparql);
 
         // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
         // Verify the end results of the query match the expected results.
         fluo.waitForObservers();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
index d0f6b21..dcab997 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.indexing.pcj.fluo.ITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
@@ -38,8 +39,6 @@ import org.openrdf.query.impl.BindingImpl;
 import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 
-import org.apache.rya.api.domain.RyaStatement;
-
 /**
  * Performs integration tests over the Fluo application geared towards various 
types of input.
  * <p>
@@ -90,7 +89,7 @@ public class InputIT extends ITBase {
         final String pcjId = pcjStorage.createPcj(sparql);
 
         // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
         // Verify the end results of the query match the expected results.
         fluo.waitForObservers();
@@ -137,7 +136,7 @@ public class InputIT extends ITBase {
         final String pcjId = pcjStorage.createPcj(sparql);
 
         // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
         // Ensure the query has no results yet.
         fluo.waitForObservers();
@@ -189,7 +188,7 @@ public class InputIT extends ITBase {
         final String pcjId = pcjStorage.createPcj(sparql);
 
         // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
         // Ensure Alice is a match.
         fluo.waitForObservers();
@@ -254,7 +253,7 @@ public class InputIT extends ITBase {
         final String pcjId = pcjStorage.createPcj(sparql);
 
         // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
         // Ensure Alice is a match.
         fluo.waitForObservers();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
index 57b679a..4e9f265 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.indexing.pcj.fluo.ITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
@@ -38,8 +39,6 @@ import org.openrdf.query.impl.BindingImpl;
 import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 
-import org.apache.rya.api.domain.RyaStatement;
-
 /**
  * Performs integration tests over the Fluo application geared towards various 
query structures.
  * <p>
@@ -84,7 +83,7 @@ public class QueryIT extends ITBase {
         final String pcjId = pcjStorage.createPcj(sparql);
 
         // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
         // Stream the data into Fluo.
         new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
@@ -170,7 +169,7 @@ public class QueryIT extends ITBase {
         final String pcjId = pcjStorage.createPcj(sparql);
 
         // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
         // Stream the data into Fluo.
         new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
@@ -235,7 +234,7 @@ public class QueryIT extends ITBase {
         final String pcjId = pcjStorage.createPcj(sparql);
 
         // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
         // Stream the data into Fluo.
         new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());
@@ -283,7 +282,7 @@ public class QueryIT extends ITBase {
         final String pcjId = pcjStorage.createPcj(sparql);
 
         // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
         // Stream the data into Fluo.
         new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
index 07b4640..f3f486c 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.indexing.pcj.fluo.ITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
@@ -36,8 +37,6 @@ import org.openrdf.query.impl.BindingImpl;
 import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 
-import org.apache.rya.api.domain.RyaStatement;
-
 /**
  * Performs integration tests over the Fluo application geared towards Rya PCJ 
exporting.
  * <p>
@@ -99,7 +98,7 @@ public class RyaExportIT extends ITBase {
         final String pcjId = pcjStorage.createPcj(sparql);
 
         // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
         // Stream the data into Fluo.
         new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
index 9056c99..fd70a19 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
@@ -23,9 +23,11 @@ import static org.junit.Assert.assertEquals;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.indexing.external.PrecomputedJoinIndexer;
 import org.apache.rya.indexing.pcj.fluo.ITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
-import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater;
@@ -38,10 +40,6 @@ import org.openrdf.repository.RepositoryConnection;
 
 import com.google.common.collect.Sets;
 
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.rya.accumulo.AccumuloRyaDAO;
-import org.apache.rya.indexing.external.PrecomputedJoinIndexer;
-
 
 /**
  * This test ensures that the correct updates are pushed by Fluo
@@ -91,7 +89,7 @@ public class RyaInputIncrementalUpdateIT extends ITBase {
         final String pcjId = pcjStorage.createPcj(sparql);
 
         // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
         // Verify the end results of the query match the expected results.
         fluo.waitForObservers();
@@ -142,7 +140,7 @@ public class RyaInputIncrementalUpdateIT extends ITBase {
         final String pcjId = pcjStorage.createPcj(sparql);
 
         // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
         fluo.waitForObservers();
 
@@ -191,7 +189,7 @@ public class RyaInputIncrementalUpdateIT extends ITBase {
         final String pcjId = pcjStorage.createPcj(sparql);
 
         // Tell the Fluo app to maintain the PCJ.
-        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
ryaRepo);
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
         fluo.waitForObservers();
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
index 2573925..29ef8f7 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
@@ -98,7 +98,7 @@ public class StreamingTestIT extends ITBase {
            // Create the PCJ table.
         final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
         final String pcjId = pcjStorage.createPcj(pcj);
-               new CreatePcj().withRyaIntegration(pcjId, pcjStorage, 
fluoClient, ryaRepo);
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
                String tableName = RYA_INSTANCE_NAME + "INDEX_" + pcjId;
                
                return tableName;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
new file mode 100644
index 0000000..6f4596f
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.visibility;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.BindingImpl;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Performs integration tests over the Fluo application geared towards historic statements that carry column visibilities.
+ * <p>
+ * These tests are being ignored so that they will not run as unit tests while building the application.
+ */
+public class HistoricStreamingVisibilityIT extends ITBase {
+
+    /**
+     * Ensure historic matches are included in the result.
+     */
+    @Test
+    public void historicResults() throws Exception {
+        // A query that finds people who talk to Eve and work at Chipotle.
+        final String sparql =
+              "SELECT ?x " +
+                "WHERE { " +
+                "?x <http://talksTo> <http://Eve>. " +
+                "?x <http://worksAt> <http://Chipotle>." +
+              "}";
+        // Grant the test user every visibility label that the historic statements below are written with.
+        accumuloConn.securityOperations().changeUserAuthorizations(ACCUMULO_USER, new Authorizations("U","V","W"));
+        AccumuloRyaDAO dao = new AccumuloRyaDAO();
+        dao.setConnector(accumuloConn);
+        dao.setConf(makeConfig());
+        dao.init();
+
+        // Triples that are loaded into Rya before the PCJ is created.
+        final Set<RyaStatement> historicTriples = Sets.newHashSet(
+                makeRyaStatement(makeStatement("http://Alice", "http://talksTo", "http://Eve"),"U"),
+                makeRyaStatement(makeStatement("http://Bob", "http://talksTo", "http://Eve"),"V"),
+                makeRyaStatement(makeStatement("http://Charlie", "http://talksTo", "http://Eve"),"W"),
+
+                makeRyaStatement(makeStatement("http://Eve", "http://helps", "http://Kevin"), "U"),
+
+                makeRyaStatement(makeStatement("http://Bob", "http://worksAt", "http://Chipotle"), "W"),
+                makeRyaStatement(makeStatement("http://Charlie", "http://worksAt", "http://Chipotle"), "V"),
+                makeRyaStatement(makeStatement("http://Eve", "http://worksAt", "http://Chipotle"), "U"),
+                makeRyaStatement(makeStatement("http://David", "http://worksAt", "http://Chipotle"), "V"));
+
+        dao.add(historicTriples.iterator());
+        dao.flush();
+
+        // The expected results of the SPARQL query once the PCJ has been computed (only Bob and Charlie match both patterns).
+        final Set<BindingSet> expected = new HashSet<>();
+        expected.add(makeBindingSet(
+                new BindingImpl("x", new URIImpl("http://Bob"))));
+        expected.add(makeBindingSet(
+                new BindingImpl("x", new URIImpl("http://Charlie"))));
+
+        // Create the PCJ table.
+        final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final String pcjId = pcjStorage.createPcj(sparql);
+
+        new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+
+        // Verify the end results of the query match the expected results.
+        fluo.waitForObservers();
+        Set<BindingSet> results = Sets.newHashSet(pcjStorage.listResults(pcjId));
+        Assert.assertEquals(expected, results);
+    }
+
+    /** Builds the configuration handed to the DAO: table prefix, Accumulo connection details and the U,V,W query authorizations. */
+    private AccumuloRdfConfiguration makeConfig() {
+        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix(RYA_INSTANCE_NAME);
+        // Accumulo connection information.
+        conf.set(ConfigUtils.CLOUDBASE_USER, ACCUMULO_USER);
+        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, ACCUMULO_PASSWORD);
+        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instanceName);
+        conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zookeepers);
+        conf.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, "U,V,W");
+
+        return conf;
+    }
+
+    /**
+     * Converts a Statement into a RyaStatement and sets its column visibility to the UTF-8 bytes of the given label.
+     * @param statement - The statement to convert.
+     * @param visibility - The visibility label to attach, for example "U".
+     */
+    private static RyaStatement makeRyaStatement(Statement statement, String visibility) throws UnsupportedEncodingException {
+        RyaStatement ryaStatement = RdfToRyaConversions.convertStatement(statement);
+        ryaStatement.setColumnVisibility(visibility.getBytes("UTF-8"));
+        return ryaStatement;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
index e799ddd..ccc2c20 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
@@ -38,12 +38,21 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.hadoop.io.Text;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
 import org.apache.rya.indexing.pcj.fluo.ITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory;
+import org.apache.rya.rdftriplestore.RyaSailRepository;
+import org.apache.rya.sail.config.RyaSailFactory;
 import org.junit.Test;
 import org.openrdf.model.URI;
 import org.openrdf.model.ValueFactory;
@@ -57,15 +66,6 @@ import org.openrdf.sail.Sail;
 import com.beust.jcommander.internal.Sets;
 import com.google.common.base.Optional;
 
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.api.client.RyaClient;
-import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
-import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.indexing.accumulo.ConfigUtils;
-import org.apache.rya.rdftriplestore.RyaSailRepository;
-import org.apache.rya.sail.config.RyaSailFactory;
-
 /**
  * Integration tests that ensure the Fluo Application properly exports PCJ
  * results with the correct Visibility values.
@@ -110,6 +110,7 @@ public class PcjVisibilityIT extends ITBase {
         // PCJ updating application will have to maintain visibilities.
         final AccumuloRdfConfiguration ryaConf = 
super.makeConfig(instanceName, zookeepers);
         ryaConf.set(ConfigUtils.CLOUDBASE_AUTHS, "u");
+        ryaConf.set(RdfCloudTripleStoreConfiguration.CONF_CV, "u");
 
         Sail sail = null;
         RyaSailRepository ryaRepo = null;
@@ -195,7 +196,7 @@ public class PcjVisibilityIT extends ITBase {
         final String pcjId = rootStorage.createPcj(sparql);
 
         // Create the PCJ in Fluo.
-        new CreatePcj().withRyaIntegration(pcjId, rootStorage, fluoClient, 
ryaRepo);
+        new CreatePcj().withRyaIntegration(pcjId, rootStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
 
         // Stream the data into Fluo.
         for(final RyaStatement statement : streamedTriples.keySet()) {

Reply via email to