http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloGetInstanceDetailsIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloGetInstanceDetailsIT.java b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloGetInstanceDetailsIT.java new file mode 100644 index 0000000..a7721d9 --- /dev/null +++ b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloGetInstanceDetailsIT.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.api.client.accumulo; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.util.Date; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.junit.Test; + +import com.google.common.base.Optional; + +import mvm.rya.accumulo.AccumuloITBase; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.client.GetInstanceDetails; +import mvm.rya.api.client.Install; +import mvm.rya.api.client.Install.DuplicateInstanceNameException; +import mvm.rya.api.client.Install.InstallConfiguration; +import mvm.rya.api.client.InstanceDoesNotExistException; +import mvm.rya.api.client.RyaClientException; +import mvm.rya.api.instance.RyaDetails; +import mvm.rya.api.instance.RyaDetails.EntityCentricIndexDetails; +import mvm.rya.api.instance.RyaDetails.FreeTextIndexDetails; +import mvm.rya.api.instance.RyaDetails.GeoIndexDetails; +import mvm.rya.api.instance.RyaDetails.JoinSelectivityDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails; +import mvm.rya.api.instance.RyaDetails.ProspectorDetails; +import mvm.rya.api.instance.RyaDetails.TemporalIndexDetails; + +/** + * Tests the methods of {@link AccumuloGetInstanceDetails}. + */ +public class AccumuloGetInstanceDetailsIT extends AccumuloITBase { + + @Test + public void getDetails() throws AccumuloException, AccumuloSecurityException, DuplicateInstanceNameException, RyaClientException { + // Install an instance of Rya. + final String instanceName = "instance_name"; + final InstallConfiguration installConfig = InstallConfiguration.builder() + .setEnableTableHashPrefix(true) + .setEnableEntityCentricIndex(true) + .setEnableFreeTextIndex(true) + .setEnableTemporalIndex(true) + .setEnablePcjIndex(true) + .setEnableGeoIndex(true) + .setFluoPcjAppName("fluo_app_name") + .build(); + + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + getUsername(), + getPassword().toCharArray(), + getInstanceName(), + getZookeepers()); + + final Install install = new AccumuloInstall(connectionDetails, getConnector()); + install.install(instanceName, installConfig); + + // Verify the correct details were persisted. + final GetInstanceDetails getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, getConnector()); + final Optional<RyaDetails> details = getInstanceDetails.getDetails(instanceName); + + final RyaDetails expectedDetails = RyaDetails.builder() + .setRyaInstanceName(instanceName) + + // The version depends on how the test is packaged, so just grab whatever was stored. + .setRyaVersion( details.get().getRyaVersion() ) + + .setGeoIndexDetails( new GeoIndexDetails(true) ) + .setTemporalIndexDetails(new TemporalIndexDetails(true) ) + .setFreeTextDetails( new FreeTextIndexDetails(true) ) + .setEntityCentricIndexDetails( new EntityCentricIndexDetails(true) ) + .setPCJIndexDetails( + PCJIndexDetails.builder() + .setEnabled(true) + .setFluoDetails( new FluoDetails("fluo_app_name") )) + .setProspectorDetails( new ProspectorDetails(Optional.<Date>absent()) ) + .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.<Date>absent()) ) + .build(); + + assertEquals(expectedDetails, details.get()); + } + + @Test(expected = InstanceDoesNotExistException.class) + public void getDetails_instanceDoesNotExist() throws AccumuloException, AccumuloSecurityException, InstanceDoesNotExistException, RyaClientException { + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + getUsername(), + getPassword().toCharArray(), + getInstanceName(), + getZookeepers()); + + final GetInstanceDetails getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, getConnector()); + getInstanceDetails.getDetails("instance_name"); + } + + @Test + public void getDetails_instanceDoesNotHaveDetails() throws AccumuloException, AccumuloSecurityException, InstanceDoesNotExistException, RyaClientException, TableExistsException { + // Mimic a pre-details rya install. + final String instanceName = "instance_name"; + + final TableOperations tableOps = getConnector().tableOperations(); + + final String spoTableName = instanceName + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX; + final String ospTableName = instanceName + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX; + final String poTableName = instanceName + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX; + tableOps.create(spoTableName); + tableOps.create(ospTableName); + tableOps.create(poTableName); + + // Verify that the operation returns empty. + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + getUsername(), + getPassword().toCharArray(), + getInstanceName(), + getZookeepers()); + + final GetInstanceDetails getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, getConnector()); + final Optional<RyaDetails> details = getInstanceDetails.getDetails(instanceName); + assertFalse( details.isPresent() ); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloInstallIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloInstallIT.java b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloInstallIT.java new file mode 100644 index 0000000..da212b9 --- /dev/null +++ b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloInstallIT.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.api.client.accumulo; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.junit.Test; + +import mvm.rya.accumulo.AccumuloITBase; +import mvm.rya.api.client.Install; +import mvm.rya.api.client.Install.DuplicateInstanceNameException; +import mvm.rya.api.client.Install.InstallConfiguration; +import mvm.rya.api.client.InstanceExists; +import mvm.rya.api.client.RyaClientException; +import mvm.rya.api.instance.RyaDetailsRepository.NotInitializedException; +import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; + +/** + * Integration tests the methods of {@link AccumuloInstall}. + */ +public class AccumuloInstallIT extends AccumuloITBase { + + @Test + public void install() throws AccumuloException, AccumuloSecurityException, DuplicateInstanceNameException, RyaClientException, NotInitializedException, RyaDetailsRepositoryException { + // Install an instance of Rya. + final String instanceName = "testInstance_"; + final InstallConfiguration installConfig = InstallConfiguration.builder() + .setEnableTableHashPrefix(false) + .setEnableEntityCentricIndex(false) + .setEnableFreeTextIndex(false) + .setEnableTemporalIndex(false) + .setEnablePcjIndex(false) + .setEnableGeoIndex(false) + .setFluoPcjAppName("fluo_app_name") + .build(); + + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + getUsername(), + getPassword().toCharArray(), + getInstanceName(), + getZookeepers()); + + final Install install = new AccumuloInstall(connectionDetails, getConnector()); + install.install(instanceName, installConfig); + + // Check that the instance exists. + final InstanceExists instanceExists = new AccumuloInstanceExists(connectionDetails, getConnector()); + instanceExists.exists(instanceName); + } + + @Test(expected = DuplicateInstanceNameException.class) + public void install_alreadyExists() throws DuplicateInstanceNameException, RyaClientException, AccumuloException, AccumuloSecurityException { + // Install an instance of Rya. + final String instanceName = "testInstance_"; + final InstallConfiguration installConfig = InstallConfiguration.builder().build(); + + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + getUsername(), + getPassword().toCharArray(), + getInstanceName(), + getZookeepers()); + + final Install install = new AccumuloInstall(connectionDetails, getConnector()); + install.install(instanceName, installConfig); + + // Install it again. + install.install(instanceName, installConfig); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloInstanceExistsIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloInstanceExistsIT.java b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloInstanceExistsIT.java new file mode 100644 index 0000000..528affd --- /dev/null +++ b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloInstanceExistsIT.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.api.client.accumulo; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.junit.Test; + +import mvm.rya.accumulo.AccumuloITBase; +import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.client.RyaClientException; + +/** + * Integration tests the methods of {@link AccumuloInstanceExists}. + */ +public class AccumuloInstanceExistsIT extends AccumuloITBase { + + @Test + public void exists_ryaDetailsTable() throws AccumuloException, AccumuloSecurityException, RyaClientException, TableExistsException { + final Connector connector = getConnector(); + final TableOperations tableOps = connector.tableOperations(); + + // Create the Rya instance's Rya details table. + final String instanceName = "test_instance_"; + final String ryaDetailsTable = instanceName + AccumuloRyaInstanceDetailsRepository.INSTANCE_DETAILS_TABLE_NAME; + tableOps.create(ryaDetailsTable); + + // Verify the command reports the instance exists. + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + getUsername(), + getPassword().toCharArray(), + getInstanceName(), + getZookeepers()); + + final AccumuloInstanceExists instanceExists = new AccumuloInstanceExists(connectionDetails, getConnector()); + assertTrue( instanceExists.exists(instanceName) ); + } + + @Test + public void exists_dataTables() throws AccumuloException, AccumuloSecurityException, RyaClientException, TableExistsException { + final Connector connector = getConnector(); + final TableOperations tableOps = connector.tableOperations(); + + // Create the Rya instance's Rya details table. + final String instanceName = "test_instance_"; + + final String spoTableName = instanceName + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX; + final String ospTableName = instanceName + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX; + final String poTableName = instanceName + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX; + tableOps.create(spoTableName); + tableOps.create(ospTableName); + tableOps.create(poTableName); + + // Verify the command reports the instance exists. + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + getUsername(), + getPassword().toCharArray(), + getInstanceName(), + getZookeepers()); + + final AccumuloInstanceExists instanceExists = new AccumuloInstanceExists(connectionDetails, getConnector()); + assertTrue( instanceExists.exists(instanceName) ); + } + + @Test + public void doesNotExist() throws RyaClientException, AccumuloException, AccumuloSecurityException { + // Verify the command reports the instance does not exists. + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + getUsername(), + getPassword().toCharArray(), + getInstanceName(), + getZookeepers()); + + final AccumuloInstanceExists instanceExists = new AccumuloInstanceExists(connectionDetails, getConnector()); + assertFalse( instanceExists.exists("some_instance") ); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloListInstancesIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloListInstancesIT.java b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloListInstancesIT.java new file mode 100644 index 0000000..7101148 --- /dev/null +++ b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloListInstancesIT.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.api.client.accumulo; + +import static org.junit.Assert.assertEquals; + +import java.util.Collections; +import java.util.List; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.junit.Test; + +import com.beust.jcommander.internal.Lists; + +import mvm.rya.accumulo.AccumuloITBase; +import mvm.rya.api.client.Install; +import mvm.rya.api.client.Install.InstallConfiguration; +import mvm.rya.api.client.ListInstances; +import mvm.rya.api.client.RyaClientException; + +/** + * Integration tests the methods of {@link AccumuloListInstances}. + */ +public class AccumuloListInstancesIT extends AccumuloITBase { + + @Test + public void listInstances_hasRyaDetailsTable() throws AccumuloException, AccumuloSecurityException, RyaClientException { + // Install a few instances of Rya using the install command. + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + getUsername(), + getPassword().toCharArray(), + getInstanceName(), + getZookeepers()); + + final Install install = new AccumuloInstall(connectionDetails, getConnector()); + install.install("instance1_", InstallConfiguration.builder().build()); + install.install("instance2_", InstallConfiguration.builder().build()); + install.install("instance3_", InstallConfiguration.builder().build()); + + // Fetch the list and verify it matches what is expected. + final ListInstances listInstances = new AccumuloListInstances(connectionDetails, getConnector()); + final List<String> instances = listInstances.listInstances(); + Collections.sort(instances); + + final List<String> expected = Lists.newArrayList("instance1_", "instance2_", "instance3_"); + assertEquals(expected, instances); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/FluoITBase.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/FluoITBase.java b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/FluoITBase.java new file mode 100644 index 0000000..0dcc04c --- /dev/null +++ b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/FluoITBase.java @@ -0,0 +1,313 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.api.client.accumulo; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.File; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.accumulo.minicluster.MiniAccumuloConfig; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters; +import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver; +import org.apache.zookeeper.ClientCnxn; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.RepositoryException; +import org.openrdf.sail.Sail; +import org.openrdf.sail.SailException; + +import com.google.common.io.Files; + +import io.fluo.api.client.FluoAdmin; +import io.fluo.api.client.FluoAdmin.AlreadyInitializedException; +import io.fluo.api.client.FluoAdmin.TableExistsException; +import io.fluo.api.client.FluoClient; +import io.fluo.api.client.FluoFactory; +import io.fluo.api.config.FluoConfiguration; +import io.fluo.api.config.ObserverConfiguration; +import io.fluo.api.mini.MiniFluo; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.api.client.RyaClientException; +import mvm.rya.api.client.Install; +import mvm.rya.api.client.Install.DuplicateInstanceNameException; +import mvm.rya.api.client.Install.InstallConfiguration; +import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig; +import mvm.rya.rdftriplestore.RyaSailRepository; +import mvm.rya.rdftriplestore.inference.InferenceEngineException; +import mvm.rya.sail.config.RyaSailFactory; + +/** + * Integration tests that ensure the Fluo application processes PCJs results + * correctly. + * <p> + * This class is being ignored because it doesn't contain any unit tests. + */ +public abstract class FluoITBase { + private static final Logger log = Logger.getLogger(FluoITBase.class); + + protected static final String RYA_INSTANCE_NAME = "test_"; + + protected static final String ACCUMULO_USER = "root"; + protected static final String ACCUMULO_PASSWORD = "password"; + + // Mini Accumulo Cluster + protected MiniAccumuloCluster cluster; + protected static Connector accumuloConn = null; + protected String instanceName = null; + protected String zookeepers = null; + + // Fluo data store and connections. + protected MiniFluo fluo = null; + protected FluoClient fluoClient = null; + protected final String appName = "IntegrationTests"; + + // Rya data store and connections. + protected RyaSailRepository ryaRepo = null; + protected RepositoryConnection ryaConn = null; + + @BeforeClass + public static void killLoudLogs() { + Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR); + } + + @Before + public void setupMiniResources() + throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException, RepositoryException, + RyaDAOException, NumberFormatException, InferenceEngineException, AlreadyInitializedException, + TableExistsException, AlreadyInitializedException, RyaDetailsRepositoryException, DuplicateInstanceNameException, RyaClientException, SailException { + // Initialize the Mini Accumulo that will be used to host Rya and Fluo. + setupMiniAccumulo(); + + // Initialize the Mini Fluo that will be used to store created queries. + fluo = startMiniFluo(); + fluoClient = FluoFactory.newClient(fluo.getClientConfiguration()); + + // Initialize the Rya that will be used by the tests. + ryaRepo = setupRya(ACCUMULO_USER, ACCUMULO_PASSWORD, instanceName, zookeepers, appName); + ryaConn = ryaRepo.getConnection(); + } + + @After + public void shutdownMiniResources() { + if (ryaConn != null) { + try { + log.info("Shutting down Rya Connection."); + ryaConn.close(); + log.info("Rya Connection shut down."); + } catch (final Exception e) { + log.error("Could not shut down the Rya Connection.", e); + } + } + + if (ryaRepo != null) { + try { + log.info("Shutting down Rya Repo."); + ryaRepo.shutDown(); + log.info("Rya Repo shut down."); + } catch (final Exception e) { + log.error("Could not shut down the Rya Repo.", e); + } + } + + if (fluoClient != null) { + try { + log.info("Shutting down Fluo Client."); + fluoClient.close(); + log.info("Fluo Client shut down."); + } catch (final Exception e) { + log.error("Could not shut down the Fluo Client.", e); + } + } + + if (fluo != null) { + try { + log.info("Shutting down Mini Fluo."); + fluo.close(); + log.info("Mini Fluo shut down."); + } catch (final Exception e) { + log.error("Could not shut down the Mini Fluo.", e); + } + } + + if(cluster != null) { + try { + log.info("Shutting down the Mini Accumulo being used as a Rya store."); + cluster.stop(); + log.info("Mini Accumulo being used as a Rya store shut down."); + } catch(final Exception e) { + log.error("Could not shut down the Mini Accumulo.", e); + } + } + } + + private void setupMiniAccumulo() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException { + final File miniDataDir = Files.createTempDir(); + + // Setup and start the Mini Accumulo. + final MiniAccumuloConfig cfg = new MiniAccumuloConfig(miniDataDir, ACCUMULO_PASSWORD); + cluster = new MiniAccumuloCluster(cfg); + cluster.start(); + + // Store a connector to the Mini Accumulo. + instanceName = cluster.getInstanceName(); + zookeepers = cluster.getZooKeepers(); + + final Instance instance = new ZooKeeperInstance(instanceName, zookeepers); + accumuloConn = instance.getConnector(ACCUMULO_USER, new PasswordToken(ACCUMULO_PASSWORD)); + } + + /** + * Override this method to provide an output configuration to the Fluo application. + * <p> + * Exports to the Rya instance by default. + * + * @return The parameters that will be passed to {@link QueryResultObserver} at startup. + */ + protected Map<String, String> makeExportParams() { + final HashMap<String, String> params = new HashMap<>(); + + final RyaExportParameters ryaParams = new RyaExportParameters(params); + ryaParams.setExportToRya(true); + ryaParams.setAccumuloInstanceName(instanceName); + ryaParams.setZookeeperServers(zookeepers); + ryaParams.setExporterUsername(ACCUMULO_USER); + ryaParams.setExporterPassword(ACCUMULO_PASSWORD); + ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME); + return params; + } + + /** + * Setup a Mini Fluo cluster that uses a temporary directory to store its + * data. + * + * @return A Mini Fluo cluster. + */ + protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException { + // Setup the observers that will be used by the Fluo PCJ Application. + final List<ObserverConfiguration> observers = new ArrayList<>(); + observers.add(new ObserverConfiguration(TripleObserver.class.getName())); + observers.add(new ObserverConfiguration(StatementPatternObserver.class.getName())); + observers.add(new ObserverConfiguration(JoinObserver.class.getName())); + observers.add(new ObserverConfiguration(FilterObserver.class.getName())); + + // Provide export parameters child test classes may provide to the + // export observer. + final ObserverConfiguration exportObserverConfig = new ObserverConfiguration( + QueryResultObserver.class.getName()); + exportObserverConfig.setParameters(makeExportParams()); + observers.add(exportObserverConfig); + + // Configure how the mini fluo will run. + final FluoConfiguration config = new FluoConfiguration(); + config.setMiniStartAccumulo(false); + config.setAccumuloInstance(instanceName); + config.setAccumuloUser(ACCUMULO_USER); + config.setAccumuloPassword(ACCUMULO_PASSWORD); + config.setInstanceZookeepers(zookeepers + "/fluo"); + config.setAccumuloZookeepers(zookeepers); + + config.setApplicationName(appName); + config.setAccumuloTable("fluo" + appName); + + config.addObservers(observers); + + FluoFactory.newAdmin(config).initialize( + new FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true) ); + return FluoFactory.newMiniFluo(config); + } + + /** + * Sets up a Rya instance. + */ + protected RyaSailRepository setupRya(final String user, final String password, final String instanceName, final String zookeepers, final String appName) + throws AccumuloException, AccumuloSecurityException, RepositoryException, RyaDAOException, + NumberFormatException, UnknownHostException, InferenceEngineException, AlreadyInitializedException, + RyaDetailsRepositoryException, DuplicateInstanceNameException, RyaClientException, SailException { + checkNotNull(user); + checkNotNull(password); + checkNotNull(instanceName); + checkNotNull(zookeepers); + checkNotNull(appName); + + // Setup Rya configuration values. + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix(RYA_INSTANCE_NAME); + conf.setDisplayQueryPlan(true); + conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, false); + conf.set(ConfigUtils.CLOUDBASE_USER, user); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, password); + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instanceName); + conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zookeepers); + conf.set(ConfigUtils.USE_PCJ, "true"); + conf.set(ConfigUtils.USE_PCJ_FLUO_UPDATER, "true"); + conf.set(ConfigUtils.FLUO_APP_NAME, appName); + conf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString()); + conf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString()); + conf.set(ConfigUtils.CLOUDBASE_AUTHS, ""); + + // Install the test instance of Rya. + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + ACCUMULO_USER, + ACCUMULO_PASSWORD.toCharArray(), + cluster.getInstanceName(), + cluster.getZooKeepers()); + final Install install = new AccumuloInstall(connectionDetails, accumuloConn); + + final InstallConfiguration installConfig = InstallConfiguration.builder() + .setEnableTableHashPrefix(true) + .setEnableEntityCentricIndex(true) + .setEnableFreeTextIndex(true) + .setEnableTemporalIndex(true) + .setEnablePcjIndex(true) + .setEnableGeoIndex(true) + .setFluoPcjAppName(appName) + .build(); + install.install(RYA_INSTANCE_NAME, installConfig); + + // Connect to the instance of Rya that was just installed. + final Sail sail = RyaSailFactory.getInstance(conf); + final RyaSailRepository ryaRepo = new RyaSailRepository(sail); + + return ryaRepo; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/test/java/mvm/rya/indexing/external/AccumuloConstantPcjIntegrationTest.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/external/AccumuloConstantPcjIntegrationTest.java b/extras/indexing/src/test/java/mvm/rya/indexing/external/AccumuloConstantPcjIntegrationTest.java index 70ac0d1..c1d0b56 100644 --- a/extras/indexing/src/test/java/mvm/rya/indexing/external/AccumuloConstantPcjIntegrationTest.java +++ b/extras/indexing/src/test/java/mvm/rya/indexing/external/AccumuloConstantPcjIntegrationTest.java @@ -73,7 +73,7 @@ public class AccumuloConstantPcjIntegrationTest { MalformedQueryException, AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException, RyaDAOException, InferenceEngineException, - NumberFormatException, UnknownHostException { + NumberFormatException, UnknownHostException, SailException { repo = PcjIntegrationTestingUtil.getNonPcjRepo(prefix, "instance"); conn = repo.getConnection(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/test/java/mvm/rya/indexing/external/AccumuloPcjIntegrationTest.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/external/AccumuloPcjIntegrationTest.java b/extras/indexing/src/test/java/mvm/rya/indexing/external/AccumuloPcjIntegrationTest.java index 4d686fc..58f370e 100644 --- a/extras/indexing/src/test/java/mvm/rya/indexing/external/AccumuloPcjIntegrationTest.java +++ b/extras/indexing/src/test/java/mvm/rya/indexing/external/AccumuloPcjIntegrationTest.java @@ -26,17 +26,6 @@ import java.util.Collection; import java.util.List; import java.util.Set; -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.persist.RyaDAOException; -import mvm.rya.indexing.IndexPlanValidator.IndexPlanValidator; -import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType; -import mvm.rya.indexing.external.tupleSet.AccumuloIndexSet; -import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; -import mvm.rya.indexing.pcj.matching.PCJOptimizer; -import mvm.rya.rdftriplestore.inference.InferenceEngineException; - import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Connector; @@ -75,12 +64,23 @@ import com.beust.jcommander.internal.Sets; import com.google.common.base.Optional; import com.google.common.collect.Lists; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.indexing.IndexPlanValidator.IndexPlanValidator; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType; +import mvm.rya.indexing.external.tupleSet.AccumuloIndexSet; +import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; +import mvm.rya.indexing.pcj.matching.PCJOptimizer; +import mvm.rya.rdftriplestore.inference.InferenceEngineException; + public class AccumuloPcjIntegrationTest { private SailRepositoryConnection conn, pcjConn; private SailRepository repo, pcjRepo; private Connector accCon; - private Configuration conf = getConf(); + private final Configuration conf = getConf(); private final String prefix = "table_"; private final String tablename = "table_INDEX_"; private URI obj, obj2, subclass, subclass2, talksTo; @@ -91,7 +91,7 @@ public class AccumuloPcjIntegrationTest { MalformedQueryException, AccumuloException, AccumuloSecurityException, TableExistsException, RyaDAOException, TableNotFoundException, InferenceEngineException, - NumberFormatException, UnknownHostException { + NumberFormatException, UnknownHostException, SailException { repo = PcjIntegrationTestingUtil.getNonPcjRepo(prefix, "instance"); conn = repo.getConnection(); @@ -1436,7 +1436,7 @@ public class AccumuloPcjIntegrationTest { private static Configuration getConf() { final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, true); - conf.setTablePrefix("rya_"); + conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, "rya_"); conf.set(ConfigUtils.CLOUDBASE_USER, "root"); conf.set(ConfigUtils.CLOUDBASE_PASSWORD, ""); conf.set(ConfigUtils.CLOUDBASE_INSTANCE, "instance"); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/test/java/mvm/rya/indexing/external/PCJOptionalTestIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/external/PCJOptionalTestIT.java b/extras/indexing/src/test/java/mvm/rya/indexing/external/PCJOptionalTestIT.java index 7798c51..5c48750 100644 --- a/extras/indexing/src/test/java/mvm/rya/indexing/external/PCJOptionalTestIT.java +++ b/extras/indexing/src/test/java/mvm/rya/indexing/external/PCJOptionalTestIT.java @@ -81,7 +81,7 @@ public class PCJOptionalTestIT { MalformedQueryException, AccumuloException, AccumuloSecurityException, TableExistsException, RyaDAOException, TableNotFoundException, InferenceEngineException, NumberFormatException, - UnknownHostException { + UnknownHostException, SailException { repo = PcjIntegrationTestingUtil.getNonPcjRepo(tablePrefix, "instance"); conn = repo.getConnection(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/test/java/mvm/rya/indexing/external/PcjIntegrationTestingUtil.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/external/PcjIntegrationTestingUtil.java b/extras/indexing/src/test/java/mvm/rya/indexing/external/PcjIntegrationTestingUtil.java index 5cc3f9a..f431c75 100644 --- a/extras/indexing/src/test/java/mvm/rya/indexing/external/PcjIntegrationTestingUtil.java +++ b/extras/indexing/src/test/java/mvm/rya/indexing/external/PcjIntegrationTestingUtil.java @@ -58,6 +58,7 @@ import org.openrdf.repository.RepositoryException; import org.openrdf.repository.sail.SailRepository; import org.openrdf.repository.sail.SailRepositoryConnection; import org.openrdf.sail.Sail; +import org.openrdf.sail.SailException; import com.google.common.base.Optional; import com.google.common.collect.Sets; @@ -98,7 +99,7 @@ public class PcjIntegrationTestingUtil { public static SailRepository getPcjRepo(final String tablePrefix, final String instance) throws AccumuloException, AccumuloSecurityException, RyaDAOException, RepositoryException, InferenceEngineException, - NumberFormatException, UnknownHostException { + NumberFormatException, UnknownHostException, SailException { final AccumuloRdfConfiguration pcjConf = new AccumuloRdfConfiguration(); pcjConf.set(ConfigUtils.USE_PCJ, "true"); @@ -107,20 +108,18 @@ public class PcjIntegrationTestingUtil { final Sail pcjSail = RyaSailFactory.getInstance(pcjConf); final SailRepository pcjRepo = new SailRepository(pcjSail); - pcjRepo.initialize(); return pcjRepo; } public static SailRepository getNonPcjRepo(final String tablePrefix, final String instance) throws AccumuloException, AccumuloSecurityException, RyaDAOException, RepositoryException, InferenceEngineException, - NumberFormatException, UnknownHostException { + NumberFormatException, UnknownHostException, SailException { final AccumuloRdfConfiguration nonPcjConf = new AccumuloRdfConfiguration(); populateTestConfig(instance, tablePrefix, nonPcjConf); final Sail nonPcjSail = RyaSailFactory.getInstance(nonPcjConf); final SailRepository nonPcjRepo = new SailRepository(nonPcjSail); - nonPcjRepo.initialize(); return nonPcjRepo; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/test/java/mvm/rya/indexing/external/PrecompJoinOptimizerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/external/PrecompJoinOptimizerIntegrationTest.java b/extras/indexing/src/test/java/mvm/rya/indexing/external/PrecompJoinOptimizerIntegrationTest.java index 6fea2ba..dbfe857 100644 --- a/extras/indexing/src/test/java/mvm/rya/indexing/external/PrecompJoinOptimizerIntegrationTest.java +++ b/extras/indexing/src/test/java/mvm/rya/indexing/external/PrecompJoinOptimizerIntegrationTest.java @@ -72,7 +72,7 @@ public class PrecompJoinOptimizerIntegrationTest { MalformedQueryException, AccumuloException, AccumuloSecurityException, TableExistsException, RyaDAOException, TableNotFoundException, InferenceEngineException, NumberFormatException, - UnknownHostException { + UnknownHostException, SailException { repo = PcjIntegrationTestingUtil.getNonPcjRepo(tablePrefix, "instance"); conn = repo.getConnection(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/test/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplierTest.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplierTest.java b/extras/indexing/src/test/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplierTest.java index 99b0e89..7bad774 100644 --- a/extras/indexing/src/test/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplierTest.java +++ b/extras/indexing/src/test/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplierTest.java @@ -24,12 +24,12 @@ import static org.mockito.Mockito.when; import org.apache.hadoop.conf.Configuration; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.junit.Test; import com.google.common.base.Supplier; import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType; -import mvm.rya.indexing.external.accumulo.AccumuloPcjStorage; import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier; /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSetColumnVisibilityTest.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSetColumnVisibilityTest.java b/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSetColumnVisibilityTest.java index faedd07..44dc179 100644 --- a/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSetColumnVisibilityTest.java +++ b/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSetColumnVisibilityTest.java @@ -1,6 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package mvm.rya.indexing.external.tupleSet; + +import static org.junit.Assert.assertEquals; + import java.io.File; import java.io.IOException; +import java.util.Date; import java.util.HashSet; import java.util.Set; @@ -16,224 +39,213 @@ import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; import org.openrdf.model.impl.NumericLiteralImpl; import org.openrdf.model.impl.URIImpl; import org.openrdf.model.vocabulary.XMLSchema; -import org.openrdf.query.Binding; import org.openrdf.query.BindingSet; import org.openrdf.query.QueryEvaluationException; import org.openrdf.query.algebra.evaluation.QueryBindingSet; -import org.openrdf.query.impl.BindingImpl; import org.openrdf.repository.RepositoryException; +import com.google.common.base.Optional; import com.google.common.collect.Sets; import com.google.common.io.Files; -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ import info.aduna.iteration.CloseableIteration; import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.instance.RyaDetails; +import mvm.rya.api.instance.RyaDetails.EntityCentricIndexDetails; +import mvm.rya.api.instance.RyaDetails.FreeTextIndexDetails; +import mvm.rya.api.instance.RyaDetails.GeoIndexDetails; +import mvm.rya.api.instance.RyaDetails.JoinSelectivityDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails; +import mvm.rya.api.instance.RyaDetails.ProspectorDetails; +import mvm.rya.api.instance.RyaDetails.TemporalIndexDetails; +import mvm.rya.api.instance.RyaDetailsRepository; +import mvm.rya.api.instance.RyaDetailsRepository.AlreadyInitializedException; +import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.indexing.external.accumulo.AccumuloPcjStorage; /** - * XXX Fixed in RYA-82 + * Tests the evaluation of {@link AccumuloIndexSet}. */ -@Ignore public class AccumuloIndexSetColumnVisibilityTest { - - private static final Logger log = Logger - .getLogger(AccumuloIndexSetColumnVisibilityTest.class); - - private static Connector accCon; - private static String pcjTableName; - private static AccumuloPcjStorage storage; - private static Configuration conf; - private static final String sparql = "SELECT ?name ?age " + "{" - + "?name <http://hasAge> ?age ." - + "?name <http://playsSport> \"Soccer\" " + "}"; - private static QueryBindingSet pcjBs1, pcjBs2, pcjBs3; - private static MiniAccumuloCluster accumulo; - private static String instance; - private static String zooKeepers; - - @BeforeClass - public static void init() throws AccumuloException, - AccumuloSecurityException, PCJStorageException, IOException, - InterruptedException, TableNotFoundException { - accumulo = startMiniAccumulo(); - accumulo.getZooKeepers(); - instance = accumulo.getInstanceName(); - zooKeepers = accumulo.getZooKeepers(); - conf = getConf(); - accCon.securityOperations().changeUserAuthorizations("root", new Authorizations("U","USA")); - storage = new AccumuloPcjStorage(accCon, "rya_"); - final Set<VariableOrder> varOrders = new HashSet<>(); - varOrders.add(new VariableOrder("age;name")); - varOrders.add(new VariableOrder("name;age")); - pcjTableName = storage.createPcj(sparql, varOrders); - - final Binding exBinding1 = new BindingImpl("age", new NumericLiteralImpl(14, - XMLSchema.INTEGER)); - final Binding exBinding2 = new BindingImpl("name", - new URIImpl("http://Alice")); - final Binding exBinding3 = new BindingImpl("age", new NumericLiteralImpl(16, - XMLSchema.INTEGER)); - final Binding exBinding4 = new BindingImpl("name", new URIImpl("http://Bob")); - final Binding exBinding5 = new BindingImpl("age", new NumericLiteralImpl(34, - XMLSchema.INTEGER)); - final Binding exBinding6 = new BindingImpl("name", new URIImpl("http://Joe")); - - pcjBs1 = new QueryBindingSet(); - pcjBs1.addBinding(exBinding1); - pcjBs1.addBinding(exBinding2); - - pcjBs2 = new QueryBindingSet(); - pcjBs2.addBinding(exBinding3); - pcjBs2.addBinding(exBinding4); - - pcjBs3 = new QueryBindingSet(); - pcjBs3.addBinding(exBinding5); - pcjBs3.addBinding(exBinding6); - - final Set<BindingSet> bindingSets = new HashSet<>(); - bindingSets.add(pcjBs1); - bindingSets.add(pcjBs2); - bindingSets.add(pcjBs3); - - final Set<VisibilityBindingSet> visBs = new HashSet<>(); - for (final BindingSet bs : bindingSets) { - visBs.add(new VisibilityBindingSet(bs, "U|USA")); - } - - storage.addResults(pcjTableName, visBs); - -// Scanner scanner = accCon.createScanner(pcjTableName, new Authorizations("U","USA")); -// for(Entry<Key, Value> entry : scanner) { -// System.out.println(entry.getKey()); -// } - - - } - - @AfterClass - public static void close() throws RepositoryException, PCJStorageException { - storage.close(); - - if (accumulo != null) { - try { - log.info("Shutting down the Mini Accumulo being used as a Rya store."); - accumulo.stop(); - log.info("Mini Accumulo being used as a Rya store shut down."); - } catch (final Exception e) { - log.error("Could not shut down the Mini Accumulo.", e); - } - } - } - - @Test - public void variableInstantiationTest() throws Exception { - - final AccumuloIndexSet ais = new AccumuloIndexSet(conf, pcjTableName); - final QueryBindingSet bs = new QueryBindingSet(); - bs.addBinding("name", new URIImpl("http://Alice")); - - final QueryBindingSet bs2 = new QueryBindingSet(); - bs2.addBinding("name", new URIImpl("http://Bob")); - - final Set<BindingSet> bSets = Sets.<BindingSet> newHashSet(bs, bs2); - - final CloseableIteration<BindingSet, QueryEvaluationException> results = ais - .evaluate(bSets); - - final Set<BindingSet> expected = new HashSet<>(); - expected.add(pcjBs1); - expected.add(pcjBs2); - final Set<BindingSet> fetchedResults = new HashSet<>(); - while (results.hasNext()) { - final BindingSet next = results.next(); - fetchedResults.add(next); - } - - Assert.assertEquals(expected, fetchedResults); - } - - @Test - public void accumuloIndexSetTestAttemptJoinAccrossTypes() throws Exception { - // Load some Triples into Rya. - final AccumuloIndexSet ais = new AccumuloIndexSet(conf, pcjTableName); - - final QueryBindingSet bs1 = new QueryBindingSet(); - bs1.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); - final QueryBindingSet bs2 = new QueryBindingSet(); - bs2.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); - - final Set<BindingSet> bSets = Sets.<BindingSet> newHashSet(bs1, bs2); - - final CloseableIteration<BindingSet, QueryEvaluationException> results = ais - .evaluate(bSets); - final Set<BindingSet> expected = new HashSet<>(); - expected.add(pcjBs1); - expected.add(pcjBs2); - final Set<BindingSet> fetchedResults = new HashSet<>(); - while (results.hasNext()) { - final BindingSet next = results.next(); - fetchedResults.add(next); - } - - Assert.assertEquals(expected, fetchedResults); - } - - private static MiniAccumuloCluster startMiniAccumulo() throws IOException, - InterruptedException, AccumuloException, AccumuloSecurityException { - final File miniDataDir = Files.createTempDir(); - - // Setup and start the Mini Accumulo. - final MiniAccumuloCluster accumulo = new MiniAccumuloCluster( - miniDataDir, "password"); - accumulo.start(); - - // Store a connector to the Mini Accumulo. - final Instance instance = new ZooKeeperInstance( - accumulo.getInstanceName(), accumulo.getZooKeepers()); - accCon = instance.getConnector("root", new PasswordToken("password")); - - return accumulo; - } - - private static Configuration getConf() { - final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - conf.setTablePrefix("rya_"); - conf.set(ConfigUtils.CLOUDBASE_USER, "root"); - conf.set(ConfigUtils.CLOUDBASE_PASSWORD, "password"); - conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance); - conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zooKeepers); - conf.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, "U,USA"); - return conf; - } - -} + private static final Logger log = Logger.getLogger(AccumuloIndexSetColumnVisibilityTest.class); + + // Accumulo cluster resources. + private static MiniAccumuloCluster accumulo; + private static String instance; + private static String zooKeepers; + private static Connector accCon; + + // Rya resources. + private static String ryaInstanceName = "rya_"; + private static Configuration conf; + private static AccumuloPcjStorage storage; + + // PCJ values used when testing. + private static String pcjId; + private static QueryBindingSet pcjBs1; + private static QueryBindingSet pcjBs2; + + @BeforeClass + public static void init() throws AccumuloException, AccumuloSecurityException, PCJStorageException, IOException, InterruptedException, TableNotFoundException, AlreadyInitializedException, RyaDetailsRepositoryException { + // Setup the mini accumulo instance used by the test. + accumulo = startMiniAccumulo(); + accumulo.getZooKeepers(); + instance = accumulo.getInstanceName(); + zooKeepers = accumulo.getZooKeepers(); + conf = getConf(); + accCon.securityOperations().changeUserAuthorizations("root", new Authorizations("U","USA")); + + // Initialize the Rya Details for the Rya instance. + initRyaDetails(); + + // Initialize a PCJ. + storage = new AccumuloPcjStorage(accCon, ryaInstanceName); + + pcjId = storage.createPcj( + "SELECT ?name ?age " + "{" + + "?name <http://hasAge> ?age ." + + "?name <http://playsSport> \"Soccer\" " + + "}"); + + // Store the PCJ's results. + pcjBs1 = new QueryBindingSet(); + pcjBs1.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); + pcjBs1.addBinding("name", new URIImpl("http://Alice")); + + pcjBs2 = new QueryBindingSet(); + pcjBs2.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); + pcjBs2.addBinding("name", new URIImpl("http://Bob")); + + final Set<VisibilityBindingSet> visBs = new HashSet<>(); + for (final BindingSet bs : Sets.<BindingSet>newHashSet(pcjBs1, pcjBs2)) { + visBs.add(new VisibilityBindingSet(bs, "U|USA")); + } + + storage.addResults(pcjId, visBs); + } + + @AfterClass + public static void close() throws RepositoryException, PCJStorageException { + storage.close(); + + if (accumulo != null) { + try { + log.info("Shutting down the Mini Accumulo being used as a Rya store."); + accumulo.stop(); + log.info("Mini Accumulo being used as a Rya store shut down."); + } catch (final Exception e) { + log.error("Could not shut down the Mini Accumulo.", e); + } + } + } + + private static MiniAccumuloCluster startMiniAccumulo() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException { + final File miniDataDir = Files.createTempDir(); + + // Setup and start the Mini Accumulo. + final MiniAccumuloCluster accumulo = new MiniAccumuloCluster( + miniDataDir, "password"); + accumulo.start(); + + // Store a connector to the Mini Accumulo. + final Instance instance = new ZooKeeperInstance( + accumulo.getInstanceName(), accumulo.getZooKeepers()); + accCon = instance.getConnector("root", new PasswordToken("password")); + + return accumulo; + } + + private static void initRyaDetails() throws AlreadyInitializedException, RyaDetailsRepositoryException { + // Initialize the Rya Details for the instance. + final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(accCon, ryaInstanceName); + + final RyaDetails details = RyaDetails.builder() + .setRyaInstanceName(ryaInstanceName) + .setRyaVersion("0.0.0.0") + .setFreeTextDetails( new FreeTextIndexDetails(true) ) + .setEntityCentricIndexDetails( new EntityCentricIndexDetails(true) ) + .setGeoIndexDetails( new GeoIndexDetails(true) ) + .setTemporalIndexDetails( new TemporalIndexDetails(true) ) + .setPCJIndexDetails( + PCJIndexDetails.builder() + .setEnabled(true) ) + .setJoinSelectivityDetails( new JoinSelectivityDetails( Optional.<Date>absent() ) ) + .setProspectorDetails( new ProspectorDetails( Optional.<Date>absent() )) + .build(); + + detailsRepo.initialize(details); + } + + private static Configuration getConf() { + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, ryaInstanceName); + conf.set(ConfigUtils.CLOUDBASE_USER, "root"); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, "password"); + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance); + conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zooKeepers); + conf.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, "U,USA"); + return conf; + } + + @Test + public void variableInstantiationTest() throws Exception { + // Setup the object that will be tested. + final String pcjTableName = new PcjTableNameFactory().makeTableName(ryaInstanceName, pcjId); + final AccumuloIndexSet ais = new AccumuloIndexSet(conf, pcjTableName); + + // Setup the binding sets that will be evaluated. + final QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("name", new URIImpl("http://Alice")); + final QueryBindingSet bs2 = new QueryBindingSet(); + bs2.addBinding("name", new URIImpl("http://Bob")); + + final Set<BindingSet> bSets = Sets.<BindingSet> newHashSet(bs, bs2); + final CloseableIteration<BindingSet, QueryEvaluationException> results = ais.evaluate(bSets); + + final Set<BindingSet> fetchedResults = new HashSet<>(); + while (results.hasNext()) { + final BindingSet next = results.next(); + fetchedResults.add(next); + } + + final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(pcjBs1, pcjBs2); + assertEquals(expected, fetchedResults); + } + + @Test + public void accumuloIndexSetTestAttemptJoinAccrossTypes() throws Exception { + // Setup the object that will be tested. + final String pcjTableName = new PcjTableNameFactory().makeTableName(ryaInstanceName, pcjId); + final AccumuloIndexSet ais = new AccumuloIndexSet(conf, pcjTableName); + + // Setup the binding sets that will be evaluated. + final QueryBindingSet bs1 = new QueryBindingSet(); + bs1.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); + final QueryBindingSet bs2 = new QueryBindingSet(); + bs2.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); + + final Set<BindingSet> bSets = Sets.<BindingSet> newHashSet(bs1, bs2); + final CloseableIteration<BindingSet, QueryEvaluationException> results = ais.evaluate(bSets); + + final Set<BindingSet> fetchedResults = new HashSet<>(); + while (results.hasNext()) { + final BindingSet next = results.next(); + fetchedResults.add(next); + } + + final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(pcjBs1, pcjBs2); + assertEquals(expected, fetchedResults); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSetTest.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSetTest.java b/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSetTest.java index 483f9e0..2d4f137 100644 --- a/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSetTest.java +++ b/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSetTest.java @@ -81,14 +81,13 @@ public class AccumuloIndexSetTest { @Before public void init() throws AccumuloException, AccumuloSecurityException, RyaDAOException, RepositoryException, TableNotFoundException, - InferenceEngineException, NumberFormatException, UnknownHostException { + InferenceEngineException, NumberFormatException, UnknownHostException, SailException { accumuloConn = ConfigUtils.getConnector(conf); final TableOperations ops = accumuloConn.tableOperations(); if(ops.exists(prefix+"INDEX_"+ "testPcj")) { ops.delete(prefix+"INDEX_"+ "testPcj"); } ryaRepo = new RyaSailRepository(RyaSailFactory.getInstance(conf)); - ryaRepo.initialize(); ryaConn = ryaRepo.getConnection(); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexingExample/pom.xml ---------------------------------------------------------------------- diff --git a/extras/indexingExample/pom.xml b/extras/indexingExample/pom.xml index 394af94..2c37057 100644 --- a/extras/indexingExample/pom.xml +++ b/extras/indexingExample/pom.xml @@ -64,6 +64,23 @@ under the License. <groupId>org.locationtech.geomesa</groupId> <artifactId>geomesa-accumulo-distributed-runtime</artifactId> </dependency> + + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-minicluster</artifactId> + <version>${accumulo.version}</version> + </dependency> + + <dependency> + <groupId>io.fluo</groupId> + <artifactId>fluo-mini</artifactId> + </dependency> + + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryrender</artifactId> + <version>${openrdf.sesame.version}</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexingExample/src/main/java/EntityDirectExample.java ---------------------------------------------------------------------- diff --git a/extras/indexingExample/src/main/java/EntityDirectExample.java b/extras/indexingExample/src/main/java/EntityDirectExample.java index 6efd933..4187d8f 100644 --- a/extras/indexingExample/src/main/java/EntityDirectExample.java +++ b/extras/indexingExample/src/main/java/EntityDirectExample.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,11 +21,6 @@ import java.util.List; -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.sail.config.RyaSailFactory; - import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.TableNotFoundException; @@ -47,6 +42,10 @@ import org.openrdf.repository.sail.SailRepository; import org.openrdf.repository.sail.SailRepositoryConnection; import org.openrdf.sail.Sail; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.sail.config.RyaSailFactory; + public class EntityDirectExample { private static final Logger log = Logger.getLogger(EntityDirectExample.class); @@ -60,8 +59,8 @@ public class EntityDirectExample { private static final String RYA_TABLE_PREFIX = "x_test_triplestore_"; private static final String AUTHS = "U"; - public static void main(String[] args) throws Exception { - Configuration conf = getConf(); + public static void main(final String[] args) throws Exception { + final Configuration conf = getConf(); conf.setBoolean(ConfigUtils.DISPLAY_QUERY_PLAN, PRINT_QUERIES); log.info("Creating the tables as root."); @@ -71,9 +70,8 @@ public class EntityDirectExample { try { log.info("Connecting to Indexing Sail Repository."); - Sail extSail = RyaSailFactory.getInstance(conf); + final Sail extSail = RyaSailFactory.getInstance(conf); repository = new SailRepository(extSail); - repository.initialize(); conn = repository.getConnection(); log.info("Running SPARQL Example: Add and Delete"); @@ -88,21 +86,21 @@ public class EntityDirectExample { } } - private static void closeQuietly(SailRepository repository) { + private static void closeQuietly(final SailRepository repository) { if (repository != null) { try { repository.shutDown(); - } catch (RepositoryException e) { + } catch (final RepositoryException e) { // quietly absorb this exception } } } - private static void closeQuietly(SailRepositoryConnection conn) { + private static void closeQuietly(final SailRepositoryConnection conn) { if (conn != null) { try { conn.close(); - } catch (RepositoryException e) { + } catch (final RepositoryException e) { // quietly absorb this exception } } @@ -112,7 +110,7 @@ public class EntityDirectExample { - public static void testAddAndDelete(SailRepositoryConnection conn) throws MalformedQueryException, + public static void testAddAndDelete(final SailRepositoryConnection conn) throws MalformedQueryException, RepositoryException, UpdateExecutionException, QueryEvaluationException, TupleQueryResultHandlerException, AccumuloException, AccumuloSecurityException, TableNotFoundException { @@ -130,7 +128,7 @@ public class EntityDirectExample { query = "select ?x {GRAPH <http://updated/test> {?x <http://acme.com/actions/likes> \"A new book\" . "// + " ?x <http://acme.com/actions/likes> \"Avocados\" }}"; - CountingResultHandler resultHandler = new CountingResultHandler(); + final CountingResultHandler resultHandler = new CountingResultHandler(); TupleQuery tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, query); tupleQuery.evaluate(resultHandler); log.info("Result count : " + resultHandler.getCount()); @@ -160,11 +158,11 @@ public class EntityDirectExample { - private static void testAddAndTemporalSearchWithPCJ(SailRepositoryConnection conn) throws Exception { + private static void testAddAndTemporalSearchWithPCJ(final SailRepositoryConnection conn) throws Exception { // create some resources and literals to make statements out of - String sparqlInsert = "PREFIX pref: <http://www.model/pref#> \n" + final String sparqlInsert = "PREFIX pref: <http://www.model/pref#> \n" + "INSERT DATA {\n" // + "<urn:Bob> a pref:Person ;\n" // + " pref:hasProperty1 'property1' ;\n" // one second @@ -175,7 +173,7 @@ public class EntityDirectExample { + " pref:hasProperty5 'property5' ; \n" // + "}"; - Update update = conn.prepareUpdate(QueryLanguage.SPARQL, sparqlInsert); + final Update update = conn.prepareUpdate(QueryLanguage.SPARQL, sparqlInsert); update.execute(); String queryString = "PREFIX pref: <http://www.model/pref#> \n" // @@ -239,7 +237,7 @@ public class EntityDirectExample { private static Configuration getConf() { - AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, USE_MOCK_INSTANCE); conf.set(ConfigUtils.USE_ENTITY, "true"); @@ -277,11 +275,11 @@ public class EntityDirectExample { } @Override - public void startQueryResult(List<String> arg0) throws TupleQueryResultHandlerException { + public void startQueryResult(final List<String> arg0) throws TupleQueryResultHandlerException { } @Override - public void handleSolution(BindingSet arg0) throws TupleQueryResultHandlerException { + public void handleSolution(final BindingSet arg0) throws TupleQueryResultHandlerException { count++; if(!bsSizeSet) { bindingSize = arg0.size(); @@ -295,11 +293,11 @@ public class EntityDirectExample { } @Override - public void handleBoolean(boolean arg0) throws QueryResultHandlerException { + public void handleBoolean(final boolean arg0) throws QueryResultHandlerException { } @Override - public void handleLinks(List<String> arg0) throws QueryResultHandlerException { + public void handleLinks(final List<String> arg0) throws QueryResultHandlerException { } } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexingExample/src/main/java/MongoRyaDirectExample.java ---------------------------------------------------------------------- diff --git a/extras/indexingExample/src/main/java/MongoRyaDirectExample.java b/extras/indexingExample/src/main/java/MongoRyaDirectExample.java index 9c746c8..91a31f9 100644 --- a/extras/indexingExample/src/main/java/MongoRyaDirectExample.java +++ b/extras/indexingExample/src/main/java/MongoRyaDirectExample.java @@ -71,7 +71,6 @@ public class MongoRyaDirectExample { log.info("Connecting to Indexing Sail Repository."); Sail sail = RyaSailFactory.getInstance(conf); repository = new SailRepository(sail); - repository.initialize(); conn = repository.getConnection(); long start = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexingExample/src/main/java/RyaClientExample.java ---------------------------------------------------------------------- diff --git a/extras/indexingExample/src/main/java/RyaClientExample.java b/extras/indexingExample/src/main/java/RyaClientExample.java new file mode 100644 index 0000000..7d1d3dc --- /dev/null +++ b/extras/indexingExample/src/main/java/RyaClientExample.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.accumulo.minicluster.MiniAccumuloConfig; +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.log4j.PatternLayout; +import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters; +import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver; +import org.apache.zookeeper.ClientCnxn; +import org.openrdf.model.Statement; +import org.openrdf.model.ValueFactory; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; +import org.openrdf.queryrender.sparql.SPARQLQueryRenderer; +import org.openrdf.sail.Sail; +import org.openrdf.sail.SailConnection; + +import com.beust.jcommander.internal.Lists; +import com.google.common.io.Files; + +import info.aduna.iteration.CloseableIteration; +import io.fluo.api.client.FluoAdmin; +import io.fluo.api.client.FluoAdmin.AlreadyInitializedException; +import io.fluo.api.client.FluoAdmin.TableExistsException; +import io.fluo.api.client.FluoFactory; +import io.fluo.api.config.FluoConfiguration; +import io.fluo.api.config.ObserverConfiguration; +import io.fluo.api.mini.MiniFluo; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.api.client.Install.InstallConfiguration; +import mvm.rya.api.client.RyaClient; +import mvm.rya.api.client.accumulo.AccumuloConnectionDetails; +import mvm.rya.api.client.accumulo.AccumuloRyaClientFactory; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig; +import mvm.rya.sail.config.RyaSailFactory; + +/** + * Demonstrates how a {@link RyaClient} may be used to interact with an instance + * of Accumulo to install and manage a Rya instance. + */ +public class RyaClientExample { + private static final Logger log = Logger.getLogger(RyaClientExample.class); + + public static void main(final String[] args) throws Exception { + setupLogging(); + + final String accumuloUsername = "root"; + final String accumuloPassword = "password"; + + MiniAccumuloCluster cluster = null; + MiniFluo fluo = null; + Sail ryaSail = null; + + try { + // Setup a Mini Accumulo Cluster to host the Rya instance. + log.info("Setting up the Mini Accumulo Cluster used by this example."); + final File miniDataDir = Files.createTempDir(); + final MiniAccumuloConfig cfg = new MiniAccumuloConfig(miniDataDir, accumuloPassword); + cluster = new MiniAccumuloCluster(cfg); + cluster.start(); + + // Setup a Mini Fluo application that will be used to incrementally update the PCJ indicies. + log.info("Setting up the Mini Fluo application used by this example."); + final String fluoAppName = "demoInstance_pcjUpdater"; + fluo = makeMiniFluo(accumuloUsername, accumuloPassword, cluster.getInstanceName(), cluster.getZooKeepers(), fluoAppName); + + // Give the root user the 'U' authorizations. + final Connector connector = cluster.getConnector(accumuloUsername, accumuloPassword); + connector.securityOperations().changeUserAuthorizations("root", new Authorizations("U")); + + // Setup a Rya Client that is able to interact with the mini cluster. + final AccumuloConnectionDetails connectionDetails = + new AccumuloConnectionDetails(accumuloUsername, accumuloPassword.toCharArray(), cluster.getInstanceName(), cluster.getZooKeepers()); + + final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, connector); + + // Install an instance of Rya that has all of the secondary indexers turned on. + final String ryaInstanceName = "demoInstance_"; + final InstallConfiguration installConfig = InstallConfiguration.builder() + .setEnableTableHashPrefix(true) + .setEnableEntityCentricIndex(true) + .setEnableGeoIndex(true) + .setEnableFreeTextIndex(true) + .setEnableTemporalIndex(true) + .setEnablePcjIndex(true) + .setFluoPcjAppName(fluoAppName) + .build(); + + ryaClient.getInstall().install(ryaInstanceName, installConfig); + + // Add a PCJ index. + final String sparql = + "SELECT ?patron ?employee " + + "WHERE { " + + "?patron <http://talksTo> ?employee. " + + "?employee <http://worksAt> <http://CoffeeShop>. " + + "}"; + + // Load some statements into the Rya instance. + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix(ryaInstanceName); + conf.set(ConfigUtils.CLOUDBASE_USER, accumuloUsername); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, accumuloPassword); + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, cluster.getInstanceName()); + conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, cluster.getZooKeepers()); + conf.set(ConfigUtils.CLOUDBASE_AUTHS, "U"); + conf.set(ConfigUtils.USE_PCJ_FLUO_UPDATER, "true"); + conf.set(ConfigUtils.FLUO_APP_NAME, fluoAppName); + conf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString()); + conf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString()); + + ryaSail = RyaSailFactory.getInstance(conf); + + final ValueFactory vf = ryaSail.getValueFactory(); + final List<Statement> statements = Lists.newArrayList( + vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://talksTo"), vf.createURI("http://Charlie")), + vf.createStatement(vf.createURI("http://David"), vf.createURI("http://talksTo"), vf.createURI("http://Alice")), + vf.createStatement(vf.createURI("http://Alice"), vf.createURI("http://worksAt"), vf.createURI("http://CoffeeShop")), + vf.createStatement(vf.createURI("http://Bob"), vf.createURI("http://worksAt"), vf.createURI("http://CoffeeShop")), + vf.createStatement(vf.createURI("http://George"), vf.createURI("http://talksTo"), vf.createURI("http://Frank")), + vf.createStatement(vf.createURI("http://Frank"), vf.createURI("http://worksAt"), vf.createURI("http://CoffeeShop")), + vf.createStatement(vf.createURI("http://Eve"), vf.createURI("http://talksTo"), vf.createURI("http://Bob")), + vf.createStatement(vf.createURI("http://Charlie"), vf.createURI("http://worksAt"), vf.createURI("http://CoffeeShop"))); + + SailConnection ryaConn = ryaSail.getConnection(); + log.info(""); + log.info("Loading the following statements:"); + ryaConn.begin(); + for(final Statement statement : statements) { + log.info(" " + statement.toString()); + ryaConn.addStatement(statement.getSubject(), statement.getPredicate(), statement.getObject()); + } + log.info(""); + ryaConn.close(); + fluo.waitForObservers(); + + // Execute the SPARQL query and print the results. + log.info("Executing the following query: "); + prettyLogSparql(sparql); + log.info(""); + + final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null); + ryaConn = ryaSail.getConnection(); + final CloseableIteration<? extends BindingSet, QueryEvaluationException> result = ryaConn.evaluate(parsedQuery.getTupleExpr(), null, null, false); + + log.info("Results:"); + while(result.hasNext()) { + log.info(" " + result.next()); + } + log.info(""); + + } finally { + if(ryaSail != null) { + log.info("Shutting down the Rya Sail instance."); + ryaSail.shutDown(); + } + + if(fluo != null) { + try { + log.info("Shutting down the Mini Fluo instance."); + fluo.close(); + } catch (final Exception e) { + log.error("Could not shut down the Mini Fluo instance.", e); + } + } + + if(cluster != null) { + log.info("Sutting down the Mini Accumulo Cluster."); + cluster.stop(); + } + } + } + + private static void setupLogging() { + // Turn off all the loggers and customize how they write to the console. + final Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.OFF); + final ConsoleAppender ca = (ConsoleAppender) rootLogger.getAppender("stdout"); + ca.setLayout(new PatternLayout("%-5p - %m%n")); + + + // Turn the logger used by the demo back on. + log.setLevel(Level.INFO); + Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR); + } + + private static void prettyLogSparql(final String sparql) { + try { + // Pretty print. + final String[] lines = prettyFormatSparql(sparql); + for(final String line : lines) { + log.info(line); + } + } catch (final Exception e) { + // Pretty print failed, so ugly print instead. + log.info(sparql); + } + } + + private static String[] prettyFormatSparql(final String sparql) throws Exception { + final SPARQLParser parser = new SPARQLParser(); + final SPARQLQueryRenderer renderer = new SPARQLQueryRenderer(); + final ParsedQuery pq = parser.parseQuery(sparql, null); + final String prettySparql = renderer.render(pq); + return StringUtils.split(prettySparql, '\n'); + } + + private static MiniFluo makeMiniFluo(final String username, final String password, final String instanceName, final String zookeepers, final String fluoAppName) throws AlreadyInitializedException, TableExistsException { + // Setup the observers that will be used by the Fluo PCJ Application. + final List<ObserverConfiguration> observers = new ArrayList<>(); + observers.add(new ObserverConfiguration(TripleObserver.class.getName())); + observers.add(new ObserverConfiguration(StatementPatternObserver.class.getName())); + observers.add(new ObserverConfiguration(JoinObserver.class.getName())); + observers.add(new ObserverConfiguration(FilterObserver.class.getName())); + + // Provide export parameters child test classes may provide to the + // export observer. + final ObserverConfiguration exportObserverConfig = new ObserverConfiguration( + QueryResultObserver.class.getName()); + + final HashMap<String, String> params = new HashMap<>(); + final RyaExportParameters ryaParams = new RyaExportParameters(params); + ryaParams.setExportToRya(true); + ryaParams.setAccumuloInstanceName(instanceName); + ryaParams.setZookeeperServers(zookeepers); + ryaParams.setExporterUsername(username); + ryaParams.setExporterPassword(password); + ryaParams.setRyaInstanceName(fluoAppName); + + exportObserverConfig.setParameters(params); + observers.add(exportObserverConfig); + + // Configure how the mini fluo will run. + final FluoConfiguration config = new FluoConfiguration(); + config.setMiniStartAccumulo(false); + config.setAccumuloInstance(instanceName); + config.setAccumuloUser(username); + config.setAccumuloPassword(password); + config.setInstanceZookeepers(zookeepers + "/fluo"); + config.setAccumuloZookeepers(zookeepers); + + config.setApplicationName(fluoAppName); + config.setAccumuloTable("fluo" + fluoAppName); + + config.addObservers(observers); + + FluoFactory.newAdmin(config).initialize( + new FluoAdmin.InitOpts().setClearTable(true).setClearZookeeper(true) ); + return FluoFactory.newMiniFluo(config); + } +} \ No newline at end of file
