http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/GeoMongoDBStorageStrategy.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/GeoMongoDBStorageStrategy.java b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/GeoMongoDBStorageStrategy.java deleted file mode 100644 index c21f574..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/GeoMongoDBStorageStrategy.java +++ /dev/null @@ -1,150 +0,0 @@ -package mvm.rya.indexing.mongodb; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.resolver.RdfToRyaConversions; -import mvm.rya.indexing.StatementContraints; -import mvm.rya.indexing.accumulo.StatementSerializer; -import mvm.rya.indexing.accumulo.geo.GeoParseUtils; -import mvm.rya.mongodb.dao.SimpleMongoDBStorageStrategy; - -import org.apache.commons.codec.binary.Hex; -import org.openrdf.model.Statement; -import org.openrdf.model.URI; - -import com.mongodb.BasicDBList; -import com.mongodb.BasicDBObject; -import com.mongodb.DBCollection; -import com.mongodb.DBObject; -import com.vividsolutions.jts.geom.Coordinate; -import com.vividsolutions.jts.geom.Geometry; -import com.vividsolutions.jts.io.ParseException; -import com.vividsolutions.jts.io.WKTReader; - -public class GeoMongoDBStorageStrategy extends SimpleMongoDBStorageStrategy{ - - private static final String GEO = "location"; - public enum GeoQueryType { - INTERSECTS { - public String getKeyword() { - return "$geoIntersects"; - } - }, WITHIN { - public String getKeyword() { - return "$geoWithin"; - } - }, - EQUALS { - public String getKeyword() { - return "$near"; - } - }; - - public abstract String getKeyword(); - } - - private double maxDistance; - - - public GeoMongoDBStorageStrategy(double maxDistance) { - this.maxDistance = maxDistance; - } - - public void createIndices(DBCollection coll){ - coll.createIndex("{" + GEO + " : \"2dsphere\"" ); - } - - public DBObject getQuery(StatementContraints contraints, Geometry geo, GeoQueryType queryType) { - BasicDBObject query; - if (queryType.equals(GeoQueryType.EQUALS)){ - List<double[]> points = getCorrespondingPoints(geo); - if (points.size() == 1){ - List circle = new ArrayList(); - circle.add(points.get(0)); - circle.add(maxDistance); - BasicDBObject polygon = new 
BasicDBObject("$centerSphere", circle); - query = new BasicDBObject(GEO, new BasicDBObject(GeoQueryType.WITHIN.getKeyword(), polygon)); - }else { - query = new BasicDBObject(GEO, points); - } - - } - else { - query = new BasicDBObject(GEO, new BasicDBObject(queryType.getKeyword(), new BasicDBObject("$polygon", getCorrespondingPoints(geo)))); - } - if (contraints.hasSubject()){ - query.append(SUBJECT, contraints.getSubject().toString()); - } - if (contraints.hasPredicates()){ - Set<URI> predicates = contraints.getPredicates(); - if (predicates.size() > 1){ - BasicDBList or = new BasicDBList(); - for (URI pred : predicates){ - DBObject currentPred = new BasicDBObject(PREDICATE, pred.toString()); - or.add(currentPred); - } - query.append("$or", or); - } - else if (!predicates.isEmpty()){ - query.append(PREDICATE, predicates.iterator().next().toString()); - } - } - if (contraints.hasContext()){ - query.append(CONTEXT, contraints.getContext().toString()); - } - - return query; - } - - public DBObject serialize(Statement statement) throws ParseException{ - // if the object is wkt, then try to index it - // write the statement data to the fields - Geometry geo = (new WKTReader()).read(GeoParseUtils.getWellKnownText(statement)); - if(geo == null || geo.isEmpty() || !geo.isValid()) { - throw new ParseException("Could not create geometry for statement " + statement); - } - RyaStatement ryaStatement = RdfToRyaConversions.convertStatement(statement); - BasicDBObject base = (BasicDBObject) super.serialize(ryaStatement); - base.append(GEO, getCorrespondingPoints(geo)); - return base; - - } - - private List<double[]> getCorrespondingPoints(Geometry geo){ - List<double[]> points = new ArrayList<double[]>(); - for (Coordinate coord : geo.getCoordinates()){ - points.add(new double[] { - coord.x, coord.y - }); - } - return points; - - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/IndexingMongoDBStorageStrategy.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/IndexingMongoDBStorageStrategy.java b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/IndexingMongoDBStorageStrategy.java new file mode 100644 index 0000000..2b73e6e --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/IndexingMongoDBStorageStrategy.java @@ -0,0 +1,57 @@ +package mvm.rya.indexing.mongodb; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.Set; + +import org.openrdf.model.URI; + +import com.mongodb.BasicDBObject; +import com.mongodb.DBObject; +import com.mongodb.QueryBuilder; + +import mvm.rya.indexing.StatementConstraints; +import mvm.rya.mongodb.dao.SimpleMongoDBStorageStrategy; + +public class IndexingMongoDBStorageStrategy extends SimpleMongoDBStorageStrategy { + public DBObject getQuery(final StatementConstraints contraints) { + final QueryBuilder queryBuilder = QueryBuilder.start(); + if (contraints.hasSubject()){ + queryBuilder.and(new BasicDBObject(SUBJECT, contraints.getSubject().toString())); + } + + if (contraints.hasPredicates()){ + final Set<URI> predicates = contraints.getPredicates(); + if (predicates.size() > 1){ + for (final URI pred : predicates){ + final DBObject currentPred = new BasicDBObject(PREDICATE, pred.toString()); + queryBuilder.or(currentPred); + } + } + else if (!predicates.isEmpty()){ + queryBuilder.and(new BasicDBObject(PREDICATE, predicates.iterator().next().toString())); + } + } + if (contraints.hasContext()){ + queryBuilder.and(new BasicDBObject(CONTEXT, contraints.getContext().toString())); + } + return queryBuilder.get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/MongoFreeTextIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/MongoFreeTextIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/MongoFreeTextIndexer.java deleted file mode 100644 index 3908eb3..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/MongoFreeTextIndexer.java +++ /dev/null @@ -1,236 +0,0 @@ -package mvm.rya.indexing.mongodb; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import info.aduna.iteration.CloseableIteration; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Set; - -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.resolver.RdfToRyaConversions; -import mvm.rya.api.resolver.RyaToRdfConversions; -import mvm.rya.indexing.FreeTextIndexer; -import mvm.rya.indexing.StatementContraints; -import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.mongodb.MongoDBRdfConfiguration; - -import org.apache.hadoop.conf.Configuration; -import org.apache.log4j.Logger; -import org.openrdf.model.Literal; -import org.openrdf.model.Statement; -import org.openrdf.model.URI; -import org.openrdf.query.QueryEvaluationException; - -import com.mongodb.DB; -import com.mongodb.DBCollection; -import com.mongodb.DBCursor; -import com.mongodb.DBObject; -import com.mongodb.MongoClient; -import com.mongodb.MongoCredential; -import com.mongodb.ServerAddress; - -import de.flapdoodle.embed.mongo.distribution.Version; -import de.flapdoodle.embed.mongo.tests.MongodForTestsFactory; - -public class MongoFreeTextIndexer extends AbstractMongoIndexer implements FreeTextIndexer { - - private static final Logger logger = Logger - .getLogger(MongoFreeTextIndexer.class); - - private TextMongoDBStorageStrategy storageStrategy; - private MongoClient mongoClient; - private DB db; - private DBCollection coll; - private Set<URI> predicates; - private 
Configuration conf; - private boolean isInit = false; - private String tableName = ""; - - private void init() throws IOException{ - boolean useMongoTest = conf.getBoolean(MongoDBRdfConfiguration.USE_TEST_MONGO, false); - if (useMongoTest) { - boolean initializedClient = false; - if (conf instanceof MongoDBRdfConfiguration){ - MongoDBRdfConfiguration castedConf = (MongoDBRdfConfiguration) conf; - if (castedConf.getMongoClient() != null){ - this.mongoClient = castedConf.getMongoClient(); - initializedClient = true; - } - } - if (!initializedClient){ - MongodForTestsFactory testsFactory = MongodForTestsFactory.with(Version.Main.PRODUCTION); - mongoClient = testsFactory.newMongo(); - int port = mongoClient.getServerAddressList().get(0).getPort(); - conf.set(MongoDBRdfConfiguration.MONGO_INSTANCE_PORT, Integer.toString(port)); - } } else { - ServerAddress server = new ServerAddress(conf.get(MongoDBRdfConfiguration.MONGO_INSTANCE), - Integer.valueOf(conf.get(MongoDBRdfConfiguration.MONGO_INSTANCE_PORT))); - if (conf.get(MongoDBRdfConfiguration.MONGO_USER) != null) { - MongoCredential cred = MongoCredential.createCredential( - conf.get(MongoDBRdfConfiguration.MONGO_USER), - conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME), - conf.get(MongoDBRdfConfiguration.MONGO_USER_PASSWORD).toCharArray()); - mongoClient = new MongoClient(server, Arrays.asList(cred)); - } else { - mongoClient = new MongoClient(server); - } - } - predicates = ConfigUtils.getFreeTextPredicates(conf); - tableName = conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME); - db = mongoClient.getDB(tableName); - coll = db.getCollection(conf.get(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya")); - storageStrategy = new TextMongoDBStorageStrategy(); - storageStrategy.createIndices(coll); - } - - @Override - public String getTableName() { - return tableName; - } - - @Override - public Configuration getConf() { - return conf; - } - - // setConf initializes because index is created via reflection - @Override - 
public void setConf(Configuration conf) { - this.conf = conf; - if (!isInit) { - try { - init(); - isInit = true; - } catch (IOException e) { - logger.warn( - "Unable to initialize index. Throwing Runtime Exception. ", - e); - throw new RuntimeException(e); - } - } - } - - private void storeStatement(Statement statement) throws IOException { - // if this is a valid predicate and a valid geometry - boolean isValidPredicate = predicates.isEmpty() - || predicates.contains(statement.getPredicate()); - - if (isValidPredicate && (statement.getObject() instanceof Literal)) { - - // add it to the collection - try { - DBObject obj = storageStrategy.serialize(statement); - if (obj != null) { - DBObject query = storageStrategy - .getQuery(RdfToRyaConversions - .convertStatement(statement)); - coll.update(query, obj, true, false); - } - } catch (com.mongodb.MongoException.DuplicateKey exception) { - // ignore - } catch (com.mongodb.DuplicateKeyException exception) { - // ignore - } catch (Exception ex) { - // ignore single exceptions - ex.printStackTrace(); - } - } - } - - @Override - public void storeStatement(RyaStatement statement) throws IOException { - storeStatement(RyaToRdfConversions.convertStatement(statement)); - } - - - private CloseableIteration<Statement, QueryEvaluationException> getIteratorWrapper( - final DBObject query, final DBCollection coll, - final TextMongoDBStorageStrategy storageStrategy) { - - return new CloseableIteration<Statement, QueryEvaluationException>() { - - private DBCursor cursor = null; - - private DBCursor getIterator() throws QueryEvaluationException { - if (cursor == null) { - cursor = coll.find(query); - } - return cursor; - } - - @Override - public boolean hasNext() throws QueryEvaluationException { - return getIterator().hasNext(); - } - - @Override - public Statement next() throws QueryEvaluationException { - DBObject feature = getIterator().next(); - RyaStatement statement = storageStrategy - .deserializeDBObject(feature); - return 
RyaToRdfConversions.convertStatement(statement); - } - - @Override - public void remove() { - throw new UnsupportedOperationException( - "Remove not implemented"); - } - - @Override - public void close() throws QueryEvaluationException { - getIterator().close(); - } - }; - } - - @Override - public Set<URI> getIndexablePredicates() { - return predicates; - } - - @Override - public void flush() throws IOException { - // TODO Auto-generated method stub - - } - - @Override - public void close() throws IOException { - mongoClient.close(); - } - - @Override - public void deleteStatement(RyaStatement stmt) throws IOException { - DBObject obj = storageStrategy.getQuery(stmt); - coll.remove(obj); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryText( - String query, StatementContraints contraints) throws IOException { - DBObject obj = storageStrategy.getQuery(contraints, query); - long count = coll.count(obj); - return getIteratorWrapper(obj, coll, storageStrategy); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/MongoGeoIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/MongoGeoIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/MongoGeoIndexer.java deleted file mode 100644 index 9aab6cd..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/MongoGeoIndexer.java +++ /dev/null @@ -1,295 +0,0 @@ -package mvm.rya.indexing.mongodb; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import info.aduna.iteration.CloseableIteration; - -import java.io.IOException; -import java.net.UnknownHostException; -import java.util.Arrays; -import java.util.Set; - -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.resolver.RdfToRyaConversions; -import mvm.rya.api.resolver.RyaToRdfConversions; -import mvm.rya.indexing.GeoIndexer; -import mvm.rya.indexing.StatementContraints; -import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.indexing.mongodb.GeoMongoDBStorageStrategy.GeoQueryType; -import mvm.rya.mongodb.MongoDBRdfConfiguration; - -import org.apache.hadoop.conf.Configuration; -import org.apache.log4j.Logger; -import org.openrdf.model.Literal; -import org.openrdf.model.Statement; -import org.openrdf.model.URI; -import org.openrdf.query.QueryEvaluationException; - -import com.mongodb.DB; -import com.mongodb.DBCollection; -import com.mongodb.DBCursor; -import com.mongodb.DBObject; -import com.mongodb.MongoClient; -import com.mongodb.MongoCredential; -import com.mongodb.ServerAddress; -import com.vividsolutions.jts.geom.Geometry; - -import de.flapdoodle.embed.mongo.distribution.Version; -import de.flapdoodle.embed.mongo.tests.MongodForTestsFactory; - -public class MongoGeoIndexer extends AbstractMongoIndexer implements GeoIndexer { - - private static final Logger logger = Logger - .getLogger(MongoGeoIndexer.class); - - private GeoMongoDBStorageStrategy storageStrategy; - private MongoClient mongoClient; - private DB db; - private DBCollection coll; - private Set<URI> predicates; - private Configuration conf; - private 
boolean isInit = false; - private String tableName = ""; - - private void init() throws NumberFormatException, IOException{ - boolean useMongoTest = conf.getBoolean(MongoDBRdfConfiguration.USE_TEST_MONGO, false); - if (useMongoTest) { - boolean initializedClient = false; - if (conf instanceof MongoDBRdfConfiguration){ - MongoDBRdfConfiguration castedConf = (MongoDBRdfConfiguration) conf; - if (castedConf.getMongoClient() != null){ - this.mongoClient = castedConf.getMongoClient(); - initializedClient = true; - } - } - if (!initializedClient){ - MongodForTestsFactory testsFactory = MongodForTestsFactory.with(Version.Main.PRODUCTION); - mongoClient = testsFactory.newMongo(); - int port = mongoClient.getServerAddressList().get(0).getPort(); - conf.set(MongoDBRdfConfiguration.MONGO_INSTANCE_PORT, Integer.toString(port)); - } - - } else { - ServerAddress server = new ServerAddress(conf.get(MongoDBRdfConfiguration.MONGO_INSTANCE), - Integer.valueOf(conf.get(MongoDBRdfConfiguration.MONGO_INSTANCE_PORT))); - if (conf.get(MongoDBRdfConfiguration.MONGO_USER) != null) { - MongoCredential cred = MongoCredential.createCredential( - conf.get(MongoDBRdfConfiguration.MONGO_USER), - conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME), - conf.get(MongoDBRdfConfiguration.MONGO_USER_PASSWORD).toCharArray()); - mongoClient = new MongoClient(server, Arrays.asList(cred)); - } else { - mongoClient = new MongoClient(server); - } - } - predicates = ConfigUtils.getGeoPredicates(conf); - tableName = conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME); - db = mongoClient.getDB(tableName); - coll = db.getCollection(conf.get(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya")); - storageStrategy = new GeoMongoDBStorageStrategy(Double.valueOf(conf.get(MongoDBRdfConfiguration.MONGO_GEO_MAXDISTANCE, "1e-10"))); - } - - @Override - public String getTableName() { - return tableName; - } - - @Override - public Configuration getConf() { - return conf; - } - - // setConf initializes because index is created 
via reflection - @Override - public void setConf(Configuration conf) { - this.conf = conf; - if (!isInit) { - try { - init(); - isInit = true; - } catch (NumberFormatException e) { - logger.warn( - "Unable to initialize index. Throwing Runtime Exception. ", - e); - throw new RuntimeException(e); - } catch (IOException e) { - logger.warn( - "Unable to initialize index. Throwing Runtime Exception. ", - e); - throw new RuntimeException(e); - } - } - } - - private void storeStatement(Statement statement) throws IOException { - // if this is a valid predicate and a valid geometry - boolean isValidPredicate = predicates.isEmpty() - || predicates.contains(statement.getPredicate()); - - if (isValidPredicate && (statement.getObject() instanceof Literal)) { - - // add it to the collection - try { - DBObject obj = storageStrategy.serialize(statement); - if (obj != null) { - DBObject query = storageStrategy - .getQuery(RdfToRyaConversions - .convertStatement(statement)); - coll.update(query, obj, true, false); - } - } catch (com.mongodb.MongoException.DuplicateKey exception) { - // ignore - } catch (com.mongodb.DuplicateKeyException exception) { - // ignore - } catch (Exception ex) { - // ignore single exceptions - ex.printStackTrace(); - } - } - } - - @Override - public void storeStatement(RyaStatement statement) throws IOException { - storeStatement(RyaToRdfConversions.convertStatement(statement)); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryEquals( - Geometry query, StatementContraints contraints) { - DBObject queryObj = storageStrategy.getQuery(contraints, query, - GeoQueryType.EQUALS); - return getIteratorWrapper(queryObj, coll, storageStrategy); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryDisjoint( - Geometry query, StatementContraints contraints) { - throw new UnsupportedOperationException( - "Disjoint queries are not supported in Mongo DB."); - } - - @Override - public 
CloseableIteration<Statement, QueryEvaluationException> queryIntersects( - Geometry query, StatementContraints contraints) { - DBObject queryObj = storageStrategy.getQuery(contraints, query, - GeoQueryType.INTERSECTS); - return getIteratorWrapper(queryObj, coll, storageStrategy); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryTouches( - Geometry query, StatementContraints contraints) { - throw new UnsupportedOperationException( - "Touches queries are not supported in Mongo DB."); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryCrosses( - Geometry query, StatementContraints contraints) { - throw new UnsupportedOperationException( - "Crosses queries are not supported in Mongo DB."); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryWithin( - Geometry query, StatementContraints contraints) { - DBObject queryObj = storageStrategy.getQuery(contraints, query, - GeoQueryType.WITHIN); - return getIteratorWrapper(queryObj, coll, storageStrategy); - } - - private CloseableIteration<Statement, QueryEvaluationException> getIteratorWrapper( - final DBObject query, final DBCollection coll, - final GeoMongoDBStorageStrategy storageStrategy) { - - return new CloseableIteration<Statement, QueryEvaluationException>() { - - private DBCursor cursor = null; - - private DBCursor getIterator() throws QueryEvaluationException { - if (cursor == null) { - cursor = coll.find(query); - } - return cursor; - } - - @Override - public boolean hasNext() throws QueryEvaluationException { - return getIterator().hasNext(); - } - - @Override - public Statement next() throws QueryEvaluationException { - DBObject feature = getIterator().next(); - RyaStatement statement = storageStrategy - .deserializeDBObject(feature); - return RyaToRdfConversions.convertStatement(statement); - } - - @Override - public void remove() { - throw new UnsupportedOperationException( - "Remove not 
implemented"); - } - - @Override - public void close() throws QueryEvaluationException { - getIterator().close(); - } - }; - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryContains( - Geometry query, StatementContraints contraints) { - throw new UnsupportedOperationException( - "Contains queries are not supported in Mongo DB."); - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryOverlaps( - Geometry query, StatementContraints contraints) { - throw new UnsupportedOperationException( - "Overlaps queries are not supported in Mongo DB."); - } - - @Override - public Set<URI> getIndexablePredicates() { - return predicates; - } - - @Override - public void flush() throws IOException { - // TODO Auto-generated method stub - - } - - @Override - public void close() throws IOException { - mongoClient.close(); - } - - @Override - public void deleteStatement(RyaStatement stmt) throws IOException { - DBObject obj = storageStrategy.getQuery(stmt); - coll.remove(obj); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/MongoGeoTupleSet.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/MongoGeoTupleSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/MongoGeoTupleSet.java deleted file mode 100644 index da49904..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/MongoGeoTupleSet.java +++ /dev/null @@ -1,361 +0,0 @@ -package mvm.rya.indexing.mongodb; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import info.aduna.iteration.CloseableIteration; - -import java.util.Map; -import java.util.Set; - -import mvm.rya.indexing.GeoIndexer; -import mvm.rya.indexing.IndexingExpr; -import mvm.rya.indexing.IteratorFactory; -import mvm.rya.indexing.SearchFunction; -import mvm.rya.indexing.StatementContraints; -import mvm.rya.indexing.accumulo.geo.GeoConstants; -import mvm.rya.indexing.accumulo.geo.GeoTupleSet; -import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; - -import org.apache.hadoop.conf.Configuration; -import org.openrdf.model.Statement; -import org.openrdf.model.URI; -import org.openrdf.query.BindingSet; -import org.openrdf.query.QueryEvaluationException; - -import com.google.common.base.Joiner; -import com.google.common.collect.Maps; -import com.vividsolutions.jts.geom.Geometry; -import com.vividsolutions.jts.io.ParseException; -import com.vividsolutions.jts.io.WKTReader; - -public class MongoGeoTupleSet extends ExternalTupleSet { - - private Configuration conf; - private GeoIndexer geoIndexer; - private IndexingExpr filterInfo; - - - public MongoGeoTupleSet(IndexingExpr filterInfo, GeoIndexer geoIndexer) { - this.filterInfo = filterInfo; - this.geoIndexer = geoIndexer; - this.conf = geoIndexer.getConf(); - } - - @Override - public Set<String> getBindingNames() { - return filterInfo.getBindingNames(); - } - - public GeoTupleSet clone() { - return new 
GeoTupleSet(filterInfo, geoIndexer); - } - - @Override - public double cardinality() { - return 0.0; // No idea how the estimate cardinality here. - } - - - @Override - public String getSignature() { - return "(GeoTuple Projection) " + "variables: " + Joiner.on(", ").join(this.getBindingNames()).replaceAll("\\s+", " "); - } - - - - @Override - public boolean equals(Object other) { - if (other == this) { - return true; - } - if (!(other instanceof MongoGeoTupleSet)) { - return false; - } - MongoGeoTupleSet arg = (MongoGeoTupleSet) other; - return this.filterInfo.equals(arg.filterInfo); - } - - @Override - public int hashCode() { - int result = 17; - result = 31*result + filterInfo.hashCode(); - - return result; - } - - - - /** - * Returns an iterator over the result set of the contained IndexingExpr. - * <p> - * Should be thread-safe (concurrent invocation {@link OfflineIterable} this - * method can be expected with some query evaluators. - */ - @Override - public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) - throws QueryEvaluationException { - - - URI funcURI = filterInfo.getFunction(); - SearchFunction searchFunction = (new MongoGeoSearchFunctionFactory(conf)).getSearchFunction(funcURI); - if(filterInfo.getArguments().length > 1) { - throw new IllegalArgumentException("Index functions do not support more than two arguments."); - } - - String queryText = filterInfo.getArguments()[0].stringValue(); - - return IteratorFactory.getIterator(filterInfo.getSpConstraint(), bindings, queryText, searchFunction); - } - - - - //returns appropriate search function for a given URI - //search functions used in GeoMesaGeoIndexer to access index - public class MongoGeoSearchFunctionFactory { - - Configuration conf; - - private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap(); - - public MongoGeoSearchFunctionFactory(Configuration conf) { - this.conf = conf; - } - - - /** - * Get a {@link GeoSearchFunction} for a given 
URI. - * - * @param searchFunction - * @return - */ - public SearchFunction getSearchFunction(final URI searchFunction) { - - SearchFunction geoFunc = null; - - try { - geoFunc = getSearchFunctionInternal(searchFunction); - } catch (QueryEvaluationException e) { - e.printStackTrace(); - } - - return geoFunc; - } - - private SearchFunction getSearchFunctionInternal(final URI searchFunction) throws QueryEvaluationException { - SearchFunction sf = SEARCH_FUNCTION_MAP.get(searchFunction); - - if (sf != null) { - return sf; - } else { - throw new QueryEvaluationException("Unknown Search Function: " + searchFunction.stringValue()); - } - } - - private final SearchFunction GEO_EQUALS = new SearchFunction() { - - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, - StatementContraints contraints) throws QueryEvaluationException { - try { - WKTReader reader = new WKTReader(); - Geometry geometry = reader.read(queryText); - CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( - geometry, contraints); - return statements; - } catch (ParseException e) { - throw new QueryEvaluationException(e); - } - } - - @Override - public String toString() { - return "GEO_EQUALS"; - }; - }; - - private final SearchFunction GEO_DISJOINT = new SearchFunction() { - - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, - StatementContraints contraints) throws QueryEvaluationException { - try { - WKTReader reader = new WKTReader(); - Geometry geometry = reader.read(queryText); - CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( - geometry, contraints); - return statements; - } catch (ParseException e) { - throw new QueryEvaluationException(e); - } - } - - @Override - public String toString() { - return "GEO_DISJOINT"; - }; - }; - - private final SearchFunction GEO_INTERSECTS = new SearchFunction() { - - 
@Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, - StatementContraints contraints) throws QueryEvaluationException { - try { - WKTReader reader = new WKTReader(); - Geometry geometry = reader.read(queryText); - CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( - geometry, contraints); - return statements; - } catch (ParseException e) { - throw new QueryEvaluationException(e); - } - } - - @Override - public String toString() { - return "GEO_INTERSECTS"; - }; - }; - - private final SearchFunction GEO_TOUCHES = new SearchFunction() { - - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, - StatementContraints contraints) throws QueryEvaluationException { - try { - WKTReader reader = new WKTReader(); - Geometry geometry = reader.read(queryText); - CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( - geometry, contraints); - return statements; - } catch (ParseException e) { - throw new QueryEvaluationException(e); - } - } - - @Override - public String toString() { - return "GEO_TOUCHES"; - }; - }; - - private final SearchFunction GEO_CONTAINS = new SearchFunction() { - - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, - StatementContraints contraints) throws QueryEvaluationException { - try { - WKTReader reader = new WKTReader(); - Geometry geometry = reader.read(queryText); - CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( - geometry, contraints); - return statements; - } catch (ParseException e) { - throw new QueryEvaluationException(e); - } - } - - @Override - public String toString() { - return "GEO_CONTAINS"; - }; - }; - - private final SearchFunction GEO_OVERLAPS = new SearchFunction() { - - @Override - public CloseableIteration<Statement, QueryEvaluationException> 
performSearch(String queryText, - StatementContraints contraints) throws QueryEvaluationException { - try { - WKTReader reader = new WKTReader(); - Geometry geometry = reader.read(queryText); - CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( - geometry, contraints); - return statements; - } catch (ParseException e) { - throw new QueryEvaluationException(e); - } - } - - @Override - public String toString() { - return "GEO_OVERLAPS"; - }; - }; - - private final SearchFunction GEO_CROSSES = new SearchFunction() { - - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, - StatementContraints contraints) throws QueryEvaluationException { - try { - WKTReader reader = new WKTReader(); - Geometry geometry = reader.read(queryText); - CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( - geometry, contraints); - return statements; - } catch (ParseException e) { - throw new QueryEvaluationException(e); - } - } - - @Override - public String toString() { - return "GEO_CROSSES"; - }; - }; - - private final SearchFunction GEO_WITHIN = new SearchFunction() { - - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, - StatementContraints contraints) throws QueryEvaluationException { - try { - WKTReader reader = new WKTReader(); - Geometry geometry = reader.read(queryText); - CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( - geometry, contraints); - return statements; - } catch (ParseException e) { - throw new QueryEvaluationException(e); - } - } - - @Override - public String toString() { - return "GEO_WITHIN"; - }; - }; - - { - SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_EQUALS, GEO_EQUALS); - SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_DISJOINT, GEO_DISJOINT); - SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_INTERSECTS, GEO_INTERSECTS); - 
SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_TOUCHES, GEO_TOUCHES); - SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_CONTAINS, GEO_CONTAINS); - SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_OVERLAPS, GEO_OVERLAPS); - SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_CROSSES, GEO_CROSSES); - SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_WITHIN, GEO_WITHIN); - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/TextMongoDBStorageStrategy.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/TextMongoDBStorageStrategy.java b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/TextMongoDBStorageStrategy.java deleted file mode 100644 index fe8442a..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/TextMongoDBStorageStrategy.java +++ /dev/null @@ -1,89 +0,0 @@ -package mvm.rya.indexing.mongodb; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - -import java.util.Set; - -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.resolver.RdfToRyaConversions; -import mvm.rya.indexing.StatementContraints; -import mvm.rya.mongodb.dao.SimpleMongoDBStorageStrategy; - -import org.openrdf.model.Statement; -import org.openrdf.model.URI; - -import com.mongodb.BasicDBList; -import com.mongodb.BasicDBObject; -import com.mongodb.DBCollection; -import com.mongodb.DBObject; -import com.mongodb.QueryBuilder; -import com.vividsolutions.jts.io.ParseException; - -public class TextMongoDBStorageStrategy extends SimpleMongoDBStorageStrategy{ - - private static final String text = "text"; - - public void createIndices(DBCollection coll){ - BasicDBObject basicDBObject = new BasicDBObject(); - basicDBObject.append(text, "text"); - coll.createIndex(basicDBObject); - - } - - public DBObject getQuery(StatementContraints contraints, String textquery) { - // TODO right now assuming the query string is a valid mongo query, this is - // not the case. 
Should reuse the Accumulo free text parsing of the query string to make sure - // that behavior is consistent across Accumulo vs Mongo - QueryBuilder queryBuilder = QueryBuilder.start().text(textquery); - - if (contraints.hasSubject()){ - queryBuilder.and(new BasicDBObject(SUBJECT, contraints.getSubject().toString())); - } - if (contraints.hasPredicates()){ - Set<URI> predicates = contraints.getPredicates(); - if (predicates.size() > 1){ - BasicDBList or = new BasicDBList(); - for (URI pred : predicates){ - DBObject currentPred = new BasicDBObject(PREDICATE, pred.toString()); - or.add(currentPred); - } - queryBuilder.or(or); - } - else if (!predicates.isEmpty()){ - queryBuilder.and(new BasicDBObject(PREDICATE, predicates.iterator().next().toString())); - } - } - if (contraints.hasContext()){ - queryBuilder.and(new BasicDBObject(CONTEXT, contraints.getContext().toString())); - } - return queryBuilder.get(); - } - - public DBObject serialize(Statement statement) throws ParseException{ - - RyaStatement ryaStatement = RdfToRyaConversions.convertStatement(statement); - BasicDBObject base = (BasicDBObject) super.serialize(ryaStatement); - base.append(text, ryaStatement.getObject().getData()); - return base; - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/freetext/MongoFreeTextIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/freetext/MongoFreeTextIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/freetext/MongoFreeTextIndexer.java new file mode 100644 index 0000000..f8dc820 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/freetext/MongoFreeTextIndexer.java @@ -0,0 +1,62 @@ +package mvm.rya.indexing.mongodb.freetext; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; + +import org.apache.log4j.Logger; +import org.openrdf.model.Statement; +import org.openrdf.query.QueryEvaluationException; + +import com.mongodb.MongoClient; +import com.mongodb.QueryBuilder; + +import info.aduna.iteration.CloseableIteration; +import mvm.rya.indexing.FreeTextIndexer; +import mvm.rya.indexing.StatementConstraints; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.mongodb.AbstractMongoIndexer; + +public class MongoFreeTextIndexer extends AbstractMongoIndexer<TextMongoDBStorageStrategy> implements FreeTextIndexer { + private static final Logger logger = Logger.getLogger(MongoFreeTextIndexer.class); + + public MongoFreeTextIndexer(final MongoClient mongoClient) { + super(mongoClient); + } + + @Override + protected void init() throws IOException{ + super.init(); + predicates = ConfigUtils.getFreeTextPredicates(conf); + storageStrategy = new TextMongoDBStorageStrategy(); + storageStrategy.createIndices(collection); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryText( + final String query, final StatementConstraints constraints) throws IOException { + final QueryBuilder qb = QueryBuilder.start().text(query); + return withConstraints(constraints, qb.get()); + } + + 
@Override + public String getCollectionName() { + return ConfigUtils.getFreeTextDocTablename(conf); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/freetext/TextMongoDBStorageStrategy.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/freetext/TextMongoDBStorageStrategy.java b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/freetext/TextMongoDBStorageStrategy.java new file mode 100644 index 0000000..cc5029c --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/freetext/TextMongoDBStorageStrategy.java @@ -0,0 +1,45 @@ +package mvm.rya.indexing.mongodb.freetext; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import com.mongodb.BasicDBObject; +import com.mongodb.DBCollection; +import com.mongodb.DBObject; + +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.indexing.mongodb.IndexingMongoDBStorageStrategy; + +public class TextMongoDBStorageStrategy extends IndexingMongoDBStorageStrategy { + private static final String text = "text"; + + @Override + public void createIndices(final DBCollection coll){ + final BasicDBObject basicDBObject = new BasicDBObject(); + basicDBObject.append(text, "text"); + coll.createIndex(basicDBObject); + } + + @Override + public DBObject serialize(final RyaStatement ryaStatement) { + final BasicDBObject base = (BasicDBObject) super.serialize(ryaStatement); + base.append(text, ryaStatement.getObject().getData()); + return base; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java new file mode 100644 index 0000000..9411330 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java @@ -0,0 +1,143 @@ +package mvm.rya.indexing.mongodb.geo; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.Logger;
import org.openrdf.model.Statement;

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.io.ParseException;
import com.vividsolutions.jts.io.WKTReader;

import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.resolver.RyaToRdfConversions;
import mvm.rya.indexing.accumulo.geo.GeoParseUtils;
import mvm.rya.indexing.mongodb.IndexingMongoDBStorageStrategy;

/**
 * Storage strategy that adds a geospatial field to the document produced by
 * {@link IndexingMongoDBStorageStrategy} and builds the Mongo queries that search it.
 * <p>
 * A statement's geometry is stored under the {@code location} field as a list of
 * {@code [x, y]} pairs, one per coordinate of the parsed WKT geometry.
 */
public class GeoMongoDBStorageStrategy extends IndexingMongoDBStorageStrategy {
    private static final Logger LOG = Logger.getLogger(GeoMongoDBStorageStrategy.class);

    /** Name of the document field that holds the coordinate pairs. */
    private static final String GEO = "location";

    /** Kinds of geo query this strategy can build, each paired with a Mongo operator keyword. */
    public enum GeoQueryType {
        INTERSECTS {
            @Override
            public String getKeyword() {
                return "$geoIntersects";
            }
        },
        WITHIN {
            @Override
            public String getKeyword() {
                return "$geoWithin";
            }
        },
        EQUALS {
            // getQuery() special-cases EQUALS and never reads this keyword.
            @Override
            public String getKeyword() {
                return "$near";
            }
        };

        /**
         * @return the Mongo operator keyword associated with this query type
         */
        public abstract String getKeyword();
    }

    /** Immutable pairing of a query type with the geometry to search for. */
    static class GeoQuery {
        private final GeoQueryType queryType;
        private final Geometry geo;

        public GeoQuery(final GeoQueryType queryType, final Geometry geo) {
            this.queryType = queryType;
            this.geo = geo;
        }

        public GeoQueryType getQueryType() {
            return queryType;
        }

        public Geometry getGeo() {
            return geo;
        }
    }

    /** Radius used for the {@code $centerSphere} circle built for single-point EQUALS queries. */
    private final double maxDistance;

    /**
     * @param maxDistance radius of the circle used when an EQUALS query is asked for a single point
     */
    public GeoMongoDBStorageStrategy(final double maxDistance) {
        this.maxDistance = maxDistance;
    }

    /**
     * Creates a {@code 2dsphere} index on the {@code location} field.
     *
     * @param coll the collection to index
     */
    @Override
    public void createIndices(final DBCollection coll){
        // The index spec must be a key document. createIndex(String) treats its argument as a
        // field name, so passing a JSON-looking string would index a field of that literal name
        // and never create a geo index on "location".
        coll.createIndex(new BasicDBObject(GEO, "2dsphere"));
    }

    /**
     * Builds the Mongo query for a geo search.
     * <ul>
     * <li>EQUALS with exactly one point: {@code location $geoWithin $centerSphere [point, maxDistance]}</li>
     * <li>EQUALS with any other number of points: {@code location} must equal the point list exactly</li>
     * <li>any other type: {@code location <keyword> $polygon points}</li>
     * </ul>
     *
     * @param queryObj the query type and geometry to search for
     * @return the query document
     */
    public DBObject getQuery(final GeoQuery queryObj) {
        final Geometry geo = queryObj.getGeo();
        final GeoQueryType queryType = queryObj.getQueryType();
        final List<double[]> points = getCorrespondingPoints(geo);

        if (!GeoQueryType.EQUALS.equals(queryType)) {
            return new BasicDBObject(GEO,
                    new BasicDBObject(queryType.getKeyword(), new BasicDBObject("$polygon", points)));
        }

        if (points.size() != 1) {
            return new BasicDBObject(GEO, points);
        }

        // A single point is matched through a small circle around it rather than by equality.
        // NOTE(review): MongoDB documents the $centerSphere radius as radians - confirm that
        // maxDistance is configured in that unit.
        final List<Object> circle = new ArrayList<Object>();
        circle.add(points.get(0));
        circle.add(maxDistance);
        final BasicDBObject centerSphere = new BasicDBObject("$centerSphere", circle);
        return new BasicDBObject(GEO, new BasicDBObject(GeoQueryType.WITHIN.getKeyword(), centerSphere));
    }

    /**
     * Serializes the statement through the parent strategy and appends its geometry under
     * {@code location}.
     *
     * @param ryaStatement the statement whose object is expected to contain well-known text
     * @return the document to store, or {@code null} when the well-known text cannot be parsed
     *         (the failure is logged)
     */
    @Override
    public DBObject serialize(final RyaStatement ryaStatement) {
        // if the object is wkt, then try to index it
        // write the statement data to the fields
        try {
            final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement);
            final Geometry geo = (new WKTReader()).read(GeoParseUtils.getWellKnownText(statement));
            final BasicDBObject base = (BasicDBObject) super.serialize(ryaStatement);
            base.append(GEO, getCorrespondingPoints(geo));
            return base;
        } catch(final ParseException e) {
            LOG.error("Could not create geometry for statement " + ryaStatement, e);
            return null;
        }
    }

    /**
     * Flattens a geometry into {@code [x, y]} pairs, one per coordinate, in the order
     * returned by {@link Geometry#getCoordinates()}.
     */
    private List<double[]> getCorrespondingPoints(final Geometry geo){
        final Coordinate[] coordinates = geo.getCoordinates();
        final List<double[]> points = new ArrayList<double[]>(coordinates.length);
        for (final Coordinate coord : coordinates){
            points.add(new double[] {
                coord.x, coord.y
            });
        }
        return points;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/geo/MongoGeoIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/geo/MongoGeoIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/geo/MongoGeoIndexer.java new file mode 100644 index 0000000..6076fb2 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/geo/MongoGeoIndexer.java @@ -0,0 +1,118 @@ +package mvm.rya.indexing.mongodb.geo; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import static mvm.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.EQUALS; +import static mvm.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.INTERSECTS; +import static mvm.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQueryType.WITHIN; + +import java.io.IOException; + +import org.apache.log4j.Logger; +import org.openrdf.model.Statement; +import org.openrdf.query.QueryEvaluationException; + +import com.mongodb.DBObject; +import com.mongodb.MongoClient; +import com.vividsolutions.jts.geom.Geometry; + +import info.aduna.iteration.CloseableIteration; +import mvm.rya.indexing.GeoIndexer; +import mvm.rya.indexing.StatementConstraints; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.mongodb.AbstractMongoIndexer; +import mvm.rya.indexing.mongodb.geo.GeoMongoDBStorageStrategy.GeoQuery; +import mvm.rya.mongodb.MongoDBRdfConfiguration; + +public class MongoGeoIndexer extends AbstractMongoIndexer<GeoMongoDBStorageStrategy> implements GeoIndexer { + private static final Logger logger = Logger.getLogger(MongoGeoIndexer.class); + + public MongoGeoIndexer(final MongoClient mongoClient) { + super(mongoClient); + } + + @Override + protected void init() throws NumberFormatException, IOException{ + super.init(); + predicates = ConfigUtils.getGeoPredicates(conf); + storageStrategy = new GeoMongoDBStorageStrategy(Double.valueOf(conf.get(MongoDBRdfConfiguration.MONGO_GEO_MAXDISTANCE, "1e-10"))); + storageStrategy.createIndices(collection); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryEquals( + final Geometry query, final StatementConstraints constraints) { + final DBObject queryObj = storageStrategy.getQuery(new GeoQuery(EQUALS, query)); + return withConstraints(constraints, queryObj); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryDisjoint( + final Geometry query, final StatementConstraints constraints) { + throw new 
UnsupportedOperationException( + "Disjoint queries are not supported in Mongo DB."); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryIntersects( + final Geometry query, final StatementConstraints constraints) { + final DBObject queryObj = storageStrategy.getQuery(new GeoQuery(INTERSECTS, query)); + return withConstraints(constraints, queryObj); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryTouches( + final Geometry query, final StatementConstraints constraints) { + throw new UnsupportedOperationException( + "Touches queries are not supported in Mongo DB."); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryCrosses( + final Geometry query, final StatementConstraints constraints) { + throw new UnsupportedOperationException( + "Crosses queries are not supported in Mongo DB."); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryWithin( + final Geometry query, final StatementConstraints constraints) { + final DBObject queryObj = storageStrategy.getQuery(new GeoQuery(WITHIN, query)); + return withConstraints(constraints, queryObj); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryContains( + final Geometry query, final StatementConstraints constraints) { + throw new UnsupportedOperationException( + "Contains queries are not supported in Mongo DB."); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryOverlaps( + final Geometry query, final StatementConstraints constraints) { + throw new UnsupportedOperationException( + "Overlaps queries are not supported in Mongo DB."); + } + + @Override + public String getCollectionName() { + return ConfigUtils.getGeoTablename(conf); + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/geo/MongoGeoTupleSet.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/geo/MongoGeoTupleSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/geo/MongoGeoTupleSet.java new file mode 100644 index 0000000..3ab9037 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/geo/MongoGeoTupleSet.java @@ -0,0 +1,361 @@ +package mvm.rya.indexing.mongodb.geo; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +import info.aduna.iteration.CloseableIteration; + +import java.util.Map; +import java.util.Set; + +import mvm.rya.indexing.GeoIndexer; +import mvm.rya.indexing.IndexingExpr; +import mvm.rya.indexing.IteratorFactory; +import mvm.rya.indexing.SearchFunction; +import mvm.rya.indexing.StatementConstraints; +import mvm.rya.indexing.accumulo.geo.GeoConstants; +import mvm.rya.indexing.accumulo.geo.GeoTupleSet; +import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; + +import org.apache.hadoop.conf.Configuration; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; + +import com.google.common.base.Joiner; +import com.google.common.collect.Maps; +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.io.ParseException; +import com.vividsolutions.jts.io.WKTReader; + +public class MongoGeoTupleSet extends ExternalTupleSet { + + private Configuration conf; + private GeoIndexer geoIndexer; + private IndexingExpr filterInfo; + + + public MongoGeoTupleSet(IndexingExpr filterInfo, GeoIndexer geoIndexer) { + this.filterInfo = filterInfo; + this.geoIndexer = geoIndexer; + this.conf = geoIndexer.getConf(); + } + + @Override + public Set<String> getBindingNames() { + return filterInfo.getBindingNames(); + } + + public GeoTupleSet clone() { + return new GeoTupleSet(filterInfo, geoIndexer); + } + + @Override + public double cardinality() { + return 0.0; // No idea how the estimate cardinality here. 
+ } + + + @Override + public String getSignature() { + return "(GeoTuple Projection) " + "variables: " + Joiner.on(", ").join(this.getBindingNames()).replaceAll("\\s+", " "); + } + + + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + if (!(other instanceof MongoGeoTupleSet)) { + return false; + } + MongoGeoTupleSet arg = (MongoGeoTupleSet) other; + return this.filterInfo.equals(arg.filterInfo); + } + + @Override + public int hashCode() { + int result = 17; + result = 31*result + filterInfo.hashCode(); + + return result; + } + + + + /** + * Returns an iterator over the result set of the contained IndexingExpr. + * <p> + * Should be thread-safe (concurrent invocation {@link OfflineIterable} this + * method can be expected with some query evaluators. + */ + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) + throws QueryEvaluationException { + + + URI funcURI = filterInfo.getFunction(); + SearchFunction searchFunction = (new MongoGeoSearchFunctionFactory(conf)).getSearchFunction(funcURI); + if(filterInfo.getArguments().length > 1) { + throw new IllegalArgumentException("Index functions do not support more than two arguments."); + } + + String queryText = filterInfo.getArguments()[0].stringValue(); + + return IteratorFactory.getIterator(filterInfo.getSpConstraint(), bindings, queryText, searchFunction); + } + + + + //returns appropriate search function for a given URI + //search functions used in GeoMesaGeoIndexer to access index + public class MongoGeoSearchFunctionFactory { + + Configuration conf; + + private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap(); + + public MongoGeoSearchFunctionFactory(Configuration conf) { + this.conf = conf; + } + + + /** + * Get a {@link GeoSearchFunction} for a given URI. 
+ * + * @param searchFunction + * @return + */ + public SearchFunction getSearchFunction(final URI searchFunction) { + + SearchFunction geoFunc = null; + + try { + geoFunc = getSearchFunctionInternal(searchFunction); + } catch (QueryEvaluationException e) { + e.printStackTrace(); + } + + return geoFunc; + } + + private SearchFunction getSearchFunctionInternal(final URI searchFunction) throws QueryEvaluationException { + SearchFunction sf = SEARCH_FUNCTION_MAP.get(searchFunction); + + if (sf != null) { + return sf; + } else { + throw new QueryEvaluationException("Unknown Search Function: " + searchFunction.stringValue()); + } + } + + private final SearchFunction GEO_EQUALS = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementConstraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_EQUALS"; + }; + }; + + private final SearchFunction GEO_DISJOINT = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementConstraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_DISJOINT"; + }; + }; + + private final SearchFunction GEO_INTERSECTS = new SearchFunction() { + + 
@Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementConstraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_INTERSECTS"; + }; + }; + + private final SearchFunction GEO_TOUCHES = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementConstraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_TOUCHES"; + }; + }; + + private final SearchFunction GEO_CONTAINS = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementConstraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_CONTAINS"; + }; + }; + + private final SearchFunction GEO_OVERLAPS = new SearchFunction() { + + @Override + public CloseableIteration<Statement, 
QueryEvaluationException> performSearch(String queryText, + StatementConstraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_OVERLAPS"; + }; + }; + + private final SearchFunction GEO_CROSSES = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementConstraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_CROSSES"; + }; + }; + + private final SearchFunction GEO_WITHIN = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementConstraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_WITHIN"; + }; + }; + + { + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_EQUALS, GEO_EQUALS); + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_DISJOINT, GEO_DISJOINT); + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_INTERSECTS, 
GEO_INTERSECTS); + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_TOUCHES, GEO_TOUCHES); + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_CONTAINS, GEO_CONTAINS); + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_OVERLAPS, GEO_OVERLAPS); + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_CROSSES, GEO_CROSSES); + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_WITHIN, GEO_WITHIN); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/temporal/MongoTemporalIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/temporal/MongoTemporalIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/temporal/MongoTemporalIndexer.java new file mode 100644 index 0000000..6b34ab8 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/temporal/MongoTemporalIndexer.java @@ -0,0 +1,160 @@ +package mvm.rya.indexing.mongodb.temporal; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import static mvm.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy.INSTANT; +import static mvm.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy.INTERVAL_END; +import static mvm.rya.indexing.mongodb.temporal.TemporalMongoDBStorageStrategy.INTERVAL_START; + +import java.io.IOException; + +import org.apache.log4j.Logger; +import org.openrdf.model.Statement; +import org.openrdf.query.QueryEvaluationException; + +import com.google.common.annotations.VisibleForTesting; +import com.mongodb.DBCollection; +import com.mongodb.MongoClient; +import com.mongodb.QueryBuilder; + +import info.aduna.iteration.CloseableIteration; +import mvm.rya.indexing.StatementConstraints; +import mvm.rya.indexing.TemporalIndexer; +import mvm.rya.indexing.TemporalInstant; +import mvm.rya.indexing.TemporalInterval; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.mongodb.AbstractMongoIndexer; + +/** + * Indexes MongoDB based on time instants or intervals. + */ +public class MongoTemporalIndexer extends AbstractMongoIndexer<TemporalMongoDBStorageStrategy> implements TemporalIndexer { + private static final Logger LOG = Logger.getLogger(MongoTemporalIndexer.class); + + /** + * Creates a new {@link MongoTemporalIndexer} + * @param mongoClient - The {@link MongoClient} used to interact with MongoDB. 
+ */ + public MongoTemporalIndexer(final MongoClient mongoClient) { + super(mongoClient); + } + + @Override + protected void init() throws IOException{ + super.init(); + predicates = ConfigUtils.getTemporalPredicates(conf); + storageStrategy = new TemporalMongoDBStorageStrategy(); + storageStrategy.createIndices(collection); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryInstantEqualsInstant( + final TemporalInstant queryInstant, final StatementConstraints constraints) throws QueryEvaluationException { + final QueryBuilder qb = QueryBuilder.start(INSTANT) + .is(queryInstant.getAsDateTime().toDate()); + return withConstraints(constraints, qb.get()); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryInstantBeforeInstant( + final TemporalInstant queryInstant, final StatementConstraints constraints) throws QueryEvaluationException { + final QueryBuilder qb = QueryBuilder.start(INSTANT) + .lessThan(queryInstant.getAsDateTime().toDate()); + return withConstraints(constraints, qb.get()); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryInstantAfterInstant( + final TemporalInstant queryInstant, final StatementConstraints constraints) throws QueryEvaluationException { + final QueryBuilder qb = QueryBuilder.start(INSTANT) + .greaterThan(queryInstant.getAsDateTime().toDate()); + return withConstraints(constraints, qb.get()); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryInstantBeforeInterval( + final TemporalInterval givenInterval, final StatementConstraints constraints) throws QueryEvaluationException { + final QueryBuilder qb = QueryBuilder.start(INSTANT) + .lessThan(givenInterval.getHasBeginning().getAsDateTime().toDate()); + return withConstraints(constraints, qb.get()); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryInstantAfterInterval( + final TemporalInterval 
givenInterval, final StatementConstraints constraints) throws QueryEvaluationException { + return queryInstantAfterInstant(givenInterval.getHasEnd(), constraints); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryInstantInsideInterval( + final TemporalInterval givenInterval, final StatementConstraints constraints) throws QueryEvaluationException { + final QueryBuilder qb = QueryBuilder.start(INSTANT) + .greaterThan(givenInterval.getHasBeginning().getAsDateTime().toDate()) + .lessThan(givenInterval.getHasEnd().getAsDateTime().toDate()); + return withConstraints(constraints, qb.get()); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryInstantHasBeginningInterval( + final TemporalInterval queryInterval, final StatementConstraints constraints) throws QueryEvaluationException { + return queryInstantEqualsInstant(queryInterval.getHasBeginning(), constraints); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryInstantHasEndInterval( + final TemporalInterval queryInterval, final StatementConstraints constraints) throws QueryEvaluationException { + return queryInstantEqualsInstant(queryInterval.getHasEnd(), constraints); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryIntervalEquals(final TemporalInterval query, + final StatementConstraints constraints) throws QueryEvaluationException { + final QueryBuilder qb = QueryBuilder.start(INTERVAL_START) + .is(query.getHasBeginning().getAsDateTime().toDate()) + .and(INTERVAL_END) + .is(query.getHasEnd().getAsDateTime().toDate()); + return withConstraints(constraints, qb.get()); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryIntervalBefore(final TemporalInterval query, + final StatementConstraints constraints) throws QueryEvaluationException { + final QueryBuilder qb = QueryBuilder.start(INTERVAL_END) + 
.lessThan(query.getHasBeginning().getAsDateTime().toDate()); + return withConstraints(constraints, qb.get()); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryIntervalAfter(final TemporalInterval query, + final StatementConstraints constraints) throws QueryEvaluationException { + final QueryBuilder qb = QueryBuilder.start(INTERVAL_START) + .greaterThan(query.getHasEnd().getAsDateTime().toDate()); + return withConstraints(constraints, qb.get()); + } + + @Override + public String getCollectionName() { + return ConfigUtils.getTemporalTableName(conf); + } + + @VisibleForTesting + public DBCollection getCollection() { + return collection; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/917e7a57/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/temporal/TemporalMongoDBStorageStrategy.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/temporal/TemporalMongoDBStorageStrategy.java b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/temporal/TemporalMongoDBStorageStrategy.java new file mode 100644 index 0000000..3292685 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/temporal/TemporalMongoDBStorageStrategy.java @@ -0,0 +1,68 @@ +package mvm.rya.indexing.mongodb.temporal; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.regex.Matcher; + +import com.mongodb.BasicDBObject; +import com.mongodb.DBCollection; +import com.mongodb.DBObject; + +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.indexing.TemporalInstantRfc3339; +import mvm.rya.indexing.TemporalInterval; +import mvm.rya.indexing.mongodb.IndexingMongoDBStorageStrategy; + +/** + * Defines how time based intervals/instants are stored in MongoDB. + * <p> + * Time can be stored as the following: + * <p> + * <li><b>instant</b> {[statement], instant: TIME}</li> + * <li><b>interval</b> {[statement], start: TIME, end: TIME}</li> + * @see {@link TemporalInstantRfc3339} for how the dates are formatted. 
+ */ +public class TemporalMongoDBStorageStrategy extends IndexingMongoDBStorageStrategy { + public static final String INTERVAL_START = "start"; + public static final String INTERVAL_END = "end"; + public static final String INSTANT = "instant"; + + @Override + public void createIndices(final DBCollection coll){ + coll.createIndex(INTERVAL_START); + coll.createIndex(INTERVAL_END); + coll.createIndex(INSTANT); + } + + @Override + public DBObject serialize(final RyaStatement ryaStatement) { + final BasicDBObject base = (BasicDBObject) super.serialize(ryaStatement); + final String objString = ryaStatement.getObject().getData(); + final Matcher match = TemporalInstantRfc3339.PATTERN.matcher(objString); + if(match.find()) { + final TemporalInterval date = TemporalInstantRfc3339.parseInterval(ryaStatement.getObject().getData()); + base.append(INTERVAL_START, date.getHasBeginning().getAsDateTime().toDate()); + base.append(INTERVAL_END, date.getHasEnd().getAsDateTime().toDate()); + } else { + base.append(INSTANT, TemporalInstantRfc3339.FORMATTER.parseDateTime(objString).toDate()); + } + return base; + } +} \ No newline at end of file
