Repository: incubator-rya
Updated Branches:
  refs/heads/master ad60aca8d -> 0d80871ff


RYA-337 Adding batch queries to MongoDB. Closes #204

Additionally, simplifying MongoDBQueryEngine


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/0d80871f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/0d80871f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/0d80871f

Branch: refs/heads/master
Commit: 0d80871ff614683fb0b4105a7778b5e56e53051c
Parents: ad60aca
Author: Aaron Mihalik <miha...@alum.mit.edu>
Authored: Tue Aug 8 11:17:37 2017 -0400
Committer: Caleb Meier <caleb.me...@parsons.com>
Committed: Fri Aug 18 17:54:17 2017 -0700

----------------------------------------------------------------------
 .../apache/rya/mongodb/MongoDBQueryEngine.java  | 120 ++++++-------------
 .../NonCloseableRyaStatementCursorIterator.java |  57 ---------
 .../RyaStatementBindingSetCursorIterator.java   | 115 ++++++++++++------
 .../iter/RyaStatementCursorIterable.java        |  67 -----------
 .../iter/RyaStatementCursorIterator.java        |  94 ++++-----------
 .../rya/mongodb/MongoDBQueryEngineTest.java     |  30 +++++
 6 files changed, 165 insertions(+), 318 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0d80871f/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java
----------------------------------------------------------------------
diff --git 
a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java 
b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java
index 8932fc4..f1115b1 100644
--- 
a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java
+++ 
b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBQueryEngine.java
@@ -21,11 +21,13 @@ package org.apache.rya.mongodb;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.io.IOException;
+import java.util.AbstractMap;
 import java.util.Collection;
-import java.util.HashSet;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.persist.RyaDAOException;
@@ -34,19 +36,17 @@ import org.apache.rya.api.persist.query.RyaQuery;
 import org.apache.rya.api.persist.query.RyaQueryEngine;
 import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
 import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
-import org.apache.rya.mongodb.iter.NonCloseableRyaStatementCursorIterator;
 import org.apache.rya.mongodb.iter.RyaStatementBindingSetCursorIterator;
-import org.apache.rya.mongodb.iter.RyaStatementCursorIterable;
 import org.apache.rya.mongodb.iter.RyaStatementCursorIterator;
 import org.bson.Document;
 import org.calrissian.mango.collect.CloseableIterable;
+import org.calrissian.mango.collect.CloseableIterables;
 import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.MapBindingSet;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
-import com.mongodb.DB;
-import com.mongodb.DBCollection;
-import com.mongodb.DBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
@@ -61,13 +61,10 @@ public class MongoDBQueryEngine implements 
RyaQueryEngine<MongoDBRdfConfiguratio
 
     private MongoDBRdfConfiguration configuration;
     private final MongoClient mongoClient;
-    private final DBCollection coll;
     private final MongoDBStorageStrategy<RyaStatement> strategy;
 
     public MongoDBQueryEngine(final MongoDBRdfConfiguration conf, final 
MongoClient mongoClient) {
         this.mongoClient = checkNotNull(mongoClient);
-        final DB db = 
mongoClient.getDB(conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME));
-        coll = db.getCollection(conf.getTriplesCollectionName());
         strategy = new SimpleMongoDBStorageStrategy();
     }
 
@@ -86,50 +83,36 @@ public class MongoDBQueryEngine implements 
RyaQueryEngine<MongoDBRdfConfiguratio
     public CloseableIteration<RyaStatement, RyaDAOException> query(
             final RyaStatement stmt, MongoDBRdfConfiguration conf)
             throws RyaDAOException {
-        if (conf == null) {
-            conf = configuration;
-        }
-        final Long maxResults = conf.getLimit();
-        final Set<DBObject> queries = new HashSet<DBObject>();
-        final DBObject query = strategy.getQuery(stmt);
-        queries.add(query);
-        final MongoDatabase db = 
mongoClient.getDatabase(conf.getMongoDBName());
-        final MongoCollection<Document> collection = 
db.getCollection(conf.getTriplesCollectionName());
-        final RyaStatementCursorIterator iterator = new 
RyaStatementCursorIterator(collection, queries, strategy,
-                conf.getAuthorizations());
-
-        if (maxResults != null) {
-            iterator.setMaxResults(maxResults);
-        }
-        return iterator;
+        Preconditions.checkNotNull(stmt);
+        Preconditions.checkNotNull(conf);
+        
+        Entry<RyaStatement, BindingSet> entry = new 
AbstractMap.SimpleEntry<>(stmt, new MapBindingSet());
+        Collection<Entry<RyaStatement, BindingSet>> collection = 
Collections.singleton(entry);
+        
+        return new RyaStatementCursorIterator(queryWithBindingSet(collection, 
conf));
     }
 
     @Override
     public CloseableIteration<? extends Entry<RyaStatement, BindingSet>, 
RyaDAOException> queryWithBindingSet(
             final Collection<Entry<RyaStatement, BindingSet>> stmts,
             MongoDBRdfConfiguration conf) throws RyaDAOException {
-        if (conf == null) {
-            conf = configuration;
-        }
-        final Long maxResults = conf.getLimit();
-        final Multimap<DBObject, BindingSet> rangeMap = HashMultimap.create();
+        Preconditions.checkNotNull(stmts);
+        Preconditions.checkNotNull(conf);
+        
+        final Multimap<RyaStatement, BindingSet> rangeMap = 
HashMultimap.create();
 
         //TODO: cannot span multiple tables here
         try {
             for (final Map.Entry<RyaStatement, BindingSet> stmtbs : stmts) {
                 final RyaStatement stmt = stmtbs.getKey();
                 final BindingSet bs = stmtbs.getValue();
-                final DBObject query = strategy.getQuery(stmt);
-                rangeMap.put(query, bs);
+                rangeMap.put(stmt, bs);
             }
 
             // TODO not sure what to do about regex ranges?
             final RyaStatementBindingSetCursorIterator iterator = new 
RyaStatementBindingSetCursorIterator(
                     getCollection(conf), rangeMap, strategy, 
conf.getAuthorizations());
 
-            if (maxResults != null) {
-                iterator.setMaxResults(maxResults);
-            }
             return iterator;
         } catch (final Exception e) {
             throw new RyaDAOException(e);
@@ -140,72 +123,39 @@ public class MongoDBQueryEngine implements 
RyaQueryEngine<MongoDBRdfConfiguratio
     public CloseableIteration<RyaStatement, RyaDAOException> batchQuery(
             final Collection<RyaStatement> stmts, MongoDBRdfConfiguration conf)
             throws RyaDAOException {
-        if (conf == null) {
-            conf = configuration;
-        }
-        final Long maxResults = conf.getLimit();
-        final Set<DBObject> queries = new HashSet<DBObject>();
+        final Map<RyaStatement, BindingSet> queries = new HashMap<>();
 
-        try {
-            for (final RyaStatement stmt : stmts) {
-                queries.add( strategy.getQuery(stmt));
-             }
-
-            // TODO not sure what to do about regex ranges?
-            final RyaStatementCursorIterator iterator = new 
RyaStatementCursorIterator(getCollection(conf), queries,
-                    strategy, configuration.getAuthorizations());
-
-            if (maxResults != null) {
-                iterator.setMaxResults(maxResults);
-            }
-            return iterator;
-        } catch (final Exception e) {
-            throw new RyaDAOException(e);
+        for (final RyaStatement stmt : stmts) {
+            queries.put(stmt, new MapBindingSet());
         }
 
+        return new 
RyaStatementCursorIterator(queryWithBindingSet(queries.entrySet(), conf));
     }
+    
     @Override
     public CloseableIterable<RyaStatement> query(final RyaQuery ryaQuery)
             throws RyaDAOException {
-            final Set<DBObject> queries = new HashSet<DBObject>();
-
-            try {
-                queries.add( strategy.getQuery(ryaQuery));
-
-                // TODO not sure what to do about regex ranges?
-                // TODO this is gross
-            final RyaStatementCursorIterable iterator = new 
RyaStatementCursorIterable(
-                    new NonCloseableRyaStatementCursorIterator(new 
RyaStatementCursorIterator(getCollection(getConf()),
-                            queries, strategy, 
configuration.getAuthorizations())));
+        Preconditions.checkNotNull(ryaQuery);
 
-                return iterator;
-            } catch (final Exception e) {
-                throw new RyaDAOException(e);
-            }
+        return query(new 
BatchRyaQuery(Collections.singleton(ryaQuery.getQuery())));
     }
+
     @Override
     public CloseableIterable<RyaStatement> query(final BatchRyaQuery 
batchRyaQuery)
             throws RyaDAOException {
-         try {
-             final Set<DBObject> queries = new HashSet<DBObject>();
-            for (final RyaStatement statement : batchRyaQuery.getQueries()){
-                queries.add( strategy.getQuery(statement));
+        Preconditions.checkNotNull(batchRyaQuery);
 
-            }
+        final Map<RyaStatement, BindingSet> queries = new HashMap<>();
 
-            // TODO not sure what to do about regex ranges?
-            // TODO this is gross
-            final RyaStatementCursorIterable iterator = new 
RyaStatementCursorIterable(
-                    new NonCloseableRyaStatementCursorIterator(new 
RyaStatementCursorIterator(getCollection(getConf()),
-                            queries, strategy, 
configuration.getAuthorizations())));
-
-            return iterator;
-        } catch (final Exception e) {
-            throw new RyaDAOException(e);
+        for (final RyaStatement stmt : batchRyaQuery.getQueries()) {
+            queries.put(stmt, new MapBindingSet());
         }
+
+        Iterator<RyaStatement> iterator = new 
RyaStatementCursorIterator(queryWithBindingSet(queries.entrySet(), getConf()));
+        return CloseableIterables.wrap((Iterable<RyaStatement>) () -> 
iterator);
     }
 
-    private MongoCollection getCollection(final MongoDBRdfConfiguration conf) {
+    private MongoCollection<Document> getCollection(final 
MongoDBRdfConfiguration conf) {
         final MongoDatabase db = 
mongoClient.getDatabase(conf.getMongoDBName());
         return db.getCollection(conf.getTriplesCollectionName());
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0d80871f/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java
----------------------------------------------------------------------
diff --git 
a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java
 
b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java
deleted file mode 100644
index 35dab6d..0000000
--- 
a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/NonCloseableRyaStatementCursorIterator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.rya.mongodb.iter;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import java.util.Iterator;
-
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.api.persist.RyaDAOException;
-
-public class NonCloseableRyaStatementCursorIterator implements 
Iterator<RyaStatement> {
-
-       RyaStatementCursorIterator iterator;
-       
-       @Override
-       public boolean hasNext() {
-               return iterator.hasNext();
-       }
-
-       @Override
-       public RyaStatement next() {
-               return iterator.next();
-       }
-
-       public NonCloseableRyaStatementCursorIterator(
-                       RyaStatementCursorIterator iterator) {
-               this.iterator = iterator;
-       }
-
-       @Override
-       public void remove() {
-               try {
-                       iterator.remove();
-               } catch (RyaDAOException e) {
-                       // TODO Auto-generated catch block
-                       e.printStackTrace();
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0d80871f/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
----------------------------------------------------------------------
diff --git 
a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
 
b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
index 18f71d2..de5e8b0 100644
--- 
a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
+++ 
b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
@@ -19,21 +19,25 @@
 package org.apache.rya.mongodb.iter;
 
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.log4j.Logger;
 import org.apache.rya.api.RdfCloudTripleStoreUtils;
 import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
 import org.apache.rya.api.persist.RyaDAOException;
 import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
 import org.apache.rya.mongodb.document.operators.aggregation.AggregationUtil;
 import org.bson.Document;
 import org.openrdf.query.BindingSet;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Multimap;
 import com.mongodb.DBObject;
 import com.mongodb.client.AggregateIterable;
@@ -44,20 +48,21 @@ import info.aduna.iteration.CloseableIteration;
 
 public class RyaStatementBindingSetCursorIterator implements 
CloseableIteration<Entry<RyaStatement, BindingSet>, RyaDAOException> {
     private static final Logger log = 
Logger.getLogger(RyaStatementBindingSetCursorIterator.class);
+    
+    private static final int QUERY_BATCH_SIZE = 50;
 
     private final MongoCollection<Document> coll;
-    private final Multimap<DBObject, BindingSet> rangeMap;
-    private final Iterator<DBObject> queryIterator;
-    private Long maxResults;
-    private Iterator<Document> resultsIterator;
-    private RyaStatement currentStatement;
-    private Collection<BindingSet> currentBindingSetCollection;
+    private final Multimap<RyaStatement, BindingSet> rangeMap;
+    private final Multimap<RyaStatement, BindingSet> executedRangeMap = 
HashMultimap.create();
+    private final Iterator<RyaStatement> queryIterator;
+    private Iterator<Document> batchQueryResultsIterator;
+    private RyaStatement currentResultStatement;
     private Iterator<BindingSet> currentBindingSetIterator;
     private final MongoDBStorageStrategy<RyaStatement> strategy;
     private final Authorizations auths;
 
     public RyaStatementBindingSetCursorIterator(final 
MongoCollection<Document> coll,
-            final Multimap<DBObject, BindingSet> rangeMap, final 
MongoDBStorageStrategy<RyaStatement> strategy,
+            final Multimap<RyaStatement, BindingSet> rangeMap, final 
MongoDBStorageStrategy<RyaStatement> strategy,
             final Authorizations auths) {
         this.coll = coll;
         this.rangeMap = rangeMap;
@@ -81,7 +86,7 @@ public class RyaStatementBindingSetCursorIterator implements 
CloseableIteration<
         }
         if (currentBindingSetIteratorIsValid()) {
             final BindingSet currentBindingSet = 
currentBindingSetIterator.next();
-            return new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, 
BindingSet>(currentStatement, currentBindingSet);
+            return new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, 
BindingSet>(currentResultStatement, currentBindingSet);
         }
         return null;
     }
@@ -91,47 +96,81 @@ public class RyaStatementBindingSetCursorIterator 
implements CloseableIteration<
     }
 
     private void findNextResult() {
-        if (!currentResultCursorIsValid()) {
-            findNextValidResultCursor();
+        if (!currentBatchQueryResultCursorIsValid()) {
+            submitBatchQuery();
         }
-        if (currentResultCursorIsValid()) {
+        
+        if (currentBatchQueryResultCursorIsValid()) {
             // convert to Rya Statement
-            final Document queryResult = resultsIterator.next();
+            final Document queryResult = batchQueryResultsIterator.next();
             final DBObject dbo = (DBObject) JSON.parse(queryResult.toJson());
-            currentStatement = strategy.deserializeDBObject(dbo);
-            currentBindingSetIterator = currentBindingSetCollection.iterator();
+            currentResultStatement = strategy.deserializeDBObject(dbo);
+            
+            // Find all of the queries in the executed RangeMap that this result matches
+            // and collect all of those binding sets
+            Set<BindingSet> bsList = new HashSet<>();
+            for (RyaStatement executedQuery : executedRangeMap.keys()) {
+                if (isResultForQuery(executedQuery, currentResultStatement)) {
+                    bsList.addAll(executedRangeMap.get(executedQuery));
+                }
+            }
+            currentBindingSetIterator = bsList.iterator();
+        }
+        
+        // Handle case of invalid currentResultStatement or no binding sets returned
+        if ((currentBindingSetIterator == null || 
!currentBindingSetIterator.hasNext()) && 
(currentBatchQueryResultCursorIsValid() || queryIterator.hasNext())) {
+            findNextResult();
         }
     }
+    
+    private static boolean isResultForQuery(RyaStatement query, RyaStatement 
result) {
+        return isResult(query.getSubject(), result.getSubject()) &&
+                isResult(query.getPredicate(), result.getPredicate()) &&
+                isResult(query.getObject(), result.getObject()) &&
+                isResult(query.getContext(), result.getContext());
+    }
+    
+    private static boolean isResult(RyaType query, RyaType result) {
+        return (query == null) || query.equals(result);
+    }
 
-    private void findNextValidResultCursor() {
-        while (queryIterator.hasNext()){
-            final DBObject currentQuery = queryIterator.next();
-            currentBindingSetCollection = rangeMap.get(currentQuery);
-            // Executing redact aggregation to only return documents the user
-            // has access to.
-            final List<Document> pipeline = new ArrayList<>();
-            pipeline.add(new Document("$match", currentQuery));
-            pipeline.addAll(AggregationUtil.createRedactPipeline(auths));
-            log.debug(pipeline);
-
-            final AggregateIterable<Document> aggIter = 
coll.aggregate(pipeline);
-            aggIter.batchSize(1000);
-            resultsIterator = aggIter.iterator();
-            if (resultsIterator.hasNext()) {
-                break;
-            }
+    private void submitBatchQuery() {
+        int count = 0;
+        executedRangeMap.clear();
+        final List<Document> pipeline = new ArrayList<>();
+        final List<DBObject> match = new ArrayList<>();
+
+        while (queryIterator.hasNext() && count < QUERY_BATCH_SIZE){
+            count++;
+            RyaStatement query = queryIterator.next();
+            executedRangeMap.putAll(query, rangeMap.get(query));
+            final DBObject currentQuery = strategy.getQuery(query);
+            match.add(currentQuery);
         }
-    }
 
-    private boolean currentResultCursorIsValid() {
-        return (resultsIterator != null) && resultsIterator.hasNext();
+        if (match.size() > 1) {
+            pipeline.add(new Document("$match", new Document("$or", match)));
+        } else if (match.size() == 1) {
+            pipeline.add(new Document("$match", match.get(0)));
+        } else {
+            batchQueryResultsIterator = Iterators.emptyIterator();
+            return;
+        }
+        
+        // Executing redact aggregation to only return documents the user has access to.
+        pipeline.addAll(AggregationUtil.createRedactPipeline(auths));
+        log.info(pipeline);
+
+        final AggregateIterable<Document> aggIter = coll.aggregate(pipeline);
+        aggIter.batchSize(1000);
+        batchQueryResultsIterator = aggIter.iterator();
     }
 
-
-    public void setMaxResults(final Long maxResults) {
-        this.maxResults = maxResults;
+    private boolean currentBatchQueryResultCursorIsValid() {
+        return (batchQueryResultsIterator != null) && 
batchQueryResultsIterator.hasNext();
     }
 
+
     @Override
     public void close() throws RyaDAOException {
         // TODO don't know what to do here

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0d80871f/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterable.java
----------------------------------------------------------------------
diff --git 
a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterable.java
 
b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterable.java
deleted file mode 100644
index f9d84b2..0000000
--- 
a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterable.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package org.apache.rya.mongodb.iter;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import info.aduna.iteration.CloseableIteration;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.rya.api.RdfCloudTripleStoreUtils;
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.api.persist.RyaDAOException;
-
-import org.calrissian.mango.collect.CloseableIterable;
-import org.calrissian.mango.collect.CloseableIterator;
-import org.openrdf.query.BindingSet;
-
-import com.mongodb.DBCollection;
-import com.mongodb.DBCursor;
-import com.mongodb.DBObject;
-
-public class RyaStatementCursorIterable implements 
CloseableIterable<RyaStatement> {
-
-
-       private NonCloseableRyaStatementCursorIterator iterator;
-
-       public 
RyaStatementCursorIterable(NonCloseableRyaStatementCursorIterator iterator) {
-               this.iterator = iterator;
-       }
-
-       @Override
-       public Iterator<RyaStatement> iterator() {
-               // TODO Auto-generated method stub
-               return iterator;
-       }
-
-       @Override
-       public void closeQuietly() {
-               //TODO  don't know what to do here
-       }
-
-       @Override
-       public void close() throws IOException {
-               // TODO Auto-generated method stub
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0d80871f/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterator.java
----------------------------------------------------------------------
diff --git 
a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterator.java
 
b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterator.java
index 1a5eb99..82eed6f 100644
--- 
a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterator.java
+++ 
b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementCursorIterator.java
@@ -18,103 +18,55 @@
  */
 package org.apache.rya.mongodb.iter;
 
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
+import java.util.Map.Entry;
 
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.log4j.Logger;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.persist.RyaDAOException;
-import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
-import org.apache.rya.mongodb.document.operators.aggregation.AggregationUtil;
-import org.bson.Document;
+import org.openrdf.query.BindingSet;
 
-import com.mongodb.DBObject;
-import com.mongodb.client.AggregateIterable;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.util.JSON;
+import com.google.common.base.Throwables;
 
 import info.aduna.iteration.CloseableIteration;
 
-public class RyaStatementCursorIterator implements 
CloseableIteration<RyaStatement, RyaDAOException> {
-    private static final Logger log = 
Logger.getLogger(RyaStatementCursorIterator.class);
+public class RyaStatementCursorIterator implements Iterator<RyaStatement>, 
CloseableIteration<RyaStatement, RyaDAOException> {
+    private final CloseableIteration<? extends Entry<RyaStatement, 
BindingSet>, RyaDAOException> iterator;
 
-    private final MongoCollection coll;
-    private final Iterator<DBObject> queryIterator;
-    private Iterator<Document> resultsIterator;
-    private final MongoDBStorageStrategy<RyaStatement> strategy;
-    private Long maxResults;
-    private final Authorizations auths;
-
-    public RyaStatementCursorIterator(final MongoCollection<Document> 
collection, final Set<DBObject> queries,
-            final MongoDBStorageStrategy<RyaStatement> strategy, final 
Authorizations auths) {
-        coll = collection;
-        queryIterator = queries.iterator();
-        this.strategy = strategy;
-        this.auths = auths;
+    public RyaStatementCursorIterator(CloseableIteration<? extends 
Entry<RyaStatement, BindingSet>, RyaDAOException> iterator) {
+        this.iterator = iterator;
     }
 
     @Override
     public boolean hasNext() {
-        if (!currentCursorIsValid()) {
-            findNextValidCursor();
+        try {
+            return iterator.hasNext();
+        } catch (RyaDAOException e) {
+            Throwables.propagate(e);
         }
-        return currentCursorIsValid();
+        return false;
     }
 
     @Override
     public RyaStatement next() {
-        if (!currentCursorIsValid()) {
-            findNextValidCursor();
-        }
-        if (currentCursorIsValid()) {
-            // convert to Rya Statement
-            final Document queryResult = resultsIterator.next();
-            final DBObject dbo = (DBObject) JSON.parse(queryResult.toJson());
-            final RyaStatement statement = strategy.deserializeDBObject(dbo);
-            return statement;
+        try {
+            return iterator.next().getKey();
+        } catch (RyaDAOException e) {
+            Throwables.propagate(e);
         }
         return null;
     }
 
-    private void findNextValidCursor() {
-        while (queryIterator.hasNext()){
-            final DBObject currentQuery = queryIterator.next();
-
-            // Executing redact aggregation to only return documents the user
-            // has access to.
-            final List<Document> pipeline = new ArrayList<>();
-            pipeline.add(new Document("$match", currentQuery));
-            pipeline.addAll(AggregationUtil.createRedactPipeline(auths));
-            log.debug(pipeline);
-            final AggregateIterable<Document> output = 
coll.aggregate(pipeline);
-            output.batchSize(1000);
-
-            resultsIterator = output.iterator();
-            if (resultsIterator.hasNext()) {
-                break;
-            }
-        }
-    }
-
-    private boolean currentCursorIsValid() {
-        return (resultsIterator != null) && resultsIterator.hasNext();
-    }
-
-
-    public void setMaxResults(final Long maxResults) {
-        this.maxResults = maxResults;
-    }
-
     @Override
     public void close() throws RyaDAOException {
-        // TODO don't know what to do here
+        iterator.close();
     }
 
     @Override
-    public void remove() throws RyaDAOException {
-        next();
+    public void remove() {
+        try {
+            iterator.remove();
+        } catch (RyaDAOException e) {
+            Throwables.propagate(e);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/0d80871f/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBQueryEngineTest.java
----------------------------------------------------------------------
diff --git 
a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBQueryEngineTest.java
 
b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBQueryEngineTest.java
index d843d22..4187c85 100644
--- 
a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBQueryEngineTest.java
+++ 
b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBQueryEngineTest.java
@@ -98,6 +98,36 @@ public class MongoDBQueryEngineTest extends MongoRyaTestBase 
{
 
     @SuppressWarnings("unchecked")
     @Test
+    public void batchbindingSetsQuery() throws Exception {
+        final RyaStatement s1 = getStatement(null, null, "u:b");
+
+        final MapBindingSet bs1 = new MapBindingSet();
+        bs1.addBinding("foo", new URIImpl("u:x"));
+
+        final Map.Entry<RyaStatement, BindingSet> e1 = new 
RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(s1, bs1);
+        final Collection<Entry<RyaStatement, BindingSet>> stmts1 = 
Lists.newArrayList(e1);
+        Assert.assertEquals(1, size(engine.queryWithBindingSet(stmts1, 
configuration)));
+
+
+        final MapBindingSet bs2 = new MapBindingSet();
+        bs2.addBinding("foo", new URIImpl("u:y"));
+
+        final RyaStatement s2 = getStatement(null, null, "u:c");
+
+        final Map.Entry<RyaStatement, BindingSet> e2 = new 
RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(s2, bs2);
+
+        final Collection<Entry<RyaStatement, BindingSet>> stmts2 = 
Lists.newArrayList(e1, e2);
+        Assert.assertEquals(2, size(engine.queryWithBindingSet(stmts2, 
configuration)));
+
+
+        final Map.Entry<RyaStatement, BindingSet> e3 = new 
RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(s2, bs1);
+        final Map.Entry<RyaStatement, BindingSet> e4 = new 
RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(s1, bs2);
+        
+        final Collection<Entry<RyaStatement, BindingSet>> stmts3 = 
Lists.newArrayList(e1, e2, e3, e4);
+        Assert.assertEquals(4, size(engine.queryWithBindingSet(stmts3, 
configuration)));
+}
+    @SuppressWarnings("unchecked")
+    @Test
     public void bindingSetsQuery() throws Exception {
         final RyaStatement s = getStatement("u:a", null, null);
 

Reply via email to