Repository: incubator-rya
Updated Branches:
  refs/heads/master 643608ca1 -> c9b66c7c3


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java
 
b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java
index 2bcce65..e43ab83 100644
--- 
a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java
+++ 
b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java
@@ -41,6 +41,8 @@ import org.apache.log4j.Logger;
 import org.apache.rya.accumulo.AccumuloRdfConfiguration;
 import org.apache.rya.accumulo.AccumuloRyaDAO;
 import org.apache.rya.accumulo.MiniAccumuloClusterInstance;
+import org.apache.rya.accumulo.MiniAccumuloSingleton;
+import org.apache.rya.accumulo.RyaTestInstanceRule;
 import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
 import org.apache.rya.indexing.pcj.storage.PcjException;
 import org.apache.rya.indexing.pcj.storage.PcjMetadata;
@@ -53,6 +55,7 @@ import org.apache.zookeeper.ClientCnxn;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.openrdf.model.Statement;
 import org.openrdf.model.impl.LiteralImpl;
@@ -83,15 +86,16 @@ public class PcjTablesIntegrationTest {
 
     private static final AccumuloPcjSerializer converter = new 
AccumuloPcjSerializer();
 
-    protected static final String RYA_TABLE_PREFIX = "demo_";
-
     // The MiniAccumuloCluster is re-used between tests.
-    private MiniAccumuloClusterInstance cluster;
+    private MiniAccumuloClusterInstance cluster = 
MiniAccumuloSingleton.getInstance();
 
     // Rya data store and connections.
     protected RyaSailRepository ryaRepo = null;
     protected RepositoryConnection ryaConn = null;
 
+    @Rule
+    public RyaTestInstanceRule testInstance = new RyaTestInstanceRule(false);
+
     @BeforeClass
     public static void killLoudLogs() {
         Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR);
@@ -99,10 +103,6 @@ public class PcjTablesIntegrationTest {
 
     @Before
     public void resetTestEnvironmanet() throws AccumuloException, 
AccumuloSecurityException, TableNotFoundException, RepositoryException, 
IOException, InterruptedException {
-        // Start the cluster.
-        cluster = new MiniAccumuloClusterInstance();
-        cluster.startMiniAccumulo();
-
         // Setup the Rya library to use the Mini Accumulo.
         ryaRepo = setupRya();
         ryaConn = ryaRepo.getConnection();
@@ -111,12 +111,15 @@ public class PcjTablesIntegrationTest {
     @After
     public void shutdownMiniCluster() throws IOException, 
InterruptedException, RepositoryException {
         // Stop Rya.
-        ryaRepo.shutDown();
-
-        // Stop the cluster.
-        cluster.stopMiniAccumulo();
+        if (ryaRepo != null) {
+            ryaRepo.shutDown();
+        }
     }
 
+    private String getRyaInstanceName() {
+        return testInstance.getRyaInstanceName();
+    }
+    
     /**
      * Format a Mini Accumulo to be a Rya repository.
      *
@@ -130,11 +133,11 @@ public class PcjTablesIntegrationTest {
 
         // Setup Rya configuration values.
         final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
-        conf.setTablePrefix(RYA_TABLE_PREFIX);
+        conf.setTablePrefix(getRyaInstanceName());
         conf.setDisplayQueryPlan(true);
 
         conf.setBoolean(USE_MOCK_INSTANCE, false);
-        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, 
RYA_TABLE_PREFIX);
+        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, 
getRyaInstanceName());
         conf.set(CLOUDBASE_USER, cluster.getUsername());
         conf.set(CLOUDBASE_PASSWORD, cluster.getPassword());
         conf.set(CLOUDBASE_INSTANCE, cluster.getInstanceName());
@@ -167,7 +170,7 @@ public class PcjTablesIntegrationTest {
         final Connector accumuloConn = cluster.getConnector();
 
         // Create a PCJ table in the Mini Accumulo.
-        final String pcjTableName = new 
PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj");
+        final String pcjTableName = new 
PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
         final Set<VariableOrder> varOrders = new 
ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
         final PcjTables pcjs = new PcjTables();
         pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
@@ -198,7 +201,7 @@ public class PcjTablesIntegrationTest {
         final Connector accumuloConn = cluster.getConnector();
 
         // Create a PCJ table in the Mini Accumulo.
-        final String pcjTableName = new 
PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj");
+        final String pcjTableName = new 
PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
         final Set<VariableOrder> varOrders = new 
ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
         final PcjTables pcjs = new PcjTables();
         pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
@@ -249,7 +252,7 @@ public class PcjTablesIntegrationTest {
         final Connector accumuloConn = cluster.getConnector();
 
         // Create a PCJ table in the Mini Accumulo.
-        final String pcjTableName = new 
PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj");
+        final String pcjTableName = new 
PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
         final Set<VariableOrder> varOrders = new 
ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
         final PcjTables pcjs = new PcjTables();
         pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
@@ -323,7 +326,7 @@ public class PcjTablesIntegrationTest {
 
         final Connector accumuloConn = cluster.getConnector();
 
-        final String pcjTableName = new 
PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj");
+        final String pcjTableName = new 
PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
         final Set<VariableOrder> varOrders = new 
ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
         final PcjTables pcjs = new PcjTables();
         pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
@@ -393,7 +396,7 @@ public class PcjTablesIntegrationTest {
 
         final Connector accumuloConn = cluster.getConnector();
 
-        final String pcjTableName = new 
PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj");
+        final String pcjTableName = new 
PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
 
         // Create and populate the PCJ table.
         final PcjTables pcjs = new PcjTables();
@@ -476,7 +479,7 @@ public class PcjTablesIntegrationTest {
         final Connector accumuloConn = cluster.getConnector();
 
         // Create a PCJ table in the Mini Accumulo.
-        final String pcjTableName = new 
PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj");
+        final String pcjTableName = new 
PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj");
         final Set<VariableOrder> varOrders = new 
ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age"));
         final PcjTables pcjs = new PcjTables();
         pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
@@ -516,7 +519,7 @@ public class PcjTablesIntegrationTest {
         final Connector accumuloConn = cluster.getConnector();
 
         // Create a PCJ index.
-        final String tableName = new 
PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "thePcj");
+        final String tableName = new 
PcjTableNameFactory().makeTableName(getRyaInstanceName(), "thePcj");
         final Set<VariableOrder> varOrders = Sets.<VariableOrder>newHashSet( 
new VariableOrder("x") );
         final String sparql = "SELECT x WHERE ?x <http://isA> <http://Food>";
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/FluoITBase.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/FluoITBase.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/FluoITBase.java
new file mode 100644
index 0000000..b5d9428
--- /dev/null
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/FluoITBase.java
@@ -0,0 +1,282 @@
+package org.apache.rya.indexing.pcj.fluo;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.net.UnknownHostException;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.MiniAccumuloClusterInstance;
+import org.apache.rya.accumulo.MiniAccumuloSingleton;
+import org.apache.rya.accumulo.RyaTestInstanceRule;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloInstall;
+import org.apache.zookeeper.ClientCnxn;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailException;
+
+import org.apache.fluo.api.client.FluoAdmin;
+import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.mini.MiniFluo;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.Install;
+import org.apache.rya.api.client.Install.DuplicateInstanceNameException;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import 
org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
+import org.apache.rya.rdftriplestore.RyaSailRepository;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+
+/**
+ * Base class for integration tests that ensure the Fluo application processes
+ * PCJ results correctly.
+ * <p>
+ * This class is abstract because it doesn't contain any tests of its own.
+ */
+public abstract class FluoITBase {
+    private static final Logger log = Logger.getLogger(FluoITBase.class);
+
+    // Mini Accumulo Cluster
+    private static MiniAccumuloClusterInstance clusterInstance = 
MiniAccumuloSingleton.getInstance();
+    private static MiniAccumuloCluster cluster;
+
+    private static String instanceName = null;
+    private static String zookeepers = null;
+
+    protected static Connector accumuloConn = null;
+
+    // Fluo data store and connections.
+    protected MiniFluo fluo = null;
+    protected FluoConfiguration fluoConfig = null;
+    protected FluoClient fluoClient = null;
+
+    // Rya data store and connections.
+    protected RyaSailRepository ryaRepo = null;
+    protected RepositoryConnection ryaConn = null;
+
+    @Rule
+    public RyaTestInstanceRule testInstance = new RyaTestInstanceRule(false);
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR);
+
+        // Get the Mini Accumulo cluster from the shared singleton instance.
+        cluster = clusterInstance.getCluster();
+
+        // Store a connector to the Mini Accumulo.
+        instanceName = cluster.getInstanceName();
+        zookeepers = cluster.getZooKeepers();
+
+        final Instance instance = new ZooKeeperInstance(instanceName, 
zookeepers);
+        accumuloConn = instance.getConnector(clusterInstance.getUsername(), 
new PasswordToken(clusterInstance.getPassword()));
+    }
+
+    @Before
+    public void setupMiniResources() throws Exception {
+        // Initialize the Mini Fluo that will be used to store created queries.
+        fluoConfig = createFluoConfig();
+        preFluoInitHook();
+        FluoFactory.newAdmin(fluoConfig).initialize(new 
FluoAdmin.InitializationOptions()
+                .setClearTable(true)
+                .setClearZookeeper(true));
+        postFluoInitHook();
+        fluo = FluoFactory.newMiniFluo(fluoConfig);
+        fluoClient = FluoFactory.newClient(fluo.getClientConfiguration());
+
+        // Initialize the Rya that will be used by the tests.
+        ryaRepo = setupRya();
+        ryaConn = ryaRepo.getConnection();
+    }
+
+    @After
+    public void shutdownMiniResources() {
+        if (ryaConn != null) {
+            try {
+                log.info("Shutting down Rya Connection.");
+                ryaConn.close();
+                log.info("Rya Connection shut down.");
+            } catch (final Exception e) {
+                log.error("Could not shut down the Rya Connection.", e);
+            }
+        }
+
+        if (ryaRepo != null) {
+            try {
+                log.info("Shutting down Rya Repo.");
+                ryaRepo.shutDown();
+                log.info("Rya Repo shut down.");
+            } catch (final Exception e) {
+                log.error("Could not shut down the Rya Repo.", e);
+            }
+        }
+
+        if (fluoClient != null) {
+            try {
+                log.info("Shutting down Fluo Client.");
+                fluoClient.close();
+                log.info("Fluo Client shut down.");
+            } catch (final Exception e) {
+                log.error("Could not shut down the Fluo Client.", e);
+            }
+        }
+
+        if (fluo != null) {
+            try {
+                log.info("Shutting down Mini Fluo.");
+                fluo.close();
+                log.info("Mini Fluo shut down.");
+            } catch (final Exception e) {
+                log.error("Could not shut down the Mini Fluo.", e);
+            }
+        }
+    }
+
+    protected void preFluoInitHook() throws Exception {
+
+    }
+
+    protected void postFluoInitHook() throws Exception {
+
+    }
+
+    protected MiniAccumuloCluster getMiniAccumuloCluster() {
+        return cluster;
+    }
+
+    protected MiniFluo getMiniFluo() {
+        return fluo;
+    }
+
+    public RyaSailRepository getRyaSailRepository() {
+        return ryaRepo;
+    }
+
+    public Connector getAccumuloConnector() {
+        return accumuloConn;
+    }
+
+    public String getRyaInstanceName() {
+        return testInstance.getRyaInstanceName();
+    }
+
+    protected String getUsername() {
+        return clusterInstance.getUsername();
+    }
+
+    protected String getPassword() {
+        return clusterInstance.getPassword();
+    }
+
+    protected FluoConfiguration getFluoConfiguration() {
+        return fluoConfig;
+    }
+
+    public AccumuloConnectionDetails createConnectionDetails() {
+        return new AccumuloConnectionDetails(
+                clusterInstance.getUsername(),
+                clusterInstance.getPassword().toCharArray(),
+                clusterInstance.getInstanceName(),
+                clusterInstance.getZookeepers());
+    }
+
+    private FluoConfiguration createFluoConfig() {
+        // Configure how the mini fluo will run.
+        final FluoConfiguration config = new FluoConfiguration();
+        config.setMiniStartAccumulo(false);
+        config.setAccumuloInstance(instanceName);
+        config.setAccumuloUser(clusterInstance.getUsername());
+        config.setAccumuloPassword(clusterInstance.getPassword());
+        config.setInstanceZookeepers(zookeepers + "/fluo");
+        config.setAccumuloZookeepers(zookeepers);
+
+        config.setApplicationName(getRyaInstanceName());
+        config.setAccumuloTable("fluo" + getRyaInstanceName());
+        return config;
+    }
+
+    /**
+     * Installs a Rya instance named after this test's Rya instance name and returns a repository connected to it.
+     */
+    protected RyaSailRepository setupRya()
+            throws AccumuloException, AccumuloSecurityException, 
RepositoryException, RyaDAOException,
+            NumberFormatException, UnknownHostException, 
InferenceEngineException, AlreadyInitializedException,
+            RyaDetailsRepositoryException, DuplicateInstanceNameException, 
RyaClientException, SailException {
+        checkNotNull(instanceName);
+        checkNotNull(zookeepers);
+
+        // Setup Rya configuration values.
+        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix(getRyaInstanceName());
+        conf.setDisplayQueryPlan(true);
+        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, false);
+        conf.set(ConfigUtils.CLOUDBASE_USER, clusterInstance.getUsername());
+        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, 
clusterInstance.getPassword());
+        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, 
clusterInstance.getInstanceName());
+        conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, 
clusterInstance.getZookeepers());
+        conf.set(ConfigUtils.USE_PCJ, "true");
+        conf.set(ConfigUtils.FLUO_APP_NAME, getRyaInstanceName());
+        conf.set(ConfigUtils.PCJ_STORAGE_TYPE, 
PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
+        conf.set(ConfigUtils.PCJ_UPDATER_TYPE, 
PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
+        conf.set(ConfigUtils.CLOUDBASE_AUTHS, "");
+
+        // Install the test instance of Rya.
+        final Install install = new AccumuloInstall(createConnectionDetails(), 
accumuloConn);
+
+        final InstallConfiguration installConfig = 
InstallConfiguration.builder()
+                .setEnableTableHashPrefix(true)
+                .setEnableEntityCentricIndex(true)
+                .setEnableFreeTextIndex(true)
+                .setEnableTemporalIndex(true)
+                .setEnablePcjIndex(true)
+                .setEnableGeoIndex(true)
+                .setFluoPcjAppName(getRyaInstanceName())
+                .build();
+        install.install(getRyaInstanceName(), installConfig);
+
+        // Connect to the instance of Rya that was just installed.
+        final Sail sail = RyaSailFactory.getInstance(conf);
+        final RyaSailRepository ryaRepo = new RyaSailRepository(sail);
+
+        return ryaRepo;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
index cd84cb9..3ed1844 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java
@@ -93,8 +93,6 @@ public class KafkaExportITBase extends AccumuloExportITBase {
 
     /**
      * Add info about the Kafka queue/topic to receive the export.
-     *
-     * @see 
org.apache.rya.indexing.pcj.fluo.ITBase#setExportParameters(java.util.HashMap)
      */
     @Override
     protected void preFluoInitHook() throws Exception {
@@ -128,8 +126,6 @@ public class KafkaExportITBase extends AccumuloExportITBase 
{
 
     /**
      * setup mini kafka and call the super to setup mini fluo
-     *
-     * @see org.apache.rya.indexing.pcj.fluo.ITBase#setupMiniResources()
      */
     @Before
     public void setupKafka() throws Exception {
@@ -239,8 +235,6 @@ public class KafkaExportITBase extends AccumuloExportITBase 
{
 
     /**
      * Close all the Kafka mini server and mini-zookeeper
-     *
-     * @see org.apache.rya.indexing.pcj.fluo.ITBase#shutdownMiniResources()
      */
     @After
     public void teardownKafka() {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
index 5fe999f..84b6343 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java
@@ -22,19 +22,10 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 
-import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.recipes.test.AccumuloExportITBase;
 import org.apache.log4j.BasicConfigurator;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.api.client.Install.InstallConfiguration;
-import org.apache.rya.api.client.RyaClient;
-import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
-import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
-import org.apache.rya.indexing.accumulo.ConfigUtils;
-import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
 import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
 import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
@@ -42,26 +33,12 @@ import 
org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
 import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
-import org.apache.rya.rdftriplestore.RyaSailRepository;
-import org.apache.rya.sail.config.RyaSailFactory;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.BeforeClass;
-import org.openrdf.sail.Sail;
 
 /**
  * The base Integration Test class used for Fluo applications that export to a 
Rya PCJ Index.
  */
-public class RyaExportITBase extends AccumuloExportITBase {
-
-    protected static final String RYA_INSTANCE_NAME = "test_";
-
-    private RyaSailRepository ryaSailRepo = null;
-
-    public RyaExportITBase() {
-        // Indicates that MiniFluo should be started before each test.
-        super(true);
-    }
+public class RyaExportITBase extends FluoITBase {
 
     @BeforeClass
     public static void setupLogging() {
@@ -83,11 +60,11 @@ public class RyaExportITBase extends AccumuloExportITBase {
         final HashMap<String, String> exportParams = new HashMap<>();
         final RyaExportParameters ryaParams = new 
RyaExportParameters(exportParams);
         ryaParams.setExportToRya(true);
-        ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME);
+        ryaParams.setRyaInstanceName(getRyaInstanceName());
         
ryaParams.setAccumuloInstanceName(super.getMiniAccumuloCluster().getInstanceName());
         
ryaParams.setZookeeperServers(super.getMiniAccumuloCluster().getZooKeepers());
-        ryaParams.setExporterUsername(ACCUMULO_USER);
-        ryaParams.setExporterPassword(ACCUMULO_PASSWORD);
+        ryaParams.setExporterUsername(getUsername());
+        ryaParams.setExporterPassword(getPassword());
 
         final ObserverSpecification exportObserverConfig = new 
ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
         observers.add(exportObserverConfig);
@@ -96,87 +73,4 @@ public class RyaExportITBase extends AccumuloExportITBase {
         super.getFluoConfiguration().addObservers(observers);
     }
 
-    @Before
-    public void setupRya() throws Exception {
-        final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
-        final String instanceName = cluster.getInstanceName();
-        final String zookeepers = cluster.getZooKeepers();
-
-        // Install the Rya instance to the mini accumulo cluster.
-        final RyaClient ryaClient = AccumuloRyaClientFactory.build(
-                new AccumuloConnectionDetails(
-                    ACCUMULO_USER,
-                    ACCUMULO_PASSWORD.toCharArray(),
-                    instanceName,
-                    zookeepers),
-                super.getAccumuloConnector());
-
-        ryaClient.getInstall().install(RYA_INSTANCE_NAME, 
InstallConfiguration.builder()
-                .setEnableTableHashPrefix(false)
-                .setEnableFreeTextIndex(false)
-                .setEnableEntityCentricIndex(false)
-                .setEnableGeoIndex(false)
-                .setEnableTemporalIndex(false)
-                .setEnablePcjIndex(true)
-                .setFluoPcjAppName( 
super.getFluoConfiguration().getApplicationName() )
-                .build());
-
-        // Connect to the Rya instance that was just installed.
-        final AccumuloRdfConfiguration conf = makeConfig(instanceName, 
zookeepers);
-        final Sail sail = RyaSailFactory.getInstance(conf);
-        ryaSailRepo = new RyaSailRepository(sail);
-    }
-
-    @After
-    public void teardownRya() throws Exception {
-        final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster();
-        final String instanceName = cluster.getInstanceName();
-        final String zookeepers = cluster.getZooKeepers();
-
-        // Uninstall the instance of Rya.
-        final RyaClient ryaClient = AccumuloRyaClientFactory.build(
-                new AccumuloConnectionDetails(
-                    ACCUMULO_USER,
-                    ACCUMULO_PASSWORD.toCharArray(),
-                    instanceName,
-                    zookeepers),
-                super.getAccumuloConnector());
-
-        ryaClient.getUninstall().uninstall(RYA_INSTANCE_NAME);
-
-        // Shutdown the repo.
-        ryaSailRepo.shutDown();
-    }
-
-    /**
-     * @return A {@link RyaSailRepository} that is connected to the Rya 
instance that statements are loaded into.
-     */
-    protected RyaSailRepository getRyaSailRepository() throws Exception {
-        return ryaSailRepo;
-    }
-
-    protected AccumuloRdfConfiguration makeConfig(final String instanceName, 
final String zookeepers) {
-        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
-        conf.setTablePrefix(RYA_INSTANCE_NAME);
-
-        // Accumulo connection information.
-        conf.setAccumuloUser(AccumuloExportITBase.ACCUMULO_USER);
-        conf.setAccumuloPassword(AccumuloExportITBase.ACCUMULO_PASSWORD);
-        
conf.setAccumuloInstance(super.getAccumuloConnector().getInstance().getInstanceName());
-        
conf.setAccumuloZookeepers(super.getAccumuloConnector().getInstance().getZooKeepers());
-        conf.setAuths("");
-
-        // PCJ configuration information.
-        conf.set(ConfigUtils.USE_PCJ, "true");
-        conf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true");
-        conf.set(ConfigUtils.FLUO_APP_NAME, 
super.getFluoConfiguration().getApplicationName());
-        conf.set(ConfigUtils.PCJ_STORAGE_TYPE,
-                
PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
-        conf.set(ConfigUtils.PCJ_UPDATER_TYPE,
-                
PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
-
-        conf.setDisplayQueryPlan(true);
-
-        return conf;
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
index 0aceaa3..9a1c285 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java
@@ -63,12 +63,12 @@ public class GetPcjMetadataIT extends RyaExportITBase {
 
         // Create the PCJ table.
         final Connector accumuloConn = super.getAccumuloConnector();
-        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, getRyaInstanceName());
         final String pcjId = pcjStorage.createPcj(sparql);
 
         try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
             // Tell the Fluo app to maintain the PCJ.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, getRyaInstanceName());
 
             // Fetch the PCJ's Metadata through the GetPcjMetadata interactor.
             final String queryId = new 
ListQueryIds().listQueryIds(fluoClient).get(0);
@@ -84,7 +84,7 @@ public class GetPcjMetadataIT extends RyaExportITBase {
     @Test
     public void getAllMetadata() throws MalformedQueryException, 
SailException, QueryEvaluationException, PcjException, NotInFluoException, 
NotInAccumuloException, AccumuloException, AccumuloSecurityException, 
RyaDAOException {
         final Connector accumuloConn = super.getAccumuloConnector();
-        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, getRyaInstanceName());
 
         try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
             // Add a couple of queries to Accumulo.
@@ -96,7 +96,7 @@ public class GetPcjMetadataIT extends RyaExportITBase {
                             "}";
             final String q1PcjId = pcjStorage.createPcj(q1Sparql);
             final CreatePcj createPcj = new CreatePcj();
-            createPcj.withRyaIntegration(q1PcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+            createPcj.withRyaIntegration(q1PcjId, pcjStorage, fluoClient, 
accumuloConn, getRyaInstanceName());
 
             final String q2Sparql =
                     "SELECT ?x ?y " +
@@ -105,7 +105,7 @@ public class GetPcjMetadataIT extends RyaExportITBase {
                             "?y <http://worksAt> <http://Chipotle>." +
                             "}";
             final String q2PcjId = pcjStorage.createPcj(q2Sparql);
-            createPcj.withRyaIntegration(q2PcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+            createPcj.withRyaIntegration(q2PcjId, pcjStorage, fluoClient, 
accumuloConn, getRyaInstanceName());
 
             // Ensure the command returns the correct metadata.
             final Set<PcjMetadata> expected = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
index 10f2319..d56c23a 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java
@@ -78,13 +78,14 @@ public class GetQueryReportIT extends RyaExportITBase {
                 new RyaStatement(new RyaURI("http://Frank"), new 
RyaURI("http://livesIn"), new RyaURI("http://Lost County")));
 
         // Create the PCJ table.
+
         final Connector accumuloConn = super.getAccumuloConnector();
-        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, getRyaInstanceName());
         final String pcjId = pcjStorage.createPcj(sparql);
 
         try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
             // Tell the Fluo app to maintain the PCJ.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, getRyaInstanceName());
 
             // Stream the data into Fluo.
             new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index 21d7db0..349d391 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -26,8 +26,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.client.Snapshot;
@@ -36,7 +34,6 @@ import org.apache.fluo.api.client.scanner.RowScanner;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Span;
 import org.apache.rya.api.client.RyaClient;
-import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
 import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
 import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.DeletePcj;
@@ -130,16 +127,9 @@ public class CreateDeleteIT extends RyaExportITBase {
         requireNonNull(statements);
 
         // Register the PCJ with Rya.
-        final Instance accInstance = 
super.getAccumuloConnector().getInstance();
-        final Connector accumuloConn = super.getAccumuloConnector();
+        final RyaClient ryaClient = 
AccumuloRyaClientFactory.build(createConnectionDetails(), 
getAccumuloConnector());
 
-        final RyaClient ryaClient = AccumuloRyaClientFactory.build(new 
AccumuloConnectionDetails(
-                ACCUMULO_USER,
-                ACCUMULO_PASSWORD.toCharArray(),
-                accInstance.getInstanceName(),
-                accInstance.getZooKeepers()), accumuloConn);
-
-        final String pcjId = 
ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql);
+        final String pcjId = 
ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql);
 
         // Write the data to Rya.
         final SailRepositoryConnection ryaConn = 
super.getRyaSailRepository().getConnection();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
index ab97bbd..05dfd32 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java
@@ -97,12 +97,12 @@ public class InputIT extends RyaExportITBase {
 
         // Create the PCJ table.
         final Connector accumuloConn = super.getAccumuloConnector();
-        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, getRyaInstanceName());
         final String pcjId = pcjStorage.createPcj(sparql);
 
         try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
             // Tell the Fluo app to maintain the PCJ.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, getRyaInstanceName());
 
             // Verify the end results of the query match the expected results.
             super.getMiniFluo().waitForObservers();
@@ -157,12 +157,12 @@ public class InputIT extends RyaExportITBase {
 
         // Create the PCJ table.
         final Connector accumuloConn = super.getAccumuloConnector();
-        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, getRyaInstanceName());
         final String pcjId = pcjStorage.createPcj(sparql);
 
         try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
             // Tell the Fluo app to maintain the PCJ.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, getRyaInstanceName());
 
             // Ensure the query has no results yet.
             super.getMiniFluo().waitForObservers();
@@ -223,12 +223,12 @@ public class InputIT extends RyaExportITBase {
 
         // Create the PCJ table.
         final Connector accumuloConn = super.getAccumuloConnector();
-        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, getRyaInstanceName());
         final String pcjId = pcjStorage.createPcj(sparql);
 
         try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
             // Tell the Fluo app to maintain the PCJ.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, getRyaInstanceName());
 
             // Ensure Alice is a match.
             super.getMiniFluo().waitForObservers();
@@ -312,12 +312,12 @@ public class InputIT extends RyaExportITBase {
 
         // Create the PCJ table.
         final Connector accumuloConn = super.getAccumuloConnector();
-        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, getRyaInstanceName());
         final String pcjId = pcjStorage.createPcj(sparql);
 
         try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
             // Tell the Fluo app to maintain the PCJ.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, getRyaInstanceName());
 
             // Ensure Alice is a match.
             super.getMiniFluo().waitForObservers();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
index 08bf2e1..f815a55 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
@@ -439,16 +439,11 @@ public class QueryIT extends RyaExportITBase {
         requireNonNull(expectedResults);
 
         // Register the PCJ with Rya.
-        final Instance accInstance = 
super.getAccumuloConnector().getInstance();
         final Connector accumuloConn = super.getAccumuloConnector();
 
-        final RyaClient ryaClient = AccumuloRyaClientFactory.build(new 
AccumuloConnectionDetails(
-                ACCUMULO_USER,
-                ACCUMULO_PASSWORD.toCharArray(),
-                accInstance.getInstanceName(),
-                accInstance.getZooKeepers()), accumuloConn);
+        final RyaClient ryaClient = 
AccumuloRyaClientFactory.build(createConnectionDetails(), accumuloConn);
 
-        ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql);
+        ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql);
 
         // Write the data to Rya.
         final SailRepositoryConnection ryaConn = 
super.getRyaSailRepository().getConnection();
@@ -461,7 +456,7 @@ public class QueryIT extends RyaExportITBase {
         super.getMiniFluo().waitForObservers();
 
         // Fetch the value that is stored within the PCJ table.
-        try(final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME)) {
+        try(final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, getRyaInstanceName())) {
             final String pcjId = pcjStorage.listPcjs().get(0);
             final Set<BindingSet> results = Sets.newHashSet( 
pcjStorage.listResults(pcjId) );
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
index 747f6e5..9c21afd 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java
@@ -105,12 +105,12 @@ public class RyaExportIT extends RyaExportITBase {
 
         // Create the PCJ table.
         final Connector accumuloConn = super.getAccumuloConnector();
-        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, getRyaInstanceName());
         final String pcjId = pcjStorage.createPcj(sparql);
 
         try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
             // Tell the Fluo app to maintain the PCJ.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, getRyaInstanceName());
 
             // Stream the data into Fluo.
             new InsertTriples().insert(fluoClient, streamedTriples, 
Optional.<String>absent());

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
index 3c74b13..a8d470f 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java
@@ -94,12 +94,12 @@ public class RyaInputIncrementalUpdateIT extends 
RyaExportITBase {
 
         // Create the PCJ table.
         final Connector accumuloConn = super.getAccumuloConnector();
-        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, getRyaInstanceName());
         final String pcjId = pcjStorage.createPcj(sparql);
 
         try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
             // Tell the Fluo app to maintain the PCJ.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, getRyaInstanceName());
 
             // Verify the end results of the query match the expected results.
             super.getMiniFluo().waitForObservers();
@@ -160,12 +160,12 @@ public class RyaInputIncrementalUpdateIT extends 
RyaExportITBase {
 
         // Create the PCJ table.
         final Connector accumuloConn = super.getAccumuloConnector();
-        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, getRyaInstanceName());
         final String pcjId = pcjStorage.createPcj(sparql);
 
         try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
             // Tell the Fluo app to maintain the PCJ.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, getRyaInstanceName());
 
             super.getMiniFluo().waitForObservers();
 
@@ -231,12 +231,12 @@ public class RyaInputIncrementalUpdateIT extends 
RyaExportITBase {
 
         // Create the PCJ table.
         final Connector accumuloConn = super.getAccumuloConnector();
-        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, getRyaInstanceName());
         final String pcjId = pcjStorage.createPcj(sparql);
 
         try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
             // Tell the Fluo app to maintain the PCJ.
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, getRyaInstanceName());
 
             super.getMiniFluo().waitForObservers();
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
index 52d6caa..72759bb 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java
@@ -56,11 +56,11 @@ public class StreamingTestIT extends RyaExportITBase {
            try (FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
                // Create the PCJ table.
                final Connector accumuloConn = super.getAccumuloConnector();
-               final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+               final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, getRyaInstanceName());
                final String pcjId = pcjStorage.createPcj(sparql);
 
                // Task the Fluo app with the PCJ.
-               new CreatePcj().withRyaIntegration(pcjId, pcjStorage, 
fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+               new CreatePcj().withRyaIntegration(pcjId, pcjStorage, 
fluoClient, accumuloConn, getRyaInstanceName());
 
                // Add Statements to the Fluo app.
                log.info("Adding Join Pairs...");

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
index 30b6842..150492f 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java
@@ -67,7 +67,7 @@ public class HistoricStreamingVisibilityIT extends 
RyaExportITBase {
               "}";
 
         final Connector accumuloConn = super.getAccumuloConnector();
-        
accumuloConn.securityOperations().changeUserAuthorizations(ACCUMULO_USER, new 
Authorizations("U","V","W"));
+        
accumuloConn.securityOperations().changeUserAuthorizations(getUsername(), new 
Authorizations("U","V","W"));
         final AccumuloRyaDAO dao = new AccumuloRyaDAO();
         dao.setConnector(accumuloConn);
         dao.setConf(makeConfig());
@@ -103,11 +103,11 @@ public class HistoricStreamingVisibilityIT extends 
RyaExportITBase {
         expected.add(bs);
 
         // Create the PCJ table.
-        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, getRyaInstanceName());
         final String pcjId = pcjStorage.createPcj(sparql);
 
         try(FluoClient fluoClient = 
FluoFactory.newClient(super.getFluoConfiguration())) {
-            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+            new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, 
accumuloConn, getRyaInstanceName());
         }
 
         // Verify the end results of the query match the expected results.
@@ -119,10 +119,10 @@ public class HistoricStreamingVisibilityIT extends 
RyaExportITBase {
 
     private AccumuloRdfConfiguration makeConfig() {
         final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
-        conf.setTablePrefix(RYA_INSTANCE_NAME);
+        conf.setTablePrefix(getRyaInstanceName());
         // Accumulo connection information.
-        conf.set(ConfigUtils.CLOUDBASE_USER, ACCUMULO_USER);
-        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, ACCUMULO_PASSWORD);
+        conf.set(ConfigUtils.CLOUDBASE_USER, getUsername());
+        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, getPassword());
         conf.set(ConfigUtils.CLOUDBASE_INSTANCE, 
super.getMiniAccumuloCluster().getInstanceName());
         conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, 
super.getMiniAccumuloCluster().getZooKeepers());
         conf.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, "U,V,W");

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
index e7ced90..46bc7b0 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
@@ -49,6 +49,7 @@ import 
org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaURI;
 import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig;
 import org.apache.rya.indexing.pcj.fluo.RyaExportITBase;
 import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
 import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
@@ -102,24 +103,36 @@ public class PcjVisibilityIT extends RyaExportITBase {
         final String instanceName = 
super.getMiniAccumuloCluster().getInstanceName();
         final String zookeepers = 
super.getMiniAccumuloCluster().getZooKeepers();
 
-        final RyaClient ryaClient = AccumuloRyaClientFactory.build(new 
AccumuloConnectionDetails(
-                ACCUMULO_USER,
-                ACCUMULO_PASSWORD.toCharArray(),
-                instanceName,
-                zookeepers), accumuloConn);
+        final RyaClient ryaClient = 
AccumuloRyaClientFactory.build(createConnectionDetails(), accumuloConn);
 
-        final String pcjId = 
ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql);
+        final String pcjId = 
ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql);
 
         // Grant the root user the "u" authorization.
-        
super.getAccumuloConnector().securityOperations().changeUserAuthorizations(ACCUMULO_USER,
 new Authorizations("u"));
+        
super.getAccumuloConnector().securityOperations().changeUserAuthorizations(getUsername(),
 new Authorizations("u"));
 
         // Setup a connection to the Rya instance that uses the "u" 
authorizations. This ensures
         // any statements that are inserted will have the "u" authorization on 
them and that the
         // PCJ updating application will have to maintain visibilities.
-        final AccumuloRdfConfiguration ryaConf = 
super.makeConfig(instanceName, zookeepers);
+        final AccumuloRdfConfiguration ryaConf = new 
AccumuloRdfConfiguration();
+        ryaConf.setTablePrefix(getRyaInstanceName());
+
+        // Accumulo connection information.
+        ryaConf.setAccumuloUser(getUsername());
+        ryaConf.setAccumuloPassword(getPassword());
+        
ryaConf.setAccumuloInstance(super.getAccumuloConnector().getInstance().getInstanceName());
+        
ryaConf.setAccumuloZookeepers(super.getAccumuloConnector().getInstance().getZooKeepers());
         ryaConf.set(ConfigUtils.CLOUDBASE_AUTHS, "u");
         ryaConf.set(RdfCloudTripleStoreConfiguration.CONF_CV, "u");
 
+        // PCJ configuration information.
+        ryaConf.set(ConfigUtils.USE_PCJ, "true");
+        ryaConf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true");
+        ryaConf.set(ConfigUtils.FLUO_APP_NAME, 
super.getFluoConfiguration().getApplicationName());
+        ryaConf.set(ConfigUtils.PCJ_STORAGE_TYPE,
+                
PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString());
+        ryaConf.set(ConfigUtils.PCJ_UPDATER_TYPE,
+                
PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString());
+
         Sail sail = null;
         RyaSailRepository ryaRepo = null;
         RepositoryConnection ryaConn = null;
@@ -138,7 +151,7 @@ public class PcjVisibilityIT extends RyaExportITBase {
             super.getMiniFluo().waitForObservers();
 
             // Fetch the exported result and show that its column visibility 
has been simplified.
-            final String pcjTableName = new 
PcjTableNameFactory().makeTableName(RYA_INSTANCE_NAME, pcjId);
+            final String pcjTableName = new 
PcjTableNameFactory().makeTableName(getRyaInstanceName(), pcjId);
             final Scanner scan = accumuloConn.createScanner(pcjTableName, new 
Authorizations("u"));
             scan.fetchColumnFamily(new Text("customer;worker;city"));
 
@@ -202,13 +215,13 @@ public class PcjVisibilityIT extends RyaExportITBase {
         final Connector accumuloConn = super.getAccumuloConnector();
 
         // Create the PCJ Table in Accumulo.
-        final PrecomputedJoinStorage rootStorage = new 
AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+        final PrecomputedJoinStorage rootStorage = new 
AccumuloPcjStorage(accumuloConn, getRyaInstanceName());
         final String pcjId = rootStorage.createPcj(sparql);
 
 
         try( final FluoClient fluoClient = FluoFactory.newClient( 
super.getFluoConfiguration() )) {
             // Create the PCJ in Fluo.
-            new CreatePcj().withRyaIntegration(pcjId, rootStorage, fluoClient, 
accumuloConn, RYA_INSTANCE_NAME);
+            new CreatePcj().withRyaIntegration(pcjId, rootStorage, fluoClient, 
accumuloConn, getRyaInstanceName());
 
             // Stream the data into Fluo.
             for(final RyaStatement statement : streamedTriples.keySet()) {
@@ -220,7 +233,7 @@ public class PcjVisibilityIT extends RyaExportITBase {
         // Fetch the exported results from Accumulo once the observers finish 
working.
         super.getMiniFluo().waitForObservers();
 
-        setupTestUsers(accumuloConn, RYA_INSTANCE_NAME, pcjId);
+        setupTestUsers(accumuloConn, getRyaInstanceName(), pcjId);
 
         // Verify ABCDE using root.
         final Set<BindingSet> rootResults = toSet( 
rootStorage.listResults(pcjId));
@@ -256,7 +269,7 @@ public class PcjVisibilityIT extends RyaExportITBase {
 
         // Verify AB
         final Connector abConn = cluster.getConnector("abUser", "password");
-        try(final PrecomputedJoinStorage abStorage = new 
AccumuloPcjStorage(abConn, RYA_INSTANCE_NAME)) {
+        try(final PrecomputedJoinStorage abStorage = new 
AccumuloPcjStorage(abConn, getRyaInstanceName())) {
             final Set<BindingSet> abResults = toSet( 
abStorage.listResults(pcjId) );
 
             final Set<BindingSet> abExpected = Sets.newHashSet();
@@ -271,7 +284,7 @@ public class PcjVisibilityIT extends RyaExportITBase {
 
         // Verify ABC
         final Connector abcConn = cluster.getConnector("abcUser", "password");
-        try(final PrecomputedJoinStorage abcStorage = new 
AccumuloPcjStorage(abcConn, RYA_INSTANCE_NAME)) {
+        try(final PrecomputedJoinStorage abcStorage = new 
AccumuloPcjStorage(abcConn, getRyaInstanceName())) {
             final Set<BindingSet> abcResults = toSet( 
abcStorage.listResults(pcjId) );
 
             final Set<BindingSet> abcExpected = Sets.newHashSet();
@@ -292,7 +305,7 @@ public class PcjVisibilityIT extends RyaExportITBase {
 
         // Verify ADE
         final Connector adeConn = cluster.getConnector("adeUser", "password");
-        try(final PrecomputedJoinStorage adeStorage = new 
AccumuloPcjStorage(adeConn, RYA_INSTANCE_NAME)) {
+        try(final PrecomputedJoinStorage adeStorage = new 
AccumuloPcjStorage(adeConn, getRyaInstanceName())) {
             final Set<BindingSet> adeResults = toSet( 
adeStorage.listResults(pcjId) );
 
             final Set<BindingSet> adeExpected = Sets.newHashSet();
@@ -307,7 +320,7 @@ public class PcjVisibilityIT extends RyaExportITBase {
 
         // Verify no auths.
         final Connector noAuthConn = cluster.getConnector("noAuth", 
"password");
-        try(final PrecomputedJoinStorage noAuthStorage = new 
AccumuloPcjStorage(noAuthConn, RYA_INSTANCE_NAME)) {
+        try(final PrecomputedJoinStorage noAuthStorage = new 
AccumuloPcjStorage(noAuthConn, getRyaInstanceName())) {
             final Set<BindingSet> noAuthResults = toSet( 
noAuthStorage.listResults(pcjId) );
             assertTrue( noAuthResults.isEmpty() );
         }

Reply via email to