http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig deleted file mode 100644 index 9311200..0000000 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig +++ /dev/null @@ -1,529 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.accumulo; - -import static java.util.Objects.requireNonNull; - -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.MultiTableBatchWriter; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.log4j.Logger; -import org.apache.rya.accumulo.AccumuloRdfConfiguration; -import org.apache.rya.accumulo.utils.ConnectorFactory; -import org.apache.rya.api.RdfCloudTripleStoreConfiguration; -import org.apache.rya.api.instance.RyaDetails; -import org.apache.rya.indexing.FilterFunctionOptimizer; -import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex; -import org.apache.rya.indexing.accumulo.entity.EntityOptimizer; -import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; -import org.apache.rya.indexing.accumulo.freetext.LuceneTokenizer; -import org.apache.rya.indexing.accumulo.freetext.Tokenizer; -import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; -import org.apache.rya.indexing.entity.EntityIndexOptimizer; -import org.apache.rya.indexing.entity.update.mongo.MongoEntityIndexer; -import org.apache.rya.indexing.external.PrecomputedJoinIndexer; -import org.apache.rya.indexing.mongodb.freetext.MongoFreeTextIndexer; -import 
org.apache.rya.indexing.mongodb.temporal.MongoTemporalIndexer; -import org.apache.rya.indexing.pcj.matching.PCJOptimizer; -import org.apache.rya.indexing.statement.metadata.matching.StatementMetadataOptimizer; -import org.openrdf.model.URI; -import org.openrdf.model.impl.URIImpl; - -import com.google.common.base.Optional; -import com.google.common.collect.Lists; - -/** - * A set of configuration utils to read a Hadoop {@link Configuration} object and create Cloudbase/Accumulo objects. - * Soon will deprecate this class. Use installer for the set methods, use {@link RyaDetails} for the get methods. - * New code must separate parameters that are set at Rya install time from that which is specific to the client. - * Also Accumulo index tables are pushed down to the implementation and not configured in conf. - */ -public class ConfigUtils { - private static final Logger logger = Logger.getLogger(ConfigUtils.class); - - /** - * @Deprecated use {@link RdfCloudTripleStoreConfiguration#CONF_TBL_PREFIX} instead. - */ - @Deprecated - public static final String CLOUDBASE_TBL_PREFIX = RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX; - - /** - * @Deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_INSTANCE} instead. - */ - @Deprecated - public static final String CLOUDBASE_INSTANCE = AccumuloRdfConfiguration.CLOUDBASE_INSTANCE; - - /** - * @Deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_ZOOKEEPERS} instead. - */ - @Deprecated - public static final String CLOUDBASE_ZOOKEEPERS = AccumuloRdfConfiguration.CLOUDBASE_ZOOKEEPERS; - - /** - * @Deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_USER} instead. - */ - @Deprecated - public static final String CLOUDBASE_USER = AccumuloRdfConfiguration.CLOUDBASE_USER; - - /** - * @Deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_PASSWORD} instead. - */ - @Deprecated - public static final String CLOUDBASE_PASSWORD = AccumuloRdfConfiguration.CLOUDBASE_PASSWORD; - /** - * @Deprecated use {@link RdfCloudTripleStoreConfiguration#CONF_QUERY_AUTH} instead. 
- */ - @Deprecated - public static final String CLOUDBASE_AUTHS = RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH; - - public static final String CLOUDBASE_WRITER_MAX_WRITE_THREADS = "sc.cloudbase.writer.maxwritethreads"; - public static final String CLOUDBASE_WRITER_MAX_LATENCY = "sc.cloudbase.writer.maxlatency"; - public static final String CLOUDBASE_WRITER_MAX_MEMORY = "sc.cloudbase.writer.maxmemory"; - - public static final String FREE_TEXT_QUERY_TERM_LIMIT = "sc.freetext.querytermlimit"; - - public static final String USE_FREETEXT = "sc.use_freetext"; - public static final String USE_TEMPORAL = "sc.use_temporal"; - public static final String USE_ENTITY = "sc.use_entity"; - public static final String USE_PCJ = "sc.use_pcj"; - public static final String USE_OPTIMAL_PCJ = "sc.use.optimal.pcj"; - public static final String USE_PCJ_UPDATER_INDEX = "sc.use.updater"; - - public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName"; - public static final String USE_PCJ_FLUO_UPDATER = "rya.indexing.pcj.updater.fluo"; - public static final String PCJ_STORAGE_TYPE = "rya.indexing.pcj.storageType"; - public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType"; - - public static final String USE_MOCK_INSTANCE = AccumuloRdfConfiguration.USE_MOCK_INSTANCE; - - public static final String NUM_PARTITIONS = "sc.cloudbase.numPartitions"; - - private static final int WRITER_MAX_WRITE_THREADS = 1; - private static final long WRITER_MAX_LATNECY = Long.MAX_VALUE; - private static final long WRITER_MAX_MEMORY = 10000L; - - public static final String DISPLAY_QUERY_PLAN = "query.printqueryplan"; - - public static final String FREETEXT_PREDICATES_LIST = "sc.freetext.predicates"; - public static final String FREETEXT_DOC_NUM_PARTITIONS = "sc.freetext.numPartitions.text"; - public static final String FREETEXT_TERM_NUM_PARTITIONS = "sc.freetext.numPartitions.term"; - - public static final String TOKENIZER_CLASS = "sc.freetext.tokenizer.class"; - - public static final String GEO_PREDICATES_LIST = "sc.geo.predicates"; - - public static final String TEMPORAL_PREDICATES_LIST = "sc.temporal.predicates"; - - public static final String USE_MONGO = "sc.useMongo"; - - public static boolean isDisplayQueryPlan(final Configuration conf) { - return conf.getBoolean(DISPLAY_QUERY_PLAN, false); - } - - /** - * get a value from the configuration file and throw an exception if the - * value does not exist. - * - * @param conf - * @param key - * @return - */ - private static String getStringCheckSet(final Configuration conf, final String key) { - final String value = conf.get(key); - requireNonNull(value, key + " not set"); - return value; - } - - /** - * @param conf - * @param tablename - * @return if the table was created - * @throws AccumuloException - * @throws AccumuloSecurityException - * @throws TableExistsException - */ - public static boolean createTableIfNotExists(final Configuration conf, final String tablename) - throws AccumuloException, AccumuloSecurityException, TableExistsException { - final TableOperations tops = getConnector(conf).tableOperations(); - if (!tops.exists(tablename)) { - logger.info("Creating table: " + tablename); - tops.create(tablename); - return true; - } - return false; - } - - /** - * Lookup the table name prefix in the conf and throw an error if it is - * null. Future, get table prefix from RyaDetails -- the Rya instance name - * -- also getting info from the RyaDetails should happen within - * RyaSailFactory and not ConfigUtils. 
- * - * @param conf - * Rya configuration map where it extracts the prefix (instance - * name) - * @return index table prefix corresponding to this Rya instance - */ - public static String getTablePrefix(final Configuration conf) { - final String tablePrefix; - tablePrefix = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX); - requireNonNull(tablePrefix, - "Configuration key: " + RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX + " not set. Cannot generate table name."); - return tablePrefix; - } - - public static int getFreeTextTermLimit(final Configuration conf) { - return conf.getInt(FREE_TEXT_QUERY_TERM_LIMIT, 100); - } - - public static Set<URI> getFreeTextPredicates(final Configuration conf) { - return getPredicates(conf, FREETEXT_PREDICATES_LIST); - } - - public static Set<URI> getGeoPredicates(final Configuration conf) { - return getPredicates(conf, GEO_PREDICATES_LIST); - } - - /** - * Used for indexing statements about date & time instances and intervals. - * - * @param conf - * @return Set of predicate URI's whose objects should be date time - * literals. - */ - public static Set<URI> getTemporalPredicates(final Configuration conf) { - return getPredicates(conf, TEMPORAL_PREDICATES_LIST); - } - - protected static Set<URI> getPredicates(final Configuration conf, final String confName) { - final String[] validPredicateStrings = conf.getStrings(confName, new String[] {}); - final Set<URI> predicates = new HashSet<>(); - for (final String prediateString : validPredicateStrings) { - predicates.add(new URIImpl(prediateString)); - } - return predicates; - } - - public static Tokenizer getFreeTextTokenizer(final Configuration conf) { - final Class<? extends Tokenizer> c = conf.getClass(TOKENIZER_CLASS, LuceneTokenizer.class, Tokenizer.class); - return ReflectionUtils.newInstance(c, conf); - } - - public static BatchWriter createDefaultBatchWriter(final String tablename, final Configuration conf) - throws TableNotFoundException, AccumuloException, AccumuloSecurityException { - final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf); - final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf); - final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf); - final Connector connector = ConfigUtils.getConnector(conf); - return connector.createBatchWriter(tablename, DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS); - } - - public static MultiTableBatchWriter createMultitableBatchWriter(final Configuration conf) - throws AccumuloException, AccumuloSecurityException { - final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf); - final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf); - final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf); - final Connector connector = ConfigUtils.getConnector(conf); - return connector.createMultiTableBatchWriter(DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS); - } - - public static Scanner createScanner(final String tablename, final Configuration conf) - throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - final Connector connector = ConfigUtils.getConnector(conf); - final Authorizations auths = ConfigUtils.getAuthorizations(conf); - return connector.createScanner(tablename, auths); - - } - - public static BatchScanner createBatchScanner(final String tablename, final Configuration conf) - throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - final Connector connector = ConfigUtils.getConnector(conf); - final Authorizations 
auths = ConfigUtils.getAuthorizations(conf); - Integer numThreads = null; - if (conf instanceof RdfCloudTripleStoreConfiguration) { - numThreads = ((RdfCloudTripleStoreConfiguration) conf).getNumThreads(); - } else { - numThreads = conf.getInt(RdfCloudTripleStoreConfiguration.CONF_NUM_THREADS, 2); - } - return connector.createBatchScanner(tablename, auths, numThreads); - } - - public static int getWriterMaxWriteThreads(final Configuration conf) { - return conf.getInt(CLOUDBASE_WRITER_MAX_WRITE_THREADS, WRITER_MAX_WRITE_THREADS); - } - - public static long getWriterMaxLatency(final Configuration conf) { - return conf.getLong(CLOUDBASE_WRITER_MAX_LATENCY, WRITER_MAX_LATNECY); - } - - public static long getWriterMaxMemory(final Configuration conf) { - return conf.getLong(CLOUDBASE_WRITER_MAX_MEMORY, WRITER_MAX_MEMORY); - } - - public static String getUsername(final JobContext job) { - return getUsername(job.getConfiguration()); - } - - /** - * Get the Accumulo username from the configuration object that is meant to - * be used when connecting a {@link Connector} to Accumulo. - * - * @param conf - The configuration object that will be interrogated. (not null) - * @return The username if one could be found; otherwise {@code null}. - */ - public static String getUsername(final Configuration conf) { - return new AccumuloRdfConfiguration(conf).getUsername(); - } - - public static Authorizations getAuthorizations(final JobContext job) { - return getAuthorizations(job.getConfiguration()); - } - - public static Authorizations getAuthorizations(final Configuration conf) { - final String authString = conf.get(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, ""); - if (authString.isEmpty()) { - return new Authorizations(); - } - return new Authorizations(authString.split(",")); - } - - public static Instance getInstance(final JobContext job) { - return getInstance(job.getConfiguration()); - } - - /** - * Create an {@link Instance} that may be used to create {@link Connector}s - * to Accumulo. If the configuration has the {@link #USE_MOCK_INSTANCE} flag - * set, then the instance will be be a {@link MockInstance} instead of a - * Zookeeper backed instance. - * - * @param conf - The configuration object that will be interrogated. (not null) - * @return The {@link Instance} that may be used to connect to Accumulo. - */ - public static Instance getInstance(final Configuration conf) { - // Pull out the Accumulo specific configuration values. - final AccumuloRdfConfiguration accConf = new AccumuloRdfConfiguration(conf); - String instanceName = accConf.getInstanceName(); - String zoookeepers = accConf.getZookeepers(); - - // Create an Instance a mock if the mock flag is set. - if (useMockInstance(conf)) { - return new MockInstance(instanceName); - } - - // Otherwise create an Instance to a Zookeeper managed instance of Accumulo. - return new ZooKeeperInstance(instanceName, zoookeepers); - } - - public static String getPassword(final JobContext job) { - return getPassword(job.getConfiguration()); - } - - /** - * Get the Accumulo password from the configuration object that is meant to - * be used when connecting a {@link Connector} to Accumulo. - * - * @param conf - The configuration object that will be interrogated. (not null) - * @return The password if one could be found; otherwise an empty string. 
- */ - public static String getPassword(final Configuration conf) { - return new AccumuloRdfConfiguration(conf).getPassword(); - } - - public static Connector getConnector(final JobContext job) throws AccumuloException, AccumuloSecurityException { - return getConnector(job.getConfiguration()); - } - - /** - * Create an Accumulo {@link Connector} using the configured connection information. - * If the connection information points to a mock instance of Accumulo, then the - * {@link #USE_MOCK_INSTANCE} flag must be set. - * - * @param conf - Configures how the connector will be built. (not null) - * @return A {@link Connector} that may be used to interact with the configured Accumulo instance. - * @throws AccumuloException The connector couldn't be created because of an Accumulo problem. - * @throws AccumuloSecurityException The connector couldn't be created because of an Accumulo security violation. - */ - public static Connector getConnector(final Configuration conf) throws AccumuloException, AccumuloSecurityException { - return ConnectorFactory.connect( new AccumuloRdfConfiguration(conf) ); - } - - /** - * Indicates that a Mock instance of Accumulo is being used to back the Rya instance. - * - * @param conf - The configuration object that will be interrogated. (not null) - * @return {@code true} if the Rya instance is backed by a mock Accumulo; otherwise {@code false}. - */ - public static boolean useMockInstance(final Configuration conf) { - return new AccumuloRdfConfiguration(conf).useMockInstance(); - } - - protected static int getNumPartitions(final Configuration conf) { - return conf.getInt(NUM_PARTITIONS, 25); - } - - public static int getFreeTextDocNumPartitions(final Configuration conf) { - return conf.getInt(FREETEXT_DOC_NUM_PARTITIONS, getNumPartitions(conf)); - } - - public static int getFreeTextTermNumPartitions(final Configuration conf) { - return conf.getInt(FREETEXT_TERM_NUM_PARTITIONS, getNumPartitions(conf)); - } - - public static boolean getUseFreeText(final Configuration conf) { - return conf.getBoolean(USE_FREETEXT, false); - } - - public static boolean getUseTemporal(final Configuration conf) { - return conf.getBoolean(USE_TEMPORAL, false); - } - - public static boolean getUseEntity(final Configuration conf) { - return conf.getBoolean(USE_ENTITY, false); - } - - public static boolean getUsePCJ(final Configuration conf) { - return conf.getBoolean(USE_PCJ, false); - } - - public static boolean getUseOptimalPCJ(final Configuration conf) { - return conf.getBoolean(USE_OPTIMAL_PCJ, false); - } - - public static boolean getUsePcjUpdaterIndex(final Configuration conf) { - return conf.getBoolean(USE_PCJ_UPDATER_INDEX, false); - } - - - /** - * @return The name of the Fluo Application this instance of RYA is using to - * incrementally update PCJs. 
- */ - // TODO delete this eventually and use Details table - public Optional<String> getFluoAppName(final Configuration conf) { - return Optional.fromNullable(conf.get(FLUO_APP_NAME)); - } - - - public static boolean getUseMongo(final Configuration conf) { - return conf.getBoolean(USE_MONGO, false); - } - - - public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) { - - final List<String> indexList = Lists.newArrayList(); - final List<String> optimizers = Lists.newArrayList(); - - boolean useFilterIndex = false; - - if (ConfigUtils.getUseMongo(conf)) { - if (getUseFreeText(conf)) { - indexList.add(MongoFreeTextIndexer.class.getName()); - useFilterIndex = true; - } - - if (getUseEntity(conf)) { - indexList.add(MongoEntityIndexer.class.getName()); - optimizers.add(EntityIndexOptimizer.class.getName()); - } - - if (getUseTemporal(conf)) { - indexList.add(MongoTemporalIndexer.class.getName()); - useFilterIndex = true; - } - } else { - if (getUsePCJ(conf) || getUseOptimalPCJ(conf)) { - conf.setPcjOptimizer(PCJOptimizer.class); - } - - if (getUsePcjUpdaterIndex(conf)) { - indexList.add(PrecomputedJoinIndexer.class.getName()); - } - - if (getUseFreeText(conf)) { - indexList.add(AccumuloFreeTextIndexer.class.getName()); - useFilterIndex = true; - } - - if (getUseTemporal(conf)) { - indexList.add(AccumuloTemporalIndexer.class.getName()); - useFilterIndex = true; - } - - if (getUseEntity(conf)) { - indexList.add(EntityCentricIndex.class.getName()); - optimizers.add(EntityOptimizer.class.getName()); - } - } - - if (useFilterIndex) { - optimizers.add(FilterFunctionOptimizer.class.getName()); - } - - if (conf.getUseStatementMetadata()) { - optimizers.add(StatementMetadataOptimizer.class.getName()); - } - -<<<<<<< HEAD - conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[] {})); - conf.setStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[] {})); - } -} -======= - final String[] existingIndexers = conf.getStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS); - if(existingIndexers != null ) { - for(final String idx : existingIndexers) { - indexList.add(idx); - } - } - - final String[] existingOptimizers = conf.getStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS); - if(existingOptimizers != null ) { - for(final String opt : existingOptimizers) { - optimizers.add(opt); - } - } - - conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[]{})); - conf.setStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[]{})); - } - - - -} ->>>>>>> RYA-236 Changes to other indexers
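The flags defined in the file above (sc.use_freetext, sc.use_temporal, sc.use_pcj, sc.useMongo, and so on) are read back by the corresponding getUse*() accessors, and setIndexers(...) translates them into indexer and optimizer class names on the configuration. A minimal sketch of a client wiring this up follows; the class name, instance name, zookeeper string, credentials, and table prefix are placeholder assumptions, not values taken from this patch.

import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
import org.apache.rya.indexing.accumulo.ConfigUtils;

public class IndexerConfigExample {
    public static void main(final String[] args) {
        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();

        // Connection details for the Rya instance (placeholder values).
        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, "rya_");
        conf.set(AccumuloRdfConfiguration.CLOUDBASE_INSTANCE, "accumulo");
        conf.set(AccumuloRdfConfiguration.CLOUDBASE_ZOOKEEPERS, "localhost:2181");
        conf.set(AccumuloRdfConfiguration.CLOUDBASE_USER, "root");
        conf.set(AccumuloRdfConfiguration.CLOUDBASE_PASSWORD, "secret");

        // Feature flags read by ConfigUtils.getUseFreeText / getUseTemporal / getUsePCJ.
        conf.setBoolean(ConfigUtils.USE_FREETEXT, true);
        conf.setBoolean(ConfigUtils.USE_TEMPORAL, true);
        conf.setBoolean(ConfigUtils.USE_PCJ, true);

        // Registers the matching indexer and optimizer class names on the configuration.
        ConfigUtils.setIndexers(conf);
    }
}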
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/external/PrecomputedJoinIndexerConfig.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/external/PrecomputedJoinIndexerConfig.java b/extras/indexing/src/main/java/org/apache/rya/indexing/external/PrecomputedJoinIndexerConfig.java index d73e180..4faf4a0 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/external/PrecomputedJoinIndexerConfig.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/external/PrecomputedJoinIndexerConfig.java @@ -47,7 +47,11 @@ public class PrecomputedJoinIndexerConfig { /** * Stores each PCJ within an Accumulo table. */ - ACCUMULO; + ACCUMULO, + /** + * Stores each PCJ within a MongoDB collection. + */ + MONGO; } /** @@ -106,8 +110,6 @@ public class PrecomputedJoinIndexerConfig { return Optional.fromNullable(updaterType); } - - public boolean getUseFluoUpdater() { return config.getBoolean(ConfigUtils.USE_PCJ_UPDATER_INDEX, false); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjIndexSetProvider.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjIndexSetProvider.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjIndexSetProvider.java new file mode 100644 index 0000000..d3fa07e --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjIndexSetProvider.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.mongodb.pcj; + +import java.util.List; +import java.util.Map; + +import org.apache.rya.api.instance.RyaDetailsRepository; +import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; +import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet; +import org.apache.rya.indexing.pcj.matching.provider.AbstractPcjIndexSetProvider; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments; +import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjStorage; +import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration; +import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository; +import org.openrdf.query.MalformedQueryException; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.mongodb.MongoClient; + +/** + * Implementation of {@link AbstractPcjIndexSetProvider} for MongoDB. + */ +public class MongoPcjIndexSetProvider extends AbstractPcjIndexSetProvider { + /** + * Creates a new {@link MongoPcjIndexSetProvider}. + * @param conf - The configuration for this provider. (not null) + */ + public MongoPcjIndexSetProvider(final StatefulMongoDBRdfConfiguration conf) { + super(conf); + } + + /** + * Creates a new {@link MongoPcjIndexSetProvider}. + * @param conf - The configuration for this provider. + * @param indices - The predefined indicies on this provider. + * @param client - The {@link MongoClient} used to connect to mongo. + */ + public MongoPcjIndexSetProvider(final StatefulMongoDBRdfConfiguration conf, final List<ExternalTupleSet> indices) { + super(conf, indices); + } + + @Override + protected List<ExternalTupleSet> getIndices() throws PcjIndexSetException { + try { + final StatefulMongoDBRdfConfiguration mongoConf = (StatefulMongoDBRdfConfiguration) conf; + final MongoClient client = mongoConf.getMongoClient(); + final MongoPcjDocuments pcjDocs = new MongoPcjDocuments(client, mongoConf.getRyaInstanceName()); + List<String> documents = null; + + documents = mongoConf.getPcjTables(); + // this maps associates pcj document name with pcj sparql query + final Map<String, String> indexDocuments = Maps.newLinkedHashMap(); + + try(final PrecomputedJoinStorage storage = new MongoPcjStorage(client, mongoConf.getRyaInstanceName())) { + + final boolean docsProvided = documents != null && !documents.isEmpty(); + + if (docsProvided) { + // if tables provided, associate table name with sparql + for (final String doc : documents) { + indexDocuments.put(doc, storage.getPcjMetadata(doc).getSparql()); + } + } else if (hasRyaDetails()) { + // If this is a newer install of Rya, and it has PCJ Details, then + // use those. + final List<String> ids = storage.listPcjs(); + for (final String pcjId : ids) { + indexDocuments.put(pcjId, storage.getPcjMetadata(pcjId).getSparql()); + } + } else { + // Otherwise figure it out by getting document IDs. 
+ documents = pcjDocs.listPcjDocuments(); + for (final String pcjId : documents) { + if (pcjId.startsWith("INDEX")) { + indexDocuments.put(pcjId, pcjDocs.getPcjMetadata(pcjId).getSparql()); + } + } + } + } + + final List<ExternalTupleSet> index = Lists.newArrayList(); + if (indexDocuments.isEmpty()) { + log.info("No Index found"); + } else { + for (final String pcjID : indexDocuments.keySet()) { + final String indexSparqlString = indexDocuments.get(pcjID); + index.add(new MongoPcjQueryNode(indexSparqlString, pcjID, pcjDocs)); + } + } + return index; + } catch (final PCJStorageException | MalformedQueryException e) { + throw new PcjIndexSetException("Failed to get indicies for this PCJ index.", e); + } + } + + private boolean hasRyaDetails() { + final StatefulMongoDBRdfConfiguration mongoConf = (StatefulMongoDBRdfConfiguration) conf; + final RyaDetailsRepository detailsRepo = new MongoRyaInstanceDetailsRepository(mongoConf.getMongoClient(), mongoConf.getRyaInstanceName()); + try { + detailsRepo.getRyaInstanceDetails(); + return true; + } catch (final RyaDetailsRepositoryException e) { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjQueryNode.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjQueryNode.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjQueryNode.java new file mode 100644 index 0000000..c03ee99 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPcjQueryNode.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.mongodb.pcj; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.apache.rya.api.utils.CloseableIterator; +import org.apache.rya.api.utils.IteratorWrapper; +import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet; +import org.apache.rya.indexing.external.tupleSet.ParsedQueryUtil; +import org.apache.rya.indexing.pcj.matching.PCJOptimizerUtilities; +import org.apache.rya.indexing.pcj.storage.PcjException; +import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments; +import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration; +import org.apache.rya.rdftriplestore.evaluation.ExternalBatchingIterator; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.parser.ParsedTupleQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import info.aduna.iteration.CloseableIteration; + +/** + * Indexing Node for PCJs expressions to be inserted into execution plans. + */ +@DefaultAnnotation(NonNull.class) +public class MongoPcjQueryNode extends ExternalTupleSet implements ExternalBatchingIterator { + private static final Logger log = Logger.getLogger(MongoPcjQueryNode.class); + private final String pcjId; + private final MongoPcjDocuments pcjDocs; + + /** + * Creates a new {@link MongoPcjQueryNode}. + * + * @param sparql - sparql query whose results will be stored in PCJ document. (not empty of null) + * @param pcjId - name of an existing PCJ. (not empty or null) + * @param pcjDocs - {@link MongoPcjDocuments} used to maintain PCJs in mongo. (not null) + * + * @throws MalformedQueryException - The SPARQL query needs to contain a projection. + */ + public MongoPcjQueryNode(final String sparql, final String pcjId, final MongoPcjDocuments pcjDocs) throws MalformedQueryException { + checkArgument(!Strings.isNullOrEmpty(sparql)); + checkArgument(!Strings.isNullOrEmpty(pcjId)); + this.pcjDocs = checkNotNull(pcjDocs); + this.pcjId = pcjId; + final SPARQLParser sp = new SPARQLParser(); + final ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery(sparql, null); + final TupleExpr te = pq.getTupleExpr(); + Preconditions.checkArgument(PCJOptimizerUtilities.isPCJValid(te), "TupleExpr is an invalid PCJ."); + + final Optional<Projection> projection = new ParsedQueryUtil().findProjection(pq); + if (!projection.isPresent()) { + throw new MalformedQueryException("SPARQL query '" + sparql + "' does not contain a Projection."); + } + setProjectionExpr(projection.get()); + } + + /** + * Creates a new {@link MongoPcjQueryNode}. + * + * @param conf - configuration to use to connect to mongo. (not null) + * @param pcjId - name of an existing PCJ. 
(not empty or null) + */ + public MongoPcjQueryNode(final Configuration conf, final String pcjId) { + checkNotNull(conf); + checkArgument(conf instanceof StatefulMongoDBRdfConfiguration, + "The configuration must be a StatefulMongoDBRdfConfiguration, found: " + conf.getClass().getSimpleName()); + checkArgument(!Strings.isNullOrEmpty(pcjId)); + final StatefulMongoDBRdfConfiguration statefulConf = (StatefulMongoDBRdfConfiguration) conf; + pcjDocs = new MongoPcjDocuments(statefulConf.getMongoClient(), statefulConf.getRyaInstanceName()); + this.pcjId = checkNotNull(pcjId); + } + + /** + * returns size of table for query planning + */ + @Override + public double cardinality() { + double cardinality = 0; + try { + cardinality = pcjDocs.getPcjMetadata(pcjId).getCardinality(); + } catch (final PcjException e) { + log.error("The PCJ has not been created, so has no cardinality.", e); + } + return cardinality; + } + + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final BindingSet bindingset) + throws QueryEvaluationException { + return this.evaluate(Collections.singleton(bindingset)); + } + + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final Collection<BindingSet> bindingset) + throws QueryEvaluationException { + + if (bindingset.isEmpty()) { + return new IteratorWrapper<BindingSet, QueryEvaluationException>(new HashSet<BindingSet>().iterator()); + } + final CloseableIterator<BindingSet> iter = pcjDocs.getResults(pcjId, bindingset); + return new CloseableIteration<BindingSet, QueryEvaluationException>() { + @Override + public boolean hasNext() throws QueryEvaluationException { + return iter.hasNext(); + } + + @Override + public BindingSet next() throws QueryEvaluationException { + final BindingSet bs = iter.next(); + return bs; + } + + @Override + public void remove() throws QueryEvaluationException { + iter.remove(); + } + + @Override + public void close() throws QueryEvaluationException { + try { + iter.close(); + } catch (final Exception e) { + throw new QueryEvaluationException(e.getMessage(), e); + } + } + }; + } + + @Override + public String getSignature() { + return "(Mongo PcjQueryNode) " + Joiner.on(", ").join(super.getTupleExpr().getProjectionElemList().getElements()).replaceAll("\\s+", " "); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java deleted file mode 100644 index 4a15665..0000000 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java +++ /dev/null @@ -1,221 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.pcj.matching; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.hadoop.conf.Configuration; -import org.apache.log4j.Logger; -import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; -import org.apache.rya.api.RdfCloudTripleStoreConfiguration; -import org.apache.rya.api.instance.RyaDetailsRepository; -import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; -import org.apache.rya.indexing.IndexPlanValidator.IndexedExecutionPlanGenerator; -import org.apache.rya.indexing.IndexPlanValidator.ValidIndexCombinationGenerator; -import org.apache.rya.indexing.accumulo.ConfigUtils; -import org.apache.rya.indexing.external.matching.ExternalSetProvider; -import org.apache.rya.indexing.external.matching.QuerySegment; -import org.apache.rya.indexing.external.tupleSet.AccumuloIndexSet; -import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet; -import org.apache.rya.indexing.pcj.storage.PcjException; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; -import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory; -import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables; -import org.openrdf.query.MalformedQueryException; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.algebra.TupleExpr; -import org.openrdf.sail.SailException; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - * Implementation of {@link ExternalSetProvider} that provides {@link ExternalTupleSet}s. - * This provider uses either user specified Accumulo configuration information or user a specified - * List of ExternalTupleSets to populate an internal cache of ExternalTupleSets. If Accumulo configuration - * is provided, the provider connects to an instance of RyaDetails and populates the cache with - * PCJs registered in RyaDetails. 
- * - */ -public class AccumuloIndexSetProvider implements ExternalSetProvider<ExternalTupleSet> { - - private static final Logger log = Logger.getLogger(ExternalSetProvider.class); - private static final PCJToSegmentConverter converter = new PCJToSegmentConverter(); - private List<ExternalTupleSet> indexCache; - private final Configuration conf; - private boolean init = false; - - public AccumuloIndexSetProvider(final Configuration conf) { - this.conf = Objects.requireNonNull(conf); - } - - public AccumuloIndexSetProvider(final Configuration conf, final List<ExternalTupleSet> indices) { - this(conf); - indexCache = indices; - init = true; - } - - /** - * - * @return - size of underlying PCJ cache - * @throws Exception - */ - public int size() throws Exception { - if(!init) { - indexCache = PCJOptimizerUtilities.getValidPCJs(getAccIndices()); - init = true; - } - return indexCache.size(); - } - - /** - * @param segment - QuerySegment used to get relevant queries form index cache for matching - * @return List of PCJs for matching - */ - @Override - public List<ExternalTupleSet> getExternalSets(final QuerySegment<ExternalTupleSet> segment) { - try { - if(!init) { - indexCache = PCJOptimizerUtilities.getValidPCJs(getAccIndices()); - init = true; - } - final TupleExpr query = segment.getQuery().getTupleExpr(); - final IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator(query, indexCache); - final List<ExternalTupleSet> pcjs = iep.getNormalizedIndices(); - final List<ExternalTupleSet> tuples = new ArrayList<>(); - for (final ExternalTupleSet tuple: pcjs) { - final QuerySegment<ExternalTupleSet> pcj = converter.setToSegment(tuple); - if (segment.containsQuerySegment(pcj)) { - tuples.add(tuple); - } - } - return tuples; - - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - /** - * @param segment - QuerySegment used to get relevant queries form index cache for matching - * - * @return Iterator of Lists (combos) of PCJs used to build an optimal query plan - */ - @Override - public Iterator<List<ExternalTupleSet>> getExternalSetCombos(final QuerySegment<ExternalTupleSet> segment) { - final ValidIndexCombinationGenerator comboGen = new ValidIndexCombinationGenerator(segment.getOrderedNodes()); - return comboGen.getValidIndexCombos(getExternalSets(segment)); - } - - /** - * - * - * @param conf - * - client configuration - * - * @return - list of {@link ExternalTupleSet}s or PCJs that are either - * specified by user in Configuration or exist in system. 
- * - * @throws MalformedQueryException - * @throws SailException - * @throws QueryEvaluationException - * @throws TableNotFoundException - * @throws AccumuloException - * @throws AccumuloSecurityException - * @throws PcjException - */ - private List<ExternalTupleSet> getAccIndices() throws Exception { - - Objects.requireNonNull(conf); - final String tablePrefix = Objects.requireNonNull(conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX)); - final Connector conn = Objects.requireNonNull(ConfigUtils.getConnector(conf)); - List<String> tables = null; - - if (conf instanceof RdfCloudTripleStoreConfiguration) { - tables = ((RdfCloudTripleStoreConfiguration) conf).getPcjTables(); - } - // this maps associates pcj table name with pcj sparql query - final Map<String, String> indexTables = Maps.newLinkedHashMap(); - - try(final PrecomputedJoinStorage storage = new AccumuloPcjStorage(conn, tablePrefix)) { - final PcjTableNameFactory pcjFactory = new PcjTableNameFactory(); - - final boolean tablesProvided = tables != null && !tables.isEmpty(); - - if (tablesProvided) { - // if tables provided, associate table name with sparql - for (final String table : tables) { - indexTables.put(table, storage.getPcjMetadata(pcjFactory.getPcjId(table)).getSparql()); - } - } else if (hasRyaDetails(tablePrefix, conn)) { - // If this is a newer install of Rya, and it has PCJ Details, then - // use those. - final List<String> ids = storage.listPcjs(); - for (final String id : ids) { - indexTables.put(pcjFactory.makeTableName(tablePrefix, id), storage.getPcjMetadata(id).getSparql()); - } - } else { - // Otherwise figure it out by scanning tables. - final PcjTables pcjTables = new PcjTables(); - for (final String table : conn.tableOperations().list()) { - if (table.startsWith(tablePrefix + "INDEX")) { - indexTables.put(table, pcjTables.getPcjMetadata(conn, table).getSparql()); - } - } - } - } - - // use table name sparql map (indexTables) to create {@link - // AccumuloIndexSet} - final List<ExternalTupleSet> index = Lists.newArrayList(); - if (indexTables.isEmpty()) { - log.info("No Index found"); - } else { - for (final String table : indexTables.keySet()) { - final String indexSparqlString = indexTables.get(table); - index.add(new AccumuloIndexSet(indexSparqlString, conf, table)); - } - } - - - return index; - } - - private static boolean hasRyaDetails(final String ryaInstanceName, final Connector conn) { - final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(conn, ryaInstanceName); - try { - detailsRepo.getRyaInstanceDetails(); - return true; - } catch (final RyaDetailsRepositoryException e) { - return false; - } - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizer.java index 75b48b4..8067a85 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizer.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizer.java @@ -33,6 +33,10 @@ import org.apache.rya.indexing.external.matching.QueryNodeListRater; import org.apache.rya.indexing.external.matching.QuerySegment; import org.apache.rya.indexing.external.matching.TopOfQueryFilterRelocator; import 
org.apache.rya.indexing.external.tupleSet.ExternalTupleSet; +import org.apache.rya.indexing.mongodb.pcj.MongoPcjIndexSetProvider; +import org.apache.rya.indexing.pcj.matching.provider.AbstractPcjIndexSetProvider; +import org.apache.rya.indexing.pcj.matching.provider.AccumuloIndexSetProvider; +import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration; import org.openrdf.query.BindingSet; import org.openrdf.query.Dataset; import org.openrdf.query.algebra.QueryModelNode; @@ -58,29 +62,32 @@ import com.google.common.base.Optional;; */ public class PCJOptimizer extends AbstractExternalSetOptimizer<ExternalTupleSet>implements Configurable { private static final PCJExternalSetMatcherFactory factory = new PCJExternalSetMatcherFactory(); - private AccumuloIndexSetProvider provider; + private AbstractPcjIndexSetProvider provider; private Configuration conf; private boolean init = false; public PCJOptimizer() {} public PCJOptimizer(final Configuration conf) { - setConf(conf); + setConf(conf); } /** * This constructor is designed to be used for testing. A more typical use * pattern is for a user to specify Accumulo connection details in a Configuration * file so that PCJs can be retrieved by an AccumuloIndexSetProvider. - * - * @param indices - user specified PCJs to match to query + * + * @param indices - user specified PCJs to match to query. (not null) * @param useOptimalPcj - optimize PCJ combos for matching + * @param provider - The provider to use in this optimizer. (not null) */ - public PCJOptimizer(final List<ExternalTupleSet> indices, final boolean useOptimalPcj) { + public PCJOptimizer(final List<ExternalTupleSet> indices, final boolean useOptimalPcj, + final AbstractPcjIndexSetProvider provider) { checkNotNull(indices); + checkNotNull(provider); conf = new Configuration(); - this.useOptimal = useOptimalPcj; - provider = new AccumuloIndexSetProvider(conf, indices); + useOptimal = useOptimalPcj; + this.provider = provider; init = true; } @@ -90,9 +97,14 @@ public class PCJOptimizer extends AbstractExternalSetOptimizer<ExternalTupleSet> if (!init) { try { this.conf = conf; - this.useOptimal = ConfigUtils.getUseOptimalPCJ(conf); - provider = new AccumuloIndexSetProvider(conf); - } catch (Exception e) { + useOptimal = ConfigUtils.getUseOptimalPCJ(conf); + if (conf instanceof StatefulMongoDBRdfConfiguration) { + final StatefulMongoDBRdfConfiguration mongoConf = (StatefulMongoDBRdfConfiguration) conf; + provider = new MongoPcjIndexSetProvider(mongoConf); + } else { + provider = new AccumuloIndexSetProvider(conf); + } + } catch (final Exception e) { throw new Error(e); } init = true; @@ -124,14 +136,14 @@ public class PCJOptimizer extends AbstractExternalSetOptimizer<ExternalTupleSet> } else { return; } - } catch (Exception e) { - throw new RuntimeException("Could not populate Accumulo Index Cache."); + } catch (final Exception e) { + throw new RuntimeException("Could not populate Index Cache.", e); } } @Override - protected ExternalSetMatcher<ExternalTupleSet> getMatcher(QuerySegment<ExternalTupleSet> segment) { + protected ExternalSetMatcher<ExternalTupleSet> getMatcher(final QuerySegment<ExternalTupleSet> segment) { return factory.getMatcher(segment); } @@ -141,7 +153,7 @@ public class PCJOptimizer extends AbstractExternalSetOptimizer<ExternalTupleSet> } @Override - protected Optional<QueryNodeListRater> getNodeListRater(QuerySegment<ExternalTupleSet> segment) { + protected Optional<QueryNodeListRater> getNodeListRater(final QuerySegment<ExternalTupleSet> segment) { return Optional.of(new 
BasicRater(segment.getOrderedNodes())); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerUtilities.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerUtilities.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerUtilities.java index 1ad03b6..09a2706 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerUtilities.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerUtilities.java @@ -27,7 +27,6 @@ import java.util.Set; import org.apache.rya.indexing.external.matching.QuerySegment; import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet; import org.apache.rya.indexing.pcj.matching.QueryVariableNormalizer.VarCollector; - import org.openrdf.query.algebra.Difference; import org.openrdf.query.algebra.EmptySet; import org.openrdf.query.algebra.Filter; @@ -61,7 +60,7 @@ public class PCJOptimizerUtilities { * @param node - node to be checked for validity * @return - true if valid and false otherwise */ - public static boolean isPCJValid(TupleExpr node) { + public static boolean isPCJValid(final TupleExpr node) { final ValidQueryVisitor vqv = new ValidQueryVisitor(); node.visit(vqv); @@ -91,16 +90,16 @@ public class PCJOptimizerUtilities { * @param node - PCJ {@link ExternalTupleSet} index node to be checked for validity * @return - true if valid and false otherwise */ - public static boolean isPCJValid(ExternalTupleSet node) { + public static boolean isPCJValid(final ExternalTupleSet node) { return isPCJValid(node.getTupleExpr()); } public static List<ExternalTupleSet> getValidPCJs( - List<ExternalTupleSet> pcjs) { + final List<ExternalTupleSet> pcjs) { - Iterator<ExternalTupleSet> iterator = pcjs.iterator(); + final Iterator<ExternalTupleSet> iterator = pcjs.iterator(); while (iterator.hasNext()) { - ExternalTupleSet pcj = iterator.next(); + final ExternalTupleSet pcj = iterator.next(); if (!isPCJValid(pcj)) { iterator.remove(); } @@ -109,8 +108,8 @@ public class PCJOptimizerUtilities { } - public static Projection getProjection(TupleExpr te) { - ProjectionVisitor visitor = new ProjectionVisitor(); + public static Projection getProjection(final TupleExpr te) { + final ProjectionVisitor visitor = new ProjectionVisitor(); te.visit(visitor); return visitor.node; } @@ -120,7 +119,7 @@ public class PCJOptimizerUtilities { Projection node = null; @Override - public void meet(Projection node) { + public void meet(final Projection node) { this.node = node; } } @@ -131,13 +130,13 @@ public class PCJOptimizerUtilities { * - filters to be pushed down into next {@link QuerySegment}, or * as far down as binding variable names permit. 
*/ - public static void relocateFilters(Set<Filter> filters) { - for (Filter filter : filters) { + public static void relocateFilters(final Set<Filter> filters) { + for (final Filter filter : filters) { FilterRelocator.relocate(filter); } } - private static Set<String> getVarNames(Collection<QueryModelNode> nodes) { + private static Set<String> getVarNames(final Collection<QueryModelNode> nodes) { List<String> tempVars; final Set<String> nodeVarNames = Sets.newHashSet(); @@ -159,8 +158,8 @@ public class PCJOptimizerUtilities { QueryModelVisitorBase<RuntimeException> { private boolean isValid = true; - private Set<QueryModelNode> filterSet = Sets.newHashSet(); - private Set<QueryModelNode> spSet = Sets.newHashSet(); + private final Set<QueryModelNode> filterSet = Sets.newHashSet(); + private final Set<QueryModelNode> spSet = Sets.newHashSet(); private int joinCount = 0; public Set<QueryModelNode> getFilters() { @@ -180,35 +179,35 @@ public class PCJOptimizerUtilities { } @Override - public void meet(Projection node) { + public void meet(final Projection node) { node.getArg().visit(this); } @Override - public void meet(Filter node) { + public void meet(final Filter node) { filterSet.add(node.getCondition()); node.getArg().visit(this); } @Override - public void meet(StatementPattern node) { + public void meet(final StatementPattern node) { spSet.add(node); } @Override - public void meet(Join node) { + public void meet(final Join node) { joinCount++; super.meet(node); } @Override - public void meet(LeftJoin node) { + public void meet(final LeftJoin node) { joinCount++; super.meet(node); } @Override - public void meetNode(QueryModelNode node) { + public void meetNode(final QueryModelNode node) { if (!(node instanceof Join || node instanceof LeftJoin || node instanceof StatementPattern || node instanceof Var || node instanceof Union || node instanceof Filter || node instanceof Projection)) { @@ -238,18 +237,18 @@ public class PCJOptimizerUtilities { protected Filter filter; protected Set<String> filterVars; - public FilterRelocator(Filter filter) { + public FilterRelocator(final Filter filter) { this.filter = filter; filterVars = VarNameCollector.process(filter.getCondition()); } - public static void relocate(Filter filter) { + public static void relocate(final Filter filter) { final FilterRelocator fr = new FilterRelocator(filter); filter.visit(fr); } @Override - protected void meetNode(QueryModelNode node) { + protected void meetNode(final QueryModelNode node) { // By default, do not traverse assert node instanceof TupleExpr; @@ -263,7 +262,7 @@ public class PCJOptimizerUtilities { } @Override - public void meet(Join join) { + public void meet(final Join join) { if (join.getRightArg().getBindingNames().containsAll(filterVars)) { // All required vars are bound by the left expr join.getRightArg().visit(this); @@ -277,12 +276,12 @@ public class PCJOptimizerUtilities { } @Override - public void meet(Filter node) { + public void meet(final Filter node) { node.getArg().visit(this); } @Override - public void meet(LeftJoin leftJoin) { + public void meet(final LeftJoin leftJoin) { if (leftJoin.getLeftArg().getBindingNames().containsAll(filterVars)) { leftJoin.getLeftArg().visit(this); } else { @@ -291,7 +290,7 @@ public class PCJOptimizerUtilities { } @Override - public void meet(Union union) { + public void meet(final Union union) { boolean filterMoved = false; if (Sets.intersection(union.getRightArg().getBindingNames(), filterVars).size() > 0) { relocate(filter, union.getRightArg()); @@ -300,7 +299,7 
@@ public class PCJOptimizerUtilities { if (Sets.intersection(union.getLeftArg().getBindingNames(), filterVars).size() > 0) { if (filterMoved) { - Filter clone = new Filter(filter.getArg(), filter.getCondition().clone()); + final Filter clone = new Filter(filter.getArg(), filter.getCondition().clone()); relocate(clone, union.getLeftArg()); } else { relocate(filter, union.getLeftArg()); @@ -309,36 +308,36 @@ public class PCJOptimizerUtilities { } @Override - public void meet(Difference node) { + public void meet(final Difference node) { if (Sets.intersection(node.getRightArg().getBindingNames(), filterVars).size() > 0) { relocate(filter, node.getRightArg()); } else if (Sets.intersection(node.getLeftArg().getBindingNames(), filterVars).size() > 0) { - Filter clone = new Filter(filter.getArg(), filter + final Filter clone = new Filter(filter.getArg(), filter .getCondition().clone()); relocate(clone, node.getLeftArg()); } } @Override - public void meet(Intersection node) { + public void meet(final Intersection node) { if (Sets.intersection(node.getRightArg().getBindingNames(), filterVars).size() > 0) { relocate(filter, node.getRightArg()); } else if (Sets.intersection(node.getLeftArg().getBindingNames(), filterVars).size() > 0) { - Filter clone = new Filter(filter.getArg(), filter + final Filter clone = new Filter(filter.getArg(), filter .getCondition().clone()); relocate(clone, node.getLeftArg()); } } @Override - public void meet(EmptySet node) { + public void meet(final EmptySet node) { if (filter.getParentNode() != null) { // Remove filter from its original location filter.replaceWith(filter.getArg()); } } - protected void relocate(Filter filter, TupleExpr newFilterArg) { + protected void relocate(final Filter filter, final TupleExpr newFilterArg) { if (!filter.getArg().equals(newFilterArg)) { if (filter.getParentNode() != null) { // Remove filter from its original location @@ -351,10 +350,8 @@ public class PCJOptimizerUtilities { } } - - - public static boolean tupleContainsLeftJoins(TupleExpr node) { - LeftJoinVisitor lj = new LeftJoinVisitor(); + public static boolean tupleContainsLeftJoins(final TupleExpr node) { + final LeftJoinVisitor lj = new LeftJoinVisitor(); node.visit(lj); return lj.containsLeftJoin; } @@ -368,18 +365,8 @@ public class PCJOptimizerUtilities { } @Override - public void meet(LeftJoin node) { + public void meet(final LeftJoin node) { containsLeftJoin = true; } } - - - - - - - - - - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AbstractPcjIndexSetProvider.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AbstractPcjIndexSetProvider.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AbstractPcjIndexSetProvider.java new file mode 100644 index 0000000..984153a --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AbstractPcjIndexSetProvider.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.matching.provider; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.apache.rya.api.instance.RyaDetails; +import org.apache.rya.indexing.IndexPlanValidator.IndexedExecutionPlanGenerator; +import org.apache.rya.indexing.IndexPlanValidator.ValidIndexCombinationGenerator; +import org.apache.rya.indexing.external.matching.ExternalSetProvider; +import org.apache.rya.indexing.external.matching.QuerySegment; +import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet; +import org.apache.rya.indexing.pcj.matching.PCJOptimizerUtilities; +import org.apache.rya.indexing.pcj.matching.PCJToSegmentConverter; +import org.openrdf.query.algebra.TupleExpr; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Abstraction of {@link ExternalSetProvider} that provides {@link ExternalTupleSet}s. + * Implementations of this class use either user specified configuration information or a user specified + * List of ExternalTupleSets to populate an internal cache of ExternalTupleSets. If a configuration + * is provided, the provider connects to an instance of RyaDetails and populates the cache with + * PCJs registered in RyaDetails. + */ +public abstract class AbstractPcjIndexSetProvider implements ExternalSetProvider<ExternalTupleSet> { + protected static final Logger log = Logger.getLogger(AbstractPcjIndexSetProvider.class); + protected static final PCJToSegmentConverter converter = new PCJToSegmentConverter(); + protected List<ExternalTupleSet> indexCache; + protected final Configuration conf; + protected boolean init = false; + + /** + * Creates a new {@link AbstractPcjIndexSetProvider} based on configuration only. + * @param conf - The {@link Configuration} used to connect to {@link RyaDetails}. + */ + public AbstractPcjIndexSetProvider(final Configuration conf) { + this.conf = requireNonNull(conf); + } + + /** + * Creates a new {@link AbstractPcjIndexSetProvider} based on user provided {@link ExternalTupleSet}s. + * @param conf - The {@link Configuration} used to connect to {@link RyaDetails}. + * @param indices - The {@link ExternalTupleSet}s to populate the internal cache.
+ */ + public AbstractPcjIndexSetProvider(final Configuration conf, final List<ExternalTupleSet> indices) { + requireNonNull(conf); + this.conf = conf; + indexCache = indices; + init = true; + } + + + /** + * Sets the {@link ExternalTupleSet}s that populate the internal cache. + * @param indices - The {@link ExternalTupleSet}s to cache. + */ + @VisibleForTesting + public void setIndices(final List<ExternalTupleSet> indices) { + indexCache = indices; + init = true; + } + + /** + * @param segment - QuerySegment used to get relevant queries from the index cache for matching + * + * @return Iterator of Lists (combos) of PCJs used to build an optimal query plan + */ + @Override + public Iterator<List<ExternalTupleSet>> getExternalSetCombos(final QuerySegment<ExternalTupleSet> segment) { + final ValidIndexCombinationGenerator comboGen = new ValidIndexCombinationGenerator(segment.getOrderedNodes()); + return comboGen.getValidIndexCombos(getExternalSets(segment)); + } + + /** + * @param segment - QuerySegment used to get relevant queries from the index cache for matching + * @return List of PCJs for matching + */ + @Override + public List<ExternalTupleSet> getExternalSets(final QuerySegment<ExternalTupleSet> segment) { + try { + if(!init) { + indexCache = PCJOptimizerUtilities.getValidPCJs(getIndices()); + init = true; + } + final TupleExpr query = segment.getQuery().getTupleExpr(); + final IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator(query, indexCache); + final List<ExternalTupleSet> pcjs = iep.getNormalizedIndices(); + final List<ExternalTupleSet> tuples = new ArrayList<>(); + for (final ExternalTupleSet tuple: pcjs) { + final QuerySegment<ExternalTupleSet> pcj = converter.setToSegment(tuple); + if (segment.containsQuerySegment(pcj)) { + tuples.add(tuple); + } + } + return tuples; + + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + /** + * @return The size of the PCJ index cache. + * @throws Exception + */ + public int size() throws Exception { + if (!init) { + indexCache = PCJOptimizerUtilities.getValidPCJs(getIndices()); + init = true; + } + return indexCache.size(); + } + + /** + * @return - list of {@link ExternalTupleSet}s or PCJs that are either + * specified by the user in the Configuration or exist in the system. + * + * @throws PcjIndexSetException - If the defined PCJs could not be retrieved. + */ + protected abstract List<ExternalTupleSet> getIndices() throws PcjIndexSetException; + + /** + * Exception thrown when failing to get the defined PCJs for a particular + * index. + */ + public class PcjIndexSetException extends Exception { + public PcjIndexSetException(final String message) { + super(message); + } + + public PcjIndexSetException(final String message, final Throwable cause) { + super(message, cause); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AccumuloIndexSetProvider.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AccumuloIndexSetProvider.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AccumuloIndexSetProvider.java new file mode 100644 index 0000000..1fa3677 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/provider/AccumuloIndexSetProvider.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.matching.provider; + +import static java.util.Objects.requireNonNull; + +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; +import org.apache.rya.api.RdfCloudTripleStoreConfiguration; +import org.apache.rya.api.instance.RyaDetailsRepository; +import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.external.tupleSet.AccumuloIndexSet; +import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory; +import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.sail.SailException; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * Implementation of {@link AbstractPcjIndexSetProvider} for Accumulo. + * This provider uses either user specified Accumulo configuration information or a user specified + * List of ExternalTupleSets to populate an internal cache of ExternalTupleSets. If Accumulo configuration + * is provided, the provider connects to an instance of RyaDetails and populates the cache with + * PCJs registered in RyaDetails.
+ */ +public class AccumuloIndexSetProvider extends AbstractPcjIndexSetProvider { + private static final Logger log = Logger.getLogger(AccumuloIndexSetProvider.class); + + public AccumuloIndexSetProvider(final Configuration conf) { + super(conf); + } + + public AccumuloIndexSetProvider(final Configuration conf, final List<ExternalTupleSet> indices) { + super(conf, indices); + } + + @Override + protected List<ExternalTupleSet> getIndices() throws PcjIndexSetException { + requireNonNull(conf); + try { + final String tablePrefix = requireNonNull(conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX)); + final Connector conn = requireNonNull(ConfigUtils.getConnector(conf)); + List<String> tables = null; + + if (conf instanceof RdfCloudTripleStoreConfiguration) { + tables = ((RdfCloudTripleStoreConfiguration) conf).getPcjTables(); + } + // this map associates each pcj table name with its pcj sparql query + final Map<String, String> indexTables = Maps.newLinkedHashMap(); + + try(final PrecomputedJoinStorage storage = new AccumuloPcjStorage(conn, tablePrefix)) { + final PcjTableNameFactory pcjFactory = new PcjTableNameFactory(); + + final boolean tablesProvided = tables != null && !tables.isEmpty(); + + if (tablesProvided) { + // if tables provided, associate table name with sparql + for (final String table : tables) { + indexTables.put(table, storage.getPcjMetadata(pcjFactory.getPcjId(table)).getSparql()); + } + } else if (hasRyaDetails(tablePrefix, conn)) { + // If this is a newer install of Rya, and it has PCJ Details, then + // use those. + final List<String> ids = storage.listPcjs(); + for (final String id : ids) { + indexTables.put(pcjFactory.makeTableName(tablePrefix, id), storage.getPcjMetadata(id).getSparql()); + } + } else { + // Otherwise figure it out by scanning tables.
+ final PcjTables pcjTables = new PcjTables(); + for (final String table : conn.tableOperations().list()) { + if (table.startsWith(tablePrefix + "INDEX")) { + indexTables.put(table, pcjTables.getPcjMetadata(conn, table).getSparql()); + } + } + } + } + + // use table name sparql map (indexTables) to create {@link + // AccumuloIndexSet} + final List<ExternalTupleSet> index = Lists.newArrayList(); + if (indexTables.isEmpty()) { + log.info("No Index found"); + } else { + for (final String table : indexTables.keySet()) { + final String indexSparqlString = indexTables.get(table); + index.add(new AccumuloIndexSet(indexSparqlString, conf, table)); + } + } + + return index; + } catch (final PCJStorageException | AccumuloException | AccumuloSecurityException | MalformedQueryException + | SailException | QueryEvaluationException | TableNotFoundException e) { + throw new PcjIndexSetException("Failed to retrieve the indicies.", e); + } + } + + private static boolean hasRyaDetails(final String ryaInstanceName, final Connector conn) { + final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(conn, ryaInstanceName); + try { + detailsRepo.getRyaInstanceDetails(); + return true; + } catch (final RyaDetailsRepositoryException e) { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java index 5028454..78b4f52 100644 --- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java +++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java @@ -107,7 +107,7 @@ public class AccumuloBatchUpdatePCJIT extends AccumuloITBase { final String pcjId = pcjStorage.createPcj(sparql); // Run the test. - ryaClient.getBatchUpdatePCJ().get().batchUpdate(RYA_INSTANCE_NAME, pcjId); + ryaClient.getBatchUpdatePCJ().batchUpdate(RYA_INSTANCE_NAME, pcjId); // Verify the correct results were loaded into the PCJ table. 
final Set<BindingSet> expectedResults = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloInstallIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloInstallIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloInstallIT.java index da717d7..a8c7455 100644 --- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloInstallIT.java +++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloInstallIT.java @@ -23,10 +23,9 @@ import static org.junit.Assert.assertTrue; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.rya.accumulo.AccumuloITBase; -import org.apache.rya.api.client.Install; import org.apache.rya.api.client.Install.DuplicateInstanceNameException; import org.apache.rya.api.client.Install.InstallConfiguration; -import org.apache.rya.api.client.InstanceExists; +import org.apache.rya.api.client.RyaClient; import org.apache.rya.api.client.RyaClientException; import org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException; import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; @@ -41,6 +40,14 @@ public class AccumuloInstallIT extends AccumuloITBase { public void install() throws AccumuloException, AccumuloSecurityException, DuplicateInstanceNameException, RyaClientException, NotInitializedException, RyaDetailsRepositoryException { // Install an instance of Rya. final String instanceName = getRyaInstanceName(); + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + getUsername(), + getPassword().toCharArray(), + getInstanceName(), + getZookeepers()); + + final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector()); + final InstallConfiguration installConfig = InstallConfiguration.builder() .setEnableTableHashPrefix(false) .setEnableEntityCentricIndex(false) @@ -48,39 +55,58 @@ public class AccumuloInstallIT extends AccumuloITBase { .setEnableTemporalIndex(false) .setEnablePcjIndex(false) .setEnableGeoIndex(false) - .setFluoPcjAppName("fluo_app_name") .build(); + + ryaClient.getInstall().install(instanceName, installConfig); + + // Check that the instance exists. + assertTrue( ryaClient.getInstanceExists().exists(instanceName) ); + } + + @Test + public void install_withIndexers() throws AccumuloException, AccumuloSecurityException, DuplicateInstanceNameException, RyaClientException, NotInitializedException, RyaDetailsRepositoryException { + // Install an instance of Rya. 
+ final String instanceName = getRyaInstanceName(); final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( - getUsername(), - getPassword().toCharArray(), - getInstanceName(), - getZookeepers()); + getUsername(), + getPassword().toCharArray(), + getInstanceName(), + getZookeepers()); + + final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector()); + + final InstallConfiguration installConfig = InstallConfiguration.builder() + .setEnableTableHashPrefix(true) + .setEnableEntityCentricIndex(true) + .setEnableFreeTextIndex(true) + .setEnableTemporalIndex(true) + .setEnablePcjIndex(true) + .setEnableGeoIndex(true) + .build(); - final Install install = new AccumuloInstall(connectionDetails, getConnector()); - install.install(instanceName, installConfig); + ryaClient.getInstall().install(instanceName, installConfig); // Check that the instance exists. - final InstanceExists instanceExists = new AccumuloInstanceExists(connectionDetails, getConnector()); - assertTrue( instanceExists.exists(instanceName) ); + assertTrue( ryaClient.getInstanceExists().exists(instanceName) ); } @Test(expected = DuplicateInstanceNameException.class) public void install_alreadyExists() throws DuplicateInstanceNameException, RyaClientException, AccumuloException, AccumuloSecurityException { // Install an instance of Rya. final String instanceName = getRyaInstanceName(); - final InstallConfiguration installConfig = InstallConfiguration.builder().build(); - final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( - getUsername(), - getPassword().toCharArray(), - getInstanceName(), - getZookeepers()); + getUsername(), + getPassword().toCharArray(), + getInstanceName(), + getZookeepers()); - final Install install = new AccumuloInstall(connectionDetails, getConnector()); - install.install(instanceName, installConfig); + final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector()); + + final InstallConfiguration installConfig = InstallConfiguration.builder().build(); + ryaClient.getInstall().install(instanceName, installConfig); // Install it again. 
- install.install(instanceName, installConfig); + ryaClient.getInstall().install(instanceName, installConfig); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloUninstallIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloUninstallIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloUninstallIT.java index c4e4d54..e88e35c 100644 --- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloUninstallIT.java +++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloUninstallIT.java @@ -22,11 +22,9 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import org.apache.rya.accumulo.AccumuloITBase; -import org.apache.rya.api.client.Install; import org.apache.rya.api.client.Install.InstallConfiguration; import org.apache.rya.api.client.InstanceDoesNotExistException; -import org.apache.rya.api.client.InstanceExists; -import org.apache.rya.api.client.Uninstall; +import org.apache.rya.api.client.RyaClient; import org.junit.Test; /** @@ -62,20 +60,16 @@ public class AccumuloUninstallIT extends AccumuloITBase { getPassword().toCharArray(), getInstanceName(), getZookeepers()); - - final Install install = new AccumuloInstall(connectionDetails, getConnector()); - final String ryaInstanceName = getRyaInstanceName(); - install.install(ryaInstanceName, installConfig); + final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector()); + ryaClient.getInstall().install(getRyaInstanceName(), installConfig); // Check that the instance exists. - final InstanceExists instanceExists = new AccumuloInstanceExists(connectionDetails, getConnector()); - assertTrue( instanceExists.exists(ryaInstanceName) ); + assertTrue( ryaClient.getInstanceExists().exists(getRyaInstanceName()) ); // Uninstall the instance of Rya. - final Uninstall uninstall = new AccumuloUninstall(connectionDetails, getConnector()); - uninstall.uninstall(ryaInstanceName); + ryaClient.getUninstall().uninstall(getRyaInstanceName()); // Verify that it no longer exists. - assertFalse( instanceExists.exists(ryaInstanceName) ); + assertFalse( ryaClient.getInstanceExists().exists(getRyaInstanceName()) ); } } \ No newline at end of file
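
The three integration-test diffs above (AccumuloBatchUpdatePCJIT, AccumuloInstallIT, AccumuloUninstallIT) all replace per-command objects such as AccumuloInstall, AccumuloInstanceExists, and AccumuloUninstall with the single RyaClient facade returned by AccumuloRyaClientFactory.build(...). The following is a minimal sketch of that pattern outside of a test harness; the credentials, Zookeeper address, and instance names are placeholder assumptions, the package names are inferred from the file paths in the diffs, and the surrounding error handling is elided.

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.rya.api.client.Install.InstallConfiguration;
import org.apache.rya.api.client.RyaClient;
import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;

/**
 * Illustrative sketch only: install a Rya instance, check that it exists, then uninstall it,
 * using the RyaClient facade that the updated tests exercise. All connection values are placeholders.
 */
public class RyaClientInstallExample {
    public static void main(final String[] args) throws Exception {
        final String user = "root";                    // placeholder credentials
        final String password = "secret";
        final String accumuloInstance = "accumulo";    // placeholder Accumulo instance
        final String zookeepers = "localhost:2181";    // placeholder Zookeeper address
        final String ryaInstanceName = "rya_demo_";    // placeholder Rya instance name

        // Plain Accumulo connector; the tests obtain theirs from AccumuloITBase instead.
        final Connector connector = new ZooKeeperInstance(accumuloInstance, zookeepers)
                .getConnector(user, new PasswordToken(password));

        final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
                user,
                password.toCharArray(),
                accumuloInstance,
                zookeepers);

        // One facade object exposes all of the commands the tests use.
        final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, connector);

        final InstallConfiguration installConfig = InstallConfiguration.builder()
                .setEnableTableHashPrefix(false)
                .setEnablePcjIndex(true)
                .setEnableFreeTextIndex(false)
                .setEnableTemporalIndex(false)
                .build();

        // Install, verify, and tear the instance back down.
        ryaClient.getInstall().install(ryaInstanceName, installConfig);
        System.out.println("Installed: " + ryaClient.getInstanceExists().exists(ryaInstanceName));

        ryaClient.getUninstall().uninstall(ryaInstanceName);
        System.out.println("Still present: " + ryaClient.getInstanceExists().exists(ryaInstanceName));
    }
}

Because every command hangs off the one RyaClient instance built from a single set of connection details, the tests no longer need to construct a separate command object per operation, which is exactly the simplification the diffs above make.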
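
Looking back at the AccumuloIndexSetProvider added earlier in this commit, here is a short, hedged sketch of how it might be pointed at an existing Rya instance to report the PCJ indices it can discover. The table prefix value is a placeholder, and the sketch assumes the Accumulo connection properties required by ConfigUtils.getConnector(conf) have already been placed on the Configuration; only the calls that appear in the diff (the configuration-based constructor and size()) are used.

import org.apache.hadoop.conf.Configuration;
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
import org.apache.rya.indexing.pcj.matching.provider.AccumuloIndexSetProvider;

/**
 * Illustrative sketch only: count the PCJ indices an AccumuloIndexSetProvider can discover
 * for a given Rya instance. The prefix below is a placeholder.
 */
public class PcjDiscoveryExample {
    public static void main(final String[] args) throws Exception {
        final Configuration conf = new Configuration();
        // The same table-prefix key the provider reads in getIndices().
        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, "demo_");
        // The Accumulo connection properties (instance, zookeepers, user, password) that
        // ConfigUtils.getConnector(conf) needs are assumed to be set on conf as well.

        final AccumuloIndexSetProvider provider = new AccumuloIndexSetProvider(conf);

        // size() lazily invokes getIndices(): it prefers PCJ tables named in the configuration,
        // falls back to the instance's RyaDetails when present, and otherwise scans for tables
        // whose names start with "<prefix>INDEX".
        System.out.println("Discovered PCJ indices: " + provider.size());
    }
}

Inside the PCJ optimizer the same provider is handed QuerySegments instead: getExternalSets(segment) narrows the cached PCJs to those whose converted segments are contained in the query segment, and getExternalSetCombos(segment) enumerates valid combinations of those matches for plan generation.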
