RYA-237 Mongo GeoTemporal Indexer Mongo support for the GeoTemporalIndexer
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/63095d45 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/63095d45 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/63095d45 Branch: refs/heads/master Commit: 63095d45a357da622343b42ed5ce22fd8217cc80 Parents: af5964a Author: isper3at <[email protected]> Authored: Wed Jan 18 15:38:11 2017 -0500 Committer: Aaron Mihalik <[email protected]> Committed: Wed Jun 14 13:27:56 2017 -0400 ---------------------------------------------------------------------- .../mongo/EventDocumentConverter.java | 147 ++++++++++ .../geotemporal/mongo/EventUpdater.java | 85 ++++++ .../GeoTemporalMongoDBStorageStrategy.java | 293 +++++++++++++++++++ .../geotemporal/mongo/MongoEventStorage.java | 196 +++++++++++++ .../mongo/MongoGeoTemporalIndexer.java | 222 ++++++++++++++ 5 files changed, 943 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63095d45/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverter.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverter.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverter.java new file mode 100644 index 0000000..a41428e --- /dev/null +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventDocumentConverter.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.geotemporal.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.Date; +import java.util.List; + +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.indexing.TemporalInstant; +import org.apache.rya.indexing.TemporalInstantRfc3339; +import org.apache.rya.indexing.TemporalInterval; +import org.apache.rya.indexing.entity.storage.mongo.DocumentConverter; +import org.apache.rya.indexing.geotemporal.model.Event; +import org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy; +import org.bson.Document; +import org.bson.types.BasicBSONList; +import org.joda.time.DateTime; + +import com.vividsolutions.jts.geom.Coordinate; +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.geom.GeometryFactory; + +public class EventDocumentConverter implements DocumentConverter<Event>{ + public static final String SUBJECT = "_id"; + public static final String GEO_KEY = "location"; + public static final String INTERVAL_START = "start"; + public static final String INTERVAL_END = "end"; + public static final String INSTANT = "instant"; + + private final GeoMongoDBStorageStrategy geoAdapter = new GeoMongoDBStorageStrategy(0); + + @Override + public Document toDocument(final Event event) { + requireNonNull(event); + + final Document doc = new Document(); + doc.append(SUBJECT, event.getSubject().getData()); + + if(event.getGeometry().isPresent()) { + final BasicBSONList points = new BasicBSONList(); + for(final double[] point : geoAdapter.getCorrespondingPoints(event.getGeometry().get())) { + final BasicBSONList pointDoc = new BasicBSONList(); + for(final double p : point) { + pointDoc.add(p); + } + points.add(pointDoc); + } + + doc.append(GEO_KEY, points); + } + if(event.isInstant()) { + if(event.getInstant().isPresent()) { + doc.append(INSTANT, event.getInstant().get().getAsDateTime().toDate()); + } + } else { + if(event.getInterval().isPresent()) { + doc.append(INTERVAL_START, event.getInterval().get().getHasBeginning().getAsDateTime().toDate()); + doc.append(INTERVAL_END, event.getInterval().get().getHasEnd().getAsDateTime().toDate()); + } + } + + return doc; + } + + @Override + public Event fromDocument(final Document document) throws DocumentConverterException { + requireNonNull(document); + + final boolean isInstant; + + // Preconditions. + if(!document.containsKey(SUBJECT)) { + throw new DocumentConverterException("Could not convert document '" + document + + "' because its '" + SUBJECT + "' field is missing."); + } + + if(document.containsKey(INSTANT)) { + isInstant = true; + } else { + isInstant = false; + } + + final String subject = document.getString(SUBJECT); + + final Event.Builder builder = new Event.Builder() + .setSubject(new RyaURI(subject)); + + if(document.containsKey(GEO_KEY)) { + final List<List<Double>> pointsList = (List<List<Double>>) document.get(GEO_KEY); + final Coordinate[] coords = new Coordinate[pointsList.size()]; + + int ii = 0; + for(final List<Double> point : pointsList) { + coords[ii] = new Coordinate(point.get(0), point.get(1)); + ii++; + } + + final GeometryFactory geoFact = new GeometryFactory(); + final Geometry geo; + if(coords.length == 1) { + geo = geoFact.createPoint(coords[0]); + } else { + geo = geoFact.createPolygon(coords); + } + builder.setGeometry(geo); + } + + if(isInstant) { + //we already know the key exists + final Date date = (Date) document.get(INSTANT); + final DateTime dt = new DateTime(date.getTime()); + final TemporalInstant instant = new TemporalInstantRfc3339(dt); + builder.setTemporalInstant(instant); + } else if(document.containsKey(INTERVAL_START)){ + Date date = (Date) document.get(INTERVAL_START); + DateTime dt = new DateTime(date.getTime()); + final TemporalInstant begining = new TemporalInstantRfc3339(dt); + + date = (Date) document.get(INTERVAL_END); + dt = new DateTime(date.getTime()); + final TemporalInstant end = new TemporalInstantRfc3339(dt); + + final TemporalInterval interval = new TemporalInterval(begining, end); + builder.setTemporalInterval(interval); + } + return builder.build(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63095d45/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java new file mode 100644 index 0000000..c9f4658 --- /dev/null +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/EventUpdater.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.geotemporal.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.Optional; + +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.indexing.geotemporal.model.Event; +import org.apache.rya.indexing.geotemporal.storage.EventStorage; +import org.apache.rya.indexing.geotemporal.storage.EventStorage.EventStorageException; +import org.apache.rya.indexing.mongodb.update.DocumentUpdater; +import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Performs update operations over an {@link EventStorage}. + */ +@DefaultAnnotation(NonNull.class) +public class EventUpdater implements DocumentUpdater<RyaURI, Event>{ + private final EventStorage events; + + /** + * Constructs an instance of {@link EventUpdater} + * + * @param events - The storage this updater operates over. (not null) + */ + public EventUpdater(final EventStorage events) { + this.events = requireNonNull(events); + } + + @Override + public Optional<Event> getOld(final RyaURI key) throws EventStorageException { + try { + return events.get(key); + } catch (final ObjectStorageException e) { + throw new EventStorageException(e.getMessage(), e); + } + } + + @Override + public void create(final Event newObj) throws EventStorageException { + try { + events.create(newObj); + } catch (final ObjectStorageException e) { + throw new EventStorageException(e.getMessage(), e); + } + } + + @Override + public void update(final Event old, final Event updated) throws EventStorageException { + try { + events.update(old, updated); + } catch (final ObjectStorageException e) { + throw new EventStorageException(e.getMessage(), e); + } + } + + public void delete(final Event event) throws EventStorageException { + try { + events.delete(event.getSubject()); + } catch (final ObjectStorageException e) { + throw new EventStorageException(e.getMessage(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63095d45/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java new file mode 100644 index 0000000..352dcb6 --- /dev/null +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/GeoTemporalMongoDBStorageStrategy.java @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.geotemporal.mongo; + +import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.EQUALS; +import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.INTERSECTS; +import static org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.WITHIN; +import static org.apache.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy.INSTANT; +import static org.apache.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy.INTERVAL_END; +import static org.apache.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy.INTERVAL_START; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.regex.Matcher; + +import org.apache.log4j.Logger; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.resolver.RyaToRdfConversions; +import org.apache.rya.indexing.GeoConstants; +import org.apache.rya.indexing.IndexingExpr; +import org.apache.rya.indexing.TemporalInstant; +import org.apache.rya.indexing.TemporalInstantRfc3339; +import org.apache.rya.indexing.TemporalInterval; +import org.apache.rya.indexing.accumulo.geo.GeoParseUtils; +import org.apache.rya.indexing.geotemporal.GeoTemporalIndexException; +import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer.GeoPolicy; +import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer.TemporalPolicy; +import org.apache.rya.indexing.mongodb.IndexingMongoDBStorageStrategy; +import org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy; +import org.apache.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQuery; +import org.apache.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy; +import org.joda.time.DateTime; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.query.MalformedQueryException; + +import com.mongodb.BasicDBObject; +import com.mongodb.BasicDBObjectBuilder; +import com.mongodb.DBCollection; +import com.mongodb.DBObject; +import com.mongodb.QueryBuilder; +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.io.ParseException; +import com.vividsolutions.jts.io.WKTReader; + +import jline.internal.Log; + +/** + * TODO: doc + */ +public class GeoTemporalMongoDBStorageStrategy extends IndexingMongoDBStorageStrategy { + private static final Logger LOG = Logger.getLogger(GeoTemporalMongoDBStorageStrategy.class); + private static final String GEO_KEY = "location"; + private static final String TIME_KEY = "time"; + private final TemporalMongoDBStorageStrategy temporalStrategy; + private final GeoMongoDBStorageStrategy geoStrategy; + + public GeoTemporalMongoDBStorageStrategy() { + geoStrategy = new GeoMongoDBStorageStrategy(0); + temporalStrategy = new TemporalMongoDBStorageStrategy(); + } + + @Override + public void createIndices(final DBCollection coll){ + coll.createIndex(GEO_KEY); + coll.createIndex(TIME_KEY); + } + + public DBObject getFilterQuery(final Collection<IndexingExpr> geoFilters, final Collection<IndexingExpr> temporalFilters) throws GeoTemporalIndexException { + final QueryBuilder builder = QueryBuilder.start(); + + if(!geoFilters.isEmpty()) { + final DBObject[] geo = getGeoObjs(geoFilters); + if(!temporalFilters.isEmpty()) { + final DBObject[] temporal = getTemporalObjs(temporalFilters); + builder.and(oneOrAnd(geo), oneOrAnd(temporal)); + return builder.get(); + } else { + return oneOrAnd(geo); + } + } else if(!temporalFilters.isEmpty()) { + final DBObject[] temporal = getTemporalObjs(temporalFilters); + return oneOrAnd(temporal); + } else { + return builder.get(); + } + } + + private DBObject oneOrAnd(final DBObject[] dbos) { + if(dbos.length == 1) { + return dbos[0]; + } + return QueryBuilder.start() + .and(dbos) + .get(); + } + + @Override + public DBObject serialize(final RyaStatement ryaStatement) { + final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start("_id", ryaStatement.getSubject().hashCode()); + final URI obj = ryaStatement.getObject().getDataType(); + + + if(obj.equals(GeoConstants.GEO_AS_WKT) || obj.equals(GeoConstants.GEO_AS_GML) || + obj.equals(GeoConstants.XMLSCHEMA_OGC_GML) || obj.equals(GeoConstants.XMLSCHEMA_OGC_WKT)) { + try { + final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement); + final Geometry geo = GeoParseUtils.getGeometry(statement); + builder.add(GEO_KEY, geoStrategy.getCorrespondingPoints(geo)); + } catch (final ParseException e) { + LOG.error("Could not create geometry for statement " + ryaStatement, e); + return null; + } + } else { + builder.add(TIME_KEY, temporalStrategy.getTimeValue(ryaStatement.getObject().getData())); + } + return builder.get(); + } + + private DBObject[] getGeoObjs(final Collection<IndexingExpr> geoFilters) { + final List<DBObject> objs = new ArrayList<>(); + geoFilters.forEach(filter -> { + final GeoPolicy policy = GeoPolicy.fromURI(filter.getFunction()); + final WKTReader reader = new WKTReader(); + final String geoStr = filter.getArguments()[0].stringValue(); + try { + //This method is what is used in the GeoIndexer. + final Geometry geo = reader.read(geoStr); + objs.add(getGeoObject(geo, policy)); + } catch (final GeoTemporalIndexException | UnsupportedOperationException | ParseException e) { + Log.error("Unable to parse '" + geoStr + "'.", e); + } + }); + return objs.toArray(new DBObject[]{}); + } + + private DBObject[] getTemporalObjs(final Collection<IndexingExpr> temporalFilters) { + final List<DBObject> objs = new ArrayList<>(); + temporalFilters.forEach(filter -> { + final TemporalPolicy policy = TemporalPolicy.fromURI(filter.getFunction()); + final String timeStr = filter.getArguments()[0].stringValue(); + final Matcher matcher = TemporalInstantRfc3339.PATTERN.matcher(timeStr); + if(matcher.find()) { + final TemporalInterval interval = TemporalInstantRfc3339.parseInterval(timeStr); + if(policy == TemporalPolicy.INSTANT_AFTER_INSTANT || + policy == TemporalPolicy.INSTANT_BEFORE_INSTANT || + policy == TemporalPolicy.INSTANT_EQUALS_INSTANT) { + if(interval == null) { + Log.error("Cannot perform temporal interval based queries on an instant."); + } + } + objs.add(getTemporalObject(interval, policy)); + } else { + final TemporalInstant instant = new TemporalInstantRfc3339(DateTime.parse(timeStr)); + if(policy != TemporalPolicy.INSTANT_AFTER_INSTANT && + policy != TemporalPolicy.INSTANT_BEFORE_INSTANT && + policy != TemporalPolicy.INSTANT_EQUALS_INSTANT) { + Log.error("Cannot perform temporal instant based queries on an interval."); + } + objs.add(getTemporalObject(instant, policy)); + } + }); + return objs.toArray(new DBObject[]{}); + } + + private DBObject getGeoObject (final Geometry geo, final GeoPolicy policy) throws GeoTemporalIndexException { + switch(policy) { + case CONTAINS: + throw new UnsupportedOperationException("Contains queries are not supported in Mongo DB."); + case CROSSES: + throw new UnsupportedOperationException("Crosses queries are not supported in Mongo DB."); + case DISJOINT: + throw new UnsupportedOperationException("Disjoint queries are not supported in Mongo DB."); + case EQUALS: + try { + return geoStrategy.getQuery(new GeoQuery(EQUALS, geo)); + } catch (final MalformedQueryException e) { + throw new GeoTemporalIndexException(e.getMessage(), e); + } + case INTERSECTS: + try { + return geoStrategy.getQuery(new GeoQuery(INTERSECTS, geo)); + } catch (final MalformedQueryException e) { + throw new GeoTemporalIndexException(e.getMessage(), e); + } + case OVERLAPS: + throw new UnsupportedOperationException("Overlaps queries are not supported in Mongo DB."); + case TOUCHES: + throw new UnsupportedOperationException("Touches queries are not supported in Mongo DB."); + case WITHIN: + try { + return geoStrategy.getQuery(new GeoQuery(WITHIN, geo)); + } catch (final MalformedQueryException e) { + throw new GeoTemporalIndexException(e.getMessage(), e); + } + default: + return new BasicDBObject(); + } + } + + private DBObject getTemporalObject(final TemporalInstant instant, final TemporalPolicy policy) { + final DBObject temporalObj; + switch(policy) { + case INSTANT_AFTER_INSTANT: + temporalObj = QueryBuilder.start(INSTANT) + .greaterThan(instant.getAsDateTime().toDate()) + .get(); + break; + case INSTANT_BEFORE_INSTANT: + temporalObj = QueryBuilder.start(INSTANT) + .lessThan(instant.getAsDateTime().toDate()) + .get(); + break; + case INSTANT_EQUALS_INSTANT: + temporalObj = QueryBuilder.start(INSTANT) + .is(instant.getAsDateTime().toDate()) + .get(); + break; + default: + temporalObj = new BasicDBObject(); + } + return temporalObj; + } + + private DBObject getTemporalObject(final TemporalInterval interval, final TemporalPolicy policy) { + final DBObject temporalObj; + switch(policy) { + case INSTANT_AFTER_INTERVAL: + temporalObj = QueryBuilder.start(INSTANT) + .greaterThan(interval.getHasEnd().getAsDateTime().toDate()) + .get(); + break; + case INSTANT_BEFORE_INTERVAL: + temporalObj = QueryBuilder.start(INSTANT) + .lessThan(interval.getHasBeginning().getAsDateTime().toDate()) + .get(); + break; + case INSTANT_END_INTERVAL: + temporalObj = QueryBuilder.start(INSTANT) + .is(interval.getHasEnd().getAsDateTime().toDate()) + .get(); + break; + case INSTANT_IN_INTERVAL: + temporalObj = QueryBuilder.start(INSTANT) + .greaterThan(interval.getHasBeginning().getAsDateTime().toDate()) + .lessThan(interval.getHasEnd().getAsDateTime().toDate()) + .get(); + break; + case INSTANT_START_INTERVAL: + temporalObj = QueryBuilder.start(INSTANT) + .is(interval.getHasBeginning().getAsDateTime().toDate()) + .get(); + break; + case INTERVAL_AFTER: + temporalObj = QueryBuilder.start(INTERVAL_START) + .greaterThan(interval.getHasEnd().getAsDateTime().toDate()) + .get(); + break; + case INTERVAL_BEFORE: + temporalObj = QueryBuilder.start(INTERVAL_END) + .lessThan(interval.getHasBeginning().getAsDateTime().toDate()) + .get(); + break; + case INTERVAL_EQUALS: + temporalObj = QueryBuilder.start(INTERVAL_START) + .is(interval.getHasBeginning().getAsDateTime().toDate()) + .and(INTERVAL_END) + .is(interval.getHasEnd().getAsDateTime().toDate()) + .get(); + break; + default: + temporalObj = new BasicDBObject(); + } + return temporalObj; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63095d45/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java new file mode 100644 index 0000000..8ddf075 --- /dev/null +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoEventStorage.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.geotemporal.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.indexing.IndexingExpr; +import org.apache.rya.indexing.entity.model.TypedEntity; +import org.apache.rya.indexing.entity.storage.mongo.DocumentConverter.DocumentConverterException; +import org.apache.rya.indexing.geotemporal.GeoTemporalIndexException; +import org.apache.rya.indexing.geotemporal.model.Event; +import org.apache.rya.indexing.geotemporal.storage.EventStorage; +import org.apache.rya.indexing.entity.storage.mongo.MongoEntityStorage; +import org.bson.BsonDocument; +import org.bson.BsonString; +import org.bson.Document; +import org.bson.conversions.Bson; + +import com.mongodb.BasicDBObjectBuilder; +import com.mongodb.DBObject; +import com.mongodb.ErrorCategory; +import com.mongodb.MongoClient; +import com.mongodb.MongoException; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.model.Filters; + +public class MongoEventStorage implements EventStorage { + + protected static final String COLLECTION_NAME = "geotemporal-events"; + + private static final EventDocumentConverter EVENT_CONVERTER = new EventDocumentConverter(); + + /** + * A client connected to the Mongo instance that hosts the Rya instance. + */ + protected final MongoClient mongo; + + /** + * The name of the Rya instance the {@link TypedEntity}s are for. + */ + protected final String ryaInstanceName; + + /* + * Used to get the filter query objects. + */ + private final GeoTemporalMongoDBStorageStrategy queryAdapter; + + /** + * Constructs an instance of {@link MongoEntityStorage}. + * + * @param mongo - A client connected to the Mongo instance that hosts the Rya instance. (not null) + * @param ryaInstanceName - The name of the Rya instance the {@link TypedEntity}s are for. (not null) + */ + public MongoEventStorage(final MongoClient mongo, final String ryaInstanceName) { + this.mongo = requireNonNull(mongo); + this.ryaInstanceName = requireNonNull(ryaInstanceName); + queryAdapter = new GeoTemporalMongoDBStorageStrategy(); + } + + @Override + public void create(final Event event) throws EventStorageException { + requireNonNull(event); + + try { + mongo.getDatabase(ryaInstanceName) + .getCollection(COLLECTION_NAME) + .insertOne(EVENT_CONVERTER.toDocument(event)); + } catch(final MongoException e) { + final ErrorCategory category = ErrorCategory.fromErrorCode( e.getCode() ); + if(category == ErrorCategory.DUPLICATE_KEY) { + throw new EventAlreadyExistsException("Failed to create Event with Subject '" + event.getSubject().getData() + "'.", e); + } + throw new EventStorageException("Failed to create Event with Subject '" + event.getSubject().getData() + "'.", e); + } + } + + @Override + public Optional<Event> get(final RyaURI subject) throws EventStorageException { + requireNonNull(subject); + + try { + final Document document = mongo.getDatabase(ryaInstanceName) + .getCollection(COLLECTION_NAME) + .find( new BsonDocument(EventDocumentConverter.SUBJECT, new BsonString(subject.getData())) ) + .first(); + + return document == null ? + Optional.empty() : + Optional.of( EVENT_CONVERTER.fromDocument(document) ); + + } catch(final MongoException | DocumentConverterException e) { + throw new EventStorageException("Could not get the Event with Subject '" + subject.getData() + "'.", e); + } + } + + @Override + public Collection<Event> search(final Optional<RyaURI> subject, final Optional<Collection<IndexingExpr>> geoFilters, final Optional<Collection<IndexingExpr>> temporalFilters) throws EventStorageException { + requireNonNull(subject); + + try { + final Collection<IndexingExpr> geos = (geoFilters.isPresent() ? geoFilters.get() : new ArrayList<>()); + final Collection<IndexingExpr> tempos = (temporalFilters.isPresent() ? temporalFilters.get() : new ArrayList<>()); + final DBObject filterObj = queryAdapter.getFilterQuery(geos, tempos); + + final BasicDBObjectBuilder builder = BasicDBObjectBuilder + .start(filterObj.toMap()); + if(subject.isPresent()) { + builder.append(EventDocumentConverter.SUBJECT, subject.get().getData()); + } + final MongoCursor<Document> results = mongo.getDatabase(ryaInstanceName) + .getCollection(COLLECTION_NAME) + .find( BsonDocument.parse(builder.get().toString()) ) + .iterator(); + + final List<Event> events = new ArrayList<>(); + final EventDocumentConverter adapter = new EventDocumentConverter(); + while(results.hasNext()) { + events.add(adapter.fromDocument(results.next())); + } + return events; + } catch(final MongoException | DocumentConverterException | GeoTemporalIndexException e) { + throw new EventStorageException("Could not get the Event.", e); + } + } + + @Override + public void update(final Event old, final Event updated) throws StaleUpdateException, EventStorageException { + requireNonNull(old); + requireNonNull(updated); + + // The updated entity must have the same Subject as the one it is replacing. + if(!old.getSubject().equals(updated.getSubject())) { + throw new EventStorageException("The old Event and the updated Event must have the same Subject. " + + "Old Subject: " + old.getSubject().getData() + ", Updated Subject: " + updated.getSubject().getData()); + } + + final Set<Bson> filters = new HashSet<>(); + + // Must match the old entity's Subject. + filters.add( makeSubjectFilter(old.getSubject()) ); + + // Do a find and replace. + final Bson oldEntityFilter = Filters.and(filters); + final Document updatedDoc = EVENT_CONVERTER.toDocument(updated); + + final MongoCollection<Document> collection = mongo.getDatabase(ryaInstanceName).getCollection(COLLECTION_NAME); + if(collection.findOneAndReplace(oldEntityFilter, updatedDoc) == null) { + throw new StaleUpdateException("Could not update the Event with Subject '" + updated.getSubject().getData() + "."); + } + } + + @Override + public boolean delete(final RyaURI subject) throws EventStorageException { + requireNonNull(subject); + + try { + final Document deleted = mongo.getDatabase(ryaInstanceName) + .getCollection(COLLECTION_NAME) + .findOneAndDelete( makeSubjectFilter(subject) ); + + return deleted != null; + + } catch(final MongoException e) { + throw new EventStorageException("Could not delete the Event with Subject '" + subject.getData() + "'.", e); + } + } + + private static Bson makeSubjectFilter(final RyaURI subject) { + return Filters.eq(EventDocumentConverter.SUBJECT, subject.getData()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63095d45/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java new file mode 100644 index 0000000..1baab18 --- /dev/null +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/geotemporal/mongo/MongoGeoTemporalIndexer.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.geotemporal.mongo; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; + +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.resolver.RyaToRdfConversions; +import org.apache.rya.indexing.GeoConstants; +import org.apache.rya.indexing.TemporalInstant; +import org.apache.rya.indexing.TemporalInstantRfc3339; +import org.apache.rya.indexing.TemporalInterval; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.accumulo.geo.GeoParseUtils; +import org.apache.rya.indexing.geotemporal.GeoTemporalIndexer; +import org.apache.rya.indexing.geotemporal.model.Event; +import org.apache.rya.indexing.geotemporal.storage.EventStorage; +import org.apache.rya.indexing.mongodb.AbstractMongoIndexer; +import org.apache.rya.indexing.mongodb.IndexingException; +import org.apache.rya.mongodb.MongoConnectorFactory; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.joda.time.DateTime; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; + +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.io.ParseException; + +/** + * Indexer that stores 2 separate statements as one 'Event' entity. + * <p> + * The statements are required to have the same subject, one must be + * a Geo based statement and the other a temporal based statement. + * <p> + * This indexer is later used when querying for geo/temporal statements + * in the format of: + * <pre> + * {@code + * QUERY PARAMS + * ?SomeSubject geo:predicate ?Location + * ?SomeSubject time:predicate ?Time + * Filter(?Location, geoFunction()) + * Filter(?Time, temporalFunction()) + * } + * + * The number of filters is not strict, but there must be at least one + * query pattern for geo and one for temporal as well as at least one + * filter for each type. + */ +public class MongoGeoTemporalIndexer extends AbstractMongoIndexer<GeoTemporalMongoDBStorageStrategy> implements GeoTemporalIndexer { + private static final Logger LOG = Logger.getLogger(MongoGeoTemporalIndexer.class); + public static final String GEO_TEMPORAL_COLLECTION = "geo_temporal"; + + private final AtomicReference<MongoDBRdfConfiguration> configuration = new AtomicReference<>(); + private final AtomicReference<EventStorage> events = new AtomicReference<>(); + + @Override + public void init() { + initCore(); + predicates = ConfigUtils.getGeoPredicates(conf); + predicates.addAll(ConfigUtils.getTemporalPredicates(conf)); + storageStrategy = new GeoTemporalMongoDBStorageStrategy(); + } + + @Override + public void setConf(final Configuration conf) { + requireNonNull(conf); + events.set(null); + events.set(getEventStorage(conf)); + super.conf = conf; + configuration.set(new MongoDBRdfConfiguration(conf)); + } + + @Override + public void storeStatement(final RyaStatement ryaStatement) throws IOException { + requireNonNull(ryaStatement); + + try { + updateEvent(ryaStatement.getSubject(), ryaStatement); + } catch (IndexingException | ParseException e) { + throw new IOException("Failed to update the Entity index.", e); + } + } + + @Override + public void deleteStatement(final RyaStatement statement) throws IOException { + requireNonNull(statement); + final RyaURI subject = statement.getSubject(); + try { + final EventStorage eventStore = events.get(); + checkState(events != null, "Must set this indexers configuration before storing statements."); + + new EventUpdater(eventStore).update(subject, old -> { + final Event.Builder updated; + if(!old.isPresent()) { + return Optional.empty(); + } else { + updated = Event.builder(old.get()); + } + + final Event currentEvent = updated.build(); + final URI pred = statement.getObject().getDataType(); + if((pred.equals(GeoConstants.GEO_AS_WKT) || pred.equals(GeoConstants.GEO_AS_GML) || + pred.equals(GeoConstants.XMLSCHEMA_OGC_WKT) || pred.equals(GeoConstants.XMLSCHEMA_OGC_GML)) + && currentEvent.getGeometry().isPresent()) { + //is geo and needs to be removed. + try { + if(currentEvent.getGeometry().get().equals(GeoParseUtils.getGeometry(RyaToRdfConversions.convertStatement(statement)))) { + updated.setGeometry(null); + } + } catch (final Exception e) { + LOG.debug("Unable to parse the stored geometry."); + } + } else { + //is time + final String dateTime = statement.getObject().getData(); + final Matcher matcher = TemporalInstantRfc3339.PATTERN.matcher(dateTime); + if (matcher.find()) { + final TemporalInterval interval = TemporalInstantRfc3339.parseInterval(dateTime); + if(currentEvent.getInterval().get().equals(interval)) { + updated.setTemporalInterval(null); + } + } else { + final TemporalInstant instant = new TemporalInstantRfc3339(DateTime.parse(dateTime)); + if(currentEvent.getInstant().get().equals(instant)) { + updated.setTemporalInstant(null); + } + } + } + return Optional.of(updated.build()); + }); + } catch (final IndexingException e) { + throw new IOException("Failed to update the Entity index.", e); + } + } + + private void updateEvent(final RyaURI subject, final RyaStatement statement) throws IndexingException, ParseException { + final EventStorage eventStore = events.get(); + checkState(events != null, "Must set this indexers configuration before storing statements."); + + new EventUpdater(eventStore).update(subject, old -> { + final Event.Builder updated; + if(!old.isPresent()) { + updated = Event.builder() + .setSubject(subject); + } else { + updated = Event.builder(old.get()); + } + + final URI pred = statement.getObject().getDataType(); + if(pred.equals(GeoConstants.GEO_AS_WKT) || pred.equals(GeoConstants.GEO_AS_GML) || + pred.equals(GeoConstants.XMLSCHEMA_OGC_WKT) || pred.equals(GeoConstants.XMLSCHEMA_OGC_GML)) { + //is geo + try { + final Statement geoStatement = RyaToRdfConversions.convertStatement(statement); + final Geometry geometry = GeoParseUtils.getGeometry(geoStatement); + updated.setGeometry(geometry); + } catch (final ParseException e) { + LOG.error(e.getMessage(), e); + } + } else { + //is time + final String dateTime = statement.getObject().getData(); + final Matcher matcher = TemporalInstantRfc3339.PATTERN.matcher(dateTime); + if (matcher.find()) { + final TemporalInterval interval = TemporalInstantRfc3339.parseInterval(dateTime); + updated.setTemporalInterval(interval); + } else { + final TemporalInstant instant = new TemporalInstantRfc3339(DateTime.parse(dateTime)); + updated.setTemporalInstant(instant); + } + } + return Optional.of(updated.build()); + }); + } + + @Override + public String getCollectionName() { + return ConfigUtils.getTablePrefix(conf) + GEO_TEMPORAL_COLLECTION; + } + + @Override + public EventStorage getEventStorage(final Configuration conf) { + if(events.get() != null) { + return events.get(); + } + + final MongoDBRdfConfiguration mongoConf = (MongoDBRdfConfiguration) conf; + mongoClient = mongoConf.getMongoClient(); + if (mongoClient == null) { + mongoClient = MongoConnectorFactory.getMongoClient(conf); + } + final String ryaInstanceName = mongoConf.getMongoDBName(); + events.set(new MongoEventStorage(mongoClient, ryaInstanceName)); + return events.get(); + } +}
