http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java new file mode 100644 index 0000000..9c13c8b --- /dev/null +++ b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.geotemporal.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.indexing.IndexingExpr; +import org.apache.rya.indexing.entity.model.TypedEntity; +import org.apache.rya.indexing.entity.storage.mongo.DocumentConverter.DocumentConverterException; +import org.apache.rya.indexing.entity.storage.mongo.MongoEntityStorage; +import org.apache.rya.indexing.geotemporal.GeoTemporalIndexException; +import org.apache.rya.indexing.geotemporal.model.Event; +import org.apache.rya.indexing.geotemporal.storage.EventStorage; +import org.bson.BsonDocument; +import org.bson.BsonString; +import org.bson.Document; +import org.bson.conversions.Bson; + +import com.mongodb.BasicDBObjectBuilder; +import com.mongodb.DBObject; +import com.mongodb.ErrorCategory; +import com.mongodb.MongoClient; +import com.mongodb.MongoException; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.model.Filters; + +public class MongoEventStorage implements EventStorage { + + protected static final String COLLECTION_NAME = "geotemporal-events"; + + private static final EventDocumentConverter EVENT_CONVERTER = new EventDocumentConverter(); + + /** + * A client connected to the Mongo instance that hosts the Rya instance. + */ + protected final MongoClient mongo; + + /** + * The name of the Rya instance the {@link TypedEntity}s are for. + */ + protected final String ryaInstanceName; + + /* + * Used to get the filter query objects. + */ + private final GeoTemporalMongoDBStorageStrategy queryAdapter; + + /** + * Constructs an instance of {@link MongoEntityStorage}. + * + * @param mongo - A client connected to the Mongo instance that hosts the Rya instance. (not null) + * @param ryaInstanceName - The name of the Rya instance the {@link TypedEntity}s are for. (not null) + */ + public MongoEventStorage(final MongoClient mongo, final String ryaInstanceName) { + this.mongo = requireNonNull(mongo); + this.ryaInstanceName = requireNonNull(ryaInstanceName); + queryAdapter = new GeoTemporalMongoDBStorageStrategy(); + } + + @Override + public void create(final Event event) throws EventStorageException { + requireNonNull(event); + + try { + mongo.getDatabase(ryaInstanceName) + .getCollection(COLLECTION_NAME) + .insertOne(EVENT_CONVERTER.toDocument(event)); + } catch(final MongoException e) { + final ErrorCategory category = ErrorCategory.fromErrorCode( e.getCode() ); + if(category == ErrorCategory.DUPLICATE_KEY) { + throw new EventAlreadyExistsException("Failed to create Event with Subject '" + event.getSubject().getData() + "'.", e); + } + throw new EventStorageException("Failed to create Event with Subject '" + event.getSubject().getData() + "'.", e); + } + } + + @Override + public Optional<Event> get(final RyaURI subject) throws EventStorageException { + requireNonNull(subject); + + try { + final Document document = mongo.getDatabase(ryaInstanceName) + .getCollection(COLLECTION_NAME) + .find( new BsonDocument(EventDocumentConverter.SUBJECT, new BsonString(subject.getData())) ) + .first(); + + return document == null ? + Optional.empty() : + Optional.of( EVENT_CONVERTER.fromDocument(document) ); + + } catch(final MongoException | DocumentConverterException e) { + throw new EventStorageException("Could not get the Event with Subject '" + subject.getData() + "'.", e); + } + } + + @Override + public Collection<Event> search(final Optional<RyaURI> subject, final Optional<Collection<IndexingExpr>> geoFilters, final Optional<Collection<IndexingExpr>> temporalFilters) throws EventStorageException { + requireNonNull(subject); + + try { + final Collection<IndexingExpr> geos = (geoFilters.isPresent() ? geoFilters.get() : new ArrayList<>()); + final Collection<IndexingExpr> tempos = (temporalFilters.isPresent() ? temporalFilters.get() : new ArrayList<>()); + final DBObject filterObj = queryAdapter.getFilterQuery(geos, tempos); + + final BasicDBObjectBuilder builder = BasicDBObjectBuilder + .start(filterObj.toMap()); + if(subject.isPresent()) { + builder.append(EventDocumentConverter.SUBJECT, subject.get().getData()); + } + final MongoCursor<Document> results = mongo.getDatabase(ryaInstanceName) + .getCollection(COLLECTION_NAME) + .find( BsonDocument.parse(builder.get().toString()) ) + .iterator(); + + final List<Event> events = new ArrayList<>(); + while(results.hasNext()) { + events.add(EVENT_CONVERTER.fromDocument(results.next())); + } + return events; + } catch(final MongoException | DocumentConverterException | GeoTemporalIndexException e) { + throw new EventStorageException("Could not get the Event.", e); + } + } + + @Override + public void update(final Event old, final Event updated) throws StaleUpdateException, EventStorageException { + requireNonNull(old); + requireNonNull(updated); + + // The updated entity must have the same Subject as the one it is replacing. + if(!old.getSubject().equals(updated.getSubject())) { + throw new EventStorageException("The old Event and the updated Event must have the same Subject. " + + "Old Subject: " + old.getSubject().getData() + ", Updated Subject: " + updated.getSubject().getData()); + } + + final Set<Bson> filters = new HashSet<>(); + + // Must match the old entity's Subject. + filters.add( makeSubjectFilter(old.getSubject()) ); + + // Do a find and replace. + final Bson oldEntityFilter = Filters.and(filters); + final Document updatedDoc = EVENT_CONVERTER.toDocument(updated); + + final MongoCollection<Document> collection = mongo.getDatabase(ryaInstanceName).getCollection(COLLECTION_NAME); + if(collection.findOneAndReplace(oldEntityFilter, updatedDoc) == null) { + throw new StaleUpdateException("Could not update the Event with Subject '" + updated.getSubject().getData() + "."); + } + } + + @Override + public boolean delete(final RyaURI subject) throws EventStorageException { + requireNonNull(subject); + + try { + final Document deleted = mongo.getDatabase(ryaInstanceName) + .getCollection(COLLECTION_NAME) + .findOneAndDelete( makeSubjectFilter(subject) ); + + return deleted != null; + + } catch(final MongoException e) { + throw new EventStorageException("Could not delete the Event with Subject '" + subject.getData() + "'.", e); + } + } + + private static Bson makeSubjectFilter(final RyaURI subject) { + return Filters.eq(EventDocumentConverter.SUBJECT, subject.getData()); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java new file mode 100644 index 0000000..2561c23 --- /dev/null +++ b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.geotemporal.mongo; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; + +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.resolver.RyaToRdfConversions; +import org.apache.rya.indexing.GeoConstants; +import org.apache.rya.indexing.TemporalInstant; +import org.apache.rya.indexing.TemporalInstantRfc3339; +import org.apache.rya.indexing.TemporalInterval; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.accumulo.geo.GeoParseUtils; +import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer; +import org.apache.rya.indexing.geotemporal.model.Event; +import org.apache.rya.indexing.geotemporal.storage.EventStorage; +import org.apache.rya.indexing.mongodb.AbstractMongoIndexer; +import org.apache.rya.indexing.mongodb.IndexingException; +import org.apache.rya.indexing.mongodb.geo.GmlParser; +import org.apache.rya.mongodb.MongoConnectorFactory; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.joda.time.DateTime; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; + +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.io.ParseException; + +/** + * Indexer that stores 2 separate statements as one 'Event' entity. + * <p> + * The statements are required to have the same subject, one must be + * a Geo based statement and the other a temporal based statement. + * <p> + * This indexer is later used when querying for geo/temporal statements + * in the format of: + * <pre> + * {@code + * QUERY PARAMS + * ?SomeSubject geo:predicate ?Location + * ?SomeSubject time:predicate ?Time + * Filter(?Location, geoFunction()) + * Filter(?Time, temporalFunction()) + * } + * + * The number of filters is not strict, but there must be at least one + * query pattern for geo and one for temporal as well as at least one + * filter for each type. + */ +public class MongoGeoTemporalIndexer extends AbstractMongoIndexer<GeoTemporalMongoDBStorageStrategy> implements GeoTemporalIndexer { + private static final Logger LOG = Logger.getLogger(MongoGeoTemporalIndexer.class); + public static final String GEO_TEMPORAL_COLLECTION = "geo_temporal"; + + private final AtomicReference<MongoDBRdfConfiguration> configuration = new AtomicReference<>(); + private final AtomicReference<EventStorage> events = new AtomicReference<>(); + + @Override + public void init() { + initCore(); + predicates = ConfigUtils.getGeoPredicates(conf); + predicates.addAll(ConfigUtils.getTemporalPredicates(conf)); + storageStrategy = new GeoTemporalMongoDBStorageStrategy(); + } + + @Override + public void setConf(final Configuration conf) { + requireNonNull(conf); + events.set(null); + events.set(getEventStorage(conf)); + super.conf = conf; + configuration.set(new MongoDBRdfConfiguration(conf)); + } + + @Override + public void storeStatement(final RyaStatement ryaStatement) throws IOException { + requireNonNull(ryaStatement); + + try { + updateEvent(ryaStatement.getSubject(), ryaStatement); + } catch (IndexingException | ParseException e) { + throw new IOException("Failed to update the Entity index.", e); + } + } + + @Override + public void deleteStatement(final RyaStatement statement) throws IOException { + requireNonNull(statement); + final RyaURI subject = statement.getSubject(); + try { + final EventStorage eventStore = events.get(); + checkState(events != null, "Must set this indexers configuration before storing statements."); + + new EventUpdater(eventStore).update(subject, old -> { + final Event.Builder updated; + if(!old.isPresent()) { + return Optional.empty(); + } else { + updated = Event.builder(old.get()); + } + + final Event currentEvent = updated.build(); + final URI pred = statement.getObject().getDataType(); + if((pred.equals(GeoConstants.GEO_AS_WKT) || pred.equals(GeoConstants.GEO_AS_GML) || + pred.equals(GeoConstants.XMLSCHEMA_OGC_WKT) || pred.equals(GeoConstants.XMLSCHEMA_OGC_GML)) + && currentEvent.getGeometry().isPresent()) { + //is geo and needs to be removed. + try { + if(currentEvent.getGeometry().get().equals(GeoParseUtils.getGeometry(RyaToRdfConversions.convertStatement(statement), new GmlParser()))) { + updated.setGeometry(null); + } + } catch (final Exception e) { + LOG.debug("Unable to parse the stored geometry."); + } + } else { + //is time + final String dateTime = statement.getObject().getData(); + final Matcher matcher = TemporalInstantRfc3339.PATTERN.matcher(dateTime); + if (matcher.find()) { + final TemporalInterval interval = TemporalInstantRfc3339.parseInterval(dateTime); + if(currentEvent.getInterval().get().equals(interval)) { + updated.setTemporalInterval(null); + } + } else { + final TemporalInstant instant = new TemporalInstantRfc3339(DateTime.parse(dateTime)); + if(currentEvent.getInstant().get().equals(instant)) { + updated.setTemporalInstant(null); + } + } + } + return Optional.of(updated.build()); + }); + } catch (final IndexingException e) { + throw new IOException("Failed to update the Entity index.", e); + } + } + + private void updateEvent(final RyaURI subject, final RyaStatement statement) throws IndexingException, ParseException { + final EventStorage eventStore = events.get(); + checkState(events != null, "Must set this indexers configuration before storing statements."); + + new EventUpdater(eventStore).update(subject, old -> { + final Event.Builder updated; + if(!old.isPresent()) { + updated = Event.builder() + .setSubject(subject); + } else { + updated = Event.builder(old.get()); + } + + final URI pred = statement.getObject().getDataType(); + if(pred.equals(GeoConstants.GEO_AS_WKT) || pred.equals(GeoConstants.GEO_AS_GML) || + pred.equals(GeoConstants.XMLSCHEMA_OGC_WKT) || pred.equals(GeoConstants.XMLSCHEMA_OGC_GML)) { + //is geo + try { + final Statement geoStatement = RyaToRdfConversions.convertStatement(statement); + final Geometry geometry = GeoParseUtils.getGeometry(geoStatement, new GmlParser()); + updated.setGeometry(geometry); + } catch (final ParseException e) { + LOG.error(e.getMessage(), e); + } + } else { + //is time + final String dateTime = statement.getObject().getData(); + final Matcher matcher = TemporalInstantRfc3339.PATTERN.matcher(dateTime); + if (matcher.find()) { + final TemporalInterval interval = TemporalInstantRfc3339.parseInterval(dateTime); + updated.setTemporalInterval(interval); + } else { + final TemporalInstant instant = new TemporalInstantRfc3339(DateTime.parse(dateTime)); + updated.setTemporalInstant(instant); + } + } + return Optional.of(updated.build()); + }); + } + + @Override + public String getCollectionName() { + return ConfigUtils.getTablePrefix(conf) + GEO_TEMPORAL_COLLECTION; + } + + @Override + public EventStorage getEventStorage(final Configuration conf) { + requireNonNull(conf); + + if(events.get() != null) { + return events.get(); + } + + + final MongoDBRdfConfiguration mongoConf = new MongoDBRdfConfiguration(conf); + mongoClient = mongoConf.getMongoClient(); + configuration.set(mongoConf); + if (mongoClient == null) { + mongoClient = MongoConnectorFactory.getMongoClient(conf); + } + final String ryaInstanceName = mongoConf.getMongoDBName(); + events.set(new MongoEventStorage(mongoClient, ryaInstanceName)); + return events.get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java new file mode 100644 index 0000000..634359f --- /dev/null +++ b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java @@ -0,0 +1,247 @@ +package org.apache.rya.indexing.mongodb.geo; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.List; + +import org.apache.log4j.Logger; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.resolver.RyaToRdfConversions; +import org.apache.rya.indexing.accumulo.geo.GeoParseUtils; +import org.apache.rya.indexing.mongodb.IndexingMongoDBStorageStrategy; +import org.bson.Document; +import org.openrdf.model.Statement; +import org.openrdf.query.MalformedQueryException; + +import com.mongodb.BasicDBObject; +import com.mongodb.BasicDBObjectBuilder; +import com.mongodb.DBCollection; +import com.mongodb.DBObject; +import com.vividsolutions.jts.geom.Coordinate; +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.geom.Point; +import com.vividsolutions.jts.geom.Polygon; +import com.vividsolutions.jts.io.ParseException; +import com.vividsolutions.jts.io.WKTReader; + +public class GeoMongoDBStorageStrategy extends IndexingMongoDBStorageStrategy { + private static final Logger LOG = Logger.getLogger(GeoMongoDBStorageStrategy.class); + + private static final String GEO = "location"; + public enum GeoQueryType { + INTERSECTS { + @Override + public String getKeyword() { + return "$geoIntersects"; + } + }, WITHIN { + @Override + public String getKeyword() { + return "$geoWithin"; + } + }, EQUALS { + @Override + public String getKeyword() { + return "$near"; + } + }, NEAR { + @Override + public String getKeyword() { + return "$near"; + } + }; + + public abstract String getKeyword(); + } + + public static class GeoQuery { + private final GeoQueryType queryType; + private final Geometry geo; + + private final Double maxDistance; + private final Double minDistance; + + public GeoQuery(final GeoQueryType queryType, final Geometry geo) { + this(queryType, geo, 0, 0); + } + + public GeoQuery(final GeoQueryType queryType, final Geometry geo, final double maxDistance, + final double minDistance) { + this.queryType = queryType; + this.geo = geo; + this.maxDistance = maxDistance; + this.minDistance = minDistance; + } + + public GeoQueryType getQueryType() { + return queryType; + } + + public Geometry getGeo() { + return geo; + } + + public Double getMaxDistance() { + return maxDistance; + } + + public Double getMinDistance() { + return minDistance; + } + } + + private final Double maxDistance; + + public GeoMongoDBStorageStrategy(final Double maxDistance) { + this.maxDistance = maxDistance; + } + + @Override + public void createIndices(final DBCollection coll){ + coll.createIndex(new BasicDBObject(GEO, "2dsphere")); + } + + public DBObject getQuery(final GeoQuery queryObj) throws MalformedQueryException { + final Geometry geo = queryObj.getGeo(); + final GeoQueryType queryType = queryObj.getQueryType(); + if (queryType == GeoQueryType.WITHIN && !(geo instanceof Polygon)) { + //They can also be applied to MultiPolygons, but those are not supported either. + throw new MalformedQueryException("Mongo Within operations can only be performed on Polygons."); + } else if(queryType == GeoQueryType.NEAR && !(geo instanceof Point)) { + //They can also be applied to Point, but those are not supported either. + throw new MalformedQueryException("Mongo near operations can only be performed on Points."); + } + + BasicDBObject query; + if (queryType.equals(GeoQueryType.EQUALS)){ + if(geo.getNumPoints() == 1) { + final List circle = new ArrayList(); + circle.add(getPoint(geo)); + circle.add(maxDistance); + final BasicDBObject polygon = new BasicDBObject("$centerSphere", circle); + query = new BasicDBObject(GEO, new BasicDBObject(GeoQueryType.WITHIN.getKeyword(), polygon)); + } else { + query = new BasicDBObject(GEO, getCorrespondingPoints(geo)); + } + } else if(queryType.equals(GeoQueryType.NEAR)) { + final BasicDBObject geoDoc = new BasicDBObject("$geometry", getDBPoint(geo)); + if(queryObj.getMaxDistance() != 0) { + geoDoc.append("$maxDistance", queryObj.getMaxDistance()); + } + + if(queryObj.getMinDistance() != 0) { + geoDoc.append("$minDistance", queryObj.getMinDistance()); + } + query = new BasicDBObject(GEO, new BasicDBObject(queryType.getKeyword(), geoDoc)); + } else { + final BasicDBObject geoDoc = new BasicDBObject("$geometry", getCorrespondingPoints(geo)); + query = new BasicDBObject(GEO, new BasicDBObject(queryType.getKeyword(), geoDoc)); + } + + return query; + } + + @Override + public DBObject serialize(final RyaStatement ryaStatement) { + // if the object is wkt, then try to index it + // write the statement data to the fields + try { + final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement); + final Geometry geo = (new WKTReader()).read(GeoParseUtils.getWellKnownText(statement)); + if(geo == null) { + LOG.error("Failed to parse geo statement: " + statement.toString()); + return null; + } + final BasicDBObject base = (BasicDBObject) super.serialize(ryaStatement); + if (geo.getNumPoints() > 1) { + base.append(GEO, getCorrespondingPoints(geo)); + } else { + base.append(GEO, getDBPoint(geo)); + } + return base; + } catch(final ParseException e) { + LOG.error("Could not create geometry for statement " + ryaStatement, e); + return null; + } + } + + public Document getCorrespondingPoints(final Geometry geo) { + //Polygons must be a 3 dimensional array. + + //polygons must be a closed loop + final Document geoDoc = new Document(); + if (geo instanceof Polygon) { + final Polygon poly = (Polygon) geo; + final List<List<List<Double>>> DBpoints = new ArrayList<>(); + + // outer shell of the polygon + final List<List<Double>> ring = new ArrayList<>(); + for (final Coordinate coord : poly.getExteriorRing().getCoordinates()) { + ring.add(getPoint(coord)); + } + DBpoints.add(ring); + + // each hold in the polygon + for (int ii = 0; ii < poly.getNumInteriorRing(); ii++) { + final List<List<Double>> holeCoords = new ArrayList<>(); + for (final Coordinate coord : poly.getInteriorRingN(ii).getCoordinates()) { + holeCoords.add(getPoint(coord)); + } + DBpoints.add(holeCoords); + } + geoDoc.append("coordinates", DBpoints) + .append("type", "Polygon"); + } else { + final List<List<Double>> points = getPoints(geo); + geoDoc.append("coordinates", points) + .append("type", "LineString"); + } + return geoDoc; + } + + private List<List<Double>> getPoints(final Geometry geo) { + final List<List<Double>> points = new ArrayList<>(); + for (final Coordinate coord : geo.getCoordinates()) { + points.add(getPoint(coord)); + } + return points; + } + + public Document getDBPoint(final Geometry geo) { + return new Document() + .append("coordinates", getPoint(geo)) + .append("type", "Point"); + } + + private List<Double> getPoint(final Coordinate coord) { + final List<Double> point = new ArrayList<>(); + point.add(coord.x); + point.add(coord.y); + return point; + } + + private List<Double> getPoint(final Geometry geo) { + final List<Double> point = new ArrayList<>(); + point.add(geo.getCoordinate().x); + point.add(geo.getCoordinate().y); + return point; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/GmlParser.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/GmlParser.java b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/GmlParser.java new file mode 100644 index 0000000..be5f1bc --- /dev/null +++ b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/GmlParser.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.mongodb.geo; + +import java.io.IOException; +import java.io.Reader; + +import javax.xml.parsers.ParserConfigurationException; + +import org.apache.rya.indexing.accumulo.geo.GeoParseUtils.GmlToGeometryParser; +import org.geotools.gml3.GMLConfiguration; +import org.xml.sax.SAXException; + +import com.vividsolutions.jts.geom.Geometry; + + +/** + * This wraps geotools parser for rya.geoCommon that cannot be dependent on geotools. + * + */ +public class GmlParser implements GmlToGeometryParser { + + /* (non-Javadoc) + * @see org.apache.rya.indexing.accumulo.geo.GeoParseUtils.GmlToGeometryParser#parse(java.io.Reader) + */ + @Override + public Geometry parse(Reader reader) throws IOException, SAXException, ParserConfigurationException { + final org.geotools.xml.Parser gmlParser = new org.geotools.xml.Parser(new GMLConfiguration()); + return (Geometry) gmlParser.parse(reader); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/MongoGeoIndexer.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/MongoGeoIndexer.java b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/MongoGeoIndexer.java new file mode 100644 index 0000000..2abee76 --- /dev/null +++ b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/MongoGeoIndexer.java @@ -0,0 +1,154 @@ +package org.apache.rya.indexing.mongodb.geo; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.EQUALS; +import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.INTERSECTS; +import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.NEAR; +import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.WITHIN; + +import org.apache.log4j.Logger; +import org.apache.rya.indexing.GeoIndexer; +import org.apache.rya.indexing.StatementConstraints; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.accumulo.geo.GeoTupleSet.GeoSearchFunctionFactory.NearQuery; +import org.apache.rya.indexing.mongodb.AbstractMongoIndexer; +import org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQuery; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.openrdf.model.Statement; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; + +import com.mongodb.DBObject; +import com.vividsolutions.jts.geom.Geometry; + +import info.aduna.iteration.CloseableIteration; + +public class MongoGeoIndexer extends AbstractMongoIndexer<GeoMongoDBStorageStrategy> implements GeoIndexer { + private static final String COLLECTION_SUFFIX = "geo"; + private static final Logger logger = Logger.getLogger(MongoGeoIndexer.class); + + @Override + public void init() { + initCore(); + predicates = ConfigUtils.getGeoPredicates(conf); + if(predicates.size() == 0) { + logger.debug("No predicates specified for geo indexing. During insertion, all statements will be attempted to be indexed into the geo indexer."); + } + storageStrategy = new GeoMongoDBStorageStrategy(Double.valueOf(conf.get(MongoDBRdfConfiguration.MONGO_GEO_MAXDISTANCE, "1e-10"))); + storageStrategy.createIndices(collection); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryEquals( + final Geometry query, final StatementConstraints constraints) { + try { + final DBObject queryObj = storageStrategy.getQuery(new GeoQuery(EQUALS, query)); + return withConstraints(constraints, queryObj); + } catch (final MalformedQueryException e) { + logger.error(e.getMessage(), e); + return null; + } + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryDisjoint( + final Geometry query, final StatementConstraints constraints) { + throw new UnsupportedOperationException( + "Disjoint queries are not supported in Mongo DB."); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryIntersects( + final Geometry query, final StatementConstraints constraints) { + try { + final DBObject queryObj = storageStrategy.getQuery(new GeoQuery(INTERSECTS, query)); + return withConstraints(constraints, queryObj); + } catch (final MalformedQueryException e) { + logger.error(e.getMessage(), e); + return null; + } + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryTouches( + final Geometry query, final StatementConstraints constraints) { + throw new UnsupportedOperationException( + "Touches queries are not supported in Mongo DB."); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryCrosses( + final Geometry query, final StatementConstraints constraints) { + throw new UnsupportedOperationException( + "Crosses queries are not supported in Mongo DB."); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryWithin( + final Geometry query, final StatementConstraints constraints) { + try { + final DBObject queryObj = storageStrategy.getQuery(new GeoQuery(WITHIN, query)); + return withConstraints(constraints, queryObj); + } catch (final MalformedQueryException e) { + logger.error(e.getMessage(), e); + return null; + } + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryNear(final NearQuery query, final StatementConstraints constraints) { + double maxDistance = 0; + double minDistance = 0; + if (query.getMaxDistance().isPresent()) { + maxDistance = query.getMaxDistance().get(); + } + + if (query.getMinDistance().isPresent()) { + minDistance = query.getMinDistance().get(); + } + + try { + final DBObject queryObj = storageStrategy.getQuery(new GeoQuery(NEAR, query.getGeometry(), maxDistance, minDistance)); + return withConstraints(constraints, queryObj); + } catch (final MalformedQueryException e) { + logger.error(e.getMessage(), e); + return null; + } + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryContains( + final Geometry query, final StatementConstraints constraints) { + throw new UnsupportedOperationException( + "Contains queries are not supported in Mongo DB."); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryOverlaps( + final Geometry query, final StatementConstraints constraints) { + throw new UnsupportedOperationException( + "Overlaps queries are not supported in Mongo DB."); + } + + @Override + public String getCollectionName() { + return ConfigUtils.getTablePrefix(conf) + COLLECTION_SUFFIX; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/MongoGeoTupleSet.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/MongoGeoTupleSet.java b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/MongoGeoTupleSet.java new file mode 100644 index 0000000..c564d02 --- /dev/null +++ b/extras/rya.geoindexing/geo.mongo/src/main/java/org/apache/rya/indexing/mongodb/geo/MongoGeoTupleSet.java @@ -0,0 +1,361 @@ +package org.apache.rya.indexing.mongodb.geo; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; + +import com.google.common.base.Joiner; +import com.google.common.collect.Maps; +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.io.ParseException; +import com.vividsolutions.jts.io.WKTReader; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import info.aduna.iteration.CloseableIteration; +import org.apache.rya.indexing.GeoConstants; +import org.apache.rya.indexing.GeoIndexer; +import org.apache.rya.indexing.IndexingExpr; +import org.apache.rya.indexing.IteratorFactory; +import org.apache.rya.indexing.SearchFunction; +import org.apache.rya.indexing.StatementConstraints; +import org.apache.rya.indexing.accumulo.geo.GeoTupleSet; +import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet; + +public class MongoGeoTupleSet extends ExternalTupleSet { + + private Configuration conf; + private GeoIndexer geoIndexer; + private IndexingExpr filterInfo; + + + public MongoGeoTupleSet(IndexingExpr filterInfo, GeoIndexer geoIndexer) { + this.filterInfo = filterInfo; + this.geoIndexer = geoIndexer; + this.conf = geoIndexer.getConf(); + } + + @Override + public Set<String> getBindingNames() { + return filterInfo.getBindingNames(); + } + + public GeoTupleSet clone() { + return new GeoTupleSet(filterInfo, geoIndexer); + } + + @Override + public double cardinality() { + return 0.0; // No idea how the estimate cardinality here. + } + + + @Override + public String getSignature() { + return "(GeoTuple Projection) " + "variables: " + Joiner.on(", ").join(this.getBindingNames()).replaceAll("\\s+", " "); + } + + + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + if (!(other instanceof MongoGeoTupleSet)) { + return false; + } + MongoGeoTupleSet arg = (MongoGeoTupleSet) other; + return this.filterInfo.equals(arg.filterInfo); + } + + @Override + public int hashCode() { + int result = 17; + result = 31*result + filterInfo.hashCode(); + + return result; + } + + + + /** + * Returns an iterator over the result set of the contained IndexingExpr. + * <p> + * Should be thread-safe (concurrent invocation {@link OfflineIterable} this + * method can be expected with some query evaluators. + */ + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) + throws QueryEvaluationException { + + + URI funcURI = filterInfo.getFunction(); + SearchFunction searchFunction = (new MongoGeoSearchFunctionFactory(conf)).getSearchFunction(funcURI); + if(filterInfo.getArguments().length > 1) { + throw new IllegalArgumentException("Index functions do not support more than two arguments."); + } + + String queryText = ((Value) filterInfo.getArguments()[0]).stringValue(); + + return IteratorFactory.getIterator(filterInfo.getSpConstraint(), bindings, queryText, searchFunction); + } + + + + //returns appropriate search function for a given URI + //search functions used in GeoMesaGeoIndexer to access index + public class MongoGeoSearchFunctionFactory { + + Configuration conf; + + private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap(); + + public MongoGeoSearchFunctionFactory(Configuration conf) { + this.conf = conf; + } + + + /** + * Get a {@link GeoSearchFunction} for a given URI. + * + * @param searchFunction + * @return + */ + public SearchFunction getSearchFunction(final URI searchFunction) { + + SearchFunction geoFunc = null; + + try { + geoFunc = getSearchFunctionInternal(searchFunction); + } catch (QueryEvaluationException e) { + e.printStackTrace(); + } + + return geoFunc; + } + + private SearchFunction getSearchFunctionInternal(final URI searchFunction) throws QueryEvaluationException { + SearchFunction sf = SEARCH_FUNCTION_MAP.get(searchFunction); + + if (sf != null) { + return sf; + } else { + throw new QueryEvaluationException("Unknown Search Function: " + searchFunction.stringValue()); + } + } + + private final SearchFunction GEO_EQUALS = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementConstraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_EQUALS"; + }; + }; + + private final SearchFunction GEO_DISJOINT = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementConstraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_DISJOINT"; + }; + }; + + private final SearchFunction GEO_INTERSECTS = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementConstraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_INTERSECTS"; + }; + }; + + private final SearchFunction GEO_TOUCHES = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementConstraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_TOUCHES"; + }; + }; + + private final SearchFunction GEO_CONTAINS = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementConstraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_CONTAINS"; + }; + }; + + private final SearchFunction GEO_OVERLAPS = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementConstraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_OVERLAPS"; + }; + }; + + private final SearchFunction GEO_CROSSES = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementConstraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_CROSSES"; + }; + }; + + private final SearchFunction GEO_WITHIN = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementConstraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_WITHIN"; + }; + }; + + { + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_EQUALS, GEO_EQUALS); + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_DISJOINT, GEO_DISJOINT); + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_INTERSECTS, GEO_INTERSECTS); + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_TOUCHES, GEO_TOUCHES); + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_CONTAINS, GEO_CONTAINS); + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_OVERLAPS, GEO_OVERLAPS); + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_CROSSES, GEO_CROSSES); + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_WITHIN, GEO_WITHIN); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalProviderTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalProviderTest.java b/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalProviderTest.java new file mode 100644 index 0000000..7151b56 --- /dev/null +++ b/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalProviderTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.geotemporal; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +import java.util.List; + +import org.apache.rya.indexing.GeoConstants; +import org.apache.rya.indexing.TemporalInstantRfc3339; +import org.apache.rya.indexing.external.matching.QuerySegment; +import org.apache.rya.indexing.geotemporal.GeoTemporalIndexSetProvider; +import org.apache.rya.indexing.geotemporal.model.EventQueryNode; +import org.apache.rya.indexing.geotemporal.storage.EventStorage; +import org.junit.Before; +import org.junit.Test; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; + +public class GeoTemporalProviderTest extends GeoTemporalTestBase { + private static final String URI_PROPERTY_AT_TIME = "Property:atTime"; + private GeoTemporalIndexSetProvider provider; + private EventStorage events; + @Before + public void setup() { + events = mock(EventStorage.class); + provider = new GeoTemporalIndexSetProvider(events); + } + + /* + * Simplest Happy Path test + */ + @Test + public void twoPatternsTwoFilters_test() throws Exception { + final ValueFactory vf = new ValueFactoryImpl(); + final Value geo = vf.createLiteral("Point(0 0)", GeoConstants.XMLSCHEMA_OGC_WKT); + final Value temp = vf.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString()); + final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME); + final String query = + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>" + + "PREFIX geos: <http://www.opengis.net/def/function/geosparql/>" + + "PREFIX time: <tag:rya-rdf.org,2015:temporal#>" + + "SELECT * WHERE { " + + "?subj <" + tempPred + "> ?time ."+ + "?subj <" + GeoConstants.GEO_AS_WKT + "> ?loc . " + + " FILTER(geos:sfContains(?loc, " + geo + ")) . " + + " FILTER(time:equals(?time, " + temp + ")) . " + + "}"; + final QuerySegment<EventQueryNode> node = getQueryNode(query); + final List<EventQueryNode> nodes = provider.getExternalSets(node); + assertEquals(1, nodes.size()); + } + + @Test + public void onePatternTwoFilters_test() throws Exception { + final ValueFactory vf = new ValueFactoryImpl(); + final Value geo = vf.createLiteral("Point(0 0)", GeoConstants.XMLSCHEMA_OGC_WKT); + final Value temp = vf.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString()); + final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME); + final String query = + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>" + + "PREFIX geos: <http://www.opengis.net/def/function/geosparql/>" + + "PREFIX time: <tag:rya-rdf.org,2015:temporal#>" + + "SELECT * WHERE { " + + "?subj <" + tempPred + "> ?time ."+ + " FILTER(geos:sfContains(?loc, " + geo + ")) . " + + " FILTER(time:equals(?time, " + temp + ")) . " + + "}"; + final QuerySegment<EventQueryNode> node = getQueryNode(query); + final List<EventQueryNode> nodes = provider.getExternalSets(node); + assertEquals(0, nodes.size()); + } + + @Test + public void twoPatternsOneFilter_test() throws Exception { + final ValueFactory vf = new ValueFactoryImpl(); + final Value geo = vf.createLiteral("Point(0 0)", GeoConstants.XMLSCHEMA_OGC_WKT); + final Value temp = vf.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString()); + final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME); + final String query = + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>" + + "PREFIX geos: <http://www.opengis.net/def/function/geosparql/>" + + "PREFIX time: <tag:rya-rdf.org,2015:temporal#>" + + "SELECT * WHERE { " + + "?subj <" + tempPred + "> ?time ."+ + "?subj <" + GeoConstants.GEO_AS_WKT + "> ?loc . " + + " FILTER(geos:sfContains(?loc, " + geo + ")) . " + + "}"; + final QuerySegment<EventQueryNode> node = getQueryNode(query); + final List<EventQueryNode> nodes = provider.getExternalSets(node); + assertEquals(0, nodes.size()); + } + + @Test + public void twoPatternsNoFilter_test() throws Exception { + final ValueFactory vf = new ValueFactoryImpl(); + final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME); + final String query = + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>" + + "PREFIX geos: <http://www.opengis.net/def/function/geosparql/>" + + "PREFIX time: <tag:rya-rdf.org,2015:temporal#>" + + "SELECT * WHERE { " + + "?subj <" + tempPred + "> ?time ."+ + "?subj <" + GeoConstants.GEO_AS_WKT + "> ?loc . " + + "}"; + final QuerySegment<EventQueryNode> node = getQueryNode(query); + final List<EventQueryNode> nodes = provider.getExternalSets(node); + assertEquals(0, nodes.size()); + } + + @Test + public void twoPatternsTwoFiltersNotValid_test() throws Exception { + final ValueFactory vf = new ValueFactoryImpl(); + final Value geo = vf.createLiteral("Point(0 0)", GeoConstants.XMLSCHEMA_OGC_WKT); + final Value temp = vf.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString()); + final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME); + //Only handles geo and temporal filters + final String query = + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>" + + "PREFIX geos: <http://www.opengis.net/def/function/geosparql/>" + + "PREFIX text: <http://rdf.useekm.com/fts#text>" + + "SELECT * WHERE { " + + "?subj <" + tempPred + "> ?time ."+ + "?subj <" + GeoConstants.GEO_AS_WKT + "> ?loc . " + + " FILTER(geos:sfContains(?loc, " + geo + ")) . " + + " FILTER(text:equals(?time, " + temp + ")) . " + + "}"; + final QuerySegment<EventQueryNode> node = getQueryNode(query); + final List<EventQueryNode> nodes = provider.getExternalSets(node); + assertEquals(0, nodes.size()); + } + + @Test + public void twoSubjOneFilter_test() throws Exception { + final ValueFactory vf = new ValueFactoryImpl(); + final Value geo = vf.createLiteral("Point(0 0)", GeoConstants.XMLSCHEMA_OGC_WKT); + final Value temp = vf.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString()); + final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME); + final String query = + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>" + + "PREFIX geos: <http://www.opengis.net/def/function/geosparql/>" + + "PREFIX time: <tag:rya-rdf.org,2015:temporal#>" + + "SELECT * WHERE { " + + "?subj <" + tempPred + "> ?time ."+ + "?subj <" + GeoConstants.GEO_AS_WKT + "> ?loc . " + + "?subj2 <" + tempPred + "> ?time2 ."+ + "?subj2 <" + GeoConstants.GEO_AS_WKT + "> ?loc2 . " + + " FILTER(geos:sfContains(?loc, " + geo + ")) . " + + " FILTER(time:equals(?time, " + temp + ")) . " + + "}"; + final QuerySegment<EventQueryNode> node = getQueryNode(query); + final List<EventQueryNode> nodes = provider.getExternalSets(node); + assertEquals(1, nodes.size()); + } + + @Test + public void twoNode_test() throws Exception { + final ValueFactory vf = new ValueFactoryImpl(); + final Value geo = vf.createLiteral("Point(0 0)", GeoConstants.XMLSCHEMA_OGC_WKT); + final Value temp = vf.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString()); + final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME); + final String query = + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>" + + "PREFIX geos: <http://www.opengis.net/def/function/geosparql/>" + + "PREFIX time: <tag:rya-rdf.org,2015:temporal#>" + + "SELECT * WHERE { " + + "?subj <" + tempPred + "> ?time ."+ + "?subj <" + GeoConstants.GEO_AS_WKT + "> ?loc . " + + "?subj2 <" + tempPred + "> ?time2 ."+ + "?subj2 <" + GeoConstants.GEO_AS_WKT + "> ?loc2 . " + + " FILTER(geos:sfContains(?loc, " + geo + ")) . " + + " FILTER(time:equals(?time, " + temp + ")) . " + + " FILTER(geos:sfContains(?loc2, " + geo + ")) . " + + " FILTER(time:equals(?time2, " + temp + ")) . " + + "}"; + final QuerySegment<EventQueryNode> node = getQueryNode(query); + final List<EventQueryNode> nodes = provider.getExternalSets(node); + assertEquals(2, nodes.size()); + } + + @Test + public void twoSubjectMultiFilter_test() throws Exception { + final ValueFactory vf = new ValueFactoryImpl(); + final Value geo = vf.createLiteral("Point(0 0)", GeoConstants.XMLSCHEMA_OGC_WKT); + final Value temp = vf.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString()); + final URI tempPred = vf.createURI(URI_PROPERTY_AT_TIME); + final String query = + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>" + + "PREFIX geos: <http://www.opengis.net/def/function/geosparql/>" + + "PREFIX time: <tag:rya-rdf.org,2015:temporal#>" + + "SELECT * WHERE { " + + "?subj <" + tempPred + "> ?time ."+ + "?subj <" + GeoConstants.GEO_AS_WKT + "> ?loc . " + + " FILTER(geos:sfContains(?loc, " + geo + ")) . " + + " FILTER(time:equals(?time, " + temp + ")) . " + + " FILTER(geos:sfWithin(?loc, " + geo + ")) . " + + " FILTER(time:before(?time, " + temp + ")) . " + + "}"; + final QuerySegment<EventQueryNode> node = getQueryNode(query); + final List<EventQueryNode> nodes = provider.getExternalSets(node); + assertEquals(1, nodes.size()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalTestBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalTestBase.java b/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalTestBase.java new file mode 100644 index 0000000..6b6bf15 --- /dev/null +++ b/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/GeoTemporalTestBase.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.geotemporal; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.rya.indexing.TemporalInstant; +import org.apache.rya.indexing.TemporalInstantRfc3339; +import org.apache.rya.indexing.external.matching.QuerySegment; +import org.apache.rya.indexing.geotemporal.model.EventQueryNode; +import org.junit.ComparisonFailure; +import org.mockito.Mockito; +import org.openrdf.query.algebra.FunctionCall; +import org.openrdf.query.algebra.QueryModelNode; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.algebra.helpers.StatementPatternCollector; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import com.vividsolutions.jts.geom.Coordinate; +import com.vividsolutions.jts.geom.GeometryFactory; +import com.vividsolutions.jts.geom.LineString; +import com.vividsolutions.jts.geom.LinearRing; +import com.vividsolutions.jts.geom.Point; +import com.vividsolutions.jts.geom.Polygon; +import com.vividsolutions.jts.geom.PrecisionModel; +import com.vividsolutions.jts.geom.impl.PackedCoordinateSequence; + +public class GeoTemporalTestBase { + private static final GeometryFactory gf = new GeometryFactory(new PrecisionModel(), 4326); + + /** + * Make an uniform instant with given seconds. + */ + protected static TemporalInstant makeInstant(final int secondsMakeMeUnique) { + return new TemporalInstantRfc3339(2015, 12, 30, 12, 00, secondsMakeMeUnique); + } + + protected static Polygon poly(final double[] arr) { + final LinearRing r1 = gf.createLinearRing(new PackedCoordinateSequence.Double(arr, 2)); + final Polygon p1 = gf.createPolygon(r1, new LinearRing[] {}); + return p1; + } + + protected static Point point(final double x, final double y) { + return gf.createPoint(new Coordinate(x, y)); + } + + protected static LineString line(final double x1, final double y1, final double x2, final double y2) { + return new LineString(new PackedCoordinateSequence.Double(new double[] { x1, y1, x2, y2 }, 2), gf); + } + + protected static double[] bbox(final double x1, final double y1, final double x2, final double y2) { + return new double[] { x1, y1, x1, y2, x2, y2, x2, y1, x1, y1 }; + } + + protected void assertEqualMongo(final Object expected, final Object actual) throws ComparisonFailure { + try { + assertEquals(expected, actual); + } catch(final Throwable e) { + throw new ComparisonFailure(e.getMessage(), expected.toString(), actual.toString()); + } + } + + public List<FunctionCall> getFilters(final String query) throws Exception { + final FunctionCallCollector collector = new FunctionCallCollector(); + new SPARQLParser().parseQuery(query, null).getTupleExpr().visit(collector); + return collector.getTupleExpr(); + } + + public List<StatementPattern> getSps(final String query) throws Exception { + final StatementPatternCollector collector = new StatementPatternCollector(); + new SPARQLParser().parseQuery(query, null).getTupleExpr().visit(collector); + return collector.getStatementPatterns(); + } + + public QuerySegment<EventQueryNode> getQueryNode(final String query) throws Exception { + final List<QueryModelNode> exprs = getNodes(query); + final QuerySegment<EventQueryNode> node = Mockito.mock(QuerySegment.class); + //provider only cares about Ordered nodes. + Mockito.when(node.getOrderedNodes()).thenReturn(exprs); + return node; + } + + private static List<QueryModelNode> getNodes(final String sparql) throws Exception { + final NodeCollector collector = new NodeCollector(); + new SPARQLParser().parseQuery(sparql, null).getTupleExpr().visit(collector); + return collector.getTupleExpr(); + } + + private static class NodeCollector extends QueryModelVisitorBase<RuntimeException> { + private final List<QueryModelNode> stPatterns = new ArrayList<>(); + + public List<QueryModelNode> getTupleExpr() { + return stPatterns; + } + + @Override + public void meet(final FunctionCall node) { + stPatterns.add(node); + } + + @Override + public void meet(final StatementPattern node) { + stPatterns.add(node); + } + } + + private static class FunctionCallCollector extends QueryModelVisitorBase<RuntimeException> { + private final List<FunctionCall> filters = new ArrayList<>(); + + public List<FunctionCall> getTupleExpr() { + return filters; + } + + @Override + public void meet(final FunctionCall node) { + filters.add(node); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9e76b8d7/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/MongoGeoTemporalIndexIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/MongoGeoTemporalIndexIT.java b/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/MongoGeoTemporalIndexIT.java new file mode 100644 index 0000000..ff778ba --- /dev/null +++ b/extras/rya.geoindexing/geo.mongo/src/test/java/org/apache/rya/indexing/geotemporal/MongoGeoTemporalIndexIT.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.geotemporal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.rya.api.RdfCloudTripleStoreConfiguration; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.indexing.GeoConstants; +import org.apache.rya.indexing.GeoRyaSailFactory; +import org.apache.rya.indexing.TemporalInstantRfc3339; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.accumulo.geo.OptionalConfigUtils; +import org.apache.rya.indexing.geotemporal.model.Event; +import org.apache.rya.indexing.geotemporal.mongo.MongoGeoTemporalIndexer; +import org.apache.rya.indexing.geotemporal.mongo.MongoITBase; +import org.apache.rya.indexing.geotemporal.storage.EventStorage; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.junit.Before; +import org.junit.Test; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.TupleQueryResult; +import org.openrdf.query.impl.MapBindingSet; +import org.openrdf.repository.sail.SailRepository; +import org.openrdf.repository.sail.SailRepositoryConnection; +import org.openrdf.sail.Sail; + +import com.mongodb.MongoClient; + +public class MongoGeoTemporalIndexIT extends MongoITBase { + private static final String URI_PROPERTY_AT_TIME = "Property:atTime"; + + private static final ValueFactory VF = ValueFactoryImpl.getInstance(); + private MongoDBRdfConfiguration conf; + private SailRepositoryConnection conn; + private MongoClient mongoClient; + private static final AtomicInteger COUNTER = new AtomicInteger(1); + + @Before + public void setUp() throws Exception{ + mongoClient = super.getMongoClient(); + conf = new MongoDBRdfConfiguration(); + conf.set(MongoDBRdfConfiguration.MONGO_DB_NAME, MongoGeoTemporalIndexIT.class.getSimpleName() + "_" + COUNTER.getAndIncrement()); + conf.set(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya"); + conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, "rya"); + conf.setBoolean(ConfigUtils.USE_MONGO, true); + conf.setBoolean(OptionalConfigUtils.USE_GEOTEMPORAL, true); + conf.setMongoClient(mongoClient); + + final Sail sail = GeoRyaSailFactory.getInstance(conf); + conn = new SailRepository(sail).getConnection(); + conn.begin(); + + addStatements(); + } + + @Test + public void ensureInEventStore_Test() throws Exception { + final MongoGeoTemporalIndexer indexer = new MongoGeoTemporalIndexer(); + indexer.initIndexer(conf, mongoClient); + + final EventStorage events = indexer.getEventStorage(conf); + final RyaURI subject = new RyaURI("urn:event1"); + final Optional<Event> event = events.get(subject); + assertTrue(event.isPresent()); + } + + @Test + public void constantSubjQuery_Test() throws Exception { + final String query = + "PREFIX time: <http://www.w3.org/2006/time#> \n" + + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> \n" + + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>" + + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/>" + + "SELECT * " + + "WHERE { " + + " <urn:event1> time:atTime ?time . " + + " <urn:event1> geo:asWKT ?point . " + + " FILTER(geof:sfWithin(?point, \"POLYGON((-3 -2, -3 2, 1 2, 1 -2, -3 -2))\"^^geo:wktLiteral)) " + + " FILTER(tempo:equals(?time, \"2015-12-30T12:00:00Z\")) " + + "}"; + + final TupleQueryResult rez = conn.prepareTupleQuery(QueryLanguage.SPARQL, query).evaluate(); + final Set<BindingSet> results = new HashSet<>(); + while(rez.hasNext()) { + final BindingSet bs = rez.next(); + results.add(bs); + } + final MapBindingSet expected = new MapBindingSet(); + expected.addBinding("point", VF.createLiteral("POINT (0 0)")); + expected.addBinding("time", VF.createLiteral("2015-12-30T12:00:00Z")); + + assertEquals(1, results.size()); + assertEquals(expected, results.iterator().next()); + } + + @Test + public void variableSubjQuery_Test() throws Exception { + final String query = + "PREFIX time: <http://www.w3.org/2006/time#> \n" + + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> \n" + + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>" + + "PREFIX geof: <http://www.opengis.net/def/function/geosparql/>" + + "SELECT * " + + "WHERE { " + + " ?subj time:atTime ?time . " + + " ?subj geo:asWKT ?point . " + + " FILTER(geof:sfWithin(?point, \"POLYGON((-3 -2, -3 2, 1 2, 1 -2, -3 -2))\"^^geo:wktLiteral)) " + + " FILTER(tempo:equals(?time, \"2015-12-30T12:00:00Z\")) " + + "}"; + + final TupleQueryResult rez = conn.prepareTupleQuery(QueryLanguage.SPARQL, query).evaluate(); + final List<BindingSet> results = new ArrayList<>(); + while(rez.hasNext()) { + final BindingSet bs = rez.next(); + results.add(bs); + } + final MapBindingSet expected1 = new MapBindingSet(); + expected1.addBinding("point", VF.createLiteral("POINT (0 0)")); + expected1.addBinding("time", VF.createLiteral("2015-12-30T12:00:00Z")); + + final MapBindingSet expected2 = new MapBindingSet(); + expected2.addBinding("point", VF.createLiteral("POINT (1 1)")); + expected2.addBinding("time", VF.createLiteral("2015-12-30T12:00:00Z")); + + assertEquals(2, results.size()); + assertEquals(expected1, results.get(0)); + assertEquals(expected2, results.get(1)); + } + + private void addStatements() throws Exception { + URI subject = VF.createURI("urn:event1"); + final URI predicate = VF.createURI(URI_PROPERTY_AT_TIME); + Value object = VF.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString()); + conn.add(VF.createStatement(subject, predicate, object)); + + object = VF.createLiteral("Point(0 0)", GeoConstants.XMLSCHEMA_OGC_WKT); + conn.add(VF.createStatement(subject, GeoConstants.GEO_AS_WKT, object)); + + subject = VF.createURI("urn:event2"); + object = VF.createLiteral(new TemporalInstantRfc3339(2015, 12, 30, 12, 00, 0).toString()); + conn.add(VF.createStatement(subject, predicate, object)); + + object = VF.createLiteral("Point(1 1)", GeoConstants.XMLSCHEMA_OGC_WKT); + conn.add(VF.createStatement(subject, GeoConstants.GEO_AS_WKT, object)); + } +}