RYA-236 Changes to other indexers. The GeoTemporal indexer is very closely related to the Entity Indexer, so some common areas were abstracted out.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/440a4bfd Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/440a4bfd Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/440a4bfd Branch: refs/heads/master Commit: 440a4bfd37407c8e1937776e0ecd0460d646bd8e Parents: 63095d4 Author: isper3at <[email protected]> Authored: Thu Feb 23 14:51:25 2017 -0500 Committer: Aaron Mihalik <[email protected]> Committed: Wed Jun 14 13:28:09 2017 -0400 ---------------------------------------------------------------------- .../apache/rya/indexing/TemporalTupleSet.java | 10 +- .../rya/indexing/accumulo/ConfigUtils.java | 11 +- .../temporal/AccumuloTemporalIndexer.java | 1 - .../indexing/entity/EntityIndexException.java | 3 +- .../indexing/entity/storage/EntityStorage.java | 44 +----- .../storage/mongo/MongoEntityStorage.java | 6 +- .../entity/update/BaseEntityIndexer.java | 9 +- .../indexing/entity/update/EntityUpdater.java | 94 ++++--------- .../indexing/mongodb/AbstractMongoIndexer.java | 30 +++-- .../rya/indexing/mongodb/IndexingException.java | 53 ++++++++ .../TemporalMongoDBStorageStrategy.java | 41 +++--- .../mongodb/update/DocumentUpdater.java | 98 ++++++++++++++ .../mongodb/update/RyaObjectStorage.java | 135 +++++++++++++++++++ .../storage/mongo/MongoEntityStorageIT.java | 16 +-- .../rya/indexing/OptionalConfigUtils.java | 25 +++- .../rya/indexing/accumulo/geo/GeoTupleSet.java | 23 ++-- .../mongodb/geo/GeoMongoDBStorageStrategy.java | 4 +- 17 files changed, 422 insertions(+), 181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java 
b/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java index 1c5b72c..808afdf 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java @@ -4,6 +4,7 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet; import org.joda.time.DateTime; import org.openrdf.model.Statement; import org.openrdf.model.URI; @@ -35,7 +36,6 @@ import com.google.common.collect.Maps; */ import info.aduna.iteration.CloseableIteration; -import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet; //Indexing Node for temporal expressions to be inserted into execution plan //to delegate temporal portion of query to temporal index @@ -111,7 +111,7 @@ public class TemporalTupleSet extends ExternalTupleSet { public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final BindingSet bindings) throws QueryEvaluationException { final URI funcURI = filterInfo.getFunction(); - final SearchFunction searchFunction = (new TemporalSearchFunctionFactory(conf)).getSearchFunction(funcURI); + final SearchFunction searchFunction = new TemporalSearchFunctionFactory(conf, temporalIndexer).getSearchFunction(funcURI); if(filterInfo.getArguments().length > 1) { throw new IllegalArgumentException("Index functions do not support more than two arguments."); @@ -123,12 +123,14 @@ public class TemporalTupleSet extends ExternalTupleSet { //returns appropriate search function for a given URI //search functions used by TemporalIndexer to query Temporal Index - private class TemporalSearchFunctionFactory { + public static class TemporalSearchFunctionFactory { private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap(); + private final TemporalIndexer temporalIndexer; Configuration conf; - public TemporalSearchFunctionFactory(final Configuration conf) { 
+ public TemporalSearchFunctionFactory(final Configuration conf, final TemporalIndexer temporalIndexer) { this.conf = conf; + this.temporalIndexer = temporalIndexer; } /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java index 41ae9ad..5cc1c44 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java @@ -124,6 +124,7 @@ public class ConfigUtils { public static final String USE_PCJ_UPDATER_INDEX = "sc.use.updater"; public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName"; + public static final String USE_PCJ_FLUO_UPDATER = "rya.indexing.pcj.updater.fluo"; public static final String PCJ_STORAGE_TYPE = "rya.indexing.pcj.storageType"; public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType"; @@ -427,6 +428,7 @@ public class ConfigUtils { return conf.getBoolean(USE_PCJ_UPDATER_INDEX, false); } + /** * @return The name of the Fluo Application this instance of RYA is using to * incrementally update PCJs. 
@@ -436,10 +438,12 @@ public class ConfigUtils { return Optional.fromNullable(conf.get(FLUO_APP_NAME)); } + public static boolean getUseMongo(final Configuration conf) { return conf.getBoolean(USE_MONGO, false); } + public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) { final List<String> indexList = Lists.newArrayList(); @@ -452,6 +456,7 @@ public class ConfigUtils { indexList.add(MongoFreeTextIndexer.class.getName()); useFilterIndex = true; } + if (getUseEntity(conf)) { indexList.add(MongoEntityIndexer.class.getName()); optimizers.add(EntityIndexOptimizer.class.getName()); @@ -462,9 +467,9 @@ public class ConfigUtils { useFilterIndex = true; } } else { - if (getUsePCJ(conf) || getUseOptimalPCJ(conf)) { - conf.setPcjOptimizer(PCJOptimizer.class); - } + if (getUsePCJ(conf) || getUseOptimalPCJ(conf)) { + conf.setPcjOptimizer(PCJOptimizer.class); + } if (getUsePcjUpdaterIndex(conf)) { indexList.add(PrecomputedJoinIndexer.class.getName()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java index e9d6c30..fcc1c58 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java @@ -378,7 +378,6 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements } } - /** * statements where the datetime is exactly the same as the queryInstant. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/entity/EntityIndexException.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/EntityIndexException.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/EntityIndexException.java index 61efc91..1e6abdb 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/EntityIndexException.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/EntityIndexException.java @@ -19,11 +19,12 @@ package org.apache.rya.indexing.entity; import org.apache.rya.indexing.entity.model.TypedEntity; +import org.apache.rya.indexing.mongodb.IndexingException; /** * An operation over the {@link TypedEntity} index failed to complete. */ -public class EntityIndexException extends Exception { +public class EntityIndexException extends IndexingException { private static final long serialVersionUID = 1L; /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/EntityStorage.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/EntityStorage.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/EntityStorage.java index 34dbf15..6f0b9ae 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/EntityStorage.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/EntityStorage.java @@ -22,12 +22,12 @@ import java.util.Optional; import java.util.Set; import org.apache.rya.api.domain.RyaURI; -import org.apache.rya.indexing.entity.EntityIndexException; import org.apache.rya.indexing.entity.model.Entity; import org.apache.rya.indexing.entity.model.Property; import 
org.apache.rya.indexing.entity.model.Type; import org.apache.rya.indexing.entity.model.TypedEntity; import org.apache.rya.indexing.entity.storage.mongo.ConvertingCursor; +import org.apache.rya.indexing.mongodb.update.RyaObjectStorage; import org.calrissian.mango.collect.CloseableIterator; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; @@ -37,36 +37,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; * Stores and provides access to {@link Entity}s. */ @DefaultAnnotation(NonNull.class) -public interface EntityStorage { - - /** - * Creates a new {@link Entity} within the storage. The new Entity's subject must be unique. - * - * @param entity - The {@link Entity} to create. (not null) - * @throws EntityAlreadyExistsException An {@link Entity} could not be created because one already exists for the Subject. - * @throws EntityStorageException A problem occurred while creating the Entity. - */ - public void create(Entity entity) throws EntityAlreadyExistsException, EntityStorageException; - - /** - * Get an {@link Entity} from the storage by its subject. - * - * @param subject - Identifies which {@link Entity} to get. (not null) - * @return The {@link Entity} if one exists for the subject. - * @throws EntityStorageException A problem occurred while fetching the Entity from the storage. - */ - public Optional<Entity> get(RyaURI subject) throws EntityStorageException; - - /** - * Update the state of an {@link Entity}. - * - * @param old - The Entity the changes were applied to. (not null) - * @param updated - The updated Entity to store. (not null) - * @throws StaleUpdateException The {@code old} Entity does not match any Entities that are stored. - * @throws EntityStorageException A problem occurred while updating the Entity within the storage. 
- */ - public void update(Entity old, Entity updated) throws StaleUpdateException, EntityStorageException; - +public interface EntityStorage extends RyaObjectStorage<Entity> { /** * Search the stored {@link Entity}s that have a specific {@link Type} as * well as the provided {@link Property} values. @@ -80,18 +51,9 @@ public interface EntityStorage { public ConvertingCursor<TypedEntity> search(final Optional<RyaURI> subject, Type type, Set<Property> properties) throws EntityStorageException; /** - * Deletes an {@link Entity} from the storage. - * - * @param subject -Identifies which {@link Entity} to delete. (not null) - * @return {@code true} if something was deleted; otherwise {@code false}. - * @throws EntityStorageException A problem occurred while deleting from the storage. - */ - public boolean delete(RyaURI subject) throws EntityStorageException; - - /** * Indicates a problem while interacting with an {@link EntityStorage}. */ - public static class EntityStorageException extends EntityIndexException { + public static class EntityStorageException extends ObjectStorageException { private static final long serialVersionUID = 1L; /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java index 1b4681d..a71d673 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java @@ -55,19 +55,19 @@ import edu.umd.cs.findbugs.annotations.NonNull; @DefaultAnnotation(NonNull.class) public class MongoEntityStorage implements 
EntityStorage { - private static final String COLLECTION_NAME = "entity-entities"; + protected static final String COLLECTION_NAME = "entity-entities"; private static final EntityDocumentConverter ENTITY_CONVERTER = new EntityDocumentConverter(); /** * A client connected to the Mongo instance that hosts the Rya instance. */ - private final MongoClient mongo; + protected final MongoClient mongo; /** * The name of the Rya instance the {@link TypedEntity}s are for. */ - private final String ryaInstanceName; + protected final String ryaInstanceName; /** * Constructs an instance of {@link MongoEntityStorage}. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java index 7da9918..84b0bdc 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java @@ -40,10 +40,10 @@ import org.apache.rya.indexing.entity.model.Entity; import org.apache.rya.indexing.entity.model.Property; import org.apache.rya.indexing.entity.model.Type; import org.apache.rya.indexing.entity.storage.EntityStorage; -import org.apache.rya.indexing.entity.storage.EntityStorage.EntityStorageException; import org.apache.rya.indexing.entity.storage.TypeStorage; import org.apache.rya.indexing.entity.storage.TypeStorage.TypeStorageException; import org.apache.rya.indexing.entity.storage.mongo.ConvertingCursor; +import org.apache.rya.indexing.mongodb.IndexingException; import org.apache.rya.mongodb.MongoDBRdfConfiguration; import org.apache.rya.mongodb.MongoSecondaryIndex; import org.openrdf.model.URI; @@ -98,7 
+98,7 @@ public abstract class BaseEntityIndexer implements EntityIndexer, MongoSecondary for(final Entry<RyaURI, List<RyaStatement>> entry : groupedBySubject.entrySet()) { try { updateEntity(entry.getKey(), entry.getValue()); - } catch (final EntityStorageException e) { + } catch (final IndexingException e) { throw new IOException("Failed to update the Entity index.", e); } } @@ -109,8 +109,9 @@ public abstract class BaseEntityIndexer implements EntityIndexer, MongoSecondary * * @param subject - The Subject of the {@link Entity} the statements are for. (not null) * @param statements - Statements that the {@link Entity} will be updated with. (not null) + * @throws IndexingException */ - private void updateEntity(final RyaURI subject, final Collection<RyaStatement> statements) throws EntityStorageException { + private void updateEntity(final RyaURI subject, final Collection<RyaStatement> statements) throws IndexingException { requireNonNull(subject); requireNonNull(statements); @@ -216,7 +217,7 @@ public abstract class BaseEntityIndexer implements EntityIndexer, MongoSecondary return Optional.of( updated.build() ); }); - } catch (final EntityStorageException e) { + } catch (final IndexingException e) { throw new IOException("Failed to update the Entity index.", e); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java index fb5e957..2edbe37 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java @@ -21,14 +21,13 @@ package org.apache.rya.indexing.entity.update; import static 
java.util.Objects.requireNonNull; import java.util.Optional; -import java.util.function.Function; import org.apache.rya.api.domain.RyaURI; import org.apache.rya.indexing.entity.model.Entity; import org.apache.rya.indexing.entity.storage.EntityStorage; -import org.apache.rya.indexing.entity.storage.EntityStorage.EntityAlreadyExistsException; import org.apache.rya.indexing.entity.storage.EntityStorage.EntityStorageException; -import org.apache.rya.indexing.entity.storage.EntityStorage.StaleUpdateException; +import org.apache.rya.indexing.mongodb.update.DocumentUpdater; +import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -37,7 +36,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; * Performs update operations over an {@link EntityStorage}. */ @DefaultAnnotation(NonNull.class) -public class EntityUpdater { +public class EntityUpdater implements DocumentUpdater<RyaURI, Entity>{ private final EntityStorage storage; @@ -50,73 +49,30 @@ public class EntityUpdater { this.storage = requireNonNull(storage); } - /** - * Tries to updates the state of an {@link Entity} until the update succeeds - * or a non-recoverable exception is thrown. - * - * @param subject - The Subject of the {@link Entity} that will be updated. (not null) - * @param mutator - Performs the mutation on the old state of the Entity and returns - * the new state of the Entity. (not null) - * @throws EntityStorageException A non-recoverable error has caused the update to fail. - */ - public void update(final RyaURI subject, final EntityMutator mutator) throws EntityStorageException { - requireNonNull(subject); - requireNonNull(mutator); - - // Fetch the current state of the Entity. 
- boolean completed = false; - while(!completed) { - try { - final Optional<Entity> old = storage.get(subject); - final Optional<Entity> updated = mutator.apply(old); + @Override + public void create(final Entity newObj) throws EntityStorageException { + try { + storage.create(newObj); + } catch (final ObjectStorageException e) { + throw new EntityStorageException(e.getMessage(), e); + } + } - final boolean doWork = updated.isPresent(); - if(doWork) { - if(!old.isPresent()) { - storage.create(updated.get()); - } else { - storage.update(old.get(), updated.get()); - } - } - completed = true; - } catch(final EntityAlreadyExistsException | StaleUpdateException e) { - // These are recoverable exceptions. Try again. - } catch(final RuntimeException e) { - throw new EntityStorageException("Failed to update Entity with Subject '" + subject.getData() + "'.", e); - } + @Override + public void update(final Entity old, final Entity updated) throws EntityStorageException { + try { + storage.update(old, updated); + } catch (final ObjectStorageException e) { + throw new EntityStorageException(e.getMessage(), e); } } - /** - * Implementations of this interface are used to update the state of an - * {@link Entity} in unison with a {@link EntityUpdater}. - * </p> - * This table describes what the updater will do depending on if an Entity - * exists and if an updated Entity is returned. 
- * </p> - * <table border="1px"> - * <tr><th>Entity Provided</th><th>Update Returned</th><th>Effect</th></tr> - * <tr> - * <td>true</td> - * <td>true</td> - * <td>The old Entity will be updated using the returned state.</td> - * </tr> - * <tr> - * <td>true</td> - * <td>false</td> - * <td>No work is performed.</td> - * </tr> - * <tr> - * <td>false</td> - * <td>true</td> - * <td>A new Entity will be created using the returned state.</td> - * </tr> - * <tr> - * <td>false</td> - * <td>false</td> - * <td>No work is performed.</td> - * </tr> - * </table> - */ - public interface EntityMutator extends Function<Optional<Entity>, Optional<Entity>> { } + @Override + public Optional<Entity> getOld(final RyaURI key) throws EntityStorageException { + try { + return storage.get(key); + } catch (final ObjectStorageException e) { + throw new EntityStorageException(e.getMessage(), e); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java index 56070b7..2428e28 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java @@ -25,11 +25,20 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.resolver.RyaToRdfConversions; +import org.apache.rya.indexing.StatementConstraints; +import org.apache.rya.mongodb.MongoConnectorFactory; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import 
org.apache.rya.mongodb.MongoDBRyaDAO; +import org.apache.rya.mongodb.MongoSecondaryIndex; import org.openrdf.model.Literal; import org.openrdf.model.Statement; import org.openrdf.model.URI; import org.openrdf.query.QueryEvaluationException; +import com.google.common.annotations.VisibleForTesting; import com.mongodb.DB; import com.mongodb.DBCollection; import com.mongodb.DBCursor; @@ -37,16 +46,9 @@ import com.mongodb.DBObject; import com.mongodb.MongoClient; import com.mongodb.QueryBuilder; import com.mongodb.ServerAddress; +import com.mongodb.WriteConcern; import info.aduna.iteration.CloseableIteration; -import org.apache.rya.api.domain.RyaStatement; -import org.apache.rya.api.domain.RyaURI; -import org.apache.rya.api.resolver.RyaToRdfConversions; -import org.apache.rya.indexing.StatementConstraints; -import org.apache.rya.mongodb.MongoConnectorFactory; -import org.apache.rya.mongodb.MongoDBRdfConfiguration; -import org.apache.rya.mongodb.MongoDBRyaDAO; -import org.apache.rya.mongodb.MongoSecondaryIndex; /** * Secondary Indexer using MondoDB @@ -71,15 +73,16 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat db = this.mongoClient.getDB(dbName); collection = db.getCollection(conf.get(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya") + getCollectionName()); } - + @Override - public void setClient(MongoClient client){ + public void setClient(final MongoClient client){ this.mongoClient = client; } - // TODO this method is only intended to be used in testing + @VisibleForTesting public void initIndexer(final Configuration conf, final MongoClient client) { - ServerAddress address = client.getAddress(); + setClient(client); + final ServerAddress address = client.getAddress(); conf.set(MongoDBRdfConfiguration.MONGO_INSTANCE, address.getHost()); conf.set(MongoDBRdfConfiguration.MONGO_INSTANCE_PORT, Integer.toString(address.getPort())); setConf(conf); @@ -144,8 +147,7 @@ public abstract class AbstractMongoIndexer<T extends 
IndexingMongoDBStorageStrat if (isValidPredicate && (statement.getObject() instanceof Literal)) { final DBObject obj = storageStrategy.serialize(ryaStatement); if (obj != null) { - final DBObject query = storageStrategy.serialize(ryaStatement); - collection.update(query, obj, true, false); + collection.insert(obj, WriteConcern.ACKNOWLEDGED); } } } catch (final IllegalArgumentException e) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/IndexingException.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/IndexingException.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/IndexingException.java new file mode 100644 index 0000000..7029b45 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/IndexingException.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.mongodb; + +/** + * An indexing operation over mongoDB failed to complete. 
+ */ +public class IndexingException extends Exception { + /** + * Constructs a new exception with the specified detail message. The + * cause is not initialized, and may subsequently be initialized by + * a call to {@link #initCause}. + * + * @param message the detail message. The detail message is saved for + * later retrieval by the {@link #getMessage()} method. + */ + public IndexingException(final String message) { + super(message); + } + + /** + * Constructs a new exception with the specified detail message and + * cause. <p>Note that the detail message associated with + * {@code cause} is <i>not</i> automatically incorporated in + * this exception's detail message. + * + * @param message the detail message (which is saved for later retrieval + * by the {@link #getMessage()} method). + * @param cause the cause (which is saved for later retrieval by the + * {@link #getCause()} method). (A <tt>null</tt> value is + * permitted, and indicates that the cause is nonexistent or + * unknown.) 
+ */ + public IndexingException(final String message, final Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/temporal/TemporalMongoDBStorageStrategy.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/temporal/TemporalMongoDBStorageStrategy.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/temporal/TemporalMongoDBStorageStrategy.java index eefcfb1..6beb6f1 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/temporal/TemporalMongoDBStorageStrategy.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/temporal/TemporalMongoDBStorageStrategy.java @@ -21,20 +21,21 @@ package org.apache.rya.indexing.mongodb.temporal; import java.util.regex.Matcher; -import com.mongodb.BasicDBObject; -import com.mongodb.DBCollection; -import com.mongodb.DBObject; - import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.indexing.TemporalInstantRfc3339; import org.apache.rya.indexing.TemporalInterval; import org.apache.rya.indexing.mongodb.IndexingMongoDBStorageStrategy; +import com.mongodb.BasicDBObject; +import com.mongodb.BasicDBObjectBuilder; +import com.mongodb.DBCollection; +import com.mongodb.DBObject; + /** * Defines how time based intervals/instants are stored in MongoDB. * <p> * Time can be stored as the following: - * <p> + * <p>l * <li><b>instant</b> {[statement], instant: TIME}</li> * <li><b>interval</b> {[statement], start: TIME, end: TIME}</li> * @see {@link TemporalInstantRfc3339} for how the dates are formatted. 
@@ -53,16 +54,24 @@ public class TemporalMongoDBStorageStrategy extends IndexingMongoDBStorageStrate @Override public DBObject serialize(final RyaStatement ryaStatement) { - final BasicDBObject base = (BasicDBObject) super.serialize(ryaStatement); - final String objString = ryaStatement.getObject().getData(); - final Matcher match = TemporalInstantRfc3339.PATTERN.matcher(objString); - if(match.find()) { - final TemporalInterval date = TemporalInstantRfc3339.parseInterval(ryaStatement.getObject().getData()); - base.append(INTERVAL_START, date.getHasBeginning().getAsDateTime().toDate()); - base.append(INTERVAL_END, date.getHasEnd().getAsDateTime().toDate()); - } else { - base.append(INSTANT, TemporalInstantRfc3339.FORMATTER.parseDateTime(objString).toDate()); - } - return base; + final BasicDBObject base = (BasicDBObject) super.serialize(ryaStatement); + final DBObject time = getTimeValue(ryaStatement.getObject().getData()); + base.putAll(time.toMap()); + return base; + } + + public DBObject getTimeValue(final String timeData) { + final Matcher match = TemporalInstantRfc3339.PATTERN.matcher(timeData); + final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start(); + if(match.find()) { + final TemporalInterval date = TemporalInstantRfc3339.parseInterval(timeData); + builder.add(INTERVAL_START, date.getHasBeginning().getAsDateTime().toDate()); + builder.add(INTERVAL_END, date.getHasEnd().getAsDateTime().toDate()); + } else { + builder.add(INSTANT, TemporalInstantRfc3339.FORMATTER.parseDateTime(timeData).toDate()); + } + return builder.get(); } + + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java 
b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java new file mode 100644 index 0000000..0b9db13 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.mongodb.update; + +import static java.util.Objects.requireNonNull; + +import java.util.Optional; +import java.util.function.Function; + +import org.apache.rya.indexing.mongodb.IndexingException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Performs an update operation on a Document in mongodb. + * @param <T> - The key to find the object. + * @param <V> - The type of object to get updated. + */ +@DefaultAnnotation(NonNull.class) +public interface DocumentUpdater<T, V> { + public default void update(final T key, final DocumentMutator<V> mutator) throws IndexingException { + requireNonNull(mutator); + + // Fetch the current state of the Entity. 
+ boolean completed = false; + while(!completed) { + //no cast is needed here: DocumentMutator (defined below) maps Optional<V> to Optional<V> + final Optional<V> old = getOld(key); + final Optional<V> updated = mutator.apply(old); + + final boolean doWork = updated.isPresent(); + if(doWork) { + if(!old.isPresent()) { + create(updated.get()); + } else { + update(old.get(), updated.get()); + } + } + completed = true; + } + } + + Optional<V> getOld(T key) throws IndexingException; + + void create(final V newObj) throws IndexingException; + + void update(final V old, final V updated) throws IndexingException; + + /** + * Implementations of this interface are used to update the state of a + * {@code V} in unison with a {@link DocumentUpdater}. + * <p> + * This table describes what the updater will do depending on whether the object + * exists and whether an updated object is returned. + * <p> + * <table border="1px"> + * <tr><th>Object Provided</th><th>Update Returned</th><th>Effect</th></tr> + * <tr> + * <td>true</td> + * <td>true</td> + * <td>The old Object will be updated using the returned state.</td> + * </tr> + * <tr> + * <td>true</td> + * <td>false</td> + * <td>No work is performed.</td> + * </tr> + * <tr> + * <td>false</td> + * <td>true</td> + * <td>A new Object will be created using the returned state.</td> + * </tr> + * <tr> + * <td>false</td> + * <td>false</td> + * <td>No work is performed.</td> + * </tr> + * </table> + */ + public interface DocumentMutator<V> extends Function<Optional<V>, Optional<V>> { } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java 
new file mode 100644 index 0000000..10feb0d --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.mongodb.update; + +import java.util.Optional; + +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.indexing.mongodb.IndexingException; + +/** + * Stores and provides access to objects of type T. + * @param <T> - The type of object to store/access. + */ +public interface RyaObjectStorage<T> { + + /** + * Creates a new {@code T} within the storage. The new object's subject must be unique. + * + * @param doc - The {@code T} to create. (not null) + * @throws ObjectAlreadyExistsException An Object could not be created because one already exists for the Subject. + * @throws ObjectStorageException A problem occurred while creating the Object. + */ + public void create(T doc) throws ObjectAlreadyExistsException, ObjectStorageException; + + /** + * Get an Object from the storage by its subject. + * + * @param subject - Identifies which Object to get. (not null) + * @return The Object if one exists for the subject. 
+ * @throws ObjectStorageException A problem occurred while fetching the Object from the storage. + */ + public Optional<T> get(RyaURI subject) throws ObjectStorageException; + + /** + * Update the state of a {@code T}. + * + * @param old - The Object the changes were applied to. (not null) + * @param updated - The updated Object to store. (not null) + * @throws StaleUpdateException The {@code old} Object does not match any that are stored. + * @throws ObjectStorageException A problem occurred while updating the Object within the storage. + */ + public void update(T old, T updated) throws StaleUpdateException, ObjectStorageException; + + /** + * Deletes a {@code T} from the storage. + * + * @param subject - Identifies which {@code T} to delete. (not null) + * @return {@code true} if something was deleted; otherwise {@code false}. + * @throws ObjectStorageException A problem occurred while deleting from the storage. + */ + public boolean delete(RyaURI subject) throws ObjectStorageException; + + /** + * Indicates a problem while interacting with an {@link RyaObjectStorage}. + */ + public static class ObjectStorageException extends IndexingException { + private static final long serialVersionUID = 1L; + + /** + * Constructs a new exception with the specified detail message. The + * cause is not initialized, and may subsequently be initialized by + * a call to {@link #initCause}. + * + * @param message the detail message. The detail message is saved for + * later retrieval by the {@link #getMessage()} method. + */ + public ObjectStorageException(final String message) { + super(message); + } + + /** + * Constructs a new exception with the specified detail message and + * cause. <p>Note that the detail message associated with + * {@code cause} is <i>not</i> automatically incorporated in + * this exception's detail message. 
+ * + * @param message the detail message (which is saved for later retrieval + * by the {@link #getMessage()} method). + * @param cause the cause (which is saved for later retrieval by the + * {@link #getCause()} method). (A <tt>null</tt> value is + * permitted, and indicates that the cause is nonexistent or + * unknown.) + */ + public ObjectStorageException(final String message, final Throwable cause) { + super(message, cause); + } + } + + /** + * A {@code T} could not be created because one already exists for the Subject. + */ + public static class ObjectAlreadyExistsException extends ObjectStorageException { + private static final long serialVersionUID = 1L; + + public ObjectAlreadyExistsException(final String message) { + super(message); + } + + public ObjectAlreadyExistsException(final String message, final Throwable cause) { + super(message, cause); + } + } + + /** + * An object could not be updated because the old state does not + * match the current state. + */ + public static class StaleUpdateException extends ObjectStorageException { + private static final long serialVersionUID = 1L; + + public StaleUpdateException(final String message) { + super(message); + } + + public StaleUpdateException(final String message, final Throwable cause) { + super(message, cause); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/indexing/src/test/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorageIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorageIT.java b/extras/indexing/src/test/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorageIT.java index d271ba0..5d26bc0 100644 --- a/extras/indexing/src/test/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorageIT.java +++ 
b/extras/indexing/src/test/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorageIT.java @@ -50,7 +50,7 @@ public class MongoEntityStorageIT extends MongoITBase { private static final String RYA_INSTANCE_NAME = "testInstance"; @Test - public void create_and_get() throws EntityStorageException { + public void create_and_get() throws Exception { // An Entity that will be stored. final Entity entity = Entity.builder() .setSubject(new RyaURI("urn:GTIN-14/00012345600012")) @@ -71,7 +71,7 @@ public class MongoEntityStorageIT extends MongoITBase { } @Test - public void can_not_create_with_same_subject() throws EntityStorageException { + public void can_not_create_with_same_subject() throws Exception { // A Type that will be stored. final Entity entity = Entity.builder() .setSubject(new RyaURI("urn:GTIN-14/00012345600012")) @@ -95,7 +95,7 @@ public class MongoEntityStorageIT extends MongoITBase { } @Test - public void get_noneExisting() throws EntityStorageException { + public void get_noneExisting() throws Exception { // Get a Type that hasn't been created. final EntityStorage storage = new MongoEntityStorage(super.getMongoClient(), RYA_INSTANCE_NAME); final Optional<Entity> storedEntity = storage.get(new RyaURI("urn:GTIN-14/00012345600012")); @@ -105,7 +105,7 @@ public class MongoEntityStorageIT extends MongoITBase { } @Test - public void delete() throws EntityStorageException { + public void delete() throws Exception { // An Entity that will be stored. final Entity entity = Entity.builder() .setSubject(new RyaURI("urn:GTIN-14/00012345600012")) @@ -126,7 +126,7 @@ public class MongoEntityStorageIT extends MongoITBase { } @Test - public void delete_nonExisting() throws EntityStorageException { + public void delete_nonExisting() throws Exception { // Delete an Entity that has not been created. 
final EntityStorage storage = new MongoEntityStorage(super.getMongoClient(), RYA_INSTANCE_NAME); final boolean deleted = storage.delete( new RyaURI("urn:GTIN-14/00012345600012") ); @@ -305,7 +305,7 @@ public class MongoEntityStorageIT extends MongoITBase { } @Test - public void update() throws EntityStorageException { + public void update() throws Exception { final EntityStorage storage = new MongoEntityStorage(super.getMongoClient(), RYA_INSTANCE_NAME); // Store Alice in the repository. @@ -338,7 +338,7 @@ public class MongoEntityStorageIT extends MongoITBase { } @Test(expected = StaleUpdateException.class) - public void update_stale() throws EntityStorageException { + public void update_stale() throws Exception { final EntityStorage storage = new MongoEntityStorage(super.getMongoClient(), RYA_INSTANCE_NAME); // Store Alice in the repository. @@ -370,7 +370,7 @@ public class MongoEntityStorageIT extends MongoITBase { } @Test(expected = EntityStorageException.class) - public void update_differentSubjects() throws StaleUpdateException, EntityStorageException { + public void update_differentSubjects() throws Exception { // Two objects that do not have the same Subjects. 
final Entity old = Entity.builder() .setSubject( new RyaURI("urn:SSN/111-11-1111") ) http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/OptionalConfigUtils.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/OptionalConfigUtils.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/OptionalConfigUtils.java index dd6ea40..8d4486f 100644 --- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/OptionalConfigUtils.java +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/OptionalConfigUtils.java @@ -28,6 +28,8 @@ import org.apache.rya.api.RdfCloudTripleStoreConfiguration; import org.apache.rya.api.instance.RyaDetails; import org.apache.rya.indexing.accumulo.ConfigUtils; import org.apache.rya.indexing.accumulo.geo.GeoMesaGeoIndexer; +import org.apache.rya.indexing.geotemporal.GeoTemporalOptimizer; +import org.apache.rya.indexing.geotemporal.mongo.MongoGeoTemporalIndexer; import org.apache.rya.indexing.mongodb.geo.MongoGeoIndexer; import org.openrdf.model.URI; @@ -46,6 +48,7 @@ public class OptionalConfigUtils extends ConfigUtils { public static final String GEO_NUM_PARTITIONS = "sc.geo.numPartitions"; public static final String USE_GEO = "sc.use_geo"; + public static final String USE_GEOTEMPORAL = "sc.use_geotemporal"; public static final String USE_FREETEXT = "sc.use_freetext"; public static final String USE_TEMPORAL = "sc.use_temporal"; public static final String USE_ENTITY = "sc.use_entity"; @@ -67,6 +70,10 @@ public class OptionalConfigUtils extends ConfigUtils { return conf.getBoolean(USE_GEO, false); } + public static boolean getUseGeoTemporal(final Configuration conf) { + return conf.getBoolean(USE_GEOTEMPORAL, false); + } + /** * Retrieves the value for the geo indexer type from the config. * @param conf the {@link Configuration}. 
@@ -83,11 +90,14 @@ public class OptionalConfigUtils extends ConfigUtils { boolean useFilterIndex = false; ConfigUtils.setIndexers(conf); - for (final String index : conf.getStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS)){ - indexList.add(index); - } - for (final String optimizer : conf.getStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS)){ - optimizers.add(optimizer); + final String[] existingIndexers = conf.getStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS); + if(existingIndexers != null ) { + for (final String index : existingIndexers) { + indexList.add(index); + } + for (final String optimizer : conf.getStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS)){ + optimizers.add(optimizer); + } } final GeoIndexerType geoIndexerType = getGeoIndexerType(conf); @@ -102,6 +112,11 @@ public class OptionalConfigUtils extends ConfigUtils { } useFilterIndex = true; } + + if (getUseGeoTemporal(conf)) { + indexList.add(MongoGeoTemporalIndexer.class.getName()); + optimizers.add(GeoTemporalOptimizer.class.getName()); + } } else { if (getUseGeo(conf)) { if (geoIndexerType == null) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java index f77e726..d00b849 100644 --- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java @@ -4,6 +4,13 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.rya.indexing.GeoConstants; +import org.apache.rya.indexing.GeoIndexer; +import 
org.apache.rya.indexing.IndexingExpr; +import org.apache.rya.indexing.IteratorFactory; +import org.apache.rya.indexing.SearchFunction; +import org.apache.rya.indexing.StatementConstraints; +import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet; import org.openrdf.model.Statement; import org.openrdf.model.URI; import org.openrdf.query.BindingSet; @@ -36,13 +43,6 @@ import com.vividsolutions.jts.io.WKTReader; import info.aduna.iteration.CloseableIteration; -import org.apache.rya.indexing.GeoConstants; -import org.apache.rya.indexing.GeoIndexer; -import org.apache.rya.indexing.IndexingExpr; -import org.apache.rya.indexing.IteratorFactory; -import org.apache.rya.indexing.SearchFunction; -import org.apache.rya.indexing.StatementConstraints; -import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet; //Indexing Node for geo expressions to be inserted into execution plan //to delegate geo portion of query to geo index @@ -116,7 +116,7 @@ public class GeoTupleSet extends ExternalTupleSet { final URI funcURI = filterInfo.getFunction(); - final SearchFunction searchFunction = new GeoSearchFunctionFactory(conf).getSearchFunction(funcURI); + final SearchFunction searchFunction = new GeoSearchFunctionFactory(conf, geoIndexer).getSearchFunction(funcURI); if(filterInfo.getArguments().length > 1) { throw new IllegalArgumentException("Index functions do not support more than two arguments."); } @@ -130,14 +130,17 @@ public class GeoTupleSet extends ExternalTupleSet { //returns appropriate search function for a given URI //search functions used in GeoMesaGeoIndexer to access index - public class GeoSearchFunctionFactory { + public static class GeoSearchFunctionFactory { Configuration conf; private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap(); - public GeoSearchFunctionFactory(final Configuration conf) { + private final GeoIndexer geoIndexer; + + public GeoSearchFunctionFactory(final Configuration conf, final GeoIndexer geoIndexer) { 
this.conf = conf; + this.geoIndexer = geoIndexer; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/440a4bfd/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java index 7069d73..8b2ebc3 100644 --- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java @@ -65,7 +65,7 @@ public class GeoMongoDBStorageStrategy extends IndexingMongoDBStorageStrategy { public abstract String getKeyword(); } - static class GeoQuery { + public static class GeoQuery { private final GeoQueryType queryType; private final Geometry geo; @@ -140,7 +140,7 @@ public class GeoMongoDBStorageStrategy extends IndexingMongoDBStorageStrategy { } } - private List<double[]> getCorrespondingPoints(final Geometry geo){ + public List<double[]> getCorrespondingPoints(final Geometry geo){ final List<double[]> points = new ArrayList<double[]>(); for (final Coordinate coord : geo.getCoordinates()){ points.add(new double[] {
