http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/CloudbaseRyaQueryEngine.java
----------------------------------------------------------------------
diff --git 
a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/CloudbaseRyaQueryEngine.java
 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/CloudbaseRyaQueryEngine.java
new file mode 100644
index 0000000..a62aedb
--- /dev/null
+++ 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/CloudbaseRyaQueryEngine.java
@@ -0,0 +1,385 @@
+package mvm.rya.cloudbase.query;
+
+import cloudbase.core.client.BatchScanner;
+import cloudbase.core.client.Connector;
+import cloudbase.core.client.Scanner;
+import cloudbase.core.client.ScannerBase;
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Range;
+import cloudbase.core.data.Value;
+import cloudbase.core.iterators.FilteringIterator;
+import cloudbase.core.iterators.RegExIterator;
+import cloudbase.core.iterators.filter.AgeOffFilter;
+import cloudbase.core.iterators.filter.RegExFilter;
+import cloudbase.core.security.Authorizations;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterators;
+import mango.collect.CloseableIterable;
+import mango.collect.CloseableIterables;
+import mango.collect.FluentCloseableIterable;
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.RdfCloudTripleStoreUtils;
+import mvm.rya.api.domain.RyaRange;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.layout.TableLayoutStrategy;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.persist.query.BatchRyaQuery;
+import mvm.rya.api.persist.query.RyaQuery;
+import mvm.rya.api.persist.query.RyaQueryEngine;
+import mvm.rya.api.query.strategy.ByteRange;
+import mvm.rya.api.query.strategy.TriplePatternStrategy;
+import mvm.rya.api.resolver.RyaContext;
+import mvm.rya.api.resolver.triple.TripleRowRegex;
+import mvm.rya.api.utils.CloseableIterableIteration;
+import mvm.rya.cloudbase.CloudbaseRdfConfiguration;
+import mvm.rya.iterators.LimitingAgeOffFilter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.openrdf.query.BindingSet;
+
+import java.io.IOException;
+import java.util.*;
+
+import static mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import static mvm.rya.api.RdfCloudTripleStoreUtils.layoutToTable;
+
+/**
+ * Date: 7/17/12
+ * Time: 9:28 AM
+ */
+public class CloudbaseRyaQueryEngine implements 
RyaQueryEngine<CloudbaseRdfConfiguration> {
+
+    private Log logger = LogFactory.getLog(CloudbaseRyaQueryEngine.class);
+    private CloudbaseRdfConfiguration configuration;
+    private RyaContext ryaContext = RyaContext.getInstance();
+    private Connector connector;
+    private Map<TABLE_LAYOUT, KeyValueToRyaStatementFunction> 
keyValueToRyaStatementFunctionMap = new HashMap<TABLE_LAYOUT, 
KeyValueToRyaStatementFunction>();
+
+    public CloudbaseRyaQueryEngine(Connector connector) {
+        this(connector, new CloudbaseRdfConfiguration());
+    }
+
+    public CloudbaseRyaQueryEngine(Connector connector, 
CloudbaseRdfConfiguration conf) {
+        this.connector = connector;
+        this.configuration = conf;
+
+        keyValueToRyaStatementFunctionMap.put(TABLE_LAYOUT.SPO, new 
KeyValueToRyaStatementFunction(TABLE_LAYOUT.SPO));
+        keyValueToRyaStatementFunctionMap.put(TABLE_LAYOUT.PO, new 
KeyValueToRyaStatementFunction(TABLE_LAYOUT.PO));
+        keyValueToRyaStatementFunctionMap.put(TABLE_LAYOUT.OSP, new 
KeyValueToRyaStatementFunction(TABLE_LAYOUT.OSP));
+    }
+
+    @Override
+    public CloseableIteration<RyaStatement, RyaDAOException> 
query(RyaStatement stmt, CloudbaseRdfConfiguration conf) throws RyaDAOException 
{
+        if (conf == null) {
+            conf = configuration;
+        }
+
+        RyaQuery ryaQuery = RyaQuery.builder(stmt).load(conf).build();
+        CloseableIterable<RyaStatement> results = query(ryaQuery);
+
+        return new CloseableIterableIteration<RyaStatement, 
RyaDAOException>(results);
+    }
+
+    protected String getData(RyaType ryaType) {
+        return (ryaType != null) ? (ryaType.getData()) : (null);
+    }
+
+    @Override
+    public CloseableIteration<? extends Map.Entry<RyaStatement, BindingSet>, 
RyaDAOException> queryWithBindingSet(Collection<Map.Entry<RyaStatement, 
BindingSet>> stmts, CloudbaseRdfConfiguration conf) throws RyaDAOException {
+        if (conf == null) {
+            conf = configuration;
+        }
+        //query configuration
+        Authorizations authorizations = conf.getAuthorizations();
+        Long ttl = conf.getTtl();
+        Long maxResults = conf.getLimit();
+        Integer maxRanges = conf.getMaxRangesForScanner();
+        Integer numThreads = conf.getNumThreads();
+
+        //TODO: cannot span multiple tables here
+        try {
+            Collection<Range> ranges = new HashSet<Range>();
+            RangeBindingSetEntries rangeMap = new RangeBindingSetEntries();
+            TABLE_LAYOUT layout = null;
+            RyaURI context = null;
+            TriplePatternStrategy strategy = null;
+            for (Map.Entry<RyaStatement, BindingSet> stmtbs : stmts) {
+                RyaStatement stmt = stmtbs.getKey();
+                context = stmt.getContext(); //TODO: This will be overwritten
+                BindingSet bs = stmtbs.getValue();
+                strategy = ryaContext.retrieveStrategy(stmt);
+                if (strategy == null) {
+                    throw new IllegalArgumentException("TriplePattern[" + stmt 
+ "] not supported");
+                }
+
+                Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, 
ByteRange> entry =
+                        strategy.defineRange(stmt.getSubject(), 
stmt.getPredicate(), stmt.getObject(), stmt.getContext(), conf);
+
+                //use range to set scanner
+                //populate scanner based on authorizations, ttl
+                layout = entry.getKey();
+                ByteRange byteRange = entry.getValue();
+                Range range = new Range(new Text(byteRange.getStart()), new 
Text(byteRange.getEnd()));
+                ranges.add(range);
+                rangeMap.ranges.add(new 
RdfCloudTripleStoreUtils.CustomEntry<Range, BindingSet>(range, bs));
+            }
+            //no ranges
+            if (layout == null) return null;
+            String regexSubject = conf.getRegexSubject();
+            String regexPredicate = conf.getRegexPredicate();
+            String regexObject = conf.getRegexObject();
+            TripleRowRegex tripleRowRegex = strategy.buildRegex(regexSubject, 
regexPredicate, regexObject, null, null);
+
+            String table = layoutToTable(layout, conf);
+            boolean useBatchScanner = ranges.size() > maxRanges;
+            RyaStatementBindingSetKeyValueIterator iterator = null;
+            if (useBatchScanner) {
+                ScannerBase scanner = connector.createBatchScanner(table, 
authorizations, numThreads);
+                ((BatchScanner) scanner).setRanges(ranges);
+                fillScanner(scanner, context, ttl, tripleRowRegex);
+                iterator = new RyaStatementBindingSetKeyValueIterator(layout, 
scanner, rangeMap);
+            } else {
+                Scanner scannerBase = null;
+                Iterator<Map.Entry<Key, Value>>[] iters = new 
Iterator[ranges.size()];
+                int i = 0;
+                for (Range range : ranges) {
+                    scannerBase = connector.createScanner(table, 
authorizations);
+                    scannerBase.setRange(range);
+                    fillScanner(scannerBase, context, ttl, tripleRowRegex);
+                    iters[i] = scannerBase.iterator();
+                    i++;
+                }
+                iterator = new RyaStatementBindingSetKeyValueIterator(layout, 
Iterators.concat(iters), rangeMap);
+            }
+            if (maxResults != null) {
+                iterator.setMaxResults(maxResults);
+            }
+            return iterator;
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+
+    }
+
+    @Override
+    public CloseableIteration<RyaStatement, RyaDAOException> 
batchQuery(Collection<RyaStatement> stmts, CloudbaseRdfConfiguration conf)
+            throws RyaDAOException {
+        if (conf == null) {
+            conf = configuration;
+        }
+
+        BatchRyaQuery batchRyaQuery = 
BatchRyaQuery.builder(stmts).load(conf).build();
+        CloseableIterable<RyaStatement> results = query(batchRyaQuery);
+
+        return new CloseableIterableIteration<RyaStatement, 
RyaDAOException>(results);
+    }
+
+    @Override
+    public CloseableIterable<RyaStatement> query(RyaQuery ryaQuery) throws 
RyaDAOException {
+        Preconditions.checkNotNull(ryaQuery);
+        RyaStatement stmt = ryaQuery.getQuery();
+        Preconditions.checkNotNull(stmt);
+
+        //query configuration
+        String[] auths = ryaQuery.getAuths();
+        Authorizations authorizations = auths != null ? new 
Authorizations(auths) : configuration.getAuthorizations();
+        Long ttl = ryaQuery.getTtl();
+        Long currentTime = ryaQuery.getCurrentTime();
+        Long maxResults = ryaQuery.getMaxResults();
+        Integer batchSize = ryaQuery.getBatchSize();
+        String regexSubject = ryaQuery.getRegexSubject();
+        String regexPredicate = ryaQuery.getRegexPredicate();
+        String regexObject = ryaQuery.getRegexObject();
+        TableLayoutStrategy tableLayoutStrategy = 
configuration.getTableLayoutStrategy();
+
+        try {
+            //find triple pattern range
+            TriplePatternStrategy strategy = ryaContext.retrieveStrategy(stmt);
+            TABLE_LAYOUT layout;
+            Range range;
+            RyaURI subject = stmt.getSubject();
+            RyaURI predicate = stmt.getPredicate();
+            RyaType object = stmt.getObject();
+            RyaURI context = stmt.getContext();
+            String qualifier = stmt.getQualifer();
+            TripleRowRegex tripleRowRegex = null;
+            if (strategy != null) {
+                //otherwise, full table scan is supported
+                Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, 
ByteRange> entry =
+                        strategy.defineRange(subject, predicate, object, 
context, null);
+                layout = entry.getKey();
+                ByteRange byteRange = entry.getValue();
+                range = new Range(new Text(byteRange.getStart()), new 
Text(byteRange.getEnd()));
+
+                byte[] objectTypeInfo = null;
+                if (object != null) {
+                    //TODO: Not good to serialize this twice
+                    if (object instanceof RyaRange) {
+                        objectTypeInfo = 
RyaContext.getInstance().serializeType(((RyaRange) object).getStart())[1];
+                    } else {
+                        objectTypeInfo = 
RyaContext.getInstance().serializeType(object)[1];
+                    }
+                }
+
+                tripleRowRegex = strategy.buildRegex(regexSubject, 
regexPredicate, regexObject, null, objectTypeInfo);
+            } else {
+                range = new Range();
+                layout = TABLE_LAYOUT.SPO;
+            }
+
+            //use range to set scanner
+            //populate scanner based on authorizations, ttl
+            String table = layoutToTable(layout, tableLayoutStrategy);
+            Scanner scanner = connector.createScanner(table, authorizations);
+            int itrLevel = 20;
+            if (context != null && qualifier != null) {
+                scanner.fetchColumn(new Text(context.getData()), new 
Text(qualifier));
+            } else if (context != null) {
+                scanner.fetchColumnFamily(new Text(context.getData()));
+            } else if (qualifier != null) {
+                scanner.setScanIterators(itrLevel++, 
RegExIterator.class.getName(), "riq");
+                scanner.setScanIteratorOption("riq", RegExFilter.COLQ_REGEX, 
qualifier);
+            }
+            if (ttl != null) {
+                scanner.setScanIterators(itrLevel++, 
FilteringIterator.class.getName(), "fi");
+                scanner.setScanIteratorOption("fi", "0", 
LimitingAgeOffFilter.class.getName());
+                scanner.setScanIteratorOption("fi", "0." + 
LimitingAgeOffFilter.TTL, ttl.toString());
+                if (currentTime != null)
+                    scanner.setScanIteratorOption("fi", "0." + 
LimitingAgeOffFilter.CURRENT_TIME, currentTime.toString());
+            }
+            scanner.setRange(range);
+            if (batchSize != null) {
+                scanner.setBatchSize(batchSize);
+            }
+            //TODO: Fill in context regex
+            if (tripleRowRegex != null) {
+                scanner.setScanIterators(itrLevel++, 
RegExIterator.class.getName(), "ri");
+                scanner.setScanIteratorOption("ri", RegExFilter.ROW_REGEX, 
tripleRowRegex.getRow());
+            }
+
+            FluentCloseableIterable<RyaStatement> results = 
FluentCloseableIterable.from(new ScannerCloseableIterable(scanner))
+                    .transform(keyValueToRyaStatementFunctionMap.get(layout));
+            if (maxResults != null) {
+                results = results.limit(maxResults.intValue());
+            }
+
+            return results;
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    @Override
+    public CloseableIterable<RyaStatement> query(BatchRyaQuery ryaQuery) 
throws RyaDAOException {
+        Preconditions.checkNotNull(ryaQuery);
+        Iterable<RyaStatement> stmts = ryaQuery.getQueries();
+        Preconditions.checkNotNull(stmts);
+
+        //query configuration
+        String[] auths = ryaQuery.getAuths();
+        final Authorizations authorizations = auths != null ? new 
Authorizations(auths) : configuration.getAuthorizations();
+        final Long ttl = ryaQuery.getTtl();
+        Long currentTime = ryaQuery.getCurrentTime();
+        Long maxResults = ryaQuery.getMaxResults();
+        Integer batchSize = ryaQuery.getBatchSize();
+        Integer numQueryThreads = ryaQuery.getNumQueryThreads();
+        String regexSubject = ryaQuery.getRegexSubject();
+        String regexPredicate = ryaQuery.getRegexPredicate();
+        String regexObject = ryaQuery.getRegexObject();
+        TableLayoutStrategy tableLayoutStrategy = 
configuration.getTableLayoutStrategy();
+        int maxRanges = ryaQuery.getMaxRanges();
+
+        //TODO: cannot span multiple tables here
+        try {
+            Collection<Range> ranges = new HashSet<Range>();
+            TABLE_LAYOUT layout = null;
+            RyaURI context = null;
+            TriplePatternStrategy strategy = null;
+            for (RyaStatement stmt : stmts) {
+                context = stmt.getContext(); //TODO: This will be overwritten
+                strategy = ryaContext.retrieveStrategy(stmt);
+                if (strategy == null) {
+                    throw new IllegalArgumentException("TriplePattern[" + stmt 
+ "] not supported");
+                }
+
+                Map.Entry<RdfCloudTripleStoreConstants.TABLE_LAYOUT, 
ByteRange> entry =
+                        strategy.defineRange(stmt.getSubject(), 
stmt.getPredicate(), stmt.getObject(), stmt.getContext(), null);
+
+                //use range to set scanner
+                //populate scanner based on authorizations, ttl
+                layout = entry.getKey();
+                ByteRange byteRange = entry.getValue();
+                Range range = new Range(new Text(byteRange.getStart()), new 
Text(byteRange.getEnd()));
+                ranges.add(range);
+            }
+            //no ranges
+            if (layout == null) throw new IllegalArgumentException("No table 
layout specified");
+
+            final TripleRowRegex tripleRowRegex = 
strategy.buildRegex(regexSubject, regexPredicate, regexObject, null, null);
+
+            final String table = layoutToTable(layout, tableLayoutStrategy);
+            boolean useBatchScanner = ranges.size() > maxRanges;
+            FluentCloseableIterable<RyaStatement> results = null;
+            if (useBatchScanner) {
+                BatchScanner scanner = connector.createBatchScanner(table, 
authorizations, numQueryThreads);
+                scanner.setRanges(ranges);
+                fillScanner(scanner, context, ttl, tripleRowRegex);
+                results = FluentCloseableIterable.from(new 
BatchScannerCloseableIterable(scanner)).transform(keyValueToRyaStatementFunctionMap.get(layout));
+            } else {
+                final RyaURI fcontext = context;
+                FluentIterable<RyaStatement> fluent = 
FluentIterable.from(ranges).transformAndConcat(new Function<Range, 
Iterable<Map.Entry<Key, Value>>>() {
+                    @Override
+                    public Iterable<Map.Entry<Key, Value>> apply(Range range) {
+                        try {
+                            Scanner scanner = connector.createScanner(table, 
authorizations);
+                            scanner.setRange(range);
+                            fillScanner(scanner, fcontext, ttl, 
tripleRowRegex);
+                            return scanner;
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }).transform(keyValueToRyaStatementFunctionMap.get(layout));
+
+                results = 
FluentCloseableIterable.from(CloseableIterables.wrap(fluent));
+            }
+            if (maxResults != null) {
+                results = results.limit(maxResults.intValue());
+            }
+            return results;
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    protected void fillScanner(ScannerBase scanner, RyaURI context, Long ttl, 
TripleRowRegex tripleRowRegex) throws IOException {
+        if (context != null) {
+            scanner.fetchColumnFamily(new Text(context.getData()));
+        }
+        if (ttl != null) {
+            scanner.setScanIterators(9, FilteringIterator.class.getName(), 
"fi");
+            scanner.setScanIteratorOption("fi", "0", 
AgeOffFilter.class.getName());
+            scanner.setScanIteratorOption("fi", "0.ttl", ttl.toString());
+        }
+        if (tripleRowRegex != null) {
+            scanner.setScanIterators(11, RegExIterator.class.getName(), "ri");
+            scanner.setScanIteratorOption("ri", RegExFilter.ROW_REGEX, 
tripleRowRegex.getRow());
+        }
+    }
+
+    @Override
+    public void setConf(CloudbaseRdfConfiguration conf) {
+        this.configuration = conf;
+    }
+
+    @Override
+    public CloudbaseRdfConfiguration getConf() {
+        return configuration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/KeyValueToRyaStatementFunction.java
----------------------------------------------------------------------
diff --git 
a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/KeyValueToRyaStatementFunction.java
 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/KeyValueToRyaStatementFunction.java
new file mode 100644
index 0000000..bb92c23
--- /dev/null
+++ 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/KeyValueToRyaStatementFunction.java
@@ -0,0 +1,47 @@
+package mvm.rya.cloudbase.query;
+
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import com.google.common.base.Function;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.RyaContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+import java.util.Map;
+
+import static mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+
+/**
+ * Date: 1/30/13
+ * Time: 2:09 PM
+ */
+public class KeyValueToRyaStatementFunction implements Function<Map.Entry<Key, 
Value>, RyaStatement> {
+
+    private TABLE_LAYOUT tableLayout;
+
+    public KeyValueToRyaStatementFunction(TABLE_LAYOUT tableLayout) {
+        this.tableLayout = tableLayout;
+    }
+
+    @Override
+    public RyaStatement apply(Map.Entry<Key, Value> input) {
+        Key key = input.getKey();
+        Value value = input.getValue();
+        RyaStatement statement = null;
+        try {
+            statement = RyaContext.getInstance().deserializeTriple(tableLayout,
+                    new TripleRow(key.getRowData().toArray(),
+                            key.getColumnFamilyData().toArray(),
+                            key.getColumnQualifierData().toArray(),
+                            key.getTimestamp(),
+                            key.getColumnVisibilityData().toArray(),
+                            (value != null) ? value.get() : null
+                    ));
+        } catch (TripleRowResolverException e) {
+            throw new RuntimeException(e);
+        }
+
+        return statement;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RangeBindingSetEntries.java
----------------------------------------------------------------------
diff --git 
a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RangeBindingSetEntries.java
 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RangeBindingSetEntries.java
new file mode 100644
index 0000000..69c6147
--- /dev/null
+++ 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RangeBindingSetEntries.java
@@ -0,0 +1,37 @@
+package mvm.rya.cloudbase.query;
+
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Range;
+import org.openrdf.query.BindingSet;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Class RangeBindingSetCollection
+ * Date: Feb 23, 2011
+ * Time: 10:15:48 AM
+ */
+public class RangeBindingSetEntries {
+    public Collection<Map.Entry<Range, BindingSet>> ranges;
+
+    public RangeBindingSetEntries() {
+        this(new ArrayList<Map.Entry<Range, BindingSet>>());
+    }
+
+    public RangeBindingSetEntries(Collection<Map.Entry<Range, BindingSet>> 
ranges) {
+        this.ranges = ranges;
+    }
+
+    public Collection<BindingSet> containsKey(Key key) {
+        //TODO: need to find a better way to sort these and pull
+        //TODO: maybe fork/join here
+        Collection<BindingSet> bss = new ArrayList<BindingSet>();
+        for (Map.Entry<Range, BindingSet> entry : ranges) {
+            if (entry.getKey().contains(key))
+                bss.add(entry.getValue());
+        }
+        return bss;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RyaStatementBindingSetKeyValueIterator.java
----------------------------------------------------------------------
diff --git 
a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RyaStatementBindingSetKeyValueIterator.java
 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RyaStatementBindingSetKeyValueIterator.java
new file mode 100644
index 0000000..cef9eb6
--- /dev/null
+++ 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RyaStatementBindingSetKeyValueIterator.java
@@ -0,0 +1,129 @@
+package mvm.rya.cloudbase.query;
+
+import cloudbase.core.client.BatchScanner;
+import cloudbase.core.client.Scanner;
+import cloudbase.core.client.ScannerBase;
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.api.RdfCloudTripleStoreUtils;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.resolver.RyaContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+import mvm.rya.cloudbase.BatchScannerIterator;
+import org.openrdf.query.BindingSet;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+import static mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+
+/**
+ * Date: 7/17/12
+ * Time: 11:48 AM
+ */
+public class RyaStatementBindingSetKeyValueIterator implements 
CloseableIteration<Map.Entry<RyaStatement, BindingSet>, RyaDAOException> {
+    private Iterator<Map.Entry<Key, Value>> dataIterator;
+    private TABLE_LAYOUT tableLayout;
+    private Long maxResults = -1L;
+    private ScannerBase scanner;
+    private boolean isBatchScanner;
+    private RangeBindingSetEntries rangeMap;
+    private Iterator<BindingSet> bsIter;
+    private RyaStatement statement;
+
+    public RyaStatementBindingSetKeyValueIterator(TABLE_LAYOUT tableLayout, 
ScannerBase scannerBase, RangeBindingSetEntries rangeMap) {
+        this(tableLayout, ((scannerBase instanceof BatchScanner) ? new 
BatchScannerIterator(((BatchScanner) scannerBase).iterator()) : ((Scanner) 
scannerBase).iterator()), rangeMap);
+        this.scanner = scannerBase;
+        isBatchScanner = scanner instanceof BatchScanner;
+    }
+
+    public RyaStatementBindingSetKeyValueIterator(TABLE_LAYOUT tableLayout, 
Iterator<Map.Entry<Key, Value>> dataIterator, RangeBindingSetEntries rangeMap) {
+        this.tableLayout = tableLayout;
+        this.rangeMap = rangeMap;
+        this.dataIterator = dataIterator;
+    }
+
+    @Override
+    public void close() throws RyaDAOException {
+        dataIterator = null;
+        if (scanner != null && isBatchScanner) {
+            ((BatchScanner) scanner).close();
+        }
+    }
+
+    public boolean isClosed() throws RyaDAOException {
+        return dataIterator == null;
+    }
+
+    @Override
+    public boolean hasNext() throws RyaDAOException {
+        if (isClosed()) {
+            throw new RyaDAOException("Closed Iterator");
+        }
+        if (maxResults != 0) {
+            if (bsIter != null && bsIter.hasNext()) {
+                return true;
+            }
+            if (dataIterator.hasNext()) {
+                return true;
+            } else {
+                maxResults = 0l;
+                return false;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public Map.Entry<RyaStatement, BindingSet> next() throws RyaDAOException {
+        if (!hasNext()) {
+            return null;
+        }
+
+        try {
+            while (true) {
+                if (bsIter != null && bsIter.hasNext()) {
+                    maxResults--;
+                    return new 
RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(statement, 
bsIter.next());
+                }
+
+                if (dataIterator.hasNext()) {
+                    Map.Entry<Key, Value> next = dataIterator.next();
+                    Key key = next.getKey();
+                    statement = 
RyaContext.getInstance().deserializeTriple(tableLayout,
+                            new TripleRow(key.getRowData().toArray(), 
key.getColumnFamilyData().toArray(), key.getColumnQualifierData().toArray(),
+                                    key.getTimestamp(), 
key.getColumnVisibilityData().toArray(), next.getValue().get()));
+                    if (next.getValue() != null) {
+                        statement.setValue(next.getValue().get());
+                    }
+                    Collection<BindingSet> bindingSets = 
rangeMap.containsKey(key);
+                    if (!bindingSets.isEmpty()) {
+                        bsIter = bindingSets.iterator();
+                    }
+                } else {
+                    break;
+                }
+            }
+            return null;
+        } catch (TripleRowResolverException e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    @Override
+    public void remove() throws RyaDAOException {
+        next();
+    }
+
+    public Long getMaxResults() {
+        return maxResults;
+    }
+
+    public void setMaxResults(Long maxResults) {
+        this.maxResults = maxResults;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RyaStatementKeyValueIterator.java
----------------------------------------------------------------------
diff --git 
a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RyaStatementKeyValueIterator.java
 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RyaStatementKeyValueIterator.java
new file mode 100644
index 0000000..602affe
--- /dev/null
+++ 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/RyaStatementKeyValueIterator.java
@@ -0,0 +1,82 @@
+package mvm.rya.cloudbase.query;
+
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.resolver.RyaContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import static mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+
+/**
+ * Date: 7/17/12
+ * Time: 11:48 AM
+ */
+public class RyaStatementKeyValueIterator implements 
CloseableIteration<RyaStatement, RyaDAOException> {
+    private Iterator<Map.Entry<Key, Value>> dataIterator;
+    private TABLE_LAYOUT tableLayout;
+    private Long maxResults = -1L;
+
+    public RyaStatementKeyValueIterator(TABLE_LAYOUT tableLayout, 
Iterator<Map.Entry<Key, Value>> dataIterator) {
+        this.tableLayout = tableLayout;
+        this.dataIterator = dataIterator;
+    }
+
+    @Override
+    public void close() throws RyaDAOException {
+        dataIterator = null;
+    }
+
+    public boolean isClosed() throws RyaDAOException {
+        return dataIterator == null;
+    }
+
+    @Override
+    public boolean hasNext() throws RyaDAOException {
+        if (isClosed()) {
+            throw new RyaDAOException("Closed Iterator");
+        }
+        return maxResults != 0 && dataIterator.hasNext();
+    }
+
+    @Override
+    public RyaStatement next() throws RyaDAOException {
+        if (!hasNext()) {
+            return null;
+        }
+
+        try {
+            Map.Entry<Key, Value> next = dataIterator.next();
+            Key key = next.getKey();
+            RyaStatement statement = 
RyaContext.getInstance().deserializeTriple(tableLayout,
+                    new TripleRow(key.getRowData().toArray(), 
key.getColumnFamilyData().toArray(), key.getColumnQualifierData().toArray(),
+                            key.getTimestamp(), 
key.getColumnVisibilityData().toArray(), next.getValue().get()));
+            if (next.getValue() != null) {
+                statement.setValue(next.getValue().get());
+            }
+            maxResults--;
+            return statement;
+        } catch (TripleRowResolverException e) {
+            throw new RyaDAOException(e);
+        }
+    }
+
+    @Override
+    public void remove() throws RyaDAOException {
+        next();
+    }
+
+    public Long getMaxResults() {
+        return maxResults;
+    }
+
+    public void setMaxResults(Long maxResults) {
+        this.maxResults = maxResults;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/ScannerCloseableIterable.java
----------------------------------------------------------------------
diff --git 
a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/ScannerCloseableIterable.java
 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/ScannerCloseableIterable.java
new file mode 100644
index 0000000..f9a51fc
--- /dev/null
+++ 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/query/ScannerCloseableIterable.java
@@ -0,0 +1,35 @@
+package mvm.rya.cloudbase.query;
+
+import cloudbase.core.client.Scanner;
+import cloudbase.core.data.Key;
+import cloudbase.core.data.Value;
+import com.google.common.base.Preconditions;
+import mango.collect.AbstractCloseableIterable;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Date: 1/30/13
+ * Time: 2:15 PM
+ */
+public class ScannerCloseableIterable extends 
AbstractCloseableIterable<Map.Entry<Key, Value>> {
+
+    protected Scanner scanner;
+
+    public ScannerCloseableIterable(Scanner scanner) {
+        Preconditions.checkNotNull(scanner);
+        this.scanner = scanner;
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        //do nothing
+    }
+
+    @Override
+    protected Iterator<Map.Entry<Key, Value>> retrieveIterator() {
+        return scanner.iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/CloudbaseResearchMain.java
----------------------------------------------------------------------
diff --git 
a/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/CloudbaseResearchMain.java 
b/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/CloudbaseResearchMain.java
new file mode 100644
index 0000000..2c1ea55
--- /dev/null
+++ 
b/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/CloudbaseResearchMain.java
@@ -0,0 +1,77 @@
+//package mvm.mmrts.cloudbase;
+//
+//import cloudbase.core.CBConstants;
+//import cloudbase.core.client.BatchScanner;
+//import cloudbase.core.client.Connector;
+//import cloudbase.core.client.Scanner;
+//import cloudbase.core.client.ZooKeeperInstance;
+//import cloudbase.core.client.impl.MasterClient;
+//import cloudbase.core.data.Key;
+//import cloudbase.core.data.Range;
+//import cloudbase.core.data.Value;
+//import cloudbase.core.master.thrift.MasterClientService;
+//import cloudbase.core.master.thrift.MasterMonitorInfo;
+//import cloudbase.core.master.thrift.TableInfo;
+//import cloudbase.core.master.thrift.TabletServerStatus;
+//import cloudbase.core.security.thrift.AuthInfo;
+//import mvm.rya.cloudbase.utils.pri.PriorityIterator;
+//import org.apache.hadoop.io.Text;
+//
+//import java.util.Collections;
+//import java.util.Iterator;
+//import java.util.List;
+//import java.util.Map;
+//
+///**
+// * Created by IntelliJ IDEA.
+// * User: RoshanP
+// * Date: 3/28/12
+// * Time: 5:32 PM
+// * To change this template use File | Settings | File Templates.
+// */
+//public class CloudbaseResearchMain {
+//
+//
+//    public static void main(String[] args) {
+//        try {
+//            ZooKeeperInstance instance = new ZooKeeperInstance("stratus", 
"stratus13:2181");
+//
+//            MasterClientService.Iface client = 
MasterClient.getConnection(instance, false);
+//            MasterMonitorInfo mmi = client.getMasterStats(null, new 
AuthInfo("root", "password".getBytes(), "stratus"));
+//
+//            List<TabletServerStatus> tServerInfo = mmi.getTServerInfo();
+//            for (TabletServerStatus tstatus : tServerInfo) {
+//                System.out.println(tstatus.getName());
+//                System.out.println(tstatus.getOsLoad());
+//                Map<String, TableInfo> tableMap = tstatus.getTableMap();
+//                double ingestRate = 0;
+//                double queryRate = 0;
+//                for (Map.Entry<String, TableInfo> entry : 
tableMap.entrySet()) {
+//                    String tableName = entry.getKey();
+//                    TableInfo tableInfo = entry.getValue();
+//                    ingestRate += tableInfo.getIngestRate();
+//                    queryRate += tableInfo.getQueryRate();
+//                }
+//                System.out.println(ingestRate);
+//                System.out.println(queryRate);
+//            }
+//
+//            Connector connector = instance.getConnector("root", 
"password".getBytes());
+////            BatchScanner scanner = connector.createBatchScanner("l_spo", 
CBConstants.NO_AUTHS, 10);
+////            scanner.setRanges(Collections.singleton(new Range(new 
Text("\0"), new Text("\uFFFD"))));
+//            Scanner scanner = connector.createScanner("l_spo", 
CBConstants.NO_AUTHS);
+//            scanner.setScanIterators(20, PriorityIterator.class.getName(), 
"pi");
+//            Iterator<Map.Entry<Key,Value>> iter = scanner.iterator();
+//            int count = 0;
+//            while(iter.hasNext()) {
+//                iter.next();
+//                System.out.println(count++);
+////                if(count == 100) break;
+//            }
+////            scanner.close();
+//
+//        } catch (Exception e) {
+//            e.printStackTrace();
+//        }
+//    }
+//}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/CloudbaseRyaDAOTest.java
----------------------------------------------------------------------
diff --git 
a/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/CloudbaseRyaDAOTest.java 
b/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/CloudbaseRyaDAOTest.java
new file mode 100644
index 0000000..af3e9ab
--- /dev/null
+++ b/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/CloudbaseRyaDAOTest.java
@@ -0,0 +1,588 @@
+package mvm.rya.cloudbase;
+
+import cloudbase.core.client.Connector;
+import cloudbase.core.client.Scanner;
+import cloudbase.core.client.TableNotFoundException;
+import cloudbase.core.client.mock.MockInstance;
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.api.RdfCloudTripleStoreUtils;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.resolver.RdfToRyaConversions;
+import mvm.rya.api.resolver.RyaContext;
+import mvm.rya.cloudbase.query.CloudbaseRyaQueryEngine;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+/**
+ * Class CloudbaseRyaDAOTest
+ * Date: Mar 7, 2012
+ * Time: 9:42:28 AM
+ */
+public class CloudbaseRyaDAOTest {
+
+    private CloudbaseRyaDAO dao;
+    private ValueFactory vf = new ValueFactoryImpl();
+    static String litdupsNS = "urn:test:litdups#";
+    private Connector connector;
+    private CloudbaseRdfConfiguration conf = new CloudbaseRdfConfiguration();
+
+    @Before
+    public void setUp() throws Exception {
+        dao = new CloudbaseRyaDAO();
+        connector = new MockInstance().getConnector("", "");
+        dao.setConnector(connector);
+        dao.setConf(conf);
+        dao.init();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        //dao.purge(conf);
+        dao.destroy();
+    }
+
+    @Test
+    public void testAdd() throws Exception {
+        RyaURI cpu = new RyaURI(litdupsNS + "cpu");
+        RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc");
+        RyaURI uri1 = new RyaURI(litdupsNS + "uri1");
+        dao.add(new RyaStatement(cpu, loadPerc, uri1));
+
+        CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine();
+
+        CloseableIteration<RyaStatement, RyaDAOException> iter = 
queryEngine.query(new RyaStatement(cpu, loadPerc, null), conf);
+        int count = 0;
+        while (iter.hasNext()) {
+            assertTrue(uri1.equals(iter.next().getObject()));
+            count++;
+        }
+        iter.close();
+        assertEquals(1, count);
+
+        dao.delete(new RyaStatement(cpu, loadPerc, uri1), conf);
+
+        iter = queryEngine.query(new RyaStatement(cpu, loadPerc, null), conf);
+        count = 0;
+        while (iter.hasNext()) {
+            count++;
+            iter.next();
+        }
+        iter.close();
+        assertEquals(0, count);
+    }
+
+    @Test
+    public void testMaxResults() throws Exception {
+        RyaURI cpu = new RyaURI(litdupsNS + "cpu");
+        RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc");
+        dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + 
"uri1")));
+        dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + 
"uri2")));
+        dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + 
"uri3")));
+        dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + 
"uri4")));
+        dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + 
"uri5")));
+
+        CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine();
+        CloudbaseRdfConfiguration queryConf = new 
CloudbaseRdfConfiguration(conf);
+        long limit = 3l;
+        queryConf.setLimit(limit);
+
+        CloseableIteration<RyaStatement, RyaDAOException> iter = 
queryEngine.query(new RyaStatement(cpu, loadPerc, null), queryConf);
+        int count = 0;
+        while (iter.hasNext()) {
+            iter.next().getObject();
+            count++;
+        }
+        iter.close();
+        assertEquals(limit, count);
+    }
+
+    @Test
+    public void testTTL() throws Exception {
+        RyaURI cpu = new RyaURI(litdupsNS + "cpu");
+        RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc");
+        long current = System.currentTimeMillis();
+        dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + 
"uri1"), null, null, null, null, current));
+        dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + 
"uri2"), null, null, null, null, current - 1000l));
+        dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + 
"uri3"), null, null, null, null, current - 2000l));
+        dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + 
"uri4"), null, null, null, null, current - 3000l));
+        dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + 
"uri5"), null, null, null, null, current - 4000l));
+
+        CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine();
+        CloudbaseRdfConfiguration queryConf = new 
CloudbaseRdfConfiguration(conf);
+        queryConf.setTtl(3000l);
+
+        CloseableIteration<RyaStatement, RyaDAOException> iter = 
queryEngine.query(new RyaStatement(cpu, loadPerc, null), queryConf);
+        int count = 0;
+        while (iter.hasNext()) {
+            iter.next().getObject();
+            count++;
+        }
+        iter.close();
+        assertEquals(3, count);
+
+        queryConf.setStartTime(current - 3000l);
+        iter = queryEngine.query(new RyaStatement(cpu, loadPerc, null), 
queryConf);
+        count = 0;
+        while (iter.hasNext()) {
+            iter.next().getObject();
+            count++;
+        }
+        iter.close();
+        assertEquals(2, count);
+    }
+
+    @Test
+    public void testPredRegex() throws Exception {
+        RyaURI cpu = new RyaURI(litdupsNS + "cpu");
+        RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc");
+        RyaURI loadPerc2 = new RyaURI(litdupsNS + "loadPerc2");
+        dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + 
"uri1")));
+        dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + 
"uri2")));
+        dao.add(new RyaStatement(cpu, loadPerc2, new RyaURI(litdupsNS + 
"uri3")));
+        dao.add(new RyaStatement(cpu, loadPerc2, new RyaURI(litdupsNS + 
"uri4")));
+        dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + 
"uri5")));
+
+        CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine();
+        CloudbaseRdfConfiguration queryConf = new 
CloudbaseRdfConfiguration(conf);
+        queryConf.setRegexPredicate(litdupsNS + "loadPerc[2]");
+
+        CloseableIteration<RyaStatement, RyaDAOException> iter = 
queryEngine.query(new RyaStatement(cpu, null, null), queryConf);
+        int count = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            count++;
+        }
+        iter.close();
+        assertEquals(2, count);
+    }
+
+    @Test
+    public void testSubjectRegex() throws Exception {
+        RyaURI cpu = new RyaURI(litdupsNS + "cpu");
+        RyaURI cpu2 = new RyaURI(litdupsNS + "cpu2");
+        RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc");
+        RyaURI loadPerc2 = new RyaURI(litdupsNS + "loadPerc2");
+        dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + 
"uri1")));
+        dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + 
"uri2")));
+        dao.add(new RyaStatement(cpu, loadPerc2, new RyaURI(litdupsNS + 
"uri3")));
+        dao.add(new RyaStatement(cpu, loadPerc2, new RyaURI(litdupsNS + 
"uri4")));
+        dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + 
"uri5")));
+        dao.add(new RyaStatement(cpu2, loadPerc, new RyaURI(litdupsNS + 
"uri5")));
+
+        CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine();
+        CloudbaseRdfConfiguration queryConf = new 
CloudbaseRdfConfiguration(conf);
+        queryConf.setRegexSubject(cpu.getData());
+
+        CloseableIteration<RyaStatement, RyaDAOException> iter = 
queryEngine.query(new RyaStatement(null, loadPerc, null), queryConf);
+        int count = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            count++;
+        }
+        iter.close();
+        assertEquals(3, count);
+    }
+
+    @Test
+    public void testFullTableScan() throws Exception {
+        RyaURI cpu = new RyaURI(litdupsNS + "cpu");
+        RyaURI cpu2 = new RyaURI(litdupsNS + "cpu2");
+        RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc");
+        RyaURI loadPerc2 = new RyaURI(litdupsNS + "loadPerc2");
+        dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + 
"uri1")));
+        dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + 
"uri2")));
+        dao.add(new RyaStatement(cpu, loadPerc2, new RyaURI(litdupsNS + 
"uri3")));
+        dao.add(new RyaStatement(cpu, loadPerc2, new RyaURI(litdupsNS + 
"uri4")));
+        dao.add(new RyaStatement(cpu, loadPerc, new RyaURI(litdupsNS + 
"uri5")));
+        dao.add(new RyaStatement(cpu2, loadPerc, new RyaURI(litdupsNS + 
"uri5")));
+
+        CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine();
+        CloudbaseRdfConfiguration queryConf = new 
CloudbaseRdfConfiguration(conf);
+        queryConf.setRegexSubject(cpu.getData());
+
+        CloseableIteration<RyaStatement, RyaDAOException> iter = 
queryEngine.query(new RyaStatement(null, null, null), queryConf);
+        int count = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            count++;
+        }
+        iter.close();
+        assertEquals(7, count); //includes the rts:version
+    }
+
+    @Test
+    public void testAddValue() throws Exception {
+        RyaURI cpu = new RyaURI(litdupsNS + "cpu");
+        RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc");
+        RyaURI uri1 = new RyaURI(litdupsNS + "uri1");
+        String myval = "myval";
+        dao.add(new RyaStatement(cpu, loadPerc, uri1, null, null, null, 
myval.getBytes()));
+
+        CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine();
+        CloseableIteration<RyaStatement, RyaDAOException> iter = 
queryEngine.query(new RyaStatement(cpu, loadPerc, null), conf);
+        assertTrue(iter.hasNext());
+        assertEquals(myval, new String(iter.next().getValue()));
+        iter.close();
+    }
+
+    @Test
+    public void testDeleteDiffVisibility() throws Exception {
+        RyaURI cpu = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, 
"cpu"));
+        RyaURI loadPerc = 
RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, "loadPerc"));
+        RyaURI uri1 = RdfToRyaConversions.convertURI(vf.createURI(litdupsNS, 
"uri1"));
+        RyaStatement stmt1 = new RyaStatement(cpu, loadPerc, uri1, null, "1", 
"vis1".getBytes());
+        dao.add(stmt1);
+        RyaStatement stmt2 = new RyaStatement(cpu, loadPerc, uri1, null, "2", 
"vis2".getBytes());
+        dao.add(stmt2);
+
+        CloudbaseRdfConfiguration cloneConf = conf.clone();
+        cloneConf.setAuth("vis1,vis2");
+
+        CloseableIteration<RyaStatement, RyaDAOException> iter = 
dao.getQueryEngine().query(new RyaStatement(cpu, loadPerc, null), cloneConf);
+        int count = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            count++;
+        }
+        iter.close();
+        assertEquals(2, count);
+
+        dao.delete(stmt1, cloneConf);
+
+        iter = dao.getQueryEngine().query(new RyaStatement(cpu, loadPerc, 
null), cloneConf);
+        count = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            count++;
+        }
+        iter.close();
+        assertEquals(1, count);
+    }
+
+    @Test
+    public void testAddCv() throws Exception {
+        RyaURI cpu = new RyaURI(litdupsNS + "cpu");
+        RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc");
+        RyaURI uri1 = new RyaURI(litdupsNS + "uri1");
+        RyaURI uri2 = new RyaURI(litdupsNS + "uri2");
+        RyaURI uri3 = new RyaURI(litdupsNS + "uri3");
+        byte[] colVisABC = "A|B|C".getBytes();
+        byte[] colVisAB = "A|B".getBytes();
+        byte[] colVisA = "A".getBytes();
+        dao.add(new RyaStatement(cpu, loadPerc, uri1, null, null, colVisABC));
+        dao.add(new RyaStatement(cpu, loadPerc, uri2, null, null, colVisAB));
+        dao.add(new RyaStatement(cpu, loadPerc, uri3, null, null, colVisA));
+
+        CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine();
+
+        //query with no auth
+        CloseableIteration<RyaStatement, RyaDAOException> iter = 
queryEngine.query(new RyaStatement(cpu, loadPerc, null), conf);
+        int count = 0;
+        while (iter.hasNext()) {
+            count++;
+            iter.next();
+        }
+        assertEquals(0, count);
+        iter.close();
+
+        CloudbaseRdfConfiguration queryConf = new CloudbaseRdfConfiguration();
+        queryConf.setAuth("B");
+        iter = queryEngine.query(new RyaStatement(cpu, loadPerc, null), 
queryConf);
+        count = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            count++;
+        }
+        iter.close();
+        assertEquals(2, count);
+
+        queryConf.setAuth("A");
+        iter = queryEngine.query(new RyaStatement(cpu, loadPerc, null), 
queryConf);
+        count = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            count++;
+        }
+        iter.close();
+        assertEquals(3, count);
+    }
+
+    @Test
+    public void testGetNamespace() throws Exception {
+        dao.addNamespace("ns", litdupsNS);
+        assertEquals(litdupsNS, dao.getNamespace("ns"));
+        dao.removeNamespace("ns");
+        assertNull(dao.getNamespace("ns"));
+    }
+
+    //TOOD: Add test for set of queries
+    @Test
+    public void testQuery() throws Exception {
+        RyaURI cpu = new RyaURI(litdupsNS + "cpu");
+        RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc");
+        RyaURI uri1 = new RyaURI(litdupsNS + "uri1");
+        RyaURI uri2 = new RyaURI(litdupsNS + "uri2");
+        RyaURI uri3 = new RyaURI(litdupsNS + "uri3");
+        RyaURI uri4 = new RyaURI(litdupsNS + "uri4");
+        RyaURI uri5 = new RyaURI(litdupsNS + "uri5");
+        RyaURI uri6 = new RyaURI(litdupsNS + "uri6");
+        dao.add(new RyaStatement(cpu, loadPerc, uri1));
+        dao.add(new RyaStatement(cpu, loadPerc, uri2));
+        dao.add(new RyaStatement(cpu, loadPerc, uri3));
+        dao.add(new RyaStatement(cpu, loadPerc, uri4));
+        dao.add(new RyaStatement(cpu, loadPerc, uri5));
+        dao.add(new RyaStatement(cpu, loadPerc, uri6));
+
+        CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine();
+
+        Collection<Map.Entry<RyaStatement, BindingSet>> coll = new ArrayList();
+        coll.add(new RdfCloudTripleStoreUtils.CustomEntry(new 
RyaStatement(null, loadPerc, uri1), null));
+        coll.add(new RdfCloudTripleStoreUtils.CustomEntry(new 
RyaStatement(null, loadPerc, uri2), null));
+        CloseableIteration<? extends Map.Entry<RyaStatement, BindingSet>, 
RyaDAOException> iter = queryEngine.queryWithBindingSet(coll, conf);
+        int count = 0;
+        while (iter.hasNext()) {
+            count++;
+            iter.next();
+        }
+        iter.close();
+        assertEquals(2, count);
+
+        //now use batchscanner
+        CloudbaseRdfConfiguration queryConf = new 
CloudbaseRdfConfiguration(conf);
+        queryConf.setMaxRangesForScanner(2);
+
+        coll = new ArrayList();
+        coll.add(new RdfCloudTripleStoreUtils.CustomEntry(new 
RyaStatement(null, loadPerc, uri1), null));
+        coll.add(new RdfCloudTripleStoreUtils.CustomEntry(new 
RyaStatement(null, loadPerc, uri2), null));
+        coll.add(new RdfCloudTripleStoreUtils.CustomEntry(new 
RyaStatement(null, loadPerc, uri3), null));
+        coll.add(new RdfCloudTripleStoreUtils.CustomEntry(new 
RyaStatement(null, loadPerc, uri4), null));
+        iter = queryEngine.queryWithBindingSet(coll, queryConf);
+        assertTrue(iter.hasNext()); //old code had a weird behaviour that 
could not perform hasNext consecutively
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasNext());
+        count = 0;
+        while (iter.hasNext()) {
+            count++;
+            assertTrue(iter.hasNext());
+            iter.next();
+        }
+        iter.close();
+        assertEquals(4, count);
+    }
+
+    @Test
+    public void testQueryCollectionRegex() throws Exception {
+        RyaURI cpu = new RyaURI(litdupsNS + "cpu");
+        RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc");
+        RyaURI uri1 = new RyaURI(litdupsNS + "uri1");
+        RyaURI uri2 = new RyaURI(litdupsNS + "uri2");
+        RyaURI uri3 = new RyaURI(litdupsNS + "uri3");
+        RyaURI uri4 = new RyaURI(litdupsNS + "uri4");
+        RyaURI uri5 = new RyaURI(litdupsNS + "uri5");
+        RyaURI uri6 = new RyaURI(litdupsNS + "uri6");
+        dao.add(new RyaStatement(cpu, loadPerc, uri1));
+        dao.add(new RyaStatement(cpu, loadPerc, uri2));
+        dao.add(new RyaStatement(cpu, loadPerc, uri3));
+        dao.add(new RyaStatement(cpu, loadPerc, uri4));
+        dao.add(new RyaStatement(cpu, loadPerc, uri5));
+        dao.add(new RyaStatement(cpu, loadPerc, uri6));
+
+        CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine();
+
+        Collection<Map.Entry<RyaStatement, BindingSet>> coll = new ArrayList();
+        coll.add(new RdfCloudTripleStoreUtils.CustomEntry(new 
RyaStatement(null, loadPerc, uri1), null));
+        coll.add(new RdfCloudTripleStoreUtils.CustomEntry(new 
RyaStatement(null, loadPerc, uri2), null));
+        conf.setRegexPredicate(loadPerc.getData());
+        CloseableIteration<? extends Map.Entry<RyaStatement, BindingSet>, 
RyaDAOException> iter = queryEngine.queryWithBindingSet(coll, conf);
+        int count = 0;
+        while (iter.hasNext()) {
+            count++;
+            iter.next();
+        }
+        iter.close();
+        assertEquals(2, count);
+
+        conf.setRegexPredicate("notLoadPerc");
+        iter = queryEngine.queryWithBindingSet(coll, conf);
+        count = 0;
+        while (iter.hasNext()) {
+            count++;
+            iter.next();
+        }
+        iter.close();
+        assertEquals(0, count);
+    }
+
+    @Test
+    public void testQueryCollectionRegexWBatchScanner() throws Exception {
+        RyaURI cpu = new RyaURI(litdupsNS + "cpu");
+        RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc");
+        RyaURI uri1 = new RyaURI(litdupsNS + "uri1");
+        RyaURI uri2 = new RyaURI(litdupsNS + "uri2");
+        RyaURI uri3 = new RyaURI(litdupsNS + "uri3");
+        RyaURI uri4 = new RyaURI(litdupsNS + "uri4");
+        RyaURI uri5 = new RyaURI(litdupsNS + "uri5");
+        RyaURI uri6 = new RyaURI(litdupsNS + "uri6");
+        dao.add(new RyaStatement(cpu, loadPerc, uri1));
+        dao.add(new RyaStatement(cpu, loadPerc, uri2));
+        dao.add(new RyaStatement(cpu, loadPerc, uri3));
+        dao.add(new RyaStatement(cpu, loadPerc, uri4));
+        dao.add(new RyaStatement(cpu, loadPerc, uri5));
+        dao.add(new RyaStatement(cpu, loadPerc, uri6));
+
+        CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine();
+        CloudbaseRdfConfiguration queryConf = new 
CloudbaseRdfConfiguration(conf);
+        queryConf.setMaxRangesForScanner(1);
+
+        Collection<Map.Entry<RyaStatement, BindingSet>> coll = new ArrayList();
+        coll.add(new RdfCloudTripleStoreUtils.CustomEntry(new 
RyaStatement(null, loadPerc, uri1), null));
+        coll.add(new RdfCloudTripleStoreUtils.CustomEntry(new 
RyaStatement(null, loadPerc, uri2), null));
+        conf.setRegexPredicate(loadPerc.getData());
+        CloseableIteration<? extends Map.Entry<RyaStatement, BindingSet>, 
RyaDAOException> iter = queryEngine.queryWithBindingSet(coll, queryConf);
+        int count = 0;
+        while (iter.hasNext()) {
+            count++;
+            iter.next();
+        }
+        iter.close();
+        assertEquals(2, count);
+
+        queryConf.setRegexPredicate("notLoadPerc");
+        iter = queryEngine.queryWithBindingSet(coll, queryConf);
+        count = 0;
+        while (iter.hasNext()) {
+            count++;
+            iter.next();
+        }
+        iter.close();
+        assertEquals(0, count);
+    }
+
+    @Test
+    public void testLiteralTypes() throws Exception {
+        RyaURI cpu = new RyaURI(litdupsNS + "cpu");
+        RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc");
+        RyaType longLit = new RyaType(XMLSchema.LONG, "3");
+
+        dao.add(new RyaStatement(cpu, loadPerc, longLit));
+
+        CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine();
+
+        CloseableIteration<RyaStatement, RyaDAOException> query = 
queryEngine.query(new RyaStatement(cpu, null, null), conf);
+        assertTrue(query.hasNext());
+        RyaStatement next = query.next();
+        assertEquals(new Long(longLit.getData()), new 
Long(next.getObject().getData()));
+        query.close();
+
+        RyaType doubleLit = new RyaType(XMLSchema.DOUBLE, "2.0");
+
+        dao.add(new RyaStatement(cpu, loadPerc, doubleLit));
+
+        query = queryEngine.query(new RyaStatement(cpu, loadPerc, doubleLit), 
conf);
+        assertTrue(query.hasNext());
+        next = query.next();
+        assertEquals(Double.parseDouble(doubleLit.getData()), 
Double.parseDouble(next.getObject().getData()), 0.001);
+        query.close();
+    }
+
+    @Test
+    public void testSameLiteralStringTypes() throws Exception {
+        RyaURI cpu = new RyaURI(litdupsNS + "cpu");
+        RyaURI loadPerc = new RyaURI(litdupsNS + "loadPerc");
+        RyaType longLit = new RyaType(XMLSchema.LONG, "10");
+        RyaType strLit = new RyaType(XMLSchema.STRING, new 
String(RyaContext.getInstance().serializeType(longLit)[0]));
+
+        RyaStatement expected = new RyaStatement(cpu, loadPerc, longLit);
+        dao.add(expected);
+        dao.add(new RyaStatement(cpu, loadPerc, strLit));
+
+        CloudbaseRyaQueryEngine queryEngine = dao.getQueryEngine();
+
+        CloseableIteration<RyaStatement, RyaDAOException> query = 
queryEngine.query(new RyaStatement(cpu, loadPerc, longLit), conf);
+        assertTrue(query.hasNext());
+        RyaStatement next = query.next();
+        assertEquals(new Long(longLit.getData()), new 
Long(next.getObject().getData()));
+        assertEquals(longLit.getDataType(), next.getObject().getDataType());
+        assertFalse(query.hasNext());
+        query.close();
+    }
+
+    @Test
+    @Ignore("Purge does not work with the batch deleter in mock cloudbase 
being null")
+    public void testPurge() throws RyaDAOException, TableNotFoundException {
+        dao.add(newRyaStatement());
+        assertFalse("table should not be empty", areTablesEmpty());
+
+        dao.purge(conf);
+        assertTrue("table should be empty", areTablesEmpty());
+        //assertNotNull(dao.getVersion());
+    }
+
+    @Test
+    @Ignore("Purge does not work with the batch deleter in mock cloudbase 
being null")
+    public void testPurgeDoesNotBreakBatchWriters() throws 
TableNotFoundException, RyaDAOException {
+        dao.purge(conf);
+        assertTrue("table should be empty", areTablesEmpty());
+
+        dao.add(newRyaStatement());
+        assertFalse("table should not be empty", areTablesEmpty());
+    }
+
+    @Test
+    public void testDropAndDestroy() throws RyaDAOException {
+        assertTrue(dao.isInitialized());
+        dao.dropAndDestroy();
+        for (String tableName : dao.getTables()) {
+            assertFalse(tableExists(tableName));
+        }
+        assertFalse(dao.isInitialized());
+    }
+
+    private boolean areTablesEmpty() throws TableNotFoundException {
+        for (String table : dao.getTables()) {
+            if (tableExists(table)) {
+                // TODO: filter out version
+                if (createScanner(table).iterator().hasNext()) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    private boolean tableExists(String tableName) {
+        return connector.tableOperations().exists(tableName);
+    }
+
+    private Scanner createScanner(String tableName) throws 
TableNotFoundException {
+        return dao.getConnector().createScanner(tableName, 
conf.getAuthorizations());
+    }
+
+    private RyaStatement newRyaStatement() {
+        RyaURI subject = new RyaURI(litdupsNS + randomString());
+        RyaURI predicate = new RyaURI(litdupsNS + randomString());
+        RyaType object = new RyaType(randomString());
+
+        return new RyaStatement(subject, predicate, object);
+    }
+
+    private String randomString() {
+        return UUID.randomUUID().toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/DefineTripleQueryRangeFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/DefineTripleQueryRangeFactoryTest.java
 
b/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/DefineTripleQueryRangeFactoryTest.java
new file mode 100644
index 0000000..ddff532
--- /dev/null
+++ 
b/dao/cloudbase.rya/src/test/java/mvm/rya/cloudbase/DefineTripleQueryRangeFactoryTest.java
@@ -0,0 +1,242 @@
+//package mvm.rya.cloudbase;
+//
+//import cloudbase.core.data.Range;
+//import junit.framework.TestCase;
+//import mvm.rya.api.domain.RangeValue;
+//import mvm.rya.cloudbase.query.DefineTripleQueryRangeFactory;
+//import org.openrdf.model.URI;
+//import org.openrdf.model.Value;
+//import org.openrdf.model.ValueFactory;
+//import org.openrdf.model.impl.ValueFactoryImpl;
+//
+//import java.util.Map;
+//
+//import static mvm.rya.api.RdfCloudTripleStoreConstants.*;
+//
+///**
+// */
+//public class DefineTripleQueryRangeFactoryTest extends TestCase {
+//
+//    public static final String DELIM_BYTES_STR = new String(DELIM_BYTES);
+//    public static final String URI_MARKER_STR = "\u0007";
+//    public static final String RANGE_ENDKEY_SUFFIX = "\u0000";
+//    DefineTripleQueryRangeFactory factory = new 
DefineTripleQueryRangeFactory();
+//    ValueFactory vf = ValueFactoryImpl.getInstance();
+//    static String litdupsNS = "urn:test:litdups#";
+//
+//    private CloudbaseRdfConfiguration conf = new CloudbaseRdfConfiguration();
+//
+//    public void testSPOCases() throws Exception {
+//        URI cpu = vf.createURI(litdupsNS, "cpu");
+//        URI loadPerc = vf.createURI(litdupsNS, "loadPerc");
+//        URI obj = vf.createURI(litdupsNS, "uri1");
+//
+//        //spo
+//        Map.Entry<TABLE_LAYOUT, Range> entry =
+//                factory.defineRange(cpu, loadPerc, obj, conf);
+//        assertEquals(TABLE_LAYOUT.SPO, entry.getKey());
+//        String expected_start = URI_MARKER_STR + cpu.stringValue() + 
DELIM_BYTES_STR +
+//                URI_MARKER_STR + loadPerc.stringValue() + DELIM_BYTES_STR +
+//                URI_MARKER_STR + obj.stringValue();
+//        assertEquals(expected_start,
+//                entry.getValue().getStartKey().getRow().toString());
+//        assertEquals(expected_start + DELIM_STOP + RANGE_ENDKEY_SUFFIX,
+//                entry.getValue().getEndKey().getRow().toString());
+//
+//
+//        //sp
+//        entry = factory.defineRange(cpu, loadPerc, null, conf);
+//        assertEquals(TABLE_LAYOUT.SPO, entry.getKey());
+//        expected_start = URI_MARKER_STR + cpu.stringValue() + 
DELIM_BYTES_STR +
+//                URI_MARKER_STR + loadPerc.stringValue();
+//        assertEquals(expected_start,
+//                entry.getValue().getStartKey().getRow().toString());
+//        assertEquals(expected_start + DELIM_STOP + RANGE_ENDKEY_SUFFIX,
+//                entry.getValue().getEndKey().getRow().toString());
+//
+//        //s
+//        entry = factory.defineRange(cpu, null, null, conf);
+//        assertEquals(TABLE_LAYOUT.SPO, entry.getKey());
+//        expected_start = URI_MARKER_STR + cpu.stringValue();
+//        assertEquals(expected_start,
+//                entry.getValue().getStartKey().getRow().toString());
+//        assertEquals(expected_start + DELIM_STOP + RANGE_ENDKEY_SUFFIX,
+//                entry.getValue().getEndKey().getRow().toString());
+//
+//        //all
+//        entry = factory.defineRange(null, null, null, conf);
+//        assertEquals(TABLE_LAYOUT.SPO, entry.getKey());
+//        assertEquals("",
+//                entry.getValue().getStartKey().getRow().toString());
+//        assertEquals(new String(new byte[]{Byte.MAX_VALUE}) + DELIM_STOP + 
RANGE_ENDKEY_SUFFIX,
+//                entry.getValue().getEndKey().getRow().toString());
+//    }
+//
+//    public void testSPOCasesWithRanges() throws Exception {
+//        URI subj_start = vf.createURI(litdupsNS, "subj_start");
+//        URI subj_end = vf.createURI(litdupsNS, "subj_stop");
+//        URI pred_start = vf.createURI(litdupsNS, "pred_start");
+//        URI pred_end = vf.createURI(litdupsNS, "pred_stop");
+//        URI obj_start = vf.createURI(litdupsNS, "obj_start");
+//        URI obj_end = vf.createURI(litdupsNS, "obj_stop");
+//
+//        Value subj = new RangeValue(subj_start, subj_end);
+//        Value pred = new RangeValue(pred_start, pred_end);
+//        Value obj = new RangeValue(obj_start, obj_end);
+//
+//        //spo - o has range
+//        Map.Entry<TABLE_LAYOUT, Range> entry =
+//                factory.defineRange(subj_start, pred_start, obj, conf);
+//        assertEquals(TABLE_LAYOUT.SPO, entry.getKey());
+//        String expected_start = URI_MARKER_STR + subj_start.stringValue() + 
DELIM_BYTES_STR +
+//                URI_MARKER_STR + pred_start.stringValue() + DELIM_BYTES_STR +
+//                URI_MARKER_STR + obj_start.stringValue();
+//        assertEquals(expected_start,
+//                entry.getValue().getStartKey().getRow().toString());
+//        String expected_end = URI_MARKER_STR + subj_start.stringValue() + 
DELIM_BYTES_STR +
+//                URI_MARKER_STR + pred_start.stringValue() + DELIM_BYTES_STR +
+//                URI_MARKER_STR + obj_end.stringValue();
+//        assertEquals(expected_end + DELIM_STOP + RANGE_ENDKEY_SUFFIX,
+//                entry.getValue().getEndKey().getRow().toString());
+//
+//        //sp - p has range
+//        entry = factory.defineRange(subj_start, pred, null, conf);
+//        assertEquals(TABLE_LAYOUT.SPO, entry.getKey());
+//        expected_start = URI_MARKER_STR + subj_start.stringValue() + 
DELIM_BYTES_STR +
+//                URI_MARKER_STR + pred_start.stringValue();
+//        assertEquals(expected_start,
+//                entry.getValue().getStartKey().getRow().toString());
+//        expected_end = URI_MARKER_STR + subj_start.stringValue() + 
DELIM_BYTES_STR +
+//                URI_MARKER_STR + pred_end.stringValue();
+//        assertEquals(expected_end + DELIM_STOP + RANGE_ENDKEY_SUFFIX,
+//                entry.getValue().getEndKey().getRow().toString());
+//
+//        //s - s has range
+//        entry = factory.defineRange(subj, null, null, conf);
+//        assertEquals(TABLE_LAYOUT.SPO, entry.getKey());
+//        expected_start = URI_MARKER_STR + subj_start.stringValue();
+//        assertEquals(expected_start,
+//                entry.getValue().getStartKey().getRow().toString());
+//        expected_end = URI_MARKER_STR + subj_end.stringValue();
+//        assertEquals(expected_end + DELIM_STOP + RANGE_ENDKEY_SUFFIX,
+//                entry.getValue().getEndKey().getRow().toString());
+//    }
+//
+//    public void testPOCases() throws Exception {
+//        URI loadPerc = vf.createURI(litdupsNS, "loadPerc");
+//        URI obj = vf.createURI(litdupsNS, "uri1");
+//
+//        //po
+//        Map.Entry<TABLE_LAYOUT, Range> entry =
+//                factory.defineRange(null, loadPerc, obj, conf);
+//        assertEquals(TABLE_LAYOUT.PO, entry.getKey());
+//        String expected_start = URI_MARKER_STR + loadPerc.stringValue() + 
DELIM_BYTES_STR +
+//                URI_MARKER_STR + obj.stringValue();
+//        assertEquals(expected_start,
+//                entry.getValue().getStartKey().getRow().toString());
+//        assertEquals(expected_start + DELIM_STOP + RANGE_ENDKEY_SUFFIX,
+//                entry.getValue().getEndKey().getRow().toString());
+//
+//        //p
+//        entry = factory.defineRange(null, loadPerc, null, conf);
+//        assertEquals(TABLE_LAYOUT.PO, entry.getKey());
+//        expected_start = URI_MARKER_STR + loadPerc.stringValue();
+//        assertEquals(expected_start,
+//                entry.getValue().getStartKey().getRow().toString());
+//        assertEquals(expected_start + DELIM_STOP + RANGE_ENDKEY_SUFFIX,
+//                entry.getValue().getEndKey().getRow().toString());
+//    }
+//
+//    public void testPOCasesWithRanges() throws Exception {
+//        URI pred_start = vf.createURI(litdupsNS, "pred_start");
+//        URI pred_end = vf.createURI(litdupsNS, "pred_stop");
+//        URI obj_start = vf.createURI(litdupsNS, "obj_start");
+//        URI obj_end = vf.createURI(litdupsNS, "obj_stop");
+//
+//        Value pred = new RangeValue(pred_start, pred_end);
+//        Value obj = new RangeValue(obj_start, obj_end);
+//
+//        //po
+//        Map.Entry<TABLE_LAYOUT, Range> entry =
+//                factory.defineRange(null, pred_start, obj, conf);
+//        assertEquals(TABLE_LAYOUT.PO, entry.getKey());
+//        String expected_start = URI_MARKER_STR + pred_start.stringValue() + 
DELIM_BYTES_STR +
+//                URI_MARKER_STR + obj_start.stringValue();
+//        assertEquals(expected_start,
+//                entry.getValue().getStartKey().getRow().toString());
+//        String expected_end = URI_MARKER_STR + pred_start.stringValue() + 
DELIM_BYTES_STR +
+//                URI_MARKER_STR + obj_end.stringValue();
+//        assertEquals(expected_end + DELIM_STOP + RANGE_ENDKEY_SUFFIX,
+//                entry.getValue().getEndKey().getRow().toString());
+//
+//        //p
+//        entry = factory.defineRange(null, pred, null, conf);
+//        assertEquals(TABLE_LAYOUT.PO, entry.getKey());
+//        expected_start = URI_MARKER_STR + pred_start.stringValue();
+//        assertEquals(expected_start,
+//                entry.getValue().getStartKey().getRow().toString());
+//        expected_end = URI_MARKER_STR + pred_end.stringValue();
+//        assertEquals(expected_end + DELIM_STOP + RANGE_ENDKEY_SUFFIX,
+//                entry.getValue().getEndKey().getRow().toString());
+//    }
+//
+//    public void testOSPCases() throws Exception {
+//        URI cpu = vf.createURI(litdupsNS, "cpu");
+//        URI obj = vf.createURI(litdupsNS, "uri1");
+//
+//        //so
+//        Map.Entry<TABLE_LAYOUT, Range> entry =
+//                factory.defineRange(cpu, null, obj, conf);
+//        assertEquals(TABLE_LAYOUT.OSP, entry.getKey());
+//        String expected_start = URI_MARKER_STR + obj.stringValue() + 
DELIM_BYTES_STR +
+//                URI_MARKER_STR + cpu.stringValue();
+//        assertEquals(expected_start,
+//                entry.getValue().getStartKey().getRow().toString());
+//        assertEquals(expected_start + DELIM_STOP + RANGE_ENDKEY_SUFFIX,
+//                entry.getValue().getEndKey().getRow().toString());
+//
+//        //o
+//        entry = factory.defineRange(null, null, obj, conf);
+//        assertEquals(TABLE_LAYOUT.OSP, entry.getKey());
+//        expected_start = URI_MARKER_STR + obj.stringValue();
+//        assertEquals(expected_start,
+//                entry.getValue().getStartKey().getRow().toString());
+//        assertEquals(expected_start + DELIM_STOP + RANGE_ENDKEY_SUFFIX,
+//                entry.getValue().getEndKey().getRow().toString());
+//    }
+//
+//
+//    public void testOSPCasesWithRanges() throws Exception {
+//        URI subj_start = vf.createURI(litdupsNS, "subj_start");
+//        URI subj_end = vf.createURI(litdupsNS, "subj_stop");
+//        URI obj_start = vf.createURI(litdupsNS, "obj_start");
+//        URI obj_end = vf.createURI(litdupsNS, "obj_stop");
+//
+//        Value subj = new RangeValue(subj_start, subj_end);
+//        Value obj = new RangeValue(obj_start, obj_end);
+//
+//        //so - s should be the range
+//        Map.Entry<TABLE_LAYOUT, Range> entry =
+//                factory.defineRange(subj, null, obj_start, conf);
+//        assertEquals(TABLE_LAYOUT.OSP, entry.getKey());
+//        String expected_start = URI_MARKER_STR + obj_start.stringValue() + 
DELIM_BYTES_STR +
+//                URI_MARKER_STR + subj_start.stringValue();
+//        assertEquals(expected_start,
+//                entry.getValue().getStartKey().getRow().toString());
+//        String expected_end = URI_MARKER_STR + obj_start.stringValue() + 
DELIM_BYTES_STR +
+//                URI_MARKER_STR + subj_end.stringValue();
+//        assertEquals(expected_end + DELIM_STOP + RANGE_ENDKEY_SUFFIX,
+//                entry.getValue().getEndKey().getRow().toString());
+//
+//        //o - o is range
+//        entry = factory.defineRange(null, null, obj, conf);
+//        assertEquals(TABLE_LAYOUT.OSP, entry.getKey());
+//        expected_start = URI_MARKER_STR + obj_start.stringValue();
+//        assertEquals(expected_start,
+//                entry.getValue().getStartKey().getRow().toString());
+//        expected_end = URI_MARKER_STR + obj_end.stringValue();
+//        assertEquals(expected_end + DELIM_STOP + RANGE_ENDKEY_SUFFIX,
+//                entry.getValue().getEndKey().getRow().toString());
+//    }
+//
+//}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/pom.xml
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/pom.xml b/dao/mongodb.rya/pom.xml
new file mode 100644
index 0000000..30b9a4c
--- /dev/null
+++ b/dao/mongodb.rya/pom.xml
@@ -0,0 +1,30 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+       <modelVersion>4.0.0</modelVersion>
+       <artifactId>mongodb.rya</artifactId>
+       <name>${project.groupId}.${project.artifactId}</name>
+       <properties>
+
+       </properties>
+       <parent>
+               <groupId>mvm.rya</groupId>
+               <artifactId>rya.dao</artifactId>
+               <version>3.2.9</version>
+       </parent>
+       <dependencies>
+               <dependency>
+                       <groupId>mvm.rya</groupId>
+                       <artifactId>rya.api</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.mongodb</groupId>
+                       <artifactId>mongo-java-driver</artifactId>
+                       <version>2.13.0-rc0</version>
+               </dependency>
+               <dependency>
+                       <groupId>de.flapdoodle.embed</groupId>
+                       <artifactId>de.flapdoodle.embed.mongo</artifactId>
+                       <version>1.50.0</version>
+               </dependency>
+       </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBQueryEngine.java
----------------------------------------------------------------------
diff --git 
a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBQueryEngine.java 
b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBQueryEngine.java
new file mode 100644
index 0000000..c215184
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBQueryEngine.java
@@ -0,0 +1,187 @@
+package mvm.rya.mongodb;
+
+import info.aduna.iteration.CloseableIteration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.persist.query.BatchRyaQuery;
+import mvm.rya.api.persist.query.RyaQuery;
+import mvm.rya.api.persist.query.RyaQueryEngine;
+import mvm.rya.mongodb.dao.MongoDBStorageStrategy;
+import mvm.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
+import mvm.rya.mongodb.iter.NonCloseableRyaStatementCursorIterator;
+import mvm.rya.mongodb.iter.RyaStatementBindingSetCursorIterator;
+import mvm.rya.mongodb.iter.RyaStatementCursorIterable;
+import mvm.rya.mongodb.iter.RyaStatementCursorIterator;
+
+import org.calrissian.mango.collect.CloseableIterable;
+import org.openrdf.query.BindingSet;
+
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.MongoClient;
+
+/**
+ * Date: 7/17/12
+ * Time: 9:28 AM
+ */
+public class MongoDBQueryEngine implements 
RyaQueryEngine<MongoDBRdfConfiguration>, Closeable {
+
+    private MongoDBRdfConfiguration configuration;
+       private MongoClient mongoClient;
+       private DBCollection coll;
+       private MongoDBStorageStrategy strategy;
+    
+    public MongoDBQueryEngine(MongoDBRdfConfiguration conf) throws 
NumberFormatException, UnknownHostException{
+               mongoClient = new 
MongoClient(conf.get(MongoDBRdfConfiguration.MONGO_INSTANCE),
+                               
Integer.valueOf(conf.get(MongoDBRdfConfiguration.MONGO_INSTANCE_PORT)));
+               DB db = mongoClient.getDB( 
conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME));
+               coll = db.getCollection(conf.getTriplesCollectionName());
+               this.strategy = new SimpleMongoDBStorageStrategy();
+    }
+    
+    
+       @Override
+       public void setConf(MongoDBRdfConfiguration conf) {
+               configuration = conf;
+       }
+       
+       @Override
+       public MongoDBRdfConfiguration getConf() {
+               return configuration;
+       }
+       
+       @Override
+       public CloseableIteration<RyaStatement, RyaDAOException> query(
+                       RyaStatement stmt, MongoDBRdfConfiguration conf)
+                       throws RyaDAOException {
+        if (conf == null) {
+            conf = configuration;
+        }
+        Long maxResults = conf.getLimit();
+        Set<DBObject> queries = new HashSet<DBObject>();
+        DBObject query = strategy.getQuery(stmt);
+        queries.add(query);
+        RyaStatementCursorIterator iterator = new 
RyaStatementCursorIterator(coll, queries, strategy);
+        
+        if (maxResults != null) {
+            iterator.setMaxResults(maxResults);
+        }
+        return iterator;
+       }
+       @Override
+       public CloseableIteration<? extends Entry<RyaStatement, BindingSet>, 
RyaDAOException> queryWithBindingSet(
+                       Collection<Entry<RyaStatement, BindingSet>> stmts,
+                       MongoDBRdfConfiguration conf) throws RyaDAOException {
+        if (conf == null) {
+            conf = configuration;
+        }
+        Long maxResults = conf.getLimit();
+        Map<DBObject, BindingSet> rangeMap = new HashMap<DBObject, 
BindingSet>();
+ 
+        //TODO: cannot span multiple tables here
+        try {
+            for (Map.Entry<RyaStatement, BindingSet> stmtbs : stmts) {
+                RyaStatement stmt = stmtbs.getKey();
+                BindingSet bs = stmtbs.getValue();
+                DBObject query = strategy.getQuery(stmt);
+                rangeMap.put(query, bs);
+            }
+            
+            // TODO not sure what to do about regex ranges?
+            RyaStatementBindingSetCursorIterator iterator = new 
RyaStatementBindingSetCursorIterator(coll, rangeMap, strategy);
+            
+            if (maxResults != null) {
+                iterator.setMaxResults(maxResults);
+            }
+            return iterator;
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+
+       }
+       @Override
+       public CloseableIteration<RyaStatement, RyaDAOException> batchQuery(
+                       Collection<RyaStatement> stmts, MongoDBRdfConfiguration 
conf)
+                       throws RyaDAOException {
+        if (conf == null) {
+            conf = configuration;
+        }
+        Long maxResults = conf.getLimit();
+        Set<DBObject> queries = new HashSet<DBObject>();
+ 
+        try {
+            for (RyaStatement stmt : stmts) {
+                queries.add( strategy.getQuery(stmt));
+             }
+            
+            // TODO not sure what to do about regex ranges?
+            RyaStatementCursorIterator iterator = new 
RyaStatementCursorIterator(coll, queries, strategy);
+            
+            if (maxResults != null) {
+                iterator.setMaxResults(maxResults);
+            }
+            return iterator;
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+
+       }
+       @Override
+       public CloseableIterable<RyaStatement> query(RyaQuery ryaQuery)
+                       throws RyaDAOException {
+               Set<DBObject> queries = new HashSet<DBObject>();
+        
+               try {
+                queries.add( strategy.getQuery(ryaQuery));
+                   
+                   // TODO not sure what to do about regex ranges?
+                // TODO this is gross
+                RyaStatementCursorIterable iterator = new 
RyaStatementCursorIterable(new NonCloseableRyaStatementCursorIterator(new 
RyaStatementCursorIterator(coll, queries, strategy)));
+                   
+                   return iterator;
+               } catch (Exception e) {
+                   throw new RyaDAOException(e);
+               }
+       }
+       @Override
+       public CloseableIterable<RyaStatement> query(BatchRyaQuery 
batchRyaQuery)
+                       throws RyaDAOException {
+         try {
+             Set<DBObject> queries = new HashSet<DBObject>();
+            for (RyaStatement statement : batchRyaQuery.getQueries()){
+                queries.add( strategy.getQuery(statement));
+                               
+            }
+            
+            // TODO not sure what to do about regex ranges?
+            // TODO this is gross
+            RyaStatementCursorIterable iterator = new 
RyaStatementCursorIterable(new NonCloseableRyaStatementCursorIterator(new 
RyaStatementCursorIterator(coll, queries, strategy)));
+            
+            return iterator;
+        } catch (Exception e) {
+            throw new RyaDAOException(e);
+        }
+       }
+
+    @Override
+    public void close() throws IOException {
+        if (mongoClient != null){ mongoClient.close(); }
+    }
+       
+       
+       
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBRdfConfiguration.java
----------------------------------------------------------------------
diff --git 
a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBRdfConfiguration.java 
b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBRdfConfiguration.java
new file mode 100644
index 0000000..0054847
--- /dev/null
+++ b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/MongoDBRdfConfiguration.java
@@ -0,0 +1,101 @@
+package mvm.rya.mongodb;
+
+
+import java.util.List;
+
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.persist.index.RyaSecondaryIndexer;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+public class MongoDBRdfConfiguration extends RdfCloudTripleStoreConfiguration {
+    public static final String MONGO_INSTANCE = "mongo.db.instance";
+    public static final String MONGO_INSTANCE_PORT = "mongo.db.port";
+    public static final String MONGO_GEO_MAXDISTANCE = "mongo.geo.maxdist";
+    public static final String MONGO_DB_NAME = "mongo.db.name";
+    public static final String MONGO_COLLECTION_PREFIX = 
"mongo.db.collectionprefix";
+    public static final String MONGO_USER = "mongo.db.user";
+    public static final String  MONGO_USER_PASSWORD = "mongo.db.userpassword";
+    public static final String USE_TEST_MONGO = "mongo.db.test";
+    public static final String CONF_ADDITIONAL_INDEXERS = 
"ac.additional.indexers";
+
+    public MongoDBRdfConfiguration() {
+        super();
+    }
+
+    public MongoDBRdfConfiguration(Configuration other) {
+        super(other);
+    }
+
+    @Override
+    public MongoDBRdfConfiguration clone() {
+        return new MongoDBRdfConfiguration(this);
+    }
+
+    public boolean getUseTestMongo() {
+        return this.getBoolean(USE_TEST_MONGO, false);
+    }
+
+    public void setUseTestMongo(boolean useTestMongo) {
+        this.setBoolean(USE_TEST_MONGO, useTestMongo);
+    }
+
+    public String getTriplesCollectionName() {
+        return this.get(MONGO_COLLECTION_PREFIX, "rya") + "_triples";
+    }
+
+    public String getCollectionName() {
+        return this.get(MONGO_COLLECTION_PREFIX, "rya");
+    }
+
+    public void setCollectionName(String name) {
+        this.set(MONGO_COLLECTION_PREFIX, name);
+    }
+
+    public String getMongoInstance() {
+        return this.get(MONGO_INSTANCE, "localhost");
+    }
+
+    public void setMongoInstance(String name) {
+        this.set(MONGO_INSTANCE, name);
+    }
+
+    public String getMongoPort() {
+        return this.get(MONGO_INSTANCE_PORT, "27017");
+    }
+
+    public void setMongoPort(String name) {
+        this.set(MONGO_INSTANCE_PORT, name);
+    }
+
+    public String getMongoDBName() {
+        return this.get(MONGO_DB_NAME, "rya");
+    }
+
+    public void setMongoDBName(String name) {
+        this.set(MONGO_DB_NAME, name);
+    }
+
+    public String getNameSpacesCollectionName() {
+        return this.get(MONGO_COLLECTION_PREFIX, "rya") + "_ns";
+    }
+    
+    public void setAdditionalIndexers(Class<? extends RyaSecondaryIndexer>... 
indexers) {
+        List<String> strs = Lists.newArrayList();
+        for (Class ai : indexers){
+            strs.add(ai.getName());
+        }
+        
+        setStrings(CONF_ADDITIONAL_INDEXERS, strs.toArray(new String[]{}));
+    }
+
+    public List<RyaSecondaryIndexer> getAdditionalIndexers() {
+        return getInstances(CONF_ADDITIONAL_INDEXERS, 
RyaSecondaryIndexer.class);
+    }
+    
+    
+    
+
+}

Reply via email to