Repository: incubator-rya Updated Branches: refs/heads/master ad60aca8d -> 0d80871ff
RYA-337 Adding batch queries to MongoDB. Closes #204 Additionally, simplifying MongoDBQueryEngine Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/0d80871f Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/0d80871f Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/0d80871f Branch: refs/heads/master Commit: 0d80871ff614683fb0b4105a7778b5e56e53051c Parents: ad60aca Author: Aaron Mihalik <miha...@alum.mit.edu> Authored: Tue Aug 8 11:17:37 2017 -0400 Committer: Caleb Meier <caleb.me...@parsons.com> Committed: Fri Aug 18 17:54:17 2017 -0700 ---------------------------------------------------------------------- .../apache/rya/mongodb/MongoDBQueryEngine.java | 120 ++++++------------- .../NonCloseableRyaStatementCursorIterator.java | 57 --------- .../RyaStatementBindingSetCursorIterator.java | 115 ++++++++++++------ .../iter/RyaStatementCursorIterable.java | 67 ----------- .../iter/RyaStatementCursorIterator.java | 94 ++++----------- .../rya/mongodb/MongoDBQueryEngineTest.java | 30 +++++ 6 files changed, 165 insertions(+), 318 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0d80871f/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java index 8932fc4..f1115b1 100644 --- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java @@ -21,11 +21,13 @@ package org.apache.rya.mongodb; import static com.google.common.base.Preconditions.checkNotNull; import java.io.IOException; +import java.util.AbstractMap; import java.util.Collection; -import java.util.HashSet; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.persist.RyaDAOException; @@ -34,19 +36,17 @@ import org.apache.rya.api.persist.query.RyaQuery; import org.apache.rya.api.persist.query.RyaQueryEngine; import org.apache.rya.mongodb.dao.MongoDBStorageStrategy; import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy; -import org.apache.rya.mongodb.iter.NonCloseableRyaStatementCursorIterator; import org.apache.rya.mongodb.iter.RyaStatementBindingSetCursorIterator; -import org.apache.rya.mongodb.iter.RyaStatementCursorIterable; import org.apache.rya.mongodb.iter.RyaStatementCursorIterator; import org.bson.Document; import org.calrissian.mango.collect.CloseableIterable; +import org.calrissian.mango.collect.CloseableIterables; import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.MapBindingSet; +import com.google.common.base.Preconditions; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; -import com.mongodb.DB; -import com.mongodb.DBCollection; -import com.mongodb.DBObject; import com.mongodb.MongoClient; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; @@ -61,13 +61,10 @@ public class MongoDBQueryEngine implements RyaQueryEngine<MongoDBRdfConfiguratio private MongoDBRdfConfiguration configuration; private final MongoClient mongoClient; - private final DBCollection coll; private final MongoDBStorageStrategy<RyaStatement> strategy; public MongoDBQueryEngine(final MongoDBRdfConfiguration conf, final MongoClient mongoClient) { this.mongoClient = checkNotNull(mongoClient); - final DB db = mongoClient.getDB(conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME)); - coll = db.getCollection(conf.getTriplesCollectionName()); strategy = new SimpleMongoDBStorageStrategy(); } @@ -86,50 +83,36 @@ public class MongoDBQueryEngine implements RyaQueryEngine<MongoDBRdfConfiguratio public CloseableIteration<RyaStatement, RyaDAOException> query( final RyaStatement stmt, MongoDBRdfConfiguration conf) throws RyaDAOException { - if (conf == null) { - conf = configuration; - } - final Long maxResults = conf.getLimit(); - final Set<DBObject> queries = new HashSet<DBObject>(); - final DBObject query = strategy.getQuery(stmt); - queries.add(query); - final MongoDatabase db = mongoClient.getDatabase(conf.getMongoDBName()); - final MongoCollection<Document> collection = db.getCollection(conf.getTriplesCollectionName()); - final RyaStatementCursorIterator iterator = new RyaStatementCursorIterator(collection, queries, strategy, - conf.getAuthorizations()); - - if (maxResults != null) { - iterator.setMaxResults(maxResults); - } - return iterator; + Preconditions.checkNotNull(stmt); + Preconditions.checkNotNull(conf); + + Entry<RyaStatement, BindingSet> entry = new AbstractMap.SimpleEntry<>(stmt, new MapBindingSet()); + Collection<Entry<RyaStatement, BindingSet>> collection = Collections.singleton(entry); + + return new RyaStatementCursorIterator(queryWithBindingSet(collection, conf)); } @Override public CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> queryWithBindingSet( final Collection<Entry<RyaStatement, BindingSet>> stmts, MongoDBRdfConfiguration conf) throws RyaDAOException { - if (conf == null) { - conf = configuration; - } - final Long maxResults = conf.getLimit(); - final Multimap<DBObject, BindingSet> rangeMap = HashMultimap.create(); + Preconditions.checkNotNull(stmts); + Preconditions.checkNotNull(conf); + + final Multimap<RyaStatement, BindingSet> rangeMap = HashMultimap.create(); //TODO: cannot span multiple tables here try { for (final Map.Entry<RyaStatement, BindingSet> stmtbs : stmts) { final RyaStatement stmt = stmtbs.getKey(); final BindingSet bs = stmtbs.getValue(); - final DBObject query = strategy.getQuery(stmt); - rangeMap.put(query, bs); + rangeMap.put(stmt, bs); } // TODO not sure what to do about regex ranges? final RyaStatementBindingSetCursorIterator iterator = new RyaStatementBindingSetCursorIterator( getCollection(conf), rangeMap, strategy, conf.getAuthorizations()); - if (maxResults != null) { - iterator.setMaxResults(maxResults); - } return iterator; } catch (final Exception e) { throw new RyaDAOException(e); @@ -140,72 +123,39 @@ public class MongoDBQueryEngine implements RyaQueryEngine<MongoDBRdfConfiguratio public CloseableIteration<RyaStatement, RyaDAOException> batchQuery( final Collection<RyaStatement> stmts, MongoDBRdfConfiguration conf) throws RyaDAOException { - if (conf == null) { - conf = configuration; - } - final Long maxResults = conf.getLimit(); - final Set<DBObject> queries = new HashSet<DBObject>(); + final Map<RyaStatement, BindingSet> queries = new HashMap<>(); - try { - for (final RyaStatement stmt : stmts) { - queries.add( strategy.getQuery(stmt)); - } - - // TODO not sure what to do about regex ranges? - final RyaStatementCursorIterator iterator = new RyaStatementCursorIterator(getCollection(conf), queries, - strategy, configuration.getAuthorizations()); - - if (maxResults != null) { - iterator.setMaxResults(maxResults); - } - return iterator; - } catch (final Exception e) { - throw new RyaDAOException(e); + for (final RyaStatement stmt : stmts) { + queries.put(stmt, new MapBindingSet()); } + return new RyaStatementCursorIterator(queryWithBindingSet(queries.entrySet(), conf)); } + @Override public CloseableIterable<RyaStatement> query(final RyaQuery ryaQuery) throws RyaDAOException { - final Set<DBObject> queries = new HashSet<DBObject>(); - - try { - queries.add( strategy.getQuery(ryaQuery)); - - // TODO not sure what to do about regex ranges? - // TODO this is gross - final RyaStatementCursorIterable iterator = new RyaStatementCursorIterable( - new NonCloseableRyaStatementCursorIterator(new RyaStatementCursorIterator(getCollection(getConf()), - queries, strategy, configuration.getAuthorizations()))); + Preconditions.checkNotNull(ryaQuery); - return iterator; - } catch (final Exception e) { - throw new RyaDAOException(e); - } + return query(new BatchRyaQuery(Collections.singleton(ryaQuery.getQuery()))); } + @Override public CloseableIterable<RyaStatement> query(final BatchRyaQuery batchRyaQuery) throws RyaDAOException { - try { - final Set<DBObject> queries = new HashSet<DBObject>(); - for (final RyaStatement statement : batchRyaQuery.getQueries()){ - queries.add( strategy.getQuery(statement)); + Preconditions.checkNotNull(batchRyaQuery); - } + final Map<RyaStatement, BindingSet> queries = new HashMap<>(); - // TODO not sure what to do about regex ranges? - // TODO this is gross - final RyaStatementCursorIterable iterator = new RyaStatementCursorIterable( - new NonCloseableRyaStatementCursorIterator(new RyaStatementCursorIterator(getCollection(getConf()), - queries, strategy, configuration.getAuthorizations()))); - - return iterator; - } catch (final Exception e) { - throw new RyaDAOException(e); + for (final RyaStatement stmt : batchRyaQuery.getQueries()) { + queries.put(stmt, new MapBindingSet()); } + + Iterator<RyaStatement> iterator = new RyaStatementCursorIterator(queryWithBindingSet(queries.entrySet(), getConf())); + return CloseableIterables.wrap((Iterable<RyaStatement>) () -> iterator); } - private MongoCollection getCollection(final MongoDBRdfConfiguration conf) { + private MongoCollection<Document> getCollection(final MongoDBRdfConfiguration conf) { final MongoDatabase db = mongoClient.getDatabase(conf.getMongoDBName()); return db.getCollection(conf.getTriplesCollectionName()); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0d80871f/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java deleted file mode 100644 index 35dab6d..0000000 --- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java +++ /dev/null @@ -1,57 +0,0 @@ -package org.apache.rya.mongodb.iter; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import java.util.Iterator; - -import org.apache.rya.api.domain.RyaStatement; -import org.apache.rya.api.persist.RyaDAOException; - -public class NonCloseableRyaStatementCursorIterator implements Iterator<RyaStatement> { - - RyaStatementCursorIterator iterator; - - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - @Override - public RyaStatement next() { - return iterator.next(); - } - - public NonCloseableRyaStatementCursorIterator( - RyaStatementCursorIterator iterator) { - this.iterator = iterator; - } - - @Override - public void remove() { - try { - iterator.remove(); - } catch (RyaDAOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0d80871f/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java index 18f71d2..de5e8b0 100644 --- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java @@ -19,21 +19,25 @@ package org.apache.rya.mongodb.iter; import java.util.ArrayList; -import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; +import java.util.Set; import org.apache.accumulo.core.security.Authorizations; import org.apache.log4j.Logger; import org.apache.rya.api.RdfCloudTripleStoreUtils; import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaType; import org.apache.rya.api.persist.RyaDAOException; import org.apache.rya.mongodb.dao.MongoDBStorageStrategy; import org.apache.rya.mongodb.document.operators.aggregation.AggregationUtil; import org.bson.Document; import org.openrdf.query.BindingSet; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterators; import com.google.common.collect.Multimap; import com.mongodb.DBObject; import com.mongodb.client.AggregateIterable; @@ -44,20 +48,21 @@ import info.aduna.iteration.CloseableIteration; public class RyaStatementBindingSetCursorIterator implements CloseableIteration<Entry<RyaStatement, BindingSet>, RyaDAOException> { private static final Logger log = Logger.getLogger(RyaStatementBindingSetCursorIterator.class); + + private static final int QUERY_BATCH_SIZE = 50; private final MongoCollection<Document> coll; - private final Multimap<DBObject, BindingSet> rangeMap; - private final Iterator<DBObject> queryIterator; - private Long maxResults; - private Iterator<Document> resultsIterator; - private RyaStatement currentStatement; - private Collection<BindingSet> currentBindingSetCollection; + private final Multimap<RyaStatement, BindingSet> rangeMap; + private final Multimap<RyaStatement, BindingSet> executedRangeMap = HashMultimap.create(); + private final Iterator<RyaStatement> queryIterator; + private Iterator<Document> batchQueryResultsIterator; + private RyaStatement currentResultStatement; private Iterator<BindingSet> currentBindingSetIterator; private final MongoDBStorageStrategy<RyaStatement> strategy; private final Authorizations auths; public RyaStatementBindingSetCursorIterator(final MongoCollection<Document> coll, - final Multimap<DBObject, BindingSet> rangeMap, final MongoDBStorageStrategy<RyaStatement> strategy, + final Multimap<RyaStatement, BindingSet> rangeMap, final MongoDBStorageStrategy<RyaStatement> strategy, final Authorizations auths) { this.coll = coll; this.rangeMap = rangeMap; @@ -81,7 +86,7 @@ public class RyaStatementBindingSetCursorIterator implements CloseableIteration< } if (currentBindingSetIteratorIsValid()) { final BindingSet currentBindingSet = currentBindingSetIterator.next(); - return new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(currentStatement, currentBindingSet); + return new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(currentResultStatement, currentBindingSet); } return null; } @@ -91,47 +96,81 @@ public class RyaStatementBindingSetCursorIterator implements CloseableIteration< } private void findNextResult() { - if (!currentResultCursorIsValid()) { - findNextValidResultCursor(); + if (!currentBatchQueryResultCursorIsValid()) { + submitBatchQuery(); } - if (currentResultCursorIsValid()) { + + if (currentBatchQueryResultCursorIsValid()) { // convert to Rya Statement - final Document queryResult = resultsIterator.next(); + final Document queryResult = batchQueryResultsIterator.next(); final DBObject dbo = (DBObject) JSON.parse(queryResult.toJson()); - currentStatement = strategy.deserializeDBObject(dbo); - currentBindingSetIterator = currentBindingSetCollection.iterator(); + currentResultStatement = strategy.deserializeDBObject(dbo); + + // Find all of the queries in the executed RangeMap that this result matches + // and collect all of those binding sets + Set<BindingSet> bsList = new HashSet<>(); + for (RyaStatement executedQuery : executedRangeMap.keys()) { + if (isResultForQuery(executedQuery, currentResultStatement)) { + bsList.addAll(executedRangeMap.get(executedQuery)); + } + } + currentBindingSetIterator = bsList.iterator(); + } + + // Handle case of invalid currentResultStatement or no binding sets returned + if ((currentBindingSetIterator == null || !currentBindingSetIterator.hasNext()) && (currentBatchQueryResultCursorIsValid() || queryIterator.hasNext())) { + findNextResult(); } } + + private static boolean isResultForQuery(RyaStatement query, RyaStatement result) { + return isResult(query.getSubject(), result.getSubject()) && + isResult(query.getPredicate(), result.getPredicate()) && + isResult(query.getObject(), result.getObject()) && + isResult(query.getContext(), result.getContext()); + } + + private static boolean isResult(RyaType query, RyaType result) { + return (query == null) || query.equals(result); + } - private void findNextValidResultCursor() { - while (queryIterator.hasNext()){ - final DBObject currentQuery = queryIterator.next(); - currentBindingSetCollection = rangeMap.get(currentQuery); - // Executing redact aggregation to only return documents the user - // has access to. - final List<Document> pipeline = new ArrayList<>(); - pipeline.add(new Document("$match", currentQuery)); - pipeline.addAll(AggregationUtil.createRedactPipeline(auths)); - log.debug(pipeline); - - final AggregateIterable<Document> aggIter = coll.aggregate(pipeline); - aggIter.batchSize(1000); - resultsIterator = aggIter.iterator(); - if (resultsIterator.hasNext()) { - break; - } + private void submitBatchQuery() { + int count = 0; + executedRangeMap.clear(); + final List<Document> pipeline = new ArrayList<>(); + final List<DBObject> match = new ArrayList<>(); + + while (queryIterator.hasNext() && count < QUERY_BATCH_SIZE){ + count++; + RyaStatement query = queryIterator.next(); + executedRangeMap.putAll(query, rangeMap.get(query)); + final DBObject currentQuery = strategy.getQuery(query); + match.add(currentQuery); } - } - private boolean currentResultCursorIsValid() { - return (resultsIterator != null) && resultsIterator.hasNext(); + if (match.size() > 1) { + pipeline.add(new Document("$match", new Document("$or", match))); + } else if (match.size() == 1) { + pipeline.add(new Document("$match", match.get(0))); + } else { + batchQueryResultsIterator = Iterators.emptyIterator(); + return; + } + + // Executing redact aggregation to only return documents the user has access to. + pipeline.addAll(AggregationUtil.createRedactPipeline(auths)); + log.info(pipeline); + + final AggregateIterable<Document> aggIter = coll.aggregate(pipeline); + aggIter.batchSize(1000); + batchQueryResultsIterator = aggIter.iterator(); } - - public void setMaxResults(final Long maxResults) { - this.maxResults = maxResults; + private boolean currentBatchQueryResultCursorIsValid() { + return (batchQueryResultsIterator != null) && batchQueryResultsIterator.hasNext(); } + @Override public void close() throws RyaDAOException { // TODO don't know what to do here http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0d80871f/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterable.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterable.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterable.java deleted file mode 100644 index f9d84b2..0000000 --- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterable.java +++ /dev/null @@ -1,67 +0,0 @@ -package org.apache.rya.mongodb.iter; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import info.aduna.iteration.CloseableIteration; - -import java.io.IOException; -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.Set; - -import org.apache.rya.api.RdfCloudTripleStoreUtils; -import org.apache.rya.api.domain.RyaStatement; -import org.apache.rya.api.persist.RyaDAOException; - -import org.calrissian.mango.collect.CloseableIterable; -import org.calrissian.mango.collect.CloseableIterator; -import org.openrdf.query.BindingSet; - -import com.mongodb.DBCollection; -import com.mongodb.DBCursor; -import com.mongodb.DBObject; - -public class RyaStatementCursorIterable implements CloseableIterable<RyaStatement> { - - - private NonCloseableRyaStatementCursorIterator iterator; - - public RyaStatementCursorIterable(NonCloseableRyaStatementCursorIterator iterator) { - this.iterator = iterator; - } - - @Override - public Iterator<RyaStatement> iterator() { - // TODO Auto-generated method stub - return iterator; - } - - @Override - public void closeQuietly() { - //TODO don't know what to do here - } - - @Override - public void close() throws IOException { - // TODO Auto-generated method stub - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0d80871f/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterator.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterator.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterator.java index 1a5eb99..82eed6f 100644 --- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterator.java +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterator.java @@ -18,103 +18,55 @@ */ package org.apache.rya.mongodb.iter; -import java.util.ArrayList; import java.util.Iterator; -import java.util.List; -import java.util.Set; +import java.util.Map.Entry; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.log4j.Logger; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.persist.RyaDAOException; -import org.apache.rya.mongodb.dao.MongoDBStorageStrategy; -import org.apache.rya.mongodb.document.operators.aggregation.AggregationUtil; -import org.bson.Document; +import org.openrdf.query.BindingSet; -import com.mongodb.DBObject; -import com.mongodb.client.AggregateIterable; -import com.mongodb.client.MongoCollection; -import com.mongodb.util.JSON; +import com.google.common.base.Throwables; import info.aduna.iteration.CloseableIteration; -public class RyaStatementCursorIterator implements CloseableIteration<RyaStatement, RyaDAOException> { - private static final Logger log = Logger.getLogger(RyaStatementCursorIterator.class); +public class RyaStatementCursorIterator implements Iterator<RyaStatement>, CloseableIteration<RyaStatement, RyaDAOException> { + private final CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> iterator; - private final MongoCollection coll; - private final Iterator<DBObject> queryIterator; - private Iterator<Document> resultsIterator; - private final MongoDBStorageStrategy<RyaStatement> strategy; - private Long maxResults; - private final Authorizations auths; - - public RyaStatementCursorIterator(final MongoCollection<Document> collection, final Set<DBObject> queries, - final MongoDBStorageStrategy<RyaStatement> strategy, final Authorizations auths) { - coll = collection; - queryIterator = queries.iterator(); - this.strategy = strategy; - this.auths = auths; + public RyaStatementCursorIterator(CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> iterator) { + this.iterator = iterator; } @Override public boolean hasNext() { - if (!currentCursorIsValid()) { - findNextValidCursor(); + try { + return iterator.hasNext(); + } catch (RyaDAOException e) { + Throwables.propagate(e); } - return currentCursorIsValid(); + return false; } @Override public RyaStatement next() { - if (!currentCursorIsValid()) { - findNextValidCursor(); - } - if (currentCursorIsValid()) { - // convert to Rya Statement - final Document queryResult = resultsIterator.next(); - final DBObject dbo = (DBObject) JSON.parse(queryResult.toJson()); - final RyaStatement statement = strategy.deserializeDBObject(dbo); - return statement; + try { + return iterator.next().getKey(); + } catch (RyaDAOException e) { + Throwables.propagate(e); } return null; } - private void findNextValidCursor() { - while (queryIterator.hasNext()){ - final DBObject currentQuery = queryIterator.next(); - - // Executing redact aggregation to only return documents the user - // has access to. - final List<Document> pipeline = new ArrayList<>(); - pipeline.add(new Document("$match", currentQuery)); - pipeline.addAll(AggregationUtil.createRedactPipeline(auths)); - log.debug(pipeline); - final AggregateIterable<Document> output = coll.aggregate(pipeline); - output.batchSize(1000); - - resultsIterator = output.iterator(); - if (resultsIterator.hasNext()) { - break; - } - } - } - - private boolean currentCursorIsValid() { - return (resultsIterator != null) && resultsIterator.hasNext(); - } - - - public void setMaxResults(final Long maxResults) { - this.maxResults = maxResults; - } - @Override public void close() throws RyaDAOException { - // TODO don't know what to do here + iterator.close(); } @Override - public void remove() throws RyaDAOException { - next(); + public void remove() { + try { + iterator.remove(); + } catch (RyaDAOException e) { + Throwables.propagate(e); + } } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0d80871f/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBQueryEngineTest.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBQueryEngineTest.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBQueryEngineTest.java index d843d22..4187c85 100644 --- a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBQueryEngineTest.java +++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBQueryEngineTest.java @@ -98,6 +98,36 @@ public class MongoDBQueryEngineTest extends MongoRyaTestBase { @SuppressWarnings("unchecked") @Test + public void batchbindingSetsQuery() throws Exception { + final RyaStatement s1 = getStatement(null, null, "u:b"); + + final MapBindingSet bs1 = new MapBindingSet(); + bs1.addBinding("foo", new URIImpl("u:x")); + + final Map.Entry<RyaStatement, BindingSet> e1 = new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(s1, bs1); + final Collection<Entry<RyaStatement, BindingSet>> stmts1 = Lists.newArrayList(e1); + Assert.assertEquals(1, size(engine.queryWithBindingSet(stmts1, configuration))); + + + final MapBindingSet bs2 = new MapBindingSet(); + bs2.addBinding("foo", new URIImpl("u:y")); + + final RyaStatement s2 = getStatement(null, null, "u:c"); + + final Map.Entry<RyaStatement, BindingSet> e2 = new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(s2, bs2); + + final Collection<Entry<RyaStatement, BindingSet>> stmts2 = Lists.newArrayList(e1, e2); + Assert.assertEquals(2, size(engine.queryWithBindingSet(stmts2, configuration))); + + + final Map.Entry<RyaStatement, BindingSet> e3 = new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(s2, bs1); + final Map.Entry<RyaStatement, BindingSet> e4 = new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(s1, bs2); + + final Collection<Entry<RyaStatement, BindingSet>> stmts3 = Lists.newArrayList(e1, e2, e3, e4); + Assert.assertEquals(4, size(engine.queryWithBindingSet(stmts3, configuration))); +} + @SuppressWarnings("unchecked") + @Test public void bindingSetsQuery() throws Exception { final RyaStatement s = getStatement("u:a", null, null);