Repository: incubator-rya Updated Branches: refs/heads/master 643608ca1 -> c9b66c7c3
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java index 2bcce65..e43ab83 100644 --- a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java +++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java @@ -41,6 +41,8 @@ import org.apache.log4j.Logger; import org.apache.rya.accumulo.AccumuloRdfConfiguration; import org.apache.rya.accumulo.AccumuloRyaDAO; import org.apache.rya.accumulo.MiniAccumuloClusterInstance; +import org.apache.rya.accumulo.MiniAccumuloSingleton; +import org.apache.rya.accumulo.RyaTestInstanceRule; import org.apache.rya.api.RdfCloudTripleStoreConfiguration; import org.apache.rya.indexing.pcj.storage.PcjException; import org.apache.rya.indexing.pcj.storage.PcjMetadata; @@ -53,6 +55,7 @@ import org.apache.zookeeper.ClientCnxn; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import org.openrdf.model.Statement; import org.openrdf.model.impl.LiteralImpl; @@ -83,15 +86,16 @@ public class PcjTablesIntegrationTest { private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); - protected static final String RYA_TABLE_PREFIX = "demo_"; - // The MiniAccumuloCluster is re-used between tests. - private MiniAccumuloClusterInstance cluster; + private MiniAccumuloClusterInstance cluster = MiniAccumuloSingleton.getInstance(); // Rya data store and connections. protected RyaSailRepository ryaRepo = null; protected RepositoryConnection ryaConn = null; + @Rule + public RyaTestInstanceRule testInstance = new RyaTestInstanceRule(false); + @BeforeClass public static void killLoudLogs() { Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR); @@ -99,10 +103,6 @@ public class PcjTablesIntegrationTest { @Before public void resetTestEnvironmanet() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, RepositoryException, IOException, InterruptedException { - // Start the cluster. - cluster = new MiniAccumuloClusterInstance(); - cluster.startMiniAccumulo(); - // Setup the Rya library to use the Mini Accumulo. ryaRepo = setupRya(); ryaConn = ryaRepo.getConnection(); @@ -111,12 +111,15 @@ public class PcjTablesIntegrationTest { @After public void shutdownMiniCluster() throws IOException, InterruptedException, RepositoryException { // Stop Rya. - ryaRepo.shutDown(); - - // Stop the cluster. - cluster.stopMiniAccumulo(); + if (ryaRepo != null) { + ryaRepo.shutDown(); + } } + private String getRyaInstanceName() { + return testInstance.getRyaInstanceName(); + } + /** * Format a Mini Accumulo to be a Rya repository. * @@ -130,11 +133,11 @@ public class PcjTablesIntegrationTest { // Setup Rya configuration values. final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - conf.setTablePrefix(RYA_TABLE_PREFIX); + conf.setTablePrefix(getRyaInstanceName()); conf.setDisplayQueryPlan(true); conf.setBoolean(USE_MOCK_INSTANCE, false); - conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, RYA_TABLE_PREFIX); + conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, getRyaInstanceName()); conf.set(CLOUDBASE_USER, cluster.getUsername()); conf.set(CLOUDBASE_PASSWORD, cluster.getPassword()); conf.set(CLOUDBASE_INSTANCE, cluster.getInstanceName()); @@ -167,7 +170,7 @@ public class PcjTablesIntegrationTest { final Connector accumuloConn = cluster.getConnector(); // Create a PCJ table in the Mini Accumulo. - final String pcjTableName = new PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj"); + final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj"); final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age")); final PcjTables pcjs = new PcjTables(); pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql); @@ -198,7 +201,7 @@ public class PcjTablesIntegrationTest { final Connector accumuloConn = cluster.getConnector(); // Create a PCJ table in the Mini Accumulo. - final String pcjTableName = new PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj"); + final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj"); final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age")); final PcjTables pcjs = new PcjTables(); pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql); @@ -249,7 +252,7 @@ public class PcjTablesIntegrationTest { final Connector accumuloConn = cluster.getConnector(); // Create a PCJ table in the Mini Accumulo. - final String pcjTableName = new PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj"); + final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj"); final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age")); final PcjTables pcjs = new PcjTables(); pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql); @@ -323,7 +326,7 @@ public class PcjTablesIntegrationTest { final Connector accumuloConn = cluster.getConnector(); - final String pcjTableName = new PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj"); + final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj"); final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age")); final PcjTables pcjs = new PcjTables(); pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql); @@ -393,7 +396,7 @@ public class PcjTablesIntegrationTest { final Connector accumuloConn = cluster.getConnector(); - final String pcjTableName = new PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj"); + final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj"); // Create and populate the PCJ table. final PcjTables pcjs = new PcjTables(); @@ -476,7 +479,7 @@ public class PcjTablesIntegrationTest { final Connector accumuloConn = cluster.getConnector(); // Create a PCJ table in the Mini Accumulo. - final String pcjTableName = new PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj"); + final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj"); final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age")); final PcjTables pcjs = new PcjTables(); pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql); @@ -516,7 +519,7 @@ public class PcjTablesIntegrationTest { final Connector accumuloConn = cluster.getConnector(); // Create a PCJ index. - final String tableName = new PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "thePcj"); + final String tableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "thePcj"); final Set<VariableOrder> varOrders = Sets.<VariableOrder>newHashSet( new VariableOrder("x") ); final String sparql = "SELECT x WHERE ?x <http://isA> <http://Food>"; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/FluoITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/FluoITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/FluoITBase.java new file mode 100644 index 0000000..b5d9428 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/FluoITBase.java @@ -0,0 +1,282 @@ +package org.apache.rya.indexing.pcj.fluo; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.net.UnknownHostException; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.rya.accumulo.MiniAccumuloClusterInstance; +import org.apache.rya.accumulo.MiniAccumuloSingleton; +import org.apache.rya.accumulo.RyaTestInstanceRule; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloInstall; +import org.apache.zookeeper.ClientCnxn; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.RepositoryException; +import org.openrdf.sail.Sail; +import org.openrdf.sail.SailException; + +import org.apache.fluo.api.client.FluoAdmin; +import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoFactory; +import org.apache.fluo.api.config.FluoConfiguration; +import org.apache.fluo.api.mini.MiniFluo; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.Install; +import org.apache.rya.api.client.Install.DuplicateInstanceNameException; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig; +import org.apache.rya.rdftriplestore.RyaSailRepository; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; + +/** + * Integration tests that ensure the Fluo application processes PCJs results + * correctly. + * <p> + * This class is being ignored because it doesn't contain any unit tests. + */ +public abstract class FluoITBase { + private static final Logger log = Logger.getLogger(FluoITBase.class); + + // Mini Accumulo Cluster + private static MiniAccumuloClusterInstance clusterInstance = MiniAccumuloSingleton.getInstance(); + private static MiniAccumuloCluster cluster; + + private static String instanceName = null; + private static String zookeepers = null; + + protected static Connector accumuloConn = null; + + // Fluo data store and connections. + protected MiniFluo fluo = null; + protected FluoConfiguration fluoConfig = null; + protected FluoClient fluoClient = null; + + // Rya data store and connections. + protected RyaSailRepository ryaRepo = null; + protected RepositoryConnection ryaConn = null; + + @Rule + public RyaTestInstanceRule testInstance = new RyaTestInstanceRule(false); + + @BeforeClass + public static void beforeClass() throws Exception { + Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR); + + // Setup and start the Mini Accumulo. + cluster = clusterInstance.getCluster(); + + // Store a connector to the Mini Accumulo. + instanceName = cluster.getInstanceName(); + zookeepers = cluster.getZooKeepers(); + + final Instance instance = new ZooKeeperInstance(instanceName, zookeepers); + accumuloConn = instance.getConnector(clusterInstance.getUsername(), new PasswordToken(clusterInstance.getPassword())); + } + + @Before + public void setupMiniResources() throws Exception { + // Initialize the Mini Fluo that will be used to store created queries. + fluoConfig = createFluoConfig(); + preFluoInitHook(); + FluoFactory.newAdmin(fluoConfig).initialize(new FluoAdmin.InitializationOptions() + .setClearTable(true) + .setClearZookeeper(true)); + postFluoInitHook(); + fluo = FluoFactory.newMiniFluo(fluoConfig); + fluoClient = FluoFactory.newClient(fluo.getClientConfiguration()); + + // Initialize the Rya that will be used by the tests. + ryaRepo = setupRya(); + ryaConn = ryaRepo.getConnection(); + } + + @After + public void shutdownMiniResources() { + if (ryaConn != null) { + try { + log.info("Shutting down Rya Connection."); + ryaConn.close(); + log.info("Rya Connection shut down."); + } catch (final Exception e) { + log.error("Could not shut down the Rya Connection.", e); + } + } + + if (ryaRepo != null) { + try { + log.info("Shutting down Rya Repo."); + ryaRepo.shutDown(); + log.info("Rya Repo shut down."); + } catch (final Exception e) { + log.error("Could not shut down the Rya Repo.", e); + } + } + + if (fluoClient != null) { + try { + log.info("Shutting down Fluo Client."); + fluoClient.close(); + log.info("Fluo Client shut down."); + } catch (final Exception e) { + log.error("Could not shut down the Fluo Client.", e); + } + } + + if (fluo != null) { + try { + log.info("Shutting down Mini Fluo."); + fluo.close(); + log.info("Mini Fluo shut down."); + } catch (final Exception e) { + log.error("Could not shut down the Mini Fluo.", e); + } + } + } + + protected void preFluoInitHook() throws Exception { + + } + + protected void postFluoInitHook() throws Exception { + + } + + protected MiniAccumuloCluster getMiniAccumuloCluster() { + return cluster; + } + + protected MiniFluo getMiniFluo() { + return fluo; + } + + public RyaSailRepository getRyaSailRepository() { + return ryaRepo; + } + + public Connector getAccumuloConnector() { + return accumuloConn; + } + + public String getRyaInstanceName() { + return testInstance.getRyaInstanceName(); + } + + protected String getUsername() { + return clusterInstance.getUsername(); + } + + protected String getPassword() { + return clusterInstance.getPassword(); + } + + protected FluoConfiguration getFluoConfiguration() { + return fluoConfig; + } + + public AccumuloConnectionDetails createConnectionDetails() { + return new AccumuloConnectionDetails( + clusterInstance.getUsername(), + clusterInstance.getPassword().toCharArray(), + clusterInstance.getInstanceName(), + clusterInstance.getZookeepers()); + } + + private FluoConfiguration createFluoConfig() { + // Configure how the mini fluo will run. + final FluoConfiguration config = new FluoConfiguration(); + config.setMiniStartAccumulo(false); + config.setAccumuloInstance(instanceName); + config.setAccumuloUser(clusterInstance.getUsername()); + config.setAccumuloPassword(clusterInstance.getPassword()); + config.setInstanceZookeepers(zookeepers + "/fluo"); + config.setAccumuloZookeepers(zookeepers); + + config.setApplicationName(getRyaInstanceName()); + config.setAccumuloTable("fluo" + getRyaInstanceName()); + return config; + } + + /** + * Sets up a Rya instance. + */ + protected RyaSailRepository setupRya() + throws AccumuloException, AccumuloSecurityException, RepositoryException, RyaDAOException, + NumberFormatException, UnknownHostException, InferenceEngineException, AlreadyInitializedException, + RyaDetailsRepositoryException, DuplicateInstanceNameException, RyaClientException, SailException { + checkNotNull(instanceName); + checkNotNull(zookeepers); + + // Setup Rya configuration values. + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix(getRyaInstanceName()); + conf.setDisplayQueryPlan(true); + conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, false); + conf.set(ConfigUtils.CLOUDBASE_USER, clusterInstance.getUsername()); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, clusterInstance.getPassword()); + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, clusterInstance.getInstanceName()); + conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, clusterInstance.getZookeepers()); + conf.set(ConfigUtils.USE_PCJ, "true"); + conf.set(ConfigUtils.FLUO_APP_NAME, getRyaInstanceName()); + conf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString()); + conf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString()); + conf.set(ConfigUtils.CLOUDBASE_AUTHS, ""); + + // Install the test instance of Rya. + final Install install = new AccumuloInstall(createConnectionDetails(), accumuloConn); + + final InstallConfiguration installConfig = InstallConfiguration.builder() + .setEnableTableHashPrefix(true) + .setEnableEntityCentricIndex(true) + .setEnableFreeTextIndex(true) + .setEnableTemporalIndex(true) + .setEnablePcjIndex(true) + .setEnableGeoIndex(true) + .setFluoPcjAppName(getRyaInstanceName()) + .build(); + install.install(getRyaInstanceName(), installConfig); + + // Connect to the instance of Rya that was just installed. + final Sail sail = RyaSailFactory.getInstance(conf); + final RyaSailRepository ryaRepo = new RyaSailRepository(sail); + + return ryaRepo; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java index cd84cb9..3ed1844 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java @@ -93,8 +93,6 @@ public class KafkaExportITBase extends AccumuloExportITBase { /** * Add info about the Kafka queue/topic to receive the export. - * - * @see org.apache.rya.indexing.pcj.fluo.ITBase#setExportParameters(java.util.HashMap) */ @Override protected void preFluoInitHook() throws Exception { @@ -128,8 +126,6 @@ public class KafkaExportITBase extends AccumuloExportITBase { /** * setup mini kafka and call the super to setup mini fluo - * - * @see org.apache.rya.indexing.pcj.fluo.ITBase#setupMiniResources() */ @Before public void setupKafka() throws Exception { @@ -239,8 +235,6 @@ public class KafkaExportITBase extends AccumuloExportITBase { /** * Close all the Kafka mini server and mini-zookeeper - * - * @see org.apache.rya.indexing.pcj.fluo.ITBase#shutdownMiniResources() */ @After public void teardownKafka() { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java index 5fe999f..84b6343 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java @@ -22,19 +22,10 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.fluo.api.config.ObserverSpecification; -import org.apache.fluo.recipes.test.AccumuloExportITBase; import org.apache.log4j.BasicConfigurator; import org.apache.log4j.Level; import org.apache.log4j.Logger; -import org.apache.rya.accumulo.AccumuloRdfConfiguration; -import org.apache.rya.api.client.Install.InstallConfiguration; -import org.apache.rya.api.client.RyaClient; -import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; -import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; -import org.apache.rya.indexing.accumulo.ConfigUtils; -import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig; import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters; import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; @@ -42,26 +33,12 @@ import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver; -import org.apache.rya.rdftriplestore.RyaSailRepository; -import org.apache.rya.sail.config.RyaSailFactory; -import org.junit.After; -import org.junit.Before; import org.junit.BeforeClass; -import org.openrdf.sail.Sail; /** * The base Integration Test class used for Fluo applications that export to a Rya PCJ Index. */ -public class RyaExportITBase extends AccumuloExportITBase { - - protected static final String RYA_INSTANCE_NAME = "test_"; - - private RyaSailRepository ryaSailRepo = null; - - public RyaExportITBase() { - // Indicates that MiniFluo should be started before each test. - super(true); - } +public class RyaExportITBase extends FluoITBase { @BeforeClass public static void setupLogging() { @@ -83,11 +60,11 @@ public class RyaExportITBase extends AccumuloExportITBase { final HashMap<String, String> exportParams = new HashMap<>(); final RyaExportParameters ryaParams = new RyaExportParameters(exportParams); ryaParams.setExportToRya(true); - ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME); + ryaParams.setRyaInstanceName(getRyaInstanceName()); ryaParams.setAccumuloInstanceName(super.getMiniAccumuloCluster().getInstanceName()); ryaParams.setZookeeperServers(super.getMiniAccumuloCluster().getZooKeepers()); - ryaParams.setExporterUsername(ACCUMULO_USER); - ryaParams.setExporterPassword(ACCUMULO_PASSWORD); + ryaParams.setExporterUsername(getUsername()); + ryaParams.setExporterPassword(getPassword()); final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams); observers.add(exportObserverConfig); @@ -96,87 +73,4 @@ public class RyaExportITBase extends AccumuloExportITBase { super.getFluoConfiguration().addObservers(observers); } - @Before - public void setupRya() throws Exception { - final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster(); - final String instanceName = cluster.getInstanceName(); - final String zookeepers = cluster.getZooKeepers(); - - // Install the Rya instance to the mini accumulo cluster. - final RyaClient ryaClient = AccumuloRyaClientFactory.build( - new AccumuloConnectionDetails( - ACCUMULO_USER, - ACCUMULO_PASSWORD.toCharArray(), - instanceName, - zookeepers), - super.getAccumuloConnector()); - - ryaClient.getInstall().install(RYA_INSTANCE_NAME, InstallConfiguration.builder() - .setEnableTableHashPrefix(false) - .setEnableFreeTextIndex(false) - .setEnableEntityCentricIndex(false) - .setEnableGeoIndex(false) - .setEnableTemporalIndex(false) - .setEnablePcjIndex(true) - .setFluoPcjAppName( super.getFluoConfiguration().getApplicationName() ) - .build()); - - // Connect to the Rya instance that was just installed. - final AccumuloRdfConfiguration conf = makeConfig(instanceName, zookeepers); - final Sail sail = RyaSailFactory.getInstance(conf); - ryaSailRepo = new RyaSailRepository(sail); - } - - @After - public void teardownRya() throws Exception { - final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster(); - final String instanceName = cluster.getInstanceName(); - final String zookeepers = cluster.getZooKeepers(); - - // Uninstall the instance of Rya. - final RyaClient ryaClient = AccumuloRyaClientFactory.build( - new AccumuloConnectionDetails( - ACCUMULO_USER, - ACCUMULO_PASSWORD.toCharArray(), - instanceName, - zookeepers), - super.getAccumuloConnector()); - - ryaClient.getUninstall().uninstall(RYA_INSTANCE_NAME); - - // Shutdown the repo. - ryaSailRepo.shutDown(); - } - - /** - * @return A {@link RyaSailRepository} that is connected to the Rya instance that statements are loaded into. - */ - protected RyaSailRepository getRyaSailRepository() throws Exception { - return ryaSailRepo; - } - - protected AccumuloRdfConfiguration makeConfig(final String instanceName, final String zookeepers) { - final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - conf.setTablePrefix(RYA_INSTANCE_NAME); - - // Accumulo connection information. - conf.setAccumuloUser(AccumuloExportITBase.ACCUMULO_USER); - conf.setAccumuloPassword(AccumuloExportITBase.ACCUMULO_PASSWORD); - conf.setAccumuloInstance(super.getAccumuloConnector().getInstance().getInstanceName()); - conf.setAccumuloZookeepers(super.getAccumuloConnector().getInstance().getZooKeepers()); - conf.setAuths(""); - - // PCJ configuration information. - conf.set(ConfigUtils.USE_PCJ, "true"); - conf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true"); - conf.set(ConfigUtils.FLUO_APP_NAME, super.getFluoConfiguration().getApplicationName()); - conf.set(ConfigUtils.PCJ_STORAGE_TYPE, - PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString()); - conf.set(ConfigUtils.PCJ_UPDATER_TYPE, - PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString()); - - conf.setDisplayQueryPlan(true); - - return conf; - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java index 0aceaa3..9a1c285 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java @@ -63,12 +63,12 @@ public class GetPcjMetadataIT extends RyaExportITBase { // Create the PCJ table. final Connector accumuloConn = super.getAccumuloConnector(); - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName()); final String pcjId = pcjStorage.createPcj(sparql); try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); // Fetch the PCJ's Metadata through the GetPcjMetadata interactor. final String queryId = new ListQueryIds().listQueryIds(fluoClient).get(0); @@ -84,7 +84,7 @@ public class GetPcjMetadataIT extends RyaExportITBase { @Test public void getAllMetadata() throws MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, AccumuloException, AccumuloSecurityException, RyaDAOException { final Connector accumuloConn = super.getAccumuloConnector(); - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName()); try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Add a couple of queries to Accumulo. @@ -96,7 +96,7 @@ public class GetPcjMetadataIT extends RyaExportITBase { "}"; final String q1PcjId = pcjStorage.createPcj(q1Sparql); final CreatePcj createPcj = new CreatePcj(); - createPcj.withRyaIntegration(q1PcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + createPcj.withRyaIntegration(q1PcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); final String q2Sparql = "SELECT ?x ?y " + @@ -105,7 +105,7 @@ public class GetPcjMetadataIT extends RyaExportITBase { "?y <http://worksAt> <http://Chipotle>." + "}"; final String q2PcjId = pcjStorage.createPcj(q2Sparql); - createPcj.withRyaIntegration(q2PcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + createPcj.withRyaIntegration(q2PcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); // Ensure the command returns the correct metadata. final Set<PcjMetadata> expected = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java index 10f2319..d56c23a 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java @@ -78,13 +78,14 @@ public class GetQueryReportIT extends RyaExportITBase { new RyaStatement(new RyaURI("http://Frank"), new RyaURI("http://livesIn"), new RyaURI("http://Lost County"))); // Create the PCJ table. + final Connector accumuloConn = super.getAccumuloConnector(); - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName()); final String pcjId = pcjStorage.createPcj(sparql); try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); // Stream the data into Fluo. new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java index 21d7db0..349d391 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java @@ -26,8 +26,6 @@ import java.util.Collection; import java.util.List; import java.util.Set; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.fluo.api.client.Snapshot; @@ -36,7 +34,6 @@ import org.apache.fluo.api.client.scanner.RowScanner; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Span; import org.apache.rya.api.client.RyaClient; -import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.fluo.api.DeletePcj; @@ -130,16 +127,9 @@ public class CreateDeleteIT extends RyaExportITBase { requireNonNull(statements); // Register the PCJ with Rya. - final Instance accInstance = super.getAccumuloConnector().getInstance(); - final Connector accumuloConn = super.getAccumuloConnector(); + final RyaClient ryaClient = AccumuloRyaClientFactory.build(createConnectionDetails(), getAccumuloConnector()); - final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails( - ACCUMULO_USER, - ACCUMULO_PASSWORD.toCharArray(), - accInstance.getInstanceName(), - accInstance.getZooKeepers()), accumuloConn); - - final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql); + final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql); // Write the data to Rya. final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java index ab97bbd..05dfd32 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java @@ -97,12 +97,12 @@ public class InputIT extends RyaExportITBase { // Create the PCJ table. final Connector accumuloConn = super.getAccumuloConnector(); - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName()); final String pcjId = pcjStorage.createPcj(sparql); try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); // Verify the end results of the query match the expected results. super.getMiniFluo().waitForObservers(); @@ -157,12 +157,12 @@ public class InputIT extends RyaExportITBase { // Create the PCJ table. final Connector accumuloConn = super.getAccumuloConnector(); - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName()); final String pcjId = pcjStorage.createPcj(sparql); try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); // Ensure the query has no results yet. super.getMiniFluo().waitForObservers(); @@ -223,12 +223,12 @@ public class InputIT extends RyaExportITBase { // Create the PCJ table. final Connector accumuloConn = super.getAccumuloConnector(); - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName()); final String pcjId = pcjStorage.createPcj(sparql); try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); // Ensure Alice is a match. super.getMiniFluo().waitForObservers(); @@ -312,12 +312,12 @@ public class InputIT extends RyaExportITBase { // Create the PCJ table. final Connector accumuloConn = super.getAccumuloConnector(); - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName()); final String pcjId = pcjStorage.createPcj(sparql); try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); // Ensure Alice is a match. super.getMiniFluo().waitForObservers(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java index 08bf2e1..f815a55 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java @@ -439,16 +439,11 @@ public class QueryIT extends RyaExportITBase { requireNonNull(expectedResults); // Register the PCJ with Rya. - final Instance accInstance = super.getAccumuloConnector().getInstance(); final Connector accumuloConn = super.getAccumuloConnector(); - final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails( - ACCUMULO_USER, - ACCUMULO_PASSWORD.toCharArray(), - accInstance.getInstanceName(), - accInstance.getZooKeepers()), accumuloConn); + final RyaClient ryaClient = AccumuloRyaClientFactory.build(createConnectionDetails(), accumuloConn); - ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql); + ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql); // Write the data to Rya. final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection(); @@ -461,7 +456,7 @@ public class QueryIT extends RyaExportITBase { super.getMiniFluo().waitForObservers(); // Fetch the value that is stored within the PCJ table. - try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME)) { + try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName())) { final String pcjId = pcjStorage.listPcjs().get(0); final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) ); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java index 747f6e5..9c21afd 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java @@ -105,12 +105,12 @@ public class RyaExportIT extends RyaExportITBase { // Create the PCJ table. final Connector accumuloConn = super.getAccumuloConnector(); - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName()); final String pcjId = pcjStorage.createPcj(sparql); try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); // Stream the data into Fluo. new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java index 3c74b13..a8d470f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java @@ -94,12 +94,12 @@ public class RyaInputIncrementalUpdateIT extends RyaExportITBase { // Create the PCJ table. final Connector accumuloConn = super.getAccumuloConnector(); - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName()); final String pcjId = pcjStorage.createPcj(sparql); try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); // Verify the end results of the query match the expected results. super.getMiniFluo().waitForObservers(); @@ -160,12 +160,12 @@ public class RyaInputIncrementalUpdateIT extends RyaExportITBase { // Create the PCJ table. final Connector accumuloConn = super.getAccumuloConnector(); - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName()); final String pcjId = pcjStorage.createPcj(sparql); try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); super.getMiniFluo().waitForObservers(); @@ -231,12 +231,12 @@ public class RyaInputIncrementalUpdateIT extends RyaExportITBase { // Create the PCJ table. final Connector accumuloConn = super.getAccumuloConnector(); - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName()); final String pcjId = pcjStorage.createPcj(sparql); try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); super.getMiniFluo().waitForObservers(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java index 52d6caa..72759bb 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java @@ -56,11 +56,11 @@ public class StreamingTestIT extends RyaExportITBase { try (FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { // Create the PCJ table. final Connector accumuloConn = super.getAccumuloConnector(); - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName()); final String pcjId = pcjStorage.createPcj(sparql); // Task the Fluo app with the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); // Add Statements to the Fluo app. log.info("Adding Join Pairs..."); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java index 30b6842..150492f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java @@ -67,7 +67,7 @@ public class HistoricStreamingVisibilityIT extends RyaExportITBase { "}"; final Connector accumuloConn = super.getAccumuloConnector(); - accumuloConn.securityOperations().changeUserAuthorizations(ACCUMULO_USER, new Authorizations("U","V","W")); + accumuloConn.securityOperations().changeUserAuthorizations(getUsername(), new Authorizations("U","V","W")); final AccumuloRyaDAO dao = new AccumuloRyaDAO(); dao.setConnector(accumuloConn); dao.setConf(makeConfig()); @@ -103,11 +103,11 @@ public class HistoricStreamingVisibilityIT extends RyaExportITBase { expected.add(bs); // Create the PCJ table. - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName()); final String pcjId = pcjStorage.createPcj(sparql); try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, getRyaInstanceName()); } // Verify the end results of the query match the expected results. @@ -119,10 +119,10 @@ public class HistoricStreamingVisibilityIT extends RyaExportITBase { private AccumuloRdfConfiguration makeConfig() { final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - conf.setTablePrefix(RYA_INSTANCE_NAME); + conf.setTablePrefix(getRyaInstanceName()); // Accumulo connection information. - conf.set(ConfigUtils.CLOUDBASE_USER, ACCUMULO_USER); - conf.set(ConfigUtils.CLOUDBASE_PASSWORD, ACCUMULO_PASSWORD); + conf.set(ConfigUtils.CLOUDBASE_USER, getUsername()); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, getPassword()); conf.set(ConfigUtils.CLOUDBASE_INSTANCE, super.getMiniAccumuloCluster().getInstanceName()); conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, super.getMiniAccumuloCluster().getZooKeepers()); conf.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, "U,V,W"); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d3ba442f/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java index e7ced90..46bc7b0 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java @@ -49,6 +49,7 @@ import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaURI; import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig; import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; @@ -102,24 +103,36 @@ public class PcjVisibilityIT extends RyaExportITBase { final String instanceName = super.getMiniAccumuloCluster().getInstanceName(); final String zookeepers = super.getMiniAccumuloCluster().getZooKeepers(); - final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails( - ACCUMULO_USER, - ACCUMULO_PASSWORD.toCharArray(), - instanceName, - zookeepers), accumuloConn); + final RyaClient ryaClient = AccumuloRyaClientFactory.build(createConnectionDetails(), accumuloConn); - final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql); + final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql); // Grant the root user the "u" authorization. - super.getAccumuloConnector().securityOperations().changeUserAuthorizations(ACCUMULO_USER, new Authorizations("u")); + super.getAccumuloConnector().securityOperations().changeUserAuthorizations(getUsername(), new Authorizations("u")); // Setup a connection to the Rya instance that uses the "u" authorizations. This ensures // any statements that are inserted will have the "u" authorization on them and that the // PCJ updating application will have to maintain visibilities. - final AccumuloRdfConfiguration ryaConf = super.makeConfig(instanceName, zookeepers); + final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration(); + ryaConf.setTablePrefix(getRyaInstanceName()); + + // Accumulo connection information. + ryaConf.setAccumuloUser(getUsername()); + ryaConf.setAccumuloPassword(getPassword()); + ryaConf.setAccumuloInstance(super.getAccumuloConnector().getInstance().getInstanceName()); + ryaConf.setAccumuloZookeepers(super.getAccumuloConnector().getInstance().getZooKeepers()); ryaConf.set(ConfigUtils.CLOUDBASE_AUTHS, "u"); ryaConf.set(RdfCloudTripleStoreConfiguration.CONF_CV, "u"); + // PCJ configuration information. + ryaConf.set(ConfigUtils.USE_PCJ, "true"); + ryaConf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true"); + ryaConf.set(ConfigUtils.FLUO_APP_NAME, super.getFluoConfiguration().getApplicationName()); + ryaConf.set(ConfigUtils.PCJ_STORAGE_TYPE, + PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString()); + ryaConf.set(ConfigUtils.PCJ_UPDATER_TYPE, + PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString()); + Sail sail = null; RyaSailRepository ryaRepo = null; RepositoryConnection ryaConn = null; @@ -138,7 +151,7 @@ public class PcjVisibilityIT extends RyaExportITBase { super.getMiniFluo().waitForObservers(); // Fetch the exported result and show that its column visibility has been simplified. - final String pcjTableName = new PcjTableNameFactory().makeTableName(RYA_INSTANCE_NAME, pcjId); + final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), pcjId); final Scanner scan = accumuloConn.createScanner(pcjTableName, new Authorizations("u")); scan.fetchColumnFamily(new Text("customer;worker;city")); @@ -202,13 +215,13 @@ public class PcjVisibilityIT extends RyaExportITBase { final Connector accumuloConn = super.getAccumuloConnector(); // Create the PCJ Table in Accumulo. - final PrecomputedJoinStorage rootStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final PrecomputedJoinStorage rootStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName()); final String pcjId = rootStorage.createPcj(sparql); try( final FluoClient fluoClient = FluoFactory.newClient( super.getFluoConfiguration() )) { // Create the PCJ in Fluo. - new CreatePcj().withRyaIntegration(pcjId, rootStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + new CreatePcj().withRyaIntegration(pcjId, rootStorage, fluoClient, accumuloConn, getRyaInstanceName()); // Stream the data into Fluo. for(final RyaStatement statement : streamedTriples.keySet()) { @@ -220,7 +233,7 @@ public class PcjVisibilityIT extends RyaExportITBase { // Fetch the exported results from Accumulo once the observers finish working. super.getMiniFluo().waitForObservers(); - setupTestUsers(accumuloConn, RYA_INSTANCE_NAME, pcjId); + setupTestUsers(accumuloConn, getRyaInstanceName(), pcjId); // Verify ABCDE using root. final Set<BindingSet> rootResults = toSet( rootStorage.listResults(pcjId)); @@ -256,7 +269,7 @@ public class PcjVisibilityIT extends RyaExportITBase { // Verify AB final Connector abConn = cluster.getConnector("abUser", "password"); - try(final PrecomputedJoinStorage abStorage = new AccumuloPcjStorage(abConn, RYA_INSTANCE_NAME)) { + try(final PrecomputedJoinStorage abStorage = new AccumuloPcjStorage(abConn, getRyaInstanceName())) { final Set<BindingSet> abResults = toSet( abStorage.listResults(pcjId) ); final Set<BindingSet> abExpected = Sets.newHashSet(); @@ -271,7 +284,7 @@ public class PcjVisibilityIT extends RyaExportITBase { // Verify ABC final Connector abcConn = cluster.getConnector("abcUser", "password"); - try(final PrecomputedJoinStorage abcStorage = new AccumuloPcjStorage(abcConn, RYA_INSTANCE_NAME)) { + try(final PrecomputedJoinStorage abcStorage = new AccumuloPcjStorage(abcConn, getRyaInstanceName())) { final Set<BindingSet> abcResults = toSet( abcStorage.listResults(pcjId) ); final Set<BindingSet> abcExpected = Sets.newHashSet(); @@ -292,7 +305,7 @@ public class PcjVisibilityIT extends RyaExportITBase { // Verify ADE final Connector adeConn = cluster.getConnector("adeUser", "password"); - try(final PrecomputedJoinStorage adeStorage = new AccumuloPcjStorage(adeConn, RYA_INSTANCE_NAME)) { + try(final PrecomputedJoinStorage adeStorage = new AccumuloPcjStorage(adeConn, getRyaInstanceName())) { final Set<BindingSet> adeResults = toSet( adeStorage.listResults(pcjId) ); final Set<BindingSet> adeExpected = Sets.newHashSet(); @@ -307,7 +320,7 @@ public class PcjVisibilityIT extends RyaExportITBase { // Verify no auths. final Connector noAuthConn = cluster.getConnector("noAuth", "password"); - try(final PrecomputedJoinStorage noAuthStorage = new AccumuloPcjStorage(noAuthConn, RYA_INSTANCE_NAME)) { + try(final PrecomputedJoinStorage noAuthStorage = new AccumuloPcjStorage(noAuthConn, getRyaInstanceName())) { final Set<BindingSet> noAuthResults = toSet( noAuthStorage.listResults(pcjId) ); assertTrue( noAuthResults.isEmpty() ); }
