RYA-151 Rya Query benchmark tool implemented using JMH.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/e77e839d Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/e77e839d Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/e77e839d Branch: refs/heads/master Commit: e77e839d76e140bee47eb610cf19c453ef0a79b5 Parents: 62c0794 Author: Kevin Chilton <[email protected]> Authored: Wed Aug 17 22:29:41 2016 -0400 Committer: pujav65 <[email protected]> Committed: Tue Sep 27 11:05:06 2016 -0400 ---------------------------------------------------------------------- .../java/mvm/rya/api/client/BatchUpdatePCJ.java | 22 ++ .../api/client/PCJDoesNotExistException.java | 20 ++ .../main/java/mvm/rya/api/client/RyaClient.java | 13 +- .../api/instance/RyaDetailsToConfiguration.java | 72 ++-- .../java/mvm/rya/accumulo/AccumuloITBase.java | 8 +- .../client/accumulo/AccumuloBatchUpdatePCJ.java | 226 +++++++++++++ .../api/client/accumulo/AccumuloCreatePCJ.java | 64 ++-- .../accumulo/AccumuloRyaClientFactory.java | 1 + .../PrecomputedJoinStorageSupplier.java | 3 +- .../accumulo/AccumuloBatchUpdatePCJIT.java | 135 ++++++++ extras/pom.xml | 7 +- extras/rya.benchmark/pom.xml | 250 ++++++++++++++ .../query/QueriesBenchmarkConfReader.java | 83 +++++ .../rya/benchmark/query/QueryBenchmark.java | 336 +++++++++++++++++++ .../src/main/resources/LICENSE.txt | 16 + .../src/main/xsd/queries-benchmark-conf.xsd | 74 ++++ .../query/QueriesBenchmarkConfReaderIT.java | 105 ++++++ .../benchmark/query/QueryBenchmarkRunIT.java | 196 +++++++++++ extras/rya.console/pom.xml | 17 +- .../java/mvm/rya/shell/RyaAdminCommands.java | 72 ++-- .../java/mvm/rya/shell/util/InstallPrompt.java | 16 +- .../mvm/rya/shell/RyaAdminCommandsTest.java | 37 +- 22 files changed, 1672 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/common/rya.api/src/main/java/mvm/rya/api/client/BatchUpdatePCJ.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/mvm/rya/api/client/BatchUpdatePCJ.java b/common/rya.api/src/main/java/mvm/rya/api/client/BatchUpdatePCJ.java new file mode 100644 index 0000000..20d90e0 --- /dev/null +++ b/common/rya.api/src/main/java/mvm/rya/api/client/BatchUpdatePCJ.java @@ -0,0 +1,22 @@ +package mvm.rya.api.client; + +import javax.annotation.ParametersAreNonnullByDefault; + +/** + * Batch update a PCJ index. + */ +@ParametersAreNonnullByDefault +public interface BatchUpdatePCJ { + + /** + * Batch update a specific PCJ index using the {@link Statement}s that are + * currently in the Rya instance. + * + * @param ryaInstanceName - The Rya instance whose PCJ will be updated. (not null) + * @param pcjId - Identifies the PCJ index to update. (not null) + * @throws InstanceDoesNotExistException No instance of Rya exists for the provided name. + * @throws PCJDoesNotExistException No PCJ exists for the provided PCJ ID. + * @throws RyaClientException Something caused the command to fail. 
+ */ + public void batchUpdate(String ryaInstanceName, String pcjId) throws InstanceDoesNotExistException, PCJDoesNotExistException, RyaClientException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/common/rya.api/src/main/java/mvm/rya/api/client/PCJDoesNotExistException.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/mvm/rya/api/client/PCJDoesNotExistException.java b/common/rya.api/src/main/java/mvm/rya/api/client/PCJDoesNotExistException.java new file mode 100644 index 0000000..63efe0c --- /dev/null +++ b/common/rya.api/src/main/java/mvm/rya/api/client/PCJDoesNotExistException.java @@ -0,0 +1,20 @@ +package mvm.rya.api.client; + +import javax.annotation.ParametersAreNonnullByDefault; + +/** + * One of the {@link RyaClient} commands could not execute because the connected + * instance of Rya does not have a PCJ matching the provided PCJ ID. + */ +@ParametersAreNonnullByDefault +public class PCJDoesNotExistException extends RyaClientException { + private static final long serialVersionUID = 1L; + + public PCJDoesNotExistException(final String message) { + super(message); + } + + public PCJDoesNotExistException(final String message, final Throwable cause) { + super(message, cause); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/common/rya.api/src/main/java/mvm/rya/api/client/RyaClient.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/mvm/rya/api/client/RyaClient.java b/common/rya.api/src/main/java/mvm/rya/api/client/RyaClient.java index 173e1e0..851a273 100644 --- a/common/rya.api/src/main/java/mvm/rya/api/client/RyaClient.java +++ b/common/rya.api/src/main/java/mvm/rya/api/client/RyaClient.java @@ -33,23 +33,26 @@ public class RyaClient { private final Install install; private final CreatePCJ createPcj; private final 
DeletePCJ deletePcj; + private final BatchUpdatePCJ batchUpdatePcj; private final GetInstanceDetails getInstanceDetails; private final InstanceExists instanceExists; private final ListInstances listInstances; /** - * Constructs an isntance of {@link RyaClient}. + * Constructs an instance of {@link RyaClient}. */ public RyaClient( final Install install, final CreatePCJ createPcj, final DeletePCJ deletePcj, + final BatchUpdatePCJ batchUpdatePcj, final GetInstanceDetails getInstanceDetails, final InstanceExists instanceExists, final ListInstances listInstances) { this.install = requireNonNull(install); this.createPcj = requireNonNull(createPcj); this.deletePcj = requireNonNull(deletePcj); + this.batchUpdatePcj = requireNonNull(batchUpdatePcj); this.getInstanceDetails = requireNonNull(getInstanceDetails); this.instanceExists = requireNonNull(instanceExists); this.listInstances = requireNonNull(listInstances); @@ -79,6 +82,14 @@ public class RyaClient { } /** + * @return An instance of {@link BatchUpdatePCJ} that is connected to a Rya storage + * if the Rya instance supports PCJ indexing. + */ + public BatchUpdatePCJ getBatchUpdatePCJ() { + return batchUpdatePcj; + } + + /** * @return An instance of {@link GetInstanceDetails} that is connected to a Rya storage. 
*/ public GetInstanceDetails getGetInstanceDetails() { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/common/rya.api/src/main/java/mvm/rya/api/instance/RyaDetailsToConfiguration.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/mvm/rya/api/instance/RyaDetailsToConfiguration.java b/common/rya.api/src/main/java/mvm/rya/api/instance/RyaDetailsToConfiguration.java index faec0ff..8734adc 100644 --- a/common/rya.api/src/main/java/mvm/rya/api/instance/RyaDetailsToConfiguration.java +++ b/common/rya.api/src/main/java/mvm/rya/api/instance/RyaDetailsToConfiguration.java @@ -18,29 +18,34 @@ */ package mvm.rya.api.instance; +import static java.util.Objects.requireNonNull; + +import javax.annotation.ParametersAreNonnullByDefault; + import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; /** * Used to fetch {@link RyaDetails} from a {@link RyaDetailsRepository} and * add them to the application's {@link Configuration}. */ +@ParametersAreNonnullByDefault public class RyaDetailsToConfiguration { - private static final Logger LOG = Logger.getLogger(RyaDetailsToConfiguration.class); + private static final Logger log = Logger.getLogger(RyaDetailsToConfiguration.class); + /** * Ensures the values in the {@link Configuration} do not conflict with the values in {@link RyaDetails}. * If they do, the values in {@link RyaDetails} take precedent and the {@link Configuration} value will * be overwritten. * - * @param details - The {@link RyaDetails} to add to the {@link Configuration}. - * @param conf - The {@link Configuration} to add {@link RyaDetails} to. + * @param details - The {@link RyaDetails} to add to the {@link Configuration}. (not null) + * @param conf - The {@link Configuration} to add {@link RyaDetails} to. 
(not null) */ public static void addRyaDetailsToConfiguration(final RyaDetails details, final Configuration conf) { - Preconditions.checkNotNull(details); - Preconditions.checkNotNull(conf); + requireNonNull(details); + requireNonNull(conf); checkAndSet(conf, ConfigurationFields.USE_ENTITY, details.getEntityCentricIndexDetails().isEnabled()); checkAndSet(conf, ConfigurationFields.USE_FREETEXT, details.getFreeTextIndexDetails().isEnabled()); @@ -50,23 +55,44 @@ public class RyaDetailsToConfiguration { } /** - * Checks to see if the configuration has a value in the specified field. - * If the value exists and does not match what is expected by the {@link RyaDetails}, - * an error will be logged and the value will be overwritten. - * @param conf - The {@link Configuration} to potentially change. - * @param field - The field to check and set. - * @param value - The new value in the field (is not used if the value doesn't need to be changed). + * Ensures a Rya Client will not try to use a secondary index that is not supported by the Rya Instance + * it is connecting to. + * <p> + * If the configuration... + * <ul> + * <li>provides an 'on' value for an index that is supported, then nothing changes.</li> + * <li>provides an 'off' value for an index that is or is not supported, then nothing changes.</li> + * <li>provides an 'on' value for an index that is not supported, then the index is turned + * off and a warning is logged.</li> + * <li>does not provide any value for an index, then it will be turned on if supported.</li> + * </ul> + * + * @param conf - The {@link Configuration} to potentially change. (not null) + * @param useIndexField - The field within {@code conf} that indicates if the client will utilize the index. (not null) + * @param indexSupported - {@code true} if the Rya Instance supports the index; otherwise {@code false}. 
*/ - private static void checkAndSet(final Configuration conf, final String field, final boolean value) { - final Optional<String> opt = Optional.fromNullable(conf.get(field)); - if(opt.isPresent()) { - final Boolean curVal = new Boolean(opt.get()); - if(curVal != value) { - LOG.error("The user configured value in: " + field + " will be overwritten by what has been configured by the admin."); - conf.setBoolean(field, value); - } - } else { - conf.setBoolean(field, value); + private static void checkAndSet(final Configuration conf, final String useIndexField, final boolean indexSupported) { + requireNonNull(conf); + requireNonNull(useIndexField); + + final Optional<String> useIndexStr = Optional.fromNullable( conf.get(useIndexField) ); + + // If no value was provided, default to using the index if it is supported. + if(!useIndexStr.isPresent()) { + log.info("No Rya Client configuration was provided for the " + useIndexField + + " index, so it is being defaulted to " + indexSupported); + conf.setBoolean(useIndexField, indexSupported); + return; + } + + // If configured to use the index, but the Rya Instance does not support it, then turn it off. + final boolean useIndex = Boolean.parseBoolean( useIndexStr.get() ); + if(useIndex && !indexSupported) { + log.warn("The Rya Client indicates it wants to use a secondary index that the Rya Instance does not support. " + + "This is not allowed, so the index will be turned off. Index Configuration Field: " + useIndexField); + conf.setBoolean(useIndexField, false); } + + // Otherwise use whatever the Client wants to use. 
} -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java index 2a1c384..7dd23e6 100644 --- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java +++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java @@ -95,11 +95,9 @@ public class AccumuloITBase { } /** - * TODO doc - * - * @return - * @throws AccumuloSecurityException - * @throws AccumuloException + * @return A {@link Connector} that creates connections to the mini accumulo cluster. + * @throws AccumuloException Could not connect to the cluster. + * @throws AccumuloSecurityException Could not connect to the cluster because of a security violation. */ public Connector getConnector() throws AccumuloException, AccumuloSecurityException { return cluster.getConnector(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java new file mode 100644 index 0000000..ee773b0 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java @@ -0,0 +1,226 @@ +package mvm.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import 
org.apache.accumulo.core.client.Connector; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; +import org.openrdf.sail.Sail; +import org.openrdf.sail.SailConnection; +import org.openrdf.sail.SailException; + +import com.google.common.base.Optional; + +import info.aduna.iteration.CloseableIteration; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; +import mvm.rya.api.client.BatchUpdatePCJ; +import mvm.rya.api.client.InstanceDoesNotExistException; +import mvm.rya.api.client.PCJDoesNotExistException; +import mvm.rya.api.client.RyaClientException; +import mvm.rya.api.instance.RyaDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; +import mvm.rya.api.instance.RyaDetailsRepository; +import mvm.rya.api.instance.RyaDetailsRepository.NotInitializedException; +import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; +import mvm.rya.api.instance.RyaDetailsUpdater; +import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator; +import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.rdftriplestore.inference.InferenceEngineException; +import 
mvm.rya.sail.config.RyaSailFactory; + +/** + * Uses an in memory Rya Client to batch update a PCJ index. + */ +public class AccumuloBatchUpdatePCJ extends AccumuloCommand implements BatchUpdatePCJ { + + private static final Logger log = Logger.getLogger(AccumuloBatchUpdatePCJ.class); + + /** + * Constructs an instance of {@link AccumuloBatchUpdatePCJ}. + * + * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) + * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null) + */ + public AccumuloBatchUpdatePCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) { + super(connectionDetails, connector); + } + + @Override + public void batchUpdate(final String ryaInstanceName, final String pcjId) throws InstanceDoesNotExistException, PCJDoesNotExistException, RyaClientException { + requireNonNull(ryaInstanceName); + requireNonNull(pcjId); + verifyPCJState(ryaInstanceName, pcjId); + updatePCJResults(ryaInstanceName, pcjId); + updatePCJMetadata(ryaInstanceName, pcjId); + } + + private void verifyPCJState(final String ryaInstanceName, final String pcjId) throws RyaClientException { + try { + // Fetch the Rya instance's details. + final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(super.getConnector(), ryaInstanceName); + final RyaDetails ryaDetails = detailsRepo.getRyaInstanceDetails(); + + // Ensure PCJs are enabled. + if(!ryaDetails.getPCJIndexDetails().isEnabled()) { + throw new RyaClientException("PCJs are not enabled for the Rya instance named '" + ryaInstanceName + "'."); + } + + // Ensure the PCJ exists. + if(!ryaDetails.getPCJIndexDetails().getPCJDetails().containsKey(pcjId)) { + throw new PCJDoesNotExistException("The PCJ with id '" + pcjId + "' does not exist within Rya instance '" + ryaInstanceName + "'."); + } + + // Ensure the PCJ is not already being incrementally updated. 
+ final PCJDetails pcjDetails = ryaDetails.getPCJIndexDetails().getPCJDetails().get(pcjId); + final Optional<PCJUpdateStrategy> updateStrategy = pcjDetails.getUpdateStrategy(); + if(updateStrategy.isPresent() && updateStrategy.get() == PCJUpdateStrategy.INCREMENTAL) { + throw new RyaClientException("The PCJ with id '" + pcjId + "' is already being updated incrementally."); + } + } catch(final NotInitializedException e) { + throw new InstanceDoesNotExistException("No RyaDetails are initialized for the Rya instance named '" + ryaInstanceName + "'.", e); + } catch (final RyaDetailsRepositoryException e) { + throw new RyaClientException("Could not fetch the RyaDetails for the Rya instance named '" + ryaInstanceName + "'.", e); + } + } + + private void updatePCJResults(final String ryaInstanceName, final String pcjId) throws InstanceDoesNotExistException, PCJDoesNotExistException, RyaClientException { + // Things that have to be closed before we exit. + Sail sail = null; + SailConnection sailConn = null; + CloseableIteration<? extends BindingSet, QueryEvaluationException> results = null; + + try { + // Create an instance of Sail backed by the Rya instance. + sail = connectToRya(ryaInstanceName); + + // Purge the old results from the PCJ. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), ryaInstanceName); + try { + pcjStorage.purge(pcjId); + } catch (final PCJStorageException e) { + throw new RyaClientException("Could not batch update PCJ with ID '" + pcjId + "' because the old " + + "results could not be purged from it.", e); + } + + try { + // Parse the PCJ's SPARQL query. + final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId); + final String sparql = metadata.getSparql(); + final SPARQLParser parser = new SPARQLParser(); + final ParsedQuery parsedQuery = parser.parseQuery(sparql, null); + + // Execute the query. 
+ sailConn = sail.getConnection(); + results = sailConn.evaluate(parsedQuery.getTupleExpr(), null, null, false); + + // Load the results into the PCJ table. + final List<VisibilityBindingSet> batch = new ArrayList<>(1000); + + while(results.hasNext()) { + final VisibilityBindingSet result = new VisibilityBindingSet(results.next(), ""); + batch.add(result); + + if(batch.size() == 1000) { + pcjStorage.addResults(pcjId, batch); + batch.clear(); + } + } + + if(!batch.isEmpty()) { + pcjStorage.addResults(pcjId, batch); + batch.clear(); + } + } catch(final MalformedQueryException | PCJStorageException | SailException | QueryEvaluationException e) { + throw new RyaClientException("Fail to batch load new results into the PCJ with ID '" + pcjId + "'.", e); + } + } finally { + if(results != null) { + try { + results.close(); + } catch (final QueryEvaluationException e) { + log.warn(e.getMessage(), e); + } + } + + if(sailConn != null) { + try { + sailConn.close(); + } catch (final SailException e) { + log.warn(e.getMessage(), e); + } + } + + if(sail != null) { + try { + sail.shutDown(); + } catch (final SailException e) { + log.warn(e.getMessage(), e); + } + } + } + } + + private Sail connectToRya(final String ryaInstanceName) throws RyaClientException { + try { + final AccumuloConnectionDetails connectionDetails = super.getAccumuloConnectionDetails(); + + final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration(); + ryaConf.setTablePrefix(ryaInstanceName); + ryaConf.set(ConfigUtils.CLOUDBASE_USER, connectionDetails.getUsername()); + ryaConf.set(ConfigUtils.CLOUDBASE_PASSWORD, new String(connectionDetails.getPassword())); + ryaConf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, connectionDetails.getZookeepers()); + ryaConf.set(ConfigUtils.CLOUDBASE_INSTANCE, connectionDetails.getInstanceName()); + + // Turn PCJs off so that we will only scan the core Rya tables while building the PCJ results. 
+ ryaConf.set(ConfigUtils.USE_PCJ, "false"); + + return RyaSailFactory.getInstance(ryaConf); + } catch (SailException | AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) { + throw new RyaClientException("Could not connect to the Rya instance named '" + ryaInstanceName + "'.", e); + } + } + + private void updatePCJMetadata(final String ryaInstanceName, final String pcjId) throws RyaClientException { + // Update the PCJ's metadata to indicate it was just batch updated. + try { + final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(super.getConnector(), ryaInstanceName); + + new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() { + @Override + public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException { + // Update the original PCJ Details to indicate they were batch updated. + final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId); + final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails ) + .setUpdateStrategy( PCJUpdateStrategy.BATCH ) + .setLastUpdateTime( new Date()); + + // Replace the old PCJ Details with the updated ones. 
+ final RyaDetails.Builder builder = RyaDetails.builder(originalDetails); + builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails ); + return builder.build(); + } + }); + } catch (final RyaDetailsRepositoryException | CouldNotApplyMutationException e) { + throw new RyaClientException("Could not update the PCJ's metadata.", e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java index 92b5d8c..30be548 100644 --- a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java +++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java @@ -92,13 +92,6 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ { throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName)); } - final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails(); - final boolean usingFluo = fluoDetailsHolder.isPresent(); - if(!usingFluo) { - throw new RyaClientException( String.format("Can not create a PCJ for the '%s' instance of Rya because it does" + - "not have a Fluo application associated with it. Update the instance's PCJ Index Details to fix this problem.", instanceName) ); - } - // Create the PCJ table that will receive the index results. 
final String pcjId; final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getConnector(), instanceName); @@ -108,33 +101,36 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ { throw new RyaClientException("Problem while initializing the PCJ table.", e); } - // Task the Fluo application with updating the PCJ. - final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName(); - try { - updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId); - } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException e) { - throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e); - } - - // Update the Rya Details to indicate the PCJ is being updated incrementally. - final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName); - try { - new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() { - @Override - public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException { - // Update the original PCJ Details to indicate they are incrementally updated. - final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId); - final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails ) - .setUpdateStrategy( PCJUpdateStrategy.INCREMENTAL ); - - // Replace the old PCJ Details with the updated ones. - final RyaDetails.Builder builder = RyaDetails.builder(originalDetails); - builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails ); - return builder.build(); - } - }); - } catch (RyaDetailsRepositoryException | CouldNotApplyMutationException e) { - throw new RyaClientException("Problem while updating the Rya instance's Details to indicate the PCJ is being incrementally updated.", e); + // If a Fluo application is being used, task it with updating the PCJ. 
+ final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails(); + if(fluoDetailsHolder.isPresent()) { + final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName(); + try { + updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId); + } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException e) { + throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e); + } + + // Update the Rya Details to indicate the PCJ is being updated incrementally. + final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName); + try { + new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() { + @Override + public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException { + // Update the original PCJ Details to indicate they are incrementally updated. + final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId); + final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails ) + .setUpdateStrategy( PCJUpdateStrategy.INCREMENTAL ); + + // Replace the old PCJ Details with the updated ones. + final RyaDetails.Builder builder = RyaDetails.builder(originalDetails); + builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails ); + return builder.build(); + } + }); + } catch (RyaDetailsRepositoryException | CouldNotApplyMutationException e) { + throw new RyaClientException("Problem while updating the Rya instance's Details to indicate the PCJ is being incrementally updated.", e); + } } // Return the ID that was assigned to the PCJ. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java index 8c276a8..102f667 100644 --- a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java +++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java @@ -52,6 +52,7 @@ public class AccumuloRyaClientFactory { new AccumuloInstall(connectionDetails, connector), new AccumuloCreatePCJ(connectionDetails, connector), new AccumuloDeletePCJ(connectionDetails, connector), + new AccumuloBatchUpdatePCJ(connectionDetails, connector), new AccumuloGetInstanceDetails(connectionDetails, connector), new AccumuloInstanceExists(connectionDetails, connector), new AccumuloListInstances(connectionDetails, connector)); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java index bf10c84..7c48315 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java @@ -27,6 +27,7 @@ import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import com.google.common.base.Optional; import com.google.common.base.Supplier; +import cern.colt.Arrays; import 
mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType; import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier; @@ -64,7 +65,7 @@ public class PrecomputedJoinStorageSupplier implements Supplier<PrecomputedJoinS // Ensure the storage type has been set. final Optional<PrecomputedJoinStorageType> storageType = indexerConfig.getPcjStorageType(); checkArgument(storageType.isPresent(), "The '" + PrecomputedJoinIndexerConfig.PCJ_STORAGE_TYPE + - "' property must have one of the following values: " + PrecomputedJoinStorageType.values()); + "' property must have one of the following values: " + Arrays.toString(PrecomputedJoinStorageType.values())); // Create and return the configured storage. switch(storageType.get()) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java new file mode 100644 index 0000000..f23f1c4 --- /dev/null +++ b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java @@ -0,0 +1,135 @@ +package mvm.rya.api.client.accumulo; + +import static org.junit.Assert.assertEquals; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.MapBindingSet; +import org.openrdf.sail.Sail; +import org.openrdf.sail.SailConnection; + +import mvm.rya.accumulo.AccumuloITBase; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import 
mvm.rya.api.client.Install.InstallConfiguration; +import mvm.rya.api.client.RyaClient; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType; +import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType; +import mvm.rya.sail.config.RyaSailFactory; + +/** + * Integration tests the methods of {@link AccumuloBatchUpdatePCJ}. + */ +public class AccumuloBatchUpdatePCJIT extends AccumuloITBase { + + private static final String RYA_INSTANCE_NAME = "test_"; + + @Test + public void batchUpdate() throws Exception { + // Setup a Rya Client. + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + super.getUsername(), + super.getPassword().toCharArray(), + super.getInstanceName(), + super.getZookeepers()); + final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, super.getConnector()); + + // Install an instance of Rya on the mini accumulo cluster. + ryaClient.getInstall().install(RYA_INSTANCE_NAME, InstallConfiguration.builder() + .setEnablePcjIndex(true) + .build()); + + Sail sail = null; + try { + // Get a Sail connection backed by the installed Rya instance. + final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration(); + ryaConf.setTablePrefix(RYA_INSTANCE_NAME); + ryaConf.set(ConfigUtils.CLOUDBASE_USER, super.getUsername()); + ryaConf.set(ConfigUtils.CLOUDBASE_PASSWORD, super.getPassword()); + ryaConf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, super.getZookeepers()); + ryaConf.set(ConfigUtils.CLOUDBASE_INSTANCE, super.getInstanceName()); + ryaConf.set(ConfigUtils.USE_PCJ, "true"); + ryaConf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinStorageType.ACCUMULO.toString()); + ryaConf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinUpdaterType.NO_UPDATE.toString()); + sail = RyaSailFactory.getInstance( ryaConf ); + + // Load some statements into the Rya instance. 
+ final ValueFactory vf = sail.getValueFactory(); + + final SailConnection sailConn = sail.getConnection(); + sailConn.begin(); + sailConn.addStatement(vf.createURI("urn:Alice"), vf.createURI("urn:likes"), vf.createURI("urn:icecream")); + sailConn.addStatement(vf.createURI("urn:Bob"), vf.createURI("urn:likes"), vf.createURI("urn:icecream")); + sailConn.addStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:likes"), vf.createURI("urn:icecream")); + sailConn.addStatement(vf.createURI("urn:David"), vf.createURI("urn:likes"), vf.createURI("urn:icecream")); + sailConn.addStatement(vf.createURI("urn:Eve"), vf.createURI("urn:likes"), vf.createURI("urn:icecream")); + sailConn.addStatement(vf.createURI("urn:Frank"), vf.createURI("urn:likes"), vf.createURI("urn:icecream")); + sailConn.addStatement(vf.createURI("urn:George"), vf.createURI("urn:likes"), vf.createURI("urn:icecream")); + sailConn.addStatement(vf.createURI("urn:Hillary"), vf.createURI("urn:likes"), vf.createURI("urn:icecream")); + + sailConn.addStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue")); + sailConn.addStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue")); + sailConn.addStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue")); + sailConn.addStatement(vf.createURI("urn:David"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue")); + sailConn.addStatement(vf.createURI("urn:Eve"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue")); + sailConn.addStatement(vf.createURI("urn:Frank"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue")); + sailConn.addStatement(vf.createURI("urn:George"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:green")); + sailConn.addStatement(vf.createURI("urn:Hillary"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:brown")); + sailConn.commit(); + sailConn.close(); + + // Create a PCJ for a SPARQL query. 
+ final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), RYA_INSTANCE_NAME); + final String sparql = "SELECT ?name WHERE { ?name <urn:likes> <urn:icecream> . ?name <urn:hasEyeColor> <urn:blue> . }"; + final String pcjId = pcjStorage.createPcj(sparql); + + // Run the test. + ryaClient.getBatchUpdatePCJ().batchUpdate(RYA_INSTANCE_NAME, pcjId); + + // Verify the correct results were loaded into the PCJ table. + final Set<BindingSet> expectedResults = new HashSet<>(); + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("name", vf.createURI("urn:Alice")); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("name", vf.createURI("urn:Bob")); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("name", vf.createURI("urn:Charlie")); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("name", vf.createURI("urn:David")); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("name", vf.createURI("urn:Eve")); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("name", vf.createURI("urn:Frank")); + expectedResults.add(bs); + + final Set<BindingSet> results = new HashSet<>(); + for(final BindingSet result : pcjStorage.listResults(pcjId)) { + results.add( result ); + } + + assertEquals(expectedResults, results); + + } finally { + if(sail != null) { + sail.shutDown(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/pom.xml ---------------------------------------------------------------------- diff --git a/extras/pom.xml b/extras/pom.xml index 9568e27..0ffeb7a 100644 --- a/extras/pom.xml +++ b/extras/pom.xml @@ -1,5 +1,4 @@ -<?xml version="1.0" encoding="utf-8"?> - +<?xml version="1.0" encoding="UTF-8"?> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file @@ -18,7 +17,6 @@ KIND, either express or implied. 
See the License for the specific language governing permissions and limitations under the License. --> - <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> @@ -44,5 +42,6 @@ under the License. <module>vagrantExample</module> <module>rya.pcj.fluo</module> <module>rya.merger</module> + <module>rya.benchmark</module> </modules> -</project> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/pom.xml b/extras/rya.benchmark/pom.xml new file mode 100644 index 0000000..5b9eb68 --- /dev/null +++ b/extras/rya.benchmark/pom.xml @@ -0,0 +1,250 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <artifactId>rya.extras</artifactId> + <groupId>org.apache.rya</groupId> + <version>3.2.10-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <artifactId>rya.benchmark</artifactId> + + <name>Apache Rya Benchmarks</name> + + <dependencies> + <!-- JMH Benchmark Framework dependencies --> + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-core</artifactId> + <version>${jmh.version}</version> + </dependency> + <dependency> + <groupId>org.openjdk.jmh</groupId> + <artifactId>jmh-generator-annprocess</artifactId> + <version>${jmh.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.indexing</artifactId> + </dependency> + + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + <version>3.0.1</version> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-minicluster</artifactId> + <version>${accumulo.version}</version> + <scope>test</scope> + </dependency> + + </dependencies> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + + <!-- + JMH version to use with this project. + --> + <jmh.version>1.13</jmh.version> + + <!-- + Java source/target to use for compilation. + --> + <javac.target>1.8</javac.target> + + <!-- + Name of the benchmark Uber-JAR to generate. 
+ --> + <uberjar.name>benchmarks</uberjar.name> + </properties> + + <build> + <resources> + <resource> + <directory>src/main/xsd</directory> + </resource> + </resources> + + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <compilerVersion>${javac.target}</compilerVersion> + <source>${javac.target}</source> + <target>${javac.target}</target> + </configuration> + </plugin> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>jaxb2-maven-plugin</artifactId> + <executions> + <execution> + <id>xjc</id> + <goals> + <goal>xjc</goal> + </goals> + </execution> + </executions> + <configuration> + <!-- Place the generated source within the 'src' directory so license-maven-plugin will find it. --> + <outputDirectory>src/main/gen</outputDirectory> + <packageName>org.apache.rya.benchmark.query</packageName> + </configuration> + </plugin> + + <!-- Automatically place Apache 2 license headers at the top of all of the project's Java files. + Rat runs during the 'validate' lifecycle step, so it will fail the build before this one + executes if any of the headers are missing. Run the build with rat turned off to add + missing headers to the Java files. --> + <plugin> + <groupId>com.mycila</groupId> + <artifactId>license-maven-plugin</artifactId> + <version>2.6</version> + <configuration> + <!-- We use a custom Apache 2.0 license because we do not include a copyright section. 
--> + <header>src/main/resources/LICENSE.txt</header> + </configuration> + <executions> + <execution> + <phase>process-sources</phase> + <goals> + <goal>format</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.2</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <finalName>${uberjar.name}</finalName> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.openjdk.jmh.Main</mainClass> + </transformer> + </transformers> + <filters> + <filter> + <!-- + Shading signed JARs will fail without this. + http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar + --> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <version>2.19.1</version> + <executions> + <execution> + <goals> + <goal>integration-test</goal> + <goal>verify</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + <pluginManagement> + <plugins> + <plugin> + <artifactId>maven-clean-plugin</artifactId> + <version>2.5</version> + </plugin> + <plugin> + <artifactId>maven-deploy-plugin</artifactId> + <version>2.8.1</version> + </plugin> + <plugin> + <artifactId>maven-install-plugin</artifactId> + <version>2.5.1</version> + </plugin> + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <version>2.4</version> + </plugin> + <plugin> + <artifactId>maven-javadoc-plugin</artifactId> + <version>2.9.1</version> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> 
+ <version>2.6</version> + </plugin> + <plugin> + <artifactId>maven-site-plugin</artifactId> + <version>3.3</version> + </plugin> + <plugin> + <artifactId>maven-source-plugin</artifactId> + <version>2.2.1</version> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.17</version> + </plugin> + </plugins> + </pluginManagement> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReader.java ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReader.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReader.java new file mode 100644 index 0000000..8cbf203 --- /dev/null +++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReader.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.benchmark.query; + +import static java.util.Objects.requireNonNull; + +import java.io.InputStream; + +import javax.annotation.ParametersAreNonnullByDefault; +import javax.xml.XMLConstants; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import javax.xml.transform.stream.StreamSource; +import javax.xml.validation.Schema; +import javax.xml.validation.SchemaFactory; + +import org.xml.sax.SAXException; + +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; + +/** + * Unmarshalls instances of {@link QueriesBenchmarkConf}. + */ +@ParametersAreNonnullByDefault +public final class QueriesBenchmarkConfReader { + + // It is assumed the schema file is held within the root directory of the packaged jar. + private static final String SCHEMA_LOCATION = "queries-benchmark-conf.xsd"; + + // Only load the Schema once. + private static final Supplier<Schema> SCHEMA_SUPPLIER = Suppliers.memoize( + new Supplier<Schema>() { + @Override + public Schema get() { + final InputStream schemaStream = ClassLoader.getSystemResourceAsStream(SCHEMA_LOCATION); + final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); + try { + return schemaFactory.newSchema( new StreamSource( schemaStream ) ); + } catch (final SAXException e) { + throw new RuntimeException("Could not load the '" + SCHEMA_LOCATION + "' schema file. Make sure it is on the classpath.", e); + } + } + }); + + /** + * Unmarshall an instance of {@link QueriesBenchmarkConf} from the XML that + * is retrieved from an {@link InputStream}. + * + * @param xmlStream - The input stream holding the XML. (not null) + * @return The {@link QueriesBenchmarkConf} instance that was read from the stream. + * @throws JAXBException There was a problem with the formatting of the XML. 
+ */ + public QueriesBenchmarkConf load(final InputStream xmlStream) throws JAXBException { + requireNonNull(xmlStream); + + // Load the schema that describes the stream. + final Schema schema = SCHEMA_SUPPLIER.get(); + + // Unmarshal the object from the stream. + final JAXBContext context = JAXBContext.newInstance( QueriesBenchmarkConf.class ); + final Unmarshaller unmarshaller = context.createUnmarshaller(); + unmarshaller.setSchema(schema); + return (QueriesBenchmarkConf) unmarshaller.unmarshal(xmlStream); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueryBenchmark.java ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueryBenchmark.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueryBenchmark.java new file mode 100644 index 0000000..404f183 --- /dev/null +++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueryBenchmark.java @@ -0,0 +1,336 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.benchmark.query; + +import static java.util.Objects.requireNonNull; + +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.PatternLayout; +import org.apache.rya.benchmark.query.Parameters.NumReadsRuns; +import org.apache.rya.benchmark.query.QueryBenchmark.QueryBenchmarkRun.NotEnoughResultsException; +import org.apache.rya.benchmark.query.Rya.Accumulo; +import org.apache.rya.benchmark.query.Rya.SecondaryIndexing; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Timeout; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.CommandLineOptions; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; +import org.openrdf.sail.Sail; +import org.openrdf.sail.SailConnection; +import org.openrdf.sail.SailException; + +import info.aduna.iteration.CloseableIteration; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType; +import 
mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType; +import mvm.rya.sail.config.RyaSailFactory; + +/** + * A benchmark that may be used to evaluate the performance of SPARQL queries + * over a living instance of Rya. It pivots over two dimensions: + * <ul> + * <li>Which SPARQL query to execute</li> + * <li>How many of the query's results to read</li> + * </ul> + * <p> + * These parameters are configured by placing a file named "queries-benchmark-conf.xml" + * within the directory the benchmark is being executed from. The schema that defines + * this XML file is named "queries-benchmark-conf.xsd" and may be found embedded within + * the benchmark's jar file. + * <p> + * To execute this benchmark, build the project by executing: + * <pre> + * mvn clean install + * </pre> + * Transport the "target/benchmarks.jar" file to the system that will execute + * the benchmark, write the configuration file, and then execute: + * <pre> + * java -cp benchmarks.jar org.apache.rya.benchmark.query.QueryBenchmark + * </pre> + */ +@State(Scope.Thread) +public class QueryBenchmark { + + /** + * The path to the configuration file that this benchmark uses to connect to Rya. + */ + public static final Path QUERY_BENCHMARK_CONFIGURATION_FILE = Paths.get("queries-benchmark-conf.xml"); + + /** + * Indicates all query results will be read during the benchmark. + */ + public static final String READ_ALL = "ALL"; + + @Param({"1", "10", "100", READ_ALL}) + public String numReads; + + @Param({}) + public String sparql; + + private Sail sail = null; + private SailConnection sailConn = null; + + @Setup + public void setup() throws Exception { + // Setup logging. + final ConsoleAppender console = new ConsoleAppender(); + console.setLayout(new PatternLayout("%d [%p|%c|%C{1}] %m%n")); + console.setThreshold(Level.INFO); + console.activateOptions(); + Logger.getRootLogger().addAppender(console); + + // Load the benchmark's configuration file. 
+ final InputStream queriesStream = Files.newInputStream(QUERY_BENCHMARK_CONFIGURATION_FILE); + final QueriesBenchmarkConf benchmarkConf = new QueriesBenchmarkConfReader().load(queriesStream); + + // Create the Rya Configuration object using the benchmark's configuration. + final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration(); + + final Rya rya = benchmarkConf.getRya(); + ryaConf.setTablePrefix(rya.getRyaInstanceName()); + + final Accumulo accumulo = rya.getAccumulo(); + ryaConf.set(ConfigUtils.CLOUDBASE_USER, accumulo.getUsername()); + ryaConf.set(ConfigUtils.CLOUDBASE_PASSWORD, accumulo.getPassword()); + ryaConf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, accumulo.getZookeepers()); + ryaConf.set(ConfigUtils.CLOUDBASE_INSTANCE, accumulo.getInstanceName()); + + // Print the query plan so that you can visually inspect how PCJs are being applied for each benchmark. + ryaConf.set(ConfigUtils.DISPLAY_QUERY_PLAN, "true"); + + // Turn on PCJs if we are configured to use them. + final SecondaryIndexing secondaryIndexing = rya.getSecondaryIndexing(); + if(secondaryIndexing.isUsePCJ()) { + ryaConf.set(ConfigUtils.USE_PCJ, "true"); + ryaConf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinStorageType.ACCUMULO.toString()); + ryaConf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinUpdaterType.NO_UPDATE.toString()); + } else { + ryaConf.set(ConfigUtils.USE_PCJ, "false"); + } + + // Create the connections used to execute the benchmark. 
+ sail = RyaSailFactory.getInstance( ryaConf ); + sailConn = sail.getConnection(); + } + + @TearDown + public void tearDown() { + try { + sailConn.close(); + } catch(final Exception e) { } + + try { + sail.shutDown(); + } catch(final Exception e) { } + } + + @Benchmark + @BenchmarkMode(Mode.SingleShotTime) + @Timeout(time = 1, timeUnit = TimeUnit.HOURS) + public void queryRya() throws MalformedQueryException, QueryEvaluationException, SailException, NotEnoughResultsException { + final QueryBenchmarkRun benchmark; + + if(numReads.equals( READ_ALL )) { + benchmark = new QueryBenchmarkRun(sailConn, sparql); + } else { + benchmark = new QueryBenchmarkRun(sailConn, sparql, Long.parseLong(numReads)); + } + + benchmark.run(); + } + + /** + * Runs the query benchmarks. + * </p> + * Example command line: + * <pre> + * java -cp benchmarks.jar org.apache.rya.benchmark.query.QueryBenchmark + * </pre> + * + * @param args - The command line arguments that will be fed into the benchmark. + * @throws Exception The benchmark could not be run. + */ + public static void main(final String[] args) throws Exception { + // Read the queries that will be benchmarked from the provided path. + final InputStream queriesStream = Files.newInputStream( QUERY_BENCHMARK_CONFIGURATION_FILE ); + final QueriesBenchmarkConf benchmarkConf = new QueriesBenchmarkConfReader().load(queriesStream); + final Parameters parameters = benchmarkConf.getParameters(); + + // Setup the options that will be used to run the benchmark. + final OptionsBuilder options = new OptionsBuilder(); + options.parent( new CommandLineOptions(args) ); + options.include(QueryBenchmark.class.getSimpleName()); + + // Provide the SPARQL queries that will be injected into the benchmark's 'sparql' parameter. + final List<String> sparql = parameters.getQueries().getSPARQL(); + final String[] sparqlArray = new String[ sparql.size() ]; + sparql.toArray( sparqlArray ); + + // Clean up the sparql's whitespace. 
+ for(int i = 0; i < sparqlArray.length; i++) { + sparqlArray[i] = sparqlArray[i].trim(); + } + + options.param("sparql", sparqlArray); + + // If numReadsRuns was specified, inject them into the benchmark's 'numReads' parameter. + final NumReadsRuns numReadsRuns = parameters.getNumReadsRuns(); + if(numReadsRuns != null) { + // Validate the list. + final List<String> numReadsList = numReadsRuns.getNumReads(); + for(final String numReads : numReadsList) { + // It may be the READ_ALL flag. + if(numReads.equals(READ_ALL)) { + continue; + } + + // Or it must be a Long. + try { + Long.parseLong(numReads); + } catch(final NumberFormatException e) { + throw new RuntimeException("There is a problem with the benchmark's configuration. Encountered " + + "a numReads value of '" + numReads + "', which is inavlid. The value must be a Long or " + + "'" + READ_ALL + "'"); + } + } + + // Configure the benchmark with the numRuns. + final String[] numReadsArray = new String[ numReadsList.size() ]; + numReadsList.toArray( numReadsArray ); + options.param("numReads", numReadsArray); + } + + // Execute the benchmark. + new Runner(options.build()).run(); + } + + /** + * Executes an iteration of the benchmarked logic. + */ + @ParametersAreNonnullByDefault + public static final class QueryBenchmarkRun { + + private final SailConnection sailConn; + private final String sparql; + private final Optional<Long> numReads; + + /** + * Constructs an instance of {@link QueryBenchmarkRun} that will read all of the results of the query. + * + * @param sailConn - The connection to the Rya instance the query will be executed against. (not null) + * @param sparql - The query that will be executed. 
(not null) + */ + public QueryBenchmarkRun(final SailConnection sailConn, final String sparql) { + this.sailConn = requireNonNull(sailConn); + this.sparql = requireNonNull(sparql); + this.numReads = Optional.empty(); + } + + /** + * Constructs an instance of {@link QueryBenchmarkRun} that will only read a specific number of results. + * + * @param sailConn - The connection to the Rya instance the query will be executed against. (not null) + * @param sparql - The query that will be executed. (not null) + * @param numReads - The number of results that will be read. (not null) + */ + public QueryBenchmarkRun(final SailConnection sailConn, final String sparql, final Long numReads) { + this.sailConn = requireNonNull(sailConn); + this.sparql = requireNonNull(sparql); + this.numReads = Optional.of( requireNonNull(numReads) ); + } + + public void run() throws MalformedQueryException, QueryEvaluationException, NotEnoughResultsException, SailException { + CloseableIteration<? extends BindingSet, QueryEvaluationException> it = null; + + try { + // Execute the query. + final SPARQLParser sparqlParser = new SPARQLParser(); + final ParsedQuery parsedQuery = sparqlParser.parseQuery(sparql, null); + it = sailConn.evaluate(parsedQuery.getTupleExpr(), null, null, false); + + // Perform the reads. + if(numReads.isPresent()) { + read(it, numReads.get() ); + } else { + readAll(it); + } + } finally { + if(it != null) { + it.close(); + } + } + } + + private void read(final CloseableIteration<? extends BindingSet, QueryEvaluationException> it, final long numReads) throws QueryEvaluationException, NotEnoughResultsException { + requireNonNull(it); + long i = 0; + while(i < numReads) { + if(!it.hasNext()) { + throw new NotEnoughResultsException(String.format("The SPARQL query did not result in enough results. Needed: %d Found: %d", numReads, i)); + } + it.next(); + i++; + } + } + + private void readAll(final CloseableIteration<? 
extends BindingSet, QueryEvaluationException> it) throws QueryEvaluationException { + requireNonNull(it); + while(it.hasNext()) { + it.next(); + } + } + + /** + * The benchmark must read a specific number of results, but the benchmarked query + * does not have enough results to meet that number. + */ + public static final class NotEnoughResultsException extends Exception { + private static final long serialVersionUID = 1L; + + public NotEnoughResultsException(final String message) { + super(message); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/src/main/resources/LICENSE.txt ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/main/resources/LICENSE.txt b/extras/rya.benchmark/src/main/resources/LICENSE.txt new file mode 100644 index 0000000..4a9fe83 --- /dev/null +++ b/extras/rya.benchmark/src/main/resources/LICENSE.txt @@ -0,0 +1,16 @@ +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/src/main/xsd/queries-benchmark-conf.xsd ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/main/xsd/queries-benchmark-conf.xsd b/extras/rya.benchmark/src/main/xsd/queries-benchmark-conf.xsd new file mode 100644 index 0000000..826083e --- /dev/null +++ b/extras/rya.benchmark/src/main/xsd/queries-benchmark-conf.xsd @@ -0,0 +1,74 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"> + + <xsd:element name="QueriesBenchmarkConf"> <!-- Root element: one Rya section followed by one Parameters section. --> + <xsd:complexType> + <xsd:sequence> + <xsd:element name="Rya" type="Rya"/> + <xsd:element name="Parameters" type="Parameters"/> + </xsd:sequence> + </xsd:complexType> + </xsd:element> + + <xsd:complexType name="Rya"> <!-- In order: ryaInstanceName, an accumulo section, a secondaryIndexing section. --> + <xsd:sequence> + <xsd:element name="ryaInstanceName" type="xsd:string" /> + <xsd:element name="accumulo"> <!-- Holds username, password, zookeepers and instanceName, each a string. --> + <xsd:complexType> + <xsd:sequence> + <xsd:element name="username" type="xsd:string"/> + <xsd:element name="password" type="xsd:string"/> + <xsd:element name="zookeepers" type="xsd:string"/> + <xsd:element name="instanceName" type="xsd:string"/> + </xsd:sequence> + </xsd:complexType> + </xsd:element> + <xsd:element name="secondaryIndexing"> <!-- Holds the single boolean usePCJ. --> + <xsd:complexType> + <xsd:sequence> + <xsd:element name="usePCJ" type="xsd:boolean"/> + </xsd:sequence> + </xsd:complexType> + </xsd:element> + </xsd:sequence> + </xsd:complexType> + + <xsd:complexType name="Parameters"> <!-- A required NumReadsRuns section followed by an optional Queries section. --> + <xsd:sequence> + <xsd:element name="NumReadsRuns"> + <xsd:complexType> + <xsd:sequence> + <xsd:element name="NumReads" type="xsd:string" maxOccurs="unbounded"/> <!-- One or more values; typed as a string, so the schema itself does not require a number. --> + </xsd:sequence> + </xsd:complexType> + </xsd:element> + <xsd:element name="Queries" minOccurs="0"> <!-- May be omitted; when present it holds one or more SPARQL strings. --> + <xsd:complexType> + <xsd:sequence> + <xsd:element name="SPARQL" type="xsd:string" maxOccurs="unbounded"/> + </xsd:sequence> + </xsd:complexType> + </xsd:element> + </xsd:sequence> + </xsd:complexType> +</xsd:schema> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReaderIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReaderIT.java b/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReaderIT.java new file mode 100644 index 
0000000..f229dc4 --- /dev/null +++ b/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReaderIT.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.benchmark.query; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.List; + +import javax.xml.bind.JAXBException; + +import org.apache.rya.benchmark.query.Parameters.NumReadsRuns; +import org.apache.rya.benchmark.query.Parameters.Queries; +import org.apache.rya.benchmark.query.Rya.Accumulo; +import org.apache.rya.benchmark.query.Rya.SecondaryIndexing; +import org.junit.Test; +import org.xml.sax.SAXException; + +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; + +/** + * Tests the methods of {@link QueriesBenchmarkConfReader}. + */ +public class QueriesBenchmarkConfReaderIT { + + @Test + public void load() throws JAXBException, SAXException { + // Unmarshal some XML. 
+ final String xml = + "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" + + "<QueriesBenchmarkConf>\n" + + " <Rya>\n" + + " <ryaInstanceName>test_</ryaInstanceName>\n" + + " <accumulo>\n" + + " <username>test</username>\n" + + " <password>t3stP@ssw0rd</password>\n" + + " <zookeepers>zoo-server-1,zoo-server-2</zookeepers>\n" + + " <instanceName>testInstance</instanceName>\n" + + " </accumulo>\n" + + " <secondaryIndexing>\n" + + " <usePCJ>true</usePCJ>\n" + + " </secondaryIndexing>\n" + + " </Rya>\n" + + " <Parameters>" + + " <NumReadsRuns>" + + " <NumReads>1</NumReads>" + + " <NumReads>10</NumReads>" + + " <NumReads>100</NumReads>" + + " <NumReads>ALL</NumReads>" + + " </NumReadsRuns>" + + " <Queries>\n" + + " <SPARQL><![CDATA[SELECT ?a WHERE { ?a <http://likes> <urn:icecream> . }]]></SPARQL>\n" + + " <SPARQL><![CDATA[SELECT ?a ?b WHERE { ?a <http://knows> ?b . }]]></SPARQL>\n" + + " </Queries>\n" + + " </Parameters>" + + "</QueriesBenchmarkConf>"; + + final InputStream xmlStream = new ByteArrayInputStream( xml.getBytes(Charsets.UTF_8) ); + final QueriesBenchmarkConf benchmarkConf = new QueriesBenchmarkConfReader().load( xmlStream ); + + // Ensure it was unmarshalled correctly. 
+ final Rya rya = benchmarkConf.getRya(); + assertEquals("test_", rya.getRyaInstanceName()); + + final Accumulo accumulo = rya.getAccumulo(); + assertEquals("test", accumulo.getUsername()); + assertEquals("t3stP@ssw0rd", accumulo.getPassword()); + assertEquals("zoo-server-1,zoo-server-2", accumulo.getZookeepers()); + assertEquals("testInstance", accumulo.getInstanceName()); + + final SecondaryIndexing secondaryIndexing = rya.getSecondaryIndexing(); + assertTrue(secondaryIndexing.isUsePCJ()); + + + final Parameters parameters = benchmarkConf.getParameters(); + final List<String> expectedNumReads = Lists.newArrayList("1", "10", "100", "ALL"); + final NumReadsRuns numReadsRuns = parameters.getNumReadsRuns(); + assertEquals(expectedNumReads, numReadsRuns.getNumReads()); + + final List<String> expectedQueries = Lists.newArrayList( + "SELECT ?a WHERE { ?a <http://likes> <urn:icecream> . }", + "SELECT ?a ?b WHERE { ?a <http://knows> ?b . }"); + final Queries queries = parameters.getQueries(); + assertEquals(expectedQueries, queries.getSPARQL()); + } +} \ No newline at end of file
