RYA-239 GeoTemporal tests added. Added tests and fixes that were needed while writing tests. Closes #138
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/646d21b4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/646d21b4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/646d21b4 Branch: refs/heads/master Commit: 646d21b4e6ed9f7e010ffa8e75b2801f48456c1e Parents: 440a4bf Author: isper3at <[email protected]> Authored: Fri Mar 31 13:38:30 2017 -0400 Committer: Aaron Mihalik <[email protected]> Committed: Wed Jun 14 14:42:45 2017 -0400 ---------------------------------------------------------------------- .../apache/rya/indexing/TemporalTupleSet.java | 2 - .../rya/indexing/accumulo/ConfigUtils.java.orig | 529 +++++++++++++++++++ .../indexing/entity/update/EntityUpdater.java | 4 +- .../rya/indexing/mongodb/MongoDbSmartUri.java | 9 +- .../mongodb/update/DocumentUpdater.java | 98 ---- .../mongodb/update/MongoDocumentUpdater.java | 98 ++++ .../mongodb/update/RyaObjectStorage.java | 1 + .../GeoEnabledFilterFunctionOptimizer.java | 47 +- .../indexing/accumulo/geo/GeoParseUtils.java | 75 ++- .../GeoTemporalIndexSetProvider.java | 46 +- .../geotemporal/GeoTemporalIndexer.java | 61 ++- .../geotemporal/model/EventQueryNode.java | 121 ++++- .../geotemporal/mongo/EventUpdater.java | 4 +- .../GeoTemporalMongoDBStorageStrategy.java | 3 +- .../geotemporal/mongo/MongoEventStorage.java | 5 +- .../mongo/MongoGeoTemporalIndexer.java | 6 +- .../geotemporal/GeoTemporalProviderTest.java | 222 ++++++++ .../geotemporal/GeoTemporalTestBase.java | 140 +++++ .../geotemporal/MongoGeoTemporalIndexIT.java | 174 ++++++ .../geotemporal/model/EventQueryNodeTest.java | 368 +++++++++++++ .../mongo/EventDocumentConverterTest.java | 64 +++ .../GeoTemporalMongoDBStorageStrategyTest.java | 469 ++++++++++++++++ .../mongo/MongoEventStorageTest.java | 197 +++++++ .../mongo/MongoGeoTemporalIndexerIT.java | 126 +++++ .../indexing/geotemporal/mongo/MongoITBase.java | 81 +++ 25 files changed, 2730 
insertions(+), 220 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java b/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java index 808afdf..7cb4e6c 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java @@ -126,10 +126,8 @@ public class TemporalTupleSet extends ExternalTupleSet { public static class TemporalSearchFunctionFactory { private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap(); private final TemporalIndexer temporalIndexer; - Configuration conf; public TemporalSearchFunctionFactory(final Configuration conf, final TemporalIndexer temporalIndexer) { - this.conf = conf; this.temporalIndexer = temporalIndexer; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig new file mode 100644 index 0000000..9311200 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java.orig @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.log4j.Logger; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.accumulo.utils.ConnectorFactory; +import org.apache.rya.api.RdfCloudTripleStoreConfiguration; +import org.apache.rya.api.instance.RyaDetails; +import 
org.apache.rya.indexing.FilterFunctionOptimizer; +import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex; +import org.apache.rya.indexing.accumulo.entity.EntityOptimizer; +import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; +import org.apache.rya.indexing.accumulo.freetext.LuceneTokenizer; +import org.apache.rya.indexing.accumulo.freetext.Tokenizer; +import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; +import org.apache.rya.indexing.entity.EntityIndexOptimizer; +import org.apache.rya.indexing.entity.update.mongo.MongoEntityIndexer; +import org.apache.rya.indexing.external.PrecomputedJoinIndexer; +import org.apache.rya.indexing.mongodb.freetext.MongoFreeTextIndexer; +import org.apache.rya.indexing.mongodb.temporal.MongoTemporalIndexer; +import org.apache.rya.indexing.pcj.matching.PCJOptimizer; +import org.apache.rya.indexing.statement.metadata.matching.StatementMetadataOptimizer; +import org.openrdf.model.URI; +import org.openrdf.model.impl.URIImpl; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; + +/** + * A set of configuration utils to read a Hadoop {@link Configuration} object and create Cloudbase/Accumulo objects. + * Soon will deprecate this class. Use installer for the set methods, use {@link RyaDetails} for the get methods. + * New code must separate parameters that are set at Rya install time from that which is specific to the client. + * Also Accumulo index tables are pushed down to the implementation and not configured in conf. + */ +public class ConfigUtils { + private static final Logger logger = Logger.getLogger(ConfigUtils.class); + + /** + * @Deprecated use {@link RdfCloudTripleStoreConfiguration#CONF_TBL_PREFIX} instead. + */ + @Deprecated + public static final String CLOUDBASE_TBL_PREFIX = RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX; + + /** + * @Deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_INSTANCE} instead. 
+ */ + @Deprecated + public static final String CLOUDBASE_INSTANCE = AccumuloRdfConfiguration.CLOUDBASE_INSTANCE; + + /** + * @Deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_ZOOKEEPERS} instead. + */ + @Deprecated + public static final String CLOUDBASE_ZOOKEEPERS = AccumuloRdfConfiguration.CLOUDBASE_ZOOKEEPERS; + + /** + * @Deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_USER} instead. + */ + @Deprecated + public static final String CLOUDBASE_USER = AccumuloRdfConfiguration.CLOUDBASE_USER; + + /** + * @Deprecated use {@link AccumuloRdfConfiguration#CLOUDBASE_PASSWORD} instead. + */ + @Deprecated + public static final String CLOUDBASE_PASSWORD = AccumuloRdfConfiguration.CLOUDBASE_PASSWORD; + /** + * @Deprecated use {@link RdfCloudTripleStoreConfiguration#CONF_QUERY_AUTH} instead. + */ + @Deprecated + public static final String CLOUDBASE_AUTHS = RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH; + + public static final String CLOUDBASE_WRITER_MAX_WRITE_THREADS = "sc.cloudbase.writer.maxwritethreads"; + public static final String CLOUDBASE_WRITER_MAX_LATENCY = "sc.cloudbase.writer.maxlatency"; + public static final String CLOUDBASE_WRITER_MAX_MEMORY = "sc.cloudbase.writer.maxmemory"; + + public static final String FREE_TEXT_QUERY_TERM_LIMIT = "sc.freetext.querytermlimit"; + + public static final String USE_FREETEXT = "sc.use_freetext"; + public static final String USE_TEMPORAL = "sc.use_temporal"; + public static final String USE_ENTITY = "sc.use_entity"; + public static final String USE_PCJ = "sc.use_pcj"; + public static final String USE_OPTIMAL_PCJ = "sc.use.optimal.pcj"; + public static final String USE_PCJ_UPDATER_INDEX = "sc.use.updater"; + + public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName"; + public static final String USE_PCJ_FLUO_UPDATER = "rya.indexing.pcj.updater.fluo"; + public static final String PCJ_STORAGE_TYPE = "rya.indexing.pcj.storageType"; + public static final String PCJ_UPDATER_TYPE = 
"rya.indexing.pcj.updaterType"; + + public static final String USE_MOCK_INSTANCE = AccumuloRdfConfiguration.USE_MOCK_INSTANCE; + + public static final String NUM_PARTITIONS = "sc.cloudbase.numPartitions"; + + private static final int WRITER_MAX_WRITE_THREADS = 1; + private static final long WRITER_MAX_LATNECY = Long.MAX_VALUE; + private static final long WRITER_MAX_MEMORY = 10000L; + + public static final String DISPLAY_QUERY_PLAN = "query.printqueryplan"; + + public static final String FREETEXT_PREDICATES_LIST = "sc.freetext.predicates"; + public static final String FREETEXT_DOC_NUM_PARTITIONS = "sc.freetext.numPartitions.text"; + public static final String FREETEXT_TERM_NUM_PARTITIONS = "sc.freetext.numPartitions.term"; + + public static final String TOKENIZER_CLASS = "sc.freetext.tokenizer.class"; + + public static final String GEO_PREDICATES_LIST = "sc.geo.predicates"; + + public static final String TEMPORAL_PREDICATES_LIST = "sc.temporal.predicates"; + + public static final String USE_MONGO = "sc.useMongo"; + + public static boolean isDisplayQueryPlan(final Configuration conf) { + return conf.getBoolean(DISPLAY_QUERY_PLAN, false); + } + + /** + * get a value from the configuration file and throw an exception if the + * value does not exist. 
+ * + * @param conf + * @param key + * @return + */ + private static String getStringCheckSet(final Configuration conf, final String key) { + final String value = conf.get(key); + requireNonNull(value, key + " not set"); + return value; + } + + /** + * @param conf + * @param tablename + * @return if the table was created + * @throws AccumuloException + * @throws AccumuloSecurityException + * @throws TableExistsException + */ + public static boolean createTableIfNotExists(final Configuration conf, final String tablename) + throws AccumuloException, AccumuloSecurityException, TableExistsException { + final TableOperations tops = getConnector(conf).tableOperations(); + if (!tops.exists(tablename)) { + logger.info("Creating table: " + tablename); + tops.create(tablename); + return true; + } + return false; + } + + /** + * Lookup the table name prefix in the conf and throw an error if it is + * null. Future, get table prefix from RyaDetails -- the Rya instance name + * -- also getting info from the RyaDetails should happen within + * RyaSailFactory and not ConfigUtils. + * + * @param conf + * Rya configuration map where it extracts the prefix (instance + * name) + * @return index table prefix corresponding to this Rya instance + */ + public static String getTablePrefix(final Configuration conf) { + final String tablePrefix; + tablePrefix = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX); + requireNonNull(tablePrefix, + "Configuration key: " + RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX + " not set. 
Cannot generate table name."); + return tablePrefix; + } + + public static int getFreeTextTermLimit(final Configuration conf) { + return conf.getInt(FREE_TEXT_QUERY_TERM_LIMIT, 100); + } + + public static Set<URI> getFreeTextPredicates(final Configuration conf) { + return getPredicates(conf, FREETEXT_PREDICATES_LIST); + } + + public static Set<URI> getGeoPredicates(final Configuration conf) { + return getPredicates(conf, GEO_PREDICATES_LIST); + } + + /** + * Used for indexing statements about date & time instances and intervals. + * + * @param conf + * @return Set of predicate URI's whose objects should be date time + * literals. + */ + public static Set<URI> getTemporalPredicates(final Configuration conf) { + return getPredicates(conf, TEMPORAL_PREDICATES_LIST); + } + + protected static Set<URI> getPredicates(final Configuration conf, final String confName) { + final String[] validPredicateStrings = conf.getStrings(confName, new String[] {}); + final Set<URI> predicates = new HashSet<>(); + for (final String prediateString : validPredicateStrings) { + predicates.add(new URIImpl(prediateString)); + } + return predicates; + } + + public static Tokenizer getFreeTextTokenizer(final Configuration conf) { + final Class<? 
extends Tokenizer> c = conf.getClass(TOKENIZER_CLASS, LuceneTokenizer.class, Tokenizer.class); + return ReflectionUtils.newInstance(c, conf); + } + + public static BatchWriter createDefaultBatchWriter(final String tablename, final Configuration conf) + throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf); + final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf); + final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf); + final Connector connector = ConfigUtils.getConnector(conf); + return connector.createBatchWriter(tablename, DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS); + } + + public static MultiTableBatchWriter createMultitableBatchWriter(final Configuration conf) + throws AccumuloException, AccumuloSecurityException { + final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf); + final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf); + final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf); + final Connector connector = ConfigUtils.getConnector(conf); + return connector.createMultiTableBatchWriter(DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS); + } + + public static Scanner createScanner(final String tablename, final Configuration conf) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + final Connector connector = ConfigUtils.getConnector(conf); + final Authorizations auths = ConfigUtils.getAuthorizations(conf); + return connector.createScanner(tablename, auths); + + } + + public static BatchScanner createBatchScanner(final String tablename, final Configuration conf) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + final Connector connector = ConfigUtils.getConnector(conf); + final Authorizations auths = ConfigUtils.getAuthorizations(conf); + Integer numThreads = null; + if (conf instanceof RdfCloudTripleStoreConfiguration) { + 
numThreads = ((RdfCloudTripleStoreConfiguration) conf).getNumThreads(); + } else { + numThreads = conf.getInt(RdfCloudTripleStoreConfiguration.CONF_NUM_THREADS, 2); + } + return connector.createBatchScanner(tablename, auths, numThreads); + } + + public static int getWriterMaxWriteThreads(final Configuration conf) { + return conf.getInt(CLOUDBASE_WRITER_MAX_WRITE_THREADS, WRITER_MAX_WRITE_THREADS); + } + + public static long getWriterMaxLatency(final Configuration conf) { + return conf.getLong(CLOUDBASE_WRITER_MAX_LATENCY, WRITER_MAX_LATNECY); + } + + public static long getWriterMaxMemory(final Configuration conf) { + return conf.getLong(CLOUDBASE_WRITER_MAX_MEMORY, WRITER_MAX_MEMORY); + } + + public static String getUsername(final JobContext job) { + return getUsername(job.getConfiguration()); + } + + /** + * Get the Accumulo username from the configuration object that is meant to + * be used when connecting a {@link Connector} to Accumulo. + * + * @param conf - The configuration object that will be interrogated. (not null) + * @return The username if one could be found; otherwise {@code null}. + */ + public static String getUsername(final Configuration conf) { + return new AccumuloRdfConfiguration(conf).getUsername(); + } + + public static Authorizations getAuthorizations(final JobContext job) { + return getAuthorizations(job.getConfiguration()); + } + + public static Authorizations getAuthorizations(final Configuration conf) { + final String authString = conf.get(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, ""); + if (authString.isEmpty()) { + return new Authorizations(); + } + return new Authorizations(authString.split(",")); + } + + public static Instance getInstance(final JobContext job) { + return getInstance(job.getConfiguration()); + } + + /** + * Create an {@link Instance} that may be used to create {@link Connector}s + * to Accumulo. 
If the configuration has the {@link #USE_MOCK_INSTANCE} flag + * set, then the instance will be be a {@link MockInstance} instead of a + * Zookeeper backed instance. + * + * @param conf - The configuration object that will be interrogated. (not null) + * @return The {@link Instance} that may be used to connect to Accumulo. + */ + public static Instance getInstance(final Configuration conf) { + // Pull out the Accumulo specific configuration values. + final AccumuloRdfConfiguration accConf = new AccumuloRdfConfiguration(conf); + String instanceName = accConf.getInstanceName(); + String zoookeepers = accConf.getZookeepers(); + + // Create an Instance a mock if the mock flag is set. + if (useMockInstance(conf)) { + return new MockInstance(instanceName); + } + + // Otherwise create an Instance to a Zookeeper managed instance of Accumulo. + return new ZooKeeperInstance(instanceName, zoookeepers); + } + + public static String getPassword(final JobContext job) { + return getPassword(job.getConfiguration()); + } + + /** + * Get the Accumulo password from the configuration object that is meant to + * be used when connecting a {@link Connector} to Accumulo. + * + * @param conf - The configuration object that will be interrogated. (not null) + * @return The password if one could be found; otherwise an empty string. + */ + public static String getPassword(final Configuration conf) { + return new AccumuloRdfConfiguration(conf).getPassword(); + } + + public static Connector getConnector(final JobContext job) throws AccumuloException, AccumuloSecurityException { + return getConnector(job.getConfiguration()); + } + + /** + * Create an Accumulo {@link Connector} using the configured connection information. + * If the connection information points to a mock instance of Accumulo, then the + * {@link #USE_MOCK_INSTANCE} flag must be set. + * + * @param conf - Configures how the connector will be built. 
(not null) + * @return A {@link Connector} that may be used to interact with the configured Accumulo instance. + * @throws AccumuloException The connector couldn't be created because of an Accumulo problem. + * @throws AccumuloSecurityException The connector couldn't be created because of an Accumulo security violation. + */ + public static Connector getConnector(final Configuration conf) throws AccumuloException, AccumuloSecurityException { + return ConnectorFactory.connect( new AccumuloRdfConfiguration(conf) ); + } + + /** + * Indicates that a Mock instance of Accumulo is being used to back the Rya instance. + * + * @param conf - The configuration object that will be interrogated. (not null) + * @return {@code true} if the Rya instance is backed by a mock Accumulo; otherwise {@code false}. + */ + public static boolean useMockInstance(final Configuration conf) { + return new AccumuloRdfConfiguration(conf).useMockInstance(); + } + + protected static int getNumPartitions(final Configuration conf) { + return conf.getInt(NUM_PARTITIONS, 25); + } + + public static int getFreeTextDocNumPartitions(final Configuration conf) { + return conf.getInt(FREETEXT_DOC_NUM_PARTITIONS, getNumPartitions(conf)); + } + + public static int getFreeTextTermNumPartitions(final Configuration conf) { + return conf.getInt(FREETEXT_TERM_NUM_PARTITIONS, getNumPartitions(conf)); + } + + public static boolean getUseFreeText(final Configuration conf) { + return conf.getBoolean(USE_FREETEXT, false); + } + + public static boolean getUseTemporal(final Configuration conf) { + return conf.getBoolean(USE_TEMPORAL, false); + } + + public static boolean getUseEntity(final Configuration conf) { + return conf.getBoolean(USE_ENTITY, false); + } + + public static boolean getUsePCJ(final Configuration conf) { + return conf.getBoolean(USE_PCJ, false); + } + + public static boolean getUseOptimalPCJ(final Configuration conf) { + return conf.getBoolean(USE_OPTIMAL_PCJ, false); + } + + public static boolean 
getUsePcjUpdaterIndex(final Configuration conf) { + return conf.getBoolean(USE_PCJ_UPDATER_INDEX, false); + } + + + /** + * @return The name of the Fluo Application this instance of RYA is using to + * incrementally update PCJs. + */ + // TODO delete this eventually and use Details table + public Optional<String> getFluoAppName(final Configuration conf) { + return Optional.fromNullable(conf.get(FLUO_APP_NAME)); + } + + + public static boolean getUseMongo(final Configuration conf) { + return conf.getBoolean(USE_MONGO, false); + } + + + public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) { + + final List<String> indexList = Lists.newArrayList(); + final List<String> optimizers = Lists.newArrayList(); + + boolean useFilterIndex = false; + + if (ConfigUtils.getUseMongo(conf)) { + if (getUseFreeText(conf)) { + indexList.add(MongoFreeTextIndexer.class.getName()); + useFilterIndex = true; + } + + if (getUseEntity(conf)) { + indexList.add(MongoEntityIndexer.class.getName()); + optimizers.add(EntityIndexOptimizer.class.getName()); + } + + if (getUseTemporal(conf)) { + indexList.add(MongoTemporalIndexer.class.getName()); + useFilterIndex = true; + } + } else { + if (getUsePCJ(conf) || getUseOptimalPCJ(conf)) { + conf.setPcjOptimizer(PCJOptimizer.class); + } + + if (getUsePcjUpdaterIndex(conf)) { + indexList.add(PrecomputedJoinIndexer.class.getName()); + } + + if (getUseFreeText(conf)) { + indexList.add(AccumuloFreeTextIndexer.class.getName()); + useFilterIndex = true; + } + + if (getUseTemporal(conf)) { + indexList.add(AccumuloTemporalIndexer.class.getName()); + useFilterIndex = true; + } + + if (getUseEntity(conf)) { + indexList.add(EntityCentricIndex.class.getName()); + optimizers.add(EntityOptimizer.class.getName()); + } + } + + if (useFilterIndex) { + optimizers.add(FilterFunctionOptimizer.class.getName()); + } + + if (conf.getUseStatementMetadata()) { + optimizers.add(StatementMetadataOptimizer.class.getName()); + } + +<<<<<<< HEAD + 
conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[] {})); + conf.setStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[] {})); + } +} +======= + final String[] existingIndexers = conf.getStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS); + if(existingIndexers != null ) { + for(final String idx : existingIndexers) { + indexList.add(idx); + } + } + + final String[] existingOptimizers = conf.getStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS); + if(existingOptimizers != null ) { + for(final String opt : existingOptimizers) { + optimizers.add(opt); + } + } + + conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[]{})); + conf.setStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[]{})); + } + + + +} +>>>>>>> RYA-236 Changes to other indexers http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java index 2edbe37..91f1146 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java @@ -26,7 +26,7 @@ import org.apache.rya.api.domain.RyaURI; import org.apache.rya.indexing.entity.model.Entity; import org.apache.rya.indexing.entity.storage.EntityStorage; import org.apache.rya.indexing.entity.storage.EntityStorage.EntityStorageException; -import org.apache.rya.indexing.mongodb.update.DocumentUpdater; +import org.apache.rya.indexing.mongodb.update.MongoDocumentUpdater; import 
org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; @@ -36,7 +36,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; * Performs update operations over an {@link EntityStorage}. */ @DefaultAnnotation(NonNull.class) -public class EntityUpdater implements DocumentUpdater<RyaURI, Entity>{ +public class EntityUpdater implements MongoDocumentUpdater<RyaURI, Entity>{ private final EntityStorage storage; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/MongoDbSmartUri.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/MongoDbSmartUri.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/MongoDbSmartUri.java index 9fdfad6..cbc8796 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/MongoDbSmartUri.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/MongoDbSmartUri.java @@ -36,6 +36,7 @@ import org.apache.rya.indexing.entity.storage.EntityStorage; import org.apache.rya.indexing.entity.storage.EntityStorage.EntityStorageException; import org.apache.rya.indexing.entity.storage.mongo.ConvertingCursor; import org.apache.rya.indexing.entity.storage.mongo.MongoEntityStorage; +import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException; import org.apache.rya.indexing.smarturi.SmartUriAdapter; import org.apache.rya.indexing.smarturi.SmartUriException; import org.apache.rya.indexing.smarturi.SmartUriStorage; @@ -74,7 +75,7 @@ public class MongoDbSmartUri implements SmartUriStorage { // Create it. 
try { entityStorage.create(entity); - } catch (final EntityStorageException e) { + } catch (final ObjectStorageException e) { throw new SmartUriException("Failed to create entity storage", e); } } @@ -86,7 +87,7 @@ public class MongoDbSmartUri implements SmartUriStorage { // Create it. try { entityStorage.create(entity); - } catch (final EntityStorageException e) { + } catch (final ObjectStorageException e) { throw new SmartUriException("Failed to create entity storage", e); } } @@ -98,7 +99,7 @@ public class MongoDbSmartUri implements SmartUriStorage { // Update it. try { entityStorage.update(oldEntity, updatedEntity); - } catch (final EntityStorageException e) { + } catch (final ObjectStorageException e) { throw new SmartUriException("Failed to update entity", e); } } @@ -111,7 +112,7 @@ public class MongoDbSmartUri implements SmartUriStorage { try { final Optional<Entity> resultEntity = entityStorage.get(subject); return resultEntity.get(); - } catch (final EntityStorageException e) { + } catch (final ObjectStorageException e) { throw new SmartUriException("Failed to query entity storage", e); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java deleted file mode 100644 index 0b9db13..0000000 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.mongodb.update; - -import static java.util.Objects.requireNonNull; - -import java.util.Optional; -import java.util.function.Function; - -import org.apache.rya.indexing.mongodb.IndexingException; - -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - -/** - * Performs an update operation on a Document in mongodb. - * @param <T> - The key to find the object. - * @param <V> - The type of object to get updated. - */ -@DefaultAnnotation(NonNull.class) -public interface DocumentUpdater<T, V> { - public default void update(final T key, final DocumentMutator<V> mutator) throws IndexingException { - requireNonNull(mutator); - - // Fetch the current state of the Entity. 
- boolean completed = false; - while(!completed) { - //this cast is safe since the mutator interface is defined below to use Optional<V> - final Optional<V> old = getOld(key); - final Optional<V> updated = mutator.apply(old); - - final boolean doWork = updated.isPresent(); - if(doWork) { - if(!old.isPresent()) { - create(updated.get()); - } else { - update(old.get(), updated.get()); - } - } - completed = true; - } - } - - Optional<V> getOld(T key) throws IndexingException; - - void create(final V newObj) throws IndexingException; - - void update(final V old, final V updated) throws IndexingException; - - /** - * Implementations of this interface are used to update the state of a - * {@link DocumentUpdater#V} in unison with a {@link DocumentUpdater}. - * </p> - * This table describes what the updater will do depending on if the object - * exists and if an updated object is returned. - * </p> - * <table border="1px"> - * <tr><th>Object Provided</th><th>Update Returned</th><th>Effect</th></tr> - * <tr> - * <td>true</td> - * <td>true</td> - * <td>The old Object will be updated using the returned state.</td> - * </tr> - * <tr> - * <td>true</td> - * <td>false</td> - * <td>No work is performed.</td> - * </tr> - * <tr> - * <td>false</td> - * <td>true</td> - * <td>A new Object will be created using the returned state.</td> - * </tr> - * <tr> - * <td>false</td> - * <td>false</td> - * <td>No work is performed.</td> - * </tr> - * </table> - */ - public interface DocumentMutator<V> extends Function<Optional<V>, Optional<V>> { } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/MongoDocumentUpdater.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/MongoDocumentUpdater.java 
b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/MongoDocumentUpdater.java new file mode 100644 index 0000000..a7a3eb9 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/MongoDocumentUpdater.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.mongodb.update; + +import static java.util.Objects.requireNonNull; + +import java.util.Optional; +import java.util.function.Function; + +import org.apache.rya.indexing.mongodb.IndexingException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Performs an update operation on a Document in mongodb. + * @param <T> - The key to find the object. + * @param <V> - The type of object to get updated. + */ +@DefaultAnnotation(NonNull.class) +public interface MongoDocumentUpdater<T, V> { + public default void update(final T key, final DocumentMutator<V> mutator) throws IndexingException { + requireNonNull(mutator); + + // Fetch the current state of the Entity. 
+ boolean completed = false; + while(!completed) { + //this cast is safe since the mutator interface is defined below to use Optional<V> + final Optional<V> old = getOld(key); + final Optional<V> updated = mutator.apply(old); + + final boolean doWork = updated.isPresent(); + if(doWork) { + if(!old.isPresent()) { + create(updated.get()); + } else { + update(old.get(), updated.get()); + } + } + completed = true; + } + } + + Optional<V> getOld(T key) throws IndexingException; + + void create(final V newObj) throws IndexingException; + + void update(final V old, final V updated) throws IndexingException; + + /** + * Implementations of this interface are used to update the state of a + * {@link MongoDocumentUpdater#V} in unison with a {@link MongoDocumentUpdater}. + * </p> + * This table describes what the updater will do depending on if the object + * exists and if an updated object is returned. + * </p> + * <table border="1px"> + * <tr><th>Object Provided</th><th>Update Returned</th><th>Effect</th></tr> + * <tr> + * <td>true</td> + * <td>true</td> + * <td>The old Object will be updated using the returned state.</td> + * </tr> + * <tr> + * <td>true</td> + * <td>false</td> + * <td>No work is performed.</td> + * </tr> + * <tr> + * <td>false</td> + * <td>true</td> + * <td>A new Object will be created using the returned state.</td> + * </tr> + * <tr> + * <td>false</td> + * <td>false</td> + * <td>No work is performed.</td> + * </tr> + * </table> + */ + public interface DocumentMutator<V> extends Function<Optional<V>, Optional<V>> { } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java 
b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java index 10feb0d..bd04368 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java @@ -25,6 +25,7 @@ import org.apache.rya.indexing.mongodb.IndexingException; /** * Stores and provides access to objects of type T. + * The RyaURI subject is the primary storage key used. * @param <T> - The type of object to store/access. */ public interface RyaObjectStorage<T> { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoEnabledFilterFunctionOptimizer.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoEnabledFilterFunctionOptimizer.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoEnabledFilterFunctionOptimizer.java index d773831..bf6b632 100644 --- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoEnabledFilterFunctionOptimizer.java +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoEnabledFilterFunctionOptimizer.java @@ -34,6 +34,18 @@ import org.apache.commons.lang.Validate; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.indexing.IndexingFunctionRegistry.FUNCTION_TYPE; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; +import org.apache.rya.indexing.accumulo.freetext.FreeTextTupleSet; +import org.apache.rya.indexing.accumulo.geo.GeoMesaGeoIndexer; +import org.apache.rya.indexing.accumulo.geo.GeoParseUtils; +import org.apache.rya.indexing.accumulo.geo.GeoTupleSet; +import 
org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; +import org.apache.rya.indexing.mongodb.freetext.MongoFreeTextIndexer; +import org.apache.rya.indexing.mongodb.geo.MongoGeoIndexer; +import org.apache.rya.indexing.mongodb.temporal.MongoTemporalIndexer; import org.geotools.feature.SchemaException; import org.openrdf.model.Resource; import org.openrdf.model.URI; @@ -52,25 +64,12 @@ import org.openrdf.query.algebra.QueryModelNode; import org.openrdf.query.algebra.StatementPattern; import org.openrdf.query.algebra.TupleExpr; import org.openrdf.query.algebra.ValueConstant; -import org.openrdf.query.algebra.ValueExpr; import org.openrdf.query.algebra.Var; import org.openrdf.query.algebra.evaluation.QueryOptimizer; import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; import com.google.common.collect.Lists; -import org.apache.rya.accumulo.AccumuloRdfConfiguration; -import org.apache.rya.indexing.IndexingFunctionRegistry.FUNCTION_TYPE; -import org.apache.rya.indexing.accumulo.ConfigUtils; -import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; -import org.apache.rya.indexing.accumulo.freetext.FreeTextTupleSet; -import org.apache.rya.indexing.accumulo.geo.GeoMesaGeoIndexer; -import org.apache.rya.indexing.accumulo.geo.GeoTupleSet; -import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; -import org.apache.rya.indexing.mongodb.freetext.MongoFreeTextIndexer; -import org.apache.rya.indexing.mongodb.geo.MongoGeoIndexer; -import org.apache.rya.indexing.mongodb.temporal.MongoTemporalIndexer; - public class GeoEnabledFilterFunctionOptimizer implements QueryOptimizer, Configurable { private static final Logger LOG = Logger.getLogger(GeoEnabledFilterFunctionOptimizer.class); private final ValueFactory valueFactory = new ValueFactoryImpl(); @@ -232,7 +231,7 @@ public class GeoEnabledFilterFunctionOptimizer implements QueryOptimizer, Config final URI fnUri = valueFactory.createURI(call.getURI()); final Var resultVar = 
IndexingFunctionRegistry.getResultVarFromFunctionCall(fnUri, call.getArgs()); if (resultVar != null && resultVar.getName().equals(matchVar)) { - addFilter(valueFactory.createURI(call.getURI()), extractArguments(matchVar, call)); + addFilter(valueFactory.createURI(call.getURI()), GeoParseUtils.extractArguments(matchVar, call)); if (call.getParentNode() instanceof Filter || call.getParentNode() instanceof And || call.getParentNode() instanceof LeftJoin) { call.replaceWith(new ValueConstant(valueFactory.createLiteral(true))); } else { @@ -241,26 +240,6 @@ public class GeoEnabledFilterFunctionOptimizer implements QueryOptimizer, Config } } - private Value[] extractArguments(final String matchName, final FunctionCall call) { - final Value args[] = new Value[call.getArgs().size() - 1]; - int argI = 0; - for (int i = 0; i != call.getArgs().size(); ++i) { - final ValueExpr arg = call.getArgs().get(i); - if (argI == i && arg instanceof Var && matchName.equals(((Var)arg).getName())) { - continue; - } - if (arg instanceof ValueConstant) { - args[argI] = ((ValueConstant)arg).getValue(); - } else if (arg instanceof Var && ((Var)arg).hasValue()) { - args[argI] = ((Var)arg).getValue(); - } else { - throw new IllegalArgumentException("Query error: Found " + arg + ", expected a Literal, BNode or URI"); - } - ++argI; - } - return args; - } - @Override public void meet(final Filter filter) { //First visit children, then condition (reverse of default): http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java index ffba225..e8fbc3d 100644 --- 
a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoParseUtils.java @@ -14,9 +14,9 @@ import javax.xml.parsers.ParserConfigurationException; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,21 +27,25 @@ import javax.xml.parsers.ParserConfigurationException; import org.apache.log4j.Logger; +import org.apache.rya.indexing.GeoConstants; import org.geotools.gml3.GMLConfiguration; import org.geotools.xml.Parser; import org.openrdf.model.Literal; import org.openrdf.model.Statement; +import org.openrdf.model.Value; +import org.openrdf.query.algebra.FunctionCall; +import org.openrdf.query.algebra.ValueConstant; +import org.openrdf.query.algebra.ValueExpr; +import org.openrdf.query.algebra.Var; import org.xml.sax.SAXException; import com.vividsolutions.jts.geom.Geometry; import com.vividsolutions.jts.io.ParseException; import com.vividsolutions.jts.io.WKTReader; -import org.apache.rya.indexing.GeoConstants; - public class GeoParseUtils { static final Logger logger = Logger.getLogger(GeoParseUtils.class); - /** + /** * @deprecated Not needed since geo literals may be WKT or GML. * * This method warns on a condition that must already be tested. 
Replaced by @@ -50,41 +54,41 @@ public class GeoParseUtils { * and getLiteral(statement).getDatatype() */ @Deprecated - public static String getWellKnownText(Statement statement) throws ParseException { - Literal lit = getLiteral(statement); + public static String getWellKnownText(final Statement statement) throws ParseException { + final Literal lit = getLiteral(statement); if (!GeoConstants.XMLSCHEMA_OGC_WKT.equals(lit.getDatatype())) { logger.warn("Literal is not of type " + GeoConstants.XMLSCHEMA_OGC_WKT + ": " + statement.toString()); } return lit.getLabel().toString(); } - public static Literal getLiteral(Statement statement) throws ParseException { - org.openrdf.model.Value v = statement.getObject(); + public static Literal getLiteral(final Statement statement) throws ParseException { + final org.openrdf.model.Value v = statement.getObject(); if (!(v instanceof Literal)) { throw new ParseException("Statement does not contain Literal: " + statement.toString()); } - Literal lit = (Literal) v; + final Literal lit = (Literal) v; return lit; } /** * Parse GML/wkt literal to Geometry - * + * * @param statement * @return * @throws ParseException - * @throws ParserConfigurationException - * @throws SAXException - * @throws IOException + * @throws ParserConfigurationException + * @throws SAXException + * @throws IOException */ - public static Geometry getGeometry(Statement statement) throws ParseException { + public static Geometry getGeometry(final Statement statement) throws ParseException { // handle GML or WKT - Literal lit = getLiteral(statement); + final Literal lit = getLiteral(statement); if (GeoConstants.XMLSCHEMA_OGC_WKT.equals(lit.getDatatype())) { final String wkt = lit.getLabel().toString(); return (new WKTReader()).read(wkt); } else if (GeoConstants.XMLSCHEMA_OGC_GML.equals(lit.getDatatype())) { - String gml = lit.getLabel().toString(); + final String gml = lit.getLabel().toString(); try { return getGeometryGml(gml); } catch (IOException | SAXException 
| ParserConfigurationException e) { @@ -102,18 +106,43 @@ public class GeoParseUtils { * @throws SAXException * @throws ParserConfigurationException */ - public static Geometry getGeometryGml(String gmlString) throws IOException, SAXException, ParserConfigurationException { - Reader reader = new StringReader(gmlString); - GMLConfiguration gmlConfiguration = new GMLConfiguration(); - Parser gmlParser = new Parser(gmlConfiguration); + public static Geometry getGeometryGml(final String gmlString) throws IOException, SAXException, ParserConfigurationException { + final Reader reader = new StringReader(gmlString); + final GMLConfiguration gmlConfiguration = new GMLConfiguration(); + final Parser gmlParser = new Parser(gmlConfiguration); // gmlParser.setStrict(false); // attempt at allowing deprecated elements, but no. // gmlParser.setValidating(false); final Geometry geometry = (Geometry) gmlParser.parse(reader); // This sometimes gets populated with the SRS/CRS: geometry.getUserData() - // Always returns 0 : geometry.getSRID() + // Always returns 0 : geometry.getSRID() //TODO geometry.setUserData(some default CRS); OR geometry.setSRID(some default CRS) - + return geometry; } + /** + * Extracts the arguments used in a {@link FunctionCall}. + * @param matchName - The variable name to match to arguments used in the {@link FunctionCall}. + * @param call - The {@link FunctionCall} to match against. + * @return - The {@link Value}s matched. 
+ */ + public static Value[] extractArguments(final String matchName, final FunctionCall call) { + final Value args[] = new Value[call.getArgs().size() - 1]; + int argI = 0; + for (int i = 0; i != call.getArgs().size(); ++i) { + final ValueExpr arg = call.getArgs().get(i); + if (argI == i && arg instanceof Var && matchName.equals(((Var)arg).getName())) { + continue; + } + if (arg instanceof ValueConstant) { + args[argI] = ((ValueConstant)arg).getValue(); + } else if (arg instanceof Var && ((Var)arg).hasValue()) { + args[argI] = ((Var)arg).getValue(); + } else { + throw new IllegalArgumentException("Query error: Found " + arg + ", expected a Literal, BNode or URI"); + } + ++argI; + } + return args; + } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java index 38790c4..bf12f26 100644 --- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexSetProvider.java @@ -28,22 +28,22 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import org.apache.log4j.Logger; import org.apache.rya.indexing.IndexingExpr; import org.apache.rya.indexing.IndexingFunctionRegistry; import org.apache.rya.indexing.IndexingFunctionRegistry.FUNCTION_TYPE; +import org.apache.rya.indexing.accumulo.geo.GeoParseUtils; import org.apache.rya.indexing.accumulo.geo.GeoTupleSet; import org.apache.rya.indexing.external.matching.ExternalSetProvider; import org.apache.rya.indexing.external.matching.QuerySegment; import 
org.apache.rya.indexing.geotemporal.model.EventQueryNode; +import org.apache.rya.indexing.geotemporal.model.EventQueryNode.EventQueryNodeBuilder; import org.apache.rya.indexing.geotemporal.storage.EventStorage; import org.openrdf.model.URI; -import org.openrdf.model.Value; import org.openrdf.model.impl.URIImpl; import org.openrdf.query.algebra.FunctionCall; import org.openrdf.query.algebra.QueryModelNode; import org.openrdf.query.algebra.StatementPattern; -import org.openrdf.query.algebra.ValueConstant; -import org.openrdf.query.algebra.ValueExpr; import org.openrdf.query.algebra.Var; import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; @@ -54,6 +54,8 @@ import com.google.common.collect.Multimap; * Provides {@link GeoTupleSet}s. */ public class GeoTemporalIndexSetProvider implements ExternalSetProvider<EventQueryNode> { + private static final Logger LOG = Logger.getLogger(GeoTemporalIndexSetProvider.class); + //organzied by object var. Each object is a filter, or set of filters private Multimap<Var, IndexingExpr> filterMap; @@ -138,13 +140,20 @@ public class GeoTemporalIndexSetProvider implements ExternalSetProvider<EventQue } if(geoFilters.isPresent() && temporalFilters.isPresent() && geoPattern.isPresent() && temporalPattern.isPresent()) { - return new EventQueryNode(eventStorage, geoPattern.get(), temporalPattern.get(), geoFilters.get(), temporalFilters.get(), usedFilters); + return new EventQueryNodeBuilder() + .setStorage(eventStorage) + .setGeoPattern(geoPattern.get()) + .setTemporalPattern(temporalPattern.get()) + .setGeoFilters(geoFilters.get()) + .setTemporalFilters(temporalFilters.get()) + .setUsedFilters(usedFilters) + .build(); } else { return null; } } - private FUNCTION_TYPE ensureSameType(final Collection<IndexingExpr> filters) { + private static FUNCTION_TYPE ensureSameType(final Collection<IndexingExpr> filters) { FUNCTION_TYPE type = null; for(final IndexingExpr filter : filters) { if(type == null) { @@ -174,7 +183,7 @@ public class 
GeoTemporalIndexSetProvider implements ExternalSetProvider<EventQue try { filter.visit(new FilterVisitor()); } catch (final Exception e) { - e.printStackTrace(); + LOG.error("Failed to match the filter object.", e); } } @@ -204,27 +213,7 @@ public class GeoTemporalIndexSetProvider implements ExternalSetProvider<EventQue private void addFilter(final FunctionCall call) { filterURI = new URIImpl(call.getURI()); final Var objVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(filterURI, call.getArgs()); - filterMap.put(objVar, new IndexingExpr(filterURI, objectPatterns.get(objVar), extractArguments(objVar.getName(), call))); - } - - private Value[] extractArguments(final String matchName, final FunctionCall call) { - final Value args[] = new Value[call.getArgs().size() - 1]; - int argI = 0; - for (int i = 0; i != call.getArgs().size(); ++i) { - final ValueExpr arg = call.getArgs().get(i); - if (argI == i && arg instanceof Var && matchName.equals(((Var)arg).getName())) { - continue; - } - if (arg instanceof ValueConstant) { - args[argI] = ((ValueConstant)arg).getValue(); - } else if (arg instanceof Var && ((Var)arg).hasValue()) { - args[argI] = ((Var)arg).getValue(); - } else { - throw new IllegalArgumentException("Query error: Found " + arg + ", expected a Literal, BNode or URI"); - } - ++argI; - } - return args; + filterMap.put(objVar, new IndexingExpr(filterURI, objectPatterns.get(objVar), GeoParseUtils.extractArguments(objVar.getName(), call))); } /** @@ -234,13 +223,12 @@ public class GeoTemporalIndexSetProvider implements ExternalSetProvider<EventQue private class FilterVisitor extends QueryModelVisitorBase<Exception> { @Override public void meet(final FunctionCall call) throws Exception { - filterURI = new URIImpl(call.getURI()); final FUNCTION_TYPE type = IndexingFunctionRegistry.getFunctionType(filterURI); if(type == FUNCTION_TYPE.GEO || type == FUNCTION_TYPE.TEMPORAL) { final Var objVar = IndexingFunctionRegistry.getResultVarFromFunctionCall(filterURI, 
call.getArgs()); if(objectPatterns.containsKey(objVar)) { - filterMap.put(objVar, new IndexingExpr(filterURI, objectPatterns.get(objVar), extractArguments(objVar.getName(), call))); + filterMap.put(objVar, new IndexingExpr(filterURI, objectPatterns.get(objVar), GeoParseUtils.extractArguments(objVar.getName(), call))); matchedFilters.put(objVar, call); } else { unmatchedFilters.put(objVar, call); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java index 01b254b..cbc978b 100644 --- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/GeoTemporalIndexer.java @@ -39,14 +39,50 @@ public interface GeoTemporalIndexer extends RyaSecondaryIndexer { */ public abstract EventStorage getEventStorage(final Configuration conf); - public enum GeoPolicy { + /** + * Used to indicate which geo filter functions to use in a query. + */ + public static enum GeoPolicy { + /** + * The provided geo object equals the geo object where the event took place. + */ EQUALS(GeoConstants.GEO_SF_EQUALS), + + /** + * The provided geo object does not share any space with the event. + */ DISJOINT(GeoConstants.GEO_SF_DISJOINT), + + /** + * The provided geo object shares some amount of space with the event. + */ INTERSECTS(GeoConstants.GEO_SF_INTERSECTS), + + /** + * The provided geo object shares a point with the event, but only on the edge. + */ TOUCHES(GeoConstants.GEO_SF_TOUCHES), + + /** + * The provided geo object shares some, but not all space with the event. 
+ */ CROSSES(GeoConstants.GEO_SF_CROSSES), + + /** + * The provided geo object exists completely within the event. + */ WITHIN(GeoConstants.GEO_SF_WITHIN), + + /** + * The event took place completely within the provided geo object. + */ CONTAINS(GeoConstants.GEO_SF_CONTAINS), + + /** + * The provided geo object has some but not all points in common with the event, + * are of the same dimension, and the intersection of the interiors has the + * same dimension as the geometries themselves. + */ OVERLAPS(GeoConstants.GEO_SF_OVERLAPS); private final URI uri; @@ -69,10 +105,9 @@ public interface GeoTemporalIndexer extends RyaSecondaryIndexer { } } - String TEMPORAL_NS = "tag:rya-rdf.org,2015:temporal#"; + static final String TEMPORAL_NS = "tag:rya-rdf.org,2015:temporal#"; /** - * All of the filter functions that can be used in a temporal based query. - * <p> + * Used to indicate which temporal filter functions to use in a query. */ public enum TemporalPolicy { /** @@ -106,12 +141,28 @@ public interface GeoTemporalIndexer extends RyaSecondaryIndexer { INSTANT_AFTER_INTERVAL(false, new URIImpl(TEMPORAL_NS+"afterInterval")), /** - * The provided instant in time equals the instant the event took place. + * The provided instant in time equals the start of the interval in which the event took place. */ INSTANT_START_INTERVAL(false, new URIImpl(TEMPORAL_NS+"hasBeginningInterval")), + + /** + * The provided instant in time equals the end of the interval in which the event took place. + */ INSTANT_END_INTERVAL(false, new URIImpl(TEMPORAL_NS+"hasEndInterval")), + + /** + * The provided interval equals the interval in which the event took place. + */ INTERVAL_EQUALS(false, new URIImpl(TEMPORAL_NS+"intervalEquals")), + + /** + * The provided interval is before the interval in which the event took place. + */ INTERVAL_BEFORE(false, new URIImpl(TEMPORAL_NS+"intervalBefore")), + + /** + * The provided interval is after the interval in which the event took place. 
+ */ INTERVAL_AFTER(false, new URIImpl(TEMPORAL_NS+"intervalAfter")); private final boolean isInstant; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java index 6953714..104fca8 100644 --- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/model/EventQueryNode.java @@ -29,12 +29,17 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.rya.api.domain.RyaURI; import org.apache.rya.indexing.IndexingExpr; +import org.apache.rya.indexing.TemporalInstant; import org.apache.rya.indexing.TemporalInstantRfc3339; +import org.apache.rya.indexing.entity.query.EntityQueryNode; import org.apache.rya.indexing.geotemporal.storage.EventStorage; import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException; import org.apache.rya.rdftriplestore.evaluation.ExternalBatchingIterator; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.openrdf.model.Value; import org.openrdf.model.impl.ValueFactoryImpl; import org.openrdf.query.BindingSet; @@ -60,8 +65,10 @@ public class EventQueryNode extends ExternalSet implements ExternalBatchingItera //Information about the subject of the patterns. 
private final boolean subjectIsConstant; - private final Optional<String> subjectConstant; private final Optional<String> subjectVar; + //not final because if the subject is a variable and the evaluate() is + // provided a binding set that contains the subject, this optional is used. + private Optional<String> subjectConstant; //since and EventQueryNode exists in a single segment, all binding names are garunteed to be assured. private final Set<String> bindingNames; @@ -80,7 +87,7 @@ public class EventQueryNode extends ExternalSet implements ExternalBatchingItera * @param entities - The {@link EventStorage} that will be searched to match * {@link BindingSet}s when evaluating a query. (not null) */ - public EventQueryNode(final EventStorage eventStore, final StatementPattern geoPattern, final StatementPattern temporalPattern, final Collection<IndexingExpr> geoFilters, final Collection<IndexingExpr> temporalFilters, final Collection<FunctionCall> usedFilters) throws IllegalStateException { + private EventQueryNode(final EventStorage eventStore, final StatementPattern geoPattern, final StatementPattern temporalPattern, final Collection<IndexingExpr> geoFilters, final Collection<IndexingExpr> temporalFilters, final Collection<FunctionCall> usedFilters) throws IllegalStateException { this.geoPattern = requireNonNull(geoPattern); this.temporalPattern = requireNonNull(temporalPattern); this.geoFilters = requireNonNull(geoFilters); @@ -159,8 +166,15 @@ public class EventQueryNode extends ExternalSet implements ExternalBatchingItera try { final Collection<Event> searchEvents; final String subj; + //if the provided binding set has the subject already, set it to the constant subject. 
+ if(!subjectConstant.isPresent() && bindings.hasBinding(subjectVar.get())) { + subjectConstant = Optional.of(bindings.getValue(subjectVar.get()).stringValue()); + } else if(bindings.size() != 0) { + list.add(bindings); + } + // If the subject needs to be filled in, check if the subject variable is in the binding set. - if(subjectIsConstant) { + if(subjectConstant.isPresent()) { // if it is, fetch that value and then fetch the entity for the subject. subj = subjectConstant.get(); searchEvents = eventStore.search(Optional.of(new RyaURI(subj)), Optional.of(geoFilters), Optional.of(temporalFilters)); @@ -179,7 +193,11 @@ public class EventQueryNode extends ExternalSet implements ExternalBatchingItera final Value temporalValue; if(event.isInstant() && event.getInstant().isPresent()) { - temporalValue = ValueFactoryImpl.getInstance().createLiteral(event.getInstant().get().getAsDateTime().toString(TemporalInstantRfc3339.FORMATTER)); + final Optional<TemporalInstant> opt = event.getInstant(); + DateTime dt = opt.get().getAsDateTime(); + dt = dt.toDateTime(DateTimeZone.UTC); + final String str = dt.toString(TemporalInstantRfc3339.FORMATTER); + temporalValue = ValueFactoryImpl.getInstance().createLiteral(str); } else if(event.getInterval().isPresent()) { temporalValue = ValueFactoryImpl.getInstance().createLiteral(event.getInterval().get().getAsPair()); } else { @@ -195,9 +213,6 @@ public class EventQueryNode extends ExternalSet implements ExternalBatchingItera } catch (final ObjectStorageException e) { throw new QueryEvaluationException("Failed to evaluate the binding set", e); } - if(bindings.size() != 0) { - list.add(bindings); - } return new CollectionIteration<>(list); } @@ -238,15 +253,16 @@ public class EventQueryNode extends ExternalSet implements ExternalBatchingItera public boolean equals(final Object other) { if(other instanceof EventQueryNode) { final EventQueryNode otherNode = (EventQueryNode)other; - - return Objects.equals(subjectIsConstant, 
otherNode.subjectIsConstant) && - Objects.equals(subjectVar, otherNode.subjectVar) && - Objects.equals(geoFilters, otherNode.geoFilters) && - Objects.equals(geoPattern, otherNode.geoPattern) && - Objects.equals(temporalFilters, otherNode.temporalFilters) && - Objects.equals(temporalPattern, otherNode.temporalPattern) && - Objects.equals(bindingNames, otherNode.bindingNames) && - Objects.equals(subjectConstant, otherNode.subjectConstant); + return new EqualsBuilder() + .append(subjectIsConstant, otherNode.subjectIsConstant) + .append(subjectVar, otherNode.subjectVar) + .append(geoFilters, otherNode.geoFilters) + .append(geoPattern, otherNode.geoPattern) + .append(temporalFilters, otherNode.temporalFilters) + .append(temporalPattern, otherNode.temporalPattern) + .append(bindingNames, otherNode.bindingNames) + .append(subjectConstant, otherNode.subjectConstant) + .isEquals(); } return false; } @@ -280,4 +296,77 @@ public class EventQueryNode extends ExternalSet implements ExternalBatchingItera throws QueryEvaluationException { return null; } + + /** + * Builder for {@link EventQueryNode}s. + */ + public static class EventQueryNodeBuilder { + private EventStorage store; + private StatementPattern geoPattern; + private StatementPattern temporalPattern; + private Collection<IndexingExpr> geoFilters; + private Collection<IndexingExpr> temporalFilters; + private Collection<FunctionCall> usedFilters; + + /** + * @param store - The {@link EventStorage} to use in the {@link EventQueryNode} + * @return - The Builder. + */ + public EventQueryNodeBuilder setStorage(final EventStorage store) { + this.store = store; + return this; + } + + /** + * @param geoPattern - The geo {@link StatementPattern} to use in the {@link EventQueryNode} + * @return - The Builder. 
+ */ + public EventQueryNodeBuilder setGeoPattern(final StatementPattern geoPattern) { + this.geoPattern = geoPattern; + return this; + } + + /** + * @param temporalPattern - The temporal {@link StatementPattern} to use in the {@link EventQueryNode} + * @return - The Builder. + */ + public EventQueryNodeBuilder setTemporalPattern(final StatementPattern temporalPattern) { + this.temporalPattern = temporalPattern; + return this; + } + + /** + * @param geoFilters - The geo filter(s) {@link IndexingExpr} to use in the {@link EventQueryNode} + * @return - The Builder. + */ + public EventQueryNodeBuilder setGeoFilters(final Collection<IndexingExpr> geoFilters) { + this.geoFilters = geoFilters; + return this; + } + + /** + * @param temporalFilters - The temporal filter(s) {@link IndexingExpr} to use in the {@link EventQueryNode} + * @return - The Builder. + */ + public EventQueryNodeBuilder setTemporalFilters(final Collection<IndexingExpr> temporalFilters) { + this.temporalFilters = temporalFilters; + return this; + } + + /** + * @param usedFilters - The filter(s) used by the {@link EventQueryNode} + * @return - The Builder. + */ + public EventQueryNodeBuilder setUsedFilters(final Collection<FunctionCall> usedFilters) { + this.usedFilters = usedFilters; + return this; + } + + /** + * @return The {@link EventQueryNode} built by the builder. 
+ */ + public EventQueryNode build() { + return new EventQueryNode(store, geoPattern, temporalPattern, geoFilters, temporalFilters, usedFilters); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java index c9f4658..1c62407 100644 --- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java @@ -26,7 +26,7 @@ import org.apache.rya.api.domain.RyaURI; import org.apache.rya.indexing.geotemporal.model.Event; import org.apache.rya.indexing.geotemporal.storage.EventStorage; import org.apache.rya.indexing.geotemporal.storage.EventStorage.EventStorageException; -import org.apache.rya.indexing.mongodb.update.DocumentUpdater; +import org.apache.rya.indexing.mongodb.update.MongoDocumentUpdater; import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; @@ -36,7 +36,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; * Performs update operations over an {@link EventStorage}. 
*/ @DefaultAnnotation(NonNull.class) -public class EventUpdater implements DocumentUpdater<RyaURI, Event>{ +public class EventUpdater implements MongoDocumentUpdater<RyaURI, Event>{ private final EventStorage events; /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java index 352dcb6..ab44ffe 100644 --- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java @@ -63,7 +63,8 @@ import com.vividsolutions.jts.io.WKTReader; import jline.internal.Log; /** - * TODO: doc + * Storage adapter for serializing Geo Temporal statements into mongo objects. + * This includes adapting the {@link IndexingExpr}s for the GeoTemporal indexer. 
*/ public class GeoTemporalMongoDBStorageStrategy extends IndexingMongoDBStorageStrategy { private static final Logger LOG = Logger.getLogger(GeoTemporalMongoDBStorageStrategy.class); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java index 8ddf075..9c13c8b 100644 --- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java @@ -31,10 +31,10 @@ import org.apache.rya.api.domain.RyaURI; import org.apache.rya.indexing.IndexingExpr; import org.apache.rya.indexing.entity.model.TypedEntity; import org.apache.rya.indexing.entity.storage.mongo.DocumentConverter.DocumentConverterException; +import org.apache.rya.indexing.entity.storage.mongo.MongoEntityStorage; import org.apache.rya.indexing.geotemporal.GeoTemporalIndexException; import org.apache.rya.indexing.geotemporal.model.Event; import org.apache.rya.indexing.geotemporal.storage.EventStorage; -import org.apache.rya.indexing.entity.storage.mongo.MongoEntityStorage; import org.bson.BsonDocument; import org.bson.BsonString; import org.bson.Document; @@ -138,9 +138,8 @@ public class MongoEventStorage implements EventStorage { .iterator(); final List<Event> events = new ArrayList<>(); - final EventDocumentConverter adapter = new EventDocumentConverter(); while(results.hasNext()) { - events.add(adapter.fromDocument(results.next())); + events.add(EVENT_CONVERTER.fromDocument(results.next())); } return events; } catch(final MongoException | DocumentConverterException | 
GeoTemporalIndexException e) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/646d21b4/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java index 1baab18..34df399 100644 --- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java @@ -206,12 +206,16 @@ public class MongoGeoTemporalIndexer extends AbstractMongoIndexer<GeoTemporalMon @Override public EventStorage getEventStorage(final Configuration conf) { + requireNonNull(conf); + if(events.get() != null) { return events.get(); } - final MongoDBRdfConfiguration mongoConf = (MongoDBRdfConfiguration) conf; + + final MongoDBRdfConfiguration mongoConf = new MongoDBRdfConfiguration(conf); mongoClient = mongoConf.getMongoClient(); + configuration.set(mongoConf); if (mongoClient == null) { mongoClient = MongoConnectorFactory.getMongoClient(conf); }
