Repository: incubator-rya Updated Branches: refs/heads/develop 42895eac0 -> dbd46e7a7
RYA-130 Fixed Rya Mongo Multi Binding Set Join Fixed bug in Mongo Query Engine; added test Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/dbd46e7a Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/dbd46e7a Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/dbd46e7a Branch: refs/heads/develop Commit: dbd46e7a7f4bacdc59a7ffb3c1b29b082df0bd99 Parents: 42895ea Author: Aaron Mihalik <[email protected]> Authored: Fri Jul 8 19:01:19 2016 -0400 Committer: pujav65 <[email protected]> Committed: Thu Jul 21 08:56:02 2016 -0400 ---------------------------------------------------------------------- .../mvm/rya/mongodb/MongoDBQueryEngine.java | 5 +- .../java/mvm/rya/mongodb/MongoDBRyaDAO.java | 1 + .../RyaStatementBindingSetCursorIterator.java | 59 +++++---- .../mvm/rya/mongodb/MongoDBQueryEngineTest.java | 124 +++++++++++++++++++ 4 files changed, 167 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/dbd46e7a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBQueryEngine.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBQueryEngine.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBQueryEngine.java index dfaed00..afa0a77 100644 --- a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBQueryEngine.java +++ b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBQueryEngine.java @@ -50,6 +50,9 @@ import mvm.rya.mongodb.iter.RyaStatementBindingSetCursorIterator; import mvm.rya.mongodb.iter.RyaStatementCursorIterable; import mvm.rya.mongodb.iter.RyaStatementCursorIterator; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; + /** * Date: 7/17/12 * Time: 9:28 AM @@ -105,7 +108,7 @@ public class MongoDBQueryEngine 
implements RyaQueryEngine<MongoDBRdfConfiguratio conf = configuration; } final Long maxResults = conf.getLimit(); - final Map<DBObject, BindingSet> rangeMap = new HashMap<DBObject, BindingSet>(); + final Multimap<DBObject, BindingSet> rangeMap = HashMultimap.create(); //TODO: cannot span multiple tables here try { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/dbd46e7a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBRyaDAO.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBRyaDAO.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBRyaDAO.java index 5695d8c..15537e5 100644 --- a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBRyaDAO.java +++ b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBRyaDAO.java @@ -72,6 +72,7 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{ * @throws RyaDAOException */ public MongoDBRyaDAO(final MongoDBRdfConfiguration conf) throws RyaDAOException, NumberFormatException, UnknownHostException { + this.conf = conf; try { mongoClient = MongoConnectorFactory.getMongoClient(conf); conf.setMongoClient(mongoClient); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/dbd46e7a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java index ce21ff7..d24cbdc 100644 --- a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java +++ b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java @@ -22,8 +22,8 @@ package mvm.rya.mongodb.iter; import info.aduna.iteration.CloseableIteration; +import java.util.Collection; import 
java.util.Iterator; -import java.util.Map; import java.util.Map.Entry; import mvm.rya.api.RdfCloudTripleStoreUtils; @@ -33,6 +33,7 @@ import mvm.rya.mongodb.dao.MongoDBStorageStrategy; import org.openrdf.query.BindingSet; +import com.google.common.collect.Multimap; import com.mongodb.DBCollection; import com.mongodb.DBCursor; import com.mongodb.DBObject; @@ -40,15 +41,17 @@ import com.mongodb.DBObject; public class RyaStatementBindingSetCursorIterator implements CloseableIteration<Entry<RyaStatement, BindingSet>, RyaDAOException> { private DBCollection coll; - private Map<DBObject, BindingSet> rangeMap; + private Multimap<DBObject, BindingSet> rangeMap; private Iterator<DBObject> queryIterator; private Long maxResults; - private DBCursor currentCursor; - private BindingSet currentBindingSet; + private DBCursor resultCursor; + private RyaStatement currentStatement; + private Collection<BindingSet> currentBindingSetCollection; + private Iterator<BindingSet> currentBindingSetIterator; private MongoDBStorageStrategy strategy; public RyaStatementBindingSetCursorIterator(DBCollection coll, - Map<DBObject, BindingSet> rangeMap, MongoDBStorageStrategy strategy) { + Multimap<DBObject, BindingSet> rangeMap, MongoDBStorageStrategy strategy) { this.coll = coll; this.rangeMap = rangeMap; this.queryIterator = rangeMap.keySet().iterator(); @@ -57,37 +60,51 @@ public class RyaStatementBindingSetCursorIterator implements CloseableIteration< @Override public boolean hasNext() { - if (!currentCursorIsValid()) { - findNextValidCursor(); + if (!currentBindingSetIteratorIsValid()) { + findNextResult(); } - return currentCursorIsValid(); + return currentBindingSetIteratorIsValid(); } @Override public Entry<RyaStatement, BindingSet> next() { - if (!currentCursorIsValid()) { - findNextValidCursor(); + if (!currentBindingSetIteratorIsValid()) { + findNextResult(); } - if (currentCursorIsValid()) { - // convert to Rya Statement - DBObject queryResult = currentCursor.next(); - RyaStatement 
statement = strategy.deserializeDBObject(queryResult); - return new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(statement, currentBindingSet); + if (currentBindingSetIteratorIsValid()) { + BindingSet currentBindingSet = currentBindingSetIterator.next(); + return new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(currentStatement, currentBindingSet); } return null; } - private void findNextValidCursor() { + private boolean currentBindingSetIteratorIsValid() { + return (currentBindingSetIterator != null) && currentBindingSetIterator.hasNext(); + } + + private void findNextResult() { + if (!currentResultCursorIsValid()) { + findNextValidResultCursor(); + } + if (currentResultCursorIsValid()) { + // convert to Rya Statement + DBObject queryResult = resultCursor.next(); + currentStatement = strategy.deserializeDBObject(queryResult); + currentBindingSetIterator = currentBindingSetCollection.iterator(); + } + } + + private void findNextValidResultCursor() { while (queryIterator.hasNext()){ DBObject currentQuery = queryIterator.next(); - currentCursor = coll.find(currentQuery); - currentBindingSet = rangeMap.get(currentQuery); - if (currentCursor.hasNext()) break; + resultCursor = coll.find(currentQuery); + currentBindingSetCollection = rangeMap.get(currentQuery); + if (resultCursor.hasNext()) return; } } - private boolean currentCursorIsValid() { - return (currentCursor != null) && currentCursor.hasNext(); + private boolean currentResultCursorIsValid() { + return (resultCursor != null) && resultCursor.hasNext(); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/dbd46e7a/dao/mongodb.rya/src/test/java/mvm/rya/mongodb/MongoDBQueryEngineTest.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/test/java/mvm/rya/mongodb/MongoDBQueryEngineTest.java b/dao/mongodb.rya/src/test/java/mvm/rya/mongodb/MongoDBQueryEngineTest.java new file mode 100644 index 0000000..870115c --- 
/dev/null +++ b/dao/mongodb.rya/src/test/java/mvm/rya/mongodb/MongoDBQueryEngineTest.java @@ -0,0 +1,124 @@ +package mvm.rya.mongodb; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collection; +import java.util.Map; +import java.util.Map.Entry; + +import info.aduna.iteration.CloseableIteration; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.RdfCloudTripleStoreUtils; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaStatement.RyaStatementBuilder; +import mvm.rya.api.domain.RyaURI; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.MapBindingSet; + +import com.google.common.collect.Lists; +import com.mongodb.MongoClient; + +import de.flapdoodle.embed.mongo.distribution.Version; +import de.flapdoodle.embed.mongo.tests.MongodForTestsFactory; + +public class MongoDBQueryEngineTest { + + // private dao; + // private configuration; + + private MongoDBQueryEngine engine; + private MongoDBRdfConfiguration configuration; + + @Before + public void setUp() throws 
Exception { + // Set up Mongo/Rya + MongodForTestsFactory testsFactory = MongodForTestsFactory.with(Version.Main.PRODUCTION); + Configuration conf = new Configuration(); + conf.set(MongoDBRdfConfiguration.USE_TEST_MONGO, "true"); + conf.set(MongoDBRdfConfiguration.MONGO_DB_NAME, "test"); + conf.set(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya_"); + conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, "rya_"); + configuration = new MongoDBRdfConfiguration(conf); + MongoClient mongoClient = testsFactory.newMongo(); + int port = mongoClient.getServerAddressList().get(0).getPort(); + configuration.set(MongoDBRdfConfiguration.MONGO_INSTANCE_PORT, Integer.toString(port)); + + engine = new MongoDBQueryEngine(configuration, mongoClient); + + // Add Data + MongoDBRyaDAO dao = new MongoDBRyaDAO(configuration, mongoClient); + dao.add(getStatement("u:a", "u:tt", "u:b")); + dao.add(getStatement("u:a", "u:tt", "u:c")); + } + + private RyaStatement getStatement(String s, String p, String o) { + RyaStatementBuilder builder = new RyaStatementBuilder(); + if (s != null) + builder.setSubject(new RyaURI(s)); + if (p != null) + builder.setPredicate(new RyaURI(p)); + if (o != null) + builder.setObject(new RyaURI(o)); + return builder.build(); + } + + public int size(CloseableIteration<?, ?> iter) throws Exception { + int i = 0; + while (iter.hasNext()) { + i++; + iter.next(); + } + return i; + } + + @Test + public void statementQuery() throws Exception { + RyaStatement s = getStatement("u:a", null, null); + Assert.assertEquals(2, size(engine.query(s, configuration))); + } + + @SuppressWarnings("unchecked") + @Test + public void bindingSetsQuery() throws Exception { + RyaStatement s = getStatement("u:a", null, null); + + MapBindingSet bs1 = new MapBindingSet(); + bs1.addBinding("foo", new URIImpl("u:x")); + + Map.Entry<RyaStatement, BindingSet> e1 = new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(s, bs1); + Collection<Entry<RyaStatement, BindingSet>> 
stmts1 = Lists.newArrayList(e1); + Assert.assertEquals(2, size(engine.queryWithBindingSet(stmts1, configuration))); + + + MapBindingSet bs2 = new MapBindingSet(); + bs2.addBinding("foo", new URIImpl("u:y")); + + Map.Entry<RyaStatement, BindingSet> e2 = new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(s, bs2); + + Collection<Entry<RyaStatement, BindingSet>> stmts2 = Lists.newArrayList(e1, e2); + Assert.assertEquals(4, size(engine.queryWithBindingSet(stmts2, configuration))); +} +}
